You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

334 lines
7.5 KiB

package protocol
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)
func TestHandler_handleDescribeGroups(t *testing.T) {
handler := &Handler{}
// Test with no group coordinator
t.Run("NoCoordinator", func(t *testing.T) {
request := []byte{
0, 0, 0, 1, // 1 group
0, 10, 't', 'e', 's', 't', '-', 'g', 'r', 'o', 'u', 'p', // "test-group"
}
response, err := handler.handleDescribeGroups(123, 0, request)
if err != nil {
t.Fatalf("Expected no error, got: %v", err)
}
if len(response) == 0 {
t.Fatal("Expected non-empty response")
}
// Should contain error code for GROUP_COORDINATOR_NOT_AVAILABLE
t.Logf("Response length: %d bytes", len(response))
})
// Test with group coordinator but no groups
t.Run("WithCoordinatorNoGroups", func(t *testing.T) {
coordinator := consumer.NewGroupCoordinator()
handler.groupCoordinator = coordinator
request := []byte{
0, 0, 0, 1, // 1 group
0, 10, 't', 'e', 's', 't', '-', 'g', 'r', 'o', 'u', 'p', // "test-group"
}
response, err := handler.handleDescribeGroups(123, 0, request)
if err != nil {
t.Fatalf("Expected no error, got: %v", err)
}
if len(response) == 0 {
t.Fatal("Expected non-empty response")
}
// Should contain error code for UNKNOWN_GROUP_ID
t.Logf("Response length: %d bytes", len(response))
})
// Test parsing edge cases
t.Run("InvalidRequest", func(t *testing.T) {
request := []byte{0, 0} // Too short
_, err := handler.handleDescribeGroups(123, 0, request)
if err == nil {
t.Fatal("Expected error for invalid request")
}
})
}
func TestHandler_handleListGroups(t *testing.T) {
handler := &Handler{}
// Test with no group coordinator
t.Run("NoCoordinator", func(t *testing.T) {
request := []byte{} // Empty request
response, err := handler.handleListGroups(123, 0, request)
if err != nil {
t.Fatalf("Expected no error, got: %v", err)
}
if len(response) == 0 {
t.Fatal("Expected non-empty response")
}
t.Logf("Response length: %d bytes", len(response))
})
// Test with group coordinator
t.Run("WithCoordinator", func(t *testing.T) {
coordinator := consumer.NewGroupCoordinator()
handler.groupCoordinator = coordinator
request := []byte{} // Empty request
response, err := handler.handleListGroups(123, 0, request)
if err != nil {
t.Fatalf("Expected no error, got: %v", err)
}
if len(response) == 0 {
t.Fatal("Expected non-empty response")
}
t.Logf("Response length: %d bytes", len(response))
})
// Test with states filter (v4+)
t.Run("WithStatesFilter", func(t *testing.T) {
coordinator := consumer.NewGroupCoordinator()
handler.groupCoordinator = coordinator
// Request with states filter: ["Stable"]
request := []byte{
0, 0, 0, 1, // 1 state filter
0, 6, 'S', 't', 'a', 'b', 'l', 'e', // "Stable"
}
response, err := handler.handleListGroups(123, 4, request)
if err != nil {
t.Fatalf("Expected no error, got: %v", err)
}
if len(response) == 0 {
t.Fatal("Expected non-empty response")
}
t.Logf("Response length: %d bytes", len(response))
})
}
func TestHandler_parseDescribeGroupsRequest(t *testing.T) {
handler := &Handler{}
tests := []struct {
name string
data []byte
apiVersion uint16
expectError bool
expectedLen int
}{
{
name: "single group",
data: []byte{
0, 0, 0, 1, // 1 group
0, 10, 't', 'e', 's', 't', '-', 'g', 'r', 'o', 'u', 'p', // "test-group"
},
apiVersion: 0,
expectError: false,
expectedLen: 1,
},
{
name: "multiple groups",
data: []byte{
0, 0, 0, 2, // 2 groups
0, 6, 'g', 'r', 'o', 'u', 'p', '1', // "group1"
0, 6, 'g', 'r', 'o', 'u', 'p', '2', // "group2"
},
apiVersion: 0,
expectError: false,
expectedLen: 2,
},
{
name: "empty request",
data: []byte{},
apiVersion: 0,
expectError: true,
},
{
name: "truncated request",
data: []byte{
0, 0, 0, 1, // 1 group
0, 10, 't', 'e', 's', 't', // Incomplete group name
},
apiVersion: 0,
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := handler.parseDescribeGroupsRequest(tt.data, tt.apiVersion)
if tt.expectError {
if err == nil {
t.Error("Expected error but got none")
}
return
}
if err != nil {
t.Errorf("Unexpected error: %v", err)
return
}
if len(result.GroupIDs) != tt.expectedLen {
t.Errorf("Expected %d groups, got %d", tt.expectedLen, len(result.GroupIDs))
}
})
}
}
func TestHandler_parseListGroupsRequest(t *testing.T) {
handler := &Handler{}
tests := []struct {
name string
data []byte
apiVersion uint16
expectError bool
expectedLen int
}{
{
name: "empty request v0",
data: []byte{},
apiVersion: 0,
expectError: false,
expectedLen: 0,
},
{
name: "with states filter v4",
data: []byte{
0, 0, 0, 2, // 2 states
0, 6, 'S', 't', 'a', 'b', 'l', 'e', // "Stable"
0, 5, 'E', 'm', 'p', 't', 'y', // "Empty"
},
apiVersion: 4,
expectError: false,
expectedLen: 2,
},
{
name: "no states filter v4",
data: []byte{
0, 0, 0, 0, // 0 states
},
apiVersion: 4,
expectError: false,
expectedLen: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := handler.parseListGroupsRequest(tt.data, tt.apiVersion)
if tt.expectError {
if err == nil {
t.Error("Expected error but got none")
}
return
}
if err != nil {
t.Errorf("Unexpected error: %v", err)
return
}
if len(result.StatesFilter) != tt.expectedLen {
t.Errorf("Expected %d states, got %d", tt.expectedLen, len(result.StatesFilter))
}
})
}
}
func TestHandler_buildDescribeGroupsResponse(t *testing.T) {
handler := &Handler{}
response := DescribeGroupsResponse{
ThrottleTimeMs: 0,
Groups: []DescribeGroupsGroup{
{
ErrorCode: 0,
GroupID: "test-group",
State: "Stable",
ProtocolType: "consumer",
Protocol: "range",
Members: []DescribeGroupsMember{
{
MemberID: "member-1",
GroupInstanceID: nil,
ClientID: "client-1",
ClientHost: "localhost",
MemberMetadata: []byte("metadata"),
MemberAssignment: []byte("assignment"),
},
},
AuthorizedOps: []int32{},
},
},
}
result := handler.buildDescribeGroupsResponse(response, 123, 0)
if len(result) == 0 {
t.Fatal("Expected non-empty response")
}
t.Logf("Response length: %d bytes", len(result))
// Test with different API versions
resultV3 := handler.buildDescribeGroupsResponse(response, 123, 3)
if len(resultV3) <= len(result) {
t.Error("Expected v3 response to be larger (includes authorized ops)")
}
}
func TestHandler_buildListGroupsResponse(t *testing.T) {
handler := &Handler{}
response := ListGroupsResponse{
ThrottleTimeMs: 0,
ErrorCode: 0,
Groups: []ListGroupsGroup{
{
GroupID: "group-1",
ProtocolType: "consumer",
GroupState: "Stable",
},
{
GroupID: "group-2",
ProtocolType: "consumer",
GroupState: "Empty",
},
},
}
result := handler.buildListGroupsResponse(response, 123, 0)
if len(result) == 0 {
t.Fatal("Expected non-empty response")
}
t.Logf("Response length: %d bytes", len(result))
// Test with different API versions
resultV4 := handler.buildListGroupsResponse(response, 123, 4)
if len(resultV4) <= len(result) {
t.Error("Expected v4 response to be larger (includes group state)")
}
}