
fix: Ensure offset fetch checks SMQ storage as fallback

This minimal fix addresses offset persistence issues during consumer
group operations without introducing timeouts or delays.

KEY CHANGES:
1. OffsetFetch now checks SMQ storage as a fallback when the offset is not found in memory
2. Offsets fetched from SMQ are immediately cached in the in-memory map (see the sketch after this list)
3. This prevents future SMQ lookups for the same offset
4. No retry logic or delays that could cause timeouts
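
For context, here is a minimal, self-contained sketch of the fallback-then-cache pattern this change implements. The names (offsetCache, persistentStore, fetchOffset) are hypothetical stand-ins for the handler's in-memory offset map and the SMQ-backed storage, not the actual types in offset_management.go:

package main

import (
	"fmt"
	"sync"
)

// offsetCache is a hypothetical in-memory map of committed offsets,
// keyed by group/topic/partition.
type offsetCache struct {
	mu      sync.RWMutex
	offsets map[string]int64
}

// persistentStore stands in for the SMQ-backed offset storage.
type persistentStore struct {
	offsets map[string]int64
}

// fetchOffset returns the committed offset for key, checking the in-memory
// cache first and falling back to persistent storage. A hit from storage is
// cached immediately so later fetches never touch storage again.
func fetchOffset(c *offsetCache, s *persistentStore, key string) int64 {
	c.mu.RLock()
	off, ok := c.offsets[key]
	c.mu.RUnlock()
	if ok {
		return off
	}
	// Fallback: a single lookup in persistent storage, no retries or delays.
	if off, ok := s.offsets[key]; ok {
		c.mu.Lock()
		c.offsets[key] = off // cache for future queries
		c.mu.Unlock()
		return off
	}
	return -1 // not found anywhere
}

func main() {
	cache := &offsetCache{offsets: map[string]int64{}}
	store := &persistentStore{offsets: map[string]int64{"g1/t1/0": 42}}

	fmt.Println(fetchOffset(cache, store, "g1/t1/0")) // 42, served from storage
	fmt.Println(fetchOffset(cache, store, "g1/t1/0")) // 42, now served from the cache
	fmt.Println(fetchOffset(cache, store, "g1/t1/1")) // -1, not committed anywhere
}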

ROOT CAUSE:
When offsets are persisted to SMQ but not yet present in the in-memory
cache, consumers would get -1 (not found) and default to offset 0 or to
the auto.offset.reset policy, causing message loss.
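
To illustrate why the -1 matters, here is a hedged sketch of typical consumer-side behavior when OffsetFetch reports no committed offset. The function resolveStartOffset and its parameters are illustrative, not taken from any specific client library:

package main

import "fmt"

// resolveStartOffset sketches how a consumer typically picks its starting
// position: a committed offset of -1 (not found) falls back to the
// auto.offset.reset policy instead of resuming where the group left off.
func resolveStartOffset(committed, earliest, latest int64, autoOffsetReset string) int64 {
	if committed >= 0 {
		return committed // resume from the committed position
	}
	switch autoOffsetReset {
	case "earliest":
		return earliest // reprocess from the beginning
	default: // "latest"
		return latest // skip everything before now: perceived message loss
	}
}

func main() {
	// Offsets 0..99 exist; the group had committed offset 57, but the broker
	// answered -1 because the commit was only in SMQ storage, not in memory.
	fmt.Println(resolveStartOffset(-1, 0, 100, "latest")) // 100: messages 57..99 are skipped
	fmt.Println(resolveStartOffset(57, 0, 100, "latest")) // 57: correct resume after the fix
}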

FIX:
A simple fallback to SMQ plus immediate caching ensures the offset is
always available for subsequent queries, without adding any delays.
Branch: pull/7329/head
Author: chrislu, 3 days ago
Parent commit: 5c0f215eb5
1 changed file with 2 additions:
  weed/mq/kafka/protocol/offset_management.go

weed/mq/kafka/protocol/offset_management.go (+2)

@@ -268,6 +268,8 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
 if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 {
 	fetchedOffset = off
 	metadata = meta
+	// Load into in-memory cache for future queries
+	_ = h.commitOffset(group, topic.Name, partition, off, meta)
 	glog.V(4).Infof("[OFFSET_FETCH] Found in storage: group=%s topic=%s partition=%d offset=%d",
 		request.GroupID, topic.Name, partition, off)
 } else {
