From 25d642d21840d75cd47e1a27b5ca5535c1bfb029 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Sep 2025 16:05:56 -0700 Subject: [PATCH] tests(protocol): add/align spec-based tests; fix parsing to strip client_id at header level by removing client_id assumptions in JoinGroup/SyncGroup/OffsetFetch/FindCoordinator bodies; revert OffsetFetch to classic encodings for v0-v5 --- weed/mq/kafka/protocol/find_coordinator.go | 18 ++++------- weed/mq/kafka/protocol/joingroup.go | 12 ++------ weed/mq/kafka/protocol/offset_management.go | 33 ++++++++------------- 3 files changed, 21 insertions(+), 42 deletions(-) diff --git a/weed/mq/kafka/protocol/find_coordinator.go b/weed/mq/kafka/protocol/find_coordinator.go index dd5df5611..187e0cf3a 100644 --- a/weed/mq/kafka/protocol/find_coordinator.go +++ b/weed/mq/kafka/protocol/find_coordinator.go @@ -10,8 +10,7 @@ func (h *Handler) handleFindCoordinator(correlationID uint32, requestBody []byte } func (h *Handler) handleFindCoordinatorV2(correlationID uint32, requestBody []byte) ([]byte, error) { - // Parse FindCoordinator request - // Request format: client_id + coordinator_key + coordinator_type(1) + // Parse FindCoordinator request (v0-2 non-flex): Key (STRING), v1+ adds KeyType (INT8) // DEBUG: Hex dump the request to understand format dumpLen := len(requestBody) @@ -20,17 +19,14 @@ func (h *Handler) handleFindCoordinatorV2(correlationID uint32, requestBody []by } fmt.Printf("DEBUG: FindCoordinator request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen]) - if len(requestBody) < 2 { // client_id_size(2) + if len(requestBody) < 2 { // need at least Key length return nil, fmt.Errorf("FindCoordinator request too short") } - // Skip client_id - clientIDSize := binary.BigEndian.Uint16(requestBody[0:2]) - fmt.Printf("DEBUG: FindCoordinator client_id_size: %d\n", clientIDSize) - offset := 2 + int(clientIDSize) + offset := 0 - if len(requestBody) < offset+3 { // coordinator_key_size(2) + coordinator_type(1) - return nil, fmt.Errorf("FindCoordinator request missing data (need %d bytes, have %d)", offset+3, len(requestBody)) + if len(requestBody) < offset+2 { // coordinator_key_size(2) + return nil, fmt.Errorf("FindCoordinator request missing data (need %d bytes, have %d)", offset+2, len(requestBody)) } // Parse coordinator key (group ID for consumer groups) @@ -45,13 +41,11 @@ func (h *Handler) handleFindCoordinatorV2(correlationID uint32, requestBody []by coordinatorKey := string(requestBody[offset : offset+int(coordinatorKeySize)]) offset += int(coordinatorKeySize) - // Coordinator type is optional in some versions, default to 0 (group coordinator) + // Coordinator type present in v1+ (INT8). If absent, default 0. var coordinatorType byte = 0 if offset < len(requestBody) { coordinatorType = requestBody[offset] } - _ = coordinatorType // 0 = group coordinator, 1 = transaction coordinator - fmt.Printf("DEBUG: FindCoordinator request for key '%s' (type: %d)\n", coordinatorKey, coordinatorType) response := make([]byte, 0, 64) diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 09b6882df..680617f4b 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -269,11 +269,7 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) offset := 0 - // Skip client_id (part of JoinGroup v5 payload) - clientIDLength := int(binary.BigEndian.Uint16(data[offset:])) - offset += 2 + clientIDLength - fmt.Printf("DEBUG: JoinGroup v5 skipped client_id (%d bytes: '%s'), offset now: %d\n", - clientIDLength, string(data[2:2+clientIDLength]), offset) + // JoinGroup v5 body starts with GroupID according to Kafka spec // GroupID (string) if offset+2 > len(data) { @@ -772,11 +768,7 @@ func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error) offset := 0 - // Skip client_id (part of SyncGroup v3 payload) - clientIDLength := int(binary.BigEndian.Uint16(data[offset:])) - offset += 2 + clientIDLength - fmt.Printf("DEBUG: SyncGroup v3 skipped client_id (%d bytes: '%s'), offset now: %d\n", - clientIDLength, string(data[2:2+clientIDLength]), offset) + // SyncGroup v3 body starts with GroupID according to Kafka spec // GroupID (string) groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index e9fe1a847..64d719eb1 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -411,12 +411,6 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, err } fmt.Printf("DEBUG: OffsetFetch request hex dump (first %d bytes): %x\n", dumpLen, data[:dumpLen]) - // Skip client_id (part of OffsetFetch payload) - clientIDLength := int(binary.BigEndian.Uint16(data[offset:])) - offset += 2 + clientIDLength - fmt.Printf("DEBUG: OffsetFetch skipped client_id (%d bytes: '%s'), offset now: %d\n", - clientIDLength, string(data[2:2+clientIDLength]), offset) - // GroupID (string) fmt.Printf("DEBUG: OffsetFetch GroupID length bytes at offset %d: %x\n", offset, data[offset:offset+2]) groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) @@ -433,26 +427,25 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, err offset -= 1 fmt.Printf("DEBUG: OffsetFetch corrected offset by -1, now: %d\n", offset) - // Parse Topics array - OffsetFetch uses 1-byte count, not 4-byte - if len(data) < offset+1 { + // Parse Topics array - classic encoding (INT32 count) for v0-v5 + if len(data) < offset+4 { return nil, fmt.Errorf("OffsetFetch request missing topics array") } - fmt.Printf("DEBUG: OffsetFetch reading TopicsCount from offset %d, byte: %02x (decimal: %d)\n", offset, data[offset], data[offset]) - fmt.Printf("DEBUG: OffsetFetch next few bytes: %x\n", data[offset:offset+5]) - topicsCount := uint32(data[offset]) - offset += 1 + fmt.Printf("DEBUG: OffsetFetch reading TopicsCount from offset %d, bytes: %x\n", offset, data[offset:offset+4]) + topicsCount := binary.BigEndian.Uint32(data[offset : offset+4]) + offset += 4 fmt.Printf("DEBUG: OffsetFetch - GroupID: %s, TopicsCount: %d\n", groupID, topicsCount) topics := make([]OffsetFetchTopic, 0, topicsCount) for i := uint32(0); i < topicsCount && offset < len(data); i++ { - // Parse topic name - OffsetFetch uses 1-byte length - if len(data) < offset+1 { + // Parse topic name (STRING: INT16 length + bytes) + if len(data) < offset+2 { break } - topicNameLength := uint8(data[offset]) - offset += 1 + topicNameLength := binary.BigEndian.Uint16(data[offset : offset+2]) + offset += 2 if len(data) < offset+int(topicNameLength) { break @@ -460,12 +453,12 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, err topicName := string(data[offset : offset+int(topicNameLength)]) offset += int(topicNameLength) - // Parse partitions array - OffsetFetch uses 1-byte count - if len(data) < offset+1 { + // Parse partitions array (ARRAY: INT32 count) + if len(data) < offset+4 { break } - partitionsCount := uint32(data[offset]) - offset += 1 + partitionsCount := binary.BigEndian.Uint32(data[offset : offset+4]) + offset += 4 partitions := make([]int32, 0, partitionsCount)