You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

566 lines
16 KiB

package protocol
import (
"bytes"
"encoding/binary"
"testing"
)
// TestVersionMatrix_JoinGroup tests JoinGroup request parsing across versions
func TestVersionMatrix_JoinGroup(t *testing.T) {
tests := []struct {
name string
version int16
buildBody func() []byte
expectErr bool
expectReq *JoinGroupRequest
}{
{
name: "JoinGroup v0",
version: 0,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// group_id
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// session_timeout_ms
binary.Write(buf, binary.BigEndian, int32(30000))
// member_id
binary.Write(buf, binary.BigEndian, int16(0)) // empty
// protocol_type
binary.Write(buf, binary.BigEndian, int16(8))
buf.WriteString("consumer")
// group_protocols array count
binary.Write(buf, binary.BigEndian, int32(1))
// protocol_name
binary.Write(buf, binary.BigEndian, int16(5))
buf.WriteString("range")
// protocol_metadata
binary.Write(buf, binary.BigEndian, int32(0)) // empty
return buf.Bytes()
},
expectErr: false,
expectReq: &JoinGroupRequest{
GroupID: "test-group",
SessionTimeout: 30000,
MemberID: "",
ProtocolType: "consumer",
GroupProtocols: []GroupProtocol{{Name: "range", Metadata: []byte{}}},
},
},
{
name: "JoinGroup v5",
version: 5,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// group_id
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// session_timeout_ms
binary.Write(buf, binary.BigEndian, int32(30000))
// rebalance_timeout_ms (v1+)
binary.Write(buf, binary.BigEndian, int32(300000))
// member_id
binary.Write(buf, binary.BigEndian, int16(0)) // empty
// group_instance_id (v5+, nullable)
binary.Write(buf, binary.BigEndian, int16(-1)) // null
// protocol_type
binary.Write(buf, binary.BigEndian, int16(8))
buf.WriteString("consumer")
// group_protocols array count
binary.Write(buf, binary.BigEndian, int32(1))
// protocol_name
binary.Write(buf, binary.BigEndian, int16(5))
buf.WriteString("range")
// protocol_metadata
binary.Write(buf, binary.BigEndian, int32(0)) // empty
return buf.Bytes()
},
expectErr: false,
expectReq: &JoinGroupRequest{
GroupID: "test-group",
SessionTimeout: 30000,
RebalanceTimeout: 300000,
MemberID: "",
GroupInstanceID: "",
ProtocolType: "consumer",
GroupProtocols: []GroupProtocol{{Name: "range", Metadata: []byte{}}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := NewTestHandler()
body := tt.buildBody()
req, err := h.parseJoinGroupRequest(body, uint16(tt.version))
if tt.expectErr && err == nil {
t.Errorf("expected error but got none")
}
if !tt.expectErr && err != nil {
t.Errorf("unexpected error: %v", err)
}
if !tt.expectErr && req != nil {
if req.GroupID != tt.expectReq.GroupID {
t.Errorf("GroupID: got %q, want %q", req.GroupID, tt.expectReq.GroupID)
}
if req.SessionTimeout != tt.expectReq.SessionTimeout {
t.Errorf("SessionTimeout: got %d, want %d", req.SessionTimeout, tt.expectReq.SessionTimeout)
}
if tt.version >= 1 && req.RebalanceTimeout != tt.expectReq.RebalanceTimeout {
t.Errorf("RebalanceTimeout: got %d, want %d", req.RebalanceTimeout, tt.expectReq.RebalanceTimeout)
}
if req.MemberID != tt.expectReq.MemberID {
t.Errorf("MemberID: got %q, want %q", req.MemberID, tt.expectReq.MemberID)
}
if tt.version >= 5 && req.GroupInstanceID != tt.expectReq.GroupInstanceID {
t.Errorf("GroupInstanceID: got %q, want %q", req.GroupInstanceID, tt.expectReq.GroupInstanceID)
}
if req.ProtocolType != tt.expectReq.ProtocolType {
t.Errorf("ProtocolType: got %q, want %q", req.ProtocolType, tt.expectReq.ProtocolType)
}
}
})
}
}
// TestVersionMatrix_SyncGroup tests SyncGroup request parsing across versions
func TestVersionMatrix_SyncGroup(t *testing.T) {
tests := []struct {
name string
version int16
buildBody func() []byte
expectErr bool
expectReq *SyncGroupRequest
}{
{
name: "SyncGroup v0",
version: 0,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// group_id
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// generation_id
binary.Write(buf, binary.BigEndian, int32(1))
// member_id
binary.Write(buf, binary.BigEndian, int16(6))
buf.WriteString("member")
// group_assignment array count
binary.Write(buf, binary.BigEndian, int32(0)) // empty
return buf.Bytes()
},
expectErr: false,
expectReq: &SyncGroupRequest{
GroupID: "test-group",
GenerationID: 1,
MemberID: "member",
GroupAssignments: []GroupAssignment{},
},
},
{
name: "SyncGroup v3",
version: 3,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// group_id
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// generation_id
binary.Write(buf, binary.BigEndian, int32(1))
// member_id
binary.Write(buf, binary.BigEndian, int16(6))
buf.WriteString("member")
// group_instance_id (v3+, nullable)
binary.Write(buf, binary.BigEndian, int16(-1)) // null
// group_assignment array count
binary.Write(buf, binary.BigEndian, int32(0)) // empty
return buf.Bytes()
},
expectErr: false,
expectReq: &SyncGroupRequest{
GroupID: "test-group",
GenerationID: 1,
MemberID: "member",
GroupInstanceID: "",
GroupAssignments: []GroupAssignment{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := NewTestHandler()
body := tt.buildBody()
req, err := h.parseSyncGroupRequest(body, uint16(tt.version))
if tt.expectErr && err == nil {
t.Errorf("expected error but got none")
}
if !tt.expectErr && err != nil {
t.Errorf("unexpected error: %v", err)
}
if !tt.expectErr && req != nil {
if req.GroupID != tt.expectReq.GroupID {
t.Errorf("GroupID: got %q, want %q", req.GroupID, tt.expectReq.GroupID)
}
if req.GenerationID != tt.expectReq.GenerationID {
t.Errorf("GenerationID: got %d, want %d", req.GenerationID, tt.expectReq.GenerationID)
}
if req.MemberID != tt.expectReq.MemberID {
t.Errorf("MemberID: got %q, want %q", req.MemberID, tt.expectReq.MemberID)
}
if tt.version >= 3 && req.GroupInstanceID != tt.expectReq.GroupInstanceID {
t.Errorf("GroupInstanceID: got %q, want %q", req.GroupInstanceID, tt.expectReq.GroupInstanceID)
}
}
})
}
}
// TestVersionMatrix_OffsetFetch tests OffsetFetch request parsing across versions
func TestVersionMatrix_OffsetFetch(t *testing.T) {
tests := []struct {
name string
version int16
buildBody func() []byte
expectErr bool
expectReq *OffsetFetchRequest
}{
{
name: "OffsetFetch v1",
version: 1,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// group_id
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// topics array count
binary.Write(buf, binary.BigEndian, int32(1))
// topic_name
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-topic")
// partitions array count
binary.Write(buf, binary.BigEndian, int32(1))
// partition_id
binary.Write(buf, binary.BigEndian, int32(0))
return buf.Bytes()
},
expectErr: false,
expectReq: &OffsetFetchRequest{
GroupID: "test-group",
Topics: []OffsetFetchTopic{
{
Name: "test-topic",
Partitions: []int32{0},
},
},
},
},
{
name: "OffsetFetch v2",
version: 2,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// group_id
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// topics array count
binary.Write(buf, binary.BigEndian, int32(1))
// topic_name
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-topic")
// partitions array count
binary.Write(buf, binary.BigEndian, int32(1))
// partition_id
binary.Write(buf, binary.BigEndian, int32(0))
return buf.Bytes()
},
expectErr: false,
expectReq: &OffsetFetchRequest{
GroupID: "test-group",
Topics: []OffsetFetchTopic{
{
Name: "test-topic",
Partitions: []int32{0},
},
},
},
},
{
name: "OffsetFetch v2 - empty topics (fetch all)",
version: 2,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// group_id
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// topics array count (0 = fetch all)
binary.Write(buf, binary.BigEndian, int32(0))
return buf.Bytes()
},
expectErr: false,
expectReq: &OffsetFetchRequest{
GroupID: "test-group",
Topics: []OffsetFetchTopic{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := NewTestHandler()
body := tt.buildBody()
req, err := h.parseOffsetFetchRequest(body)
if tt.expectErr && err == nil {
t.Errorf("expected error but got none")
}
if !tt.expectErr && err != nil {
t.Errorf("unexpected error: %v", err)
}
if !tt.expectErr && req != nil {
if req.GroupID != tt.expectReq.GroupID {
t.Errorf("GroupID: got %q, want %q", req.GroupID, tt.expectReq.GroupID)
}
if len(req.Topics) != len(tt.expectReq.Topics) {
t.Errorf("Topics count: got %d, want %d", len(req.Topics), len(tt.expectReq.Topics))
}
for i, topic := range req.Topics {
if i < len(tt.expectReq.Topics) {
if topic.Name != tt.expectReq.Topics[i].Name {
t.Errorf("Topic[%d] name: got %q, want %q", i, topic.Name, tt.expectReq.Topics[i].Name)
}
}
}
}
})
}
}
// TestVersionMatrix_FindCoordinator tests FindCoordinator request parsing across versions
func TestVersionMatrix_FindCoordinator(t *testing.T) {
tests := []struct {
name string
version int16
buildBody func() []byte
expectErr bool
expectKey string
expectType int8
}{
{
name: "FindCoordinator v0",
version: 0,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// coordinator_key
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
return buf.Bytes()
},
expectErr: false,
expectKey: "test-group",
expectType: 0, // GROUP (default for v0)
},
{
name: "FindCoordinator v1",
version: 1,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// coordinator_key
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// coordinator_type (v1+)
binary.Write(buf, binary.BigEndian, int8(0)) // GROUP
return buf.Bytes()
},
expectErr: false,
expectKey: "test-group",
expectType: 0, // GROUP
},
{
name: "FindCoordinator v2",
version: 2,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// coordinator_key
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// coordinator_type (v1+)
binary.Write(buf, binary.BigEndian, int8(1)) // TRANSACTION
return buf.Bytes()
},
expectErr: false,
expectKey: "test-group",
expectType: 1, // TRANSACTION
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
body := tt.buildBody()
// Parse the request manually to test the format
offset := 0
// coordinator_key
if offset+2 > len(body) {
t.Fatalf("body too short for coordinator_key length")
}
keyLen := int(binary.BigEndian.Uint16(body[offset : offset+2]))
offset += 2
if offset+keyLen > len(body) {
t.Fatalf("body too short for coordinator_key")
}
key := string(body[offset : offset+keyLen])
offset += keyLen
// coordinator_type (v1+)
var coordType int8 = 0 // default GROUP
if tt.version >= 1 {
if offset+1 > len(body) {
t.Fatalf("body too short for coordinator_type")
}
coordType = int8(body[offset])
offset++
}
if key != tt.expectKey {
t.Errorf("coordinator_key: got %q, want %q", key, tt.expectKey)
}
if coordType != tt.expectType {
t.Errorf("coordinator_type: got %d, want %d", coordType, tt.expectType)
}
})
}
}
// TestVersionMatrix_ListOffsets tests ListOffsets request parsing across versions
func TestVersionMatrix_ListOffsets(t *testing.T) {
tests := []struct {
name string
version int16
buildBody func() []byte
expectErr bool
expectReplica int32
expectTopics int
}{
{
name: "ListOffsets v0",
version: 0,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// topics array count
binary.Write(buf, binary.BigEndian, int32(1))
// topic_name
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-topic")
// partitions array count
binary.Write(buf, binary.BigEndian, int32(1))
// partition_id
binary.Write(buf, binary.BigEndian, int32(0))
// timestamp
binary.Write(buf, binary.BigEndian, int64(-2)) // earliest
return buf.Bytes()
},
expectErr: false,
expectReplica: -1, // no replica_id in v0
expectTopics: 1,
},
{
name: "ListOffsets v1",
version: 1,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// replica_id (v1+)
binary.Write(buf, binary.BigEndian, int32(-1))
// topics array count
binary.Write(buf, binary.BigEndian, int32(1))
// topic_name
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-topic")
// partitions array count
binary.Write(buf, binary.BigEndian, int32(1))
// partition_id
binary.Write(buf, binary.BigEndian, int32(0))
// timestamp
binary.Write(buf, binary.BigEndian, int64(-1)) // latest
return buf.Bytes()
},
expectErr: false,
expectReplica: -1,
expectTopics: 1,
},
{
name: "ListOffsets v2",
version: 2,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// replica_id (v1+)
binary.Write(buf, binary.BigEndian, int32(-1))
// isolation_level (v2+)
binary.Write(buf, binary.BigEndian, int8(0)) // READ_UNCOMMITTED
// topics array count
binary.Write(buf, binary.BigEndian, int32(1))
// topic_name
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-topic")
// partitions array count
binary.Write(buf, binary.BigEndian, int32(1))
// partition_id
binary.Write(buf, binary.BigEndian, int32(0))
// timestamp
binary.Write(buf, binary.BigEndian, int64(-1)) // latest
// leader_epoch (v4+, but we'll test basic v2)
return buf.Bytes()
},
expectErr: false,
expectReplica: -1,
expectTopics: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
body := tt.buildBody()
// Parse the request manually to test the format
offset := 0
// replica_id (v1+)
var replicaID int32 = -1
if tt.version >= 1 {
if offset+4 > len(body) {
t.Fatalf("body too short for replica_id")
}
replicaID = int32(binary.BigEndian.Uint32(body[offset : offset+4]))
offset += 4
}
// isolation_level (v2+)
if tt.version >= 2 {
if offset+1 > len(body) {
t.Fatalf("body too short for isolation_level")
}
// isolationLevel := int8(body[offset])
offset += 1
}
// topics array count
if offset+4 > len(body) {
t.Fatalf("body too short for topics count")
}
topicsCount := int(binary.BigEndian.Uint32(body[offset : offset+4]))
offset += 4
if replicaID != tt.expectReplica {
t.Errorf("replica_id: got %d, want %d", replicaID, tt.expectReplica)
}
if topicsCount != tt.expectTopics {
t.Errorf("topics count: got %d, want %d", topicsCount, tt.expectTopics)
}
})
}
}