diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index fd1ccab91..fabbb8cd4 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -149,7 +149,7 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re for _, p := range t.Partitions { - // Create consumer offset key for SMQ storage + // Create consumer offset key for SMQ storage (not used immediately) key := ConsumerOffsetKey{ Topic: t.Name, Partition: p.Index, @@ -157,29 +157,26 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re ConsumerGroupInstance: req.GroupInstanceID, } - // Commit offset to in-memory first for immediate client response - // Then commit to persistent storage asynchronously to avoid blocking + // Commit offset synchronously for immediate consistency var errCode int16 = ErrorCodeNone if generationMatches { - // Store in in-memory map first (fast path - enables immediate client response) + // Store in in-memory map for immediate response + // This is the primary committed offset position for consumers if err := h.commitOffset(group, t.Name, p.Index, p.Offset, p.Metadata); err != nil { errCode = ErrorCodeOffsetMetadataTooLarge - glog.V(2).Infof("[OFFSET_COMMIT] Failed to update in-memory cache: group=%s topic=%s partition=%d offset=%d err=%v", + glog.V(2).Infof("[OFFSET_COMMIT] Failed to commit offset: group=%s topic=%s partition=%d offset=%d err=%v", req.GroupID, t.Name, p.Index, p.Offset, err) } else { - // Also store in SMQ persistent storage asynchronously (non-blocking) - // This ensures recovery after restarts while not blocking the consumer - go func(k ConsumerOffsetKey, off int64, meta string) { - if err := h.commitOffsetToSMQ(k, off, meta); err != nil { - glog.V(3).Infof("[OFFSET_COMMIT] Background persist failed: group=%s topic=%s partition=%d offset=%d err=%v", - k.ConsumerGroup, k.Topic, k.Partition, off, err) - } else { - glog.V(4).Infof("[OFFSET_COMMIT] Background persist succeeded: group=%s topic=%s partition=%d offset=%d", - k.ConsumerGroup, k.Topic, k.Partition, off) - } - }(key, p.Offset, p.Metadata) - - glog.V(3).Infof("[OFFSET_COMMIT] Committed in-memory: group=%s topic=%s partition=%d offset=%d gen=%d (persistent async)", + // Also persist to SMQ storage for durability across broker restarts + // This is done synchronously to ensure offset is not lost + if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil { + // Log the error but don't fail the commit + // In-memory commit is the source of truth for active consumers + // SMQ persistence is best-effort for crash recovery + glog.V(3).Infof("[OFFSET_COMMIT] SMQ persist failed (non-fatal): group=%s topic=%s partition=%d offset=%d err=%v", + req.GroupID, t.Name, p.Index, p.Offset, err) + } + glog.V(3).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d", req.GroupID, t.Name, p.Index, p.Offset, group.Generation) } } else {