From 5d5c820c8803f1d79fb481a847793e947282f33b Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 13 Sep 2025 21:07:24 -0700 Subject: [PATCH] timeouts --- weed/mq/kafka/protocol/fetch.go | 8 +++++++- weed/mq/kafka/protocol/handler.go | 5 ++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index eeda4a09b..fea59d74b 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -54,7 +54,13 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers shouldLongPoll := fetchRequest.MinBytes > 0 && maxWaitMs > 0 && !hasDataAvailable() && allTopicsExist() if shouldLongPoll { start := time.Now() - deadline := start.Add(time.Duration(maxWaitMs) * time.Millisecond) + // Limit polling time to maximum 2 seconds to prevent hanging in CI + maxPollTime := time.Duration(maxWaitMs) * time.Millisecond + if maxPollTime > 2*time.Second { + maxPollTime = 2 * time.Second + fmt.Printf("DEBUG: Limiting fetch polling to 2 seconds to prevent hanging\n") + } + deadline := start.Add(maxPollTime) for time.Now().Before(deadline) { // Use context-aware sleep instead of blocking time.Sleep select { diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index f803a9cf8..529233e7b 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -300,11 +300,14 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { readChan <- readResult{n: n, err: err} }() - // Wait for either the read to complete or context cancellation + // Wait for either the read to complete or context cancellation with a backup timeout select { case <-ctx.Done(): fmt.Printf("DEBUG: [%s] Context cancelled during read, closing connection\n", connectionID) return ctx.Err() + case <-time.After(2 * time.Second): + fmt.Printf("DEBUG: [%s] Read operation timed out after 2 seconds, closing connection\n", connectionID) + return fmt.Errorf("read timeout") case result := <-readChan: if result.err != nil { if result.err == io.EOF {