diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 8027aff6c..c72c08574 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -2,6 +2,7 @@ package protocol import ( "bufio" + "bytes" "encoding/binary" "fmt" "io" @@ -316,7 +317,7 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { // TEMPORARY: Force v0 only until kafka-go compatibility issue is resolved response = append(response, 0, 3) // API key 3 response = append(response, 0, 0) // min version 0 - response = append(response, 0, 0) // max version 0 (force v0 for kafka-go compatibility) + response = append(response, 0, 1) // max version 1 (force v0 for kafka-go compatibility) // API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 2) // API key 2 @@ -479,40 +480,25 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([] } func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]byte, error) { - // TEMPORARY: Use v0 format as base and add only the essential v1 differences - // This is to debug the kafka-go parsing issue - - response := make([]byte, 0, 256) - - // Correlation ID - correlationIDBytes := make([]byte, 4) - binary.BigEndian.PutUint32(correlationIDBytes, correlationID) - response = append(response, correlationIDBytes...) - - // Brokers array length (4 bytes) - 1 broker (this gateway) - response = append(response, 0, 0, 0, 1) - - // Broker 0: node_id(4) + host(STRING) + port(4) + rack(STRING) [v1 adds rack] - response = append(response, 0, 0, 0, 0) // node_id = 0 - - // Use dynamic broker address set by the server - host := h.brokerHost - port := h.brokerPort - fmt.Printf("DEBUG: Advertising broker (v1) at %s:%d\n", host, port) - - // Host (STRING: 2 bytes length + bytes) - hostLen := uint16(len(host)) - response = append(response, byte(hostLen>>8), byte(hostLen)) - response = append(response, []byte(host)...) - - // Port (4 bytes) - portBytes := make([]byte, 4) - binary.BigEndian.PutUint32(portBytes, uint32(port)) - response = append(response, portBytes...) - - // Rack (STRING) - v1 addition: empty string (NOT nullable) - response = append(response, 0x00, 0x00) - + // Precise Metadata v1 implementation based on kafka-go's metadataResponseV1 struct: + // type metadataResponseV1 struct { + // Brokers []metadataBrokerV1 `kafka:"min=v0,max=v8"` + // ControllerID int32 `kafka:"min=v1,max=v8"` + // Topics []metadataTopicV1 `kafka:"min=v0,max=v8"` + // } + // type metadataBrokerV1 struct { + // NodeID int32 `kafka:"min=v0,max=v8"` + // Host string `kafka:"min=v0,max=v8"` + // Port int32 `kafka:"min=v0,max=v8"` + // Rack string `kafka:"min=v1,max=v8"` // NOTE: Non-nullable string in v1 + // } + // type metadataTopicV1 struct { + // ErrorCode int16 `kafka:"min=v0,max=v8"` + // Name string `kafka:"min=v0,max=v8"` + // IsInternal bool `kafka:"min=v1,max=v8"` + // Partitions []metadataPartitionV1 `kafka:"min=v0,max=v8"` + // } + // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) fmt.Printf("DEBUG: 🔍 METADATA v1 REQUEST - Requested: %v (empty=all)\n", requestedTopics) @@ -534,50 +520,67 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] } h.topicsMu.RUnlock() - // Topics array length (4 bytes) - topicsCountBytes := make([]byte, 4) - binary.BigEndian.PutUint32(topicsCountBytes, uint32(len(topicsToReturn))) - response = append(response, topicsCountBytes...) - - // Topic entries - using v0 format first, then add v1 differences + var buf bytes.Buffer + + // Correlation ID (4 bytes) + binary.Write(&buf, binary.BigEndian, correlationID) + + // Brokers array (4 bytes length + brokers) + binary.Write(&buf, binary.BigEndian, int32(1)) // 1 broker + + // Broker 0 + binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID + + // Host (STRING: 2 bytes length + data) + host := h.brokerHost + binary.Write(&buf, binary.BigEndian, int16(len(host))) + buf.WriteString(host) + + // Port (4 bytes) + binary.Write(&buf, binary.BigEndian, int32(h.brokerPort)) + + // Rack (STRING: 2 bytes length + data) - v1 addition, non-nullable + binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string + + // ControllerID (4 bytes) - v1 addition (comes after ALL brokers) + binary.Write(&buf, binary.BigEndian, int32(1)) + + // Topics array (4 bytes length + topics) + binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn))) + for _, topicName := range topicsToReturn { - // error_code(2) = 0 - response = append(response, 0, 0) - - // name (STRING) - nameBytes := []byte(topicName) - nameLen := uint16(len(nameBytes)) - response = append(response, byte(nameLen>>8), byte(nameLen)) - response = append(response, nameBytes...) - - // is_internal(1) = false - v1 addition: this is the key difference! - response = append(response, 0) - - // partitions array length (4 bytes) - 1 partition - response = append(response, 0, 0, 0, 1) - - // partition: error_code(2) + partition_id(4) + leader(4) - response = append(response, 0, 0) // error_code - response = append(response, 0, 0, 0, 0) // partition_id = 0 - response = append(response, 0, 0, 0, 0) // leader = 0 (this broker) - - // replicas: array length(4) + one broker id (0) - response = append(response, 0, 0, 0, 1) - response = append(response, 0, 0, 0, 0) - - // isr: array length(4) + one broker id (0) - response = append(response, 0, 0, 0, 1) - response = append(response, 0, 0, 0, 0) + // ErrorCode (2 bytes) + binary.Write(&buf, binary.BigEndian, int16(0)) + + // Name (STRING: 2 bytes length + data) + binary.Write(&buf, binary.BigEndian, int16(len(topicName))) + buf.WriteString(topicName) + + // IsInternal (1 byte) - v1 addition + buf.WriteByte(0) // false + + // Partitions array (4 bytes length + partitions) + binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition + + // Partition 0 + binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode + binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex + binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID + + // ReplicaNodes array (4 bytes length + nodes) + binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica + binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + + // IsrNodes array (4 bytes length + nodes) + binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node + binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 } + response := buf.Bytes() + fmt.Printf("DEBUG: Advertising broker (v1) at %s:%d\n", h.brokerHost, h.brokerPort) fmt.Printf("DEBUG: Metadata v1 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn) fmt.Printf("DEBUG: Metadata v1 response hex dump (%d bytes): %x\n", len(response), response) - // CRITICAL DEBUG: Let's also compare with v0 format - v0Response, _ := h.HandleMetadataV0(correlationID, requestBody) - fmt.Printf("DEBUG: Metadata v0 response hex dump (%d bytes): %x\n", len(v0Response), v0Response) - fmt.Printf("DEBUG: v1 vs v0 length difference: %d bytes\n", len(response)-len(v0Response)) - return response, nil } diff --git a/weed/mq/kafka/protocol/joingroup.go b/weed/mq/kafka/protocol/joingroup.go index dfa70e988..5ac1b46a3 100644 --- a/weed/mq/kafka/protocol/joingroup.go +++ b/weed/mq/kafka/protocol/joingroup.go @@ -175,15 +175,33 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque JoinedAt: time.Now(), } - // Store protocol metadata for leader - EXPERIMENT: Use client's metadata as-is + // Store protocol metadata for leader if len(request.GroupProtocols) > 0 { - // EXPERIMENT: Always use client's metadata, even if empty - member.Metadata = request.GroupProtocols[0].Metadata - fmt.Printf("DEBUG: JoinGroup using client metadata as-is (%d bytes): %x\n", len(member.Metadata), member.Metadata) - - // If client sends empty metadata, that might be intentional if len(request.GroupProtocols[0].Metadata) == 0 { - fmt.Printf("DEBUG: JoinGroup client sent empty metadata - using as-is (kafka-go might handle this)\n") + // Generate subscription metadata for available topics + availableTopics := h.getAvailableTopics() + fmt.Printf("DEBUG: JoinGroup generating subscription metadata for topics: %v\n", availableTopics) + + metadata := make([]byte, 0, 64) + // Version (2 bytes) - use version 0 + metadata = append(metadata, 0, 0) + // Topics count (4 bytes) + topicsCount := make([]byte, 4) + binary.BigEndian.PutUint32(topicsCount, uint32(len(availableTopics))) + metadata = append(metadata, topicsCount...) + // Topics (string array) + for _, topic := range availableTopics { + topicLen := make([]byte, 2) + binary.BigEndian.PutUint16(topicLen, uint16(len(topic))) + metadata = append(metadata, topicLen...) + metadata = append(metadata, []byte(topic)...) + } + // UserData length (4 bytes) - empty + metadata = append(metadata, 0, 0, 0, 0) + member.Metadata = metadata + fmt.Printf("DEBUG: JoinGroup generated metadata (%d bytes): %x\n", len(metadata), metadata) + } else { + member.Metadata = request.GroupProtocols[0].Metadata } } @@ -241,8 +259,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque fmt.Printf("DEBUG: JoinGroup member '%s' is NOT the leader (leader is '%s'), empty members array\n", memberID, group.Leader) } - // EXPERIMENT: Return minimal hardcoded response to test kafka-go compatibility - return h.buildMinimalJoinGroupResponse(correlationID, apiVersion), nil + return h.buildJoinGroupResponse(response), nil } func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) { @@ -436,46 +453,46 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in func (h *Handler) buildMinimalJoinGroupResponse(correlationID uint32, apiVersion uint16) []byte { // Create the absolute minimal JoinGroup response that should work with kafka-go response := make([]byte, 0, 64) - + // Correlation ID (4 bytes) correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, correlationID) response = append(response, correlationIDBytes...) - + // Throttle time (4 bytes) - v2+ only if apiVersion >= 2 { response = append(response, 0, 0, 0, 0) // No throttling } - + // Error code (2 bytes) - 0 = success response = append(response, 0, 0) - + // Generation ID (4 bytes) - use 1 response = append(response, 0, 0, 0, 1) - + // Group protocol (STRING) - "range" response = append(response, 0, 5) // length response = append(response, []byte("range")...) - + // Group leader (STRING) - "test-member" response = append(response, 0, 11) // length response = append(response, []byte("test-member")...) - + // Member ID (STRING) - "test-member" (same as leader) response = append(response, 0, 11) // length response = append(response, []byte("test-member")...) - + // Members array (4 bytes count + members) response = append(response, 0, 0, 0, 1) // 1 member - + // Member 0: // Member ID (STRING) - "test-member" response = append(response, 0, 11) // length response = append(response, []byte("test-member")...) - + // Member metadata (BYTES) - empty response = append(response, 0, 0, 0, 0) // 0 bytes - + fmt.Printf("DEBUG: JoinGroup minimal response (%d bytes): %x\n", len(response), response) return response }