|
|
@ -244,13 +244,15 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers |
|
|
|
// Phase 2: Wait for all results with adequate timeout for CI environments
|
|
|
|
// CRITICAL: We MUST return a result for every requested partition or Sarama will error
|
|
|
|
results := make([]*partitionFetchResult, len(pending)) |
|
|
|
// Use 90% of client's MaxWaitTime to ensure we return BEFORE client timeout
|
|
|
|
// This gives us time to package and transmit the response before the client gives up
|
|
|
|
// For 500ms client timeout, we use 450ms internally, leaving 50ms buffer for:
|
|
|
|
// Use 95% of client's MaxWaitTime to ensure we return BEFORE client timeout
|
|
|
|
// This maximizes data collection time while leaving a safety buffer for:
|
|
|
|
// - Response serialization, network transmission, client processing
|
|
|
|
effectiveDeadlineMs := time.Duration(maxWaitMs) * 9 / 10 |
|
|
|
// For 500ms client timeout: 475ms internal fetch, 25ms buffer
|
|
|
|
// For 100ms client timeout: 95ms internal fetch, 5ms buffer
|
|
|
|
effectiveDeadlineMs := time.Duration(maxWaitMs) * 95 / 100 |
|
|
|
deadline := time.After(effectiveDeadlineMs * time.Millisecond) |
|
|
|
if maxWaitMs < 10 { |
|
|
|
if maxWaitMs < 20 { |
|
|
|
// For very short timeouts (< 20ms), use full timeout to maximize data collection
|
|
|
|
deadline = time.After(time.Duration(maxWaitMs) * time.Millisecond) |
|
|
|
} |
|
|
|
|
|
|
|