You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
196 lines
5.2 KiB
196 lines
5.2 KiB
package consumer
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
func TestGroupCoordinator_StaticMembership(t *testing.T) {
|
|
gc := NewGroupCoordinator()
|
|
defer gc.Close()
|
|
|
|
group := gc.GetOrCreateGroup("test-group")
|
|
|
|
// Test static member registration
|
|
instanceID := "static-instance-1"
|
|
member := &GroupMember{
|
|
ID: "member-1",
|
|
ClientID: "client-1",
|
|
ClientHost: "localhost",
|
|
GroupInstanceID: &instanceID,
|
|
SessionTimeout: 30000,
|
|
State: MemberStatePending,
|
|
LastHeartbeat: time.Now(),
|
|
JoinedAt: time.Now(),
|
|
}
|
|
|
|
// Add member to group
|
|
group.Members[member.ID] = member
|
|
gc.RegisterStaticMember(group, member)
|
|
|
|
// Test finding static member
|
|
foundMember := gc.FindStaticMember(group, instanceID)
|
|
if foundMember == nil {
|
|
t.Error("Expected to find static member, got nil")
|
|
}
|
|
if foundMember.ID != member.ID {
|
|
t.Errorf("Expected member ID %s, got %s", member.ID, foundMember.ID)
|
|
}
|
|
|
|
// Test IsStaticMember
|
|
if !gc.IsStaticMember(member) {
|
|
t.Error("Expected member to be static")
|
|
}
|
|
|
|
// Test dynamic member (no instance ID)
|
|
dynamicMember := &GroupMember{
|
|
ID: "member-2",
|
|
ClientID: "client-2",
|
|
ClientHost: "localhost",
|
|
GroupInstanceID: nil,
|
|
SessionTimeout: 30000,
|
|
State: MemberStatePending,
|
|
LastHeartbeat: time.Now(),
|
|
JoinedAt: time.Now(),
|
|
}
|
|
|
|
if gc.IsStaticMember(dynamicMember) {
|
|
t.Error("Expected member to be dynamic")
|
|
}
|
|
|
|
// Test unregistering static member
|
|
gc.UnregisterStaticMember(group, instanceID)
|
|
foundMember = gc.FindStaticMember(group, instanceID)
|
|
if foundMember != nil {
|
|
t.Error("Expected static member to be unregistered")
|
|
}
|
|
}
|
|
|
|
func TestGroupCoordinator_StaticMemberReconnection(t *testing.T) {
|
|
gc := NewGroupCoordinator()
|
|
defer gc.Close()
|
|
|
|
group := gc.GetOrCreateGroup("test-group")
|
|
instanceID := "static-instance-1"
|
|
|
|
// First connection
|
|
member1 := &GroupMember{
|
|
ID: "member-1",
|
|
ClientID: "client-1",
|
|
ClientHost: "localhost",
|
|
GroupInstanceID: &instanceID,
|
|
SessionTimeout: 30000,
|
|
State: MemberStatePending,
|
|
LastHeartbeat: time.Now(),
|
|
JoinedAt: time.Now(),
|
|
}
|
|
|
|
group.Members[member1.ID] = member1
|
|
gc.RegisterStaticMember(group, member1)
|
|
|
|
// Simulate disconnection and reconnection with same instance ID
|
|
delete(group.Members, member1.ID)
|
|
|
|
// Reconnection with same instance ID should reuse the mapping
|
|
member2 := &GroupMember{
|
|
ID: "member-2", // Different member ID
|
|
ClientID: "client-1",
|
|
ClientHost: "localhost",
|
|
GroupInstanceID: &instanceID, // Same instance ID
|
|
SessionTimeout: 30000,
|
|
State: MemberStatePending,
|
|
LastHeartbeat: time.Now(),
|
|
JoinedAt: time.Now(),
|
|
}
|
|
|
|
group.Members[member2.ID] = member2
|
|
gc.RegisterStaticMember(group, member2)
|
|
|
|
// Should find the new member with the same instance ID
|
|
foundMember := gc.FindStaticMember(group, instanceID)
|
|
if foundMember == nil {
|
|
t.Error("Expected to find static member after reconnection")
|
|
}
|
|
if foundMember.ID != member2.ID {
|
|
t.Errorf("Expected member ID %s, got %s", member2.ID, foundMember.ID)
|
|
}
|
|
}
|
|
|
|
func TestGroupCoordinator_StaticMembershipEdgeCases(t *testing.T) {
|
|
gc := NewGroupCoordinator()
|
|
defer gc.Close()
|
|
|
|
group := gc.GetOrCreateGroup("test-group")
|
|
|
|
// Test empty instance ID
|
|
member := &GroupMember{
|
|
ID: "member-1",
|
|
ClientID: "client-1",
|
|
ClientHost: "localhost",
|
|
GroupInstanceID: nil,
|
|
SessionTimeout: 30000,
|
|
State: MemberStatePending,
|
|
LastHeartbeat: time.Now(),
|
|
JoinedAt: time.Now(),
|
|
}
|
|
|
|
gc.RegisterStaticMember(group, member) // Should be no-op
|
|
foundMember := gc.FindStaticMember(group, "")
|
|
if foundMember != nil {
|
|
t.Error("Expected not to find member with empty instance ID")
|
|
}
|
|
|
|
// Test empty string instance ID
|
|
emptyInstanceID := ""
|
|
member.GroupInstanceID = &emptyInstanceID
|
|
gc.RegisterStaticMember(group, member) // Should be no-op
|
|
foundMember = gc.FindStaticMember(group, emptyInstanceID)
|
|
if foundMember != nil {
|
|
t.Error("Expected not to find member with empty string instance ID")
|
|
}
|
|
|
|
// Test unregistering non-existent instance ID
|
|
gc.UnregisterStaticMember(group, "non-existent") // Should be no-op
|
|
}
|
|
|
|
func TestGroupCoordinator_StaticMembershipConcurrency(t *testing.T) {
|
|
gc := NewGroupCoordinator()
|
|
defer gc.Close()
|
|
|
|
group := gc.GetOrCreateGroup("test-group")
|
|
instanceID := "static-instance-1"
|
|
|
|
// Test concurrent access
|
|
done := make(chan bool, 2)
|
|
|
|
// Goroutine 1: Register static member
|
|
go func() {
|
|
member := &GroupMember{
|
|
ID: "member-1",
|
|
ClientID: "client-1",
|
|
ClientHost: "localhost",
|
|
GroupInstanceID: &instanceID,
|
|
SessionTimeout: 30000,
|
|
State: MemberStatePending,
|
|
LastHeartbeat: time.Now(),
|
|
JoinedAt: time.Now(),
|
|
}
|
|
group.Members[member.ID] = member
|
|
gc.RegisterStaticMember(group, member)
|
|
done <- true
|
|
}()
|
|
|
|
// Goroutine 2: Find static member
|
|
go func() {
|
|
time.Sleep(10 * time.Millisecond) // Small delay to ensure registration happens first
|
|
foundMember := gc.FindStaticMember(group, instanceID)
|
|
if foundMember == nil {
|
|
t.Error("Expected to find static member in concurrent access")
|
|
}
|
|
done <- true
|
|
}()
|
|
|
|
// Wait for both goroutines to complete
|
|
<-done
|
|
<-done
|
|
}
|