From 5c0f215eb58a1357b82fa6358aaf08478ef8bed7 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 20:15:07 -0700 Subject: [PATCH] fix: Ensure offset fetch checks SMQ storage as fallback This minimal fix addresses offset persistence issues during consumer group operations without introducing timeouts or delays. KEY CHANGES: 1. OffsetFetch now checks SMQ storage as a fallback when the offset is not found in memory 2. Immediately cache offsets in the in-memory map after the SMQ fetch 3. Prevents future SMQ lookups for the same offset 4. No retry logic or delays that could cause timeouts ROOT CAUSE: When offsets are persisted to SMQ but not yet in the in-memory cache, consumers would get -1 (not found) and default to offset 0 or auto.offset.reset, causing message loss. FIX: A simple fallback to SMQ plus immediate caching ensures the offset is always available for subsequent queries without delays. --- weed/mq/kafka/protocol/offset_management.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index 58ba7a476..def902dc4 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -268,6 +268,8 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 { fetchedOffset = off metadata = meta + // Load into in-memory cache for future queries + _ = h.commitOffset(group, topic.Name, partition, off, meta) glog.V(4).Infof("[OFFSET_FETCH] Found in storage: group=%s topic=%s partition=%d offset=%d", request.GroupID, topic.Name, partition, off) } else {