diff --git a/weed/mq/kafka/protocol/find_coordinator.go b/weed/mq/kafka/protocol/find_coordinator.go index 7eb172c4c..a5123666a 100644 --- a/weed/mq/kafka/protocol/find_coordinator.go +++ b/weed/mq/kafka/protocol/find_coordinator.go @@ -8,80 +8,80 @@ import ( func (h *Handler) handleFindCoordinator(correlationID uint32, requestBody []byte) ([]byte, error) { // Parse FindCoordinator request // Request format: client_id + coordinator_key + coordinator_type(1) - + // DEBUG: Hex dump the request to understand format dumpLen := len(requestBody) if dumpLen > 50 { dumpLen = 50 } fmt.Printf("DEBUG: FindCoordinator request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen]) - + if len(requestBody) < 2 { // client_id_size(2) return nil, fmt.Errorf("FindCoordinator request too short") } - + // Skip client_id clientIDSize := binary.BigEndian.Uint16(requestBody[0:2]) fmt.Printf("DEBUG: FindCoordinator client_id_size: %d\n", clientIDSize) offset := 2 + int(clientIDSize) - + if len(requestBody) < offset+3 { // coordinator_key_size(2) + coordinator_type(1) return nil, fmt.Errorf("FindCoordinator request missing data (need %d bytes, have %d)", offset+3, len(requestBody)) } - + // Parse coordinator key (group ID for consumer groups) coordinatorKeySize := binary.BigEndian.Uint16(requestBody[offset : offset+2]) fmt.Printf("DEBUG: FindCoordinator coordinator_key_size: %d, offset: %d\n", coordinatorKeySize, offset) offset += 2 - + if len(requestBody) < offset+int(coordinatorKeySize) { return nil, fmt.Errorf("FindCoordinator request missing coordinator key (need %d bytes, have %d)", offset+int(coordinatorKeySize), len(requestBody)) } - + coordinatorKey := string(requestBody[offset : offset+int(coordinatorKeySize)]) offset += int(coordinatorKeySize) - + // Coordinator type is optional in some versions, default to 0 (group coordinator) var coordinatorType byte = 0 if offset < len(requestBody) { coordinatorType = requestBody[offset] } _ = coordinatorType // 0 = group coordinator, 1 = transaction coordinator - + fmt.Printf("DEBUG: FindCoordinator request for key '%s' (type: %d)\n", coordinatorKey, coordinatorType) - + response := make([]byte, 0, 64) - + // Correlation ID correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, correlationID) response = append(response, correlationIDBytes...) - + // FindCoordinator v0 Response Format (no throttle_time or error_message): // - error_code (INT16) - // - node_id (INT32) + // - node_id (INT32) // - host (STRING) // - port (INT32) - + // Error code (2 bytes, 0 = no error) response = append(response, 0, 0) - + // Coordinator node_id (4 bytes) - use broker 0 (this gateway) response = append(response, 0, 0, 0, 0) - + // Coordinator host (string) host := h.brokerHost hostLen := uint16(len(host)) response = append(response, byte(hostLen>>8), byte(hostLen)) response = append(response, []byte(host)...) - + // Coordinator port (4 bytes) portBytes := make([]byte, 4) binary.BigEndian.PutUint32(portBytes, uint32(h.brokerPort)) response = append(response, portBytes...) - + fmt.Printf("DEBUG: FindCoordinator response: coordinator at %s:%d\n", host, h.brokerPort) fmt.Printf("DEBUG: FindCoordinator response hex dump (%d bytes): %x\n", len(response), response) - + return response, nil } diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 27a2e4e39..24ed3db64 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -226,7 +226,13 @@ func (h *Handler) HandleConn(conn net.Conn) error { case 1: // Fetch response, err = h.handleFetch(correlationID, messageBuf[8:]) // skip header case 11: // JoinGroup - response, err = h.handleJoinGroup(correlationID, messageBuf[8:]) // skip header + fmt.Printf("DEBUG: *** JOINGROUP REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion) + response, err = h.handleJoinGroup(correlationID, apiVersion, messageBuf[8:]) // skip header + if err != nil { + fmt.Printf("DEBUG: JoinGroup error: %v\n", err) + } else { + fmt.Printf("DEBUG: JoinGroup response hex dump (%d bytes): %x\n", len(response), response) + } case 14: // SyncGroup response, err = h.handleSyncGroup(correlationID, messageBuf[8:]) // skip header case 8: // OffsetCommit diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 08aba4451..dc74e60e0 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -57,13 +57,24 @@ const ( ErrorCodeFencedInstanceID int16 = 82 ) -func (h *Handler) handleJoinGroup(correlationID uint32, requestBody []byte) ([]byte, error) { +func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + // DEBUG: Hex dump the request to understand format + dumpLen := len(requestBody) + if dumpLen > 100 { + dumpLen = 100 + } + fmt.Printf("DEBUG: JoinGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen]) + // Parse JoinGroup request request, err := h.parseJoinGroupRequest(requestBody) if err != nil { + fmt.Printf("DEBUG: JoinGroup parseJoinGroupRequest error: %v\n", err) return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } + fmt.Printf("DEBUG: JoinGroup parsed request - GroupID: '%s', MemberID: '%s', SessionTimeout: %d\n", + request.GroupID, request.MemberID, request.SessionTimeout) + // Validate request if request.GroupID == "" { return h.buildJoinGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil @@ -185,7 +196,15 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) offset := 0 + // Skip client_id (part of request header, not JoinGroup payload) + clientIDLength := int(binary.BigEndian.Uint16(data[offset:])) + offset += 2 + clientIDLength + fmt.Printf("DEBUG: JoinGroup skipped client_id (%d bytes), offset now: %d\n", clientIDLength, offset) + // GroupID (string) + if offset+2 > len(data) { + return nil, fmt.Errorf("missing group ID length") + } groupIDLength := int(binary.BigEndian.Uint16(data[offset:])) offset += 2 if offset+groupIDLength > len(data) { @@ -193,6 +212,7 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) } groupID := string(data[offset : offset+groupIDLength]) offset += groupIDLength + fmt.Printf("DEBUG: JoinGroup parsed GroupID: '%s', offset now: %d\n", groupID, offset) // Session timeout (4 bytes) if offset+4 > len(data) {