diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 757056cdb..f14fcf8a3 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -247,7 +247,10 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers // Use the client's requested MaxWaitTime plus a small buffer // The 100ms buffer provides backpressure for downstream consumers (like Schema Registry's // internal consumer) that may need time to catch up after rapid writes - deadline := time.After(time.Duration(maxWaitMs)*time.Millisecond + 100*time.Millisecond) + deadline := time.After(time.Duration(maxWaitMs)*time.Millisecond - 50*time.Millisecond) + if maxWaitMs < 50 { + deadline = time.After(time.Duration(maxWaitMs) * time.Millisecond) + } // Collect results one by one with shared deadline for i, pf := range pending {