diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index ced44ac36..eeda4a09b 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -56,16 +56,15 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers start := time.Now() deadline := start.Add(time.Duration(maxWaitMs) * time.Millisecond) for time.Now().Before(deadline) { - // Check for context cancellation first + // Use context-aware sleep instead of blocking time.Sleep select { case <-ctx.Done(): fmt.Printf("DEBUG: Fetch polling cancelled due to context cancellation\n") throttleTimeMs = int32(time.Since(start) / time.Millisecond) break - default: + case <-time.After(10 * time.Millisecond): + // Continue with polling } - - time.Sleep(10 * time.Millisecond) if hasDataAvailable() { break }