
Only recreate the subscriber session when a backward seek is needed (requested offset < current offset), not on any offset mismatch

pull/7329/head
chrislu · 1 week ago
parent · commit 6ef2f66198
  1. test/kafka/e2e/offset_management_test.go (6 changed lines)
  2. test/kafka/internal/testutil/clients.go (7 changed lines)
  3. weed/mq/kafka/integration/broker_client_subscribe.go (184 changed lines)
  4. weed/mq/kafka/protocol/offset_management.go (57 changed lines)
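
The core of this change is the recreation decision: an existing subscriber session is only torn down when the fetch asks for an offset behind the session's current position and that offset is not covered by the consumed-record cache; forward reads always reuse the session. A minimal, runnable sketch of that decision in Go (the function and its inputs are illustrative stand-ins, not the real BrokerSubscriberSession):

package main

import "fmt"

// decideReuse sketches the policy introduced by this commit (names are illustrative).
func decideReuse(sessionOffset, requestedOffset int64, cachedOffsets []int64) string {
	if requestedOffset >= sessionOffset {
		// Forward (or exact) reads continue on the existing stream.
		return "reuse: read forward from existing session"
	}
	if len(cachedOffsets) > 0 &&
		requestedOffset >= cachedOffsets[0] &&
		requestedOffset <= cachedOffsets[len(cachedOffsets)-1] {
		// Backward seek, but the offset is still in the consumed-record cache.
		return "reuse: serve from cache"
	}
	// Backward seek past the cache: only now is the subscriber recreated.
	return "recreate: backward seek, offset not cached"
}

func main() {
	cache := []int64{5, 6, 7, 8, 9}
	fmt.Println(decideReuse(10, 12, cache)) // forward read
	fmt.Println(decideReuse(10, 7, cache))  // backward but cached
	fmt.Println(decideReuse(10, 2, cache))  // backward, not cached
}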

test/kafka/e2e/offset_management_test.go (6 changed lines)

@@ -108,7 +108,7 @@ func testConsumerGroupResumption(t *testing.T, addr, topic, groupID string) {
// Verify total consumption
totalConsumed := len(consumed1) + len(consumed2)
t.Logf("=== Verification: Total consumed %d messages (expected %d) ===", totalConsumed, len(messages))
// Check for duplicates
offsetsSeen := make(map[int64]bool)
duplicateCount := 0
@@ -119,11 +119,11 @@ func testConsumerGroupResumption(t *testing.T, addr, topic, groupID string) {
}
offsetsSeen[msg.Offset] = true
}
if duplicateCount > 0 {
t.Logf("ERROR: Found %d duplicate messages", duplicateCount)
}
testutil.AssertEqual(t, len(messages), totalConsumed, "Should consume all messages after restart")
t.Logf("SUCCESS: Consumer group resumption test completed - no duplicates, all messages consumed exactly once")

test/kafka/internal/testutil/clients.go (7 changed lines)

@@ -140,6 +140,10 @@ func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCoun
})
defer reader.Close()
// Log the initial offset position
offset := reader.Offset()
k.t.Logf("Consumer group reader created for group %s, initial offset: %d", groupID, offset)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
@@ -151,14 +155,17 @@ func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCoun
return messages, fmt.Errorf("read message %d: %w", i, err)
}
messages = append(messages, msg)
k.t.Logf(" Fetched message %d: offset=%d, partition=%d", i, msg.Offset, msg.Partition)
// Commit with simple retry to handle transient connection churn
var commitErr error
for attempt := 0; attempt < 3; attempt++ {
commitErr = reader.CommitMessages(ctx, msg)
if commitErr == nil {
k.t.Logf(" ✓ Committed offset %d (attempt %d)", msg.Offset, attempt+1)
break
}
k.t.Logf(" × Commit attempt %d failed for offset %d: %v", attempt+1, msg.Offset, commitErr)
// brief backoff
time.Sleep(time.Duration(50*(1<<attempt)) * time.Millisecond)
}
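
The retry around CommitMessages above uses a simple exponential backoff (50ms, 100ms, 200ms). A self-contained sketch of the same pattern against the kafka-go API follows; the broker address, topic, and group name are placeholders, and error handling is reduced to logging:

package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

// commitWithRetry retries an offset commit a few times with exponential backoff,
// mirroring the pattern in the test client (attempt count and delays are illustrative).
func commitWithRetry(ctx context.Context, reader *kafka.Reader, msg kafka.Message) error {
	var err error
	for attempt := 0; attempt < 3; attempt++ {
		if err = reader.CommitMessages(ctx, msg); err == nil {
			return nil
		}
		log.Printf("commit attempt %d failed for offset %d: %v", attempt+1, msg.Offset, err)
		time.Sleep(time.Duration(50*(1<<attempt)) * time.Millisecond) // 50ms, 100ms, 200ms
	}
	return err
}

func main() {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // placeholder broker
		GroupID: "example-group",            // placeholder group
		Topic:   "example-topic",            // placeholder topic
	})
	defer reader.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	msg, err := reader.FetchMessage(ctx)
	if err != nil {
		log.Fatalf("fetch: %v", err)
	}
	if err := commitWithRetry(ctx, reader, msg); err != nil {
		log.Fatalf("commit: %v", err)
	}
}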

weed/mq/kafka/integration/broker_client_subscribe.go (184 changed lines)

@@ -95,74 +95,79 @@ func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, sta
bc.subscribersLock.RLock()
if session, exists := bc.subscribers[key]; exists {
// Check if we need to recreate the session
if session.StartOffset != startOffset {
// CRITICAL FIX: Check cache first before recreating
// If the requested offset is in cache, we can reuse the session
session.mu.Lock()
canUseCache := false
if len(session.consumedRecords) > 0 {
cacheStartOffset := session.consumedRecords[0].Offset
cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset
if startOffset >= cacheStartOffset && startOffset <= cacheEndOffset {
canUseCache = true
glog.V(2).Infof("[FETCH] Session offset mismatch for %s (session=%d, requested=%d), but offset is in cache [%d-%d]",
key, session.StartOffset, startOffset, cacheStartOffset, cacheEndOffset)
}
}
// CRITICAL: Only recreate if we need to seek BACKWARD
// Forward reads (startOffset >= session.StartOffset) can use the existing session
session.mu.Lock()
currentOffset := session.StartOffset
session.mu.Unlock()
session.mu.Unlock()
if startOffset >= currentOffset {
// Can read forward from existing session, or already at requested offset
bc.subscribersLock.RUnlock()
glog.V(2).Infof("[FETCH] Reusing existing session for %s: session at %d, requested %d (can read forward)",
key, currentOffset, startOffset)
return session, nil
}
if canUseCache {
// Offset is in cache, reuse session
bc.subscribersLock.RUnlock()
return session, nil
// startOffset < currentOffset: need to seek backward
// Check cache first before recreating
session.mu.Lock()
canUseCache := false
if len(session.consumedRecords) > 0 {
cacheStartOffset := session.consumedRecords[0].Offset
cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset
if startOffset >= cacheStartOffset && startOffset <= cacheEndOffset {
canUseCache = true
glog.V(2).Infof("[FETCH] Session for %s at offset %d, requested %d (backward seek), but offset is in cache [%d-%d]",
key, currentOffset, startOffset, cacheStartOffset, cacheEndOffset)
}
}
session.mu.Unlock()
// Not in cache - need to recreate session at the requested offset
glog.V(0).Infof("[FETCH] Recreating session for %s: session at %d, requested %d (not in cache)",
key, session.StartOffset, startOffset)
if canUseCache {
// Offset is in cache, reuse session
bc.subscribersLock.RUnlock()
return session, nil
}
// Close and delete the old session
bc.subscribersLock.Lock()
// CRITICAL: Double-check if another thread already recreated the session at the desired offset
// This prevents multiple concurrent threads from all trying to recreate the same session
if existingSession, exists := bc.subscribers[key]; exists {
existingSession.mu.Lock()
existingOffset := existingSession.StartOffset
existingSession.mu.Unlock()
// Check if the session was already recreated at (or before) the requested offset
if existingOffset <= startOffset {
bc.subscribersLock.Unlock()
glog.V(1).Infof("[FETCH] Session already recreated by another thread at offset %d (requested %d)", existingOffset, startOffset)
// Re-acquire the existing session and continue
return existingSession, nil
}
// Not in cache - need to recreate session at the requested offset
glog.V(0).Infof("[FETCH] Recreating session for %s: session at %d, requested %d (backward seek, not in cache)",
key, currentOffset, startOffset)
bc.subscribersLock.RUnlock()
// Session still needs recreation - close it
if existingSession.Stream != nil {
_ = existingSession.Stream.CloseSend()
}
if existingSession.Cancel != nil {
existingSession.Cancel()
}
delete(bc.subscribers, key)
// Close and delete the old session
bc.subscribersLock.Lock()
// CRITICAL: Double-check if another thread already recreated the session at the desired offset
// This prevents multiple concurrent threads from all trying to recreate the same session
if existingSession, exists := bc.subscribers[key]; exists {
existingSession.mu.Lock()
existingOffset := existingSession.StartOffset
existingSession.mu.Unlock()
// Check if the session was already recreated at (or before) the requested offset
if existingOffset <= startOffset {
bc.subscribersLock.Unlock()
glog.V(1).Infof("[FETCH] Session already recreated by another thread at offset %d (requested %d)", existingOffset, startOffset)
// Re-acquire the existing session and continue
return existingSession, nil
}
// CRITICAL FIX: Don't unlock here! Keep the write lock to prevent race condition
// where another thread creates a session at the wrong offset between our delete and create
// Fall through to session creation below while holding the lock
// bc.subscribersLock.Unlock() - REMOVED to fix race condition
// Write lock is already held - skip to session creation
goto createSession
} else {
// Exact match - reuse
bc.subscribersLock.RUnlock()
return session, nil
// Session still needs recreation - close it
if existingSession.Stream != nil {
_ = existingSession.Stream.CloseSend()
}
if existingSession.Cancel != nil {
existingSession.Cancel()
}
delete(bc.subscribers, key)
}
// CRITICAL FIX: Don't unlock here! Keep the write lock to prevent race condition
// where another thread creates a session at the wrong offset between our delete and create
// Fall through to session creation below while holding the lock
// bc.subscribersLock.Unlock() - REMOVED to fix race condition
// Write lock is already held - skip to session creation
goto createSession
} else {
bc.subscribersLock.RUnlock()
}
@@ -311,13 +316,13 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
if endIdx > len(session.consumedRecords) {
endIdx = len(session.consumedRecords)
}
glog.V(1).Infof("[FETCH] ✓ Returning %d cached records for %s at offset %d (cache: %d-%d)",
glog.V(1).Infof("[FETCH] ✓ Returning %d cached records for %s at offset %d (cache: %d-%d)",
endIdx-startIdx, session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset)
session.mu.Unlock()
return session.consumedRecords[startIdx:endIdx], nil
}
} else {
glog.V(1).Infof("[FETCH] Cache miss for %s: requested=%d, cache=[%d-%d]",
glog.V(1).Infof("[FETCH] Cache miss for %s: requested=%d, cache=[%d-%d]",
session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset)
}
}
@@ -348,25 +353,28 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
key := session.Key()
session.mu.Unlock()
// CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset
// This prevents multiple threads from all deciding to recreate based on stale data
glog.V(1).Infof("[FETCH] Acquiring global lock to recreate session %s: requested=%d", key, requestedOffset)
bc.subscribersLock.Lock()
// Double-check if another thread already recreated the session at the desired offset
// This prevents multiple concurrent threads from all trying to recreate the same session
if existingSession, exists := bc.subscribers[key]; exists {
existingSession.mu.Lock()
existingOffset := existingSession.StartOffset
existingSession.mu.Unlock()
// Check if the session was already recreated at (or before) the requested offset
if existingOffset <= requestedOffset {
bc.subscribersLock.Unlock()
glog.V(0).Infof("[FETCH] Session %s already recreated by another thread at offset %d (requested %d) - reusing", key, existingOffset, requestedOffset)
// Re-acquire the existing session and continue
return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords)
}
// CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset
// This prevents multiple threads from all deciding to recreate based on stale data
glog.V(0).Infof("[FETCH] 🔒 Thread acquiring global lock to recreate session %s: requested=%d", key, requestedOffset)
bc.subscribersLock.Lock()
glog.V(0).Infof("[FETCH] 🔓 Thread acquired global lock for session %s: requested=%d", key, requestedOffset)
// Double-check if another thread already recreated the session at the desired offset
// This prevents multiple concurrent threads from all trying to recreate the same session
if existingSession, exists := bc.subscribers[key]; exists {
existingSession.mu.Lock()
existingOffset := existingSession.StartOffset
existingSession.mu.Unlock()
// Check if the session was already recreated at (or before) the requested offset
if existingOffset <= requestedOffset {
bc.subscribersLock.Unlock()
glog.V(0).Infof("[FETCH] ✓ Session %s already recreated by another thread at offset %d (requested %d) - reusing", key, existingOffset, requestedOffset)
// Re-acquire the existing session and continue
return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords)
}
glog.V(0).Infof("[FETCH] ⚠️ Session %s still at wrong offset %d (requested %d) - must recreate", key, existingOffset, requestedOffset)
// Session still needs recreation - close it
if existingSession.Stream != nil {
@@ -447,13 +455,13 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
Cancel: subscriberCancel,
}
bc.subscribers[key] = newSession
bc.subscribersLock.Unlock()
glog.V(0).Infof("[FETCH] ✓ Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
bc.subscribers[key] = newSession
bc.subscribersLock.Unlock()
glog.V(0).Infof("[FETCH] ✓ Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
// Read from fresh subscriber
glog.V(1).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords)
return bc.ReadRecords(ctx, newSession, maxRecords)
// Read from fresh subscriber
glog.V(1).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords)
return bc.ReadRecords(ctx, newSession, maxRecords)
}
// requestedOffset >= session.StartOffset: Keep reading forward from existing session
@@ -645,6 +653,8 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
if result.err != nil {
glog.V(2).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err)
// Update session offset before returning
glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (error case, read %d records)",
session.Key(), session.StartOffset, currentOffset, len(records))
session.StartOffset = currentOffset
return records, nil
}
@@ -669,6 +679,8 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
// Timeout - return what we have
glog.V(4).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart))
// CRITICAL: Update session offset so next fetch knows where we left off
glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (timeout case, read %d records)",
session.Key(), session.StartOffset, currentOffset, len(records))
session.StartOffset = currentOffset
return records, nil
}
@@ -676,6 +688,8 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
glog.V(2).Infof("[FETCH] ReadRecords returning %d records (maxRecords reached)", len(records))
// Update session offset after successful read
glog.V(0).Infof("[FETCH] 📍 Updating %s offset: %d → %d (success case, read %d records)",
session.Key(), session.StartOffset, currentOffset, len(records))
session.StartOffset = currentOffset
// CRITICAL: Cache the consumed records to avoid broker tight loop
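
Several hunks above repeat the same concurrency shape: decide under the read lock, re-check under the write lock (another goroutine may already have recreated the session), and keep holding the write lock through recreation so nobody can slip in a session at the wrong offset. A reduced, runnable sketch of that double-checked locking pattern, with a simplified stand-in for BrokerSubscriberSession:

package main

import (
	"fmt"
	"sync"
)

// session is a simplified stand-in for BrokerSubscriberSession.
type session struct {
	mu          sync.Mutex
	startOffset int64
}

type client struct {
	mu       sync.RWMutex
	sessions map[string]*session
}

// getOrRecreate re-checks the session offset after upgrading to the write lock,
// and only then deletes and recreates, mirroring the shape used in the diff.
func (c *client) getOrRecreate(key string, requested int64) *session {
	c.mu.RLock()
	if s, ok := c.sessions[key]; ok {
		s.mu.Lock()
		current := s.startOffset
		s.mu.Unlock()
		if requested >= current {
			c.mu.RUnlock()
			return s // forward read: reuse the existing session
		}
	}
	c.mu.RUnlock()

	c.mu.Lock()
	defer c.mu.Unlock() // hold the write lock through recreation
	if s, ok := c.sessions[key]; ok {
		s.mu.Lock()
		current := s.startOffset
		s.mu.Unlock()
		if current <= requested {
			return s // another goroutine already recreated it at (or before) the requested offset
		}
		delete(c.sessions, key) // still too far ahead: drop it
	}
	fresh := &session{startOffset: requested}
	c.sessions[key] = fresh
	return fresh
}

func main() {
	c := &client{sessions: map[string]*session{"topic-0": {startOffset: 10}}}
	fmt.Println(c.getOrRecreate("topic-0", 12).startOffset) // 10: reuse, read forward
	fmt.Println(c.getOrRecreate("topic-0", 3).startOffset)  // 3: recreated at the requested offset
}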

weed/mq/kafka/protocol/offset_management.go (57 changed lines)

@@ -5,6 +5,7 @@ import (
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)
@@ -151,16 +152,27 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
ConsumerGroupInstance: req.GroupInstanceID,
}
// Commit offset using SMQ storage (persistent to filer)
// Commit offset to both in-memory and SMQ storage
var errCode int16 = ErrorCodeNone
if generationMatches {
if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil {
// Store in in-memory map first (works for both mock and SMQ backends)
if err := h.commitOffset(group, t.Name, p.Index, p.Offset, p.Metadata); err != nil {
errCode = ErrorCodeOffsetMetadataTooLarge
} else {
}
// Also store in SMQ persistent storage if available
if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil {
// SMQ storage may not be available (e.g., in mock mode) - that's okay
glog.V(2).Infof("[OFFSET_COMMIT] SMQ storage not available: %v", err)
}
glog.V(0).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d",
req.GroupID, t.Name, p.Index, p.Offset)
} else {
// Do not store commit if generation mismatch
errCode = 22 // IllegalGeneration
glog.V(0).Infof("[OFFSET_COMMIT] Generation mismatch: group=%s expected=%d got=%d",
req.GroupID, group.Generation, req.GenerationID)
}
topicResp.Partitions = append(topicResp.Partitions, OffsetCommitPartitionResponse{
@@ -190,12 +202,15 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
// Get consumer group
group := h.groupCoordinator.GetGroup(request.GroupID)
if group == nil {
glog.V(0).Infof("[OFFSET_FETCH] Group not found: %s", request.GroupID)
return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
group.Mu.RLock()
defer group.Mu.RUnlock()
glog.V(0).Infof("[OFFSET_FETCH] Request: group=%s topics=%v", request.GroupID, request.Topics)
// Build response
response := OffsetFetchResponse{
CorrelationID: correlationID,
@@ -222,25 +237,35 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
// Fetch offsets for requested partitions
for _, partition := range partitionsToFetch {
// Create consumer offset key for SMQ storage
key := ConsumerOffsetKey{
Topic: topic.Name,
Partition: partition,
ConsumerGroup: request.GroupID,
ConsumerGroupInstance: request.GroupInstanceID,
}
var fetchedOffset int64 = -1
var metadata string = ""
var errorCode int16 = ErrorCodeNone
// Fetch offset directly from SMQ storage (persistent storage)
// No cache needed - offset fetching is infrequent compared to commits
if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 {
// Try fetching from in-memory cache first (works for both mock and SMQ backends)
if off, meta, err := h.fetchOffset(group, topic.Name, partition); err == nil && off >= 0 {
fetchedOffset = off
metadata = meta
glog.V(0).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d",
request.GroupID, topic.Name, partition, off)
} else {
// No offset found in persistent storage (-1 indicates no committed offset)
// Fallback: try fetching from SMQ persistent storage
// This handles cases where offsets are stored in SMQ but not yet loaded into memory
key := ConsumerOffsetKey{
Topic: topic.Name,
Partition: partition,
ConsumerGroup: request.GroupID,
ConsumerGroupInstance: request.GroupInstanceID,
}
if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 {
fetchedOffset = off
metadata = meta
glog.V(0).Infof("[OFFSET_FETCH] Found in SMQ: group=%s topic=%s partition=%d offset=%d",
request.GroupID, topic.Name, partition, off)
} else {
glog.V(0).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d",
request.GroupID, topic.Name, partition)
}
// No offset found in either location (-1 indicates no committed offset)
}
partitionResponse := OffsetFetchPartitionResponse{
@@ -250,6 +275,8 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
Metadata: metadata,
ErrorCode: errorCode,
}
glog.V(0).Infof("[OFFSET_FETCH] Returning: group=%s topic=%s partition=%d offset=%d",
request.GroupID, topic.Name, partition, fetchedOffset)
topicResponse.Partitions = append(topicResponse.Partitions, partitionResponse)
}
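
The offset_management.go change turns OffsetCommit into a write-through (in-memory group map first, then SMQ persistent storage, with SMQ failures tolerated) and OffsetFetch into a read-through (memory first, SMQ as fallback, -1 if neither has a commit). A condensed sketch of that shape; the store interface and types here are illustrative, not the handler's real methods:

package main

import (
	"errors"
	"fmt"
)

// store stands in for both the in-memory group map and SMQ filer-backed storage.
type store interface {
	commit(key string, offset int64) error
	fetch(key string) (int64, error)
}

type memoryStore map[string]int64

func (m memoryStore) commit(key string, offset int64) error { m[key] = offset; return nil }
func (m memoryStore) fetch(key string) (int64, error) {
	if off, ok := m[key]; ok {
		return off, nil
	}
	return -1, errors.New("no committed offset")
}

// commitOffset writes through to both stores; a persistent-store failure is tolerated
// (e.g. mock mode without SMQ), matching the behavior in the diff.
func commitOffset(mem, smq store, key string, offset int64) error {
	if err := mem.commit(key, offset); err != nil {
		return err
	}
	if err := smq.commit(key, offset); err != nil {
		fmt.Printf("persistent commit skipped: %v\n", err) // SMQ storage may be unavailable
	}
	return nil
}

// fetchOffset reads from memory first and falls back to persistent storage;
// -1 means no committed offset was found anywhere.
func fetchOffset(mem, smq store, key string) int64 {
	if off, err := mem.fetch(key); err == nil && off >= 0 {
		return off
	}
	if off, err := smq.fetch(key); err == nil && off >= 0 {
		return off
	}
	return -1
}

func main() {
	mem, smq := memoryStore{}, memoryStore{}
	_ = commitOffset(mem, smq, "group/topic/0", 42)
	fmt.Println(fetchOffset(mem, smq, "group/topic/0")) // 42
	fmt.Println(fetchOffset(mem, smq, "group/topic/1")) // -1
}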
