You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							130 lines
						
					
					
						
							5.1 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							130 lines
						
					
					
						
							5.1 KiB
						
					
					
				
								package e2e
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"os"
							 | 
						|
									"testing"
							 | 
						|
								
							 | 
						|
									"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								// TestOffsetManagement tests end-to-end offset management scenarios
							 | 
						|
								// This test will use SMQ backend if SEAWEEDFS_MASTERS is available, otherwise mock
							 | 
						|
								func TestOffsetManagement(t *testing.T) {
							 | 
						|
									gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQAvailable)
							 | 
						|
									defer gateway.CleanupAndClose()
							 | 
						|
								
							 | 
						|
									addr := gateway.StartAndWait()
							 | 
						|
								
							 | 
						|
									// If schema registry is configured, ensure gateway is in schema mode and log
							 | 
						|
									if v := os.Getenv("SCHEMA_REGISTRY_URL"); v != "" {
							 | 
						|
										t.Logf("Schema Registry detected at %s - running offset tests in schematized mode", v)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Log which backend we're using
							 | 
						|
									if gateway.IsSMQMode() {
							 | 
						|
										t.Logf("Running offset management tests with SMQ backend - offsets will be persisted")
							 | 
						|
									} else {
							 | 
						|
										t.Logf("Running offset management tests with mock backend - offsets are in-memory only")
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									topic := testutil.GenerateUniqueTopicName("offset-management")
							 | 
						|
									groupID := testutil.GenerateUniqueGroupID("offset-test-group")
							 | 
						|
								
							 | 
						|
									gateway.AddTestTopic(topic)
							 | 
						|
								
							 | 
						|
									t.Run("BasicOffsetCommitFetch", func(t *testing.T) {
							 | 
						|
										testBasicOffsetCommitFetch(t, addr, topic, groupID)
							 | 
						|
									})
							 | 
						|
								
							 | 
						|
									t.Run("ConsumerGroupResumption", func(t *testing.T) {
							 | 
						|
										testConsumerGroupResumption(t, addr, topic, groupID+"2")
							 | 
						|
									})
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func testBasicOffsetCommitFetch(t *testing.T, addr, topic, groupID string) {
							 | 
						|
									client := testutil.NewKafkaGoClient(t, addr)
							 | 
						|
									msgGen := testutil.NewMessageGenerator()
							 | 
						|
								
							 | 
						|
									// Produce test messages
							 | 
						|
									if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" {
							 | 
						|
										if id, err := testutil.EnsureValueSchema(t, url, topic); err == nil {
							 | 
						|
											t.Logf("Ensured value schema id=%d for subject %s-value", id, topic)
							 | 
						|
										} else {
							 | 
						|
											t.Logf("Schema registration failed (non-fatal for test): %v", err)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
									messages := msgGen.GenerateKafkaGoMessages(5)
							 | 
						|
									err := client.ProduceMessages(topic, messages)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to produce offset test messages")
							 | 
						|
								
							 | 
						|
									// Phase 1: Consume first 3 messages and commit offsets
							 | 
						|
									t.Logf("=== Phase 1: Consuming first 3 messages ===")
							 | 
						|
									consumed1, err := client.ConsumeWithGroup(topic, groupID, 3)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to consume first batch")
							 | 
						|
									testutil.AssertEqual(t, 3, len(consumed1), "Should consume exactly 3 messages")
							 | 
						|
								
							 | 
						|
									// Phase 2: Create new consumer with same group ID - should resume from committed offset
							 | 
						|
									t.Logf("=== Phase 2: Resuming from committed offset ===")
							 | 
						|
									consumed2, err := client.ConsumeWithGroup(topic, groupID, 2)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to consume remaining messages")
							 | 
						|
									testutil.AssertEqual(t, 2, len(consumed2), "Should consume remaining 2 messages")
							 | 
						|
								
							 | 
						|
									// Verify that we got all messages without duplicates
							 | 
						|
									totalConsumed := len(consumed1) + len(consumed2)
							 | 
						|
									testutil.AssertEqual(t, len(messages), totalConsumed, "Should consume all messages exactly once")
							 | 
						|
								
							 | 
						|
									t.Logf("SUCCESS: Offset management test completed - consumed %d + %d messages", len(consumed1), len(consumed2))
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func testConsumerGroupResumption(t *testing.T, addr, topic, groupID string) {
							 | 
						|
									client := testutil.NewKafkaGoClient(t, addr)
							 | 
						|
									msgGen := testutil.NewMessageGenerator()
							 | 
						|
								
							 | 
						|
									// Produce messages
							 | 
						|
									t.Logf("=== Phase 1: Producing 4 messages to topic %s ===", topic)
							 | 
						|
									messages := msgGen.GenerateKafkaGoMessages(4)
							 | 
						|
									err := client.ProduceMessages(topic, messages)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to produce messages for resumption test")
							 | 
						|
									t.Logf("Successfully produced %d messages", len(messages))
							 | 
						|
								
							 | 
						|
									// Consume some messages
							 | 
						|
									t.Logf("=== Phase 2: First consumer - consuming 2 messages with group %s ===", groupID)
							 | 
						|
									consumed1, err := client.ConsumeWithGroup(topic, groupID, 2)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to consume first batch")
							 | 
						|
									t.Logf("First consumer consumed %d messages:", len(consumed1))
							 | 
						|
									for i, msg := range consumed1 {
							 | 
						|
										t.Logf("  Message %d: offset=%d, partition=%d, value=%s", i, msg.Offset, msg.Partition, string(msg.Value))
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Simulate consumer restart by consuming remaining messages with same group ID
							 | 
						|
									t.Logf("=== Phase 3: Second consumer (simulated restart) - consuming remaining messages with same group %s ===", groupID)
							 | 
						|
									consumed2, err := client.ConsumeWithGroup(topic, groupID, 2)
							 | 
						|
									testutil.AssertNoError(t, err, "Failed to consume after restart")
							 | 
						|
									t.Logf("Second consumer consumed %d messages:", len(consumed2))
							 | 
						|
									for i, msg := range consumed2 {
							 | 
						|
										t.Logf("  Message %d: offset=%d, partition=%d, value=%s", i, msg.Offset, msg.Partition, string(msg.Value))
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Verify total consumption
							 | 
						|
									totalConsumed := len(consumed1) + len(consumed2)
							 | 
						|
									t.Logf("=== Verification: Total consumed %d messages (expected %d) ===", totalConsumed, len(messages))
							 | 
						|
								
							 | 
						|
									// Check for duplicates
							 | 
						|
									offsetsSeen := make(map[int64]bool)
							 | 
						|
									duplicateCount := 0
							 | 
						|
									for _, msg := range append(consumed1, consumed2...) {
							 | 
						|
										if offsetsSeen[msg.Offset] {
							 | 
						|
											t.Logf("WARNING: Duplicate offset detected: %d", msg.Offset)
							 | 
						|
											duplicateCount++
							 | 
						|
										}
							 | 
						|
										offsetsSeen[msg.Offset] = true
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if duplicateCount > 0 {
							 | 
						|
										t.Logf("ERROR: Found %d duplicate messages", duplicateCount)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									testutil.AssertEqual(t, len(messages), totalConsumed, "Should consume all messages after restart")
							 | 
						|
								
							 | 
						|
									t.Logf("SUCCESS: Consumer group resumption test completed - no duplicates, all messages consumed exactly once")
							 | 
						|
								}
							 |