From ad6a4ef9b2d147481b92df02a3923ca99b73bf20 Mon Sep 17 00:00:00 2001 From: chrislu Date: Tue, 24 Jun 2025 09:35:08 -0700 Subject: [PATCH] make test-basic --- test/mq/Makefile | 30 +++++++++++--------- test/mq/docker-compose.test.yml | 6 ++-- test/mq/integration/basic_pubsub_test.go | 15 ++++++---- test/mq/integration/framework.go | 36 ++++++++++++++++++++---- 4 files changed, 59 insertions(+), 28 deletions(-) diff --git a/test/mq/Makefile b/test/mq/Makefile index 4bb746dee..0de7c32cb 100644 --- a/test/mq/Makefile +++ b/test/mq/Makefile @@ -119,37 +119,39 @@ logs: docker-compose -f docker-compose.test.yml logs -f # Run all integration tests -test: up +test: @echo "Running all integration tests..." docker-compose -f docker-compose.test.yml run --rm test-runner \ sh -c "go test -v -timeout=30m ./test/mq/integration/... -args -test.parallel=4" # Run basic pub/sub tests -test-basic: up - @echo "Running basic pub/sub tests..." - docker-compose -f docker-compose.test.yml run --rm test-runner \ - sh -c "go test -v -timeout=10m ./test/mq/integration/ -run TestBasic" +test-basic: + @echo "Running basic pub/sub tests natively (no container restart)..." + cd ../.. && SEAWEED_MASTERS="localhost:19333,localhost:19334,localhost:19335" \ + SEAWEED_BROKERS="localhost:17777,localhost:17778,localhost:17779" \ + SEAWEED_FILERS="localhost:18888,localhost:18889" \ + go test -v -timeout=10m ./test/mq/integration/ -run TestBasic # Run performance tests -test-performance: up +test-performance: @echo "Running performance tests..." docker-compose -f docker-compose.test.yml run --rm test-runner \ sh -c "go test -v -timeout=20m ./test/mq/integration/ -run TestPerformance" # Run failover tests -test-failover: up +test-failover: @echo "Running failover tests..." docker-compose -f docker-compose.test.yml run --rm test-runner \ sh -c "go test -v -timeout=15m ./test/mq/integration/ -run TestFailover" # Run agent tests -test-agent: up +test-agent: @echo "Running agent tests..." docker-compose -f docker-compose.test.yml run --rm test-runner \ sh -c "go test -v -timeout=10m ./test/mq/integration/ -run TestAgent" # Development targets (run tests natively without Docker container) -test-dev: up-cluster +test-dev: @echo "Running tests in development mode (using local binaries)..." SEAWEED_MASTERS="localhost:19333,localhost:19334,localhost:19335" \ SEAWEED_BROKERS="localhost:17777,localhost:17778,localhost:17779" \ @@ -157,7 +159,7 @@ test-dev: up-cluster go test -v -timeout=10m ./integration/... # Native test running (no Docker container for tests) -test-native: up +test-native: @echo "Running tests natively (without Docker container for tests)..." cd ../.. && SEAWEED_MASTERS="localhost:19333,localhost:19334,localhost:19335" \ SEAWEED_BROKERS="localhost:17777,localhost:17778,localhost:17779" \ @@ -165,7 +167,7 @@ test-native: up go test -v -timeout=10m ./test/mq/integration/... # Basic native tests -test-basic-native: up +test-basic-native: @echo "Running basic tests natively..." cd ../.. && SEAWEED_MASTERS="localhost:19333,localhost:19334,localhost:19335" \ SEAWEED_BROKERS="localhost:17777,localhost:17778,localhost:17779" \ @@ -173,13 +175,13 @@ test-basic-native: up go test -v -timeout=10m ./test/mq/integration/ -run TestBasic # Quick smoke test -smoke-test: up +smoke-test: @echo "Running smoke test..." docker-compose -f docker-compose.test.yml run --rm test-runner \ sh -c "go test -v -timeout=5m ./test/mq/integration/ -run TestBasicPublishSubscribe" # Performance benchmarks -benchmark: up +benchmark: @echo "Running performance benchmarks..." docker-compose -f docker-compose.test.yml run --rm test-runner \ sh -c "go test -v -timeout=30m -bench=. ./test/mq/integration/..." @@ -208,7 +210,7 @@ report: sh -c "go test -v -timeout=30m ./test/mq/integration/... -json > /test-results/test-report.json" # Load testing -load-test: up +load-test: @echo "Running load tests..." docker-compose -f docker-compose.test.yml run --rm test-runner \ sh -c "go test -v -timeout=45m ./test/mq/integration/ -run TestLoad" diff --git a/test/mq/docker-compose.test.yml b/test/mq/docker-compose.test.yml index 75578efa8..fa6d8349a 100644 --- a/test/mq/docker-compose.test.yml +++ b/test/mq/docker-compose.test.yml @@ -213,7 +213,7 @@ services: mq.broker -master=master0:9333,master1:9334,master2:9335 -port=17777 - -ip=localhost + -ip=127.0.0.1 -dataCenter=dc1 -rack=rack1 networks: @@ -237,7 +237,7 @@ services: mq.broker -master=master0:9333,master1:9334,master2:9335 -port=17778 - -ip=localhost + -ip=127.0.0.1 -dataCenter=dc1 -rack=rack2 networks: @@ -256,7 +256,7 @@ services: mq.broker -master=master0:9333,master1:9334,master2:9335 -port=17779 - -ip=localhost + -ip=127.0.0.1 -dataCenter=dc2 -rack=rack1 networks: diff --git a/test/mq/integration/basic_pubsub_test.go b/test/mq/integration/basic_pubsub_test.go index ad434e50a..10bf69f7f 100644 --- a/test/mq/integration/basic_pubsub_test.go +++ b/test/mq/integration/basic_pubsub_test.go @@ -16,9 +16,8 @@ func TestBasicPublishSubscribe(t *testing.T) { suite := NewIntegrationTestSuite(t) require.NoError(t, suite.Setup()) - // Test configuration namespace := "test" - topicName := "basic-pubsub" + topicName := fmt.Sprintf("basic-pubsub-%d", time.Now().UnixNano()) // Unique topic name per run testSchema := CreateTestSchema() messageCount := 10 @@ -27,12 +26,12 @@ func TestBasicPublishSubscribe(t *testing.T) { Namespace: namespace, TopicName: topicName, PartitionCount: 1, - PublisherName: "test-publisher", + PublisherName: "basic-publisher", RecordType: testSchema, } publisher, err := suite.CreatePublisher(pubConfig) - require.NoError(t, err, "Failed to create publisher") + require.NoError(t, err) // Create subscriber subConfig := &SubscriberTestConfig{ @@ -51,6 +50,7 @@ func TestBasicPublishSubscribe(t *testing.T) { // Set up message collector collector := NewMessageCollector(messageCount) subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) { + t.Logf("[Subscriber] Received message with key: %s, ts: %d", string(m.Data.Key), m.Data.TsNs) collector.AddMessage(TestMessage{ ID: fmt.Sprintf("msg-%d", len(collector.GetMessages())), Content: m.Data.Value, @@ -68,6 +68,7 @@ func TestBasicPublishSubscribe(t *testing.T) { }() // Wait for subscriber to be ready + t.Logf("[Test] Waiting for subscriber to be ready...") time.Sleep(2 * time.Second) // Publish test messages @@ -80,12 +81,14 @@ func TestBasicPublishSubscribe(t *testing.T) { RecordEnd() key := []byte(fmt.Sprintf("key-%d", i)) + t.Logf("[Publisher] Publishing message %d with key: %s", i, string(key)) err := publisher.PublishRecord(key, record) require.NoError(t, err, "Failed to publish message %d", i) } - // Wait for messages to be received + t.Logf("[Test] Waiting for messages to be received...") messages := collector.WaitForMessages(30 * time.Second) + t.Logf("[Test] WaitForMessages returned. Received %d messages.", len(messages)) // Verify all messages were received assert.Len(t, messages, messageCount, "Expected %d messages, got %d", messageCount, len(messages)) @@ -95,6 +98,8 @@ func TestBasicPublishSubscribe(t *testing.T) { assert.NotEmpty(t, msg.Content, "Message %d should have content", i) assert.NotEmpty(t, msg.Key, "Message %d should have key", i) } + + t.Logf("[Test] TestBasicPublishSubscribe completed.") } func TestMultipleConsumers(t *testing.T) { diff --git a/test/mq/integration/framework.go b/test/mq/integration/framework.go index 421df5d9c..47d26e165 100644 --- a/test/mq/integration/framework.go +++ b/test/mq/integration/framework.go @@ -37,6 +37,7 @@ type IntegrationTestSuite struct { agents map[string]*agent.MessageQueueAgent publishers map[string]*pub_client.TopicPublisher subscribers map[string]*sub_client.TopicSubscriber + subCancels map[string]context.CancelFunc cleanupOnce sync.Once t *testing.T } @@ -55,6 +56,7 @@ func NewIntegrationTestSuite(t *testing.T) *IntegrationTestSuite { agents: make(map[string]*agent.MessageQueueAgent), publishers: make(map[string]*pub_client.TopicPublisher), subscribers: make(map[string]*sub_client.TopicSubscriber), + subCancels: make(map[string]context.CancelFunc), t: t, } } @@ -75,16 +77,34 @@ func (its *IntegrationTestSuite) Setup() error { // Cleanup performs cleanup operations func (its *IntegrationTestSuite) Cleanup() { its.cleanupOnce.Do(func() { - // Close all subscribers (they use context cancellation) - for name, _ := range its.subscribers { + // Close all subscribers first (they use context cancellation) + for name := range its.subscribers { + if cancel, ok := its.subCancels[name]; ok && cancel != nil { + cancel() + its.t.Logf("Cancelled subscriber context: %s", name) + } its.t.Logf("Cleaned up subscriber: %s", name) } + // Wait a moment for gRPC connections to close gracefully + time.Sleep(1 * time.Second) + // Close all publishers for name, publisher := range its.publishers { if publisher != nil { - publisher.Shutdown() - its.t.Logf("Cleaned up publisher: %s", name) + // Add timeout to prevent deadlock during shutdown + done := make(chan bool, 1) + go func(p *pub_client.TopicPublisher, n string) { + p.Shutdown() + done <- true + }(publisher, name) + + select { + case <-done: + its.t.Logf("Cleaned up publisher: %s", name) + case <-time.After(5 * time.Second): + its.t.Logf("Publisher shutdown timed out: %s", name) + } } } @@ -135,8 +155,9 @@ func (its *IntegrationTestSuite) CreateSubscriber(config *SubscriberTestConfig) } offsetChan := make(chan sub_client.KeyedOffset, 1024) + ctx, cancel := context.WithCancel(context.Background()) subscriber := sub_client.NewTopicSubscriber( - context.Background(), + ctx, its.env.Brokers, subscriberConfig, contentConfig, @@ -144,6 +165,7 @@ func (its *IntegrationTestSuite) CreateSubscriber(config *SubscriberTestConfig) ) its.subscribers[config.ConsumerInstanceId] = subscriber + its.subCancels[config.ConsumerInstanceId] = cancel return subscriber, nil } @@ -204,6 +226,7 @@ type MessageCollector struct { mutex sync.RWMutex waitCh chan struct{} expected int + closed bool // protect against closing waitCh multiple times } // NewMessageCollector creates a new message collector @@ -221,8 +244,9 @@ func (mc *MessageCollector) AddMessage(msg TestMessage) { defer mc.mutex.Unlock() mc.messages = append(mc.messages, msg) - if len(mc.messages) >= mc.expected { + if len(mc.messages) >= mc.expected && !mc.closed { close(mc.waitCh) + mc.closed = true } }