diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index ca758c30e..0f0e85320 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -97,11 +97,18 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers // Continue with polling } if hasDataAvailable() { + // Data became available during polling - return immediately with NO throttle + // Throttle time should only be used for quota enforcement, not for long-poll timing + throttleTimeMs = 0 break pollLoop } } - elapsed := time.Since(start) - throttleTimeMs = int32(elapsed / time.Millisecond) + // If we got here without breaking early, we hit the timeout + // Only set throttle time if we're returning without data (true long-poll timeout) + if throttleTimeMs == 0 && !hasDataAvailable() { + elapsed := time.Since(start) + throttleTimeMs = int32(elapsed / time.Millisecond) + } } // Build the response