diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 61e618fbc..09b6882df 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -775,7 +775,7 @@ func (h *Handler) parseSyncGroupRequest(data []byte) (*SyncGroupRequest, error) // Skip client_id (part of SyncGroup v3 payload) clientIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 + clientIDLength - fmt.Printf("DEBUG: SyncGroup v3 skipped client_id (%d bytes: '%s'), offset now: %d\n", + fmt.Printf("DEBUG: SyncGroup v3 skipped client_id (%d bytes: '%s'), offset now: %d\n", clientIDLength, string(data[2:2+clientIDLength]), offset) // GroupID (string) diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go index 1ef12d159..a5e853387 100644 --- a/weed/mq/kafka/protocol/offset_management.go +++ b/weed/mq/kafka/protocol/offset_management.go @@ -404,6 +404,13 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, err offset := 0 + // DEBUG: Hex dump the entire request + dumpLen := len(data) + if dumpLen > 100 { + dumpLen = 100 + } + fmt.Printf("DEBUG: OffsetFetch request hex dump (first %d bytes): %x\n", dumpLen, data[:dumpLen]) + // Skip client_id (part of OffsetFetch payload) clientIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 + clientIDLength @@ -411,6 +418,7 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, err clientIDLength, string(data[2:2+clientIDLength]), offset) // GroupID (string) + fmt.Printf("DEBUG: OffsetFetch GroupID length bytes at offset %d: %x\n", offset, data[offset:offset+2]) groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 if offset+groupIDLength > len(data) { @@ -418,25 +426,33 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, err } groupID := string(data[offset : offset+groupIDLength]) offset += groupIDLength - - // Parse Topics array - if len(data) < offset+4 { + fmt.Printf("DEBUG: OffsetFetch parsed GroupID: '%s' (len=%d), offset now: %d\n", groupID, groupIDLength, offset) + + // Fix: There's a 1-byte off-by-one error in the offset calculation + // This suggests there's an extra byte in the format we're not accounting for + offset -= 1 + fmt.Printf("DEBUG: OffsetFetch corrected offset by -1, now: %d\n", offset) + + // Parse Topics array - OffsetFetch uses 1-byte count, not 4-byte + if len(data) < offset+1 { return nil, fmt.Errorf("OffsetFetch request missing topics array") } - topicsCount := binary.BigEndian.Uint32(data[offset : offset+4]) - offset += 4 + fmt.Printf("DEBUG: OffsetFetch reading TopicsCount from offset %d, byte: %02x (decimal: %d)\n", offset, data[offset], data[offset]) + fmt.Printf("DEBUG: OffsetFetch next few bytes: %x\n", data[offset:offset+5]) + topicsCount := uint32(data[offset]) + offset += 1 fmt.Printf("DEBUG: OffsetFetch - GroupID: %s, TopicsCount: %d\n", groupID, topicsCount) topics := make([]OffsetFetchTopic, 0, topicsCount) for i := uint32(0); i < topicsCount && offset < len(data); i++ { - // Parse topic name - if len(data) < offset+2 { + // Parse topic name - OffsetFetch uses 1-byte length + if len(data) < offset+1 { break } - topicNameLength := binary.BigEndian.Uint16(data[offset : offset+2]) - offset += 2 + topicNameLength := uint8(data[offset]) + offset += 1 if len(data) < offset+int(topicNameLength) { break @@ -444,12 +460,12 @@ func (h *Handler) parseOffsetFetchRequest(data []byte) (*OffsetFetchRequest, err topicName := string(data[offset : offset+int(topicNameLength)]) offset += int(topicNameLength) - // Parse partitions array - if len(data) < offset+4 { + // Parse partitions array - OffsetFetch uses 1-byte count + if len(data) < offset+1 { break } - partitionsCount := binary.BigEndian.Uint32(data[offset : offset+4]) - offset += 4 + partitionsCount := uint32(data[offset]) + offset += 1 partitions := make([]int32, 0, partitionsCount)