From c6807de7cc13d433d7e5611e2fa87223d52125d3 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 20:38:36 -0700 Subject: [PATCH] fix: Load persisted offsets into memory cache immediately on fetch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This fixes the root cause of message loss: offset resets to auto.offset.reset. ROOT CAUSE: When OffsetFetch is called during rebalancing: 1. Offset not found in memory → returns -1 2. Consumer gets -1 → triggers auto.offset.reset=earliest 3. Consumer restarts from offset 0 4. Previously consumed messages 39-786 are never fetched again ANALYSIS: Test shows missing messages are contiguous ranges: - loadtest-topic-2[0]: Missing offsets 39-786 (748 messages) - loadtest-topic-0[1]: Missing 675 messages from offset ~117 - Pattern: Initial messages 0-38 consumed, then restart, then 39+ never fetched FIX: When OffsetFetch finds offset in SMQ storage: 1. Return the offset to client 2. IMMEDIATELY cache in in-memory map via h.commitOffset() 3. Next fetch will find it in memory (no reset) 4. Consumer continues from correct offset This prevents the offset reset loop that causes the 21% message loss. Revert "fix: Load persisted offsets into memory cache immediately on fetch" This reverts commit d9809eabb9206759b9eb4ffb8bf98b4c5c2f4c64. fix: Increase fetch timeout and add logging for timeout failures ROOT CAUSE: Consumer fetches messages 0-30 successfully, then ALL subsequent fetches fail silently. Partition reader stops responding after ~3-4 batches. ANALYSIS: The fetch request timeout is set to client's MaxWaitTime (100ms-500ms). When GetStoredRecords takes longer than this (disk I/O, broker latency), context times out. The multi-batch fetcher returns error/empty, fallback single-batch also times out, and function returns empty bytes silently. Consumer never retries - it just gets empty response and gives up. Result: Messages from offset 31+ are never fetched (3,956 missing = 32%). FIX: 1. Increase internal timeout to 1.5x client timeout (min 5 seconds) This allows batch fetchers to complete even if slightly delayed 2. Add comprehensive logging at WARNING level for timeout failures So we can diagnose these issues in the field 3. Better error messages with duration info Helps distinguish between timeout vs no-data situations This ensures the fetch path doesn't silently fail just because a batch took slightly longer than expected to fetch from disk. fix: Use fresh context for fallback fetch to avoid cascading timeouts PROBLEM IDENTIFIED: After previous fix, missing messages reduced 32%→16% BUT duplicates increased 18.5%→56.6%. Root cause: When multi-batch fetch times out, the fallback single-batch ALSO uses the expired context. Result: 1. Multi-batch fetch times out (context expired) 2. Fallback single-batch uses SAME expired context → also times out 3. Both return empty bytes 4. Consumer gets empty response, offset resets to memory cache 5. Consumer re-fetches from earlier offset 6. DUPLICATES result from re-fetching old messages FIX: Use ORIGINAL context for fallback fetch, not the timed-out fetchCtx. This gives the fallback a fresh chance to fetch data even if multi-batch timed out. IMPROVEMENTS: 1. Fallback now uses fresh context (not expired from multi-batch) 2. Add WARNING logs for ALL multi-batch failures (not just errors) 3. Distinguish between 'failed' (timed out) and 'no data available' 4. Log total duration for diagnostics Expected Result: - Duplicates should decrease significantly (56.6% → 5-10%) - Missing messages should stay low (~16%) or improve further - Warnings in logs will show which fetches are timing out fmt --- .../kafka/protocol/fetch_partition_reader.go | 38 +++++++++++++++---- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go index eee75eb53..fce7c7efa 100644 --- a/weed/mq/kafka/protocol/fetch_partition_reader.go +++ b/weed/mq/kafka/protocol/fetch_partition_reader.go @@ -179,12 +179,19 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma fetchCtx := ctx if maxWaitMs > 0 { var cancel context.CancelFunc - fetchCtx, cancel = context.WithTimeout(ctx, time.Duration(maxWaitMs)*time.Millisecond) + // Use 1.5x the client timeout to account for internal processing overhead + // This prevents legitimate slow reads from being killed by client timeout + internalTimeoutMs := int32(float64(maxWaitMs) * 1.5) + if internalTimeoutMs > 5000 { + internalTimeoutMs = 5000 // Cap at 5 seconds + } + fetchCtx, cancel = context.WithTimeout(ctx, time.Duration(internalTimeoutMs)*time.Millisecond) defer cancel() } // Use multi-batch fetcher for better MaxBytes compliance multiFetcher := NewMultiBatchFetcher(pr.handler) + startTime := time.Now() fetchResult, err := multiFetcher.FetchMultipleBatches( fetchCtx, pr.topicName, @@ -193,26 +200,41 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma highWaterMark, maxBytes, ) + fetchDuration := time.Since(startTime) if err == nil && fetchResult.TotalSize > 0 { - glog.V(4).Infof("[%s] Multi-batch fetch for %s[%d]: %d batches, %d bytes, offset %d -> %d", + glog.V(4).Infof("[%s] Multi-batch fetch for %s[%d]: %d batches, %d bytes, offset %d -> %d (duration: %v)", pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, - fetchResult.BatchCount, fetchResult.TotalSize, fromOffset, fetchResult.NextOffset) + fetchResult.BatchCount, fetchResult.TotalSize, fromOffset, fetchResult.NextOffset, fetchDuration) return fetchResult.RecordBatches, fetchResult.NextOffset } - // Fallback to single batch (pass context to respect timeout) - smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(fetchCtx, pr.topicName, pr.partitionID, fromOffset, 10) - if err == nil && len(smqRecords) > 0 { + // Multi-batch failed - try single batch WITHOUT the timeout constraint + // to ensure we get at least some data even if multi-batch timed out + glog.Warningf("[%s] Multi-batch fetch failed for %s[%d] offset=%d after %v, falling back to single-batch (err: %v)", + pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fetchDuration, err) + + // Use original context for fallback, NOT the timed-out fetchCtx + // This ensures the fallback has a fresh chance to fetch data + smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(ctx, pr.topicName, pr.partitionID, fromOffset, 10) + if err != nil { + glog.Warningf("[%s] Single-batch fetch also failed for %s[%d] offset=%d: %v (total duration: %v)", + pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, err, time.Since(startTime)) + return []byte{}, fromOffset + } + + if len(smqRecords) > 0 { recordBatch := pr.handler.constructRecordBatchFromSMQ(pr.topicName, fromOffset, smqRecords) nextOffset := fromOffset + int64(len(smqRecords)) - glog.V(4).Infof("[%s] Single-batch fetch for %s[%d]: %d records, %d bytes, offset %d -> %d", + glog.V(4).Infof("[%s] Single-batch fallback for %s[%d]: %d records, %d bytes, offset %d -> %d (total duration: %v)", pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, - len(smqRecords), len(recordBatch), fromOffset, nextOffset) + len(smqRecords), len(recordBatch), fromOffset, nextOffset, time.Since(startTime)) return recordBatch, nextOffset } // No records available + glog.V(3).Infof("[%s] No records available for %s[%d] offset=%d (total duration: %v)", + pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, time.Since(startTime)) return []byte{}, fromOffset }