diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go
index eee75eb53..fce7c7efa 100644
--- a/weed/mq/kafka/protocol/fetch_partition_reader.go
+++ b/weed/mq/kafka/protocol/fetch_partition_reader.go
@@ -179,12 +179,19 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma
 	fetchCtx := ctx
 	if maxWaitMs > 0 {
 		var cancel context.CancelFunc
-		fetchCtx, cancel = context.WithTimeout(ctx, time.Duration(maxWaitMs)*time.Millisecond)
+		// Use 1.5x the client timeout to account for internal processing overhead
+		// This prevents legitimate slow reads from being killed by client timeout
+		internalTimeoutMs := int32(float64(maxWaitMs) * 1.5)
+		if internalTimeoutMs > 5000 {
+			internalTimeoutMs = 5000 // Cap at 5 seconds
+		}
+		fetchCtx, cancel = context.WithTimeout(ctx, time.Duration(internalTimeoutMs)*time.Millisecond)
 		defer cancel()
 	}
 
 	// Use multi-batch fetcher for better MaxBytes compliance
 	multiFetcher := NewMultiBatchFetcher(pr.handler)
+	startTime := time.Now()
 	fetchResult, err := multiFetcher.FetchMultipleBatches(
 		fetchCtx,
 		pr.topicName,
@@ -193,26 +200,41 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma
 		highWaterMark,
 		maxBytes,
 	)
+	fetchDuration := time.Since(startTime)
 
 	if err == nil && fetchResult.TotalSize > 0 {
-		glog.V(4).Infof("[%s] Multi-batch fetch for %s[%d]: %d batches, %d bytes, offset %d -> %d",
+		glog.V(4).Infof("[%s] Multi-batch fetch for %s[%d]: %d batches, %d bytes, offset %d -> %d (duration: %v)",
 			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
-			fetchResult.BatchCount, fetchResult.TotalSize, fromOffset, fetchResult.NextOffset)
+			fetchResult.BatchCount, fetchResult.TotalSize, fromOffset, fetchResult.NextOffset, fetchDuration)
 		return fetchResult.RecordBatches, fetchResult.NextOffset
 	}
 
-	// Fallback to single batch (pass context to respect timeout)
-	smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(fetchCtx, pr.topicName, pr.partitionID, fromOffset, 10)
-	if err == nil && len(smqRecords) > 0 {
+	// Multi-batch failed - try single batch WITHOUT the timeout constraint
+	// to ensure we get at least some data even if multi-batch timed out
+	glog.Warningf("[%s] Multi-batch fetch failed for %s[%d] offset=%d after %v, falling back to single-batch (err: %v)",
+		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fetchDuration, err)
+
+	// Use original context for fallback, NOT the timed-out fetchCtx
+	// This ensures the fallback has a fresh chance to fetch data
+	smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(ctx, pr.topicName, pr.partitionID, fromOffset, 10)
+	if err != nil {
+		glog.Warningf("[%s] Single-batch fetch also failed for %s[%d] offset=%d: %v (total duration: %v)",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, err, time.Since(startTime))
+		return []byte{}, fromOffset
+	}
+
+	if len(smqRecords) > 0 {
 		recordBatch := pr.handler.constructRecordBatchFromSMQ(pr.topicName, fromOffset, smqRecords)
 		nextOffset := fromOffset + int64(len(smqRecords))
-		glog.V(4).Infof("[%s] Single-batch fetch for %s[%d]: %d records, %d bytes, offset %d -> %d",
+		glog.V(4).Infof("[%s] Single-batch fallback for %s[%d]: %d records, %d bytes, offset %d -> %d (total duration: %v)",
 			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
-			len(smqRecords), len(recordBatch), fromOffset, nextOffset)
+			len(smqRecords), len(recordBatch), fromOffset, nextOffset, time.Since(startTime))
 		return recordBatch, nextOffset
 	}
 
 	// No records available
+	glog.V(3).Infof("[%s] No records available for %s[%d] offset=%d (total duration: %v)",
+		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, time.Since(startTime))
 	return []byte{}, fromOffset
 }
 
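For reference, a minimal standalone sketch of the timeout derivation the patch introduces, assuming only the 1.5x multiplier and 5-second cap shown above; the helper name internalFetchTimeout is hypothetical, not a function in the SeaweedFS codebase:

package main

import (
	"fmt"
	"time"
)

// internalFetchTimeout mirrors the patch's derivation: scale the
// client-requested max wait by 1.5x to absorb internal processing
// overhead, then cap the result at 5 seconds. Hypothetical helper
// for illustration only.
func internalFetchTimeout(maxWaitMs int32) time.Duration {
	internalTimeoutMs := int32(float64(maxWaitMs) * 1.5)
	if internalTimeoutMs > 5000 {
		internalTimeoutMs = 5000 // cap at 5 seconds, as in the patch
	}
	return time.Duration(internalTimeoutMs) * time.Millisecond
}

func main() {
	// A Kafka client's requested fetch max wait maps to maxWaitMs here.
	for _, maxWaitMs := range []int32{100, 500, 4000, 10000} {
		fmt.Printf("maxWaitMs=%d -> internal timeout %v\n",
			maxWaitMs, internalFetchTimeout(maxWaitMs))
	}
	// Output:
	// maxWaitMs=100 -> internal timeout 150ms
	// maxWaitMs=500 -> internal timeout 750ms
	// maxWaitMs=4000 -> internal timeout 5s
	// maxWaitMs=10000 -> internal timeout 5s
}

Note that the derived timeout only bounds the multi-batch path: per the patch's comments, the single-batch fallback deliberately receives the parent ctx rather than the possibly-expired fetchCtx, so a multi-batch timeout does not also doom the retry.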