
tests(protocol): add/align spec-based tests; fix parsing to strip client_id at header level by removing client_id assumptions in JoinGroup/SyncGroup/OffsetFetch/FindCoordinator bodies; revert OffsetFetch to classic encodings for v0-v5

pull/7231/head
chrislu 2 months ago
commit 25d642d218
Changed files:
1. weed/mq/kafka/protocol/find_coordinator.go (18)
2. weed/mq/kafka/protocol/joingroup.go (12)
3. weed/mq/kafka/protocol/offset_management.go (33)
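The common thread across all three files: a Kafka client sends client_id once, in the request header, so per-API body parsers must not re-read it. Below is a minimal sketch of header-level stripping for a non-flexible (v0/v1 header) request; the helper name and package are illustrative, not code from this commit.

// Hypothetical sketch: the classic request header is api_key(INT16),
// api_version(INT16), correlation_id(INT32), client_id(NULLABLE_STRING).
// Stripping it once here means the body parsers below never see client_id.
package kafka

import (
    "encoding/binary"
    "fmt"
)

func stripRequestHeader(msg []byte) (apiKey, apiVersion int16, correlationID int32, body []byte, err error) {
    if len(msg) < 10 { // 2+2+4 fixed fields, plus 2 for the client_id length
        return 0, 0, 0, nil, fmt.Errorf("request shorter than header")
    }
    apiKey = int16(binary.BigEndian.Uint16(msg[0:2]))
    apiVersion = int16(binary.BigEndian.Uint16(msg[2:4]))
    correlationID = int32(binary.BigEndian.Uint32(msg[4:8]))
    clientIDLen := int16(binary.BigEndian.Uint16(msg[8:10])) // -1 encodes a null client_id
    offset := 10
    if clientIDLen > 0 {
        if len(msg) < offset+int(clientIDLen) {
            return 0, 0, 0, nil, fmt.Errorf("truncated client_id")
        }
        offset += int(clientIDLen)
    }
    return apiKey, apiVersion, correlationID, msg[offset:], nil
}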

weed/mq/kafka/protocol/find_coordinator.go (18)

@@ -10,8 +10,7 @@ func (h *Handler) handleFindCoordinator(correlationID uint32, requestBody []byte
 }

 func (h *Handler) handleFindCoordinatorV2(correlationID uint32, requestBody []byte) ([]byte, error) {
-    // Parse FindCoordinator request
-    // Request format: client_id + coordinator_key + coordinator_type(1)
+    // Parse FindCoordinator request (v0-2 non-flex): Key (STRING), v1+ adds KeyType (INT8)
     // DEBUG: Hex dump the request to understand format
     dumpLen := len(requestBody)
@@ -20,17 +19,14 @@ func (h *Handler) handleFindCoordinatorV2(correlationID uint32, requestBody []byte
     }
     fmt.Printf("DEBUG: FindCoordinator request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])
-    if len(requestBody) < 2 { // client_id_size(2)
+    if len(requestBody) < 2 { // need at least Key length
         return nil, fmt.Errorf("FindCoordinator request too short")
     }
-    // Skip client_id
-    clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
-    fmt.Printf("DEBUG: FindCoordinator client_id_size: %d\n", clientIDSize)
-    offset := 2 + int(clientIDSize)
+    offset := 0
-    if len(requestBody) < offset+3 { // coordinator_key_size(2) + coordinator_type(1)
-        return nil, fmt.Errorf("FindCoordinator request missing data (need %d bytes, have %d)", offset+3, len(requestBody))
+    if len(requestBody) < offset+2 { // coordinator_key_size(2)
+        return nil, fmt.Errorf("FindCoordinator request missing data (need %d bytes, have %d)", offset+2, len(requestBody))
     }
     // Parse coordinator key (group ID for consumer groups)
@@ -45,13 +41,11 @@ func (h *Handler) handleFindCoordinatorV2(correlationID uint32, requestBody []byte
     coordinatorKey := string(requestBody[offset : offset+int(coordinatorKeySize)])
     offset += int(coordinatorKeySize)
-    // Coordinator type is optional in some versions, default to 0 (group coordinator)
+    // Coordinator type present in v1+ (INT8). If absent, default 0.
     var coordinatorType byte = 0
     if offset < len(requestBody) {
         coordinatorType = requestBody[offset]
     }
-    _ = coordinatorType // 0 = group coordinator, 1 = transaction coordinator
     fmt.Printf("DEBUG: FindCoordinator request for key '%s' (type: %d)\n", coordinatorKey, coordinatorType)
     response := make([]byte, 0, 64)
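For reference, a spec-shaped FindCoordinator v2 body is easy to construct, which is presumably what the spec-based tests feed this parser. A hedged sketch (helper name illustrative; assumes encoding/binary and Go 1.19's binary append helpers):

// Illustrative only: Key (STRING) then KeyType (INT8), with no client_id
// prefix - the non-flexible v0-2 layout parsed above.
func encodeFindCoordinatorV2Body(key string, keyType byte) []byte {
    body := binary.BigEndian.AppendUint16(nil, uint16(len(key)))
    body = append(body, key...)
    return append(body, keyType) // 0 = group coordinator, 1 = transaction coordinator
}

Feeding encodeFindCoordinatorV2Body("my-group", 0) into handleFindCoordinatorV2 should yield coordinatorKey == "my-group" and coordinatorType == 0.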

weed/mq/kafka/protocol/joingroup.go (12)

@@ -269,11 +269,7 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error)
     offset := 0
-    // Skip client_id (part of JoinGroup v5 payload)
-    clientIDLength := int(binary.BigEndian.Uint16(data[offset:]))
-    offset += 2 + clientIDLength
-    fmt.Printf("DEBUG: JoinGroup v5 skipped client_id (%d bytes: '%s'), offset now: %d\n",
-        clientIDLength, string(data[2:2+clientIDLength]), offset)
+    // JoinGroup v5 body starts with GroupID according to Kafka spec
     // GroupID (string)
     if offset+2 > len(data) {
@@ -772,11 +768,7 @@ func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error)
     offset := 0
-    // Skip client_id (part of SyncGroup v3 payload)
-    clientIDLength := int(binary.BigEndian.Uint16(data[offset:]))
-    offset += 2 + clientIDLength
-    fmt.Printf("DEBUG: SyncGroup v3 skipped client_id (%d bytes: '%s'), offset now: %d\n",
-        clientIDLength, string(data[2:2+clientIDLength]), offset)
+    // SyncGroup v3 body starts with GroupID according to Kafka spec
     // GroupID (string)
     groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
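Both hunks make the same correction: JoinGroup v5 and SyncGroup v3 bodies begin directly with GroupID as a classic STRING (INT16 length + bytes). A minimal sketch of that prefix (assumed helper names, not the repository's test code; same encoding/binary import as above):

// appendString writes a classic Kafka STRING: INT16 big-endian length, then bytes.
func appendString(b []byte, s string) []byte {
    b = binary.BigEndian.AppendUint16(b, uint16(len(s)))
    return append(b, s...)
}

// A spec-aligned JoinGroup/SyncGroup body starts with GroupID; a parser that
// first skips a supposed client_id would misread the group name.
func beginGroupRequestBody(groupID string) []byte {
    // session/rebalance timeouts, member_id, and the remaining fields
    // follow this prefix in a complete body.
    return appendString(nil, groupID)
}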

weed/mq/kafka/protocol/offset_management.go (33)

@@ -411,12 +411,6 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, error)
     }
     fmt.Printf("DEBUG: OffsetFetch request hex dump (first %d bytes): %x\n", dumpLen, data[:dumpLen])
-    // Skip client_id (part of OffsetFetch payload)
-    clientIDLength := int(binary.BigEndian.Uint16(data[offset:]))
-    offset += 2 + clientIDLength
-    fmt.Printf("DEBUG: OffsetFetch skipped client_id (%d bytes: '%s'), offset now: %d\n",
-        clientIDLength, string(data[2:2+clientIDLength]), offset)
     // GroupID (string)
     fmt.Printf("DEBUG: OffsetFetch GroupID length bytes at offset %d: %x\n", offset, data[offset:offset+2])
     groupIDLength := int(binary.BigEndian.Uint16(data[offset:]))
@@ -433,26 +427,25 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, error)
         offset -= 1
         fmt.Printf("DEBUG: OffsetFetch corrected offset by -1, now: %d\n", offset)
-    // Parse Topics array - OffsetFetch uses 1-byte count, not 4-byte
-    if len(data) < offset+1 {
+    // Parse Topics array - classic encoding (INT32 count) for v0-v5
+    if len(data) < offset+4 {
         return nil, fmt.Errorf("OffsetFetch request missing topics array")
     }
-    fmt.Printf("DEBUG: OffsetFetch reading TopicsCount from offset %d, byte: %02x (decimal: %d)\n", offset, data[offset], data[offset])
-    fmt.Printf("DEBUG: OffsetFetch next few bytes: %x\n", data[offset:offset+5])
-    topicsCount := uint32(data[offset])
-    offset += 1
+    fmt.Printf("DEBUG: OffsetFetch reading TopicsCount from offset %d, bytes: %x\n", offset, data[offset:offset+4])
+    topicsCount := binary.BigEndian.Uint32(data[offset : offset+4])
+    offset += 4
     fmt.Printf("DEBUG: OffsetFetch - GroupID: %s, TopicsCount: %d\n", groupID, topicsCount)
     topics := make([]OffsetFetchTopic, 0, topicsCount)
     for i := uint32(0); i < topicsCount && offset < len(data); i++ {
-        // Parse topic name - OffsetFetch uses 1-byte length
-        if len(data) < offset+1 {
+        // Parse topic name (STRING: INT16 length + bytes)
+        if len(data) < offset+2 {
             break
         }
-        topicNameLength := uint8(data[offset])
-        offset += 1
+        topicNameLength := binary.BigEndian.Uint16(data[offset : offset+2])
+        offset += 2
         if len(data) < offset+int(topicNameLength) {
             break
@@ -460,12 +453,12 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, error)
         topicName := string(data[offset : offset+int(topicNameLength)])
         offset += int(topicNameLength)
-        // Parse partitions array - OffsetFetch uses 1-byte count
-        if len(data) < offset+1 {
+        // Parse partitions array (ARRAY: INT32 count)
+        if len(data) < offset+4 {
             break
         }
-        partitionsCount := uint32(data[offset])
-        offset += 1
+        partitionsCount := binary.BigEndian.Uint32(data[offset : offset+4])
+        offset += 4
         partitions := make([]int32, 0, partitionsCount)
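The revert restores the classic non-flexible encodings for OffsetFetch v0-v5: INT32 array counts, INT16 string lengths, INT32 partition IDs. A sketch of a matching encoder for building test bodies (names illustrative; same encoding/binary import as above):

// Illustrative encoder for the classic OffsetFetch topics section:
// ARRAY(INT32 count) of { name STRING(INT16 len), partitions ARRAY(INT32) }.
func encodeOffsetFetchTopics(topics map[string][]int32) []byte {
    out := binary.BigEndian.AppendUint32(nil, uint32(len(topics)))
    for name, partitions := range topics {
        out = binary.BigEndian.AppendUint16(out, uint16(len(name)))
        out = append(out, name...)
        out = binary.BigEndian.AppendUint32(out, uint32(len(partitions)))
        for _, p := range partitions {
            out = binary.BigEndian.AppendUint32(out, uint32(p))
        }
    }
    return out
}

Map iteration order is random in Go, so for byte-exact test fixtures iterate a sorted slice of topic names instead.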
