diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index b815cc520..bf28342e6 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -22,6 +22,15 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo // Basic long-polling to avoid client busy-looping when there's no data. var throttleTimeMs int32 = 0 + // Only long-poll when all referenced topics exist; unknown topics should not block + allTopicsExist := func() bool { + for _, topic := range fetchRequest.Topics { + if !h.seaweedMQHandler.TopicExists(topic.Name) { + return false + } + } + return true + } hasDataAvailable := func() bool { for _, topic := range fetchRequest.Topics { for _, p := range topic.Partitions { @@ -36,9 +45,15 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo } return false } - if fetchRequest.MinBytes > 0 && fetchRequest.MaxWaitTime > 0 && !hasDataAvailable() { + // Cap long-polling to avoid blocking connection shutdowns in tests + maxWaitMs := fetchRequest.MaxWaitTime + if maxWaitMs > 1000 { + maxWaitMs = 1000 + } + shouldLongPoll := fetchRequest.MinBytes > 0 && maxWaitMs > 0 && !hasDataAvailable() && allTopicsExist() + if shouldLongPoll { start := time.Now() - deadline := start.Add(time.Duration(fetchRequest.MaxWaitTime) * time.Millisecond) + deadline := start.Add(time.Duration(maxWaitMs) * time.Millisecond) for time.Now().Before(deadline) { time.Sleep(10 * time.Millisecond) if hasDataAvailable() {