diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index f14fcf8a3..5d1b66ff5 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -244,11 +244,13 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers // Phase 2: Wait for all results with adequate timeout for CI environments // CRITICAL: We MUST return a result for every requested partition or Sarama will error results := make([]*partitionFetchResult, len(pending)) - // Use the client's requested MaxWaitTime plus a small buffer - // The 100ms buffer provides backpressure for downstream consumers (like Schema Registry's - // internal consumer) that may need time to catch up after rapid writes - deadline := time.After(time.Duration(maxWaitMs)*time.Millisecond - 50*time.Millisecond) - if maxWaitMs < 50 { + // Use 90% of client's MaxWaitTime to ensure we return BEFORE client timeout + // This gives us time to package and transmit the response before the client gives up + // For 500ms client timeout, we use 450ms internally, leaving 50ms buffer for: + // - Response serialization, network transmission, client processing + effectiveDeadlineMs := time.Duration(maxWaitMs) * 9 / 10 + deadline := time.After(effectiveDeadlineMs * time.Millisecond) + if maxWaitMs < 10 { deadline = time.After(time.Duration(maxWaitMs) * time.Millisecond) }