From bbc8668fd61fd734172c70bbcfc0d02f3b336038 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 13 Sep 2025 16:39:55 -0700 Subject: [PATCH] Fix kafka-go client infinite polling loop --- weed/mq/kafka/protocol/fetch.go | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 0dc155caf..b815cc520 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -20,6 +20,34 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo return nil, fmt.Errorf("parse fetch request: %w", err) } + // Basic long-polling to avoid client busy-looping when there's no data. + var throttleTimeMs int32 = 0 + hasDataAvailable := func() bool { + for _, topic := range fetchRequest.Topics { + for _, p := range topic.Partitions { + ledger := h.GetLedger(topic.Name, p.PartitionID) + if ledger == nil { + continue + } + if ledger.GetHighWaterMark() > p.FetchOffset { + return true + } + } + } + return false + } + if fetchRequest.MinBytes > 0 && fetchRequest.MaxWaitTime > 0 && !hasDataAvailable() { + start := time.Now() + deadline := start.Add(time.Duration(fetchRequest.MaxWaitTime) * time.Millisecond) + for time.Now().Before(deadline) { + time.Sleep(10 * time.Millisecond) + if hasDataAvailable() { + break + } + } + throttleTimeMs = int32(time.Since(start) / time.Millisecond) + } + // Build the response response := make([]byte, 0, 1024) @@ -30,7 +58,9 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo // Fetch v1+ has throttle_time_ms at the beginning if apiVersion >= 1 { - response = append(response, 0, 0, 0, 0) // throttle_time_ms (4 bytes, 0 = no throttling) + throttleBytes := make([]byte, 4) + binary.BigEndian.PutUint32(throttleBytes, uint32(throttleTimeMs)) + response = append(response, throttleBytes...) } // Fetch v7+ has error_code and session_id