From a0426ff2ac41f68a7284e06ae7cfb380d3498ada Mon Sep 17 00:00:00 2001
From: chrislu
Date: Wed, 10 Sep 2025 14:07:42 -0700
Subject: [PATCH] mq(kafka): Fix CreateTopics v2 request parsing - Phase 4 progress

- Fixed CreateTopics v2 request parsing (was reading wrong offset)
- kafka-go uses CreateTopics v2, not v0 as we implemented
- Removed incorrect timeout field parsing for v2 format
- Topics count now parses correctly (was 1274981, now 1)
- Response size increased from 12 to 37 bytes (processing topics correctly)
- Added detailed debug logging for protocol analysis
- Added hex dump capability to analyze request structure
- Still working on v2 response format compatibility

This fixes the critical parsing bug where we were reading topics count
from inside the client ID string due to wrong v2 format assumptions.

Next: Fix v2 response format for full CreateTopics compatibility.
---
 test/kafka/client_integration_test.go | 369 ++++++++++++++++++++++++++
 test/kafka/debug_connection_test.go   | 241 +++++++++++++++++
 weed/mq/kafka/protocol/handler.go     |  36 ++-
 3 files changed, 640 insertions(+), 6 deletions(-)
 create mode 100644 test/kafka/client_integration_test.go
 create mode 100644 test/kafka/debug_connection_test.go

diff --git a/test/kafka/client_integration_test.go b/test/kafka/client_integration_test.go
new file mode 100644
index 000000000..9a23cb324
--- /dev/null
+++ b/test/kafka/client_integration_test.go
@@ -0,0 +1,369 @@
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/segmentio/kafka-go"
+	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
+)
+
+// TestKafkaGoClient_BasicProduceConsume tests our gateway with real kafka-go client
+func TestKafkaGoClient_BasicProduceConsume(t *testing.T) {
+	// Start the gateway server
+	srv := gateway.NewServer(gateway.Options{
+		Listen:       ":0",   // Use random port
+		UseSeaweedMQ: false, // Use in-memory mode for testing
+	})
+
+	if err := srv.Start(); err != nil {
+		t.Fatalf("Failed to start gateway: %v", err)
+	}
+	defer srv.Close()
+
+	// Get the actual address
+	brokerAddr := srv.Addr()
+	t.Logf("Gateway running on %s", brokerAddr)
+
+	// Create topic first
+	topicName := "test-kafka-go-topic"
+	if err := createTopicWithKafkaGo(brokerAddr, topicName); err != nil {
+		t.Fatalf("Failed to create topic: %v", err)
+	}
+
+	// Test basic produce
+	messages := []kafka.Message{
+		{
+			Key:   []byte("key1"),
+			Value: []byte("Hello, Kafka Gateway!"),
+		},
+		{
+			Key:   []byte("key2"),
+			Value: []byte("This is message 2"),
+		},
+		{
+			Key:   []byte("key3"),
+			Value: []byte("Final test message"),
+		},
+	}
+
+	if err := produceMessages(brokerAddr, topicName, messages); err != nil {
+		t.Fatalf("Failed to produce messages: %v", err)
+	}
+
+	// Test basic consume
+	consumedMessages, err := consumeMessages(brokerAddr, topicName, len(messages))
+	if err != nil {
+		t.Fatalf("Failed to consume messages: %v", err)
+	}
+
+	// Validate consumed messages
+	if len(consumedMessages) != len(messages) {
+		t.Errorf("Expected %d messages, got %d", len(messages), len(consumedMessages))
+	}
+
+	for i, msg := range consumedMessages {
+		if i < len(messages) {
+			expectedValue := string(messages[i].Value)
+			actualValue := string(msg.Value)
+			if actualValue != expectedValue {
+				t.Errorf("Message %d: expected value %q, got %q", i, expectedValue, actualValue)
+			}
+		}
+	}
+
+	t.Logf("Successfully produced and consumed %d messages", len(consumedMessages))
+}
+
+// TestKafkaGoClient_ConsumerGroups tests consumer group functionality
+func TestKafkaGoClient_ConsumerGroups(t *testing.T) {
+	// Start the gateway server
+	srv := gateway.NewServer(gateway.Options{
+		Listen:       ":0",
+		UseSeaweedMQ: false,
+	})
+
+	if err := srv.Start(); err != nil {
+		t.Fatalf("Failed to start gateway: %v", err)
+	}
+	defer srv.Close()
+
+	brokerAddr := srv.Addr()
+	topicName := "test-consumer-group-topic"
+	groupID := "test-consumer-group"
+
+	// Create topic
+	if err := createTopicWithKafkaGo(brokerAddr, topicName); err != nil {
+		t.Fatalf("Failed to create topic: %v", err)
+	}
+
+	// Produce some messages
+	messages := []kafka.Message{
+		{Value: []byte("group-message-1")},
+		{Value: []byte("group-message-2")},
+		{Value: []byte("group-message-3")},
+	}
+
+	if err := produceMessages(brokerAddr, topicName, messages); err != nil {
+		t.Fatalf("Failed to produce messages: %v", err)
+	}
+
+	// Test consumer group
+	consumedMessages, err := consumeWithGroup(brokerAddr, topicName, groupID, len(messages))
+	if err != nil {
+		t.Fatalf("Failed to consume with group: %v", err)
+	}
+
+	if len(consumedMessages) != len(messages) {
+		t.Errorf("Expected %d messages, got %d", len(messages), len(consumedMessages))
+	}
+
+	t.Logf("Consumer group successfully consumed %d messages", len(consumedMessages))
+}
+
+// TestKafkaGoClient_MultiplePartitions tests behavior with multiple partitions
+func TestKafkaGoClient_MultiplePartitions(t *testing.T) {
+	t.Skip("TODO: Enable once partition support is improved")
+
+	// This test will be enabled once we fix partition handling
+	// For now, our implementation assumes single partition per topic
+}
+
+// TestKafkaGoClient_OffsetManagement tests offset commit/fetch operations
+func TestKafkaGoClient_OffsetManagement(t *testing.T) {
+	// Start the gateway server
+	srv := gateway.NewServer(gateway.Options{
+		Listen:       ":0",
+		UseSeaweedMQ: false,
+	})
+
+	if err := srv.Start(); err != nil {
+		t.Fatalf("Failed to start gateway: %v", err)
+	}
+	defer srv.Close()
+
+	brokerAddr := srv.Addr()
+	topicName := "test-offset-topic"
+	groupID := "test-offset-group"
+
+	// Create topic
+	if err := createTopicWithKafkaGo(brokerAddr, topicName); err != nil {
+		t.Fatalf("Failed to create topic: %v", err)
+	}
+
+	// Produce messages
+	messages := []kafka.Message{
+		{Value: []byte("offset-message-1")},
+		{Value: []byte("offset-message-2")},
+		{Value: []byte("offset-message-3")},
+		{Value: []byte("offset-message-4")},
+		{Value: []byte("offset-message-5")},
+	}
+
+	if err := produceMessages(brokerAddr, topicName, messages); err != nil {
+		t.Fatalf("Failed to produce messages: %v", err)
+	}
+
+	// Consume only first 3 messages and commit offset
+	partialMessages, err := consumeWithGroupAndCommit(brokerAddr, topicName, groupID, 3)
+	if err != nil {
+		t.Fatalf("Failed to consume with offset commit: %v", err)
+	}
+
+	if len(partialMessages) != 3 {
+		t.Errorf("Expected 3 messages, got %d", len(partialMessages))
+	}
+
+	// Create new consumer with same group ID - should resume from committed offset
+	remainingMessages, err := consumeWithGroup(brokerAddr, topicName, groupID, 2)
+	if err != nil {
+		t.Fatalf("Failed to consume remaining messages: %v", err)
+	}
+
+	if len(remainingMessages) != 2 {
+		t.Errorf("Expected 2 remaining messages, got %d", len(remainingMessages))
+	}
+
+	t.Logf("Offset management test passed: consumed %d + %d messages",
+		len(partialMessages), len(remainingMessages))
+}
+
+// Helper functions
+
+func createTopicWithKafkaGo(brokerAddr, topicName string) error {
+	// Create connection with timeout
+	dialer := &kafka.Dialer{
+		Timeout:   5 * time.Second,
+		DualStack: true,
+	}
+
+	conn, err := dialer.Dial("tcp", brokerAddr)
+	if err != nil {
+		return fmt.Errorf("dial broker: %w", err)
+	}
+	defer conn.Close()
+
+	// Set read/write deadlines for debugging
+	conn.SetDeadline(time.Now().Add(10 * time.Second))
+
+	fmt.Printf("DEBUG: Connected to broker at %s\n", brokerAddr)
+
+	topicConfigs := []kafka.TopicConfig{
+		{
+			Topic:             topicName,
+			NumPartitions:     1,
+			ReplicationFactor: 1,
+		},
+	}
+
+	fmt.Printf("DEBUG: Creating topic %s with 1 partition\n", topicName)
+	err = conn.CreateTopics(topicConfigs...)
+	if err != nil {
+		return fmt.Errorf("create topic: %w", err)
+	}
+
+	fmt.Printf("DEBUG: Topic %s created successfully\n", topicName)
+	return nil
+}
+
+func produceMessages(brokerAddr, topicName string, messages []kafka.Message) error {
+	writer := &kafka.Writer{
+		Addr:     kafka.TCP(brokerAddr),
+		Topic:    topicName,
+		Balancer: &kafka.LeastBytes{},
+		// Enable detailed logging for debugging protocol issues
+		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			if strings.Contains(msg, "error") || strings.Contains(msg, "failed") {
+				fmt.Printf("PRODUCER ERROR: "+msg+"\n", args...)
+			}
+		}),
+		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			fmt.Printf("PRODUCER ERROR: "+msg+"\n", args...)
+		}),
+	}
+	defer writer.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	return writer.WriteMessages(ctx, messages...)
+}
+
+func consumeMessages(brokerAddr, topicName string, expectedCount int) ([]kafka.Message, error) {
+	reader := kafka.NewReader(kafka.ReaderConfig{
+		Brokers: []string{brokerAddr},
+		Topic:   topicName,
+		// Start from the beginning
+		StartOffset: kafka.FirstOffset,
+		// Enable detailed logging for debugging protocol issues
+		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			if strings.Contains(msg, "error") || strings.Contains(msg, "failed") {
+				fmt.Printf("CONSUMER ERROR: "+msg+"\n", args...)
+			}
+		}),
+		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			fmt.Printf("CONSUMER ERROR: "+msg+"\n", args...)
+		}),
+	})
+	defer reader.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	var messages []kafka.Message
+	for i := 0; i < expectedCount; i++ {
+		msg, err := reader.ReadMessage(ctx)
+		if err != nil {
+			return messages, fmt.Errorf("read message %d: %w", i, err)
+		}
+		messages = append(messages, msg)
+	}
+
+	return messages, nil
+}
+
+func consumeWithGroup(brokerAddr, topicName, groupID string, expectedCount int) ([]kafka.Message, error) {
+	reader := kafka.NewReader(kafka.ReaderConfig{
+		Brokers: []string{brokerAddr},
+		Topic:   topicName,
+		GroupID: groupID,
+		// Enable detailed logging for debugging protocol issues
+		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			if strings.Contains(msg, "error") || strings.Contains(msg, "failed") {
+				fmt.Printf("GROUP CONSUMER ERROR: "+msg+"\n", args...)
+			}
+		}),
+		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			fmt.Printf("GROUP CONSUMER ERROR: "+msg+"\n", args...)
+		}),
+	})
+	defer reader.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	var messages []kafka.Message
+	for i := 0; i < expectedCount; i++ {
+		msg, err := reader.ReadMessage(ctx)
+		if err != nil {
+			return messages, fmt.Errorf("read message %d: %w", i, err)
+		}
+		messages = append(messages, msg)
+	}
+
+	return messages, nil
+}
+
+func consumeWithGroupAndCommit(brokerAddr, topicName, groupID string, expectedCount int) ([]kafka.Message, error) {
+	reader := kafka.NewReader(kafka.ReaderConfig{
+		Brokers: []string{brokerAddr},
+		Topic:   topicName,
+		GroupID: groupID,
+		// Enable detailed logging for debugging protocol issues
+		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			if strings.Contains(msg, "error") || strings.Contains(msg, "failed") {
+				fmt.Printf("GROUP CONSUMER WITH COMMIT ERROR: "+msg+"\n", args...)
+			}
+		}),
+		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			fmt.Printf("GROUP CONSUMER WITH COMMIT ERROR: "+msg+"\n", args...)
+		}),
+	})
+	defer reader.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	var messages []kafka.Message
+	for i := 0; i < expectedCount; i++ {
+		msg, err := reader.ReadMessage(ctx)
+		if err != nil {
+			return messages, fmt.Errorf("read message %d: %w", i, err)
+		}
+		messages = append(messages, msg)
+
+		// Commit the message
+		if err := reader.CommitMessages(ctx, msg); err != nil {
+			return messages, fmt.Errorf("commit message %d: %w", i, err)
+		}
+	}
+
+	return messages, nil
+}
+
+// waitForPort waits for a TCP port to become available
+func waitForPort(addr string) error {
+	for i := 0; i < 50; i++ { // Wait up to 5 seconds
+		conn, err := net.Dial("tcp", addr)
+		if err == nil {
+			conn.Close()
+			return nil
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	return fmt.Errorf("port %s not available after 5 seconds", addr)
+}
diff --git a/test/kafka/debug_connection_test.go b/test/kafka/debug_connection_test.go
new file mode 100644
index 000000000..9e99b1632
--- /dev/null
+++ b/test/kafka/debug_connection_test.go
@@ -0,0 +1,241 @@
+package kafka
+
+import (
+	"encoding/binary"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
+)
+
+// TestGateway_BasicConnection tests if the gateway can handle basic TCP connections
+func TestGateway_BasicConnection(t *testing.T) {
+	// Start the gateway server
+	srv := gateway.NewServer(gateway.Options{
+		Listen:       ":0",
+		UseSeaweedMQ: false,
+	})
+
+	if err := srv.Start(); err != nil {
+		t.Fatalf("Failed to start gateway: %v", err)
+	}
+	defer srv.Close()
+
+	brokerAddr := srv.Addr()
+	t.Logf("Gateway running on %s", brokerAddr)
+
+	// Test basic TCP connection
+	conn, err := net.Dial("tcp", brokerAddr)
+	if err != nil {
+		t.Fatalf("Failed to connect: %v", err)
+	}
+	defer conn.Close()
+
+	t.Logf("Successfully connected to gateway")
+}
+
+// TestGateway_ApiVersionsRequest tests if we can send an ApiVersions request
+func TestGateway_ApiVersionsRequest(t *testing.T) {
+	// Start the gateway server
+	srv := gateway.NewServer(gateway.Options{
+		Listen:       ":0",
+		UseSeaweedMQ: false,
+	})
+
+	if err := srv.Start(); err != nil {
+		t.Fatalf("Failed to start gateway: %v", err)
+	}
+	defer srv.Close()
+
+	brokerAddr := srv.Addr()
+	t.Logf("Gateway running on %s", brokerAddr)
+
+	// Create connection
+	conn, err := net.Dial("tcp", brokerAddr)
+	if err != nil {
+		t.Fatalf("Failed to connect: %v", err)
+	}
+	defer conn.Close()
+
+	// Set read timeout
+	conn.SetReadDeadline(time.Now().Add(5 * time.Second))
+
+	// Build ApiVersions request (API key 18, version 0)
+	// Request format: message_size(4) + api_key(2) + api_version(2) + correlation_id(4) + client_id(2+string)
+	correlationID := uint32(1)
+	clientID := "debug-client"
+
+	request := make([]byte, 0, 64)
+
+	// Build message body first (without size)
+	msgBody := make([]byte, 0, 32)
+	msgBody = append(msgBody, 0, 18) // API key 18 (ApiVersions)
+	msgBody = append(msgBody, 0, 0)  // API version 0
+
+	// Correlation ID
+	correlationBytes := make([]byte, 4)
+	binary.BigEndian.PutUint32(correlationBytes, correlationID)
+	msgBody = append(msgBody, correlationBytes...)
+
+	// Client ID string
+	clientIDBytes := []byte(clientID)
+	msgBody = append(msgBody, byte(len(clientIDBytes)>>8), byte(len(clientIDBytes)))
+	msgBody = append(msgBody, clientIDBytes...)
+
+	// Message size (4 bytes) + message body
+	sizeBytes := make([]byte, 4)
+	binary.BigEndian.PutUint32(sizeBytes, uint32(len(msgBody)))
+	request = append(request, sizeBytes...)
+	request = append(request, msgBody...)
+
+	t.Logf("Sending ApiVersions request: %d bytes", len(request))
+
+	// Send request
+	_, err = conn.Write(request)
+	if err != nil {
+		t.Fatalf("Failed to write request: %v", err)
+	}
+
+	// Read response size
+	var responseSizeBytes [4]byte
+	_, err = conn.Read(responseSizeBytes[:])
+	if err != nil {
+		t.Fatalf("Failed to read response size: %v", err)
+	}
+
+	responseSize := binary.BigEndian.Uint32(responseSizeBytes[:])
+	t.Logf("Response size: %d bytes", responseSize)
+
+	if responseSize == 0 || responseSize > 1024*1024 {
+		t.Fatalf("Invalid response size: %d", responseSize)
+	}
+
+	// Read response body
+	responseBody := make([]byte, responseSize)
+	totalRead := 0
+	for totalRead < int(responseSize) {
+		n, err := conn.Read(responseBody[totalRead:])
+		if err != nil {
+			t.Fatalf("Failed to read response body: %v (read %d/%d bytes)", err, totalRead, responseSize)
+		}
+		totalRead += n
+	}
+
+	t.Logf("Received response: %d bytes", len(responseBody))
+
+	// Parse basic response structure
+	if len(responseBody) < 4 {
+		t.Fatalf("Response too short: %d bytes", len(responseBody))
+	}
+
+	responseCorrelationID := binary.BigEndian.Uint32(responseBody[0:4])
+	if responseCorrelationID != correlationID {
+		t.Errorf("Correlation ID mismatch: sent %d, got %d", correlationID, responseCorrelationID)
+	}
+
+	t.Logf("ApiVersions request completed successfully")
+}
+
+// TestGateway_CreateTopicsRequest tests if we can send a CreateTopics request
+func TestGateway_CreateTopicsRequest(t *testing.T) {
+	// Start the gateway server
+	srv := gateway.NewServer(gateway.Options{
+		Listen:       ":0",
+		UseSeaweedMQ: false,
+	})
+
+	if err := srv.Start(); err != nil {
+		t.Fatalf("Failed to start gateway: %v", err)
+	}
+	defer srv.Close()
+
+	brokerAddr := srv.Addr()
+	t.Logf("Gateway running on %s", brokerAddr)
+
+	// Create connection
+	conn, err := net.Dial("tcp", brokerAddr)
+	if err != nil {
+		t.Fatalf("Failed to connect: %v", err)
+	}
+	defer conn.Close()
+
+	// Set read timeout
+	conn.SetReadDeadline(time.Now().Add(5 * time.Second))
+
+	// Build CreateTopics request (API key 19, version 0)
+	correlationID := uint32(2)
+	clientID := "debug-client"
+	topicName := "debug-topic"
+
+	// Build message body
+	msgBody := make([]byte, 0, 128)
+	msgBody = append(msgBody, 0, 19) // API key 19 (CreateTopics)
+	msgBody = append(msgBody, 0, 0)  // API version 0
+
+	// Correlation ID
+	correlationBytes := make([]byte, 4)
+	binary.BigEndian.PutUint32(correlationBytes, correlationID)
+	msgBody = append(msgBody, correlationBytes...)
+
+	// Client ID string
+	clientIDBytes := []byte(clientID)
+	msgBody = append(msgBody, byte(len(clientIDBytes)>>8), byte(len(clientIDBytes)))
+	msgBody = append(msgBody, clientIDBytes...)
+
+	// Topics array - count (4 bytes)
+	msgBody = append(msgBody, 0, 0, 0, 1) // 1 topic
+
+	// Topic name
+	topicNameBytes := []byte(topicName)
+	msgBody = append(msgBody, byte(len(topicNameBytes)>>8), byte(len(topicNameBytes)))
+	msgBody = append(msgBody, topicNameBytes...)
+
+	// Num partitions (4 bytes)
+	msgBody = append(msgBody, 0, 0, 0, 1) // 1 partition
+
+	// Replication factor (2 bytes)
+	msgBody = append(msgBody, 0, 1) // replication factor 1
+
+	// Configs count (4 bytes)
+	msgBody = append(msgBody, 0, 0, 0, 0) // 0 configs
+
+	// Timeout (4 bytes)
+	msgBody = append(msgBody, 0, 0, 0x75, 0x30) // 30 seconds
+
+	// Message size + message body
+	sizeBytes := make([]byte, 4)
+	binary.BigEndian.PutUint32(sizeBytes, uint32(len(msgBody)))
+	request := append(sizeBytes, msgBody...)
+
+	t.Logf("Sending CreateTopics request: %d bytes", len(request))
+
+	// Send request
+	_, err = conn.Write(request)
+	if err != nil {
+		t.Fatalf("Failed to write request: %v", err)
+	}
+
+	// Read response size
+	var responseSizeBytes [4]byte
+	_, err = conn.Read(responseSizeBytes[:])
+	if err != nil {
+		t.Fatalf("Failed to read response size: %v", err)
+	}
+
+	responseSize := binary.BigEndian.Uint32(responseSizeBytes[:])
+	t.Logf("Response size: %d bytes", responseSize)
+
+	// Read response body
+	responseBody := make([]byte, responseSize)
+	totalRead := 0
+	for totalRead < int(responseSize) {
+		n, err := conn.Read(responseBody[totalRead:])
+		if err != nil {
+			t.Fatalf("Failed to read response body: %v (read %d/%d bytes)", err, totalRead, responseSize)
+		}
+		totalRead += n
+	}
+
+	t.Logf("CreateTopics request completed successfully, received %d bytes", len(responseBody))
+}
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index 656ad71bd..9c0d74cdc 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -163,6 +163,10 @@ func (h *Handler) HandleConn(conn net.Conn) error {
 		apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
 		correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
 
+		// DEBUG: Log all incoming requests for debugging client compatibility
+		fmt.Printf("DEBUG: Received request - API Key: %d, Version: %d, Correlation: %d, Size: %d\n",
+			apiKey, apiVersion, correlationID, size)
+
 		// TODO: IMPORTANT - API version validation is missing
 		// Different API versions have different request/response formats
 		// Need to validate apiVersion against supported versions for each API
@@ -207,6 +211,9 @@ func (h *Handler) HandleConn(conn net.Conn) error {
 			return fmt.Errorf("handle request: %w", err)
 		}
 
+		// DEBUG: Log response details
+		fmt.Printf("DEBUG: Sending response for API %d - Size: %d bytes\n", apiKey, len(response))
+
 		// Write response size and data
 		responseSizeBytes := make([]byte, 4)
 		binary.BigEndian.PutUint32(responseSizeBytes, uint32(len(response)))
@@ -488,6 +495,11 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([
 }
 
 func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ([]byte, error) {
+	// TODO: CRITICAL - This function only supports CreateTopics v0 format
+	// kafka-go uses v2 which has a different request structure!
+	// The wrong topics count (1274981) shows we're parsing from wrong offset
+	// Need to implement proper v2 request parsing or negotiate API version
+
 	// Parse minimal CreateTopics request
 	// Request format: client_id + timeout(4) + topics_array
 
@@ -498,17 +510,29 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) (
 	// Skip client_id
 	clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
 	offset := 2 + int(clientIDSize)
-
-	if len(requestBody) < offset+8 { // timeout(4) + topics_count(4)
-		return nil, fmt.Errorf("CreateTopics request missing data")
+
+	fmt.Printf("DEBUG: Client ID size: %d, client ID: %s\n", clientIDSize, string(requestBody[2:2+clientIDSize]))
+
+	// CreateTopics v2 has different format than v0
+	// v2 format: client_id + topics_array + timeout(4) + validate_only(1)
+	// (no separate timeout field before topics like in v0)
+
+	if len(requestBody) < offset+4 {
+		return nil, fmt.Errorf("CreateTopics request missing topics array")
 	}
 
-	// Skip timeout
-	offset += 4
-
+	// Read topics count directly (no timeout field before it in v2)
 	topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
 	offset += 4
 
+	// DEBUG: Hex dump first 50 bytes to understand v2 format
+	dumpLen := len(requestBody)
+	if dumpLen > 50 {
+		dumpLen = 50
+	}
+	fmt.Printf("DEBUG: CreateTopics v2 request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])
+	fmt.Printf("DEBUG: CreateTopics v2 - Topics count: %d, remaining bytes: %d\n", topicsCount, len(requestBody)-offset)
+
 	response := make([]byte, 0, 256)
 
 	// Correlation ID
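
A note on the v2 format targeted above: in the Kafka protocol, the CreateTopics v2 request body (after the request header's api_key, api_version, correlation_id, and client_id) starts with the topics array, where each entry carries the topic name, num_partitions, replication_factor, a replica-assignments array, and a configs array; the array is followed by timeout_ms (int32) and validate_only (boolean). That matches the parsing order the hunk above switches to. The sketch below walks a v2 body end to end. It is illustrative only (the package and function names are not part of the gateway code), but it shows that the per-topic assignments and configs arrays must be consumed before the trailing timeout_ms and validate_only fields, or a second topic entry, can be read correctly.

package sketch

import (
	"encoding/binary"
	"fmt"
)

// parseCreateTopicsV2 decodes a CreateTopics v2 request body, assuming the
// request header (api_key, api_version, correlation_id, client_id) was already
// consumed. Per the Kafka protocol the body is: [topics] timeout_ms validate_only,
// where each topic is: name, num_partitions, replication_factor, [assignments], [configs].
// It returns topic name -> num_partitions; assignments and configs are skipped.
func parseCreateTopicsV2(body []byte) (map[string]int32, int32, bool, error) {
	off := 0
	readInt16 := func() (int16, error) {
		if off+2 > len(body) {
			return 0, fmt.Errorf("truncated request at offset %d", off)
		}
		v := int16(binary.BigEndian.Uint16(body[off:]))
		off += 2
		return v, nil
	}
	readInt32 := func() (int32, error) {
		if off+4 > len(body) {
			return 0, fmt.Errorf("truncated request at offset %d", off)
		}
		v := int32(binary.BigEndian.Uint32(body[off:]))
		off += 4
		return v, nil
	}
	readString := func() (string, error) { // int16 length prefix, -1 means null
		n, err := readInt16()
		if err != nil || n < 0 {
			return "", err
		}
		if off+int(n) > len(body) {
			return "", fmt.Errorf("string overruns buffer at offset %d", off)
		}
		s := string(body[off : off+int(n)])
		off += int(n)
		return s, nil
	}

	topics := map[string]int32{}
	topicCount, err := readInt32()
	if err != nil {
		return nil, 0, false, err
	}
	for i := int32(0); i < topicCount; i++ {
		name, err := readString()
		if err != nil {
			return nil, 0, false, err
		}
		numPartitions, err := readInt32()
		if err != nil {
			return nil, 0, false, err
		}
		if _, err = readInt16(); err != nil { // replication_factor
			return nil, 0, false, err
		}
		// assignments: [partition_id int32, replicas [int32 broker ids]]
		assignments, err := readInt32()
		if err != nil {
			return nil, 0, false, err
		}
		for a := int32(0); a < assignments; a++ {
			if _, err = readInt32(); err != nil { // partition_id
				return nil, 0, false, err
			}
			replicas, err := readInt32()
			if err != nil || off+4*int(replicas) > len(body) {
				return nil, 0, false, fmt.Errorf("bad replica list: %v", err)
			}
			off += 4 * int(replicas) // skip broker ids
		}
		// configs: [name string, value nullable string]
		configs, err := readInt32()
		if err != nil {
			return nil, 0, false, err
		}
		for c := int32(0); c < configs; c++ {
			if _, err = readString(); err != nil {
				return nil, 0, false, err
			}
			if _, err = readString(); err != nil {
				return nil, 0, false, err
			}
		}
		topics[name] = numPartitions
	}
	timeoutMs, err := readInt32()
	if err != nil {
		return nil, 0, false, err
	}
	validateOnly := off < len(body) && body[off] != 0 // validate_only boolean
	return topics, timeoutMs, validateOnly, nil
}

On the response side still listed as open work: a CreateTopics v2 response carries throttle_time_ms (int32) followed by an array of (topic name, error_code int16, error_message nullable string), whereas the v0 response is only the (name, error_code) array, so a v0-shaped reply will likely fail to decode in kafka-go once it negotiates v2.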