Browse Source

Limit Metadata API to v4 to fix kafka-go client compatibility

PARTIAL FIX: Force kafka-go to use Metadata v4 instead of v6

## Issue Identified:
- kafka-go was using Metadata v6 due to ApiVersions advertising v0-v6
- Our Metadata v6 implementation has format issues causing client failures
- Sarama works because it uses Metadata v4, not v6

## Changes:
- Limited Metadata API max version from 6 to 4 in ApiVersions response
- Added debug test to isolate Metadata parsing issues
- kafka-go now uses Metadata v4 (same as working Sarama)

## Status:
-  kafka-go now uses v4 instead of v6
-  Still has metadata loops (deeper issue with response format)
-  Produce operations work correctly
-  ReadPartitions API still fails

## Next Steps:
- Investigate why kafka-go keeps requesting metadata even with v4
- Compare exact byte format between working Sarama and failing kafka-go
- May need to fix specific fields in Metadata v4 response format

This is progress toward full kafka-go compatibility but more investigation needed.
pull/7231/head
chrislu 2 months ago
parent
commit
d6f688a44f
  1. 232
      test/kafka/metadata_debug_test.go
  2. 46
      weed/mq/kafka/protocol/handler.go

232
test/kafka/metadata_debug_test.go

@ -1,243 +1,47 @@
package kafka
import (
"encoding/binary"
"fmt"
"net"
"testing"
"time"
"github.com/segmentio/kafka-go"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
)
// TestMetadataV1DebugCapture captures the exact bytes kafka-go sends and expects
func TestMetadataV1DebugCapture(t *testing.T) {
// Start gateway server
gatewayServer := gateway.NewServer(gateway.Options{
Listen: ":0", // random port
})
func TestMetadataV6Debug(t *testing.T) {
// Start gateway
gatewayServer := gateway.NewServer(gateway.Options{Listen: "127.0.0.1:0"})
go func() {
if err := gatewayServer.Start(); err != nil {
t.Errorf("Gateway server error: %v", err)
t.Errorf("Failed to start gateway: %v", err)
}
}()
defer gatewayServer.Close()
// Wait for server to start
time.Sleep(100 * time.Millisecond)
// Get the actual listening address
host, port := gatewayServer.GetListenerAddr()
brokerAddr := fmt.Sprintf("%s:%d", host, port)
t.Logf("Gateway running on %s", brokerAddr)
// Get handler and configure it
handler := gatewayServer.GetHandler()
handler.SetBrokerAddress(host, port)
// Add test topic
topicName := "debug-topic"
handler.AddTopicForTesting(topicName, 1)
addr := fmt.Sprintf("%s:%d", host, port)
topic := "metadata-debug-topic"
gatewayServer.GetHandler().AddTopicForTesting(topic, 1)
// Create raw TCP connection to manually send Metadata v1 request
conn, err := net.Dial("tcp", brokerAddr)
// Create a simple kafka-go client that just gets metadata
conn, err := kafka.Dial("tcp", addr)
if err != nil {
t.Fatalf("Failed to connect: %v", err)
}
defer conn.Close()
// Send ApiVersions request first
t.Log("=== Sending ApiVersions request ===")
apiVersionsRequest := []byte{
// Request header: api_key(2) + api_version(2) + correlation_id(4) + client_id_len(2) + client_id
0x00, 0x12, // api_key = 18 (ApiVersions)
0x00, 0x00, // api_version = 0
0x00, 0x00, 0x00, 0x01, // correlation_id = 1
0x00, 0x09, // client_id length = 9
'd', 'e', 'b', 'u', 'g', '-', 't', 'e', 's', 't', // client_id = "debug-test"
}
// Prepend message length
messageLen := make([]byte, 4)
binary.BigEndian.PutUint32(messageLen, uint32(len(apiVersionsRequest)))
fullRequest := append(messageLen, apiVersionsRequest...)
_, err = conn.Write(fullRequest)
if err != nil {
t.Fatalf("Failed to send ApiVersions: %v", err)
}
// Read ApiVersions response
responseLen := make([]byte, 4)
_, err = conn.Read(responseLen)
if err != nil {
t.Fatalf("Failed to read ApiVersions response length: %v", err)
}
respLen := binary.BigEndian.Uint32(responseLen)
response := make([]byte, respLen)
_, err = conn.Read(response)
if err != nil {
t.Fatalf("Failed to read ApiVersions response: %v", err)
}
t.Logf("ApiVersions response (%d bytes): %x", len(response), response)
// Now send Metadata v1 request
t.Log("=== Sending Metadata v1 request ===")
metadataRequest := []byte{
// Request header: api_key(2) + api_version(2) + correlation_id(4) + client_id_len(2) + client_id
0x00, 0x03, // api_key = 3 (Metadata)
0x00, 0x01, // api_version = 1
0x00, 0x00, 0x00, 0x02, // correlation_id = 2
0x00, 0x09, // client_id length = 9
'd', 'e', 'b', 'u', 'g', '-', 't', 'e', 's', 't', // client_id = "debug-test"
// Metadata request body: topics_count(4) + topic_name_len(2) + topic_name
0x00, 0x00, 0x00, 0x01, // topics_count = 1
0x00, 0x0B, // topic_name length = 11
'd', 'e', 'b', 'u', 'g', '-', 't', 'o', 'p', 'i', 'c', // topic_name = "debug-topic"
}
// Prepend message length
messageLen = make([]byte, 4)
binary.BigEndian.PutUint32(messageLen, uint32(len(metadataRequest)))
fullRequest = append(messageLen, metadataRequest...)
t.Logf("Sending Metadata v1 request (%d bytes): %x", len(fullRequest), fullRequest)
_, err = conn.Write(fullRequest)
// Get metadata - this should work without loops
partitions, err := conn.ReadPartitions(topic)
if err != nil {
t.Fatalf("Failed to send Metadata: %v", err)
t.Fatalf("Failed to read partitions: %v", err)
}
// Read Metadata response
responseLen = make([]byte, 4)
_, err = conn.Read(responseLen)
if err != nil {
t.Fatalf("Failed to read Metadata response length: %v", err)
}
respLen = binary.BigEndian.Uint32(responseLen)
response = make([]byte, respLen)
_, err = conn.Read(response)
if err != nil {
t.Fatalf("Failed to read Metadata response: %v", err)
}
t.Logf("Metadata v1 response (%d bytes): %x", len(response), response)
// Parse the response manually to understand the format
t.Log("=== Parsing Metadata v1 response ===")
offset := 0
// Correlation ID (4 bytes)
correlationID := binary.BigEndian.Uint32(response[offset:offset+4])
offset += 4
t.Logf("Correlation ID: %d", correlationID)
// Brokers array length (4 bytes)
brokersCount := binary.BigEndian.Uint32(response[offset:offset+4])
offset += 4
t.Logf("Brokers count: %d", brokersCount)
// Parse each broker
for i := uint32(0); i < brokersCount; i++ {
// node_id (4 bytes)
nodeID := binary.BigEndian.Uint32(response[offset:offset+4])
offset += 4
t.Logf("Broker %d node_id: %d", i, nodeID)
// host (STRING: 2 bytes length + bytes)
hostLen := binary.BigEndian.Uint16(response[offset:offset+2])
offset += 2
hostBytes := response[offset:offset+int(hostLen)]
offset += int(hostLen)
t.Logf("Broker %d host: %s", i, string(hostBytes))
// port (4 bytes)
portNum := binary.BigEndian.Uint32(response[offset:offset+4])
offset += 4
t.Logf("Broker %d port: %d", i, portNum)
// rack (STRING: 2 bytes length + bytes) - v1 addition
rackLen := binary.BigEndian.Uint16(response[offset:offset+2])
offset += 2
if rackLen > 0 {
rackBytes := response[offset:offset+int(rackLen)]
offset += int(rackLen)
t.Logf("Broker %d rack: %s", i, string(rackBytes))
} else {
t.Logf("Broker %d rack: (empty)", i)
}
}
// Topics array length (4 bytes)
if offset < len(response) {
topicsCount := binary.BigEndian.Uint32(response[offset:offset+4])
offset += 4
t.Logf("Topics count: %d", topicsCount)
// Parse each topic
for i := uint32(0); i < topicsCount && offset < len(response); i++ {
// error_code (2 bytes)
errorCode := binary.BigEndian.Uint16(response[offset:offset+2])
offset += 2
t.Logf("Topic %d error_code: %d", i, errorCode)
// name (STRING: 2 bytes length + bytes)
nameLen := binary.BigEndian.Uint16(response[offset:offset+2])
offset += 2
nameBytes := response[offset:offset+int(nameLen)]
offset += int(nameLen)
t.Logf("Topic %d name: %s", i, string(nameBytes))
// is_internal (1 byte) - v1 addition
if offset < len(response) {
isInternal := response[offset]
offset += 1
t.Logf("Topic %d is_internal: %d", i, isInternal)
}
// partitions array length (4 bytes)
if offset+4 <= len(response) {
partitionsCount := binary.BigEndian.Uint32(response[offset:offset+4])
offset += 4
t.Logf("Topic %d partitions count: %d", i, partitionsCount)
// Skip partition details for brevity
for j := uint32(0); j < partitionsCount && offset < len(response); j++ {
// error_code (2) + partition_id (4) + leader (4) + replicas (4+n*4) + isr (4+n*4)
if offset+2 <= len(response) {
partErrorCode := binary.BigEndian.Uint16(response[offset:offset+2])
offset += 2
t.Logf(" Partition %d error_code: %d", j, partErrorCode)
}
if offset+4 <= len(response) {
partitionID := binary.BigEndian.Uint32(response[offset:offset+4])
offset += 4
t.Logf(" Partition %d id: %d", j, partitionID)
}
if offset+4 <= len(response) {
leader := binary.BigEndian.Uint32(response[offset:offset+4])
offset += 4
t.Logf(" Partition %d leader: %d", j, leader)
}
// Skip replicas and isr arrays for brevity - just advance offset
if offset+4 <= len(response) {
replicasCount := binary.BigEndian.Uint32(response[offset:offset+4])
offset += 4 + int(replicasCount)*4
t.Logf(" Partition %d replicas count: %d", j, replicasCount)
}
if offset+4 <= len(response) {
isrCount := binary.BigEndian.Uint32(response[offset:offset+4])
offset += 4 + int(isrCount)*4
t.Logf(" Partition %d isr count: %d", j, isrCount)
}
}
}
}
t.Logf("Successfully read %d partitions for topic %s", len(partitions), topic)
for _, p := range partitions {
t.Logf("Partition %d: Leader=%d, Replicas=%v, ISR=%v",
p.ID, p.Leader.ID, p.Replicas, p.Isr)
}
t.Logf("Parsed %d bytes, remaining: %d", offset, len(response)-offset)
}
}

46
weed/mq/kafka/protocol/handler.go

@ -329,10 +329,11 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
response = append(response, 0, 3) // max version 3
// API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2)
// kafka-go negotiates v1,v6 - try v6 since our v6 works with Sarama
// TEMPORARY FIX: Limit to v4 since v6 has format issues with kafka-go
// Sarama works with v4, kafka-go should also work with v4
response = append(response, 0, 3) // API key 3
response = append(response, 0, 0) // min version 0
response = append(response, 0, 6) // max version 6
response = append(response, 0, 4) // max version 4 (was 6)
// API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2)
response = append(response, 0, 2) // API key 2
@ -1198,11 +1199,11 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
fmt.Printf("DEBUG: *** CREATETOPICS REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
if len(requestBody) < 2 {
return nil, fmt.Errorf("CreateTopics request too short")
}
// Parse based on API version
switch apiVersion {
case 0, 1:
@ -1217,25 +1218,25 @@ func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, re
func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
// CreateTopics v2+ format:
// topics_array + timeout_ms(4) + validate_only(1) + [tagged_fields]
offset := 0
// Parse topics array (compact array format in v2+)
if len(requestBody) < offset+1 {
return nil, fmt.Errorf("CreateTopics v2+ request missing topics array")
}
// Read topics count (compact array: length + 1)
topicsCountRaw := requestBody[offset]
offset += 1
var topicsCount uint32
if topicsCountRaw == 0 {
topicsCount = 0
} else {
topicsCount = uint32(topicsCountRaw) - 1
}
fmt.Printf("DEBUG: CreateTopics v%d - Topics count: %d, remaining bytes: %d\n", apiVersion, topicsCount, len(requestBody)-offset)
// DEBUG: Hex dump to understand request format
@ -1272,10 +1273,10 @@ func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint
if len(requestBody) < offset+1 {
break
}
topicNameLengthRaw := requestBody[offset]
offset += 1
var topicNameLength int
if topicNameLengthRaw == 0 {
topicNameLength = 0
@ -1297,7 +1298,7 @@ func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint
numPartitions := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
// Parse replication_factor (2 bytes)
// Parse replication_factor (2 bytes)
if len(requestBody) < offset+2 {
break
}
@ -1308,14 +1309,14 @@ func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint
if len(requestBody) >= offset+1 {
configsCountRaw := requestBody[offset]
offset += 1
var configsCount uint32
if configsCountRaw == 0 {
configsCount = 0
} else {
configsCount = uint32(configsCountRaw) - 1
}
// Skip configs for now (simplified)
for j := uint32(0); j < configsCount && offset < len(requestBody); j++ {
// Skip config name (compact string)
@ -1418,24 +1419,24 @@ func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint
response = append(response, byte(len(errorMessage)+1)) // Compact string format
response = append(response, []byte(errorMessage)...)
}
// Tagged fields (empty)
response = append(response, 0)
}
// Parse timeout_ms and validate_only at the end (after all topics)
if len(requestBody) >= offset+4 {
timeoutMs := binary.BigEndian.Uint32(requestBody[offset : offset+4])
offset += 4
fmt.Printf("DEBUG: CreateTopics timeout_ms: %d\n", timeoutMs)
}
if len(requestBody) >= offset+1 {
validateOnly := requestBody[offset] != 0
offset += 1
fmt.Printf("DEBUG: CreateTopics validate_only: %v\n", validateOnly)
}
// Tagged fields at the end
response = append(response, 0)
@ -1447,18 +1448,18 @@ func (h *Handler) handleCreateTopicsV0V1(correlationID uint32, requestBody []byt
// TODO: Implement v0/v1 parsing if needed
// For now, return unsupported version error
response := make([]byte, 0, 32)
// Correlation ID
correlationIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
response = append(response, correlationIDBytes...)
// Throttle time
response = append(response, 0, 0, 0, 0)
// Empty topics array
response = append(response, 0, 0, 0, 0)
return response, nil
}
@ -1755,4 +1756,3 @@ func (h *Handler) IsSchemaEnabled() bool {
func (h *Handler) IsBrokerIntegrationEnabled() bool {
return h.IsSchemaEnabled() && h.brokerClient != nil
}
Loading…
Cancel
Save