diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index 58ba7a476..145e23cd7 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -160,23 +160,26 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re // Commit offset to both in-memory and SMQ storage var errCode int16 = ErrorCodeNone if generationMatches { - // Store in in-memory map first (works for both mock and SMQ backends) - if err := h.commitOffset(group, t.Name, p.Index, p.Offset, p.Metadata); err != nil { - errCode = ErrorCodeOffsetMetadataTooLarge - } - - // Also store in SMQ persistent storage if available + // CRITICAL: Commit to persistent storage FIRST + // If persistent storage succeeds, update in-memory cache + // This ensures in-memory state never diverges from persistent state + // If SMQ commit fails, the offset is not lost and client knows it failed if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil { - // SMQ storage may not be available (e.g., in mock mode) - that's okay - glog.V(4).Infof("[OFFSET_COMMIT] SMQ storage not available: %v", err) - } - - if groupIsEmpty { - glog.V(3).Infof("[OFFSET_COMMIT] Committed (empty group): group=%s topic=%s partition=%d offset=%d", - req.GroupID, t.Name, p.Index, p.Offset) + // Persistent storage commit failed - report error to client + // Do NOT commit to in-memory to keep states in sync + errCode = ErrorCodeNotCoordinatorForGroup + glog.V(2).Infof("[OFFSET_COMMIT] Failed to commit to persistent storage: group=%s topic=%s partition=%d offset=%d err=%v", + req.GroupID, t.Name, p.Index, p.Offset, err) } else { - glog.V(3).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d", - req.GroupID, t.Name, p.Index, p.Offset, group.Generation) + // Persistent storage commit succeeded - now update in-memory cache + if err := h.commitOffset(group, t.Name, p.Index, p.Offset, p.Metadata); err != nil { + errCode = ErrorCodeOffsetMetadataTooLarge + glog.V(2).Infof("[OFFSET_COMMIT] Failed to update in-memory cache: group=%s topic=%s partition=%d offset=%d err=%v", + req.GroupID, t.Name, p.Index, p.Offset, err) + } else { + glog.V(3).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d (persistent+memory)", + req.GroupID, t.Name, p.Index, p.Offset, group.Generation) + } } } else { // Do not store commit if generation mismatch