You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
566 lines
16 KiB
566 lines
16 KiB
package protocol
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"testing"
|
|
)
|
|
|
|
// TestVersionMatrix_JoinGroup tests JoinGroup request parsing across versions
|
|
func TestVersionMatrix_JoinGroup(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
version int16
|
|
buildBody func() []byte
|
|
expectErr bool
|
|
expectReq *JoinGroupRequest
|
|
}{
|
|
{
|
|
name: "JoinGroup v0",
|
|
version: 0,
|
|
buildBody: func() []byte {
|
|
buf := &bytes.Buffer{}
|
|
// group_id
|
|
binary.Write(buf, binary.BigEndian, int16(10))
|
|
buf.WriteString("test-group")
|
|
// session_timeout_ms
|
|
binary.Write(buf, binary.BigEndian, int32(30000))
|
|
// member_id
|
|
binary.Write(buf, binary.BigEndian, int16(0)) // empty
|
|
// protocol_type
|
|
binary.Write(buf, binary.BigEndian, int16(8))
|
|
buf.WriteString("consumer")
|
|
// group_protocols array count
|
|
binary.Write(buf, binary.BigEndian, int32(1))
|
|
// protocol_name
|
|
binary.Write(buf, binary.BigEndian, int16(5))
|
|
buf.WriteString("range")
|
|
// protocol_metadata
|
|
binary.Write(buf, binary.BigEndian, int32(0)) // empty
|
|
return buf.Bytes()
|
|
},
|
|
expectErr: false,
|
|
expectReq: &JoinGroupRequest{
|
|
GroupID: "test-group",
|
|
SessionTimeout: 30000,
|
|
MemberID: "",
|
|
ProtocolType: "consumer",
|
|
GroupProtocols: []GroupProtocol{{Name: "range", Metadata: []byte{}}},
|
|
},
|
|
},
|
|
{
|
|
name: "JoinGroup v5",
|
|
version: 5,
|
|
buildBody: func() []byte {
|
|
buf := &bytes.Buffer{}
|
|
// group_id
|
|
binary.Write(buf, binary.BigEndian, int16(10))
|
|
buf.WriteString("test-group")
|
|
// session_timeout_ms
|
|
binary.Write(buf, binary.BigEndian, int32(30000))
|
|
// rebalance_timeout_ms (v1+)
|
|
binary.Write(buf, binary.BigEndian, int32(300000))
|
|
// member_id
|
|
binary.Write(buf, binary.BigEndian, int16(0)) // empty
|
|
// group_instance_id (v5+, nullable)
|
|
binary.Write(buf, binary.BigEndian, int16(-1)) // null
|
|
// protocol_type
|
|
binary.Write(buf, binary.BigEndian, int16(8))
|
|
buf.WriteString("consumer")
|
|
// group_protocols array count
|
|
binary.Write(buf, binary.BigEndian, int32(1))
|
|
// protocol_name
|
|
binary.Write(buf, binary.BigEndian, int16(5))
|
|
buf.WriteString("range")
|
|
// protocol_metadata
|
|
binary.Write(buf, binary.BigEndian, int32(0)) // empty
|
|
return buf.Bytes()
|
|
},
|
|
expectErr: false,
|
|
expectReq: &JoinGroupRequest{
|
|
GroupID: "test-group",
|
|
SessionTimeout: 30000,
|
|
RebalanceTimeout: 300000,
|
|
MemberID: "",
|
|
GroupInstanceID: "",
|
|
ProtocolType: "consumer",
|
|
GroupProtocols: []GroupProtocol{{Name: "range", Metadata: []byte{}}},
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
h := NewTestHandler()
|
|
body := tt.buildBody()
|
|
|
|
req, err := h.parseJoinGroupRequest(body, uint16(tt.version))
|
|
if tt.expectErr && err == nil {
|
|
t.Errorf("expected error but got none")
|
|
}
|
|
if !tt.expectErr && err != nil {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
if !tt.expectErr && req != nil {
|
|
if req.GroupID != tt.expectReq.GroupID {
|
|
t.Errorf("GroupID: got %q, want %q", req.GroupID, tt.expectReq.GroupID)
|
|
}
|
|
if req.SessionTimeout != tt.expectReq.SessionTimeout {
|
|
t.Errorf("SessionTimeout: got %d, want %d", req.SessionTimeout, tt.expectReq.SessionTimeout)
|
|
}
|
|
if tt.version >= 1 && req.RebalanceTimeout != tt.expectReq.RebalanceTimeout {
|
|
t.Errorf("RebalanceTimeout: got %d, want %d", req.RebalanceTimeout, tt.expectReq.RebalanceTimeout)
|
|
}
|
|
if req.MemberID != tt.expectReq.MemberID {
|
|
t.Errorf("MemberID: got %q, want %q", req.MemberID, tt.expectReq.MemberID)
|
|
}
|
|
if tt.version >= 5 && req.GroupInstanceID != tt.expectReq.GroupInstanceID {
|
|
t.Errorf("GroupInstanceID: got %q, want %q", req.GroupInstanceID, tt.expectReq.GroupInstanceID)
|
|
}
|
|
if req.ProtocolType != tt.expectReq.ProtocolType {
|
|
t.Errorf("ProtocolType: got %q, want %q", req.ProtocolType, tt.expectReq.ProtocolType)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestVersionMatrix_SyncGroup tests SyncGroup request parsing across versions
|
|
func TestVersionMatrix_SyncGroup(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
version int16
|
|
buildBody func() []byte
|
|
expectErr bool
|
|
expectReq *SyncGroupRequest
|
|
}{
|
|
{
|
|
name: "SyncGroup v0",
|
|
version: 0,
|
|
buildBody: func() []byte {
|
|
buf := &bytes.Buffer{}
|
|
// group_id
|
|
binary.Write(buf, binary.BigEndian, int16(10))
|
|
buf.WriteString("test-group")
|
|
// generation_id
|
|
binary.Write(buf, binary.BigEndian, int32(1))
|
|
// member_id
|
|
binary.Write(buf, binary.BigEndian, int16(6))
|
|
buf.WriteString("member")
|
|
// group_assignment array count
|
|
binary.Write(buf, binary.BigEndian, int32(0)) // empty
|
|
return buf.Bytes()
|
|
},
|
|
expectErr: false,
|
|
expectReq: &SyncGroupRequest{
|
|
GroupID: "test-group",
|
|
GenerationID: 1,
|
|
MemberID: "member",
|
|
GroupAssignments: []GroupAssignment{},
|
|
},
|
|
},
|
|
{
|
|
name: "SyncGroup v3",
|
|
version: 3,
|
|
buildBody: func() []byte {
|
|
buf := &bytes.Buffer{}
|
|
// group_id
|
|
binary.Write(buf, binary.BigEndian, int16(10))
|
|
buf.WriteString("test-group")
|
|
// generation_id
|
|
binary.Write(buf, binary.BigEndian, int32(1))
|
|
// member_id
|
|
binary.Write(buf, binary.BigEndian, int16(6))
|
|
buf.WriteString("member")
|
|
// group_instance_id (v3+, nullable)
|
|
binary.Write(buf, binary.BigEndian, int16(-1)) // null
|
|
// group_assignment array count
|
|
binary.Write(buf, binary.BigEndian, int32(0)) // empty
|
|
return buf.Bytes()
|
|
},
|
|
expectErr: false,
|
|
expectReq: &SyncGroupRequest{
|
|
GroupID: "test-group",
|
|
GenerationID: 1,
|
|
MemberID: "member",
|
|
GroupInstanceID: "",
|
|
GroupAssignments: []GroupAssignment{},
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
h := NewTestHandler()
|
|
body := tt.buildBody()
|
|
|
|
req, err := h.parseSyncGroupRequest(body, uint16(tt.version))
|
|
if tt.expectErr && err == nil {
|
|
t.Errorf("expected error but got none")
|
|
}
|
|
if !tt.expectErr && err != nil {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
if !tt.expectErr && req != nil {
|
|
if req.GroupID != tt.expectReq.GroupID {
|
|
t.Errorf("GroupID: got %q, want %q", req.GroupID, tt.expectReq.GroupID)
|
|
}
|
|
if req.GenerationID != tt.expectReq.GenerationID {
|
|
t.Errorf("GenerationID: got %d, want %d", req.GenerationID, tt.expectReq.GenerationID)
|
|
}
|
|
if req.MemberID != tt.expectReq.MemberID {
|
|
t.Errorf("MemberID: got %q, want %q", req.MemberID, tt.expectReq.MemberID)
|
|
}
|
|
if tt.version >= 3 && req.GroupInstanceID != tt.expectReq.GroupInstanceID {
|
|
t.Errorf("GroupInstanceID: got %q, want %q", req.GroupInstanceID, tt.expectReq.GroupInstanceID)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestVersionMatrix_OffsetFetch tests OffsetFetch request parsing across versions
|
|
func TestVersionMatrix_OffsetFetch(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
version int16
|
|
buildBody func() []byte
|
|
expectErr bool
|
|
expectReq *OffsetFetchRequest
|
|
}{
|
|
{
|
|
name: "OffsetFetch v1",
|
|
version: 1,
|
|
buildBody: func() []byte {
|
|
buf := &bytes.Buffer{}
|
|
// group_id
|
|
binary.Write(buf, binary.BigEndian, int16(10))
|
|
buf.WriteString("test-group")
|
|
// topics array count
|
|
binary.Write(buf, binary.BigEndian, int32(1))
|
|
// topic_name
|
|
binary.Write(buf, binary.BigEndian, int16(10))
|
|
buf.WriteString("test-topic")
|
|
// partitions array count
|
|
binary.Write(buf, binary.BigEndian, int32(1))
|
|
// partition_id
|
|
binary.Write(buf, binary.BigEndian, int32(0))
|
|
return buf.Bytes()
|
|
},
|
|
expectErr: false,
|
|
expectReq: &OffsetFetchRequest{
|
|
GroupID: "test-group",
|
|
Topics: []OffsetFetchTopic{
|
|
{
|
|
Name: "test-topic",
|
|
Partitions: []int32{0},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "OffsetFetch v2",
|
|
version: 2,
|
|
buildBody: func() []byte {
|
|
buf := &bytes.Buffer{}
|
|
// group_id
|
|
binary.Write(buf, binary.BigEndian, int16(10))
|
|
buf.WriteString("test-group")
|
|
// topics array count
|
|
binary.Write(buf, binary.BigEndian, int32(1))
|
|
// topic_name
|
|
binary.Write(buf, binary.BigEndian, int16(10))
|
|
buf.WriteString("test-topic")
|
|
// partitions array count
|
|
binary.Write(buf, binary.BigEndian, int32(1))
|
|
// partition_id
|
|
binary.Write(buf, binary.BigEndian, int32(0))
|
|
return buf.Bytes()
|
|
},
|
|
expectErr: false,
|
|
expectReq: &OffsetFetchRequest{
|
|
GroupID: "test-group",
|
|
Topics: []OffsetFetchTopic{
|
|
{
|
|
Name: "test-topic",
|
|
Partitions: []int32{0},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "OffsetFetch v2 - empty topics (fetch all)",
|
|
version: 2,
|
|
buildBody: func() []byte {
|
|
buf := &bytes.Buffer{}
|
|
// group_id
|
|
binary.Write(buf, binary.BigEndian, int16(10))
|
|
buf.WriteString("test-group")
|
|
// topics array count (0 = fetch all)
|
|
binary.Write(buf, binary.BigEndian, int32(0))
|
|
return buf.Bytes()
|
|
},
|
|
expectErr: false,
|
|
expectReq: &OffsetFetchRequest{
|
|
GroupID: "test-group",
|
|
Topics: []OffsetFetchTopic{},
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
h := NewTestHandler()
|
|
body := tt.buildBody()
|
|
|
|
req, err := h.parseOffsetFetchRequest(body)
|
|
if tt.expectErr && err == nil {
|
|
t.Errorf("expected error but got none")
|
|
}
|
|
if !tt.expectErr && err != nil {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
if !tt.expectErr && req != nil {
|
|
if req.GroupID != tt.expectReq.GroupID {
|
|
t.Errorf("GroupID: got %q, want %q", req.GroupID, tt.expectReq.GroupID)
|
|
}
|
|
if len(req.Topics) != len(tt.expectReq.Topics) {
|
|
t.Errorf("Topics count: got %d, want %d", len(req.Topics), len(tt.expectReq.Topics))
|
|
}
|
|
for i, topic := range req.Topics {
|
|
if i < len(tt.expectReq.Topics) {
|
|
if topic.Name != tt.expectReq.Topics[i].Name {
|
|
t.Errorf("Topic[%d] name: got %q, want %q", i, topic.Name, tt.expectReq.Topics[i].Name)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestVersionMatrix_FindCoordinator checks the FindCoordinator request layout
// for versions 0 to 2 by building each body and decoding it by hand inside the
// test; no handler parser is invoked.
//
// The layout exercised is an int16 length-prefixed coordinator_key followed,
// from version 1 onwards, by a single coordinator_type byte. A version 0 body
// has no type byte, so the decoded type stays at 0 (GROUP). After decoding,
// the whole body must have been consumed.
func TestVersionMatrix_FindCoordinator(t *testing.T) {
	// put appends v in big-endian byte order. The error is discarded because
	// writes to a bytes.Buffer do not fail and only fixed-size integers are
	// passed in.
	put := func(buf *bytes.Buffer, v interface{}) {
		_ = binary.Write(buf, binary.BigEndian, v)
	}
	// putString appends s with an int16 length prefix derived from s itself.
	putString := func(buf *bytes.Buffer, s string) {
		put(buf, int16(len(s)))
		buf.WriteString(s)
	}

	tests := []struct {
		name       string
		version    int16
		buildBody  func() []byte
		expectKey  string
		expectType int8
	}{
		{
			name:    "FindCoordinator v0",
			version: 0,
			buildBody: func() []byte {
				buf := &bytes.Buffer{}
				// coordinator_key
				putString(buf, "test-group")
				return buf.Bytes()
			},
			expectKey:  "test-group",
			expectType: 0, // GROUP (default for v0)
		},
		{
			name:    "FindCoordinator v1",
			version: 1,
			buildBody: func() []byte {
				buf := &bytes.Buffer{}
				// coordinator_key
				putString(buf, "test-group")
				// coordinator_type (v1+): GROUP
				put(buf, int8(0))
				return buf.Bytes()
			},
			expectKey:  "test-group",
			expectType: 0, // GROUP
		},
		{
			name:    "FindCoordinator v2",
			version: 2,
			buildBody: func() []byte {
				buf := &bytes.Buffer{}
				// coordinator_key
				putString(buf, "test-group")
				// coordinator_type (v1+): TRANSACTION
				put(buf, int8(1))
				return buf.Bytes()
			},
			expectKey:  "test-group",
			expectType: 1, // TRANSACTION
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			body := tt.buildBody()

			// Parse the request manually to test the format.
			offset := 0

			// coordinator_key
			if offset+2 > len(body) {
				t.Fatalf("body too short for coordinator_key length")
			}
			keyLen := int(binary.BigEndian.Uint16(body[offset : offset+2]))
			offset += 2

			if offset+keyLen > len(body) {
				t.Fatalf("body too short for coordinator_key")
			}
			key := string(body[offset : offset+keyLen])
			offset += keyLen

			// coordinator_type (v1+); stays at the GROUP default otherwise.
			var coordType int8
			if tt.version >= 1 {
				if offset+1 > len(body) {
					t.Fatalf("body too short for coordinator_type")
				}
				coordType = int8(body[offset])
				offset++
			}

			if key != tt.expectKey {
				t.Errorf("coordinator_key: got %q, want %q", key, tt.expectKey)
			}
			if coordType != tt.expectType {
				t.Errorf("coordinator_type: got %d, want %d", coordType, tt.expectType)
			}
			// Every byte must be accounted for; leftovers mean the body was
			// built for a different layout than the one decoded here.
			if offset != len(body) {
				t.Errorf("trailing bytes: consumed %d of %d", offset, len(body))
			}
		})
	}
}
|
|
|
|
// TestVersionMatrix_ListOffsets checks the leading fields of the ListOffsets
// request layout for versions 0 to 2 by building each body and decoding it by
// hand inside the test; no handler parser is invoked.
//
// Only the header is decoded: replica_id (read when version >= 1, otherwise
// left at -1), the isolation_level byte (skipped when version >= 2) and the
// int32 topics array count. The topic entries that follow are built but not
// inspected.
//
// NOTE(review): the v0 body here carries no replica_id, while the public Kafka
// protocol guide appears to list replica_id for ListOffsets v0 as well —
// confirm this layout matches the parser it is meant to mirror.
func TestVersionMatrix_ListOffsets(t *testing.T) {
	// put appends v in big-endian byte order. The error is discarded because
	// writes to a bytes.Buffer do not fail and only fixed-size integers are
	// passed in.
	put := func(buf *bytes.Buffer, v interface{}) {
		_ = binary.Write(buf, binary.BigEndian, v)
	}
	// putTopic appends one topic entry named "test-topic" holding a single
	// partition 0 with the given timestamp.
	putTopic := func(buf *bytes.Buffer, timestamp int64) {
		const topic = "test-topic"
		// topic_name
		put(buf, int16(len(topic)))
		buf.WriteString(topic)
		// partitions array count
		put(buf, int32(1))
		// partition_id
		put(buf, int32(0))
		// timestamp
		put(buf, timestamp)
	}

	tests := []struct {
		name          string
		version       int16
		buildBody     func() []byte
		expectReplica int32
		expectTopics  int
	}{
		{
			name:    "ListOffsets v0",
			version: 0,
			buildBody: func() []byte {
				buf := &bytes.Buffer{}
				// topics array count
				put(buf, int32(1))
				// -2 = earliest
				putTopic(buf, -2)
				return buf.Bytes()
			},
			expectReplica: -1, // no replica_id in v0
			expectTopics:  1,
		},
		{
			name:    "ListOffsets v1",
			version: 1,
			buildBody: func() []byte {
				buf := &bytes.Buffer{}
				// replica_id (v1+)
				put(buf, int32(-1))
				// topics array count
				put(buf, int32(1))
				// -1 = latest
				putTopic(buf, -1)
				return buf.Bytes()
			},
			expectReplica: -1,
			expectTopics:  1,
		},
		{
			name:    "ListOffsets v2",
			version: 2,
			buildBody: func() []byte {
				buf := &bytes.Buffer{}
				// replica_id (v1+)
				put(buf, int32(-1))
				// isolation_level (v2+): READ_UNCOMMITTED
				put(buf, int8(0))
				// topics array count
				put(buf, int32(1))
				// -1 = latest
				putTopic(buf, -1)
				// leader_epoch (v4+) is deliberately absent from this basic v2 body.
				return buf.Bytes()
			},
			expectReplica: -1,
			expectTopics:  1,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			body := tt.buildBody()

			// Parse the request manually to test the format.
			offset := 0

			// replica_id (v1+)
			var replicaID int32 = -1
			if tt.version >= 1 {
				if offset+4 > len(body) {
					t.Fatalf("body too short for replica_id")
				}
				replicaID = int32(binary.BigEndian.Uint32(body[offset : offset+4]))
				offset += 4
			}

			// isolation_level (v2+): the value is not checked, only skipped.
			if tt.version >= 2 {
				if offset+1 > len(body) {
					t.Fatalf("body too short for isolation_level")
				}
				offset++
			}

			// topics array count
			if offset+4 > len(body) {
				t.Fatalf("body too short for topics count")
			}
			// The count is written as int32, so decode it as signed: a
			// negative count must not turn into a huge positive number.
			topicsCount := int(int32(binary.BigEndian.Uint32(body[offset : offset+4])))

			if replicaID != tt.expectReplica {
				t.Errorf("replica_id: got %d, want %d", replicaID, tt.expectReplica)
			}
			if topicsCount != tt.expectTopics {
				t.Errorf("topics count: got %d, want %d", topicsCount, tt.expectTopics)
			}
		})
	}
}
|