You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
101 lines
3.9 KiB
101 lines
3.9 KiB
package e2e
|
|
|
|
import (
|
|
"os"
|
|
"testing"
|
|
|
|
"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
|
|
)
|
|
|
|
// TestOffsetManagement tests end-to-end offset management scenarios
|
|
// This test will use SMQ backend if SEAWEEDFS_MASTERS is available, otherwise mock
|
|
func TestOffsetManagement(t *testing.T) {
|
|
gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQAvailable)
|
|
defer gateway.CleanupAndClose()
|
|
|
|
addr := gateway.StartAndWait()
|
|
|
|
// If schema registry is configured, ensure gateway is in schema mode and log
|
|
if v := os.Getenv("SCHEMA_REGISTRY_URL"); v != "" {
|
|
t.Logf("Schema Registry detected at %s - running offset tests in schematized mode", v)
|
|
}
|
|
|
|
// Log which backend we're using
|
|
if gateway.IsSMQMode() {
|
|
t.Logf("Running offset management tests with SMQ backend - offsets will be persisted")
|
|
} else {
|
|
t.Logf("Running offset management tests with mock backend - offsets are in-memory only")
|
|
}
|
|
|
|
topic := testutil.GenerateUniqueTopicName("offset-management")
|
|
groupID := testutil.GenerateUniqueGroupID("offset-test-group")
|
|
|
|
gateway.AddTestTopic(topic)
|
|
|
|
t.Run("BasicOffsetCommitFetch", func(t *testing.T) {
|
|
testBasicOffsetCommitFetch(t, addr, topic, groupID)
|
|
})
|
|
|
|
t.Run("ConsumerGroupResumption", func(t *testing.T) {
|
|
testConsumerGroupResumption(t, addr, topic, groupID+"2")
|
|
})
|
|
}
|
|
|
|
func testBasicOffsetCommitFetch(t *testing.T, addr, topic, groupID string) {
|
|
client := testutil.NewKafkaGoClient(t, addr)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
// Produce test messages
|
|
if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" {
|
|
if id, err := testutil.EnsureValueSchema(t, url, topic); err == nil {
|
|
t.Logf("Ensured value schema id=%d for subject %s-value", id, topic)
|
|
} else {
|
|
t.Logf("Schema registration failed (non-fatal for test): %v", err)
|
|
}
|
|
}
|
|
messages := msgGen.GenerateKafkaGoMessages(5)
|
|
err := client.ProduceMessages(topic, messages)
|
|
testutil.AssertNoError(t, err, "Failed to produce offset test messages")
|
|
|
|
// Phase 1: Consume first 3 messages and commit offsets
|
|
t.Logf("=== Phase 1: Consuming first 3 messages ===")
|
|
consumed1, err := client.ConsumeWithGroup(topic, groupID, 3)
|
|
testutil.AssertNoError(t, err, "Failed to consume first batch")
|
|
testutil.AssertEqual(t, 3, len(consumed1), "Should consume exactly 3 messages")
|
|
|
|
// Phase 2: Create new consumer with same group ID - should resume from committed offset
|
|
t.Logf("=== Phase 2: Resuming from committed offset ===")
|
|
consumed2, err := client.ConsumeWithGroup(topic, groupID, 2)
|
|
testutil.AssertNoError(t, err, "Failed to consume remaining messages")
|
|
testutil.AssertEqual(t, 2, len(consumed2), "Should consume remaining 2 messages")
|
|
|
|
// Verify that we got all messages without duplicates
|
|
totalConsumed := len(consumed1) + len(consumed2)
|
|
testutil.AssertEqual(t, len(messages), totalConsumed, "Should consume all messages exactly once")
|
|
|
|
t.Logf("SUCCESS: Offset management test completed - consumed %d + %d messages", len(consumed1), len(consumed2))
|
|
}
|
|
|
|
func testConsumerGroupResumption(t *testing.T, addr, topic, groupID string) {
|
|
client := testutil.NewKafkaGoClient(t, addr)
|
|
msgGen := testutil.NewMessageGenerator()
|
|
|
|
// Produce messages
|
|
messages := msgGen.GenerateKafkaGoMessages(4)
|
|
err := client.ProduceMessages(topic, messages)
|
|
testutil.AssertNoError(t, err, "Failed to produce messages for resumption test")
|
|
|
|
// Consume some messages
|
|
consumed1, err := client.ConsumeWithGroup(topic, groupID, 2)
|
|
testutil.AssertNoError(t, err, "Failed to consume first batch")
|
|
|
|
// Simulate consumer restart by consuming remaining messages with same group ID
|
|
consumed2, err := client.ConsumeWithGroup(topic, groupID, 2)
|
|
testutil.AssertNoError(t, err, "Failed to consume after restart")
|
|
|
|
// Verify total consumption
|
|
totalConsumed := len(consumed1) + len(consumed2)
|
|
testutil.AssertEqual(t, len(messages), totalConsumed, "Should consume all messages after restart")
|
|
|
|
t.Logf("SUCCESS: Consumer group resumption test completed")
|
|
}
|