From d6f688a44fc4cabc5e3c0c7187f1759218740b40 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Sep 2025 09:52:11 -0700 Subject: [PATCH] Limit Metadata API to v4 to fix kafka-go client compatibility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PARTIAL FIX: Force kafka-go to use Metadata v4 instead of v6 ## Issue Identified: - kafka-go was using Metadata v6 due to ApiVersions advertising v0-v6 - Our Metadata v6 implementation has format issues causing client failures - Sarama works because it uses Metadata v4, not v6 ## Changes: - Limited Metadata API max version from 6 to 4 in ApiVersions response - Added debug test to isolate Metadata parsing issues - kafka-go now uses Metadata v4 (same as working Sarama) ## Status: - ✅ kafka-go now uses v4 instead of v6 - ❌ Still has metadata loops (deeper issue with response format) - ✅ Produce operations work correctly - ❌ ReadPartitions API still fails ## Next Steps: - Investigate why kafka-go keeps requesting metadata even with v4 - Compare exact byte format between working Sarama and failing kafka-go - May need to fix specific fields in Metadata v4 response format This is progress toward full kafka-go compatibility, but more investigation is needed. 
--- test/kafka/metadata_debug_test.go | 232 +++--------------------------- weed/mq/kafka/protocol/handler.go | 46 +++--- 2 files changed, 41 insertions(+), 237 deletions(-) diff --git a/test/kafka/metadata_debug_test.go b/test/kafka/metadata_debug_test.go index 3af9bb512..490607d5b 100644 --- a/test/kafka/metadata_debug_test.go +++ b/test/kafka/metadata_debug_test.go @@ -1,243 +1,47 @@ package kafka import ( - "encoding/binary" "fmt" - "net" "testing" "time" + "github.com/segmentio/kafka-go" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" ) -// TestMetadataV1DebugCapture captures the exact bytes kafka-go sends and expects -func TestMetadataV1DebugCapture(t *testing.T) { - // Start gateway server - gatewayServer := gateway.NewServer(gateway.Options{ - Listen: ":0", // random port - }) - +func TestMetadataV6Debug(t *testing.T) { + // Start gateway + gatewayServer := gateway.NewServer(gateway.Options{Listen: "127.0.0.1:0"}) go func() { if err := gatewayServer.Start(); err != nil { - t.Errorf("Gateway server error: %v", err) + t.Errorf("Failed to start gateway: %v", err) } }() defer gatewayServer.Close() - // Wait for server to start time.Sleep(100 * time.Millisecond) - // Get the actual listening address host, port := gatewayServer.GetListenerAddr() - brokerAddr := fmt.Sprintf("%s:%d", host, port) - t.Logf("Gateway running on %s", brokerAddr) - - // Get handler and configure it - handler := gatewayServer.GetHandler() - handler.SetBrokerAddress(host, port) - - // Add test topic - topicName := "debug-topic" - handler.AddTopicForTesting(topicName, 1) + addr := fmt.Sprintf("%s:%d", host, port) + topic := "metadata-debug-topic" + gatewayServer.GetHandler().AddTopicForTesting(topic, 1) - // Create raw TCP connection to manually send Metadata v1 request - conn, err := net.Dial("tcp", brokerAddr) + // Create a simple kafka-go client that just gets metadata + conn, err := kafka.Dial("tcp", addr) if err != nil { t.Fatalf("Failed to connect: %v", err) } defer conn.Close() 
- // Send ApiVersions request first - t.Log("=== Sending ApiVersions request ===") - apiVersionsRequest := []byte{ - // Request header: api_key(2) + api_version(2) + correlation_id(4) + client_id_len(2) + client_id - 0x00, 0x12, // api_key = 18 (ApiVersions) - 0x00, 0x00, // api_version = 0 - 0x00, 0x00, 0x00, 0x01, // correlation_id = 1 - 0x00, 0x09, // client_id length = 9 - 'd', 'e', 'b', 'u', 'g', '-', 't', 'e', 's', 't', // client_id = "debug-test" - } - - // Prepend message length - messageLen := make([]byte, 4) - binary.BigEndian.PutUint32(messageLen, uint32(len(apiVersionsRequest))) - fullRequest := append(messageLen, apiVersionsRequest...) - - _, err = conn.Write(fullRequest) - if err != nil { - t.Fatalf("Failed to send ApiVersions: %v", err) - } - - // Read ApiVersions response - responseLen := make([]byte, 4) - _, err = conn.Read(responseLen) - if err != nil { - t.Fatalf("Failed to read ApiVersions response length: %v", err) - } - - respLen := binary.BigEndian.Uint32(responseLen) - response := make([]byte, respLen) - _, err = conn.Read(response) - if err != nil { - t.Fatalf("Failed to read ApiVersions response: %v", err) - } - - t.Logf("ApiVersions response (%d bytes): %x", len(response), response) - - // Now send Metadata v1 request - t.Log("=== Sending Metadata v1 request ===") - metadataRequest := []byte{ - // Request header: api_key(2) + api_version(2) + correlation_id(4) + client_id_len(2) + client_id - 0x00, 0x03, // api_key = 3 (Metadata) - 0x00, 0x01, // api_version = 1 - 0x00, 0x00, 0x00, 0x02, // correlation_id = 2 - 0x00, 0x09, // client_id length = 9 - 'd', 'e', 'b', 'u', 'g', '-', 't', 'e', 's', 't', // client_id = "debug-test" - // Metadata request body: topics_count(4) + topic_name_len(2) + topic_name - 0x00, 0x00, 0x00, 0x01, // topics_count = 1 - 0x00, 0x0B, // topic_name length = 11 - 'd', 'e', 'b', 'u', 'g', '-', 't', 'o', 'p', 'i', 'c', // topic_name = "debug-topic" - } - - // Prepend message length - messageLen = make([]byte, 4) - 
binary.BigEndian.PutUint32(messageLen, uint32(len(metadataRequest))) - fullRequest = append(messageLen, metadataRequest...) - - t.Logf("Sending Metadata v1 request (%d bytes): %x", len(fullRequest), fullRequest) - - _, err = conn.Write(fullRequest) + // Get metadata - this should work without loops + partitions, err := conn.ReadPartitions(topic) if err != nil { - t.Fatalf("Failed to send Metadata: %v", err) + t.Fatalf("Failed to read partitions: %v", err) } - // Read Metadata response - responseLen = make([]byte, 4) - _, err = conn.Read(responseLen) - if err != nil { - t.Fatalf("Failed to read Metadata response length: %v", err) - } - - respLen = binary.BigEndian.Uint32(responseLen) - response = make([]byte, respLen) - _, err = conn.Read(response) - if err != nil { - t.Fatalf("Failed to read Metadata response: %v", err) - } - - t.Logf("Metadata v1 response (%d bytes): %x", len(response), response) - - // Parse the response manually to understand the format - t.Log("=== Parsing Metadata v1 response ===") - offset := 0 - - // Correlation ID (4 bytes) - correlationID := binary.BigEndian.Uint32(response[offset:offset+4]) - offset += 4 - t.Logf("Correlation ID: %d", correlationID) - - // Brokers array length (4 bytes) - brokersCount := binary.BigEndian.Uint32(response[offset:offset+4]) - offset += 4 - t.Logf("Brokers count: %d", brokersCount) - - // Parse each broker - for i := uint32(0); i < brokersCount; i++ { - // node_id (4 bytes) - nodeID := binary.BigEndian.Uint32(response[offset:offset+4]) - offset += 4 - t.Logf("Broker %d node_id: %d", i, nodeID) - - // host (STRING: 2 bytes length + bytes) - hostLen := binary.BigEndian.Uint16(response[offset:offset+2]) - offset += 2 - hostBytes := response[offset:offset+int(hostLen)] - offset += int(hostLen) - t.Logf("Broker %d host: %s", i, string(hostBytes)) - - // port (4 bytes) - portNum := binary.BigEndian.Uint32(response[offset:offset+4]) - offset += 4 - t.Logf("Broker %d port: %d", i, portNum) - - // rack (STRING: 2 
bytes length + bytes) - v1 addition - rackLen := binary.BigEndian.Uint16(response[offset:offset+2]) - offset += 2 - if rackLen > 0 { - rackBytes := response[offset:offset+int(rackLen)] - offset += int(rackLen) - t.Logf("Broker %d rack: %s", i, string(rackBytes)) - } else { - t.Logf("Broker %d rack: (empty)", i) - } - } - - // Topics array length (4 bytes) - if offset < len(response) { - topicsCount := binary.BigEndian.Uint32(response[offset:offset+4]) - offset += 4 - t.Logf("Topics count: %d", topicsCount) - - // Parse each topic - for i := uint32(0); i < topicsCount && offset < len(response); i++ { - // error_code (2 bytes) - errorCode := binary.BigEndian.Uint16(response[offset:offset+2]) - offset += 2 - t.Logf("Topic %d error_code: %d", i, errorCode) - - // name (STRING: 2 bytes length + bytes) - nameLen := binary.BigEndian.Uint16(response[offset:offset+2]) - offset += 2 - nameBytes := response[offset:offset+int(nameLen)] - offset += int(nameLen) - t.Logf("Topic %d name: %s", i, string(nameBytes)) - - // is_internal (1 byte) - v1 addition - if offset < len(response) { - isInternal := response[offset] - offset += 1 - t.Logf("Topic %d is_internal: %d", i, isInternal) - } - - // partitions array length (4 bytes) - if offset+4 <= len(response) { - partitionsCount := binary.BigEndian.Uint32(response[offset:offset+4]) - offset += 4 - t.Logf("Topic %d partitions count: %d", i, partitionsCount) - - // Skip partition details for brevity - for j := uint32(0); j < partitionsCount && offset < len(response); j++ { - // error_code (2) + partition_id (4) + leader (4) + replicas (4+n*4) + isr (4+n*4) - if offset+2 <= len(response) { - partErrorCode := binary.BigEndian.Uint16(response[offset:offset+2]) - offset += 2 - t.Logf(" Partition %d error_code: %d", j, partErrorCode) - } - if offset+4 <= len(response) { - partitionID := binary.BigEndian.Uint32(response[offset:offset+4]) - offset += 4 - t.Logf(" Partition %d id: %d", j, partitionID) - } - if offset+4 <= len(response) { - 
leader := binary.BigEndian.Uint32(response[offset:offset+4]) - offset += 4 - t.Logf(" Partition %d leader: %d", j, leader) - } - // Skip replicas and isr arrays for brevity - just advance offset - if offset+4 <= len(response) { - replicasCount := binary.BigEndian.Uint32(response[offset:offset+4]) - offset += 4 + int(replicasCount)*4 - t.Logf(" Partition %d replicas count: %d", j, replicasCount) - } - if offset+4 <= len(response) { - isrCount := binary.BigEndian.Uint32(response[offset:offset+4]) - offset += 4 + int(isrCount)*4 - t.Logf(" Partition %d isr count: %d", j, isrCount) - } - } - } - } + t.Logf("Successfully read %d partitions for topic %s", len(partitions), topic) + for _, p := range partitions { + t.Logf("Partition %d: Leader=%d, Replicas=%v, ISR=%v", + p.ID, p.Leader.ID, p.Replicas, p.Isr) } - - t.Logf("Parsed %d bytes, remaining: %d", offset, len(response)-offset) -} +} \ No newline at end of file diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index d0e41cd15..2907d2a0c 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -329,10 +329,11 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 3) // max version 3 // API Key 3 (Metadata): api_key(2) + min_version(2) + max_version(2) - // kafka-go negotiates v1,v6 - try v6 since our v6 works with Sarama + // TEMPORARY FIX: Limit to v4 since v6 has format issues with kafka-go + // Sarama works with v4, kafka-go should also work with v4 response = append(response, 0, 3) // API key 3 response = append(response, 0, 0) // min version 0 - response = append(response, 0, 6) // max version 6 + response = append(response, 0, 4) // max version 4 (was 6) // API Key 2 (ListOffsets): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 2) // API key 2 @@ -1198,11 +1199,11 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req func (h *Handler) 
handleCreateTopics(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { fmt.Printf("DEBUG: *** CREATETOPICS REQUEST RECEIVED *** Correlation: %d, Version: %d\n", correlationID, apiVersion) - + if len(requestBody) < 2 { return nil, fmt.Errorf("CreateTopics request too short") } - + // Parse based on API version switch apiVersion { case 0, 1: @@ -1217,25 +1218,25 @@ func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, re func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { // CreateTopics v2+ format: // topics_array + timeout_ms(4) + validate_only(1) + [tagged_fields] - + offset := 0 - + // Parse topics array (compact array format in v2+) if len(requestBody) < offset+1 { return nil, fmt.Errorf("CreateTopics v2+ request missing topics array") } - + // Read topics count (compact array: length + 1) topicsCountRaw := requestBody[offset] offset += 1 - + var topicsCount uint32 if topicsCountRaw == 0 { topicsCount = 0 } else { topicsCount = uint32(topicsCountRaw) - 1 } - + fmt.Printf("DEBUG: CreateTopics v%d - Topics count: %d, remaining bytes: %d\n", apiVersion, topicsCount, len(requestBody)-offset) // DEBUG: Hex dump to understand request format @@ -1272,10 +1273,10 @@ func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint if len(requestBody) < offset+1 { break } - + topicNameLengthRaw := requestBody[offset] offset += 1 - + var topicNameLength int if topicNameLengthRaw == 0 { topicNameLength = 0 @@ -1297,7 +1298,7 @@ func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint numPartitions := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 - // Parse replication_factor (2 bytes) + // Parse replication_factor (2 bytes) if len(requestBody) < offset+2 { break } @@ -1308,14 +1309,14 @@ func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint if len(requestBody) >= 
offset+1 { configsCountRaw := requestBody[offset] offset += 1 - + var configsCount uint32 if configsCountRaw == 0 { configsCount = 0 } else { configsCount = uint32(configsCountRaw) - 1 } - + // Skip configs for now (simplified) for j := uint32(0); j < configsCount && offset < len(requestBody); j++ { // Skip config name (compact string) @@ -1418,24 +1419,24 @@ func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint response = append(response, byte(len(errorMessage)+1)) // Compact string format response = append(response, []byte(errorMessage)...) } - + // Tagged fields (empty) response = append(response, 0) } - + // Parse timeout_ms and validate_only at the end (after all topics) if len(requestBody) >= offset+4 { timeoutMs := binary.BigEndian.Uint32(requestBody[offset : offset+4]) offset += 4 fmt.Printf("DEBUG: CreateTopics timeout_ms: %d\n", timeoutMs) } - + if len(requestBody) >= offset+1 { validateOnly := requestBody[offset] != 0 offset += 1 fmt.Printf("DEBUG: CreateTopics validate_only: %v\n", validateOnly) } - + // Tagged fields at the end response = append(response, 0) @@ -1447,18 +1448,18 @@ func (h *Handler) handleCreateTopicsV0V1(correlationID uint32, requestBody []byt // TODO: Implement v0/v1 parsing if needed // For now, return unsupported version error response := make([]byte, 0, 32) - + // Correlation ID correlationIDBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationIDBytes, correlationID) response = append(response, correlationIDBytes...) - + // Throttle time response = append(response, 0, 0, 0, 0) - + // Empty topics array response = append(response, 0, 0, 0, 0) - + return response, nil } @@ -1755,4 +1756,3 @@ func (h *Handler) IsSchemaEnabled() bool { func (h *Handler) IsBrokerIntegrationEnabled() bool { return h.IsSchemaEnabled() && h.brokerClient != nil } -