diff --git a/weed/mq/kafka/protocol/fetch_multibatch.go b/weed/mq/kafka/protocol/fetch_multibatch.go index f00b6a92a..61cd19f78 100644 --- a/weed/mq/kafka/protocol/fetch_multibatch.go +++ b/weed/mq/kafka/protocol/fetch_multibatch.go @@ -57,9 +57,25 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName totalSize := int32(0) batchCount := 0 - // Parameters for batch fetching - start smaller to respect maxBytes better - recordsPerBatch := int32(10) // Start with smaller batch size - maxBatchesPerFetch := 10 // Limit number of batches to avoid infinite loops + // Estimate records per batch based on maxBytes available + // Assume average message size + batch overhead + // Client requested maxBytes, we should use most of it + // Start with larger batches to maximize throughput + estimatedMsgSize := int32(1024) // Typical message size with overhead + recordsPerBatch := (maxBytes - 200) / estimatedMsgSize // Use available space efficiently + if recordsPerBatch < 100 { + recordsPerBatch = 100 // Minimum 100 records per batch + } + if recordsPerBatch > 10000 { + recordsPerBatch = 10000 // Cap at 10k records per batch to avoid huge memory allocations + } + maxBatchesPerFetch := int((maxBytes - 200) / (estimatedMsgSize * 10)) // Reasonable limit + if maxBatchesPerFetch < 5 { + maxBatchesPerFetch = 5 // At least 5 batches + } + if maxBatchesPerFetch > 100 { + maxBatchesPerFetch = 100 // At most 100 batches + } for batchCount < maxBatchesPerFetch && currentOffset < highWaterMark { @@ -70,8 +86,13 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName } // Adapt records per batch based on remaining space - if remainingBytes < 1000 { - recordsPerBatch = 10 // Smaller batches when space is limited + // If we have less space remaining, fetch fewer records to avoid going over + currentBatchSize := recordsPerBatch + if remainingBytes < recordsPerBatch*estimatedMsgSize { + currentBatchSize = remainingBytes / estimatedMsgSize + if currentBatchSize < 1 { + currentBatchSize = 1 + } } // Calculate how many records to fetch for this batch @@ -80,7 +101,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName break } - recordsToFetch := recordsPerBatch + recordsToFetch := currentBatchSize if int64(recordsToFetch) > recordsAvailable { recordsToFetch = int32(recordsAvailable) }