From 97be7c6aee7f518e077ff7c42e255eb71faeeeed Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 23:53:48 -0700 Subject: [PATCH] simplify: Rely on in-memory commit as source of truth for offsets INSIGHT: User correctly pointed out: 'kafka gateway should just use the SMQ async offset committing' - we shouldn't manually create goroutines to wrap SMQ. REVISED APPROACH: 1. **In-memory commit** is the primary source of truth - Immediate response to client - Consumers rely on this for offset tracking - Fast < 1ms operation 2. **SMQ persistence** is best-effort for durability - Used for crash recovery when in-memory lost - Sync call (no manual goroutine wrapping) - If it fails, not fatal - in-memory is current state DESIGN: - In-memory: Authoritative, always succeeds (or client sees error) - SMQ storage: Durable, failure is logged but non-fatal - Auto-commit: Periodically pushes offsets to SMQ - Manual commit: Explicit confirmation of offset progress This matches Kafka semantics where: - Broker always knows current offsets in-memory - Persistent storage is for recovery scenarios - No artificial blocking on persistence EXPECTED BEHAVIOR: - Fast offset response (unblocked by SMQ writes) - Durable offset storage (via SMQ periodic persistence) - Correct offset recovery on restarts - No message loss or duplicates when offsets committed --- weed/mq/kafka/protocol/offset_management.go | 33 ++++++++++----------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index fd1ccab91..fabbb8cd4 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -149,7 +149,7 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re for _, p := range t.Partitions { - // Create consumer offset key for SMQ storage + // Create consumer offset key for SMQ storage (not used immediately) key := ConsumerOffsetKey{ Topic: t.Name, Partition: p.Index, @@ -157,29 +157,26 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re ConsumerGroupInstance: req.GroupInstanceID, } - // Commit offset to in-memory first for immediate client response - // Then commit to persistent storage asynchronously to avoid blocking + // Commit offset synchronously for immediate consistency var errCode int16 = ErrorCodeNone if generationMatches { - // Store in in-memory map first (fast path - enables immediate client response) + // Store in in-memory map for immediate response + // This is the primary committed offset position for consumers if err := h.commitOffset(group, t.Name, p.Index, p.Offset, p.Metadata); err != nil { errCode = ErrorCodeOffsetMetadataTooLarge - glog.V(2).Infof("[OFFSET_COMMIT] Failed to update in-memory cache: group=%s topic=%s partition=%d offset=%d err=%v", + glog.V(2).Infof("[OFFSET_COMMIT] Failed to commit offset: group=%s topic=%s partition=%d offset=%d err=%v", req.GroupID, t.Name, p.Index, p.Offset, err) } else { - // Also store in SMQ persistent storage asynchronously (non-blocking) - // This ensures recovery after restarts while not blocking the consumer - go func(k ConsumerOffsetKey, off int64, meta string) { - if err := h.commitOffsetToSMQ(k, off, meta); err != nil { - glog.V(3).Infof("[OFFSET_COMMIT] Background persist failed: group=%s topic=%s partition=%d offset=%d err=%v", - k.ConsumerGroup, k.Topic, k.Partition, off, err) - } else { - glog.V(4).Infof("[OFFSET_COMMIT] Background persist succeeded: group=%s topic=%s partition=%d offset=%d", - k.ConsumerGroup, k.Topic, k.Partition, off) - } - }(key, p.Offset, p.Metadata) - - glog.V(3).Infof("[OFFSET_COMMIT] Committed in-memory: group=%s topic=%s partition=%d offset=%d gen=%d (persistent async)", + // Also persist to SMQ storage for durability across broker restarts + // This is done synchronously to ensure offset is not lost + if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil { + // Log the error but don't fail the commit + // In-memory commit is the source of truth for active consumers + // SMQ persistence is best-effort for crash recovery + glog.V(3).Infof("[OFFSET_COMMIT] SMQ persist failed (non-fatal): group=%s topic=%s partition=%d offset=%d err=%v", + req.GroupID, t.Name, p.Index, p.Offset, err) + } + glog.V(3).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d", req.GroupID, t.Name, p.Index, p.Offset, group.Generation) } } else {