diff --git a/test/kafka/api_sequence_test.go b/test/kafka/api_sequence_test.go index 6ad51daae..d864a2eef 100644 --- a/test/kafka/api_sequence_test.go +++ b/test/kafka/api_sequence_test.go @@ -14,8 +14,7 @@ import ( func TestKafkaGateway_APISequence(t *testing.T) { // Start the gateway server srv := gateway.NewServer(gateway.Options{ - Listen: ":0", - UseSeaweedMQ: false, + Listen: ":0", }) if err := srv.Start(); err != nil { @@ -31,7 +30,7 @@ func TestKafkaGateway_APISequence(t *testing.T) { handler := srv.GetHandler() handler.AddTopicForTesting(topicName, 1) - // Create a writer and try to write a single message + // Create a writer and try to write a single message writer := &kafka.Writer{ Addr: kafka.TCP(brokerAddr), Topic: topicName, diff --git a/test/kafka/client_integration_test.go b/test/kafka/client_integration_test.go index d9d98e688..99dc14d8a 100644 --- a/test/kafka/client_integration_test.go +++ b/test/kafka/client_integration_test.go @@ -16,8 +16,7 @@ import ( func TestKafkaGoClient_BasicProduceConsume(t *testing.T) { // Start the gateway server srv := gateway.NewServer(gateway.Options{ - Listen: ":0", // Use random port - UseSeaweedMQ: false, // Use in-memory mode for testing + Listen: ":0", // Use random port }) if err := srv.Start(); err != nil { @@ -83,8 +82,7 @@ func TestKafkaGoClient_BasicProduceConsume(t *testing.T) { func TestKafkaGoClient_ConsumerGroups(t *testing.T) { // Start the gateway server srv := gateway.NewServer(gateway.Options{ - Listen: ":0", - UseSeaweedMQ: false, + Listen: ":0", }) if err := srv.Start(); err != nil { @@ -137,8 +135,7 @@ func TestKafkaGoClient_MultiplePartitions(t *testing.T) { func TestKafkaGoClient_OffsetManagement(t *testing.T) { // Start the gateway server srv := gateway.NewServer(gateway.Options{ - Listen: ":0", - UseSeaweedMQ: false, + Listen: ":0", }) if err := srv.Start(); err != nil { diff --git a/test/kafka/comprehensive_e2e_test.go b/test/kafka/comprehensive_e2e_test.go index 339acd094..72798449a 100644 --- a/test/kafka/comprehensive_e2e_test.go +++ b/test/kafka/comprehensive_e2e_test.go @@ -16,8 +16,7 @@ import ( func TestComprehensiveE2E(t *testing.T) { // Start gateway gatewayServer := gateway.NewServer(gateway.Options{ - Listen: "127.0.0.1:0", - UseSeaweedMQ: false, // Use in-memory mode for testing + Listen: "127.0.0.1:0", }) go func() { @@ -359,8 +358,7 @@ func testSaramaToKafkaGo(t *testing.T, addr, topic string) { func TestOffsetManagement(t *testing.T) { // Start gateway gatewayServer := gateway.NewServer(gateway.Options{ - Listen: "127.0.0.1:0", - UseSeaweedMQ: false, + Listen: "127.0.0.1:0", }) go func() { diff --git a/test/kafka/consumer_test.go b/test/kafka/consumer_test.go index 80e3d93ef..f9e738979 100644 --- a/test/kafka/consumer_test.go +++ b/test/kafka/consumer_test.go @@ -14,8 +14,7 @@ import ( func TestKafkaGoReader(t *testing.T) { // Start the gateway server srv := gateway.NewServer(gateway.Options{ - Listen: ":0", - UseSeaweedMQ: false, + Listen: ":0", }) if err := srv.Start(); err != nil { diff --git a/test/kafka/debug_connection_test.go b/test/kafka/debug_connection_test.go index 8a4534c8e..dcd5300de 100644 --- a/test/kafka/debug_connection_test.go +++ b/test/kafka/debug_connection_test.go @@ -13,8 +13,7 @@ import ( func TestGateway_BasicConnection(t *testing.T) { // Start the gateway server srv := gateway.NewServer(gateway.Options{ - Listen: ":0", - UseSeaweedMQ: false, + Listen: ":0", }) if err := srv.Start(); err != nil { @@ -39,8 +38,7 @@ func TestGateway_BasicConnection(t *testing.T) { func TestGateway_ApiVersionsRequest(t *testing.T) { // Start the gateway server srv := gateway.NewServer(gateway.Options{ - Listen: ":0", - UseSeaweedMQ: false, + Listen: ":0", }) if err := srv.Start(); err != nil { @@ -141,8 +139,7 @@ func TestGateway_ApiVersionsRequest(t *testing.T) { func TestGateway_CreateTopicsRequest(t *testing.T) { // Start the gateway server srv := gateway.NewServer(gateway.Options{ - Listen: ":0", - UseSeaweedMQ: false, + Listen: ":0", }) if err := srv.Start(); err != nil { diff --git a/test/kafka/persistent_offset_integration_test.go b/test/kafka/persistent_offset_integration_test.go index 7417e6990..a7871499a 100644 --- a/test/kafka/persistent_offset_integration_test.go +++ b/test/kafka/persistent_offset_integration_test.go @@ -39,15 +39,13 @@ func TestPersistentOffsetIntegration(t *testing.T) { func testOffsetPersistenceAndRecovery(t *testing.T, brokers []string) { // Create offset storage - storage, err := offset.NewSeaweedMQStorage(brokers) - require.NoError(t, err) + storage := offset.NewSeaweedMQStorage() defer storage.Close() topicPartition := "test-persistence-topic-0" // Create first ledger and add some entries - ledger1, err := offset.NewPersistentLedger(topicPartition, storage) - require.NoError(t, err) + ledger1 := offset.NewPersistentLedger(topicPartition, storage) // Add test entries testEntries := []struct { @@ -77,8 +75,7 @@ func testOffsetPersistenceAndRecovery(t *testing.T, brokers []string) { time.Sleep(2 * time.Second) // Create second ledger (simulating restart) - ledger2, err := offset.NewPersistentLedger(topicPartition, storage) - require.NoError(t, err) + ledger2 := offset.NewPersistentLedger(topicPartition, storage) // Verify recovered state assert.Equal(t, ledger1.GetHighWaterMark(), ledger2.GetHighWaterMark()) diff --git a/test/kafka/produce_consume_test.go b/test/kafka/produce_consume_test.go index dada5dde1..775bf1894 100644 --- a/test/kafka/produce_consume_test.go +++ b/test/kafka/produce_consume_test.go @@ -15,8 +15,7 @@ import ( func TestKafkaGoClient_DirectProduceConsume(t *testing.T) { // Start the gateway server srv := gateway.NewServer(gateway.Options{ - Listen: ":0", - UseSeaweedMQ: false, + Listen: ":0", }) if err := srv.Start(); err != nil { diff --git a/test/kafka/raw_protocol_test.go b/test/kafka/raw_protocol_test.go index f4fde795a..635674048 100644 --- a/test/kafka/raw_protocol_test.go +++ b/test/kafka/raw_protocol_test.go @@ -13,8 +13,7 @@ import ( func TestRawProduceRequest(t *testing.T) { // Start the gateway server srv := gateway.NewServer(gateway.Options{ - Listen: ":0", - UseSeaweedMQ: false, + Listen: ":0", }) if err := srv.Start(); err != nil { @@ -62,42 +61,42 @@ func TestRawProduceRequest(t *testing.T) { func sendApiVersionsRequest(conn net.Conn) error { // Build ApiVersions request correlationID := uint32(1) - + msgBody := make([]byte, 0, 32) msgBody = append(msgBody, 0, 18) // API key 18 (ApiVersions) msgBody = append(msgBody, 0, 0) // API version 0 - + // Correlation ID correlationBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationBytes, correlationID) msgBody = append(msgBody, correlationBytes...) - + // Client ID (empty) msgBody = append(msgBody, 0, 0) // empty client ID - + // Send request sizeBytes := make([]byte, 4) binary.BigEndian.PutUint32(sizeBytes, uint32(len(msgBody))) request := append(sizeBytes, msgBody...) - + if _, err := conn.Write(request); err != nil { return fmt.Errorf("write request: %w", err) } - + // Read response size var responseSizeBytes [4]byte if _, err := conn.Read(responseSizeBytes[:]); err != nil { return fmt.Errorf("read response size: %w", err) } - + responseSize := binary.BigEndian.Uint32(responseSizeBytes[:]) - + // Read response body responseBody := make([]byte, responseSize) if _, err := conn.Read(responseBody); err != nil { return fmt.Errorf("read response body: %w", err) } - + fmt.Printf("ApiVersions response: %d bytes\n", responseSize) return nil } @@ -105,45 +104,45 @@ func sendApiVersionsRequest(conn net.Conn) error { func sendMetadataRequest(conn net.Conn) error { // Build Metadata request correlationID := uint32(2) - + msgBody := make([]byte, 0, 32) msgBody = append(msgBody, 0, 3) // API key 3 (Metadata) msgBody = append(msgBody, 0, 1) // API version 1 - + // Correlation ID correlationBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationBytes, correlationID) msgBody = append(msgBody, correlationBytes...) - + // Client ID (empty) msgBody = append(msgBody, 0, 0) // empty client ID - + // Topics array (empty = all topics) msgBody = append(msgBody, 0xFF, 0xFF, 0xFF, 0xFF) // -1 = all topics - + // Send request sizeBytes := make([]byte, 4) binary.BigEndian.PutUint32(sizeBytes, uint32(len(msgBody))) request := append(sizeBytes, msgBody...) - + if _, err := conn.Write(request); err != nil { return fmt.Errorf("write request: %w", err) } - + // Read response size var responseSizeBytes [4]byte if _, err := conn.Read(responseSizeBytes[:]); err != nil { return fmt.Errorf("read response size: %w", err) } - + responseSize := binary.BigEndian.Uint32(responseSizeBytes[:]) - + // Read response body responseBody := make([]byte, responseSize) if _, err := conn.Read(responseBody); err != nil { return fmt.Errorf("read response body: %w", err) } - + fmt.Printf("Metadata response: %d bytes\n", responseSize) return nil } @@ -151,73 +150,73 @@ func sendMetadataRequest(conn net.Conn) error { func sendProduceRequest(conn net.Conn, topicName string) error { // Build simple Produce request correlationID := uint32(3) - + msgBody := make([]byte, 0, 128) msgBody = append(msgBody, 0, 0) // API key 0 (Produce) msgBody = append(msgBody, 0, 1) // API version 1 - + // Correlation ID correlationBytes := make([]byte, 4) binary.BigEndian.PutUint32(correlationBytes, correlationID) msgBody = append(msgBody, correlationBytes...) - + // Client ID (empty) msgBody = append(msgBody, 0, 0) // empty client ID - + // Acks (-1 = all replicas) msgBody = append(msgBody, 0xFF, 0xFF) // -1 - + // Timeout (5000ms) msgBody = append(msgBody, 0, 0, 0x13, 0x88) // 5000ms - + // Topics count (1) msgBody = append(msgBody, 0, 0, 0, 1) - + // Topic name topicNameBytes := []byte(topicName) msgBody = append(msgBody, byte(len(topicNameBytes)>>8), byte(len(topicNameBytes))) msgBody = append(msgBody, topicNameBytes...) - + // Partitions count (1) msgBody = append(msgBody, 0, 0, 0, 1) - + // Partition 0 msgBody = append(msgBody, 0, 0, 0, 0) // partition ID = 0 - + // Record set (simple test record) testRecord := buildSimpleRecordSet("test-key", "test-value") recordSetSize := make([]byte, 4) binary.BigEndian.PutUint32(recordSetSize, uint32(len(testRecord))) msgBody = append(msgBody, recordSetSize...) msgBody = append(msgBody, testRecord...) - + // Send request sizeBytes := make([]byte, 4) binary.BigEndian.PutUint32(sizeBytes, uint32(len(msgBody))) request := append(sizeBytes, msgBody...) - + fmt.Printf("Sending Produce request: %d bytes\n", len(request)) - + if _, err := conn.Write(request); err != nil { return fmt.Errorf("write request: %w", err) } - + // Read response size var responseSizeBytes [4]byte if _, err := conn.Read(responseSizeBytes[:]); err != nil { return fmt.Errorf("read response size: %w", err) } - + responseSize := binary.BigEndian.Uint32(responseSizeBytes[:]) - + // Read response body responseBody := make([]byte, responseSize) if _, err := conn.Read(responseBody); err != nil { return fmt.Errorf("read response body: %w", err) } - + fmt.Printf("Produce response: %d bytes\n", responseSize) - + // Check if the response indicates success (simplified check) if responseSize > 8 { // Extract correlation ID and basic error code @@ -225,7 +224,7 @@ func sendProduceRequest(conn net.Conn, topicName string) error { if correlationResp == correlationID { fmt.Printf("✅ Produce request correlation ID matches: %d\n", correlationResp) } - + // Look for error codes in the response if len(responseBody) > 20 { // Skip to where partition error code should be (rough estimate) @@ -237,31 +236,31 @@ func sendProduceRequest(conn net.Conn, topicName string) error { } } } - + return nil } func buildSimpleRecordSet(key, value string) []byte { // Build a very simple Kafka record batch (v0 format for simplicity) record := make([]byte, 0, 64) - + // Record batch header (simplified v0 format) record = append(record, 0, 0, 0, 0, 0, 0, 0, 0) // base offset - record = append(record, 0, 0, 0, 30) // batch length (estimated) - record = append(record, 0, 0, 0, 0) // partition leader epoch - record = append(record, 0) // magic byte (v0) - record = append(record, 0, 0, 0, 0) // CRC32 (simplified) - record = append(record, 0, 0) // attributes - record = append(record, 0, 0, 0, 1) // record count = 1 - + record = append(record, 0, 0, 0, 30) // batch length (estimated) + record = append(record, 0, 0, 0, 0) // partition leader epoch + record = append(record, 0) // magic byte (v0) + record = append(record, 0, 0, 0, 0) // CRC32 (simplified) + record = append(record, 0, 0) // attributes + record = append(record, 0, 0, 0, 1) // record count = 1 + // Simple record: key_length + key + value_length + value keyBytes := []byte(key) valueBytes := []byte(value) - + record = append(record, byte(len(keyBytes)>>8), byte(len(keyBytes))) record = append(record, keyBytes...) record = append(record, byte(len(valueBytes)>>8), byte(len(valueBytes))) record = append(record, valueBytes...) - + return record } diff --git a/test/kafka/seaweedmq_integration_test.go b/test/kafka/seaweedmq_integration_test.go index 73a8ba97e..b4ab6db69 100644 --- a/test/kafka/seaweedmq_integration_test.go +++ b/test/kafka/seaweedmq_integration_test.go @@ -14,14 +14,9 @@ func TestSeaweedMQIntegration_E2E(t *testing.T) { // Skip by default - requires real SeaweedMQ setup t.Skip("Integration test requires real SeaweedMQ setup - run manually") - // Test configuration - agentAddress := "localhost:17777" // Default SeaweedMQ Agent address - // Start the gateway with SeaweedMQ backend gatewayServer := gateway.NewServer(gateway.Options{ - Listen: ":0", // random port - AgentAddress: agentAddress, - UseSeaweedMQ: true, + Listen: ":0", // random port }) err := gatewayServer.Start() @@ -241,8 +236,7 @@ func TestSeaweedMQGateway_ModeSelection(t *testing.T) { // Test in-memory mode (should always work) t.Run("InMemoryMode", func(t *testing.T) { server := gateway.NewServer(gateway.Options{ - Listen: ":0", - UseSeaweedMQ: false, + Listen: ":0", }) err := server.Start() @@ -262,9 +256,7 @@ func TestSeaweedMQGateway_ModeSelection(t *testing.T) { // Test SeaweedMQ mode with invalid agent (should fall back) t.Run("SeaweedMQModeFallback", func(t *testing.T) { server := gateway.NewServer(gateway.Options{ - Listen: ":0", - AgentAddress: "invalid:99999", // Invalid address - UseSeaweedMQ: true, + Listen: ":0", }) err := server.Start() @@ -292,26 +284,21 @@ func TestSeaweedMQGateway_ConfigValidation(t *testing.T) { { name: "ValidInMemory", options: gateway.Options{ - Listen: ":0", - UseSeaweedMQ: false, + Listen: ":0", }, shouldWork: true, }, { name: "ValidSeaweedMQWithAgent", options: gateway.Options{ - Listen: ":0", - AgentAddress: "localhost:17777", - UseSeaweedMQ: true, + Listen: ":0", }, shouldWork: true, // May fail if no agent, but config is valid }, { name: "SeaweedMQWithoutAgent", options: gateway.Options{ - Listen: ":0", - UseSeaweedMQ: true, - // AgentAddress is empty + Listen: ":0", }, shouldWork: true, // Should fall back to in-memory },