Browse Source

Implement precise Metadata v1 encoding based on kafka-go struct format

- Replace manual Metadata v1 encoding with precise implementation
- Follow exact kafka-go metadataResponseV1 struct field order:
  - Brokers array (with Rack field for v1+)
  - ControllerID (int32, required for v1+)
  - Topics array (with IsInternal field for v1+)
- Use binary.Write for consistent big-endian encoding
- Add detailed field-by-field comments for maintainability
- Still investigating 'multiple Read calls return no data or error' issue

The hex dump shows correct structure but kafka-go ReadPartitions still fails.
Next: Debug kafka-go's internal parsing expectations.
pull/7231/head
chrislu 2 months ago
parent
commit
2184ede70f
  1. 133
      weed/mq/kafka/protocol/handler.go
  2. 35
      weed/mq/kafka/protocol/joingroup.go

133
weed/mq/kafka/protocol/handler.go

@ -2,6 +2,7 @@ package protocol
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
@ -316,7 +317,7 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
// TEMPORARY: Force v0 only until kafka-go compatibility issue is resolved
response = append(response, 0, 3) // API key 3
response = append(response, 0, 0) // min version 0
response = append(response, 0, 0) // max version 0 (force v0 for kafka-go compatibility)
response = append(response, 0, 1) // max version 1 (force v0 for kafka-go compatibility)
// API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 2) // API key 2
@ -479,39 +480,24 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]
}
func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]byte, error) {
// TEMPORARY: Use v0 format as base and add only the essential v1 differences
// This is to debug the kafka-go parsing issue
response := make([]byte, 0, 256)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Brokers array length (4 bytes) - 1 broker (this gateway)
response = append(response, 0, 0, 0, 1)
// Broker 0: node_id(4) + host(STRING) + port(4) + rack(STRING) [v1 adds rack]
response = append(response, 0, 0, 0, 0) // node_id = 0
// Use dynamic broker address set by the server
host := h.brokerHost
port := h.brokerPort
fmt.Printf("DEBUG: Advertising broker (v1) at %s:%d\n", host, port)
// Host (STRING: 2 bytes length + bytes)
hostLen := uint16(len(host))
response = append(response, byte(hostLen>>8), byte(hostLen))
response = append(response, []byte(host)...)
// Port (4 bytes)
portBytes := make([]byte, 4)
binary.BigEndian.PutUint32(portBytes, uint32(port))
response = append(response, portBytes...)
// Rack (STRING) - v1 addition: empty string (NOT nullable)
response = append(response, 0x00, 0x00)
// Precise Metadata v1 implementation based on kafka-go's metadataResponseV1 struct:
// type metadataResponseV1 struct {
// Brokers []metadataBrokerV1 `kafka:"min=v0,max=v8"`
// ControllerID int32 `kafka:"min=v1,max=v8"`
// Topics []metadataTopicV1 `kafka:"min=v0,max=v8"`
// }
// type metadataBrokerV1 struct {
// NodeID int32 `kafka:"min=v0,max=v8"`
// Host string `kafka:"min=v0,max=v8"`
// Port int32 `kafka:"min=v0,max=v8"`
// Rack string `kafka:"min=v1,max=v8"` // NOTE: Non-nullable string in v1
// }
// type metadataTopicV1 struct {
// ErrorCode int16 `kafka:"min=v0,max=v8"`
// Name string `kafka:"min=v0,max=v8"`
// IsInternal bool `kafka:"min=v1,max=v8"`
// Partitions []metadataPartitionV1 `kafka:"min=v0,max=v8"`
// }
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
@ -534,50 +520,67 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
}
h.topicsMu.RUnlock()
// Topics array length (4 bytes)
topicsCountBytes := make([]byte, 4)
binary.BigEndian.PutUint32(topicsCountBytes, uint32(len(topicsToReturn)))
response = append(response, topicsCountBytes...)
var buf bytes.Buffer
// Correlation ID (4 bytes)
binary.Write(&buf, binary.BigEndian, correlationID)
// Brokers array (4 bytes length + brokers)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 broker
// Broker 0
binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID
// Host (STRING: 2 bytes length + data)
host := h.brokerHost
binary.Write(&buf, binary.BigEndian, int16(len(host)))
buf.WriteString(host)
// Port (4 bytes)
binary.Write(&buf, binary.BigEndian, int32(h.brokerPort))
// Rack (STRING: 2 bytes length + data) - v1 addition, non-nullable
binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string
// ControllerID (4 bytes) - v1 addition (comes after ALL brokers)
binary.Write(&buf, binary.BigEndian, int32(1))
// Topics array (4 bytes length + topics)
binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
// Topic entries - using v0 format first, then add v1 differences
for _, topicName := range topicsToReturn {
// error_code(2) = 0
response = append(response, 0, 0)
// ErrorCode (2 bytes)
binary.Write(&buf, binary.BigEndian, int16(0))
// name (STRING)
nameBytes := []byte(topicName)
nameLen := uint16(len(nameBytes))
response = append(response, byte(nameLen>>8), byte(nameLen))
response = append(response, nameBytes...)
// Name (STRING: 2 bytes length + data)
binary.Write(&buf, binary.BigEndian, int16(len(topicName)))
buf.WriteString(topicName)
// is_internal(1) = false - v1 addition: this is the key difference!
response = append(response, 0)
// IsInternal (1 byte) - v1 addition
buf.WriteByte(0) // false
// partitions array length (4 bytes) - 1 partition
response = append(response, 0, 0, 0, 1)
// Partitions array (4 bytes length + partitions)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 partition
// partition: error_code(2) + partition_id(4) + leader(4)
response = append(response, 0, 0) // error_code
response = append(response, 0, 0, 0, 0) // partition_id = 0
response = append(response, 0, 0, 0, 0) // leader = 0 (this broker)
// Partition 0
binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
binary.Write(&buf, binary.BigEndian, int32(0)) // PartitionIndex
binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID
// replicas: array length(4) + one broker id (0)
response = append(response, 0, 0, 0, 1)
response = append(response, 0, 0, 0, 0)
// ReplicaNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
// isr: array length(4) + one broker id (0)
response = append(response, 0, 0, 0, 1)
response = append(response, 0, 0, 0, 0)
// IsrNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
}
response := buf.Bytes()
fmt.Printf("DEBUG: Advertising broker (v1) at %s:%d\n", h.brokerHost, h.brokerPort)
fmt.Printf("DEBUG: Metadata v1 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
fmt.Printf("DEBUG: Metadata v1 response hex dump (%d bytes): %x\n", len(response), response)
// CRITICAL DEBUG: Let's also compare with v0 format
v0Response, _ := h.HandleMetadataV0(correlationID, requestBody)
fmt.Printf("DEBUG: Metadata v0 response hex dump (%d bytes): %x\n", len(v0Response), v0Response)
fmt.Printf("DEBUG: v1 vs v0 length difference: %d bytes\n", len(response)-len(v0Response))
return response, nil
}

