diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 611f2fbc6..757056cdb 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -244,8 +244,10 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers // Phase 2: Wait for all results with adequate timeout for CI environments // CRITICAL: We MUST return a result for every requested partition or Sarama will error results := make([]*partitionFetchResult, len(pending)) - // Use the client's requested MaxWaitTime - deadline := time.After(time.Duration(maxWaitMs) * time.Millisecond) + // Use the client's requested MaxWaitTime plus a small buffer + // The 100ms buffer provides backpressure for downstream consumers (like Schema Registry's + // internal consumer) that may need time to catch up after rapid writes + deadline := time.After(time.Duration(maxWaitMs)*time.Millisecond + 100*time.Millisecond) // Collect results one by one with shared deadline for i, pf := range pending {