Browse Source

more logs on offset resume

pull/7329/head
chrislu 1 week ago
parent
commit
6947d906a8
  1. 31
      test/kafka/e2e/offset_management_test.go
  2. 57
      weed/mq/kafka/integration/broker_client_subscribe.go

31
test/kafka/e2e/offset_management_test.go

@@ -81,21 +81,50 @@ func testConsumerGroupResumption(t *testing.T, addr, topic, groupID string) {
msgGen := testutil.NewMessageGenerator()
// Produce messages
t.Logf("=== Phase 1: Producing 4 messages to topic %s ===", topic)
messages := msgGen.GenerateKafkaGoMessages(4)
err := client.ProduceMessages(topic, messages)
testutil.AssertNoError(t, err, "Failed to produce messages for resumption test")
t.Logf("Successfully produced %d messages", len(messages))
// Consume some messages
t.Logf("=== Phase 2: First consumer - consuming 2 messages with group %s ===", groupID)
consumed1, err := client.ConsumeWithGroup(topic, groupID, 2)
testutil.AssertNoError(t, err, "Failed to consume first batch")
t.Logf("First consumer consumed %d messages:", len(consumed1))
for i, msg := range consumed1 {
t.Logf(" Message %d: offset=%d, partition=%d, value=%s", i, msg.Offset, msg.Partition, string(msg.Value))
}
// Simulate consumer restart by consuming remaining messages with same group ID
t.Logf("=== Phase 3: Second consumer (simulated restart) - consuming remaining messages with same group %s ===", groupID)
consumed2, err := client.ConsumeWithGroup(topic, groupID, 2)
testutil.AssertNoError(t, err, "Failed to consume after restart")
t.Logf("Second consumer consumed %d messages:", len(consumed2))
for i, msg := range consumed2 {
t.Logf(" Message %d: offset=%d, partition=%d, value=%s", i, msg.Offset, msg.Partition, string(msg.Value))
}
// Verify total consumption
totalConsumed := len(consumed1) + len(consumed2)
t.Logf("=== Verification: Total consumed %d messages (expected %d) ===", totalConsumed, len(messages))
// Check for duplicates
offsetsSeen := make(map[int64]bool)
duplicateCount := 0
for _, msg := range append(consumed1, consumed2...) {
if offsetsSeen[msg.Offset] {
t.Logf("WARNING: Duplicate offset detected: %d", msg.Offset)
duplicateCount++
}
offsetsSeen[msg.Offset] = true
}
if duplicateCount > 0 {
t.Logf("ERROR: Found %d duplicate messages", duplicateCount)
}
testutil.AssertEqual(t, len(messages), totalConsumed, "Should consume all messages after restart")
t.Logf("SUCCESS: Consumer group resumption test completed")
t.Logf("SUCCESS: Consumer group resumption test completed - no duplicates, all messages consumed exactly once")
}

57
weed/mq/kafka/integration/broker_client_subscribe.go

@@ -311,10 +311,14 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
if endIdx > len(session.consumedRecords) {
endIdx = len(session.consumedRecords)
}
glog.V(2).Infof("[FETCH] Returning %d cached records for offset %d", endIdx-startIdx, requestedOffset)
glog.V(1).Infof("[FETCH] ✓ Returning %d cached records for %s at offset %d (cache: %d-%d)",
endIdx-startIdx, session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset)
session.mu.Unlock()
return session.consumedRecords[startIdx:endIdx], nil
}
} else {
glog.V(1).Infof("[FETCH] Cache miss for %s: requested=%d, cache=[%d-%d]",
session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset)
}
}
@@ -344,24 +348,25 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
key := session.Key()
session.mu.Unlock()
// CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset
// This prevents multiple threads from all deciding to recreate based on stale data
bc.subscribersLock.Lock()
// Double-check if another thread already recreated the session at the desired offset
// This prevents multiple concurrent threads from all trying to recreate the same session
if existingSession, exists := bc.subscribers[key]; exists {
existingSession.mu.Lock()
existingOffset := existingSession.StartOffset
existingSession.mu.Unlock()
// Check if the session was already recreated at (or before) the requested offset
if existingOffset <= requestedOffset {
bc.subscribersLock.Unlock()
glog.V(1).Infof("[FETCH] Session already recreated by another thread at offset %d (requested %d)", existingOffset, requestedOffset)
// Re-acquire the existing session and continue
return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords)
}
// CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset
// This prevents multiple threads from all deciding to recreate based on stale data
glog.V(1).Infof("[FETCH] Acquiring global lock to recreate session %s: requested=%d", key, requestedOffset)
bc.subscribersLock.Lock()
// Double-check if another thread already recreated the session at the desired offset
// This prevents multiple concurrent threads from all trying to recreate the same session
if existingSession, exists := bc.subscribers[key]; exists {
existingSession.mu.Lock()
existingOffset := existingSession.StartOffset
existingSession.mu.Unlock()
// Check if the session was already recreated at (or before) the requested offset
if existingOffset <= requestedOffset {
bc.subscribersLock.Unlock()
glog.V(0).Infof("[FETCH] Session %s already recreated by another thread at offset %d (requested %d) - reusing", key, existingOffset, requestedOffset)
// Re-acquire the existing session and continue
return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords)
}
// Session still needs recreation - close it
if existingSession.Stream != nil {
@@ -371,7 +376,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
existingSession.Cancel()
}
delete(bc.subscribers, key)
glog.V(1).Infof("[FETCH] Closed old subscriber session for backward seek: %s", key)
glog.V(0).Infof("[FETCH] Closed old subscriber session for backward seek: %s (was at offset %d, need offset %d)", key, existingOffset, requestedOffset)
}
// CRITICAL FIX: Don't unlock here! Keep holding the lock while we create the new session
// to prevent other threads from interfering. We'll create the session inline.
@@ -379,6 +384,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
// Create a completely fresh subscriber at the requested offset
// INLINE SESSION CREATION to hold the lock continuously
glog.V(1).Infof("[FETCH] Creating inline subscriber session while holding lock: %s at offset %d", key, requestedOffset)
subscriberCtx := context.Background()
subscriberCancel := func() {} // No-op cancel
@@ -441,12 +447,13 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
Cancel: subscriberCancel,
}
bc.subscribers[key] = newSession
bc.subscribersLock.Unlock()
glog.V(1).Infof("[FETCH] Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
bc.subscribers[key] = newSession
bc.subscribersLock.Unlock()
glog.V(0).Infof("[FETCH] Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
// Read from fresh subscriber
return bc.ReadRecords(ctx, newSession, maxRecords)
// Read from fresh subscriber
glog.V(1).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords)
return bc.ReadRecords(ctx, newSession, maxRecords)
}
// requestedOffset >= session.StartOffset: Keep reading forward from existing session

Loading…
Cancel
Save