From a8bee174f446be50b62dbf06200528c54f13d5e5 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 13 Sep 2025 17:14:51 -0700 Subject: [PATCH] Update fetch.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Skip long-polling if any requested topic does not exist. Only long-poll when MinBytes > 0, data isn’t available yet, and all topics exist. Cap the long-polling wait to 1s in tests to prevent hanging on shutdown. --- weed/mq/kafka/protocol/fetch.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index b815cc520..bf28342e6 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -22,6 +22,15 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo // Basic long-polling to avoid client busy-looping when there's no data. var throttleTimeMs int32 = 0 + // Only long-poll when all referenced topics exist; unknown topics should not block + allTopicsExist := func() bool { + for _, topic := range fetchRequest.Topics { + if !h.seaweedMQHandler.TopicExists(topic.Name) { + return false + } + } + return true + } hasDataAvailable := func() bool { for _, topic := range fetchRequest.Topics { for _, p := range topic.Partitions { @@ -36,9 +45,15 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo } return false } - if fetchRequest.MinBytes > 0 && fetchRequest.MaxWaitTime > 0 && !hasDataAvailable() { + // Cap long-polling to avoid blocking connection shutdowns in tests + maxWaitMs := fetchRequest.MaxWaitTime + if maxWaitMs > 1000 { + maxWaitMs = 1000 + } + shouldLongPoll := fetchRequest.MinBytes > 0 && maxWaitMs > 0 && !hasDataAvailable() && allTopicsExist() + if shouldLongPoll { start := time.Now() - deadline := start.Add(time.Duration(fetchRequest.MaxWaitTime) * time.Millisecond) + deadline := start.Add(time.Duration(maxWaitMs) * time.Millisecond) for time.Now().Before(deadline) { time.Sleep(10 * time.Millisecond) if hasDataAvailable() {