Browse Source

use client timeout wait

pull/7329/head
chrislu 1 week ago
parent
commit
7c0c212d33
  1. 4
      weed/mq/kafka/integration/broker_client_subscribe.go
  2. 5
      weed/mq/kafka/protocol/fetch.go

4
weed/mq/kafka/integration/broker_client_subscribe.go

@@ -555,7 +555,9 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
 	}
 	readStart := time.Now()
-	ctx2, cancel2 := context.WithTimeout(context.Background(), currentTimeout)
+	// CRITICAL: Use parent context (ctx) to respect client's MaxWaitTime deadline
+	// The per-record timeout is combined with the overall fetch deadline
+	ctx2, cancel2 := context.WithTimeout(ctx, currentTimeout)
 	recvChan2 := make(chan recvResult, 1)
 	go func() {

5
weed/mq/kafka/protocol/fetch.go

@@ -244,9 +244,8 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
 	// Phase 2: Wait for all results with adequate timeout for CI environments
 	// CRITICAL: We MUST return a result for every requested partition or Sarama will error
 	results := make([]*partitionFetchResult, len(pending))
-	// Deadline must be longer than subscriber's record batching timeout (1s) to avoid premature timeout
-	// Add 500ms buffer to account for network and processing overhead
-	deadline := time.After(1500 * time.Millisecond)
+	// Use the client's requested MaxWaitTime
+	deadline := time.After(time.Duration(maxWaitMs) * time.Millisecond)
 	// Collect results one by one with shared deadline
 	for i, pf := range pending {

Loading…
Cancel
Save