diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 24ed3db64..c69a46efe 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -234,7 +234,13 @@ func (h *Handler) HandleConn(conn net.Conn) error { fmt.Printf("DEBUG: JoinGroup response hex dump (%d bytes): %x\n", len(response), response) } case 14: // SyncGroup - response, err = h.handleSyncGroup(correlationID, messageBuf[8:]) // skip header + fmt.Printf("DEBUG: *** SYNCGROUP REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion) + response, err = h.handleSyncGroup(correlationID, apiVersion, messageBuf[8:]) // skip header + if err != nil { + fmt.Printf("DEBUG: SyncGroup error: %v\n", err) + } else { + fmt.Printf("DEBUG: SyncGroup response hex dump (%d bytes): %x\n", len(response), response) + } case 8: // OffsetCommit response, err = h.handleOffsetCommit(correlationID, messageBuf[8:]) // skip header case 9: // OffsetFetch diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index dc74e60e0..ff23cbd55 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -277,6 +277,12 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte { binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) result = append(result, correlationIDBytes...) + // JoinGroup v2 Response Format: throttle_time_ms + error_code + generation_id + ... + // Throttle time (4 bytes) - CRITICAL: This was missing! + throttleTimeBytes := make([]byte, 4) + binary.BigEndian.PutUint32(throttleTimeBytes, 0) // No throttling + result = append(result, throttleTimeBytes...) + // Error code (2 bytes) errorCodeBytes := make([]byte, 2) binary.BigEndian.PutUint16(errorCodeBytes, uint16(response.ErrorCode)) @@ -332,9 +338,6 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte { result = append(result, member.Metadata...) } - // Throttle time (4 bytes, 0 = no throttling) - result = append(result, 0, 0, 0, 0) - return result } @@ -404,13 +407,24 @@ const ( ErrorCodeInconsistentGroupProtocol int16 = 23 ) -func (h *Handler) handleSyncGroup(correlationID uint32, requestBody []byte) ([]byte, error) { +func (h *Handler) handleSyncGroup(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + // DEBUG: Hex dump the request to understand format + dumpLen := len(requestBody) + if dumpLen > 100 { + dumpLen = 100 + } + fmt.Printf("DEBUG: SyncGroup request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen]) + // Parse SyncGroup request request, err := h.parseSyncGroupRequest(requestBody) if err != nil { + fmt.Printf("DEBUG: SyncGroup parseSyncGroupRequest error: %v\n", err) return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil } + fmt.Printf("DEBUG: SyncGroup parsed request - GroupID: '%s', MemberID: '%s', GenerationID: %d\n", + request.GroupID, request.MemberID, request.GenerationID) + // Validate request if request.GroupID == "" || request.MemberID == "" { return h.buildSyncGroupErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil