diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index 145e23cd7..fd1ccab91 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -157,29 +157,30 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re ConsumerGroupInstance: req.GroupInstanceID, } - // Commit offset to both in-memory and SMQ storage + // Commit offset to in-memory first for immediate client response + // Then commit to persistent storage asynchronously to avoid blocking var errCode int16 = ErrorCodeNone if generationMatches { - // CRITICAL: Commit to persistent storage FIRST - // If persistent storage succeeds, update in-memory cache - // This ensures in-memory state never diverges from persistent state - // If SMQ commit fails, the offset is not lost and client knows it failed - if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil { - // Persistent storage commit failed - report error to client - // Do NOT commit to in-memory to keep states in sync - errCode = ErrorCodeNotCoordinatorForGroup - glog.V(2).Infof("[OFFSET_COMMIT] Failed to commit to persistent storage: group=%s topic=%s partition=%d offset=%d err=%v", + // Store in in-memory map first (fast path - enables immediate client response) + if err := h.commitOffset(group, t.Name, p.Index, p.Offset, p.Metadata); err != nil { + errCode = ErrorCodeOffsetMetadataTooLarge + glog.V(2).Infof("[OFFSET_COMMIT] Failed to update in-memory cache: group=%s topic=%s partition=%d offset=%d err=%v", req.GroupID, t.Name, p.Index, p.Offset, err) } else { - // Persistent storage commit succeeded - now update in-memory cache - if err := h.commitOffset(group, t.Name, p.Index, p.Offset, p.Metadata); err != nil { - errCode = ErrorCodeOffsetMetadataTooLarge - glog.V(2).Infof("[OFFSET_COMMIT] Failed to update in-memory cache: group=%s topic=%s partition=%d offset=%d err=%v", - req.GroupID, t.Name, p.Index, p.Offset, err) - } else { - glog.V(3).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d (persistent+memory)", - req.GroupID, t.Name, p.Index, p.Offset, group.Generation) - } + // Also store in SMQ persistent storage asynchronously (non-blocking) + // This ensures recovery after restarts while not blocking the consumer + go func(k ConsumerOffsetKey, off int64, meta string) { + if err := h.commitOffsetToSMQ(k, off, meta); err != nil { + glog.V(3).Infof("[OFFSET_COMMIT] Background persist failed: group=%s topic=%s partition=%d offset=%d err=%v", + k.ConsumerGroup, k.Topic, k.Partition, off, err) + } else { + glog.V(4).Infof("[OFFSET_COMMIT] Background persist succeeded: group=%s topic=%s partition=%d offset=%d", + k.ConsumerGroup, k.Topic, k.Partition, off) + } + }(key, p.Offset, p.Metadata) + + glog.V(3).Infof("[OFFSET_COMMIT] Committed in-memory: group=%s topic=%s partition=%d offset=%d gen=%d (persistent async)", + req.GroupID, t.Name, p.Index, p.Offset, group.Generation) } } else { // Do not store commit if generation mismatch