diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index f70a206b2..9f5d0686f 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -317,7 +317,7 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { // TODO: Fix Metadata v1 format - kafka-go rejects our v1 response with "Unknown Topic Or Partition" response = append(response, 0, 3) // API key 3 response = append(response, 0, 0) // min version 0 - response = append(response, 0, 0) // max version 0 (v1 has format issue) + response = append(response, 0, 1) // max version 1 // API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 2) // API key 2 @@ -490,7 +490,7 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] // Brokers array length (4 bytes) - 1 broker (this gateway) response = append(response, 0, 0, 0, 1) - // Broker 0: node_id(4) + host(STRING) + port(4) + rack(NULLABLE_STRING) + // Broker 0: node_id(4) + host(STRING) + port(4) + rack(STRING) response = append(response, 0, 0, 0, 0) // node_id = 0 // Use dynamic broker address set by the server @@ -508,14 +508,11 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] binary.BigEndian.PutUint32(portBytes, uint32(port)) response = append(response, portBytes...) - // Rack (NULLABLE_STRING) - null (-1 length, 2 bytes) - response = append(response, 0xFF, 0xFF) + // Rack (STRING, NOT nullable in v1) - use empty string + response = append(response, 0x00, 0x00) - // Cluster ID (NULLABLE_STRING) - null (-1 length, 2 bytes) - response = append(response, 0xFF, 0xFF) - - // Controller ID (4 bytes) - -1 (no controller) - response = append(response, 0xFF, 0xFF, 0xFF, 0xFF) + // Controller ID (4 bytes) - use broker 0 as controller + response = append(response, 0x00, 0x00, 0x00, 0x00) // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) @@ -543,7 +540,7 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] binary.BigEndian.PutUint32(topicsCountBytes, uint32(len(topicsToReturn))) response = append(response, topicsCountBytes...) - // Topic entries (same format as v0) + // Topic entries (v1) for _, topicName := range topicsToReturn { // error_code(2) = 0 response = append(response, 0, 0) @@ -579,49 +576,57 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] } func (h *Handler) parseMetadataTopics(requestBody []byte) []string { - // Parse Metadata request to extract requested topics - // Format: client_id + topics_array - - // Temporarily disable debug logging to test performance - if len(requestBody) < 6 { // at minimum: client_id_size(2) + topics_count(4) - return []string{} // Return empty - means "all topics" + // Support both v0/v1 parsing: v1 payload starts directly with topics array length (int32), + // while older assumptions may have included a client_id string first. + if len(requestBody) < 4 { + return []string{} } - // Skip client_id - clientIDSize := binary.BigEndian.Uint16(requestBody[0:2]) - offset := 2 + int(clientIDSize) + // Try path A: interpret first 4 bytes as topics_count + offset := 0 + topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + if topicsCount == 0xFFFFFFFF { // -1 means all topics + return []string{} + } + if topicsCount <= 1000000 { // sane bound + offset += 4 + topics := make([]string, 0, topicsCount) + for i := uint32(0); i < topicsCount && offset+2 <= len(requestBody); i++ { + nameLen := int(binary.BigEndian.Uint16(requestBody[offset : offset+2])) + offset += 2 + if offset+nameLen > len(requestBody) { + break + } + topics = append(topics, string(requestBody[offset:offset+nameLen])) + offset += nameLen + } + return topics + } + // Path B: assume leading client_id string then topics_count + if len(requestBody) < 6 { + return []string{} + } + clientIDLen := int(binary.BigEndian.Uint16(requestBody[0:2])) + offset = 2 + clientIDLen if len(requestBody) < offset+4 { return []string{} } - - // Parse topics count - topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + topicsCount = binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - - if topicsCount == 0 || topicsCount > 1000000 { // sanity check - return []string{} // Return empty - means "all topics" + if topicsCount == 0xFFFFFFFF { + return []string{} } - - // Parse each requested topic name topics := make([]string, 0, topicsCount) - for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { - if len(requestBody) < offset+2 { - break - } - - topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + for i := uint32(0); i < topicsCount && offset+2 <= len(requestBody); i++ { + nameLen := int(binary.BigEndian.Uint16(requestBody[offset : offset+2])) offset += 2 - - if len(requestBody) < offset+int(topicNameSize) { + if offset+nameLen > len(requestBody) { break } - - topicName := string(requestBody[offset : offset+int(topicNameSize)]) - topics = append(topics, topicName) - offset += int(topicNameSize) + topics = append(topics, string(requestBody[offset:offset+nameLen])) + offset += nameLen } - return topics } @@ -1044,7 +1049,7 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ( func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error { supportedVersions := map[uint16][2]uint16{ 18: {0, 3}, // ApiVersions: v0-v3 - 3: {0, 0}, // Metadata: v0 only (v1 has format issue) + 3: {0, 1}, // Metadata: v0-v1 0: {0, 1}, // Produce: v0-v1 1: {0, 1}, // Fetch: v0-v1 2: {0, 5}, // ListOffsets: v0-v5 diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index 25e57c4a6..7af218514 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -36,6 +36,7 @@ type JoinGroupResponse struct { GroupProtocol string GroupLeader string MemberID string + Version uint16 Members []JoinGroupMember // Only populated for group leader } @@ -184,7 +185,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque fmt.Printf("DEBUG: JoinGroup generating subscription metadata for topics: %v\n", availableTopics) metadata := make([]byte, 0, 64) - // Version (2 bytes) - use version 0 + // Version (2 bytes) - use version 0 to exclude OwnedPartitions metadata = append(metadata, 0, 0) // Topics count (4 bytes) topicsCount := make([]byte, 4) @@ -197,6 +198,10 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque metadata = append(metadata, topicLen...) metadata = append(metadata, []byte(topic)...) } + // UserData (nullable bytes) - encode empty (length 0) + userDataLen := make([]byte, 4) + binary.BigEndian.PutUint32(userDataLen, 0) + metadata = append(metadata, userDataLen...) member.Metadata = metadata fmt.Printf("DEBUG: JoinGroup generated metadata (%d bytes): %x\n", len(metadata), metadata) } else { @@ -236,6 +241,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque GroupProtocol: groupProtocol, GroupLeader: group.Leader, MemberID: memberID, + Version: apiVersion, } fmt.Printf("DEBUG: JoinGroup response - Generation: %d, Protocol: '%s', Leader: '%s', Member: '%s'\n", @@ -336,9 +342,26 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte { // Estimate response size - estimatedSize := 32 + len(response.GroupProtocol) + len(response.GroupLeader) + len(response.MemberID) + estimatedSize := 0 + // CorrelationID(4) + (optional throttle 4) + error_code(2) + generation_id(4) + if response.Version >= 2 { + estimatedSize = 4 + 4 + 2 + 4 + } else { + estimatedSize = 4 + 2 + 4 + } + estimatedSize += 2 + len(response.GroupProtocol) // protocol string + estimatedSize += 2 + len(response.GroupLeader) // leader string + estimatedSize += 2 + len(response.MemberID) // member id string + estimatedSize += 4 // members array count for _, member := range response.Members { - estimatedSize += len(member.MemberID) + len(member.GroupInstanceID) + len(member.Metadata) + 8 + // MemberID string + estimatedSize += 2 + len(member.MemberID) + if response.Version >= 5 { + // GroupInstanceID string + estimatedSize += 2 + len(member.GroupInstanceID) + } + // Metadata bytes (4 + len) + estimatedSize += 4 + len(member.Metadata) } result := make([]byte, 0, estimatedSize) @@ -348,11 +371,12 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte { binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID) result = append(result, correlationIDBytes...) - // JoinGroup v2 Response Format: throttle_time_ms + error_code + generation_id + ... - // Throttle time (4 bytes) - CRITICAL: This was missing! - throttleTimeBytes := make([]byte, 4) - binary.BigEndian.PutUint32(throttleTimeBytes, 0) // No throttling - result = append(result, throttleTimeBytes...) + // JoinGroup v2 adds throttle_time_ms + if response.Version >= 2 { + throttleTimeBytes := make([]byte, 4) + binary.BigEndian.PutUint32(throttleTimeBytes, 0) // No throttling + result = append(result, throttleTimeBytes...) + } // Error code (2 bytes) errorCodeBytes := make([]byte, 2) @@ -394,12 +418,14 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte { result = append(result, memberLength...) result = append(result, []byte(member.MemberID)...) - // Group instance ID (string) - can be empty - instanceIDLength := make([]byte, 2) - binary.BigEndian.PutUint16(instanceIDLength, uint16(len(member.GroupInstanceID))) - result = append(result, instanceIDLength...) - if len(member.GroupInstanceID) > 0 { - result = append(result, []byte(member.GroupInstanceID)...) + if response.Version >= 5 { + // Group instance ID (string) - can be empty + instanceIDLength := make([]byte, 2) + binary.BigEndian.PutUint16(instanceIDLength, uint16(len(member.GroupInstanceID))) + result = append(result, instanceIDLength...) + if len(member.GroupInstanceID) > 0 { + result = append(result, []byte(member.GroupInstanceID)...) + } } // Metadata (bytes) @@ -420,6 +446,7 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in GroupProtocol: "", GroupLeader: "", MemberID: "", + Version: 2, Members: []JoinGroupMember{}, }