diff --git a/test/kafka/e2e/offset_management_test.go b/test/kafka/e2e/offset_management_test.go
index 398647843..d4e1fbb97 100644
--- a/test/kafka/e2e/offset_management_test.go
+++ b/test/kafka/e2e/offset_management_test.go
@@ -81,21 +81,50 @@ func testConsumerGroupResumption(t *testing.T, addr, topic, groupID string) {
 	msgGen := testutil.NewMessageGenerator()
 
 	// Produce messages
+	t.Logf("=== Phase 1: Producing 4 messages to topic %s ===", topic)
 	messages := msgGen.GenerateKafkaGoMessages(4)
 	err := client.ProduceMessages(topic, messages)
 	testutil.AssertNoError(t, err, "Failed to produce messages for resumption test")
+	t.Logf("Successfully produced %d messages", len(messages))
 
 	// Consume some messages
+	t.Logf("=== Phase 2: First consumer - consuming 2 messages with group %s ===", groupID)
 	consumed1, err := client.ConsumeWithGroup(topic, groupID, 2)
 	testutil.AssertNoError(t, err, "Failed to consume first batch")
+	t.Logf("First consumer consumed %d messages:", len(consumed1))
+	for i, msg := range consumed1 {
+		t.Logf("  Message %d: offset=%d, partition=%d, value=%s", i, msg.Offset, msg.Partition, string(msg.Value))
+	}
 
 	// Simulate consumer restart by consuming remaining messages with same group ID
+	t.Logf("=== Phase 3: Second consumer (simulated restart) - consuming remaining messages with same group %s ===", groupID)
 	consumed2, err := client.ConsumeWithGroup(topic, groupID, 2)
 	testutil.AssertNoError(t, err, "Failed to consume after restart")
+	t.Logf("Second consumer consumed %d messages:", len(consumed2))
+	for i, msg := range consumed2 {
+		t.Logf("  Message %d: offset=%d, partition=%d, value=%s", i, msg.Offset, msg.Partition, string(msg.Value))
+	}
 
 	// Verify total consumption
 	totalConsumed := len(consumed1) + len(consumed2)
+	t.Logf("=== Verification: Total consumed %d messages (expected %d) ===", totalConsumed, len(messages))
+
+	// Check for duplicates
+	offsetsSeen := make(map[int64]bool)
+	duplicateCount := 0
+	for _, msg := range append(consumed1, consumed2...) {
+		if offsetsSeen[msg.Offset] {
+			t.Logf("WARNING: Duplicate offset detected: %d", msg.Offset)
+			duplicateCount++
+		}
+		offsetsSeen[msg.Offset] = true
+	}
+
+	if duplicateCount > 0 {
+		t.Logf("ERROR: Found %d duplicate messages", duplicateCount)
+	}
+
 	testutil.AssertEqual(t, len(messages), totalConsumed, "Should consume all messages after restart")
 
-	t.Logf("SUCCESS: Consumer group resumption test completed")
+	t.Logf("SUCCESS: Consumer group resumption test completed - no duplicates, all messages consumed exactly once")
 }
diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go
index 7e2bfa173..57b86f908 100644
--- a/weed/mq/kafka/integration/broker_client_subscribe.go
+++ b/weed/mq/kafka/integration/broker_client_subscribe.go
@@ -311,10 +311,14 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 			if endIdx > len(session.consumedRecords) {
 				endIdx = len(session.consumedRecords)
 			}
-			glog.V(2).Infof("[FETCH] Returning %d cached records for offset %d", endIdx-startIdx, requestedOffset)
+			glog.V(1).Infof("[FETCH] ✓ Returning %d cached records for %s at offset %d (cache: %d-%d)",
+				endIdx-startIdx, session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset)
 			session.mu.Unlock()
 			return session.consumedRecords[startIdx:endIdx], nil
 		}
+	} else {
+		glog.V(1).Infof("[FETCH] Cache miss for %s: requested=%d, cache=[%d-%d]",
+			session.Key(), requestedOffset, cacheStartOffset, cacheEndOffset)
 	}
 }
 
@@ -344,24 +348,25 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 	key := session.Key()
 	session.mu.Unlock()
 
-	// CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset
-	// This prevents multiple threads from all deciding to recreate based on stale data
-	bc.subscribersLock.Lock()
-
-	// Double-check if another thread already recreated the session at the desired offset
-	// This prevents multiple concurrent threads from all trying to recreate the same session
-	if existingSession, exists := bc.subscribers[key]; exists {
-		existingSession.mu.Lock()
-		existingOffset := existingSession.StartOffset
-		existingSession.mu.Unlock()
-
-		// Check if the session was already recreated at (or before) the requested offset
-		if existingOffset <= requestedOffset {
-			bc.subscribersLock.Unlock()
-			glog.V(1).Infof("[FETCH] Session already recreated by another thread at offset %d (requested %d)", existingOffset, requestedOffset)
-			// Re-acquire the existing session and continue
-			return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords)
-		}
+	// CRITICAL FIX: Acquire the global lock FIRST, then re-check the session offset
+	// This prevents multiple threads from all deciding to recreate based on stale data
+	glog.V(1).Infof("[FETCH] Acquiring global lock to recreate session %s: requested=%d", key, requestedOffset)
+	bc.subscribersLock.Lock()
+
+	// Double-check if another thread already recreated the session at the desired offset
+	// This prevents multiple concurrent threads from all trying to recreate the same session
+	if existingSession, exists := bc.subscribers[key]; exists {
+		existingSession.mu.Lock()
+		existingOffset := existingSession.StartOffset
+		existingSession.mu.Unlock()
+
+		// Check if the session was already recreated at (or before) the requested offset
+		if existingOffset <= requestedOffset {
+			bc.subscribersLock.Unlock()
+			glog.V(0).Infof("[FETCH] Session %s already recreated by another thread at offset %d (requested %d) - reusing", key, existingOffset, requestedOffset)
+			// Re-acquire the existing session and continue
+			return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords)
+		}
 
 		// Session still needs recreation - close it
 		if existingSession.Stream != nil {
@@ -371,7 +376,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 			existingSession.Cancel()
 		}
 		delete(bc.subscribers, key)
-		glog.V(1).Infof("[FETCH] Closed old subscriber session for backward seek: %s", key)
+		glog.V(0).Infof("[FETCH] Closed old subscriber session for backward seek: %s (was at offset %d, need offset %d)", key, existingOffset, requestedOffset)
 	}
 	// CRITICAL FIX: Don't unlock here! Keep holding the lock while we create the new session
 	// to prevent other threads from interfering. We'll create the session inline.
@@ -379,6 +384,7 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 
 	// Create a completely fresh subscriber at the requested offset
 	// INLINE SESSION CREATION to hold the lock continuously
+	glog.V(1).Infof("[FETCH] Creating inline subscriber session while holding lock: %s at offset %d", key, requestedOffset)
 	subscriberCtx := context.Background()
 	subscriberCancel := func() {} // No-op cancel
 
@@ -441,12 +447,13 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 		Cancel: subscriberCancel,
 	}
 
-	bc.subscribers[key] = newSession
-	bc.subscribersLock.Unlock()
-	glog.V(1).Infof("[FETCH] Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
+	bc.subscribers[key] = newSession
+	bc.subscribersLock.Unlock()
+	glog.V(0).Infof("[FETCH] ✓ Created fresh subscriber session for backward seek: %s at offset %d", key, requestedOffset)
 
-	// Read from fresh subscriber
-	return bc.ReadRecords(ctx, newSession, maxRecords)
+	// Read from fresh subscriber
+	glog.V(1).Infof("[FETCH] Reading from fresh subscriber %s at offset %d (maxRecords=%d)", key, requestedOffset, maxRecords)
+	return bc.ReadRecords(ctx, newSession, maxRecords)
 }
 
 // requestedOffset >= session.StartOffset: Keep reading forward from existing session