diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index f97b57712..0dc155caf 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -112,15 +112,15 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo // Use multi-batch fetcher for better MaxBytes compliance multiFetcher := NewMultiBatchFetcher(h) result, err := multiFetcher.FetchMultipleBatches( - topic.Name, - partition.PartitionID, - partition.FetchOffset, - highWaterMark, + topic.Name, + partition.PartitionID, + partition.FetchOffset, + highWaterMark, partition.MaxBytes, ) if err == nil && result.TotalSize > 0 { - fmt.Printf("DEBUG: Multi-batch result - %d batches, %d bytes, next offset %d\n", + fmt.Printf("DEBUG: Multi-batch result - %d batches, %d bytes, next offset %d\n", result.BatchCount, result.TotalSize, result.NextOffset) recordBatch = result.RecordBatches } else {