You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

441 lines
13 KiB

package protocol
import (
"encoding/binary"
"testing"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)
// Helper function to build JoinGroup request body for testing
func buildJoinGroupRequestBody(req *JoinGroupRequest) []byte {
// This is a simplified version for testing
// In practice, you'd use the actual protocol serialization
body := make([]byte, 0, 256)
// Group ID (string)
groupIDLen := make([]byte, 2)
binary.BigEndian.PutUint16(groupIDLen, uint16(len(req.GroupID)))
body = append(body, groupIDLen...)
body = append(body, []byte(req.GroupID)...)
// Session timeout (4 bytes)
sessionTimeout := make([]byte, 4)
binary.BigEndian.PutUint32(sessionTimeout, uint32(req.SessionTimeout))
body = append(body, sessionTimeout...)
// Rebalance timeout (4 bytes)
rebalanceTimeout := make([]byte, 4)
binary.BigEndian.PutUint32(rebalanceTimeout, uint32(req.RebalanceTimeout))
body = append(body, rebalanceTimeout...)
// Member ID (string)
memberIDLen := make([]byte, 2)
binary.BigEndian.PutUint16(memberIDLen, uint16(len(req.MemberID)))
body = append(body, memberIDLen...)
body = append(body, []byte(req.MemberID)...)
// Group instance ID (string, v5+)
instanceIDLen := make([]byte, 2)
if req.GroupInstanceID == "" {
binary.BigEndian.PutUint16(instanceIDLen, 0xFFFF) // null
} else {
binary.BigEndian.PutUint16(instanceIDLen, uint16(len(req.GroupInstanceID)))
}
body = append(body, instanceIDLen...)
if req.GroupInstanceID != "" {
body = append(body, []byte(req.GroupInstanceID)...)
}
// Protocol type (string)
protocolTypeLen := make([]byte, 2)
binary.BigEndian.PutUint16(protocolTypeLen, uint16(len(req.ProtocolType)))
body = append(body, protocolTypeLen...)
body = append(body, []byte(req.ProtocolType)...)
// Group protocols array (simplified)
protocolsCount := make([]byte, 4)
binary.BigEndian.PutUint32(protocolsCount, uint32(len(req.GroupProtocols)))
body = append(body, protocolsCount...)
for _, protocol := range req.GroupProtocols {
// Protocol name
nameLen := make([]byte, 2)
binary.BigEndian.PutUint16(nameLen, uint16(len(protocol.Name)))
body = append(body, nameLen...)
body = append(body, []byte(protocol.Name)...)
// Protocol metadata
metadataLen := make([]byte, 4)
binary.BigEndian.PutUint32(metadataLen, uint32(len(protocol.Metadata)))
body = append(body, metadataLen...)
body = append(body, protocol.Metadata...)
}
return body
}
// Helper function to build LeaveGroup request body for testing
func buildLeaveGroupRequestBody(req *LeaveGroupRequest) []byte {
body := make([]byte, 0, 128)
// Group ID (string)
groupIDLen := make([]byte, 2)
binary.BigEndian.PutUint16(groupIDLen, uint16(len(req.GroupID)))
body = append(body, groupIDLen...)
body = append(body, []byte(req.GroupID)...)
// Member ID (string)
memberIDLen := make([]byte, 2)
binary.BigEndian.PutUint16(memberIDLen, uint16(len(req.MemberID)))
body = append(body, memberIDLen...)
body = append(body, []byte(req.MemberID)...)
// Group instance ID (string, v3+)
instanceIDLen := make([]byte, 2)
if req.GroupInstanceID == "" {
binary.BigEndian.PutUint16(instanceIDLen, 0xFFFF) // null
} else {
binary.BigEndian.PutUint16(instanceIDLen, uint16(len(req.GroupInstanceID)))
}
body = append(body, instanceIDLen...)
if req.GroupInstanceID != "" {
body = append(body, []byte(req.GroupInstanceID)...)
}
return body
}
func TestHandler_JoinGroup_StaticMembership(t *testing.T) {
handler := NewTestHandler()
defer handler.Close()
// Test static member joining
t.Run("StaticMemberJoin", func(t *testing.T) {
request := &JoinGroupRequest{
GroupID: "test-group",
SessionTimeout: 30000,
RebalanceTimeout: 300000,
MemberID: "", // New member
GroupInstanceID: "static-instance-1",
ProtocolType: "consumer",
GroupProtocols: []GroupProtocol{
{Name: "range", Metadata: []byte{}},
},
}
// Build request body (simplified for testing)
requestBody := buildJoinGroupRequestBody(request)
response, err := handler.handleJoinGroup(12345, 5, requestBody)
if err != nil {
t.Fatalf("JoinGroup failed: %v", err)
}
// Parse response to verify member was added
if len(response) < 10 {
t.Fatalf("Response too short: %d bytes", len(response))
}
// Check error code (should be 0)
errorCode := binary.BigEndian.Uint16(response[4:6])
if errorCode != 0 {
t.Errorf("Expected error code 0, got %d", errorCode)
}
// Verify member was added to group
group := handler.groupCoordinator.GetGroup("test-group")
if group == nil {
t.Fatal("Group not found")
}
if len(group.Members) != 1 {
t.Errorf("Expected 1 member, got %d", len(group.Members))
}
// Find the member and verify static membership
var member *consumer.GroupMember
for _, m := range group.Members {
member = m
break
}
if member == nil {
t.Fatal("Member not found")
}
if member.GroupInstanceID == nil || *member.GroupInstanceID != "static-instance-1" {
t.Errorf("Expected GroupInstanceID 'static-instance-1', got %v", member.GroupInstanceID)
}
if !handler.groupCoordinator.IsStaticMember(member) {
t.Error("Expected member to be static")
}
})
// Test static member reconnection
t.Run("StaticMemberReconnection", func(t *testing.T) {
// First join
request1 := &JoinGroupRequest{
GroupID: "test-group-2",
SessionTimeout: 30000,
RebalanceTimeout: 300000,
MemberID: "",
GroupInstanceID: "static-instance-2",
ProtocolType: "consumer",
GroupProtocols: []GroupProtocol{
{Name: "range", Metadata: []byte{}},
},
}
requestBody1 := buildJoinGroupRequestBody(request1)
_, err := handler.handleJoinGroup(12346, 5, requestBody1)
if err != nil {
t.Fatalf("First JoinGroup failed: %v", err)
}
group := handler.groupCoordinator.GetGroup("test-group-2")
if len(group.Members) != 1 {
t.Errorf("Expected 1 member after first join, got %d", len(group.Members))
}
// Get the first member ID
var firstMemberID string
for memberID := range group.Members {
firstMemberID = memberID
break
}
// Second join with same instance ID (simulating reconnection)
request2 := &JoinGroupRequest{
GroupID: "test-group-2",
SessionTimeout: 30000,
RebalanceTimeout: 300000,
MemberID: "",
GroupInstanceID: "static-instance-2", // Same instance ID
ProtocolType: "consumer",
GroupProtocols: []GroupProtocol{
{Name: "range", Metadata: []byte{}},
},
}
requestBody2 := buildJoinGroupRequestBody(request2)
_, err = handler.handleJoinGroup(12347, 5, requestBody2)
if err != nil {
t.Fatalf("Second JoinGroup failed: %v", err)
}
// Should still have the same member (static member reused)
if len(group.Members) != 1 {
t.Errorf("Expected 1 member after reconnection, got %d", len(group.Members))
}
// Verify the member ID is the same (static member reused)
var secondMemberID string
for memberID := range group.Members {
secondMemberID = memberID
break
}
if firstMemberID != secondMemberID {
t.Errorf("Expected same member ID for static member reconnection, got %s vs %s", firstMemberID, secondMemberID)
}
})
// Test dynamic member (no instance ID)
t.Run("DynamicMember", func(t *testing.T) {
request := &JoinGroupRequest{
GroupID: "test-group-3",
SessionTimeout: 30000,
RebalanceTimeout: 300000,
MemberID: "",
GroupInstanceID: "", // No instance ID (dynamic member)
ProtocolType: "consumer",
GroupProtocols: []GroupProtocol{
{Name: "range", Metadata: []byte{}},
},
}
requestBody := buildJoinGroupRequestBody(request)
_, err := handler.handleJoinGroup(12348, 5, requestBody)
if err != nil {
t.Fatalf("JoinGroup failed: %v", err)
}
group := handler.groupCoordinator.GetGroup("test-group-3")
if len(group.Members) != 1 {
t.Errorf("Expected 1 member, got %d", len(group.Members))
}
// Find the member and verify it's dynamic
var member *consumer.GroupMember
for _, m := range group.Members {
member = m
break
}
if member == nil {
t.Fatal("Member not found")
}
if handler.groupCoordinator.IsStaticMember(member) {
t.Error("Expected member to be dynamic")
}
})
}
func TestHandler_LeaveGroup_StaticMembership(t *testing.T) {
handler := NewTestHandler()
defer handler.Close()
// Setup: Add a static member
group := handler.groupCoordinator.GetOrCreateGroup("test-group")
instanceID := "static-instance-1"
member := &consumer.GroupMember{
ID: "member-1",
ClientID: "client-1",
ClientHost: "localhost",
GroupInstanceID: &instanceID,
SessionTimeout: 30000,
State: consumer.MemberStatePending,
}
group.Members[member.ID] = member
handler.groupCoordinator.RegisterStaticMember(group, member)
// Test leaving static member
t.Run("StaticMemberLeave", func(t *testing.T) {
request := &LeaveGroupRequest{
GroupID: "test-group",
MemberID: "member-1",
GroupInstanceID: "static-instance-1",
}
requestBody := buildLeaveGroupRequestBody(request)
response, err := handler.handleLeaveGroup(12349, 3, requestBody)
if err != nil {
t.Fatalf("LeaveGroup failed: %v", err)
}
// Check error code (should be 0)
if len(response) < 6 {
t.Fatalf("Response too short: %d bytes", len(response))
}
errorCode := binary.BigEndian.Uint16(response[4:6])
if errorCode != 0 {
t.Errorf("Expected error code 0, got %d", errorCode)
}
// Verify member was removed
if len(group.Members) != 0 {
t.Errorf("Expected 0 members after leave, got %d", len(group.Members))
}
// Verify static member was unregistered
foundMember := handler.groupCoordinator.FindStaticMember(group, instanceID)
if foundMember != nil {
t.Error("Expected static member to be unregistered")
}
})
// Test leaving with wrong instance ID
t.Run("StaticMemberLeaveWrongInstanceID", func(t *testing.T) {
// Setup: Add another static member
instanceID2 := "static-instance-2"
member2 := &consumer.GroupMember{
ID: "member-2",
ClientID: "client-2",
ClientHost: "localhost",
GroupInstanceID: &instanceID2,
SessionTimeout: 30000,
State: consumer.MemberStatePending,
}
group.Members[member2.ID] = member2
handler.groupCoordinator.RegisterStaticMember(group, member2)
request := &LeaveGroupRequest{
GroupID: "test-group",
MemberID: "member-2",
GroupInstanceID: "wrong-instance-id", // Wrong instance ID
}
requestBody := buildLeaveGroupRequestBody(request)
response, err := handler.handleLeaveGroup(12350, 3, requestBody)
if err != nil {
t.Fatalf("LeaveGroup failed: %v", err)
}
// Check error code (should be FENCED_INSTANCE_ID)
if len(response) < 6 {
t.Fatalf("Response too short: %d bytes", len(response))
}
errorCode := binary.BigEndian.Uint16(response[4:6])
if errorCode != uint16(ErrorCodeFencedInstanceID) {
t.Errorf("Expected error code %d (FENCED_INSTANCE_ID), got %d", ErrorCodeFencedInstanceID, errorCode)
}
// Verify member was NOT removed
if len(group.Members) != 1 {
t.Errorf("Expected 1 member after failed leave, got %d", len(group.Members))
}
})
}
func TestHandler_DescribeGroups_StaticMembership(t *testing.T) {
handler := NewTestHandler()
defer handler.Close()
// Setup: Add static and dynamic members
group := handler.groupCoordinator.GetOrCreateGroup("test-group")
// Static member
staticInstanceID := "static-instance-1"
staticMember := &consumer.GroupMember{
ID: "static-member",
ClientID: "static-client",
ClientHost: "localhost",
GroupInstanceID: &staticInstanceID,
SessionTimeout: 30000,
State: consumer.MemberStateStable,
}
group.Members[staticMember.ID] = staticMember
handler.groupCoordinator.RegisterStaticMember(group, staticMember)
// Dynamic member
dynamicMember := &consumer.GroupMember{
ID: "dynamic-member",
ClientID: "dynamic-client",
ClientHost: "localhost",
GroupInstanceID: nil,
SessionTimeout: 30000,
State: consumer.MemberStateStable,
}
group.Members[dynamicMember.ID] = dynamicMember
group.State = consumer.GroupStateStable
// Test DescribeGroups includes GroupInstanceID
// Build a simple DescribeGroups request body
requestBody := make([]byte, 0, 64)
// Groups array count (4 bytes)
groupsCount := make([]byte, 4)
binary.BigEndian.PutUint32(groupsCount, 1)
requestBody = append(requestBody, groupsCount...)
// Group ID (string)
groupIDLen := make([]byte, 2)
binary.BigEndian.PutUint16(groupIDLen, uint16(len("test-group")))
requestBody = append(requestBody, groupIDLen...)
requestBody = append(requestBody, []byte("test-group")...)
response, err := handler.handleDescribeGroups(12351, 5, requestBody)
if err != nil {
t.Fatalf("DescribeGroups failed: %v", err)
}
if len(response) < 10 {
t.Fatalf("Response too short: %d bytes", len(response))
}
// Basic validation - response should contain group information
// In a real test, we'd parse the full response to verify GroupInstanceID is included
t.Logf("DescribeGroups response length: %d bytes", len(response))
}