Browse Source

mq(kafka): advertise Metadata v1 and implement v1 response; stabilize JoinGroup IDs; encode consumer subscription metadata with UserData; gate JoinGroup fields by version; revert subscription version to 0 for compatibility

pull/7231/head
chrislu 2 months ago
parent
commit
ef609eebd2
  1. 87
      weed/mq/kafka/protocol/handler.go
  2. 55
      weed/mq/kafka/protocol/joingroup.go

87
weed/mq/kafka/protocol/handler.go

@ -317,7 +317,7 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
// TODO: Fix Metadata v1 format - kafka-go rejects our v1 response with "Unknown Topic Or Partition"
response = append(response, 0, 3) // API key 3
response = append(response, 0, 0) // min version 0
response = append(response, 0, 0) // max version 0 (v1 has format issue)
response = append(response, 0, 1) // max version 1
// API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 2) // API key 2
@ -490,7 +490,7 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
// Brokers array length (4 bytes) - 1 broker (this gateway)
response = append(response, 0, 0, 0, 1)
// Broker 0: node_id(4) + host(STRING) + port(4) + rack(NULLABLE_STRING)
// Broker 0: node_id(4) + host(STRING) + port(4) + rack(STRING)
response = append(response, 0, 0, 0, 0) // node_id = 0
// Use dynamic broker address set by the server
@ -508,14 +508,11 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
binary.BigEndian.PutUint32(portBytes, uint32(port))
response = append(response, portBytes...)
// Rack (NULLABLE_STRING) - null (-1 length, 2 bytes)
response = append(response, 0xFF, 0xFF)
// Rack (STRING, NOT nullable in v1) - use empty string
response = append(response, 0x00, 0x00)
// Cluster ID (NULLABLE_STRING) - null (-1 length, 2 bytes)
response = append(response, 0xFF, 0xFF)
// Controller ID (4 bytes) - -1 (no controller)
response = append(response, 0xFF, 0xFF, 0xFF, 0xFF)
// Controller ID (4 bytes) - use broker 0 as controller
response = append(response, 0x00, 0x00, 0x00, 0x00)
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
@ -543,7 +540,7 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
binary.BigEndian.PutUint32(topicsCountBytes, uint32(len(topicsToReturn)))
response = append(response, topicsCountBytes...)
// Topic entries (same format as v0)
// Topic entries (v1)
for _, topicName := range topicsToReturn {
// error_code(2) = 0
response = append(response, 0, 0)
@ -579,49 +576,57 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
}
func (h *Handler) parseMetadataTopics(requestBody []byte) []string {
// Parse Metadata request to extract requested topics
// Format: client_id + topics_array
// Temporarily disable debug logging to test performance
if len(requestBody) < 6 { // at minimum: client_id_size(2) + topics_count(4)
return []string{} // Return empty - means "all topics"
// Support both v0/v1 parsing: v1 payload starts directly with topics array length (int32),
// while older assumptions may have included a client_id string first.
if len(requestBody) < 4 {
return []string{}
}
// Skip client_id
clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
offset := 2 + int(clientIDSize)
// Try path A: interpret first 4 bytes as topics_count
offset := 0
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
if topicsCount == 0xFFFFFFFF { // -1 means all topics
return []string{}
}
if topicsCount <= 1000000 { // sane bound
offset += 4
topics := make([]string, 0, topicsCount)
for i := uint32(0); i < topicsCount && offset+2 <= len(requestBody); i++ {
nameLen := int(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
offset += 2
if offset+nameLen > len(requestBody) {
break
}
topics = append(topics, string(requestBody[offset:offset+nameLen]))
offset += nameLen
}
return topics
}
// Path B: assume leading client_id string then topics_count
if len(requestBody) < 6 {
return []string{}
}
clientIDLen := int(binary.BigEndian.Uint16(requestBody[0:2]))
offset = 2 + clientIDLen
if len(requestBody) < offset+4 {
return []string{}
}
// Parse topics count
topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
topicsCount = binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
if topicsCount == 0 || topicsCount > 1000000 { // sanity check
return []string{} // Return empty - means "all topics"
if topicsCount == 0xFFFFFFFF {
return []string{}
}
// Parse each requested topic name
topics := make([]string, 0, topicsCount)
for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
if len(requestBody) < offset+2 {
break
}
topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
for i := uint32(0); i < topicsCount && offset+2 <= len(requestBody); i++ {
nameLen := int(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
offset += 2
if len(requestBody) < offset+int(topicNameSize) {
if offset+nameLen > len(requestBody) {
break
}
topicName := string(requestBody[offset : offset+int(topicNameSize)])
topics = append(topics, topicName)
offset += int(topicNameSize)
topics = append(topics, string(requestBody[offset:offset+nameLen]))
offset += nameLen
}
return topics
}
@ -1044,7 +1049,7 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) (
func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error {
supportedVersions := map[uint16][2]uint16{
18: {0, 3}, // ApiVersions: v0-v3
3: {0, 0}, // Metadata: v0 only (v1 has format issue)
3: {0, 1}, // Metadata: v0-v1
0: {0, 1}, // Produce: v0-v1
1: {0, 1}, // Fetch: v0-v1
2: {0, 5}, // ListOffsets: v0-v5

55
weed/mq/kafka/protocol/joingroup.go

@ -36,6 +36,7 @@ type JoinGroupResponse struct {
GroupProtocol string
GroupLeader string
MemberID string
Version uint16
Members []JoinGroupMember // Only populated for group leader
}
@ -184,7 +185,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
fmt.Printf("DEBUG: JoinGroup generating subscription metadata for topics: %v\n", availableTopics)
metadata := make([]byte, 0, 64)
// Version (2 bytes) - use version 0
// Version (2 bytes) - use version 0 to exclude OwnedPartitions
metadata = append(metadata, 0, 0)
// Topics count (4 bytes)
topicsCount := make([]byte, 4)
@ -197,6 +198,10 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
metadata = append(metadata, topicLen...)
metadata = append(metadata, []byte(topic)...)
}
// UserData (nullable bytes) - encode empty (length 0)
userDataLen := make([]byte, 4)
binary.BigEndian.PutUint32(userDataLen, 0)
metadata = append(metadata, userDataLen...)
member.Metadata = metadata
fmt.Printf("DEBUG: JoinGroup generated metadata (%d bytes): %x\n", len(metadata), metadata)
} else {
@ -236,6 +241,7 @@ func (h *Handler) handleJoinGroup(correlationID uint32, apiVersion uint16, reque
GroupProtocol: groupProtocol,
GroupLeader: group.Leader,
MemberID: memberID,
Version: apiVersion,
}
fmt.Printf("DEBUG: JoinGroup response - Generation: %d, Protocol: '%s', Leader: '%s', Member: '%s'\n",
@ -336,9 +342,26 @@ func (h *Handler) parseJoinGroupRequest(data []byte) (*JoinGroupRequest, error)
func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte {
// Estimate response size
estimatedSize := 32 + len(response.GroupProtocol) + len(response.GroupLeader) + len(response.MemberID)
estimatedSize := 0
// CorrelationID(4) + (optional throttle 4) + error_code(2) + generation_id(4)
if response.Version >= 2 {
estimatedSize = 4 + 4 + 2 + 4
} else {
estimatedSize = 4 + 2 + 4
}
estimatedSize += 2 + len(response.GroupProtocol) // protocol string
estimatedSize += 2 + len(response.GroupLeader) // leader string
estimatedSize += 2 + len(response.MemberID) // member id string
estimatedSize += 4 // members array count
for _, member := range response.Members {
estimatedSize += len(member.MemberID) + len(member.GroupInstanceID) + len(member.Metadata) + 8
// MemberID string
estimatedSize += 2 + len(member.MemberID)
if response.Version >= 5 {
// GroupInstanceID string
estimatedSize += 2 + len(member.GroupInstanceID)
}
// Metadata bytes (4 + len)
estimatedSize += 4 + len(member.Metadata)
}
result := make([]byte, 0, estimatedSize)
@ -348,11 +371,12 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte {
binary.BigEndian.PutUint32(correlationIDBytes, response.CorrelationID)
result = append(result, correlationIDBytes...)
// JoinGroup v2 Response Format: throttle_time_ms + error_code + generation_id + ...
// Throttle time (4 bytes) - CRITICAL: This was missing!
throttleTimeBytes := make([]byte, 4)
binary.BigEndian.PutUint32(throttleTimeBytes, 0) // No throttling
result = append(result, throttleTimeBytes...)
// JoinGroup v2 adds throttle_time_ms
if response.Version >= 2 {
throttleTimeBytes := make([]byte, 4)
binary.BigEndian.PutUint32(throttleTimeBytes, 0) // No throttling
result = append(result, throttleTimeBytes...)
}
// Error code (2 bytes)
errorCodeBytes := make([]byte, 2)
@ -394,12 +418,14 @@ func (h *Handler) buildJoinGroupResponse(response JoinGroupResponse) []byte {
result = append(result, memberLength...)
result = append(result, []byte(member.MemberID)...)
// Group instance ID (string) - can be empty
instanceIDLength := make([]byte, 2)
binary.BigEndian.PutUint16(instanceIDLength, uint16(len(member.GroupInstanceID)))
result = append(result, instanceIDLength...)
if len(member.GroupInstanceID) > 0 {
result = append(result, []byte(member.GroupInstanceID)...)
if response.Version >= 5 {
// Group instance ID (string) - can be empty
instanceIDLength := make([]byte, 2)
binary.BigEndian.PutUint16(instanceIDLength, uint16(len(member.GroupInstanceID)))
result = append(result, instanceIDLength...)
if len(member.GroupInstanceID) > 0 {
result = append(result, []byte(member.GroupInstanceID)...)
}
}
// Metadata (bytes)
@ -420,6 +446,7 @@ func (h *Handler) buildJoinGroupErrorResponse(correlationID uint32, errorCode in
GroupProtocol: "",
GroupLeader: "",
MemberID: "",
Version: 2,
Members: []JoinGroupMember{},
}

Loading…
Cancel
Save