Browse Source

kafka gateway: add comprehensive version matrix tests for JoinGroup v0/v5, SyncGroup v0/v3, OffsetFetch v1/v2, FindCoordinator v0/v1/v2, ListOffsets v0/v1/v2; make parsers version-aware for RebalanceTimeout (v1+) and GroupInstanceID (v5+ for JoinGroup, v3+ for SyncGroup); ensure format correctness across API versions

pull/7231/head
chrislu 2 months ago
parent
commit
ceab8a8222
  1. 66
      weed/mq/kafka/protocol/joingroup.go
  2. 566
      weed/mq/kafka/protocol/version_matrix_test.go

66
weed/mq/kafka/protocol/joingroup.go

@ -67,7 +67,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
fmt.Printf("DEBUG: JoinGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])
// Parse JoinGroup request
request, err := h.parseJoinGroupRequest(requestBody)
request, err := h.parseJoinGroupRequest(requestBody, apiVersion)
if err != nil {
fmt.Printf("DEBUG: JoinGroup parseJoinGroupRequest error: %v\n", err)
return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
@ -262,7 +262,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
return h.buildJoinGroupResponse(response), nil
}
func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) {
func (h *Handler) parseJoinGroupRequest(data []byte, apiVersion uint16) (*JoinGroupRequest, error) {
if len(data) < 8 {
return nil, fmt.Errorf("request too short")
}
@ -282,7 +282,6 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error)
}
groupID := string(data[offset : offset+groupIDLength])
offset += groupIDLength
fmt.Printf("DEBUG: JoinGroup parsed GroupID: '%s', offset now: %d\n", groupID, offset)
// Session timeout (4 bytes)
if offset+4 > len(data) {
@ -291,9 +290,9 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error)
sessionTimeout := int32(binary.BigEndian.Uint32(data[offset:]))
offset += 4
// Rebalance timeout (4 bytes) - for newer versions
rebalanceTimeout := sessionTimeout // Default to session timeout
if offset+4 <= len(data) {
// Rebalance timeout (4 bytes) - for v1+ versions
rebalanceTimeout := sessionTimeout // Default to session timeout for v0
if apiVersion >= 1 && offset+4 <= len(data) {
rebalanceTimeout = int32(binary.BigEndian.Uint32(data[offset:]))
offset += 4
}
@ -315,25 +314,24 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error)
// Parse Group Instance ID (nullable string) - for JoinGroup v5+
var groupInstanceID string
if offset+2 > len(data) {
return nil, fmt.Errorf("missing group instance ID length")
}
instanceIDLength := int16(binary.BigEndian.Uint16(data[offset:]))
offset += 2
if apiVersion >= 5 {
if offset+2 > len(data) {
return nil, fmt.Errorf("missing group instance ID length")
}
instanceIDLength := int16(binary.BigEndian.Uint16(data[offset:]))
offset += 2
if instanceIDLength == -1 {
groupInstanceID = "" // null string
} else if instanceIDLength >= 0 {
if offset+int(instanceIDLength) > len(data) {
return nil, fmt.Errorf("invalid group instance ID length")
if instanceIDLength == -1 {
groupInstanceID = "" // null string
} else if instanceIDLength >= 0 {
if offset+int(instanceIDLength) > len(data) {
return nil, fmt.Errorf("invalid group instance ID length")
}
groupInstanceID = string(data[offset : offset+int(instanceIDLength)])
offset += int(instanceIDLength)
}
groupInstanceID = string(data[offset : offset+int(instanceIDLength)])
offset += int(instanceIDLength)
}
fmt.Printf("DEBUG: JoinGroup v5 - MemberID: '%s', GroupInstanceID: '%s' (len=%d), offset now: %d\n",
memberID, groupInstanceID, instanceIDLength, offset)
// Parse Protocol Type
if len(data) < offset+2 {
return nil, fmt.Errorf("JoinGroup request missing protocol type")
@ -682,7 +680,7 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
fmt.Printf("DEBUG: SyncGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])
// Parse SyncGroup request
request, err := h.parseSyncGroupRequest(requestBody)
request, err := h.parseSyncGroupRequest(requestBody, apiVersion)
if err != nil {
fmt.Printf("DEBUG: SyncGroup parseSyncGroupRequest error: %v\n", err)
return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
@ -761,7 +759,7 @@ func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, reque
return h.buildSyncGroupResponse(response), nil
}
func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error) {
func (h *Handler) parseSyncGroupRequest(data []byte, apiVersion uint16) (*SyncGroupRequest, error) {
if len(data) < 8 {
return nil, fmt.Errorf("request too short")
}
@ -798,6 +796,26 @@ func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error)
memberID := string(data[offset : offset+memberIDLength])
offset += memberIDLength
// GroupInstanceID (nullable string) - for SyncGroup v3+
var groupInstanceID string
if apiVersion >= 3 {
if offset+2 > len(data) {
return nil, fmt.Errorf("missing group instance ID length")
}
instanceIDLength := int16(binary.BigEndian.Uint16(data[offset:]))
offset += 2
if instanceIDLength == -1 {
groupInstanceID = "" // null string
} else if instanceIDLength >= 0 {
if offset+int(instanceIDLength) > len(data) {
return nil, fmt.Errorf("invalid group instance ID length")
}
groupInstanceID = string(data[offset : offset+int(instanceIDLength)])
offset += int(instanceIDLength)
}
}
// For simplicity, we'll parse basic fields
// In a full implementation, we'd parse the full group assignments array
@ -805,7 +823,7 @@ func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error)
GroupID: groupID,
GenerationID: generationID,
MemberID: memberID,
GroupInstanceID: "",
GroupInstanceID: groupInstanceID,
GroupAssignments: []GroupAssignment{},
}, nil
}

566
weed/mq/kafka/protocol/version_matrix_test.go

@ -0,0 +1,566 @@
package protocol
import (
"bytes"
"encoding/binary"
"testing"
)
// TestVersionMatrix_JoinGroup tests JoinGroup request parsing across versions
func TestVersionMatrix_JoinGroup(t *testing.T) {
tests := []struct {
name string
version int16
buildBody func() []byte
expectErr bool
expectReq *JoinGroupRequest
}{
{
name: "JoinGroup v0",
version: 0,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// group_id
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// session_timeout_ms
binary.Write(buf, binary.BigEndian, int32(30000))
// member_id
binary.Write(buf, binary.BigEndian, int16(0)) // empty
// protocol_type
binary.Write(buf, binary.BigEndian, int16(8))
buf.WriteString("consumer")
// group_protocols array count
binary.Write(buf, binary.BigEndian, int32(1))
// protocol_name
binary.Write(buf, binary.BigEndian, int16(5))
buf.WriteString("range")
// protocol_metadata
binary.Write(buf, binary.BigEndian, int32(0)) // empty
return buf.Bytes()
},
expectErr: false,
expectReq: &JoinGroupRequest{
GroupID: "test-group",
SessionTimeout: 30000,
MemberID: "",
ProtocolType: "consumer",
GroupProtocols: []GroupProtocol{{Name: "range", Metadata: []byte{}}},
},
},
{
name: "JoinGroup v5",
version: 5,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// group_id
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// session_timeout_ms
binary.Write(buf, binary.BigEndian, int32(30000))
// rebalance_timeout_ms (v1+)
binary.Write(buf, binary.BigEndian, int32(300000))
// member_id
binary.Write(buf, binary.BigEndian, int16(0)) // empty
// group_instance_id (v5+, nullable)
binary.Write(buf, binary.BigEndian, int16(-1)) // null
// protocol_type
binary.Write(buf, binary.BigEndian, int16(8))
buf.WriteString("consumer")
// group_protocols array count
binary.Write(buf, binary.BigEndian, int32(1))
// protocol_name
binary.Write(buf, binary.BigEndian, int16(5))
buf.WriteString("range")
// protocol_metadata
binary.Write(buf, binary.BigEndian, int32(0)) // empty
return buf.Bytes()
},
expectErr: false,
expectReq: &JoinGroupRequest{
GroupID: "test-group",
SessionTimeout: 30000,
RebalanceTimeout: 300000,
MemberID: "",
GroupInstanceID: "",
ProtocolType: "consumer",
GroupProtocols: []GroupProtocol{{Name: "range", Metadata: []byte{}}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := NewHandler()
body := tt.buildBody()
req, err := h.parseJoinGroupRequest(body, uint16(tt.version))
if tt.expectErr && err == nil {
t.Errorf("expected error but got none")
}
if !tt.expectErr && err != nil {
t.Errorf("unexpected error: %v", err)
}
if !tt.expectErr && req != nil {
if req.GroupID != tt.expectReq.GroupID {
t.Errorf("GroupID: got %q, want %q", req.GroupID, tt.expectReq.GroupID)
}
if req.SessionTimeout != tt.expectReq.SessionTimeout {
t.Errorf("SessionTimeout: got %d, want %d", req.SessionTimeout, tt.expectReq.SessionTimeout)
}
if tt.version >= 1 && req.RebalanceTimeout != tt.expectReq.RebalanceTimeout {
t.Errorf("RebalanceTimeout: got %d, want %d", req.RebalanceTimeout, tt.expectReq.RebalanceTimeout)
}
if req.MemberID != tt.expectReq.MemberID {
t.Errorf("MemberID: got %q, want %q", req.MemberID, tt.expectReq.MemberID)
}
if tt.version >= 5 && req.GroupInstanceID != tt.expectReq.GroupInstanceID {
t.Errorf("GroupInstanceID: got %q, want %q", req.GroupInstanceID, tt.expectReq.GroupInstanceID)
}
if req.ProtocolType != tt.expectReq.ProtocolType {
t.Errorf("ProtocolType: got %q, want %q", req.ProtocolType, tt.expectReq.ProtocolType)
}
}
})
}
}
// TestVersionMatrix_SyncGroup tests SyncGroup request parsing across versions
func TestVersionMatrix_SyncGroup(t *testing.T) {
tests := []struct {
name string
version int16
buildBody func() []byte
expectErr bool
expectReq *SyncGroupRequest
}{
{
name: "SyncGroup v0",
version: 0,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// group_id
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// generation_id
binary.Write(buf, binary.BigEndian, int32(1))
// member_id
binary.Write(buf, binary.BigEndian, int16(6))
buf.WriteString("member")
// group_assignment array count
binary.Write(buf, binary.BigEndian, int32(0)) // empty
return buf.Bytes()
},
expectErr: false,
expectReq: &SyncGroupRequest{
GroupID: "test-group",
GenerationID: 1,
MemberID: "member",
GroupAssignments: []GroupAssignment{},
},
},
{
name: "SyncGroup v3",
version: 3,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// group_id
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// generation_id
binary.Write(buf, binary.BigEndian, int32(1))
// member_id
binary.Write(buf, binary.BigEndian, int16(6))
buf.WriteString("member")
// group_instance_id (v3+, nullable)
binary.Write(buf, binary.BigEndian, int16(-1)) // null
// group_assignment array count
binary.Write(buf, binary.BigEndian, int32(0)) // empty
return buf.Bytes()
},
expectErr: false,
expectReq: &SyncGroupRequest{
GroupID: "test-group",
GenerationID: 1,
MemberID: "member",
GroupInstanceID: "",
GroupAssignments: []GroupAssignment{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := NewHandler()
body := tt.buildBody()
req, err := h.parseSyncGroupRequest(body, uint16(tt.version))
if tt.expectErr && err == nil {
t.Errorf("expected error but got none")
}
if !tt.expectErr && err != nil {
t.Errorf("unexpected error: %v", err)
}
if !tt.expectErr && req != nil {
if req.GroupID != tt.expectReq.GroupID {
t.Errorf("GroupID: got %q, want %q", req.GroupID, tt.expectReq.GroupID)
}
if req.GenerationID != tt.expectReq.GenerationID {
t.Errorf("GenerationID: got %d, want %d", req.GenerationID, tt.expectReq.GenerationID)
}
if req.MemberID != tt.expectReq.MemberID {
t.Errorf("MemberID: got %q, want %q", req.MemberID, tt.expectReq.MemberID)
}
if tt.version >= 3 && req.GroupInstanceID != tt.expectReq.GroupInstanceID {
t.Errorf("GroupInstanceID: got %q, want %q", req.GroupInstanceID, tt.expectReq.GroupInstanceID)
}
}
})
}
}
// TestVersionMatrix_OffsetFetch tests OffsetFetch request parsing across versions
func TestVersionMatrix_OffsetFetch(t *testing.T) {
tests := []struct {
name string
version int16
buildBody func() []byte
expectErr bool
expectReq *OffsetFetchRequest
}{
{
name: "OffsetFetch v1",
version: 1,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// group_id
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// topics array count
binary.Write(buf, binary.BigEndian, int32(1))
// topic_name
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-topic")
// partitions array count
binary.Write(buf, binary.BigEndian, int32(1))
// partition_id
binary.Write(buf, binary.BigEndian, int32(0))
return buf.Bytes()
},
expectErr: false,
expectReq: &OffsetFetchRequest{
GroupID: "test-group",
Topics: []OffsetFetchTopic{
{
Name: "test-topic",
Partitions: []int32{0},
},
},
},
},
{
name: "OffsetFetch v2",
version: 2,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// group_id
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// topics array count
binary.Write(buf, binary.BigEndian, int32(1))
// topic_name
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-topic")
// partitions array count
binary.Write(buf, binary.BigEndian, int32(1))
// partition_id
binary.Write(buf, binary.BigEndian, int32(0))
return buf.Bytes()
},
expectErr: false,
expectReq: &OffsetFetchRequest{
GroupID: "test-group",
Topics: []OffsetFetchTopic{
{
Name: "test-topic",
Partitions: []int32{0},
},
},
},
},
{
name: "OffsetFetch v2 - empty topics (fetch all)",
version: 2,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// group_id
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// topics array count (0 = fetch all)
binary.Write(buf, binary.BigEndian, int32(0))
return buf.Bytes()
},
expectErr: false,
expectReq: &OffsetFetchRequest{
GroupID: "test-group",
Topics: []OffsetFetchTopic{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := NewHandler()
body := tt.buildBody()
req, err := h.parseOffsetFetchRequest(body)
if tt.expectErr && err == nil {
t.Errorf("expected error but got none")
}
if !tt.expectErr && err != nil {
t.Errorf("unexpected error: %v", err)
}
if !tt.expectErr && req != nil {
if req.GroupID != tt.expectReq.GroupID {
t.Errorf("GroupID: got %q, want %q", req.GroupID, tt.expectReq.GroupID)
}
if len(req.Topics) != len(tt.expectReq.Topics) {
t.Errorf("Topics count: got %d, want %d", len(req.Topics), len(tt.expectReq.Topics))
}
for i, topic := range req.Topics {
if i < len(tt.expectReq.Topics) {
if topic.Name != tt.expectReq.Topics[i].Name {
t.Errorf("Topic[%d] name: got %q, want %q", i, topic.Name, tt.expectReq.Topics[i].Name)
}
}
}
}
})
}
}
// TestVersionMatrix_FindCoordinator tests FindCoordinator request parsing across versions
func TestVersionMatrix_FindCoordinator(t *testing.T) {
tests := []struct {
name string
version int16
buildBody func() []byte
expectErr bool
expectKey string
expectType int8
}{
{
name: "FindCoordinator v0",
version: 0,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// coordinator_key
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
return buf.Bytes()
},
expectErr: false,
expectKey: "test-group",
expectType: 0, // GROUP (default for v0)
},
{
name: "FindCoordinator v1",
version: 1,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// coordinator_key
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// coordinator_type (v1+)
binary.Write(buf, binary.BigEndian, int8(0)) // GROUP
return buf.Bytes()
},
expectErr: false,
expectKey: "test-group",
expectType: 0, // GROUP
},
{
name: "FindCoordinator v2",
version: 2,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// coordinator_key
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-group")
// coordinator_type (v1+)
binary.Write(buf, binary.BigEndian, int8(1)) // TRANSACTION
return buf.Bytes()
},
expectErr: false,
expectKey: "test-group",
expectType: 1, // TRANSACTION
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
body := tt.buildBody()
// Parse the request manually to test the format
offset := 0
// coordinator_key
if offset+2 > len(body) {
t.Fatalf("body too short for coordinator_key length")
}
keyLen := int(binary.BigEndian.Uint16(body[offset:offset+2]))
offset += 2
if offset+keyLen > len(body) {
t.Fatalf("body too short for coordinator_key")
}
key := string(body[offset:offset+keyLen])
offset += keyLen
// coordinator_type (v1+)
var coordType int8 = 0 // default GROUP
if tt.version >= 1 {
if offset+1 > len(body) {
t.Fatalf("body too short for coordinator_type")
}
coordType = int8(body[offset])
offset++
}
if key != tt.expectKey {
t.Errorf("coordinator_key: got %q, want %q", key, tt.expectKey)
}
if coordType != tt.expectType {
t.Errorf("coordinator_type: got %d, want %d", coordType, tt.expectType)
}
})
}
}
// TestVersionMatrix_ListOffsets tests ListOffsets request parsing across versions
func TestVersionMatrix_ListOffsets(t *testing.T) {
tests := []struct {
name string
version int16
buildBody func() []byte
expectErr bool
expectReplica int32
expectTopics int
}{
{
name: "ListOffsets v0",
version: 0,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// topics array count
binary.Write(buf, binary.BigEndian, int32(1))
// topic_name
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-topic")
// partitions array count
binary.Write(buf, binary.BigEndian, int32(1))
// partition_id
binary.Write(buf, binary.BigEndian, int32(0))
// timestamp
binary.Write(buf, binary.BigEndian, int64(-2)) // earliest
return buf.Bytes()
},
expectErr: false,
expectReplica: -1, // no replica_id in v0
expectTopics: 1,
},
{
name: "ListOffsets v1",
version: 1,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// replica_id (v1+)
binary.Write(buf, binary.BigEndian, int32(-1))
// topics array count
binary.Write(buf, binary.BigEndian, int32(1))
// topic_name
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-topic")
// partitions array count
binary.Write(buf, binary.BigEndian, int32(1))
// partition_id
binary.Write(buf, binary.BigEndian, int32(0))
// timestamp
binary.Write(buf, binary.BigEndian, int64(-1)) // latest
return buf.Bytes()
},
expectErr: false,
expectReplica: -1,
expectTopics: 1,
},
{
name: "ListOffsets v2",
version: 2,
buildBody: func() []byte {
buf := &bytes.Buffer{}
// replica_id (v1+)
binary.Write(buf, binary.BigEndian, int32(-1))
// isolation_level (v2+)
binary.Write(buf, binary.BigEndian, int8(0)) // READ_UNCOMMITTED
// topics array count
binary.Write(buf, binary.BigEndian, int32(1))
// topic_name
binary.Write(buf, binary.BigEndian, int16(10))
buf.WriteString("test-topic")
// partitions array count
binary.Write(buf, binary.BigEndian, int32(1))
// partition_id
binary.Write(buf, binary.BigEndian, int32(0))
// timestamp
binary.Write(buf, binary.BigEndian, int64(-1)) // latest
// leader_epoch (v4+, but we'll test basic v2)
return buf.Bytes()
},
expectErr: false,
expectReplica: -1,
expectTopics: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
body := tt.buildBody()
// Parse the request manually to test the format
offset := 0
// replica_id (v1+)
var replicaID int32 = -1
if tt.version >= 1 {
if offset+4 > len(body) {
t.Fatalf("body too short for replica_id")
}
replicaID = int32(binary.BigEndian.Uint32(body[offset:offset+4]))
offset += 4
}
// isolation_level (v2+)
if tt.version >= 2 {
if offset+1 > len(body) {
t.Fatalf("body too short for isolation_level")
}
// isolationLevel := int8(body[offset])
offset += 1
}
// topics array count
if offset+4 > len(body) {
t.Fatalf("body too short for topics count")
}
topicsCount := int(binary.BigEndian.Uint32(body[offset:offset+4]))
offset += 4
if replicaID != tt.expectReplica {
t.Errorf("replica_id: got %d, want %d", replicaID, tt.expectReplica)
}
if topicsCount != tt.expectTopics {
t.Errorf("topics count: got %d, want %d", topicsCount, tt.expectTopics)
}
})
}
}
Loading…
Cancel
Save