From ed5d801ee6ea168bdbb57bf884c20212d035bd51 Mon Sep 17 00:00:00 2001
From: chrislu
Date: Thu, 16 Oct 2025 20:13:55 -0700
Subject: [PATCH] Revert "fix: Critical offset persistence race condition causing message loss"

This reverts commit f18ff58476bc014c2925f276c8a0135124c8465a.
---
 weed/mq/kafka/protocol/offset_management.go | 100 +++++++-------
 1 file changed, 35 insertions(+), 65 deletions(-)

diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go
index 54222305a..58ba7a476 100644
--- a/weed/mq/kafka/protocol/offset_management.go
+++ b/weed/mq/kafka/protocol/offset_management.go
@@ -163,35 +163,20 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
 			// Store in in-memory map first (works for both mock and SMQ backends)
 			if err := h.commitOffset(group, t.Name, p.Index, p.Offset, p.Metadata); err != nil {
 				errCode = ErrorCodeOffsetMetadataTooLarge
-				glog.Warningf("[OFFSET_COMMIT] Failed to commit in-memory: group=%s topic=%s partition=%d offset=%d err=%v",
-					req.GroupID, t.Name, p.Index, p.Offset, err)
 			}
 
-			// CRITICAL: Store in SMQ persistent storage - MUST NOT FAIL SILENTLY
-			// If SMQ storage fails, we must notify the client, as offset won't survive rebalancing
+			// Also store in SMQ persistent storage if available
 			if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil {
-				// Only suppress error for mock backends where SMQ handler is not available
-				// For real deployments, this is a critical error that must be reported
-				if errCode == ErrorCodeNone {
-					errCode = 45 // OutOfOrderSequenceException (closest to "storage unavailable")
-					glog.Errorf("[OFFSET_COMMIT] CRITICAL: Failed to persist offset to SMQ: group=%s topic=%s partition=%d offset=%d err=%v",
-						req.GroupID, t.Name, p.Index, p.Offset, err)
-					glog.Errorf("[OFFSET_COMMIT] WARNING: This offset will NOT survive rebalancing and may cause message loss!")
-				}
-			} else {
-				// Successfully persisted
-				glog.V(3).Infof("[OFFSET_COMMIT] Persisted to SMQ: group=%s topic=%s partition=%d offset=%d",
-					req.GroupID, t.Name, p.Index, p.Offset)
+				// SMQ storage may not be available (e.g., in mock mode) - that's okay
+				glog.V(4).Infof("[OFFSET_COMMIT] SMQ storage not available: %v", err)
 			}
 
-			if errCode == ErrorCodeNone {
-				if groupIsEmpty {
-					glog.V(3).Infof("[OFFSET_COMMIT] Committed (empty group): group=%s topic=%s partition=%d offset=%d",
-						req.GroupID, t.Name, p.Index, p.Offset)
-				} else {
-					glog.V(3).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d",
-						req.GroupID, t.Name, p.Index, p.Offset, group.Generation)
-				}
+			if groupIsEmpty {
+				glog.V(3).Infof("[OFFSET_COMMIT] Committed (empty group): group=%s topic=%s partition=%d offset=%d",
+					req.GroupID, t.Name, p.Index, p.Offset)
+			} else {
+				glog.V(3).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d",
+					req.GroupID, t.Name, p.Index, p.Offset, group.Generation)
 			}
 		} else {
 			// Do not store commit if generation mismatch
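For context, the hunk above restores a best-effort, two-tier commit: the offset always lands in the in-memory map first, and a failure to persist is only logged, never surfaced to the client. Below is a minimal Go sketch of that pattern; offsetKey, offsetStore, and the persist callback are hypothetical stand-ins, not the actual SeaweedFS handler types.

package main

import (
	"fmt"
	"log"
	"sync"
)

// offsetKey identifies one committed offset, mirroring group/topic/partition.
type offsetKey struct {
	group, topic string
	partition    int32
}

// offsetStore is the in-memory tier; the persistent tier is passed in as a callback.
type offsetStore struct {
	mu      sync.RWMutex
	offsets map[offsetKey]int64
}

func newOffsetStore() *offsetStore {
	return &offsetStore{offsets: make(map[offsetKey]int64)}
}

// commit writes to memory first (always succeeds), then tries the persistent
// backend; a persistence failure is logged and swallowed, matching the
// reverted behavior above.
func (s *offsetStore) commit(key offsetKey, offset int64, persist func(offsetKey, int64) error) {
	s.mu.Lock()
	s.offsets[key] = offset
	s.mu.Unlock()

	if err := persist(key, offset); err != nil {
		// Persistent storage may be unavailable (e.g., mock mode) - log and continue.
		log.Printf("persistent commit failed for %v: %v", key, err)
	}
}

func main() {
	store := newOffsetStore()
	key := offsetKey{group: "g1", topic: "orders", partition: 0}
	store.commit(key, 42, func(k offsetKey, off int64) error {
		return fmt.Errorf("backend unavailable") // simulate mock mode
	})
	store.mu.RLock()
	fmt.Println("in-memory offset:", store.offsets[key]) // prints 42 despite the persist failure
	store.mu.RUnlock()
}

The trade-off this revert reintroduces is visible in the sketch: the client sees a successful commit even when only the volatile tier holds the offset.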
@@ -212,36 +197,37 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
 	return h.buildOffsetCommitResponse(resp, apiVersion), nil
 }
 
-func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16,
-	requestBody []byte) ([]byte, error) {
-	req, err := h.parseOffsetFetchRequest(requestBody)
+func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
+	// Parse OffsetFetch request
+	request, err := h.parseOffsetFetchRequest(requestBody)
 	if err != nil {
 		return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
 	}
 
-	glog.V(2).Infof("[OFFSET_FETCH] group=%s topics=%d", req.GroupID, len(req.Topics))
-	for _, t := range req.Topics {
-		for _, p := range t.Partitions {
-			glog.V(2).Infof("[OFFSET_FETCH] topic=%s partition=%d", t.Name, p)
-		}
+	// Validate request
+	if request.GroupID == "" {
+		return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
 	}
 
-	// Get the consumer group
-	group := h.groupCoordinator.GetOrCreateGroup(req.GroupID)
+	// Get or create consumer group
+	// IMPORTANT: Use GetOrCreateGroup (not GetGroup) to allow fetching persisted offsets
+	// even if the group doesn't exist in memory yet. This is critical for consumer restarts.
+	// Kafka allows offset fetches for groups that haven't joined yet (e.g., simple consumers).
+	group := h.groupCoordinator.GetOrCreateGroup(request.GroupID)
 	group.Mu.RLock()
 	defer group.Mu.RUnlock()
 
-	glog.V(4).Infof("[OFFSET_FETCH] Request: group=%s topics=%d", req.GroupID, len(req.Topics))
+	glog.V(4).Infof("[OFFSET_FETCH] Request: group=%s topics=%d", request.GroupID, len(request.Topics))
 
 	// Build response
 	response := OffsetFetchResponse{
 		CorrelationID: correlationID,
-		Topics:        make([]OffsetFetchTopicResponse, 0, len(req.Topics)),
+		Topics:        make([]OffsetFetchTopicResponse, 0, len(request.Topics)),
 		ErrorCode:     ErrorCodeNone,
 	}
 
-	for _, topic := range req.Topics {
+	for _, topic := range request.Topics {
 		topicResponse := OffsetFetchTopicResponse{
 			Name:       topic.Name,
 			Partitions: make([]OffsetFetchPartitionResponse, 0),
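The hunk above keeps the memory-first, storage-fallback lookup, and the GetOrCreateGroup call is what lets a restarted consumer fetch its persisted offsets before the group exists in memory again. A minimal sketch of that lookup order, with illustrative maps standing in for the real group state and SMQ storage:

package main

import "fmt"

// Kafka's sentinel for "no committed offset"; the client then applies auto.offset.reset.
const noCommittedOffset = -1

type fetcher struct {
	memory  map[string]int64 // volatile per-group state
	storage map[string]int64 // persisted offsets (SMQ in the real handler)
}

// fetch prefers the in-memory offset and falls back to persistent storage,
// mirroring the fetchOffset / fetchOffsetFromSMQ order in the diff.
func (f *fetcher) fetch(key string) int64 {
	if off, ok := f.memory[key]; ok && off >= 0 {
		return off
	}
	if off, ok := f.storage[key]; ok && off >= 0 {
		return off // e.g., a consumer restarting after the gateway lost in-memory state
	}
	return noCommittedOffset
}

func main() {
	f := &fetcher{
		memory:  map[string]int64{},                  // empty, as after a gateway restart
		storage: map[string]int64{"g1/orders/0": 42}, // persisted before the restart
	}
	fmt.Println(f.fetch("g1/orders/0")) // 42, recovered from storage
	fmt.Println(f.fetch("g1/orders/1")) // -1, start from auto.offset.reset
}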
@@ -268,41 +254,25 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16,
 			if off, meta, err := h.fetchOffset(group, topic.Name, partition); err == nil && off >= 0 {
 				fetchedOffset = off
 				metadata = meta
-				glog.V(2).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d",
-					req.GroupID, topic.Name, partition, off)
+				glog.V(4).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d",
+					request.GroupID, topic.Name, partition, off)
 			} else {
 				// Fallback: try fetching from SMQ persistent storage
 				// This handles cases where offsets are stored in SMQ but not yet loaded into memory
 				key := ConsumerOffsetKey{
 					Topic:                 topic.Name,
 					Partition:             partition,
-					ConsumerGroup:         req.GroupID,
-					ConsumerGroupInstance: req.GroupInstanceID,
-				}
-
-				// CRITICAL FIX: Retry offset fetch from SMQ with backoff
-				// During rebalancing, there can be a brief race where the offset was committed
-				// but not yet persisted to SMQ. A single retry with small delay fixes this.
-				var smqErr error
-				for attempt := 0; attempt < 3; attempt++ {
-					if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 {
-						fetchedOffset = off
-						metadata = meta
-						glog.V(2).Infof("[OFFSET_FETCH] Found in storage (attempt %d): group=%s topic=%s partition=%d offset=%d",
-							attempt+1, req.GroupID, topic.Name, partition, off)
-						break
-					} else {
-						smqErr = err
-						if attempt < 2 {
-							// Brief backoff before retry: 5ms * attempt
-							time.Sleep(time.Duration(5*(attempt+1)) * time.Millisecond)
-						}
-					}
+					ConsumerGroup:         request.GroupID,
+					ConsumerGroupInstance: request.GroupInstanceID,
 				}
-
-				if fetchedOffset < 0 && smqErr != nil {
-					glog.V(2).Infof("[OFFSET_FETCH] No offset found after retries: group=%s topic=%s partition=%d err=%v (will start from auto.offset.reset)",
-						req.GroupID, topic.Name, partition, smqErr)
+				if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 {
+					fetchedOffset = off
+					metadata = meta
+					glog.V(4).Infof("[OFFSET_FETCH] Found in storage: group=%s topic=%s partition=%d offset=%d",
+						request.GroupID, topic.Name, partition, off)
+				} else {
+					glog.V(4).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d (will start from auto.offset.reset)",
+						request.GroupID, topic.Name, partition)
 				}
 				// No offset found in either location (-1 indicates no committed offset)
 			}
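For reference, the logic removed by this revert retried the SMQ fetch up to three times with a 5ms*(attempt+1) pause, to paper over the window during a rebalance where an offset has been committed but not yet persisted. A self-contained sketch of that retry pattern; the fetch callback is a hypothetical stand-in for h.fetchOffsetFromSMQ:

package main

import (
	"errors"
	"fmt"
	"time"
)

// fetchWithRetry attempts a storage lookup up to three times, sleeping
// 5ms*(attempt+1) between attempts, and reports the last error on failure.
func fetchWithRetry(fetch func() (int64, error)) (int64, error) {
	var lastErr error
	for attempt := 0; attempt < 3; attempt++ {
		off, err := fetch()
		if err == nil && off >= 0 {
			return off, nil
		}
		lastErr = err
		if attempt < 2 {
			time.Sleep(time.Duration(5*(attempt+1)) * time.Millisecond)
		}
	}
	return -1, lastErr
}

func main() {
	calls := 0
	off, err := fetchWithRetry(func() (int64, error) {
		calls++
		if calls < 3 {
			return -1, errors.New("not yet persisted") // transient rebalance race
		}
		return 42, nil
	})
	fmt.Println(off, err, "after", calls, "attempts") // 42 <nil> after 3 attempts
}

With the revert applied, a fetch that loses this race returns -1 immediately and the consumer falls back to auto.offset.reset, which is the message-loss scenario the reverted commit had targeted.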