Browse Source

fmt

pull/7231/head
chrislu 2 months ago
parent
commit
fd235505f5
  1. 84
      weed/mq/kafka/consumer/group_coordinator_test.go
  2. 6
      weed/mq/kafka/gateway/server_test.go
  3. 2
      weed/mq/kafka/schema/broker_client_fetch_test.go

84
weed/mq/kafka/consumer/group_coordinator_test.go

@ -9,26 +9,26 @@ import (
func TestGroupCoordinator_CreateGroup(t *testing.T) { func TestGroupCoordinator_CreateGroup(t *testing.T) {
gc := NewGroupCoordinator() gc := NewGroupCoordinator()
defer gc.Close() defer gc.Close()
groupID := "test-group" groupID := "test-group"
group := gc.GetOrCreateGroup(groupID) group := gc.GetOrCreateGroup(groupID)
if group == nil { if group == nil {
t.Fatal("Expected group to be created") t.Fatal("Expected group to be created")
} }
if group.ID != groupID { if group.ID != groupID {
t.Errorf("Expected group ID %s, got %s", groupID, group.ID) t.Errorf("Expected group ID %s, got %s", groupID, group.ID)
} }
if group.State != GroupStateEmpty { if group.State != GroupStateEmpty {
t.Errorf("Expected initial state to be Empty, got %s", group.State) t.Errorf("Expected initial state to be Empty, got %s", group.State)
} }
if group.Generation != 0 { if group.Generation != 0 {
t.Errorf("Expected initial generation to be 0, got %d", group.Generation) t.Errorf("Expected initial generation to be 0, got %d", group.Generation)
} }
// Getting the same group should return the existing one // Getting the same group should return the existing one
group2 := gc.GetOrCreateGroup(groupID) group2 := gc.GetOrCreateGroup(groupID)
if group2 != group { if group2 != group {
@ -39,7 +39,7 @@ func TestGroupCoordinator_CreateGroup(t *testing.T) {
func TestGroupCoordinator_ValidateSessionTimeout(t *testing.T) { func TestGroupCoordinator_ValidateSessionTimeout(t *testing.T) {
gc := NewGroupCoordinator() gc := NewGroupCoordinator()
defer gc.Close() defer gc.Close()
// Test valid timeouts // Test valid timeouts
validTimeouts := []int32{6000, 30000, 300000} validTimeouts := []int32{6000, 30000, 300000}
for _, timeout := range validTimeouts { for _, timeout := range validTimeouts {
@ -47,7 +47,7 @@ func TestGroupCoordinator_ValidateSessionTimeout(t *testing.T) {
t.Errorf("Expected timeout %d to be valid", timeout) t.Errorf("Expected timeout %d to be valid", timeout)
} }
} }
// Test invalid timeouts // Test invalid timeouts
invalidTimeouts := []int32{1000, 5000, 400000} invalidTimeouts := []int32{1000, 5000, 400000}
for _, timeout := range invalidTimeouts { for _, timeout := range invalidTimeouts {
@ -60,9 +60,9 @@ func TestGroupCoordinator_ValidateSessionTimeout(t *testing.T) {
func TestGroupCoordinator_MemberManagement(t *testing.T) { func TestGroupCoordinator_MemberManagement(t *testing.T) {
gc := NewGroupCoordinator() gc := NewGroupCoordinator()
defer gc.Close() defer gc.Close()
group := gc.GetOrCreateGroup("test-group") group := gc.GetOrCreateGroup("test-group")
// Add members // Add members
member1 := &GroupMember{ member1 := &GroupMember{
ID: "member1", ID: "member1",
@ -72,31 +72,31 @@ func TestGroupCoordinator_MemberManagement(t *testing.T) {
State: MemberStateStable, State: MemberStateStable,
LastHeartbeat: time.Now(), LastHeartbeat: time.Now(),
} }
member2 := &GroupMember{ member2 := &GroupMember{
ID: "member2",
ID: "member2",
ClientID: "client2", ClientID: "client2",
SessionTimeout: 30000, SessionTimeout: 30000,
Subscription: []string{"topic1"}, Subscription: []string{"topic1"},
State: MemberStateStable, State: MemberStateStable,
LastHeartbeat: time.Now(), LastHeartbeat: time.Now(),
} }
group.Mu.Lock() group.Mu.Lock()
group.Members[member1.ID] = member1 group.Members[member1.ID] = member1
group.Members[member2.ID] = member2 group.Members[member2.ID] = member2
group.Mu.Unlock() group.Mu.Unlock()
// Update subscriptions // Update subscriptions
group.UpdateMemberSubscription("member1", []string{"topic1", "topic3"}) group.UpdateMemberSubscription("member1", []string{"topic1", "topic3"})
group.Mu.RLock() group.Mu.RLock()
updatedMember := group.Members["member1"] updatedMember := group.Members["member1"]
expectedTopics := []string{"topic1", "topic3"} expectedTopics := []string{"topic1", "topic3"}
if len(updatedMember.Subscription) != len(expectedTopics) { if len(updatedMember.Subscription) != len(expectedTopics) {
t.Errorf("Expected %d subscribed topics, got %d", len(expectedTopics), len(updatedMember.Subscription)) t.Errorf("Expected %d subscribed topics, got %d", len(expectedTopics), len(updatedMember.Subscription))
} }
// Check group subscribed topics // Check group subscribed topics
if len(group.SubscribedTopics) != 2 { // topic1, topic3 if len(group.SubscribedTopics) != 2 { // topic1, topic3
t.Errorf("Expected 2 group subscribed topics, got %d", len(group.SubscribedTopics)) t.Errorf("Expected 2 group subscribed topics, got %d", len(group.SubscribedTopics))
@ -107,7 +107,7 @@ func TestGroupCoordinator_MemberManagement(t *testing.T) {
func TestGroupCoordinator_Stats(t *testing.T) { func TestGroupCoordinator_Stats(t *testing.T) {
gc := NewGroupCoordinator() gc := NewGroupCoordinator()
defer gc.Close() defer gc.Close()
// Create multiple groups in different states // Create multiple groups in different states
group1 := gc.GetOrCreateGroup("group1") group1 := gc.GetOrCreateGroup("group1")
group1.Mu.Lock() group1.Mu.Lock()
@ -115,30 +115,30 @@ func TestGroupCoordinator_Stats(t *testing.T) {
group1.Members["member1"] = &GroupMember{ID: "member1"} group1.Members["member1"] = &GroupMember{ID: "member1"}
group1.Members["member2"] = &GroupMember{ID: "member2"} group1.Members["member2"] = &GroupMember{ID: "member2"}
group1.Mu.Unlock() group1.Mu.Unlock()
group2 := gc.GetOrCreateGroup("group2") group2 := gc.GetOrCreateGroup("group2")
group2.Mu.Lock() group2.Mu.Lock()
group2.State = GroupStatePreparingRebalance group2.State = GroupStatePreparingRebalance
group2.Members["member3"] = &GroupMember{ID: "member3"} group2.Members["member3"] = &GroupMember{ID: "member3"}
group2.Mu.Unlock() group2.Mu.Unlock()
stats := gc.GetGroupStats() stats := gc.GetGroupStats()
totalGroups := stats["total_groups"].(int) totalGroups := stats["total_groups"].(int)
if totalGroups != 2 { if totalGroups != 2 {
t.Errorf("Expected 2 total groups, got %d", totalGroups) t.Errorf("Expected 2 total groups, got %d", totalGroups)
} }
totalMembers := stats["total_members"].(int) totalMembers := stats["total_members"].(int)
if totalMembers != 3 { if totalMembers != 3 {
t.Errorf("Expected 3 total members, got %d", totalMembers) t.Errorf("Expected 3 total members, got %d", totalMembers)
} }
stateCount := stats["group_states"].(map[string]int) stateCount := stats["group_states"].(map[string]int)
if stateCount["Stable"] != 1 { if stateCount["Stable"] != 1 {
t.Errorf("Expected 1 stable group, got %d", stateCount["Stable"]) t.Errorf("Expected 1 stable group, got %d", stateCount["Stable"])
} }
if stateCount["PreparingRebalance"] != 1 { if stateCount["PreparingRebalance"] != 1 {
t.Errorf("Expected 1 preparing rebalance group, got %d", stateCount["PreparingRebalance"]) t.Errorf("Expected 1 preparing rebalance group, got %d", stateCount["PreparingRebalance"])
} }
@ -147,46 +147,46 @@ func TestGroupCoordinator_Stats(t *testing.T) {
func TestGroupCoordinator_Cleanup(t *testing.T) { func TestGroupCoordinator_Cleanup(t *testing.T) {
gc := NewGroupCoordinator() gc := NewGroupCoordinator()
defer gc.Close() defer gc.Close()
// Create a group with an expired member // Create a group with an expired member
group := gc.GetOrCreateGroup("test-group") group := gc.GetOrCreateGroup("test-group")
expiredMember := &GroupMember{ expiredMember := &GroupMember{
ID: "expired-member", ID: "expired-member",
SessionTimeout: 1000, // 1 second
SessionTimeout: 1000, // 1 second
LastHeartbeat: time.Now().Add(-2 * time.Second), // 2 seconds ago LastHeartbeat: time.Now().Add(-2 * time.Second), // 2 seconds ago
State: MemberStateStable, State: MemberStateStable,
} }
activeMember := &GroupMember{ activeMember := &GroupMember{
ID: "active-member",
SessionTimeout: 30000, // 30 seconds
ID: "active-member",
SessionTimeout: 30000, // 30 seconds
LastHeartbeat: time.Now(), // just now LastHeartbeat: time.Now(), // just now
State: MemberStateStable, State: MemberStateStable,
} }
group.Mu.Lock() group.Mu.Lock()
group.Members[expiredMember.ID] = expiredMember group.Members[expiredMember.ID] = expiredMember
group.Members[activeMember.ID] = activeMember group.Members[activeMember.ID] = activeMember
group.Leader = expiredMember.ID // Make expired member the leader group.Leader = expiredMember.ID // Make expired member the leader
group.Mu.Unlock() group.Mu.Unlock()
// Perform cleanup // Perform cleanup
gc.performCleanup() gc.performCleanup()
group.Mu.RLock() group.Mu.RLock()
defer group.Mu.RUnlock() defer group.Mu.RUnlock()
// Expired member should be removed // Expired member should be removed
if _, exists := group.Members[expiredMember.ID]; exists { if _, exists := group.Members[expiredMember.ID]; exists {
t.Error("Expected expired member to be removed") t.Error("Expected expired member to be removed")
} }
// Active member should remain // Active member should remain
if _, exists := group.Members[activeMember.ID]; !exists { if _, exists := group.Members[activeMember.ID]; !exists {
t.Error("Expected active member to remain") t.Error("Expected active member to remain")
} }
// Leader should be reset since expired member was leader // Leader should be reset since expired member was leader
if group.Leader == expiredMember.ID { if group.Leader == expiredMember.ID {
t.Error("Expected leader to be reset after expired member removal") t.Error("Expected leader to be reset after expired member removal")
@ -196,33 +196,33 @@ func TestGroupCoordinator_Cleanup(t *testing.T) {
func TestGroupCoordinator_GenerateMemberID(t *testing.T) { func TestGroupCoordinator_GenerateMemberID(t *testing.T) {
gc := NewGroupCoordinator() gc := NewGroupCoordinator()
defer gc.Close() defer gc.Close()
// Test that same client/host combination generates consistent member ID // Test that same client/host combination generates consistent member ID
id1 := gc.GenerateMemberID("client1", "host1") id1 := gc.GenerateMemberID("client1", "host1")
id2 := gc.GenerateMemberID("client1", "host1") id2 := gc.GenerateMemberID("client1", "host1")
// Same client/host should generate same ID (deterministic) // Same client/host should generate same ID (deterministic)
if id1 != id2 { if id1 != id2 {
t.Errorf("Expected same member ID for same client/host: %s vs %s", id1, id2) t.Errorf("Expected same member ID for same client/host: %s vs %s", id1, id2)
} }
// Different clients should generate different IDs // Different clients should generate different IDs
id3 := gc.GenerateMemberID("client2", "host1") id3 := gc.GenerateMemberID("client2", "host1")
id4 := gc.GenerateMemberID("client1", "host2") id4 := gc.GenerateMemberID("client1", "host2")
if id1 == id3 { if id1 == id3 {
t.Errorf("Expected different member IDs for different clients: %s vs %s", id1, id3) t.Errorf("Expected different member IDs for different clients: %s vs %s", id1, id3)
} }
if id1 == id4 { if id1 == id4 {
t.Errorf("Expected different member IDs for different hosts: %s vs %s", id1, id4) t.Errorf("Expected different member IDs for different hosts: %s vs %s", id1, id4)
} }
// IDs should be properly formatted // IDs should be properly formatted
if len(id1) < 10 { // Should be longer than just "consumer-" if len(id1) < 10 { // Should be longer than just "consumer-"
t.Errorf("Expected member ID to be properly formatted, got: %s", id1) t.Errorf("Expected member ID to be properly formatted, got: %s", id1)
} }
// Should start with "consumer-" prefix // Should start with "consumer-" prefix
if !strings.HasPrefix(id1, "consumer-") { if !strings.HasPrefix(id1, "consumer-") {
t.Errorf("Expected member ID to start with 'consumer-', got: %s", id1) t.Errorf("Expected member ID to start with 'consumer-', got: %s", id1)

6
weed/mq/kafka/gateway/server_test.go

@ -12,7 +12,7 @@ func TestServerStartAndClose(t *testing.T) {
t.Skip("This test requires SeaweedMQ Agent integration - run manually with agent available") t.Skip("This test requires SeaweedMQ Agent integration - run manually with agent available")
srv := NewServer(Options{ srv := NewServer(Options{
Listen: ":0",
Listen: ":0",
Masters: "localhost:9333", // Use masters instead of AgentAddress Masters: "localhost:9333", // Use masters instead of AgentAddress
}) })
if err := srv.Start(); err != nil { if err := srv.Start(); err != nil {
@ -42,7 +42,7 @@ func TestGetListenerAddr(t *testing.T) {
// Test with localhost binding - should return the actual address // Test with localhost binding - should return the actual address
srv := NewServer(Options{ srv := NewServer(Options{
Listen: "127.0.0.1:0",
Listen: "127.0.0.1:0",
Masters: "localhost:9333", // Would need real agent for this test Masters: "localhost:9333", // Would need real agent for this test
}) })
if err := srv.Start(); err != nil { if err := srv.Start(); err != nil {
@ -60,7 +60,7 @@ func TestGetListenerAddr(t *testing.T) {
// Test IPv6 all interfaces binding - should resolve to non-loopback IP // Test IPv6 all interfaces binding - should resolve to non-loopback IP
srv6 := NewServer(Options{ srv6 := NewServer(Options{
Listen: "[::]:0",
Listen: "[::]:0",
Masters: "localhost:9333", // Would need real agent for this test Masters: "localhost:9333", // Would need real agent for this test
}) })
if err := srv6.Start(); err != nil { if err := srv6.Start(); err != nil {

2
weed/mq/kafka/schema/broker_client_fetch_test.go

@ -228,7 +228,7 @@ func TestBrokerClient_SubscriberConfiguration(t *testing.T) {
// With mock brokers, behavior may vary - just verify no panic // With mock brokers, behavior may vary - just verify no panic
t.Logf("Subscriber creation results: err1=%v, err2=%v", err1, err2) t.Logf("Subscriber creation results: err1=%v, err2=%v", err1, err2)
// Don't assert errors as mock behavior may vary // Don't assert errors as mock behavior may vary
// Verify broker client is still functional after failed subscriber creation // Verify broker client is still functional after failed subscriber creation
if brokerClient != nil { if brokerClient != nil {
t.Log("Broker client remains functional after subscriber creation attempts") t.Log("Broker client remains functional after subscriber creation attempts")

Loading…
Cancel
Save