diff --git a/test/kafka/e2e_test.go b/test/kafka/e2e_test.go new file mode 100644 index 000000000..274ad1e40 --- /dev/null +++ b/test/kafka/e2e_test.go @@ -0,0 +1,463 @@ +package kafka_test + +import ( + "fmt" + "net" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" +) + +// TestKafkaGateway_E2E tests the complete Kafka workflow using the gateway +func TestKafkaGateway_E2E(t *testing.T) { + // Start the gateway server + gatewayServer := gateway.NewServer(gateway.Options{ + Listen: ":0", // use random port + }) + + err := gatewayServer.Start() + if err != nil { + t.Fatalf("Failed to start gateway: %v", err) + } + defer gatewayServer.Close() + + // Get the actual listening address + addr := gatewayServer.Addr() + t.Logf("Gateway started on %s", addr) + + // Wait a moment for the server to be fully ready + time.Sleep(100 * time.Millisecond) + + // For now, we'll do basic connectivity testing + // In a full implementation, this would use kafka-go client + t.Run("BasicConnectivity", func(t *testing.T) { + testBasicConnectivity(t, addr) + }) + + t.Run("ApiVersionsHandshake", func(t *testing.T) { + testApiVersionsHandshake(t, addr) + }) + + t.Run("TopicOperations", func(t *testing.T) { + testTopicOperations(t, addr) + }) +} + +// testBasicConnectivity verifies we can connect to the gateway +func testBasicConnectivity(t *testing.T, addr string) { + conn, err := net.DialTimeout("tcp", addr, 5*time.Second) + if err != nil { + t.Fatalf("Failed to connect to gateway: %v", err) + } + defer conn.Close() + + // Send a simple ApiVersions request + apiVersionsReq := buildApiVersionsRequest() + + _, err = conn.Write(apiVersionsReq) + if err != nil { + t.Fatalf("Failed to send request: %v", err) + } + + // Read response + response := make([]byte, 1024) + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + n, err := conn.Read(response) + if err != nil { + t.Fatalf("Failed to read response: %v", err) + } + + if n < 10 { + t.Fatalf("Response too short: %d bytes", n) + } + + t.Logf("Received response of %d bytes", n) +} + +// testApiVersionsHandshake tests the API versions handshake +func testApiVersionsHandshake(t *testing.T, addr string) { + conn, err := net.DialTimeout("tcp", addr, 5*time.Second) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + // Send ApiVersions request + req := buildApiVersionsRequest() + _, err = conn.Write(req) + if err != nil { + t.Fatalf("Write failed: %v", err) + } + + // Read response size + sizeBytes := make([]byte, 4) + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + _, err = conn.Read(sizeBytes) + if err != nil { + t.Fatalf("Read size failed: %v", err) + } + + responseSize := uint32(sizeBytes[0])<<24 | uint32(sizeBytes[1])<<16 | uint32(sizeBytes[2])<<8 | uint32(sizeBytes[3]) + if responseSize == 0 || responseSize > 10000 { + t.Fatalf("Invalid response size: %d", responseSize) + } + + // Read response body + responseBody := make([]byte, responseSize) + _, err = conn.Read(responseBody) + if err != nil { + t.Fatalf("Read body failed: %v", err) + } + + // Verify correlation ID matches + correlationID := uint32(responseBody[0])<<24 | uint32(responseBody[1])<<16 | uint32(responseBody[2])<<8 | uint32(responseBody[3]) + if correlationID != 12345 { + t.Errorf("Correlation ID mismatch: got %d, want 12345", correlationID) + } + + // Check that we have API keys advertised + if len(responseBody) < 20 { + t.Fatalf("Response too short for API key data") + } + + // Error code should be 0 + errorCode := uint16(responseBody[4])<<8 | uint16(responseBody[5]) + if errorCode != 0 { + t.Errorf("Error code: got %d, want 0", errorCode) + } + + // API keys count should be > 0 + apiKeyCount := uint32(responseBody[6])<<24 | uint32(responseBody[7])<<16 | uint32(responseBody[8])<<8 | uint32(responseBody[9]) + if apiKeyCount == 0 { + t.Errorf("No API keys advertised") + } + + t.Logf("Gateway advertises %d API keys", apiKeyCount) +} + +// testTopicOperations tests creating and listing topics +func testTopicOperations(t *testing.T, addr string) { + conn, err := net.DialTimeout("tcp", addr, 5*time.Second) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + // Test Metadata request (should return empty topic list initially) + metadataReq := buildMetadataRequest() + _, err = conn.Write(metadataReq) + if err != nil { + t.Fatalf("Metadata request failed: %v", err) + } + + // Read metadata response + sizeBytes := make([]byte, 4) + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + _, err = conn.Read(sizeBytes) + if err != nil { + t.Fatalf("Read metadata size failed: %v", err) + } + + responseSize := uint32(sizeBytes[0])<<24 | uint32(sizeBytes[1])<<16 | uint32(sizeBytes[2])<<8 | uint32(sizeBytes[3]) + responseBody := make([]byte, responseSize) + _, err = conn.Read(responseBody) + if err != nil { + t.Fatalf("Read metadata body failed: %v", err) + } + + // Verify basic metadata response structure + if len(responseBody) < 20 { + t.Fatalf("Metadata response too short") + } + + t.Logf("Metadata response received: %d bytes", len(responseBody)) + + // Test CreateTopics request + createTopicsReq := buildCreateTopicsRequest() + _, err = conn.Write(createTopicsReq) + if err != nil { + t.Fatalf("CreateTopics request failed: %v", err) + } + + // Read CreateTopics response + _, err = conn.Read(sizeBytes) + if err != nil { + t.Fatalf("Read CreateTopics size failed: %v", err) + } + + responseSize = uint32(sizeBytes[0])<<24 | uint32(sizeBytes[1])<<16 | uint32(sizeBytes[2])<<8 | uint32(sizeBytes[3]) + responseBody = make([]byte, responseSize) + _, err = conn.Read(responseBody) + if err != nil { + t.Fatalf("Read CreateTopics body failed: %v", err) + } + + t.Logf("CreateTopics response received: %d bytes", len(responseBody)) + + // TODO: Parse response to verify topic creation success + // For Phase 1, we're just verifying the protocol works +} + +// buildApiVersionsRequest creates a Kafka ApiVersions request +func buildApiVersionsRequest() []byte { + // Message size (4) + API key (2) + API version (2) + correlation ID (4) + client ID + clientID := "test-e2e-client" + messageSize := 2 + 2 + 4 + 2 + len(clientID) // API key + version + correlation + client_id_len + client_id + + request := make([]byte, 0, messageSize+4) + + // Message size + request = append(request, byte(messageSize>>24), byte(messageSize>>16), byte(messageSize>>8), byte(messageSize)) + + // API key (ApiVersions = 18) + request = append(request, 0, 18) + + // API version + request = append(request, 0, 3) + + // Correlation ID + request = append(request, 0, 0, 0x30, 0x39) // 12345 + + // Client ID + request = append(request, 0, byte(len(clientID))) + request = append(request, []byte(clientID)...) + + return request +} + +// buildMetadataRequest creates a Kafka Metadata request +func buildMetadataRequest() []byte { + clientID := "test-e2e-client" + messageSize := 2 + 2 + 4 + 2 + len(clientID) + 4 // + topics_count(4) for empty topic list + + request := make([]byte, 0, messageSize+4) + + // Message size + request = append(request, byte(messageSize>>24), byte(messageSize>>16), byte(messageSize>>8), byte(messageSize)) + + // API key (Metadata = 3) + request = append(request, 0, 3) + + // API version + request = append(request, 0, 7) + + // Correlation ID + request = append(request, 0, 0, 0x30, 0x40) // 12352 + + // Client ID + request = append(request, 0, byte(len(clientID))) + request = append(request, []byte(clientID)...) + + // Topics count (0 = get all topics) + request = append(request, 0, 0, 0, 0) + + return request +} + +// buildCreateTopicsRequest creates a Kafka CreateTopics request +func buildCreateTopicsRequest() []byte { + clientID := "test-e2e-client" + topicName := "test-e2e-topic" + + // Approximate message size + messageSize := 2 + 2 + 4 + 2 + len(clientID) + 4 + 4 + 2 + len(topicName) + 4 + 2 + 4 + 4 + + request := make([]byte, 0, messageSize+4) + + // Message size placeholder + sizePos := len(request) + request = append(request, 0, 0, 0, 0) + + // API key (CreateTopics = 19) + request = append(request, 0, 19) + + // API version + request = append(request, 0, 4) + + // Correlation ID + request = append(request, 0, 0, 0x30, 0x41) // 12353 + + // Client ID + request = append(request, 0, byte(len(clientID))) + request = append(request, []byte(clientID)...) + + // Timeout (5000ms) + request = append(request, 0, 0, 0x13, 0x88) + + // Topics count (1) + request = append(request, 0, 0, 0, 1) + + // Topic name + request = append(request, 0, byte(len(topicName))) + request = append(request, []byte(topicName)...) + + // Num partitions (1) + request = append(request, 0, 0, 0, 1) + + // Replication factor (1) + request = append(request, 0, 1) + + // Configs count (0) + request = append(request, 0, 0, 0, 0) + + // Topic timeout (5000ms) + request = append(request, 0, 0, 0x13, 0x88) + + // Fix message size + actualSize := len(request) - 4 + request[sizePos] = byte(actualSize >> 24) + request[sizePos+1] = byte(actualSize >> 16) + request[sizePos+2] = byte(actualSize >> 8) + request[sizePos+3] = byte(actualSize) + + return request +} + +// TestKafkaGateway_ProduceConsume tests the produce/consume workflow +// Note: This is a placeholder for future implementation with kafka-go client +func TestKafkaGateway_ProduceConsume(t *testing.T) { + t.Skip("TODO: Implement with kafka-go client library for full E2E test") + + // This test would: + // 1. Start gateway + // 2. Create kafka-go producer and consumer + // 3. Create topic + // 4. Produce messages + // 5. Consume messages + // 6. Verify message content matches +} + +// TestKafkaGateway_MultipleClients tests concurrent client connections +func TestKafkaGateway_MultipleClients(t *testing.T) { + // Start the gateway server + gatewayServer := gateway.NewServer(gateway.Options{ + Listen: ":0", // use random port + }) + + err := gatewayServer.Start() + if err != nil { + t.Fatalf("Failed to start gateway: %v", err) + } + defer gatewayServer.Close() + + addr := gatewayServer.Addr() + + // Test multiple concurrent connections + numClients := 5 + done := make(chan error, numClients) + + for i := 0; i < numClients; i++ { + go func(clientID int) { + conn, err := net.DialTimeout("tcp", addr, 5*time.Second) + if err != nil { + done <- fmt.Errorf("client %d connect failed: %v", clientID, err) + return + } + defer conn.Close() + + // Each client sends ApiVersions request + req := buildApiVersionsRequest() + _, err = conn.Write(req) + if err != nil { + done <- fmt.Errorf("client %d write failed: %v", clientID, err) + return + } + + // Read response + sizeBytes := make([]byte, 4) + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + _, err = conn.Read(sizeBytes) + if err != nil { + done <- fmt.Errorf("client %d read failed: %v", clientID, err) + return + } + + done <- nil + }(i) + } + + // Wait for all clients to complete + for i := 0; i < numClients; i++ { + err := <-done + if err != nil { + t.Errorf("Client error: %v", err) + } + } +} + +// TestKafkaGateway_StressTest performs basic stress testing +func TestKafkaGateway_StressTest(t *testing.T) { + if testing.Short() { + t.Skip("skipping stress test in short mode") + } + + // Start the gateway server + gatewayServer := gateway.NewServer(gateway.Options{ + Listen: ":0", + }) + + err := gatewayServer.Start() + if err != nil { + t.Fatalf("Failed to start gateway: %v", err) + } + defer gatewayServer.Close() + + addr := gatewayServer.Addr() + + // Send many requests rapidly + numRequests := 100 + errors := make(chan error, numRequests) + + for i := 0; i < numRequests; i++ { + go func(reqID int) { + conn, err := net.DialTimeout("tcp", addr, 5*time.Second) + if err != nil { + errors <- err + return + } + defer conn.Close() + + // Send multiple requests on same connection + for j := 0; j < 3; j++ { + req := buildApiVersionsRequest() + _, err = conn.Write(req) + if err != nil { + errors <- err + return + } + + // Read response + sizeBytes := make([]byte, 4) + conn.SetReadDeadline(time.Now().Add(1 * time.Second)) + _, err = conn.Read(sizeBytes) + if err != nil { + errors <- err + return + } + + // Skip reading full response for speed + } + + errors <- nil + }(i) + } + + // Collect results + successCount := 0 + for i := 0; i < numRequests; i++ { + err := <-errors + if err == nil { + successCount++ + } else { + t.Logf("Request error: %v", err) + } + } + + successRate := float64(successCount) / float64(numRequests) + if successRate < 0.95 { // Allow some failures under stress + t.Errorf("Success rate too low: %.2f%% (expected >= 95%%)", successRate*100) + } + + t.Logf("Stress test completed: %d/%d successful (%.2f%%)", successCount, numRequests, successRate*100) +} diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index fac3437dc..15387d2fd 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -209,8 +209,8 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 7) // max version 7 // API Key 1 (Fetch): api_key(2) + min_version(2) + max_version(2) - response = append(response, 0, 1) // API key 1 - response = append(response, 0, 0) // min version 0 + response = append(response, 0, 1) // API key 1 + response = append(response, 0, 0) // min version 0 response = append(response, 0, 11) // max version 11 // Throttle time (4 bytes, 0 = no throttling)