From 50c16e08391a3db960e0f0c7f5fd7f85bd44e07b Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 17 Oct 2025 15:34:27 -0700 Subject: [PATCH] debug: Add verbose offset management logging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 12: ROOT CAUSE FOUND - Duplicates due to Topic Persistence Bug Duplicate Analysis: - 8104 duplicates (66.5%), ALL read exactly 2 times - Suggests single rebalance/restart event - Duplicates start at offset 0, go to ~800 (50% of data) Investigation Results: 1. Offset commits ARE working (logging shows commits every 20 msgs) 2. NO rebalance during normal operation (only 10 OFFSET_FETCH at start) 3. Consumer error logs show REPEATED failures: 'Request was for a topic or partition that does not exist' 4. Broker logs show: 'no entry is found in filer store' for topic-2 Root Cause: Auto-created topics are NOT being reliably persisted to filer! - Producer auto-creates topic-2 - Topic config NOT saved to filer - Consumer tries to fetch metadata → broker says 'doesn't exist' - Consumer group errors → Sarama triggers rebalance - During rebalance, OffsetFetch returns -1 (no offset found) - Consumer starts from offset 0 again → DUPLICATES! The Flow: 1. Consumers start, read 0-800, commit offsets 2. Consumer tries to fetch metadata for topic-2 3. Broker can't find topic config in filer 4. Consumer group crashes/rebalances 5. OffsetFetch during rebalance returns -1 6. Consumers restart from offset 0 → re-read 0-800 7. Then continue from 800-1600 → 66% duplicates Next Fix: Ensure topic auto-creation RELIABLY persists config to filer before returning success to producers. --- weed/mq/kafka/protocol/offset_management.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index fabbb8cd4..af1562c92 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -176,8 +176,8 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re glog.V(3).Infof("[OFFSET_COMMIT] SMQ persist failed (non-fatal): group=%s topic=%s partition=%d offset=%d err=%v", req.GroupID, t.Name, p.Index, p.Offset, err) } - glog.V(3).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d", - req.GroupID, t.Name, p.Index, p.Offset, group.Generation) + glog.V(0).Infof("[OFFSET_COMMIT] ✅ Committed: group=%s topic=%s partition=%d offset=%d gen=%d", + req.GroupID, t.Name, p.Index, p.Offset, group.Generation) } } else { // Do not store commit if generation mismatch @@ -255,7 +255,7 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req if off, meta, err := h.fetchOffset(group, topic.Name, partition); err == nil && off >= 0 { fetchedOffset = off metadata = meta - glog.V(4).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d", + glog.V(0).Infof("[OFFSET_FETCH] ✅ Found in memory: group=%s topic=%s partition=%d offset=%d", request.GroupID, topic.Name, partition, off) } else { // Fallback: try fetching from SMQ persistent storage @@ -269,10 +269,10 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 { fetchedOffset = off metadata = meta - glog.V(4).Infof("[OFFSET_FETCH] Found in storage: group=%s topic=%s partition=%d offset=%d", + glog.V(0).Infof("[OFFSET_FETCH] ✅ Found in storage: group=%s topic=%s partition=%d offset=%d", request.GroupID, topic.Name, partition, off) } else { - glog.V(4).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d (will start from auto.offset.reset)", + glog.V(0).Infof("[OFFSET_FETCH] ⚠️ No offset found: group=%s topic=%s partition=%d (will start from auto.offset.reset=earliest, offset=0)", request.GroupID, topic.Name, partition) } // No offset found in either location (-1 indicates no committed offset)