diff --git a/test/kafka/client_integration_test.go b/test/kafka/client_integration_test.go
new file mode 100644
index 000000000..9a23cb324
--- /dev/null
+++ b/test/kafka/client_integration_test.go
@@ -0,0 +1,369 @@
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/segmentio/kafka-go"
+	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
+)
+
+// TestKafkaGoClient_BasicProduceConsume tests our gateway with a real kafka-go client
+func TestKafkaGoClient_BasicProduceConsume(t *testing.T) {
+	// Start the gateway server
+	srv := gateway.NewServer(gateway.Options{
+		Listen:       ":0",  // use a random port
+		UseSeaweedMQ: false, // in-memory mode for testing
+	})
+
+	if err := srv.Start(); err != nil {
+		t.Fatalf("Failed to start gateway: %v", err)
+	}
+	defer srv.Close()
+
+	// Get the actual address
+	brokerAddr := srv.Addr()
+	t.Logf("Gateway running on %s", brokerAddr)
+
+	// Create the topic first
+	topicName := "test-kafka-go-topic"
+	if err := createTopicWithKafkaGo(brokerAddr, topicName); err != nil {
+		t.Fatalf("Failed to create topic: %v", err)
+	}
+
+	// Test basic produce
+	messages := []kafka.Message{
+		{
+			Key:   []byte("key1"),
+			Value: []byte("Hello, Kafka Gateway!"),
+		},
+		{
+			Key:   []byte("key2"),
+			Value: []byte("This is message 2"),
+		},
+		{
+			Key:   []byte("key3"),
+			Value: []byte("Final test message"),
+		},
+	}
+
+	if err := produceMessages(brokerAddr, topicName, messages); err != nil {
+		t.Fatalf("Failed to produce messages: %v", err)
+	}
+
+	// Test basic consume
+	consumedMessages, err := consumeMessages(brokerAddr, topicName, len(messages))
+	if err != nil {
+		t.Fatalf("Failed to consume messages: %v", err)
+	}
+
+	// Validate the consumed messages
+	if len(consumedMessages) != len(messages) {
+		t.Errorf("Expected %d messages, got %d", len(messages), len(consumedMessages))
+	}
+
+	for i, msg := range consumedMessages {
+		if i < len(messages) {
+			expectedValue := string(messages[i].Value)
+			actualValue := string(msg.Value)
+			if actualValue != expectedValue {
+				t.Errorf("Message %d: expected value %q, got %q", i, expectedValue, actualValue)
+			}
+		}
+	}
+
+	t.Logf("Successfully produced and consumed %d messages", len(consumedMessages))
+}
+
+// TestKafkaGoClient_ConsumerGroups tests consumer group functionality
+func TestKafkaGoClient_ConsumerGroups(t *testing.T) {
+	// Start the gateway server
+	srv := gateway.NewServer(gateway.Options{
+		Listen:       ":0",
+		UseSeaweedMQ: false,
+	})
+
+	if err := srv.Start(); err != nil {
+		t.Fatalf("Failed to start gateway: %v", err)
+	}
+	defer srv.Close()
+
+	brokerAddr := srv.Addr()
+	topicName := "test-consumer-group-topic"
+	groupID := "test-consumer-group"
+
+	// Create the topic
+	if err := createTopicWithKafkaGo(brokerAddr, topicName); err != nil {
+		t.Fatalf("Failed to create topic: %v", err)
+	}
+
+	// Produce some messages
+	messages := []kafka.Message{
+		{Value: []byte("group-message-1")},
+		{Value: []byte("group-message-2")},
+		{Value: []byte("group-message-3")},
+	}
+
+	if err := produceMessages(brokerAddr, topicName, messages); err != nil {
+		t.Fatalf("Failed to produce messages: %v", err)
+	}
+
+	// Test the consumer group
+	consumedMessages, err := consumeWithGroup(brokerAddr, topicName, groupID, len(messages))
+	if err != nil {
+		t.Fatalf("Failed to consume with group: %v", err)
+	}
+
+	if len(consumedMessages) != len(messages) {
+		t.Errorf("Expected %d messages, got %d", len(messages), len(consumedMessages))
+	}
+
+	t.Logf("Consumer group successfully consumed %d messages", len(consumedMessages))
+}
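+
+// Note: once GroupID is set, kafka-go drives the full consumer-group protocol
+// (FindCoordinator, JoinGroup, SyncGroup, Heartbeat, OffsetCommit/OffsetFetch),
+// so the test above exercises those gateway code paths as well, not just Fetch.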
+
+// TestKafkaGoClient_MultiplePartitions tests behavior with multiple partitions
+func TestKafkaGoClient_MultiplePartitions(t *testing.T) {
+	t.Skip("TODO: Enable once partition support is improved")
+
+	// This test will be enabled once we fix partition handling.
+	// For now, our implementation assumes a single partition per topic.
+}
+
+// TestKafkaGoClient_OffsetManagement tests offset commit/fetch operations
+func TestKafkaGoClient_OffsetManagement(t *testing.T) {
+	// Start the gateway server
+	srv := gateway.NewServer(gateway.Options{
+		Listen:       ":0",
+		UseSeaweedMQ: false,
+	})
+
+	if err := srv.Start(); err != nil {
+		t.Fatalf("Failed to start gateway: %v", err)
+	}
+	defer srv.Close()
+
+	brokerAddr := srv.Addr()
+	topicName := "test-offset-topic"
+	groupID := "test-offset-group"
+
+	// Create the topic
+	if err := createTopicWithKafkaGo(brokerAddr, topicName); err != nil {
+		t.Fatalf("Failed to create topic: %v", err)
+	}
+
+	// Produce messages
+	messages := []kafka.Message{
+		{Value: []byte("offset-message-1")},
+		{Value: []byte("offset-message-2")},
+		{Value: []byte("offset-message-3")},
+		{Value: []byte("offset-message-4")},
+		{Value: []byte("offset-message-5")},
+	}
+
+	if err := produceMessages(brokerAddr, topicName, messages); err != nil {
+		t.Fatalf("Failed to produce messages: %v", err)
+	}
+
+	// Consume only the first 3 messages and commit their offsets
+	partialMessages, err := consumeWithGroupAndCommit(brokerAddr, topicName, groupID, 3)
+	if err != nil {
+		t.Fatalf("Failed to consume with offset commit: %v", err)
+	}
+
+	if len(partialMessages) != 3 {
+		t.Errorf("Expected 3 messages, got %d", len(partialMessages))
+	}
+
+	// A new consumer with the same group ID should resume from the committed offset
+	remainingMessages, err := consumeWithGroup(brokerAddr, topicName, groupID, 2)
+	if err != nil {
+		t.Fatalf("Failed to consume remaining messages: %v", err)
+	}
+
+	if len(remainingMessages) != 2 {
+		t.Errorf("Expected 2 remaining messages, got %d", len(remainingMessages))
+	}
+
+	t.Logf("Offset management test passed: consumed %d + %d messages",
+		len(partialMessages), len(remainingMessages))
+}
+
+// Helper functions
+
+func createTopicWithKafkaGo(brokerAddr, topicName string) error {
+	// Create a connection with a timeout
+	dialer := &kafka.Dialer{
+		Timeout:   5 * time.Second,
+		DualStack: true,
+	}
+
+	conn, err := dialer.Dial("tcp", brokerAddr)
+	if err != nil {
+		return fmt.Errorf("dial broker: %w", err)
+	}
+	defer conn.Close()
+
+	// Set a deadline so a hung gateway fails fast instead of blocking the test
+	conn.SetDeadline(time.Now().Add(10 * time.Second))
+
+	fmt.Printf("DEBUG: Connected to broker at %s\n", brokerAddr)
+
+	topicConfigs := []kafka.TopicConfig{
+		{
+			Topic:             topicName,
+			NumPartitions:     1,
+			ReplicationFactor: 1,
+		},
+	}
+
+	fmt.Printf("DEBUG: Creating topic %s with 1 partition\n", topicName)
+	err = conn.CreateTopics(topicConfigs...)
+	if err != nil {
+		return fmt.Errorf("create topic: %w", err)
+	}
+
+	fmt.Printf("DEBUG: Topic %s created successfully\n", topicName)
+	return nil
+}
+
+func produceMessages(brokerAddr, topicName string, messages []kafka.Message) error {
+	writer := &kafka.Writer{
+		Addr:     kafka.TCP(brokerAddr),
+		Topic:    topicName,
+		Balancer: &kafka.LeastBytes{},
+		// Surface error-ish lines from the debug logger to spot protocol issues
+		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			if strings.Contains(msg, "error") || strings.Contains(msg, "failed") {
+				fmt.Printf("PRODUCER ERROR: "+msg+"\n", args...)
+			}
+		}),
+		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			fmt.Printf("PRODUCER ERROR: "+msg+"\n", args...)
+		}),
+	}
+	defer writer.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	return writer.WriteMessages(ctx, messages...)
+}
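+
+// Note: kafka.Writer batches messages internally. With the default
+// BatchTimeout (1s in the kafka-go releases we have used; worth re-checking
+// against the pinned version) each WriteMessages call can block for up to a
+// second. If these tests get slow, adding a field like
+//
+//	BatchTimeout: 10 * time.Millisecond, // flush small test batches quickly
+//
+// to the writer above is the first knob worth turning.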
+
+func consumeMessages(brokerAddr, topicName string, expectedCount int) ([]kafka.Message, error) {
+	reader := kafka.NewReader(kafka.ReaderConfig{
+		Brokers: []string{brokerAddr},
+		Topic:   topicName,
+		// Start from the beginning
+		StartOffset: kafka.FirstOffset,
+		// Surface error-ish lines from the debug logger to spot protocol issues
+		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			if strings.Contains(msg, "error") || strings.Contains(msg, "failed") {
+				fmt.Printf("CONSUMER ERROR: "+msg+"\n", args...)
+			}
+		}),
+		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			fmt.Printf("CONSUMER ERROR: "+msg+"\n", args...)
+		}),
+	})
+	defer reader.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	var messages []kafka.Message
+	for i := 0; i < expectedCount; i++ {
+		msg, err := reader.ReadMessage(ctx)
+		if err != nil {
+			return messages, fmt.Errorf("read message %d: %w", i, err)
+		}
+		messages = append(messages, msg)
+	}
+
+	return messages, nil
+}
+
+func consumeWithGroup(brokerAddr, topicName, groupID string, expectedCount int) ([]kafka.Message, error) {
+	reader := kafka.NewReader(kafka.ReaderConfig{
+		Brokers: []string{brokerAddr},
+		Topic:   topicName,
+		GroupID: groupID,
+		// Surface error-ish lines from the debug logger to spot protocol issues
+		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			if strings.Contains(msg, "error") || strings.Contains(msg, "failed") {
+				fmt.Printf("GROUP CONSUMER ERROR: "+msg+"\n", args...)
+			}
+		}),
+		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			fmt.Printf("GROUP CONSUMER ERROR: "+msg+"\n", args...)
+		}),
+	})
+	defer reader.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	var messages []kafka.Message
+	for i := 0; i < expectedCount; i++ {
+		msg, err := reader.ReadMessage(ctx)
+		if err != nil {
+			return messages, fmt.Errorf("read message %d: %w", i, err)
+		}
+		messages = append(messages, msg)
+	}
+
+	return messages, nil
+}
+
+func consumeWithGroupAndCommit(brokerAddr, topicName, groupID string, expectedCount int) ([]kafka.Message, error) {
+	reader := kafka.NewReader(kafka.ReaderConfig{
+		Brokers: []string{brokerAddr},
+		Topic:   topicName,
+		GroupID: groupID,
+		// Surface error-ish lines from the debug logger to spot protocol issues
+		Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			if strings.Contains(msg, "error") || strings.Contains(msg, "failed") {
+				fmt.Printf("GROUP CONSUMER WITH COMMIT ERROR: "+msg+"\n", args...)
+			}
+		}),
+		ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+			fmt.Printf("GROUP CONSUMER WITH COMMIT ERROR: "+msg+"\n", args...)
+		}),
+	})
+	defer reader.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	var messages []kafka.Message
+	for i := 0; i < expectedCount; i++ {
+		// FetchMessage leaves the offset uncommitted, so the explicit commit
+		// below is meaningful (ReadMessage auto-commits when GroupID is set)
+		msg, err := reader.FetchMessage(ctx)
+		if err != nil {
+			return messages, fmt.Errorf("read message %d: %w", i, err)
+		}
+		messages = append(messages, msg)
+
+		// Commit the message
+		if err := reader.CommitMessages(ctx, msg); err != nil {
+			return messages, fmt.Errorf("commit message %d: %w", i, err)
+		}
+	}
+
+	return messages, nil
+}
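+
+// waitForPort below is not yet wired into the tests above; if gateway startup
+// ever races the first client connection, a sketch of the intended usage:
+//
+//	if err := waitForPort(srv.Addr()); err != nil {
+//		t.Fatalf("gateway not reachable: %v", err)
+//	}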
+
+// waitForPort waits for a TCP port to become reachable
+func waitForPort(addr string) error {
+	for i := 0; i < 50; i++ { // wait up to 5 seconds
+		conn, err := net.Dial("tcp", addr)
+		if err == nil {
+			conn.Close()
+			return nil
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	return fmt.Errorf("port %s not available after 5 seconds", addr)
+}
diff --git a/test/kafka/debug_connection_test.go b/test/kafka/debug_connection_test.go
new file mode 100644
index 000000000..9e99b1632
--- /dev/null
+++ b/test/kafka/debug_connection_test.go
@@ -0,0 +1,241 @@
+package kafka
+
+import (
+	"encoding/binary"
+	"io"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
+)
+
+// TestGateway_BasicConnection tests whether the gateway can handle basic TCP connections
+func TestGateway_BasicConnection(t *testing.T) {
+	// Start the gateway server
+	srv := gateway.NewServer(gateway.Options{
+		Listen:       ":0",
+		UseSeaweedMQ: false,
+	})
+
+	if err := srv.Start(); err != nil {
+		t.Fatalf("Failed to start gateway: %v", err)
+	}
+	defer srv.Close()
+
+	brokerAddr := srv.Addr()
+	t.Logf("Gateway running on %s", brokerAddr)
+
+	// Test a basic TCP connection
+	conn, err := net.Dial("tcp", brokerAddr)
+	if err != nil {
+		t.Fatalf("Failed to connect: %v", err)
+	}
+	defer conn.Close()
+
+	t.Logf("Successfully connected to gateway")
+}
+
+// TestGateway_ApiVersionsRequest tests whether we can send an ApiVersions request
+func TestGateway_ApiVersionsRequest(t *testing.T) {
+	// Start the gateway server
+	srv := gateway.NewServer(gateway.Options{
+		Listen:       ":0",
+		UseSeaweedMQ: false,
+	})
+
+	if err := srv.Start(); err != nil {
+		t.Fatalf("Failed to start gateway: %v", err)
+	}
+	defer srv.Close()
+
+	brokerAddr := srv.Addr()
+	t.Logf("Gateway running on %s", brokerAddr)
+
+	// Create a connection
+	conn, err := net.Dial("tcp", brokerAddr)
+	if err != nil {
+		t.Fatalf("Failed to connect: %v", err)
+	}
+	defer conn.Close()
+
+	// Set a read timeout
+	conn.SetReadDeadline(time.Now().Add(5 * time.Second))
+
+	// Build an ApiVersions request (API key 18, version 0)
+	// Request format: message_size(4) + api_key(2) + api_version(2) + correlation_id(4) + client_id(2+string)
+	correlationID := uint32(1)
+	clientID := "debug-client"
+
+	request := make([]byte, 0, 64)
+
+	// Build the message body first (without the size)
+	msgBody := make([]byte, 0, 32)
+	msgBody = append(msgBody, 0, 18) // API key 18 (ApiVersions)
+	msgBody = append(msgBody, 0, 0)  // API version 0
+
+	// Correlation ID
+	correlationBytes := make([]byte, 4)
+	binary.BigEndian.PutUint32(correlationBytes, correlationID)
+	msgBody = append(msgBody, correlationBytes...)
+
+	// Client ID string
+	clientIDBytes := []byte(clientID)
+	msgBody = append(msgBody, byte(len(clientIDBytes)>>8), byte(len(clientIDBytes)))
+	msgBody = append(msgBody, clientIDBytes...)
+
+	// Message size (4 bytes) + message body
+	sizeBytes := make([]byte, 4)
+	binary.BigEndian.PutUint32(sizeBytes, uint32(len(msgBody)))
+	request = append(request, sizeBytes...)
+	request = append(request, msgBody...)
+
+	t.Logf("Sending ApiVersions request: %d bytes", len(request))
+
+	// Send the request
+	_, err = conn.Write(request)
+	if err != nil {
+		t.Fatalf("Failed to write request: %v", err)
+	}
+
+	// Read the response size with io.ReadFull; a bare conn.Read may legally
+	// return fewer than 4 bytes
+	var responseSizeBytes [4]byte
+	_, err = io.ReadFull(conn, responseSizeBytes[:])
+	if err != nil {
+		t.Fatalf("Failed to read response size: %v", err)
+	}
+
+	responseSize := binary.BigEndian.Uint32(responseSizeBytes[:])
+	t.Logf("Response size: %d bytes", responseSize)
+
+	if responseSize == 0 || responseSize > 1024*1024 {
+		t.Fatalf("Invalid response size: %d", responseSize)
+	}
+
+	// Read the response body
+	responseBody := make([]byte, responseSize)
+	if _, err := io.ReadFull(conn, responseBody); err != nil {
+		t.Fatalf("Failed to read response body: %v", err)
+	}
+
+	t.Logf("Received response: %d bytes", len(responseBody))
+
+	// Parse the basic response structure
+	if len(responseBody) < 4 {
+		t.Fatalf("Response too short: %d bytes", len(responseBody))
+	}
+
+	responseCorrelationID := binary.BigEndian.Uint32(responseBody[0:4])
+	if responseCorrelationID != correlationID {
+		t.Errorf("Correlation ID mismatch: sent %d, got %d", correlationID, responseCorrelationID)
+	}
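+
+	// Hedged extra check: assumes an ApiVersions v0 response body of
+	// error_code(2) + api_count(4) + entries of key(2)/min(2)/max(2) right
+	// after the correlation ID; logged rather than asserted until the
+	// gateway's encoding is confirmed
+	if len(responseBody) >= 10 {
+		errorCode := binary.BigEndian.Uint16(responseBody[4:6])
+		numAPIs := binary.BigEndian.Uint32(responseBody[6:10])
+		t.Logf("ApiVersions response: error_code=%d, advertised APIs=%d", errorCode, numAPIs)
+	}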
+
+	t.Logf("ApiVersions request completed successfully")
+}
+
+// TestGateway_CreateTopicsRequest tests whether we can send a CreateTopics request
+func TestGateway_CreateTopicsRequest(t *testing.T) {
+	// Start the gateway server
+	srv := gateway.NewServer(gateway.Options{
+		Listen:       ":0",
+		UseSeaweedMQ: false,
+	})
+
+	if err := srv.Start(); err != nil {
+		t.Fatalf("Failed to start gateway: %v", err)
+	}
+	defer srv.Close()
+
+	brokerAddr := srv.Addr()
+	t.Logf("Gateway running on %s", brokerAddr)
+
+	// Create a connection
+	conn, err := net.Dial("tcp", brokerAddr)
+	if err != nil {
+		t.Fatalf("Failed to connect: %v", err)
+	}
+	defer conn.Close()
+
+	// Set a read timeout
+	conn.SetReadDeadline(time.Now().Add(5 * time.Second))
+
+	// Build a CreateTopics request (API key 19, version 0)
+	correlationID := uint32(2)
+	clientID := "debug-client"
+	topicName := "debug-topic"
+
+	// Build the message body
+	msgBody := make([]byte, 0, 128)
+	msgBody = append(msgBody, 0, 19) // API key 19 (CreateTopics)
+	msgBody = append(msgBody, 0, 0)  // API version 0
+
+	// Correlation ID
+	correlationBytes := make([]byte, 4)
+	binary.BigEndian.PutUint32(correlationBytes, correlationID)
+	msgBody = append(msgBody, correlationBytes...)
+
+	// Client ID string
+	clientIDBytes := []byte(clientID)
+	msgBody = append(msgBody, byte(len(clientIDBytes)>>8), byte(len(clientIDBytes)))
+	msgBody = append(msgBody, clientIDBytes...)
+
+	// Topics array - count (4 bytes)
+	msgBody = append(msgBody, 0, 0, 0, 1) // 1 topic
+
+	// Topic name
+	topicNameBytes := []byte(topicName)
+	msgBody = append(msgBody, byte(len(topicNameBytes)>>8), byte(len(topicNameBytes)))
+	msgBody = append(msgBody, topicNameBytes...)
+
+	// Num partitions (4 bytes)
+	msgBody = append(msgBody, 0, 0, 0, 1) // 1 partition
+
+	// Replication factor (2 bytes)
+	msgBody = append(msgBody, 0, 1) // replication factor 1
+
+	// Replica assignments count (4 bytes) - the v0 schema puts this array
+	// between replication_factor and configs; empty here
+	msgBody = append(msgBody, 0, 0, 0, 0) // 0 assignments
+
+	// Configs count (4 bytes)
+	msgBody = append(msgBody, 0, 0, 0, 0) // 0 configs
+
+	// Timeout (4 bytes)
+	msgBody = append(msgBody, 0, 0, 0x75, 0x30) // 30000 ms (30 seconds)
+
+	// Message size (4 bytes) + message body
+	sizeBytes := make([]byte, 4)
+	binary.BigEndian.PutUint32(sizeBytes, uint32(len(msgBody)))
+	request := append(sizeBytes, msgBody...)
+
+	t.Logf("Sending CreateTopics request: %d bytes", len(request))
+
+	// Send the request
+	_, err = conn.Write(request)
+	if err != nil {
+		t.Fatalf("Failed to write request: %v", err)
+	}
+
+	// Read the response size (io.ReadFull to avoid short reads)
+	var responseSizeBytes [4]byte
+	_, err = io.ReadFull(conn, responseSizeBytes[:])
+	if err != nil {
+		t.Fatalf("Failed to read response size: %v", err)
+	}
+
+	responseSize := binary.BigEndian.Uint32(responseSizeBytes[:])
+	t.Logf("Response size: %d bytes", responseSize)
+
+	// Read the response body
+	responseBody := make([]byte, responseSize)
+	if _, err := io.ReadFull(conn, responseBody); err != nil {
+		t.Fatalf("Failed to read response body: %v", err)
+	}
+
+	t.Logf("CreateTopics request completed successfully, received %d bytes", len(responseBody))
+}
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index 656ad71bd..9c0d74cdc 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -163,6 +163,10 @@ func (h *Handler) HandleConn(conn net.Conn) error {
 	apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
 	correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
 
+	// DEBUG: Log all incoming requests to debug client compatibility
+	fmt.Printf("DEBUG: Received request - API Key: %d, Version: %d, Correlation: %d, Size: %d\n",
+		apiKey, apiVersion, correlationID, size)
+
 	// TODO: IMPORTANT - API version validation is missing
 	// Different API versions have different request/response formats
 	// Need to validate apiVersion against supported versions for each API
@@ -207,6 +211,9 @@ func (h *Handler) HandleConn(conn net.Conn) error {
 		return fmt.Errorf("handle request: %w", err)
 	}
 
+	// DEBUG: Log response details
+	fmt.Printf("DEBUG: Sending response for API %d - Size: %d bytes\n", apiKey, len(response))
+
 	// Write response size and data
 	responseSizeBytes := make([]byte, 4)
 	binary.BigEndian.PutUint32(responseSizeBytes, uint32(len(response)))
@@ -488,6 +495,11 @@ func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([]byte, error) {
 }
 
 func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ([]byte, error) {
+	// TODO: CRITICAL - this function only supports the CreateTopics v0 format,
+	// but kafka-go uses v2, which has a different request structure.
+	// The bogus topics count seen in testing (1274981) shows we are parsing
+	// from the wrong offset; we need proper v2 request parsing, or to
+	// negotiate the API version down.
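+	//
+	// For reference, a sketch of the v2 body layout this parser will
+	// eventually need (field order taken from the Kafka protocol guide;
+	// not yet verified against kafka-go here):
+	//
+	//   topics        => ARRAY(int32 count), each entry:
+	//     name               => STRING (int16 length + bytes)
+	//     num_partitions     => INT32
+	//     replication_factor => INT16
+	//     assignments        => ARRAY(int32 count) of partition/replica ids
+	//     configs            => ARRAY(int32 count) of name/value STRINGs
+	//   timeout_ms    => INT32
+	//   validate_only => BOOLEAN (1 byte)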
+
 	// Parse minimal CreateTopics request
 	// Request format: client_id + timeout(4) + topics_array
 
@@ -498,17 +510,29 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) ([]byte, error) {
 	// Skip client_id
 	clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
 	offset := 2 + int(clientIDSize)
-
-	if len(requestBody) < offset+8 { // timeout(4) + topics_count(4)
-		return nil, fmt.Errorf("CreateTopics request missing data")
+
+	fmt.Printf("DEBUG: Client ID size: %d, client ID: %s\n", clientIDSize, string(requestBody[2:2+clientIDSize]))
+
+	// CreateTopics v2 has a different format than v0:
+	// client_id + topics_array + timeout(4) + validate_only(1)
+	// (no separate timeout field before the topics array as in v0)
+
+	if len(requestBody) < offset+4 {
+		return nil, fmt.Errorf("CreateTopics request missing topics array")
 	}
 
-	// Skip timeout
-	offset += 4
-
+	// Read the topics count directly (no timeout field before it in v2)
 	topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
 	offset += 4
 
+	// DEBUG: Hex dump the first 50 bytes to understand the v2 format
+	dumpLen := len(requestBody)
+	if dumpLen > 50 {
+		dumpLen = 50
+	}
+	fmt.Printf("DEBUG: CreateTopics v2 request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])
+	fmt.Printf("DEBUG: CreateTopics v2 - Topics count: %d, remaining bytes: %d\n", topicsCount, len(requestBody)-offset)
+
 	response := make([]byte, 0, 256)
 
 	// Correlation ID