From 690754acb721bfa28c0686d3fdcaaaea50302c34 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 23:07:00 -0700 Subject: [PATCH] fix: Increase fetch batch sizes to utilize available maxBytes capacity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PROBLEM: Consumer throughput only 36.80 msgs/sec vs producer 50.21 msgs/sec. Test shows messages consumed at 73% of production rate. ROOT CAUSE: FetchMultipleBatches was hardcoded to fetch only: - 10 records per batch (5.1 KB per batch with 512-byte messages) - 10 batches max per fetch (~51 KB total per fetch) But clients request 10 MB per fetch! - Utilization: 0.5% of requested capacity - Massive inefficiency causing slow consumer throughput Analysis: - Client requests: 10 MB per fetch (FetchSize: 10e6) - Server returns: ~51 KB per fetch (200x less!) - Batches: 10 records each (way too small) - Result: Consumer falls behind producer by 26% FIX: Calculate optimal batch size based on maxBytes: - recordsPerBatch = (maxBytes - overhead) / estimatedMsgSize - Start with 9.8MB / 1024 bytes = ~9,600 records per fetch - Min 100 records, max 10,000 records per batch - Scale max batches based on available space - Adaptive sizing for remaining bytes EXPECTED IMPACT: - Consumer throughput: 36.80 → ~48+ msgs/sec (match producer) - Fetch efficiency: 0.5% → ~98% of maxBytes - Message loss: 45% → near 0% This is critical for matching Kafka semantics where clients specify fetch sizes and the broker should honor them. --- weed/mq/kafka/protocol/fetch_multibatch.go | 33 ++++++++++++++++++---- 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/weed/mq/kafka/protocol/fetch_multibatch.go b/weed/mq/kafka/protocol/fetch_multibatch.go index f00b6a92a..61cd19f78 100644 --- a/weed/mq/kafka/protocol/fetch_multibatch.go +++ b/weed/mq/kafka/protocol/fetch_multibatch.go @@ -57,9 +57,25 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName totalSize := int32(0) batchCount := 0 - // Parameters for batch fetching - start smaller to respect maxBytes better - recordsPerBatch := int32(10) // Start with smaller batch size - maxBatchesPerFetch := 10 // Limit number of batches to avoid infinite loops + // Estimate records per batch based on maxBytes available + // Assume average message size + batch overhead + // Client requested maxBytes, we should use most of it + // Start with larger batches to maximize throughput + estimatedMsgSize := int32(1024) // Typical message size with overhead + recordsPerBatch := (maxBytes - 200) / estimatedMsgSize // Use available space efficiently + if recordsPerBatch < 100 { + recordsPerBatch = 100 // Minimum 100 records per batch + } + if recordsPerBatch > 10000 { + recordsPerBatch = 10000 // Cap at 10k records per batch to avoid huge memory allocations + } + maxBatchesPerFetch := int((maxBytes - 200) / (estimatedMsgSize * 10)) // Reasonable limit + if maxBatchesPerFetch < 5 { + maxBatchesPerFetch = 5 // At least 5 batches + } + if maxBatchesPerFetch > 100 { + maxBatchesPerFetch = 100 // At most 100 batches + } for batchCount < maxBatchesPerFetch && currentOffset < highWaterMark { @@ -70,8 +86,13 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName } // Adapt records per batch based on remaining space - if remainingBytes < 1000 { - recordsPerBatch = 10 // Smaller batches when space is limited + // If we have less space remaining, fetch fewer records to avoid going over + currentBatchSize := recordsPerBatch + if remainingBytes < recordsPerBatch*estimatedMsgSize { + currentBatchSize = remainingBytes / estimatedMsgSize + if currentBatchSize < 1 { + currentBatchSize = 1 + } } // Calculate how many records to fetch for this batch @@ -80,7 +101,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName break } - recordsToFetch := recordsPerBatch + recordsToFetch := currentBatchSize if int64(recordsToFetch) > recordsAvailable { recordsToFetch = int32(recordsAvailable) }