diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index 721b71b22..31f6efaa6 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -555,7 +555,9 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib } readStart := time.Now() - ctx2, cancel2 := context.WithTimeout(context.Background(), currentTimeout) + // CRITICAL: Use parent context (ctx) to respect client's MaxWaitTime deadline + // The per-record timeout is combined with the overall fetch deadline + ctx2, cancel2 := context.WithTimeout(ctx, currentTimeout) recvChan2 := make(chan recvResult, 1) go func() { diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 06dba826f..611f2fbc6 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -244,9 +244,8 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers // Phase 2: Wait for all results with adequate timeout for CI environments // CRITICAL: We MUST return a result for every requested partition or Sarama will error results := make([]*partitionFetchResult, len(pending)) - // Deadline must be longer than subscriber's record batching timeout (1s) to avoid premature timeout - // Add 500ms buffer to account for network and processing overhead - deadline := time.After(1500 * time.Millisecond) + // Use the client's requested MaxWaitTime + deadline := time.After(time.Duration(maxWaitMs) * time.Millisecond) // Collect results one by one with shared deadline for i, pf := range pending {