From 6ef2f66198b7cc52aae1553a4124aba548953830 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 14 Oct 2025 00:26:55 -0700 Subject: [PATCH] only recreate if we need to seek backward (requested offset < current offset), not on any mismatch --- test/kafka/e2e/offset_management_test.go | 6 +- test/kafka/internal/testutil/clients.go | 7 + .../integration/broker_client_subscribe.go | 184 ++++++++++-------- weed/mq/kafka/protocol/offset_management.go | 57 ++++-- 4 files changed, 151 insertions(+), 103 deletions(-) diff --git a/test/kafka/e2e/offset_management_test.go b/test/kafka/e2e/offset_management_test.go index d4e1fbb97..11bbdc5ea 100644 --- a/test/kafka/e2e/offset_management_test.go +++ b/test/kafka/e2e/offset_management_test.go @@ -108,7 +108,7 @@ func testConsumerGroupResumption(t *testing.T, addr, topic, groupID string) { // Verify total consumption totalConsumed := len(consumed1) + len(consumed2) t.Logf("=== Verification: Total consumed %d messages (expected %d) ===", totalConsumed, len(messages)) - + // Check for duplicates offsetsSeen := make(map[int64]bool) duplicateCount := 0 @@ -119,11 +119,11 @@ func testConsumerGroupResumption(t *testing.T, addr, topic, groupID string) { } offsetsSeen[msg.Offset] = true } - + if duplicateCount > 0 { t.Logf("ERROR: Found %d duplicate messages", duplicateCount) } - + testutil.AssertEqual(t, len(messages), totalConsumed, "Should consume all messages after restart") t.Logf("SUCCESS: Consumer group resumption test completed - no duplicates, all messages consumed exactly once") diff --git a/test/kafka/internal/testutil/clients.go b/test/kafka/internal/testutil/clients.go index 53cae52e0..65f581de3 100644 --- a/test/kafka/internal/testutil/clients.go +++ b/test/kafka/internal/testutil/clients.go @@ -140,6 +140,10 @@ func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCoun }) defer reader.Close() + // Log the initial offset position + offset := reader.Offset() + k.t.Logf("Consumer group reader created for group %s, initial offset: %d", groupID, offset) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -151,14 +155,17 @@ func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCoun return messages, fmt.Errorf("read message %d: %w", i, err) } messages = append(messages, msg) + k.t.Logf(" Fetched message %d: offset=%d, partition=%d", i, msg.Offset, msg.Partition) // Commit with simple retry to handle transient connection churn var commitErr error for attempt := 0; attempt < 3; attempt++ { commitErr = reader.CommitMessages(ctx, msg) if commitErr == nil { + k.t.Logf(" ✓ Committed offset %d (attempt %d)", msg.Offset, attempt+1) break } + k.t.Logf(" × Commit attempt %d failed for offset %d: %v", attempt+1, msg.Offset, commitErr) // brief backoff time.Sleep(time.Duration(50*(1< 0 { - cacheStartOffset := session.consumedRecords[0].Offset - cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset - if startOffset >= cacheStartOffset && startOffset <= cacheEndOffset { - canUseCache = true - glog.V(2).Infof("[FETCH] Session offset mismatch for %s (session=%d, requested=%d), but offset is in cache [%d-%d]", - key, session.StartOffset, startOffset, cacheStartOffset, cacheEndOffset) - } - } + // CRITICAL: Only recreate if we need to seek BACKWARD + // Forward reads (startOffset >= session.StartOffset) can use the existing session + session.mu.Lock() + currentOffset := session.StartOffset + session.mu.Unlock() - session.mu.Unlock() + if startOffset >= 
currentOffset { + // Can read forward from existing session, or already at requested offset + bc.subscribersLock.RUnlock() + glog.V(2).Infof("[FETCH] Reusing existing session for %s: session at %d, requested %d (can read forward)", + key, currentOffset, startOffset) + return session, nil + } - if canUseCache { - // Offset is in cache, reuse session - bc.subscribersLock.RUnlock() - return session, nil + // startOffset < currentOffset: need to seek backward + // Check cache first before recreating + session.mu.Lock() + canUseCache := false + if len(session.consumedRecords) > 0 { + cacheStartOffset := session.consumedRecords[0].Offset + cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset + if startOffset >= cacheStartOffset && startOffset <= cacheEndOffset { + canUseCache = true + glog.V(2).Infof("[FETCH] Session for %s at offset %d, requested %d (backward seek), but offset is in cache [%d-%d]", + key, currentOffset, startOffset, cacheStartOffset, cacheEndOffset) } + } + session.mu.Unlock() - // Not in cache - need to recreate session at the requested offset - glog.V(0).Infof("[FETCH] Recreating session for %s: session at %d, requested %d (not in cache)", - key, session.StartOffset, startOffset) + if canUseCache { + // Offset is in cache, reuse session bc.subscribersLock.RUnlock() + return session, nil + } - // Close and delete the old session - bc.subscribersLock.Lock() - // CRITICAL: Double-check if another thread already recreated the session at the desired offset - // This prevents multiple concurrent threads from all trying to recreate the same session - if existingSession, exists := bc.subscribers[key]; exists { - existingSession.mu.Lock() - existingOffset := existingSession.StartOffset - existingSession.mu.Unlock() - - // Check if the session was already recreated at (or before) the requested offset - if existingOffset <= startOffset { - bc.subscribersLock.Unlock() - glog.V(1).Infof("[FETCH] Session already recreated by another thread at offset %d (requested %d)", existingOffset, startOffset) - // Re-acquire the existing session and continue - return existingSession, nil - } + // Not in cache - need to recreate session at the requested offset + glog.V(0).Infof("[FETCH] Recreating session for %s: session at %d, requested %d (backward seek, not in cache)", + key, currentOffset, startOffset) + bc.subscribersLock.RUnlock() - // Session still needs recreation - close it - if existingSession.Stream != nil { - _ = existingSession.Stream.CloseSend() - } - if existingSession.Cancel != nil { - existingSession.Cancel() - } - delete(bc.subscribers, key) + // Close and delete the old session + bc.subscribersLock.Lock() + // CRITICAL: Double-check if another thread already recreated the session at the desired offset + // This prevents multiple concurrent threads from all trying to recreate the same session + if existingSession, exists := bc.subscribers[key]; exists { + existingSession.mu.Lock() + existingOffset := existingSession.StartOffset + existingSession.mu.Unlock() + + // Check if the session was already recreated at (or before) the requested offset + if existingOffset <= startOffset { + bc.subscribersLock.Unlock() + glog.V(1).Infof("[FETCH] Session already recreated by another thread at offset %d (requested %d)", existingOffset, startOffset) + // Re-acquire the existing session and continue + return existingSession, nil } - // CRITICAL FIX: Don't unlock here! 
Keep the write lock to prevent race condition - // where another thread creates a session at the wrong offset between our delete and create - // Fall through to session creation below while holding the lock - // bc.subscribersLock.Unlock() - REMOVED to fix race condition - // Write lock is already held - skip to session creation - goto createSession - } else { - // Exact match - reuse - bc.subscribersLock.RUnlock() - return session, nil + // Session still needs recreation - close it + if existingSession.Stream != nil { + _ = existingSession.Stream.CloseSend() + } + if existingSession.Cancel != nil { + existingSession.Cancel() + } + delete(bc.subscribers, key) } + // CRITICAL FIX: Don't unlock here! Keep the write lock to prevent race condition + // where another thread creates a session at the wrong offset between our delete and create + // Fall through to session creation below while holding the lock + // bc.subscribersLock.Unlock() - REMOVED to fix race condition + + // Write lock is already held - skip to session creation + goto createSession } else { bc.subscribersLock.RUnlock() } @@ -311,13 +316,13 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok if endIdx > len(session.consumedRecords) { endIdx = len(session.consumedRecords) } - glog.V(1).Infof("[FETCH] ✓ Returning %d cached records for %s at offset %d (cache: %d-%d)", + glog.V(1).Infof("[FETCH] ✓ Returning %d cached records for %s at offset %d (cache: %d-%d)", endIdx-startIdx, session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset) session.mu.Unlock() return session.consumedRecords[startIdx:endIdx], nil } } else { - glog.V(1).Infof("[FETCH] Cache miss for %s: requested=%d, cache=[%d-%d]", + glog.V(1).Infof("[FETCH] Cache miss for %s: requested=%d, cache=[%d-%d]", session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset) } } @@ -348,25 +353,28 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok key := session.Key() session.mu.Unlock() - // CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset - // This prevents multiple threads from all deciding to recreate based on stale data - glog.V(1).Infof("[FETCH] Acquiring global lock to recreate session %s: requested=%d", key, requestedOffset) - bc.subscribersLock.Lock() - - // Double-check if another thread already recreated the session at the desired offset - // This prevents multiple concurrent threads from all trying to recreate the same session - if existingSession, exists := bc.subscribers[key]; exists { - existingSession.mu.Lock() - existingOffset := existingSession.StartOffset - existingSession.mu.Unlock() - - // Check if the session was already recreated at (or before) the requested offset - if existingOffset <= requestedOffset { - bc.subscribersLock.Unlock() - glog.V(0).Infof("[FETCH] Session %s already recreated by another thread at offset %d (requested %d) - reusing", key, existingOffset, requestedOffset) - // Re-acquire the existing session and continue - return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords) - } + // CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset + // This prevents multiple threads from all deciding to recreate based on stale data + glog.V(0).Infof("[FETCH] 🔒 Thread acquiring global lock to recreate session %s: requested=%d", key, requestedOffset) + bc.subscribersLock.Lock() + glog.V(0).Infof("[FETCH] 🔓 Thread acquired global lock for session %s: requested=%d", key, requestedOffset) + + // 
Double-check if another thread already recreated the session at the desired offset + // This prevents multiple concurrent threads from all trying to recreate the same session + if existingSession, exists := bc.subscribers[key]; exists { + existingSession.mu.Lock() + existingOffset := existingSession.StartOffset + existingSession.mu.Unlock() + + // Check if the session was already recreated at (or before) the requested offset + if existingOffset <= requestedOffset { + bc.subscribersLock.Unlock() + glog.V(0).Infof("[FETCH] ✓ Session %s already recreated by another thread at offset %d (requested %d) - reusing", key, existingOffset, requestedOffset) + // Re-acquire the existing session and continue + return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords) + } + + glog.V(0).Infof("[FETCH] ⚠️ Session %s still at wrong offset %d (requested %d) - must recreate", key, existingOffset, requestedOffset) // Session still needs recreation - close it if existingSession.Stream != nil { @@ -447,13 +455,13 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok Cancel: subscriberCancel, } - bc.subscribers[key] = newSession - bc.subscribersLock.Unlock() - glog.V(0).Infof("[FETCH] ✓ Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset) + bc.subscribers[key] = newSession + bc.subscribersLock.Unlock() + glog.V(0).Infof("[FETCH] ✓ Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset) - // Read from fresh subscriber - glog.V(1).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords) - return bc.ReadRecords(ctx, newSession, maxRecords) + // Read from fresh subscriber + glog.V(1).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords) + return bc.ReadRecords(ctx, newSession, maxRecords) } // requestedOffset >= session.StartOffset: Keep reading forward from existing session @@ -645,6 +653,8 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib if result.err != nil { glog.V(2).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err) // Update session offset before returning + glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (error case, read %d records)", + session.Key(), session.StartOffset, currentOffset, len(records)) session.StartOffset = currentOffset return records, nil } @@ -669,6 +679,8 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib // Timeout - return what we have glog.V(4).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart)) // CRITICAL: Update session offset so next fetch knows where we left off + glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (timeout case, read %d records)", + session.Key(), session.StartOffset, currentOffset, len(records)) session.StartOffset = currentOffset return records, nil } @@ -676,6 +688,8 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib glog.V(2).Infof("[FETCH] ReadRecords returning %d records (maxRecords reached)", len(records)) // Update session offset after successful read + glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (success case, read %d records)", + session.Key(), session.StartOffset, currentOffset, len(records)) session.StartOffset = currentOffset // CRITICAL: Cache the consumed records to avoid broker tight loop diff 
--git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index 0a6e724fb..96c51f156 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" ) @@ -151,16 +152,27 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re ConsumerGroupInstance: req.GroupInstanceID, } - // Commit offset using SMQ storage (persistent to filer) + // Commit offset to both in-memory and SMQ storage var errCode int16 = ErrorCodeNone if generationMatches { - if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil { + // Store in in-memory map first (works for both mock and SMQ backends) + if err := h.commitOffset(group, t.Name, p.Index, p.Offset, p.Metadata); err != nil { errCode = ErrorCodeOffsetMetadataTooLarge - } else { } + + // Also store in SMQ persistent storage if available + if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil { + // SMQ storage may not be available (e.g., in mock mode) - that's okay + glog.V(2).Infof("[OFFSET_COMMIT] SMQ storage not available: %v", err) + } + + glog.V(0).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d", + req.GroupID, t.Name, p.Index, p.Offset) } else { // Do not store commit if generation mismatch errCode = 22 // IllegalGeneration + glog.V(0).Infof("[OFFSET_COMMIT] Generation mismatch: group=%s expected=%d got=%d", + req.GroupID, group.Generation, req.GenerationID) } topicResp.Partitions = append(topicResp.Partitions, OffsetCommitPartitionResponse{ @@ -190,12 +202,15 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req // Get consumer group group := h.groupCoordinator.GetGroup(request.GroupID) if group == nil { + glog.V(0).Infof("[OFFSET_FETCH] Group not found: %s", request.GroupID) return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } group.Mu.RLock() defer group.Mu.RUnlock() + glog.V(0).Infof("[OFFSET_FETCH] Request: group=%s topics=%v", request.GroupID, request.Topics) + // Build response response := OffsetFetchResponse{ CorrelationID: correlationID, @@ -222,25 +237,35 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req // Fetch offsets for requested partitions for _, partition := range partitionsToFetch { - // Create consumer offset key for SMQ storage - key := ConsumerOffsetKey{ - Topic: topic.Name, - Partition: partition, - ConsumerGroup: request.GroupID, - ConsumerGroupInstance: request.GroupInstanceID, - } - var fetchedOffset int64 = -1 var metadata string = "" var errorCode int16 = ErrorCodeNone - // Fetch offset directly from SMQ storage (persistent storage) - // No cache needed - offset fetching is infrequent compared to commits - if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 { + // Try fetching from in-memory cache first (works for both mock and SMQ backends) + if off, meta, err := h.fetchOffset(group, topic.Name, partition); err == nil && off >= 0 { fetchedOffset = off metadata = meta + glog.V(0).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d", + request.GroupID, topic.Name, partition, off) } else { - // No offset found in persistent storage (-1 indicates no committed offset) + // Fallback: try fetching from SMQ persistent storage + // This handles cases where offsets are stored in SMQ but not yet 
loaded into memory + key := ConsumerOffsetKey{ + Topic: topic.Name, + Partition: partition, + ConsumerGroup: request.GroupID, + ConsumerGroupInstance: request.GroupInstanceID, + } + if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 { + fetchedOffset = off + metadata = meta + glog.V(0).Infof("[OFFSET_FETCH] Found in SMQ: group=%s topic=%s partition=%d offset=%d", + request.GroupID, topic.Name, partition, off) + } else { + glog.V(0).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d", + request.GroupID, topic.Name, partition) + } + // No offset found in either location (-1 indicates no committed offset) } partitionResponse := OffsetFetchPartitionResponse{ @@ -250,6 +275,8 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req Metadata: metadata, ErrorCode: errorCode, } + glog.V(0).Infof("[OFFSET_FETCH] Returning: group=%s topic=%s partition=%d offset=%d", + request.GroupID, topic.Name, partition, fetchedOffset) topicResponse.Partitions = append(topicResponse.Partitions, partitionResponse) }
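
The seek handling this patch introduces in broker_client_subscribe.go boils down to one decision: reuse the existing subscriber for forward reads, serve a backward seek from the consumed-record cache when the requested offset is still cached, and only recreate the session for a backward seek that falls outside the cache. The standalone Go sketch below restates that decision under stated assumptions: the session, decide, and fetchDecision names are invented for illustration and are not the BrokerClient types, and locking, the double-check under the write lock, and the gRPC stream are deliberately left out.

// seekdecision.go - illustrative sketch only; these names are invented for
// this example and are not the SeaweedFS BrokerClient types.
package main

import "fmt"

type cachedRecord struct {
	Offset int64
}

type session struct {
	StartOffset     int64          // next offset the live subscriber stream will deliver
	consumedRecords []cachedRecord // already-delivered records kept for re-serving
}

type fetchDecision int

const (
	reuseSession    fetchDecision = iota // forward read: keep the existing stream
	serveFromCache                       // backward seek, but the offset is still cached
	recreateSession                      // backward seek past the cache: restart the subscriber
)

// decide mirrors the patched logic: only a backward seek (requested < current)
// can force recreation, and even then the consumed-record cache is tried first.
func decide(s *session, requested int64) fetchDecision {
	if requested >= s.StartOffset {
		return reuseSession
	}
	if n := len(s.consumedRecords); n > 0 {
		first := s.consumedRecords[0].Offset
		last := s.consumedRecords[n-1].Offset
		if requested >= first && requested <= last {
			return serveFromCache
		}
	}
	return recreateSession
}

func main() {
	s := &session{StartOffset: 100, consumedRecords: []cachedRecord{{90}, {95}, {99}}}
	fmt.Println(decide(s, 120)) // 0: reuseSession (forward read)
	fmt.Println(decide(s, 95))  // 1: serveFromCache (backward seek inside cache)
	fmt.Println(decide(s, 10))  // 2: recreateSession (backward seek outside cache)
}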
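
The offset_management.go change layers two stores: OffsetCommit writes the in-memory group state first and then, best-effort, SMQ persistent storage; OffsetFetch reads memory first and falls back to SMQ. A minimal sketch of that layering, assuming a made-up offsetStore type in place of the real handler, group state, and ConsumerOffsetKey plumbing:

// offsetlayers.go - illustrative sketch only; offsetStore and offsetKey are
// made up for this example and are not the real handler, group state, or
// ConsumerOffsetKey plumbing.
package main

import (
	"errors"
	"fmt"
)

type offsetKey struct {
	Group, Topic string
	Partition    int32
}

type offsetStore struct {
	memory     map[offsetKey]int64 // in-memory group state: always available
	persistent map[offsetKey]int64 // stands in for SMQ-backed persistent storage
}

func newOffsetStore() *offsetStore {
	return &offsetStore{memory: map[offsetKey]int64{}, persistent: map[offsetKey]int64{}}
}

// Commit writes the in-memory copy first, then the persistent layer; in the
// real handler the persistent write may fail (e.g. mock mode) and is only logged.
func (s *offsetStore) Commit(k offsetKey, off int64) {
	s.memory[k] = off
	s.persistent[k] = off
}

// Fetch prefers the in-memory copy and falls back to persistent storage;
// -1 means "no committed offset", matching Kafka semantics.
func (s *offsetStore) Fetch(k offsetKey) (int64, error) {
	if off, ok := s.memory[k]; ok {
		return off, nil
	}
	if off, ok := s.persistent[k]; ok {
		return off, nil
	}
	return -1, errors.New("no committed offset")
}

func main() {
	st := newOffsetStore()
	k := offsetKey{Group: "g1", Topic: "orders", Partition: 0}
	st.Commit(k, 42)
	delete(st.memory, k) // simulate losing the in-memory state (e.g. gateway restart)
	off, _ := st.Fetch(k)
	fmt.Println(off) // 42, recovered from the persistent layer
}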