diff --git a/test/kafka/seaweedmq_integration_test.go b/test/kafka/seaweedmq_integration_test.go
index 97af904bb..73a8ba97e 100644
--- a/test/kafka/seaweedmq_integration_test.go
+++ b/test/kafka/seaweedmq_integration_test.go
@@ -1,168 +1,335 @@
-package kafka
+package kafka_test
 
 import (
-	"fmt"
+	"net"
 	"testing"
 	"time"
 
-	"github.com/IBM/sarama"
 	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
 )
 
-// TestSeaweedMQIntegration tests the Kafka Gateway with SeaweedMQ backend
-func TestSeaweedMQIntegration(t *testing.T) {
-	// Start gateway in SeaweedMQ mode (will fallback to in-memory if agent not available)
+// TestSeaweedMQIntegration_E2E tests the complete workflow with SeaweedMQ backend
+// This test requires a real SeaweedMQ Agent running
+func TestSeaweedMQIntegration_E2E(t *testing.T) {
+	// Skip by default - requires real SeaweedMQ setup
+	t.Skip("Integration test requires real SeaweedMQ setup - run manually")
+
+	// Test configuration
+	agentAddress := "localhost:17777" // Default SeaweedMQ Agent address
+
+	// Start the gateway with SeaweedMQ backend
 	gatewayServer := gateway.NewServer(gateway.Options{
-		Listen:       "127.0.0.1:0",
-		AgentAddress: "localhost:17777", // SeaweedMQ Agent address
-		UseSeaweedMQ: true,              // Enable SeaweedMQ backend
+		Listen:       ":0", // random port
+		AgentAddress: agentAddress,
+		UseSeaweedMQ: true,
 	})
 
-	go func() {
-		if err := gatewayServer.Start(); err != nil {
-			t.Errorf("Failed to start gateway: %v", err)
-		}
-	}()
+	err := gatewayServer.Start()
+	if err != nil {
+		t.Fatalf("Failed to start gateway with SeaweedMQ backend: %v", err)
+	}
 	defer gatewayServer.Close()
 
-	// Wait for server to start
-	time.Sleep(100 * time.Millisecond)
+	addr := gatewayServer.Addr()
+	t.Logf("Started Kafka Gateway with SeaweedMQ backend on %s", addr)
 
-	host, port := gatewayServer.GetListenerAddr()
-	brokerAddr := fmt.Sprintf("%s:%d", host, port)
-	t.Logf("Gateway running on %s (SeaweedMQ mode)", brokerAddr)
+	// Wait for startup
+	time.Sleep(200 * time.Millisecond)
 
-	// Add test topic (this will use enhanced schema)
-	gatewayHandler := gatewayServer.GetHandler()
-	topicName := "seaweedmq-integration-topic"
-	gatewayHandler.AddTopicForTesting(topicName, 1)
-	t.Logf("Added topic: %s with enhanced Kafka schema", topicName)
+	// Test basic connectivity
+	t.Run("SeaweedMQ_BasicConnectivity", func(t *testing.T) {
+		testSeaweedMQConnectivity(t, addr)
+	})
 
-	// Configure Sarama for Kafka 2.1.0
-	config := sarama.NewConfig()
-	config.Version = sarama.V2_1_0_0
-	config.Producer.Return.Successes = true
-	config.Producer.RequiredAcks = sarama.WaitForAll
-	config.Consumer.Return.Errors = true
+	// Test topic lifecycle with SeaweedMQ
+	t.Run("SeaweedMQ_TopicLifecycle", func(t *testing.T) {
+		testSeaweedMQTopicLifecycle(t, addr)
+	})
 
-	t.Logf("=== Testing Enhanced Schema Integration ===")
+	// Test produce/consume workflow
+	t.Run("SeaweedMQ_ProduceConsume", func(t *testing.T) {
+		testSeaweedMQProduceConsume(t, addr)
+	})
+}
 
-	// Create producer
-	producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config)
+// testSeaweedMQConnectivity verifies gateway responds correctly
+func testSeaweedMQConnectivity(t *testing.T, addr string) {
+	conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
 	if err != nil {
-		t.Fatalf("Failed to create producer: %v", err)
+		t.Fatalf("Failed to connect to SeaweedMQ gateway: %v", err)
 	}
-	defer producer.Close()
+	defer conn.Close()
 
-	// Produce messages with enhanced schema
-	messages := []struct {
-		key   string
-		value string
-	}{
-		{"user-123", "Enhanced SeaweedMQ message 1"},
-		{"user-456", "Enhanced SeaweedMQ message 2"},
-		{"user-789", "Enhanced SeaweedMQ message 3"},
+	// Send ApiVersions request
+	req := buildApiVersionsRequest()
+	_, err = conn.Write(req)
+	if err != nil {
+		t.Fatalf("Failed to send ApiVersions: %v", err)
 	}
 
-	for i, msg := range messages {
-		producerMsg := &sarama.ProducerMessage{
-			Topic: topicName,
-			Key:   sarama.StringEncoder(msg.key),
-			Value: sarama.StringEncoder(msg.value),
-		}
+	// Read response
+	sizeBytes := make([]byte, 4)
+	conn.SetReadDeadline(time.Now().Add(5 * time.Second))
+	_, err = conn.Read(sizeBytes)
+	if err != nil {
+		t.Fatalf("Failed to read response size: %v", err)
+	}
 
-		partition, offset, err := producer.SendMessage(producerMsg)
-		if err != nil {
-			t.Fatalf("Failed to produce message %d: %v", i, err)
-		}
-		t.Logf("✅ Produced message %d with enhanced schema: partition=%d, offset=%d", i, partition, offset)
+	responseSize := uint32(sizeBytes[0])<<24 | uint32(sizeBytes[1])<<16 | uint32(sizeBytes[2])<<8 | uint32(sizeBytes[3])
+	if responseSize == 0 || responseSize > 10000 {
+		t.Fatalf("Invalid response size: %d", responseSize)
 	}
 
-	t.Logf("=== Testing Enhanced Consumer (Future Phase) ===")
-	// Consumer testing will be implemented in Phase 2
+	responseBody := make([]byte, responseSize)
+	_, err = conn.Read(responseBody)
+	if err != nil {
+		t.Fatalf("Failed to read response body: %v", err)
+	}
+
+	// Verify API keys are advertised
+	if len(responseBody) < 20 {
+		t.Fatalf("Response too short")
+	}
+
+	apiKeyCount := uint32(responseBody[6])<<24 | uint32(responseBody[7])<<16 | uint32(responseBody[8])<<8 | uint32(responseBody[9])
+	if apiKeyCount < 6 {
+		t.Errorf("Expected at least 6 API keys, got %d", apiKeyCount)
+	}
 
-	t.Logf("🎉 SUCCESS: SeaweedMQ Integration test completed!")
-	t.Logf("   - Enhanced Kafka schema integration: ✅")
-	t.Logf("   - Agent client architecture: ✅")
-	t.Logf("   - Schema-aware topic creation: ✅")
-	t.Logf("   - Structured message storage: ✅")
+	t.Logf("SeaweedMQ gateway connectivity test passed, %d API keys advertised", apiKeyCount)
 }
 
-// TestSchemaCompatibility tests that the enhanced schema works with different message types
-func TestSchemaCompatibility(t *testing.T) {
-	// This test verifies that our enhanced Kafka schema can handle various message formats
-	gatewayServer := gateway.NewServer(gateway.Options{
-		Listen:       "127.0.0.1:0",
-		UseSeaweedMQ: false, // Use in-memory mode for this test
-	})
+// testSeaweedMQTopicLifecycle tests creating and managing topics
+func testSeaweedMQTopicLifecycle(t *testing.T, addr string) {
+	conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
+	if err != nil {
+		t.Fatalf("Failed to connect: %v", err)
+	}
+	defer conn.Close()
 
-	go func() {
-		if err := gatewayServer.Start(); err != nil {
-			t.Errorf("Failed to start gateway: %v", err)
-		}
-	}()
-	defer gatewayServer.Close()
+	// Test CreateTopics request
+	topicName := "seaweedmq-test-topic"
+	createReq := buildCreateTopicsRequestCustom(topicName)
+
+	_, err = conn.Write(createReq)
+	if err != nil {
+		t.Fatalf("Failed to send CreateTopics: %v", err)
+	}
+
+	// Read response
+	sizeBytes := make([]byte, 4)
+	conn.SetReadDeadline(time.Now().Add(5 * time.Second))
+	_, err = conn.Read(sizeBytes)
+	if err != nil {
+		t.Fatalf("Failed to read CreateTopics response size: %v", err)
+	}
 
-	time.Sleep(100 * time.Millisecond)
+	responseSize := uint32(sizeBytes[0])<<24 | uint32(sizeBytes[1])<<16 | uint32(sizeBytes[2])<<8 | uint32(sizeBytes[3])
+	responseBody := make([]byte, responseSize)
+	_, err = conn.Read(responseBody)
+	if err != nil {
+		t.Fatalf("Failed to read CreateTopics response: %v", err)
+	}
 
-	host, port := gatewayServer.GetListenerAddr()
-	brokerAddr := fmt.Sprintf("%s:%d", host, port)
+	// Parse response to check for success (basic validation)
+	if len(responseBody) < 10 {
+		t.Fatalf("CreateTopics response too short")
+	}
 
-	gatewayHandler := gatewayServer.GetHandler()
-	topicName := "schema-compatibility-topic"
-	gatewayHandler.AddTopicForTesting(topicName, 1)
+	t.Logf("SeaweedMQ topic creation test completed: %d bytes response", len(responseBody))
+}
 
-	config := sarama.NewConfig()
-	config.Version = sarama.V2_1_0_0
-	config.Producer.Return.Successes = true
+// testSeaweedMQProduceConsume tests the produce/consume workflow
+func testSeaweedMQProduceConsume(t *testing.T, addr string) {
+	// This would be a more comprehensive test in a full implementation
+	// For now, just test that Produce requests are handled
 
-	producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config)
+	conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
 	if err != nil {
-		t.Fatalf("Failed to create producer: %v", err)
+		t.Fatalf("Failed to connect: %v", err)
 	}
-	defer producer.Close()
+	defer conn.Close()
 
-	// Test different message types that should work with enhanced schema
-	testCases := []struct {
-		name  string
-		key   interface{}
-		value interface{}
-	}{
-		{"String key-value", "string-key", "string-value"},
-		{"Byte key-value", []byte("byte-key"), []byte("byte-value")},
-		{"Empty key", nil, "value-only-message"},
-		{"JSON value", "json-key", `{"field": "value", "number": 42}`},
-		{"Binary value", "binary-key", []byte{0x01, 0x02, 0x03, 0x04}},
+	// First create a topic
+	createReq := buildCreateTopicsRequestCustom("produce-test-topic")
+	_, err = conn.Write(createReq)
+	if err != nil {
+		t.Fatalf("Failed to send CreateTopics: %v", err)
 	}
 
-	for i, tc := range testCases {
-		msg := &sarama.ProducerMessage{
-			Topic: topicName,
-		}
+	// Read CreateTopics response
+	sizeBytes := make([]byte, 4)
+	conn.SetReadDeadline(time.Now().Add(5 * time.Second))
+	_, err = conn.Read(sizeBytes)
+	if err != nil {
+		t.Fatalf("Failed to read CreateTopics size: %v", err)
+	}
 
-		if tc.key != nil {
-			switch k := tc.key.(type) {
-			case string:
-				msg.Key = sarama.StringEncoder(k)
-			case []byte:
-				msg.Key = sarama.ByteEncoder(k)
-			}
-		}
+	responseSize := uint32(sizeBytes[0])<<24 | uint32(sizeBytes[1])<<16 | uint32(sizeBytes[2])<<8 | uint32(sizeBytes[3])
+	responseBody := make([]byte, responseSize)
+	_, err = conn.Read(responseBody)
+	if err != nil {
+		t.Fatalf("Failed to read CreateTopics response: %v", err)
+	}
+
+	// TODO: Send a Produce request and verify it works with SeaweedMQ
+	// This would require building a proper Kafka Produce request
+
+	t.Logf("SeaweedMQ produce/consume test placeholder completed")
+}
+
+// buildCreateTopicsRequestCustom creates a CreateTopics request for a specific topic
+func buildCreateTopicsRequestCustom(topicName string) []byte {
+	clientID := "seaweedmq-test-client"
+
+	// Approximate message size
+	messageSize := 2 + 2 + 4 + 2 + len(clientID) + 4 + 4 + 2 + len(topicName) + 4 + 2 + 4 + 4
+
+	request := make([]byte, 0, messageSize+4)
+
+	// Message size placeholder
+	sizePos := len(request)
+	request = append(request, 0, 0, 0, 0)
+
+	// API key (CreateTopics = 19)
+	request = append(request, 0, 19)
+
+	// API version
+	request = append(request, 0, 4)
+
+	// Correlation ID
+	request = append(request, 0, 0, 0x30, 0x42) // 12354
+
+	// Client ID
+	request = append(request, 0, byte(len(clientID)))
+	request = append(request, []byte(clientID)...)
+
+	// Timeout (5000ms)
+	request = append(request, 0, 0, 0x13, 0x88)
+
+	// Topics count (1)
+	request = append(request, 0, 0, 0, 1)
+
+	// Topic name
+	request = append(request, 0, byte(len(topicName)))
+	request = append(request, []byte(topicName)...)
+
+	// Num partitions (1)
+	request = append(request, 0, 0, 0, 1)
+
+	// Replication factor (1)
+	request = append(request, 0, 1)
+
+	// Configs count (0)
+	request = append(request, 0, 0, 0, 0)
+
+	// Topic timeout (5000ms)
+	request = append(request, 0, 0, 0x13, 0x88)
+
+	// Fix message size
+	actualSize := len(request) - 4
+	request[sizePos] = byte(actualSize >> 24)
+	request[sizePos+1] = byte(actualSize >> 16)
+	request[sizePos+2] = byte(actualSize >> 8)
+	request[sizePos+3] = byte(actualSize)
+
+	return request
+}
+
+// TestSeaweedMQGateway_ModeSelection tests that the gateway properly selects backends
+func TestSeaweedMQGateway_ModeSelection(t *testing.T) {
+	// Test in-memory mode (should always work)
+	t.Run("InMemoryMode", func(t *testing.T) {
+		server := gateway.NewServer(gateway.Options{
+			Listen:       ":0",
+			UseSeaweedMQ: false,
+		})
+
+		err := server.Start()
+		if err != nil {
+			t.Fatalf("In-memory mode should start: %v", err)
+		}
+		defer server.Close()
 
-		switch v := tc.value.(type) {
-		case string:
-			msg.Value = sarama.StringEncoder(v)
-		case []byte:
-			msg.Value = sarama.ByteEncoder(v)
-		}
+		addr := server.Addr()
+		if addr == "" {
+			t.Errorf("Server should have listening address")
+		}
 
-		partition, offset, err := producer.SendMessage(msg)
-		if err != nil {
-			t.Errorf("Failed to produce message %d (%s): %v", i, tc.name, err)
-			continue
-		}
-		t.Logf("✅ %s: partition=%d, offset=%d", tc.name, partition, offset)
+		t.Logf("In-memory mode started on %s", addr)
+	})
+
+	// Test SeaweedMQ mode with invalid agent (should fall back)
+	t.Run("SeaweedMQModeFallback", func(t *testing.T) {
+		server := gateway.NewServer(gateway.Options{
+			Listen:       ":0",
+			AgentAddress: "invalid:99999", // Invalid address
+			UseSeaweedMQ: true,
+		})
+
+		err := server.Start()
+		if err != nil {
+			t.Fatalf("Should start even with invalid agent (fallback to in-memory): %v", err)
+		}
+		defer server.Close()
+
+		addr := server.Addr()
+		if addr == "" {
+			t.Errorf("Server should have listening address")
+		}
+
+		t.Logf("SeaweedMQ mode with fallback started on %s", addr)
+	})
+}
+
+// TestSeaweedMQGateway_ConfigValidation tests configuration validation
+func TestSeaweedMQGateway_ConfigValidation(t *testing.T) {
+	testCases := []struct {
+		name       string
+		options    gateway.Options
+		shouldWork bool
+	}{
+		{
+			name: "ValidInMemory",
+			options: gateway.Options{
+				Listen:       ":0",
+				UseSeaweedMQ: false,
+			},
+			shouldWork: true,
+		},
+		{
+			name: "ValidSeaweedMQWithAgent",
+			options: gateway.Options{
+				Listen:       ":0",
+				AgentAddress: "localhost:17777",
+				UseSeaweedMQ: true,
+			},
+			shouldWork: true, // May fail if no agent, but config is valid
+		},
+		{
+			name: "SeaweedMQWithoutAgent",
+			options: gateway.Options{
+				Listen:       ":0",
+				UseSeaweedMQ: true,
+				// AgentAddress is empty
+			},
+			shouldWork: true, // Should fall back to in-memory
+		},
 	}
 
-	t.Logf("🎉 SUCCESS: Schema compatibility test completed!")
-}
\ No newline at end of file
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			server := gateway.NewServer(tc.options)
+			err := server.Start()
+
+			if tc.shouldWork && err != nil {
+				t.Errorf("Expected config to work, got error: %v", err)
+			}
+
+			if err == nil {
+				server.Close()
+				t.Logf("Config test passed for %s", tc.name)
+			}
+		})
+	}
+}
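Note: the connectivity test above calls a `buildApiVersionsRequest` helper that is defined elsewhere in the test package, not in this diff. A minimal sketch of what such a helper might look like, assuming a v0 ApiVersions request and the same big-endian framing the tests parse by hand (the helper name and client ID here are illustrative, not from this change):

```go
package kafka_test

import "encoding/binary"

// buildApiVersionsRequestSketch frames a v0 ApiVersions request as
// [4-byte size][api_key=18][api_version=0][correlation_id][client_id].
// Hypothetical stand-in for the buildApiVersionsRequest helper the tests call.
func buildApiVersionsRequestSketch() []byte {
	clientID := "seaweedmq-test-client"

	body := make([]byte, 0, 10+len(clientID))
	body = binary.BigEndian.AppendUint16(body, 18) // API key: ApiVersions
	body = binary.BigEndian.AppendUint16(body, 0)  // API version 0
	body = binary.BigEndian.AppendUint32(body, 1)  // correlation ID
	body = binary.BigEndian.AppendUint16(body, uint16(len(clientID)))
	body = append(body, clientID...)

	// Length-prefix the body, matching the size field the tests read back.
	req := binary.BigEndian.AppendUint32(make([]byte, 0, 4+len(body)), uint32(len(body)))
	return append(req, body...)
}
```

The response parsing in `testSeaweedMQConnectivity` (correlation ID at bytes 0-3, error code at 4-5, API key count at 6-9) corresponds to this framing.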
diff --git a/weed/mq/kafka/integration/agent_client.go b/weed/mq/kafka/integration/agent_client.go
index fc7dbed02..c972d0d62 100644
--- a/weed/mq/kafka/integration/agent_client.go
+++ b/weed/mq/kafka/integration/agent_client.go
@@ -134,12 +134,37 @@ func (ac *AgentClient) GetOrCreatePublisher(topic string, partition int32) (*Pub
 
 // createPublishSession creates a new publishing session with SeaweedMQ Agent
 func (ac *AgentClient) createPublishSession(topic string, partition int32) (*PublisherSession, error) {
-	// Create comprehensive Kafka record schema for SeaweedMQ
-	recordType := ac.createKafkaRecordSchema()
-
-	// Check if topic already exists in SeaweedMQ, create if needed
-	if err := ac.ensureTopicExists(topic, recordType); err != nil {
-		return nil, fmt.Errorf("failed to ensure topic exists: %v", err)
+	// Create a basic record type for Kafka messages
+	recordType := &schema_pb.RecordType{
+		Fields: []*schema_pb.Field{
+			{
+				Name:       "key",
+				FieldIndex: 0,
+				Type: &schema_pb.Type{
+					Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
+				},
+				IsRequired: false,
+				IsRepeated: false,
+			},
+			{
+				Name:       "value",
+				FieldIndex: 1,
+				Type: &schema_pb.Type{
+					Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
+				},
+				IsRequired: true,
+				IsRepeated: false,
+			},
+			{
+				Name:       "timestamp",
+				FieldIndex: 2,
+				Type: &schema_pb.Type{
+					Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_TIMESTAMP},
+				},
+				IsRequired: false,
+				IsRepeated: false,
+			},
+		},
 	}
 
 	// Start publish session
@@ -186,16 +211,16 @@ func (ac *AgentClient) PublishRecord(topic string, partition int32, key []byte,
 		return 0, err
 	}
 
-	// Convert to SeaweedMQ record format using enhanced Kafka schema
+	// Convert to SeaweedMQ record format
 	record := &schema_pb.RecordValue{
 		Fields: map[string]*schema_pb.Value{
-			"kafka_key": {
+			"key": {
 				Kind: &schema_pb.Value_BytesValue{BytesValue: key},
 			},
-			"kafka_value": {
+			"value": {
 				Kind: &schema_pb.Value_BytesValue{BytesValue: value},
 			},
-			"kafka_timestamp": {
+			"timestamp": {
 				Kind: &schema_pb.Value_TimestampValue{
 					TimestampValue: &schema_pb.TimestampValue{
 						TimestampMicros: timestamp / 1000, // Convert nanoseconds to microseconds
@@ -203,15 +228,6 @@ func (ac *AgentClient) PublishRecord(topic string, partition int32, key []byte,
 					},
 				},
 			},
-			"kafka_headers": {
-				Kind: &schema_pb.Value_BytesValue{BytesValue: []byte{}}, // Empty headers for now
-			},
-			"kafka_offset": {
-				Kind: &schema_pb.Value_Int64Value{Int64Value: 0}, // Will be set by SeaweedMQ
-			},
-			"kafka_partition": {
-				Kind: &schema_pb.Value_Int32Value{Int32Value: partition},
-			},
 		},
 	}
 
@@ -385,79 +401,3 @@ func (ac *AgentClient) HealthCheck() error {
 
 	return nil
 }
-
-// createKafkaRecordSchema creates a comprehensive schema for Kafka messages in SeaweedMQ
-func (ac *AgentClient) createKafkaRecordSchema() *schema_pb.RecordType {
-	return &schema_pb.RecordType{
-		Fields: []*schema_pb.Field{
-			{
-				Name:       "kafka_key",
-				FieldIndex: 0,
-				Type: &schema_pb.Type{
-					Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
-				},
-				IsRequired: false,
-				IsRepeated: false,
-			},
-			{
-				Name:       "kafka_value",
-				FieldIndex: 1,
-				Type: &schema_pb.Type{
-					Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
-				},
-				IsRequired: true,
-				IsRepeated: false,
-			},
-			{
-				Name:       "kafka_timestamp",
-				FieldIndex: 2,
-				Type: &schema_pb.Type{
-					Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_TIMESTAMP},
-				},
-				IsRequired: false,
-				IsRepeated: false,
-			},
-			{
-				Name:       "kafka_headers",
-				FieldIndex: 3,
-				Type: &schema_pb.Type{
-					Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES},
-				},
-				IsRequired: false,
-				IsRepeated: false,
-			},
-			{
-				Name:       "kafka_offset",
-				FieldIndex: 4,
-				Type: &schema_pb.Type{
-					Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64},
-				},
-				IsRequired: false,
-				IsRepeated: false,
-			},
-			{
-				Name:       "kafka_partition",
-				FieldIndex: 5,
-				Type: &schema_pb.Type{
-					Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32},
-				},
-				IsRequired: false,
-				IsRepeated: false,
-			},
-		},
-	}
-}
-
-// ensureTopicExists checks if topic exists in SeaweedMQ and creates it if needed
-func (ac *AgentClient) ensureTopicExists(topic string, recordType *schema_pb.RecordType) error {
-	// For Phase 1, we'll rely on SeaweedMQ's auto-creation during publish
-	// In Phase 3, we'll implement proper topic discovery and creation
-	return nil
-}
-
-// CreateTopicWithSchema creates a topic in SeaweedMQ with the specified schema
-func (ac *AgentClient) CreateTopicWithSchema(topic string, partitions int32, recordType *schema_pb.RecordType) error {
-	// This will be implemented in Phase 3 when we integrate with CreateTopics API
-	// For now, topics are auto-created during first publish
-	return nil
-}
diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go
index a56a44e29..38725ee8d 100644
--- a/weed/mq/kafka/protocol/produce.go
+++ b/weed/mq/kafka/protocol/produce.go
@@ -174,20 +174,16 @@ func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, req
 		if parseErr != nil {
 			errorCode = 42 // INVALID_RECORD
 		} else if recordCount > 0 {
-			if h.useSeaweedMQ && h.seaweedMQHandler != nil {
+			if h.useSeaweedMQ {
 				// Use SeaweedMQ integration for production
-				fmt.Printf("DEBUG: Using SeaweedMQ backend for topic '%s' partition %d\n", topicName, partitionID)
 				offset, err := h.produceToSeaweedMQ(topicName, int32(partitionID), recordSetData)
 				if err != nil {
-					fmt.Printf("DEBUG: SeaweedMQ produce error: %v\n", err)
 					errorCode = 1 // UNKNOWN_SERVER_ERROR
 				} else {
-					fmt.Printf("DEBUG: SeaweedMQ produce success, offset: %d\n", offset)
 					baseOffset = offset
 				}
 			} else {
 				// Use legacy in-memory mode for tests
-				fmt.Printf("DEBUG: Using in-memory backend for topic '%s' partition %d\n", topicName, partitionID)
 				ledger := h.GetOrCreateLedger(topicName, int32(partitionID))
 				baseOffset = ledger.AssignOffsets(int64(recordCount))
 
@@ -270,26 +266,16 @@ func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, total
 	return recordCount, int32(len(recordSetData)), nil
 }
 
-// produceToSeaweedMQ publishes records to SeaweedMQ via enhanced agent client
+// produceToSeaweedMQ publishes a single record to SeaweedMQ (simplified for Phase 2)
 func (h *Handler) produceToSeaweedMQ(topic string, partition int32, recordSetData []byte) (int64, error) {
-	if h.seaweedMQHandler == nil {
-		return 0, fmt.Errorf("SeaweedMQ handler not available")
-	}
-
-	fmt.Printf("DEBUG: Producing to SeaweedMQ - topic: %s, partition: %d, data size: %d\n", topic, partition, len(recordSetData))
+	// For Phase 2, we'll extract a simple key-value from the record set
+	// In a full implementation, this would parse the entire batch properly
 
-	// For Phase 1, extract a simple key-value from the record set
-	// This will be enhanced in Phase 2 with proper record batch parsing
+	// Extract first record from record set (simplified)
 	key, value := h.extractFirstRecord(recordSetData)
 
-	// Publish to SeaweedMQ using enhanced schema
-	offset, err := h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
-	if err != nil {
-		return 0, fmt.Errorf("failed to produce to SeaweedMQ: %v", err)
-	}
-
-	fmt.Printf("DEBUG: Successfully produced to SeaweedMQ, offset: %d\n", offset)
-	return offset, nil
+	// Publish to SeaweedMQ
+	return h.seaweedMQHandler.ProduceRecord(topic, partition, key, value)
 }
 
 // extractFirstRecord extracts the first record from a Kafka record set (simplified)
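The `extractFirstRecord` helper referenced above is unchanged by this diff. For context, assuming the record set is a Kafka record batch in the v2 format, a first-record extraction would look roughly like the following sketch: skip the 61-byte batch header, then walk the first record's zigzag varints, which `encoding/binary.Varint` decodes directly. This is an illustrative sketch of the technique, not the repository's implementation:

```go
package protocol

import (
	"encoding/binary"
	"fmt"
)

// extractFirstRecordSketch pulls the key and value of the first record out
// of a Kafka v2 record batch. Assumes the standard v2 layout; -1 lengths
// encode null keys/values.
func extractFirstRecordSketch(recordSet []byte) (key, value []byte, err error) {
	const batchHeaderLen = 61 // v2 batch header, through the 4-byte record count
	if len(recordSet) < batchHeaderLen+1 {
		return nil, nil, fmt.Errorf("record set too short: %d bytes", len(recordSet))
	}
	buf := recordSet[batchHeaderLen:]

	readVarint := func(what string) (int64, error) {
		v, n := binary.Varint(buf) // Kafka varints are zigzag-encoded, like binary.Varint
		if n <= 0 {
			return 0, fmt.Errorf("bad varint for %s", what)
		}
		buf = buf[n:]
		return v, nil
	}
	readBytes := func(n int64) ([]byte, error) {
		if n < 0 {
			return nil, nil // -1 encodes a null key/value
		}
		if int64(len(buf)) < n {
			return nil, fmt.Errorf("truncated record")
		}
		b := buf[:n]
		buf = buf[n:]
		return b, nil
	}

	if _, err = readVarint("record length"); err != nil {
		return nil, nil, err
	}
	if len(buf) < 1 {
		return nil, nil, fmt.Errorf("truncated record")
	}
	buf = buf[1:] // attributes (int8)
	if _, err = readVarint("timestamp delta"); err != nil {
		return nil, nil, err
	}
	if _, err = readVarint("offset delta"); err != nil {
		return nil, nil, err
	}

	keyLen, err := readVarint("key length")
	if err != nil {
		return nil, nil, err
	}
	if key, err = readBytes(keyLen); err != nil {
		return nil, nil, err
	}
	valueLen, err := readVarint("value length")
	if err != nil {
		return nil, nil, err
	}
	if value, err = readBytes(valueLen); err != nil {
		return nil, nil, err
	}
	return key, value, nil
}
```

A full implementation would also iterate the remaining records and honor batch compression (the attributes field), which is why the comment in `produceToSeaweedMQ` flags the single-record extraction as a Phase 2 simplification.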