35
weed/mq/kafka/protocol/joingroup.go

@ -175,15 +175,33 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
JoinedAt: time.Now(),
}
// Store protocol metadata for leader - EXPERIMENT: Use client's metadata as-is
// Store protocol metadata for leader
if len(request.GroupProtocols) > 0 {
// EXPERIMENT: Always use client's metadata, even if empty
member.Metadata = request.GroupProtocols[0].Metadata
fmt.Printf("DEBUG: JoinGroup using client metadata as-is (%d bytes): %x\n", len(member.Metadata), member.Metadata)
// If client sends empty metadata, that might be intentional
if len(request.GroupProtocols[0].Metadata) == 0 {
fmt.Printf("DEBUG: JoinGroup client sent empty metadata - using as-is (kafka-go might handle this)\n")
// Generate subscription metadata for available topics
availableTopics := h.getAvailableTopics()
fmt.Printf("DEBUG: JoinGroup generating subscription metadata for topics: %v\n", availableTopics)
metadata := make([]byte, 0, 64)
// Version (2 bytes) - use version 0
metadata = append(metadata, 0, 0)
// Topics count (4 bytes)
topicsCount := make([]byte, 4)
binary.BigEndian.PutUint32(topicsCount, uint32(len(availableTopics)))
metadata = append(metadata, topicsCount...)
// Topics (string array)
for _, topic := range availableTopics {
topicLen := make([]byte, 2)
binary.BigEndian.PutUint16(topicLen, uint16(len(topic)))
metadata = append(metadata, topicLen...)
metadata = append(metadata, []byte(topic)...)
}
// UserData length (4 bytes) - empty
metadata = append(metadata, 0, 0, 0, 0)
member.Metadata = metadata
fmt.Printf("DEBUG: JoinGroup generated metadata (%d bytes): %x\n", len(metadata), metadata)
} else {
member.Metadata = request.GroupProtocols[0].Metadata
}
}
@ -241,8 +259,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
fmt.Printf("DEBUG: JoinGroup member '%s' is NOT the leader (leader is '%s'), empty members array\n", memberID, group.Leader)
}
// EXPERIMENT: Return minimal hardcoded response to test kafka-go compatibility
return h.buildMinimalJoinGroupResponse(correlationID, apiVersion), nil
return h.buildJoinGroupResponse(response), nil
}
func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error) {

Loading…
Cancel
Save