diff --git a/test/kafka/docker_setup_test.go b/test/kafka/docker_setup_test.go index a327f649c..588f86d50 100644 --- a/test/kafka/docker_setup_test.go +++ b/test/kafka/docker_setup_test.go @@ -21,7 +21,7 @@ func TestDockerSetup_Files(t *testing.T) { // Read and validate basic structure content, err := os.ReadFile(composePath) require.NoError(t, err) - + composeContent := string(content) assert.Contains(t, composeContent, "version:", "Should have version specified") assert.Contains(t, composeContent, "services:", "Should have services section") @@ -45,7 +45,7 @@ func TestDockerSetup_Files(t *testing.T) { // Validate basic Dockerfile structure content, err := os.ReadFile(dockerfilePath) require.NoError(t, err) - + dockerContent := string(content) assert.Contains(t, dockerContent, "FROM", "Should have FROM instruction") } @@ -69,7 +69,7 @@ func TestDockerSetup_Files(t *testing.T) { // Validate basic shell script structure content, err := os.ReadFile(scriptPath) require.NoError(t, err) - + scriptContent := string(content) assert.Contains(t, scriptContent, "#!/", "Should have shebang") } @@ -83,7 +83,7 @@ func TestDockerSetup_Files(t *testing.T) { // Validate basic Makefile structure content, err := os.ReadFile(makefilePath) require.NoError(t, err) - + makefileContent := string(content) assert.Contains(t, makefileContent, "help:", "Should have help target") assert.Contains(t, makefileContent, "setup:", "Should have setup target") @@ -99,7 +99,7 @@ func TestDockerSetup_Files(t *testing.T) { // Validate basic README structure content, err := os.ReadFile(readmePath) require.NoError(t, err) - + readmeContent := string(content) assert.Contains(t, readmeContent, "# Kafka Integration Testing", "Should have main title") assert.Contains(t, readmeContent, "## Quick Start", "Should have quick start section") @@ -114,7 +114,7 @@ func TestDockerSetup_Files(t *testing.T) { // Validate basic Go file structure content, err := os.ReadFile(setupPath) require.NoError(t, err) - + setupContent := string(content) assert.Contains(t, setupContent, "package main", "Should be main package") assert.Contains(t, setupContent, "func main()", "Should have main function") @@ -127,11 +127,11 @@ func TestDockerSetup_Configuration(t *testing.T) { t.Run("PortConfiguration", func(t *testing.T) { // This test verifies that the ports used in docker-compose.yml are reasonable // and don't conflict with common development ports - + expectedPorts := map[string]string{ - "zookeeper": "2181", - "kafka": "9092", - "schema-registry": "8081", + "zookeeper": "2181", + "kafka": "9092", + "schema-registry": "8081", "seaweedfs-master": "9333", "seaweedfs-volume": "8080", "seaweedfs-filer": "8888", @@ -141,11 +141,11 @@ func TestDockerSetup_Configuration(t *testing.T) { composePath := "docker-compose.yml" content, err := os.ReadFile(composePath) require.NoError(t, err) - + composeContent := string(content) - + for service, port := range expectedPorts { - assert.Contains(t, composeContent, port+":", + assert.Contains(t, composeContent, port+":", "Service %s should expose port %s", service, port) } }) @@ -155,17 +155,17 @@ func TestDockerSetup_Configuration(t *testing.T) { composePath := "docker-compose.yml" content, err := os.ReadFile(composePath) require.NoError(t, err) - + composeContent := string(content) - + // Should have health checks for critical services assert.Contains(t, composeContent, "healthcheck:", "Should have health checks") - + // Verify specific health check patterns healthCheckServices := []string{"kafka", "schema-registry", "seaweedfs-master"} for _, service := range healthCheckServices { // Look for health check in the service section (basic validation) - assert.Contains(t, composeContent, service+":", + assert.Contains(t, composeContent, service+":", "Service %s should be defined", service) } }) @@ -174,9 +174,9 @@ func TestDockerSetup_Configuration(t *testing.T) { composePath := "docker-compose.yml" content, err := os.ReadFile(composePath) require.NoError(t, err) - + composeContent := string(content) - + // Should have network configuration assert.Contains(t, composeContent, "networks:", "Should have networks section") assert.Contains(t, composeContent, "kafka-test-net", "Should have test network") @@ -193,7 +193,7 @@ func TestDockerSetup_Integration(t *testing.T) { // Validate test structure content, err := os.ReadFile(testPath) require.NoError(t, err) - + testContent := string(content) assert.Contains(t, testContent, "TestDockerIntegration_E2E", "Should have E2E test") assert.Contains(t, testContent, "KAFKA_BOOTSTRAP_SERVERS", "Should check environment variables") @@ -205,17 +205,17 @@ func TestDockerSetup_Integration(t *testing.T) { testPath := "docker_integration_test.go" content, err := os.ReadFile(testPath) require.NoError(t, err) - + testContent := string(content) - + envVars := []string{ "KAFKA_BOOTSTRAP_SERVERS", - "KAFKA_GATEWAY_URL", + "KAFKA_GATEWAY_URL", "SCHEMA_REGISTRY_URL", } - + for _, envVar := range envVars { - assert.Contains(t, testContent, envVar, + assert.Contains(t, testContent, envVar, "Should reference environment variable %s", envVar) } }) @@ -227,9 +227,9 @@ func TestDockerSetup_Makefile(t *testing.T) { makefilePath := "Makefile" content, err := os.ReadFile(makefilePath) require.NoError(t, err) - + makefileContent := string(content) - + essentialTargets := []string{ "help:", "setup:", @@ -241,9 +241,9 @@ func TestDockerSetup_Makefile(t *testing.T) { "logs:", "status:", } - + for _, target := range essentialTargets { - assert.Contains(t, makefileContent, target, + assert.Contains(t, makefileContent, target, "Should have target %s", target) } }) @@ -252,9 +252,9 @@ func TestDockerSetup_Makefile(t *testing.T) { makefilePath := "Makefile" content, err := os.ReadFile(makefilePath) require.NoError(t, err) - + makefileContent := string(content) - + devTargets := []string{ "dev-kafka:", "dev-seaweedfs:", @@ -262,9 +262,9 @@ func TestDockerSetup_Makefile(t *testing.T) { "shell-kafka:", "topics:", } - + for _, target := range devTargets { - assert.Contains(t, makefileContent, target, + assert.Contains(t, makefileContent, target, "Should have development target %s", target) } }) diff --git a/test/kafka/go.mod b/test/kafka/go.mod index 926a2c30c..c1c86d973 100644 --- a/test/kafka/go.mod +++ b/test/kafka/go.mod @@ -9,6 +9,7 @@ require ( github.com/linkedin/goavro/v2 v2.14.0 github.com/seaweedfs/seaweedfs v0.0.0-00010101000000-000000000000 github.com/segmentio/kafka-go v0.4.49 + github.com/stretchr/testify v1.11.1 ) replace github.com/seaweedfs/seaweedfs => ../../ @@ -196,7 +197,6 @@ require ( github.com/spf13/pflag v1.0.7 // indirect github.com/spf13/viper v1.20.1 // indirect github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect - github.com/stretchr/testify v1.11.1 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 // indirect github.com/t3rm1n4l/go-mega v0.0.0-20241213151442-a19cff0ec7b5 // indirect diff --git a/test/kafka/kafka_go_produce_only_test.go b/test/kafka/kafka_go_produce_only_test.go index bc1f711c6..0401d3153 100644 --- a/test/kafka/kafka_go_produce_only_test.go +++ b/test/kafka/kafka_go_produce_only_test.go @@ -6,8 +6,8 @@ import ( "testing" "time" - "github.com/segmentio/kafka-go" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" + "github.com/segmentio/kafka-go" ) func TestKafkaGo_ProduceOnly(t *testing.T) { @@ -18,13 +18,12 @@ func TestKafkaGo_ProduceOnly(t *testing.T) { t.Errorf("Failed to start gateway: %v", err) } }() - defer gatewayServer.Close() time.Sleep(100 * time.Millisecond) host, port := gatewayServer.GetListenerAddr() addr := fmt.Sprintf("%s:%d", host, port) - topic := "kgo-produce-only" + topic := "kgo-produce-only" gatewayServer.GetHandler().AddTopicForTesting(topic, 1) w := &kafka.Writer{ @@ -32,8 +31,8 @@ func TestKafkaGo_ProduceOnly(t *testing.T) { Topic: topic, Balancer: &kafka.LeastBytes{}, BatchTimeout: 50 * time.Millisecond, + RequiredAcks: kafka.RequireOne, } - defer w.Close() ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() diff --git a/test/kafka/metadata_comparison_test.go b/test/kafka/metadata_comparison_test.go new file mode 100644 index 000000000..12fafcc97 --- /dev/null +++ b/test/kafka/metadata_comparison_test.go @@ -0,0 +1,46 @@ +package kafka + +import ( + "fmt" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" +) + +func TestMetadataResponseComparison(t *testing.T) { + // Start gateway + gatewayServer := gateway.NewServer(gateway.Options{Listen: "127.0.0.1:0"}) + go func() { + if err := gatewayServer.Start(); err != nil { + t.Errorf("Failed to start gateway: %v", err) + } + }() + defer gatewayServer.Close() + + time.Sleep(100 * time.Millisecond) + + host, port := gatewayServer.GetListenerAddr() + addr := fmt.Sprintf("%s:%d", host, port) + + // Add the same topic for both tests + topic := "comparison-topic" + gatewayServer.GetHandler().AddTopicForTesting(topic, 1) + + t.Logf("=== COMPARISON TEST ===") + t.Logf("Gateway: %s", addr) + t.Logf("Topic: %s", topic) + + // The key insight: Both Sarama and kafka-go should get the SAME metadata response + // But Sarama works and kafka-go doesn't - this suggests kafka-go has stricter validation + + // Let's examine what our current Metadata v4 response looks like + t.Logf("Run Sarama test and kafka-go test separately to compare logs") + t.Logf("Look for differences in:") + t.Logf("1. Response byte counts") + t.Logf("2. Broker ID consistency") + t.Logf("3. Partition leader/ISR values") + t.Logf("4. Error codes") + + // This test is just for documentation - the real comparison happens in logs +} diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 6a13d2968..75e4ee168 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -237,7 +237,6 @@ func (h *Handler) HandleConn(conn net.Conn) error { case 20: // DeleteTopics response, err = h.handleDeleteTopics(correlationID, messageBuf[8:]) // skip header case 0: // Produce - fmt.Printf("DEBUG: *** PRODUCE REQUEST RECEIVED *** Correlation: %d\n", correlationID) response, err = h.handleProduce(correlationID, apiVersion, messageBuf[8:]) case 1: // Fetch fmt.Printf("DEBUG: *** FETCH HANDLER CALLED *** Correlation: %d, Version: %d\n", correlationID, apiVersion) @@ -700,7 +699,6 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) - fmt.Printf("DEBUG: 🔍 METADATA v3/v4 REQUEST - Requested: %v (empty=all)\n", requestedTopics) // Determine topics to return h.topicsMu.RLock() diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index df1174303..019a088ad 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -233,7 +233,7 @@ func (h *Handler) handleProduceV0V1(correlationID uint32, apiVersion uint16, req // - Individual record extraction func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, totalSize int32, err error) { parser := NewRecordBatchParser() - + // Parse the record batch with CRC validation batch, err := parser.ParseRecordBatchWithValidation(recordSetData, true) if err != nil { @@ -242,10 +242,10 @@ func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, total if err != nil { return 0, 0, fmt.Errorf("failed to parse record batch: %w", err) } - fmt.Printf("DEBUG: Record batch parsed without CRC validation (codec: %s)\n", + fmt.Printf("DEBUG: Record batch parsed without CRC validation (codec: %s)\n", batch.GetCompressionCodec()) } else { - fmt.Printf("DEBUG: Record batch parsed successfully with CRC validation (codec: %s)\n", + fmt.Printf("DEBUG: Record batch parsed successfully with CRC validation (codec: %s)\n", batch.GetCompressionCodec()) } @@ -558,14 +558,14 @@ func (h *Handler) storeDecodedMessage(topicName string, partitionID int32, decod // extractMessagesFromRecordSet extracts individual messages from a record set with compression support func (h *Handler) extractMessagesFromRecordSet(recordSetData []byte) ([][]byte, error) { parser := NewRecordBatchParser() - + // Parse the record batch batch, err := parser.ParseRecordBatch(recordSetData) if err != nil { return nil, fmt.Errorf("failed to parse record batch for message extraction: %w", err) } - fmt.Printf("DEBUG: Extracting messages from record batch (codec: %s, records: %d)\n", + fmt.Printf("DEBUG: Extracting messages from record batch (codec: %s, records: %d)\n", batch.GetCompressionCodec(), batch.RecordCount) // Decompress the records if compressed @@ -578,7 +578,7 @@ func (h *Handler) extractMessagesFromRecordSet(recordSetData []byte) ([][]byte, // In a full implementation, this would parse individual records from the decompressed data messages := [][]byte{decompressedData} - fmt.Printf("DEBUG: Extracted %d messages (decompressed size: %d bytes)\n", + fmt.Printf("DEBUG: Extracted %d messages (decompressed size: %d bytes)\n", len(messages), len(decompressedData)) return messages, nil