You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

196 lines
5.2 KiB

package consumer
import (
"testing"
"time"
)
func TestGroupCoordinator_StaticMembership(t *testing.T) {
gc := NewGroupCoordinator()
defer gc.Close()
group := gc.GetOrCreateGroup("test-group")
// Test static member registration
instanceID := "static-instance-1"
member := &GroupMember{
ID: "member-1",
ClientID: "client-1",
ClientHost: "localhost",
GroupInstanceID: &instanceID,
SessionTimeout: 30000,
State: MemberStatePending,
LastHeartbeat: time.Now(),
JoinedAt: time.Now(),
}
// Add member to group
group.Members[member.ID] = member
gc.RegisterStaticMember(group, member)
// Test finding static member
foundMember := gc.FindStaticMember(group, instanceID)
if foundMember == nil {
t.Error("Expected to find static member, got nil")
}
if foundMember.ID != member.ID {
t.Errorf("Expected member ID %s, got %s", member.ID, foundMember.ID)
}
// Test IsStaticMember
if !gc.IsStaticMember(member) {
t.Error("Expected member to be static")
}
// Test dynamic member (no instance ID)
dynamicMember := &GroupMember{
ID: "member-2",
ClientID: "client-2",
ClientHost: "localhost",
GroupInstanceID: nil,
SessionTimeout: 30000,
State: MemberStatePending,
LastHeartbeat: time.Now(),
JoinedAt: time.Now(),
}
if gc.IsStaticMember(dynamicMember) {
t.Error("Expected member to be dynamic")
}
// Test unregistering static member
gc.UnregisterStaticMember(group, instanceID)
foundMember = gc.FindStaticMember(group, instanceID)
if foundMember != nil {
t.Error("Expected static member to be unregistered")
}
}
func TestGroupCoordinator_StaticMemberReconnection(t *testing.T) {
gc := NewGroupCoordinator()
defer gc.Close()
group := gc.GetOrCreateGroup("test-group")
instanceID := "static-instance-1"
// First connection
member1 := &GroupMember{
ID: "member-1",
ClientID: "client-1",
ClientHost: "localhost",
GroupInstanceID: &instanceID,
SessionTimeout: 30000,
State: MemberStatePending,
LastHeartbeat: time.Now(),
JoinedAt: time.Now(),
}
group.Members[member1.ID] = member1
gc.RegisterStaticMember(group, member1)
// Simulate disconnection and reconnection with same instance ID
delete(group.Members, member1.ID)
// Reconnection with same instance ID should reuse the mapping
member2 := &GroupMember{
ID: "member-2", // Different member ID
ClientID: "client-1",
ClientHost: "localhost",
GroupInstanceID: &instanceID, // Same instance ID
SessionTimeout: 30000,
State: MemberStatePending,
LastHeartbeat: time.Now(),
JoinedAt: time.Now(),
}
group.Members[member2.ID] = member2
gc.RegisterStaticMember(group, member2)
// Should find the new member with the same instance ID
foundMember := gc.FindStaticMember(group, instanceID)
if foundMember == nil {
t.Error("Expected to find static member after reconnection")
}
if foundMember.ID != member2.ID {
t.Errorf("Expected member ID %s, got %s", member2.ID, foundMember.ID)
}
}
func TestGroupCoordinator_StaticMembershipEdgeCases(t *testing.T) {
gc := NewGroupCoordinator()
defer gc.Close()
group := gc.GetOrCreateGroup("test-group")
// Test empty instance ID
member := &GroupMember{
ID: "member-1",
ClientID: "client-1",
ClientHost: "localhost",
GroupInstanceID: nil,
SessionTimeout: 30000,
State: MemberStatePending,
LastHeartbeat: time.Now(),
JoinedAt: time.Now(),
}
gc.RegisterStaticMember(group, member) // Should be no-op
foundMember := gc.FindStaticMember(group, "")
if foundMember != nil {
t.Error("Expected not to find member with empty instance ID")
}
// Test empty string instance ID
emptyInstanceID := ""
member.GroupInstanceID = &emptyInstanceID
gc.RegisterStaticMember(group, member) // Should be no-op
foundMember = gc.FindStaticMember(group, emptyInstanceID)
if foundMember != nil {
t.Error("Expected not to find member with empty string instance ID")
}
// Test unregistering non-existent instance ID
gc.UnregisterStaticMember(group, "non-existent") // Should be no-op
}
func TestGroupCoordinator_StaticMembershipConcurrency(t *testing.T) {
gc := NewGroupCoordinator()
defer gc.Close()
group := gc.GetOrCreateGroup("test-group")
instanceID := "static-instance-1"
// Test concurrent access
done := make(chan bool, 2)
// Goroutine 1: Register static member
go func() {
member := &GroupMember{
ID: "member-1",
ClientID: "client-1",
ClientHost: "localhost",
GroupInstanceID: &instanceID,
SessionTimeout: 30000,
State: MemberStatePending,
LastHeartbeat: time.Now(),
JoinedAt: time.Now(),
}
group.Members[member.ID] = member
gc.RegisterStaticMember(group, member)
done <- true
}()
// Goroutine 2: Find static member
go func() {
time.Sleep(10 * time.Millisecond) // Small delay to ensure registration happens first
foundMember := gc.FindStaticMember(group, instanceID)
if foundMember == nil {
t.Error("Expected to find static member in concurrent access")
}
done <- true
}()
// Wait for both goroutines to complete
<-done
<-done
}