From fd235505f5ff3965095b948fe540f632ffd07fe7 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Sep 2025 21:34:18 -0700 Subject: [PATCH] fmt --- .../kafka/consumer/group_coordinator_test.go | 84 +++++++++---------- weed/mq/kafka/gateway/server_test.go | 6 +- .../kafka/schema/broker_client_fetch_test.go | 2 +- 3 files changed, 46 insertions(+), 46 deletions(-) diff --git a/weed/mq/kafka/consumer/group_coordinator_test.go b/weed/mq/kafka/consumer/group_coordinator_test.go index 223f19dba..5be4f7f93 100644 --- a/weed/mq/kafka/consumer/group_coordinator_test.go +++ b/weed/mq/kafka/consumer/group_coordinator_test.go @@ -9,26 +9,26 @@ import ( func TestGroupCoordinator_CreateGroup(t *testing.T) { gc := NewGroupCoordinator() defer gc.Close() - + groupID := "test-group" group := gc.GetOrCreateGroup(groupID) - + if group == nil { t.Fatal("Expected group to be created") } - + if group.ID != groupID { t.Errorf("Expected group ID %s, got %s", groupID, group.ID) } - + if group.State != GroupStateEmpty { t.Errorf("Expected initial state to be Empty, got %s", group.State) } - + if group.Generation != 0 { t.Errorf("Expected initial generation to be 0, got %d", group.Generation) } - + // Getting the same group should return the existing one group2 := gc.GetOrCreateGroup(groupID) if group2 != group { @@ -39,7 +39,7 @@ func TestGroupCoordinator_CreateGroup(t *testing.T) { func TestGroupCoordinator_ValidateSessionTimeout(t *testing.T) { gc := NewGroupCoordinator() defer gc.Close() - + // Test valid timeouts validTimeouts := []int32{6000, 30000, 300000} for _, timeout := range validTimeouts { @@ -47,7 +47,7 @@ func TestGroupCoordinator_ValidateSessionTimeout(t *testing.T) { t.Errorf("Expected timeout %d to be valid", timeout) } } - + // Test invalid timeouts invalidTimeouts := []int32{1000, 5000, 400000} for _, timeout := range invalidTimeouts { @@ -60,9 +60,9 @@ func TestGroupCoordinator_ValidateSessionTimeout(t *testing.T) { func TestGroupCoordinator_MemberManagement(t *testing.T) { gc := NewGroupCoordinator() defer gc.Close() - + group := gc.GetOrCreateGroup("test-group") - + // Add members member1 := &GroupMember{ ID: "member1", @@ -72,31 +72,31 @@ func TestGroupCoordinator_MemberManagement(t *testing.T) { State: MemberStateStable, LastHeartbeat: time.Now(), } - + member2 := &GroupMember{ - ID: "member2", + ID: "member2", ClientID: "client2", SessionTimeout: 30000, Subscription: []string{"topic1"}, State: MemberStateStable, LastHeartbeat: time.Now(), } - + group.Mu.Lock() group.Members[member1.ID] = member1 group.Members[member2.ID] = member2 group.Mu.Unlock() - + // Update subscriptions group.UpdateMemberSubscription("member1", []string{"topic1", "topic3"}) - + group.Mu.RLock() updatedMember := group.Members["member1"] expectedTopics := []string{"topic1", "topic3"} if len(updatedMember.Subscription) != len(expectedTopics) { t.Errorf("Expected %d subscribed topics, got %d", len(expectedTopics), len(updatedMember.Subscription)) } - + // Check group subscribed topics if len(group.SubscribedTopics) != 2 { // topic1, topic3 t.Errorf("Expected 2 group subscribed topics, got %d", len(group.SubscribedTopics)) @@ -107,7 +107,7 @@ func TestGroupCoordinator_MemberManagement(t *testing.T) { func TestGroupCoordinator_Stats(t *testing.T) { gc := NewGroupCoordinator() defer gc.Close() - + // Create multiple groups in different states group1 := gc.GetOrCreateGroup("group1") group1.Mu.Lock() @@ -115,30 +115,30 @@ func TestGroupCoordinator_Stats(t *testing.T) { group1.Members["member1"] = &GroupMember{ID: "member1"} group1.Members["member2"] = &GroupMember{ID: "member2"} group1.Mu.Unlock() - + group2 := gc.GetOrCreateGroup("group2") group2.Mu.Lock() group2.State = GroupStatePreparingRebalance group2.Members["member3"] = &GroupMember{ID: "member3"} group2.Mu.Unlock() - + stats := gc.GetGroupStats() - + totalGroups := stats["total_groups"].(int) if totalGroups != 2 { t.Errorf("Expected 2 total groups, got %d", totalGroups) } - + totalMembers := stats["total_members"].(int) if totalMembers != 3 { t.Errorf("Expected 3 total members, got %d", totalMembers) } - + stateCount := stats["group_states"].(map[string]int) if stateCount["Stable"] != 1 { t.Errorf("Expected 1 stable group, got %d", stateCount["Stable"]) } - + if stateCount["PreparingRebalance"] != 1 { t.Errorf("Expected 1 preparing rebalance group, got %d", stateCount["PreparingRebalance"]) } @@ -147,46 +147,46 @@ func TestGroupCoordinator_Stats(t *testing.T) { func TestGroupCoordinator_Cleanup(t *testing.T) { gc := NewGroupCoordinator() defer gc.Close() - + // Create a group with an expired member group := gc.GetOrCreateGroup("test-group") - + expiredMember := &GroupMember{ ID: "expired-member", - SessionTimeout: 1000, // 1 second + SessionTimeout: 1000, // 1 second LastHeartbeat: time.Now().Add(-2 * time.Second), // 2 seconds ago State: MemberStateStable, } - + activeMember := &GroupMember{ - ID: "active-member", - SessionTimeout: 30000, // 30 seconds + ID: "active-member", + SessionTimeout: 30000, // 30 seconds LastHeartbeat: time.Now(), // just now State: MemberStateStable, } - + group.Mu.Lock() group.Members[expiredMember.ID] = expiredMember group.Members[activeMember.ID] = activeMember group.Leader = expiredMember.ID // Make expired member the leader group.Mu.Unlock() - + // Perform cleanup gc.performCleanup() - + group.Mu.RLock() defer group.Mu.RUnlock() - + // Expired member should be removed if _, exists := group.Members[expiredMember.ID]; exists { t.Error("Expected expired member to be removed") } - + // Active member should remain if _, exists := group.Members[activeMember.ID]; !exists { t.Error("Expected active member to remain") } - + // Leader should be reset since expired member was leader if group.Leader == expiredMember.ID { t.Error("Expected leader to be reset after expired member removal") @@ -196,33 +196,33 @@ func TestGroupCoordinator_Cleanup(t *testing.T) { func TestGroupCoordinator_GenerateMemberID(t *testing.T) { gc := NewGroupCoordinator() defer gc.Close() - + // Test that same client/host combination generates consistent member ID id1 := gc.GenerateMemberID("client1", "host1") id2 := gc.GenerateMemberID("client1", "host1") - + // Same client/host should generate same ID (deterministic) if id1 != id2 { t.Errorf("Expected same member ID for same client/host: %s vs %s", id1, id2) } - + // Different clients should generate different IDs id3 := gc.GenerateMemberID("client2", "host1") id4 := gc.GenerateMemberID("client1", "host2") - + if id1 == id3 { t.Errorf("Expected different member IDs for different clients: %s vs %s", id1, id3) } - + if id1 == id4 { t.Errorf("Expected different member IDs for different hosts: %s vs %s", id1, id4) } - + // IDs should be properly formatted if len(id1) < 10 { // Should be longer than just "consumer-" t.Errorf("Expected member ID to be properly formatted, got: %s", id1) } - + // Should start with "consumer-" prefix if !strings.HasPrefix(id1, "consumer-") { t.Errorf("Expected member ID to start with 'consumer-', got: %s", id1) diff --git a/weed/mq/kafka/gateway/server_test.go b/weed/mq/kafka/gateway/server_test.go index a60746fdb..6ffbea01f 100644 --- a/weed/mq/kafka/gateway/server_test.go +++ b/weed/mq/kafka/gateway/server_test.go @@ -12,7 +12,7 @@ func TestServerStartAndClose(t *testing.T) { t.Skip("This test requires SeaweedMQ Agent integration - run manually with agent available") srv := NewServer(Options{ - Listen: ":0", + Listen: ":0", Masters: "localhost:9333", // Use masters instead of AgentAddress }) if err := srv.Start(); err != nil { @@ -42,7 +42,7 @@ func TestGetListenerAddr(t *testing.T) { // Test with localhost binding - should return the actual address srv := NewServer(Options{ - Listen: "127.0.0.1:0", + Listen: "127.0.0.1:0", Masters: "localhost:9333", // Would need real agent for this test }) if err := srv.Start(); err != nil { @@ -60,7 +60,7 @@ func TestGetListenerAddr(t *testing.T) { // Test IPv6 all interfaces binding - should resolve to non-loopback IP srv6 := NewServer(Options{ - Listen: "[::]:0", + Listen: "[::]:0", Masters: "localhost:9333", // Would need real agent for this test }) if err := srv6.Start(); err != nil { diff --git a/weed/mq/kafka/schema/broker_client_fetch_test.go b/weed/mq/kafka/schema/broker_client_fetch_test.go index 890cf8a80..e77bca642 100644 --- a/weed/mq/kafka/schema/broker_client_fetch_test.go +++ b/weed/mq/kafka/schema/broker_client_fetch_test.go @@ -228,7 +228,7 @@ func TestBrokerClient_SubscriberConfiguration(t *testing.T) { // With mock brokers, behavior may vary - just verify no panic t.Logf("Subscriber creation results: err1=%v, err2=%v", err1, err2) // Don't assert errors as mock behavior may vary - + // Verify broker client is still functional after failed subscriber creation if brokerClient != nil { t.Log("Broker client remains functional after subscriber creation attempts")