Browse Source

Update fetch.go

pull/7329/head
chrislu 7 days ago
parent
commit
5c6b0eaa0d
  1. 12
      weed/mq/kafka/protocol/fetch.go

12
weed/mq/kafka/protocol/fetch.go

@ -244,11 +244,13 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
// Phase 2: Wait for all results with adequate timeout for CI environments
// CRITICAL: We MUST return a result for every requested partition or Sarama will error
results := make([]*partitionFetchResult, len(pending))
// Use the client's requested MaxWaitTime plus a small buffer
// The 100ms buffer provides backpressure for downstream consumers (like Schema Registry's
// internal consumer) that may need time to catch up after rapid writes
deadline := time.After(time.Duration(maxWaitMs)*time.Millisecond - 50*time.Millisecond)
if maxWaitMs < 50 {
// Use 90% of client's MaxWaitTime to ensure we return BEFORE client timeout
// This gives us time to package and transmit the response before the client gives up
// For 500ms client timeout, we use 450ms internally, leaving 50ms buffer for:
// - Response serialization, network transmission, client processing
effectiveDeadlineMs := time.Duration(maxWaitMs) * 9 / 10
deadline := time.After(effectiveDeadlineMs * time.Millisecond)
if maxWaitMs < 10 {
deadline = time.After(time.Duration(maxWaitMs) * time.Millisecond)
}

Loading…
Cancel
Save