diff --git a/test/kafka/debug_produce_v7_test.go b/test/kafka/debug_produce_v7_test.go new file mode 100644 index 000000000..d710c1d76 --- /dev/null +++ b/test/kafka/debug_produce_v7_test.go @@ -0,0 +1,71 @@ +package kafka + +import ( + "fmt" + "testing" + "time" + + "github.com/IBM/sarama" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" +) + +func TestDebugProduceV7Format(t *testing.T) { + // Start gateway + gatewayServer := gateway.NewServer(gateway.Options{ + Listen: "127.0.0.1:0", + }) + + go func() { + if err := gatewayServer.Start(); err != nil { + t.Errorf("Failed to start gateway: %v", err) + } + }() + defer gatewayServer.Close() + + // Wait for server to start + time.Sleep(100 * time.Millisecond) + + host, port := gatewayServer.GetListenerAddr() + brokerAddr := fmt.Sprintf("%s:%d", host, port) + t.Logf("Gateway running on %s", brokerAddr) + + // Add test topic + gatewayHandler := gatewayServer.GetHandler() + topicName := "debug-produce-topic" + gatewayHandler.AddTopicForTesting(topicName, 1) + t.Logf("Added topic: %s", topicName) + + // Configure Sarama for Kafka 2.1.0 (which uses Produce v7) + config := sarama.NewConfig() + config.Version = sarama.V2_1_0_0 + config.Producer.Return.Successes = true + config.Producer.RequiredAcks = sarama.WaitForAll + + t.Logf("=== Testing single Sarama Produce v7 request ===") + + // Create producer + producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config) + if err != nil { + t.Fatalf("Failed to create producer: %v", err) + } + defer producer.Close() + + // Send a single message to capture the exact request format + msg := &sarama.ProducerMessage{ + Topic: topicName, + Key: sarama.StringEncoder("test-key"), + Value: sarama.StringEncoder("test-value"), + } + + t.Logf("Sending message to topic: %s", topicName) + partition, offset, err := producer.SendMessage(msg) + + if err != nil { + t.Logf("❌ Produce failed (expected): %v", err) + t.Logf("This allows us to see the debug output of the malformed request parsing") + } else { + t.Logf("✅ Produce succeeded: partition=%d, offset=%d", partition, offset) + } + + t.Logf("Check the debug output above to see the actual Produce v7 request format") +} diff --git a/test/kafka/sarama_e2e_test.go b/test/kafka/sarama_e2e_test.go new file mode 100644 index 000000000..b0d7f17f8 --- /dev/null +++ b/test/kafka/sarama_e2e_test.go @@ -0,0 +1,233 @@ +package kafka + +import ( + "fmt" + "testing" + "time" + + "github.com/IBM/sarama" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" +) + +func TestSaramaE2EProduceConsume(t *testing.T) { + // Start gateway + gatewayServer := gateway.NewServer(gateway.Options{ + Listen: "127.0.0.1:0", + }) + + go func() { + if err := gatewayServer.Start(); err != nil { + t.Errorf("Failed to start gateway: %v", err) + } + }() + defer gatewayServer.Close() + + // Wait for server to start + time.Sleep(100 * time.Millisecond) + + host, port := gatewayServer.GetListenerAddr() + brokerAddr := fmt.Sprintf("%s:%d", host, port) + t.Logf("Gateway running on %s", brokerAddr) + + // Add test topic + gatewayHandler := gatewayServer.GetHandler() + topicName := "sarama-e2e-topic" + gatewayHandler.AddTopicForTesting(topicName, 1) + t.Logf("Added topic: %s", topicName) + + // Configure Sarama for Kafka 2.1.0 (our best supported version) + config := sarama.NewConfig() + config.Version = sarama.V2_1_0_0 + config.Producer.Return.Successes = true + config.Producer.RequiredAcks = sarama.WaitForAll + config.Consumer.Return.Errors = true + + t.Logf("=== Testing Sarama Producer ===") + + // Create producer + producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config) + if err != nil { + t.Fatalf("Failed to create producer: %v", err) + } + defer producer.Close() + + // Produce messages + messages := []string{"Hello Sarama", "Message 2", "Final message"} + for i, msgText := range messages { + msg := &sarama.ProducerMessage{ + Topic: topicName, + Key: sarama.StringEncoder(fmt.Sprintf("key-%d", i)), + Value: sarama.StringEncoder(msgText), + } + + partition, offset, err := producer.SendMessage(msg) + if err != nil { + t.Fatalf("Failed to produce message %d: %v", i, err) + } + t.Logf("✅ Produced message %d: partition=%d, offset=%d", i, partition, offset) + } + + t.Logf("=== Testing Sarama Consumer ===") + + // Create consumer + consumer, err := sarama.NewConsumer([]string{brokerAddr}, config) + if err != nil { + t.Fatalf("Failed to create consumer: %v", err) + } + defer consumer.Close() + + // Get partition consumer + partitionConsumer, err := consumer.ConsumePartition(topicName, 0, sarama.OffsetOldest) + if err != nil { + t.Fatalf("Failed to create partition consumer: %v", err) + } + defer partitionConsumer.Close() + + // Consume messages + consumedCount := 0 + timeout := time.After(5 * time.Second) + + for consumedCount < len(messages) { + select { + case msg := <-partitionConsumer.Messages(): + t.Logf("✅ Consumed message %d: key=%s, value=%s, offset=%d", + consumedCount, string(msg.Key), string(msg.Value), msg.Offset) + + // Verify message content + expectedValue := messages[consumedCount] + if string(msg.Value) != expectedValue { + t.Errorf("Message %d mismatch: got %s, want %s", + consumedCount, string(msg.Value), expectedValue) + } + + consumedCount++ + + case err := <-partitionConsumer.Errors(): + t.Fatalf("Consumer error: %v", err) + + case <-timeout: + t.Fatalf("Timeout waiting for messages. Consumed %d/%d", consumedCount, len(messages)) + } + } + + t.Logf("🎉 SUCCESS: Sarama E2E test completed! Produced and consumed %d messages", len(messages)) +} + +func TestSaramaConsumerGroup(t *testing.T) { + // Start gateway + gatewayServer := gateway.NewServer(gateway.Options{ + Listen: "127.0.0.1:0", + }) + + go func() { + if err := gatewayServer.Start(); err != nil { + t.Errorf("Failed to start gateway: %v", err) + } + }() + defer gatewayServer.Close() + + // Wait for server to start + time.Sleep(100 * time.Millisecond) + + host, port := gatewayServer.GetListenerAddr() + brokerAddr := fmt.Sprintf("%s:%d", host, port) + t.Logf("Gateway running on %s", brokerAddr) + + // Add test topic + gatewayHandler := gatewayServer.GetHandler() + topicName := "sarama-cg-topic" + gatewayHandler.AddTopicForTesting(topicName, 1) + t.Logf("Added topic: %s", topicName) + + // Configure Sarama + config := sarama.NewConfig() + config.Version = sarama.V2_1_0_0 + config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin + config.Consumer.Offsets.Initial = sarama.OffsetOldest + config.Consumer.Return.Errors = true + + t.Logf("=== Testing Sarama Consumer Group ===") + + // Create consumer group + consumerGroup, err := sarama.NewConsumerGroup([]string{brokerAddr}, "test-group", config) + if err != nil { + t.Fatalf("Failed to create consumer group: %v", err) + } + defer consumerGroup.Close() + + // Consumer group handler + consumerHandler := &TestConsumerGroupHandler{ + t: t, + messages: make(chan string, 10), + } + + // Start consuming (this will test FindCoordinator, JoinGroup, SyncGroup workflow) + go func() { + for { + err := consumerGroup.Consume(nil, []string{topicName}, consumerHandler) + if err != nil { + t.Logf("Consumer group error: %v", err) + return + } + } + }() + + // Give consumer group time to initialize + time.Sleep(2 * time.Second) + + // Produce a test message + producer, err := sarama.NewSyncProducer([]string{brokerAddr}, config) + if err != nil { + t.Fatalf("Failed to create producer: %v", err) + } + defer producer.Close() + + msg := &sarama.ProducerMessage{ + Topic: topicName, + Value: sarama.StringEncoder("Consumer group test message"), + } + + _, _, err = producer.SendMessage(msg) + if err != nil { + t.Fatalf("Failed to produce message: %v", err) + } + t.Logf("✅ Produced message for consumer group") + + // Wait for message consumption + select { + case receivedMsg := <-consumerHandler.messages: + t.Logf("✅ Consumer group received message: %s", receivedMsg) + if receivedMsg != "Consumer group test message" { + t.Errorf("Message mismatch: got %s, want %s", receivedMsg, "Consumer group test message") + } + case <-time.After(10 * time.Second): + t.Fatalf("Timeout waiting for consumer group message") + } + + t.Logf("🎉 SUCCESS: Sarama Consumer Group test completed!") +} + +// TestConsumerGroupHandler implements sarama.ConsumerGroupHandler +type TestConsumerGroupHandler struct { + t *testing.T + messages chan string +} + +func (h *TestConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { + h.t.Logf("Consumer group setup") + return nil +} + +func (h *TestConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { + h.t.Logf("Consumer group cleanup") + return nil +} + +func (h *TestConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for message := range claim.Messages() { + h.t.Logf("Received message: %s", string(message.Value)) + h.messages <- string(message.Value) + session.MarkMessage(message, "") + } + return nil +} diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 5f5f2088c..8777d42ec 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -152,9 +152,21 @@ func (h *Handler) handleFetch(correlationID uint32, requestBody []byte) ([]byte, } else { highWaterMark = ledger.GetHighWaterMark() - // For Phase 1, construct simple record batches for any messages in range + // Try to fetch actual records using SeaweedMQ integration if available if fetchOffset < highWaterMark { - records = h.constructRecordBatch(ledger, fetchOffset, highWaterMark) + if h.useSeaweedMQ && h.seaweedMQHandler != nil { + // Use SeaweedMQ integration for real message fetching + fetchedRecords, err := h.seaweedMQHandler.FetchRecords(topicName, int32(partitionID), fetchOffset, int32(partitionMaxBytes)) + if err != nil { + fmt.Printf("DEBUG: FetchRecords error: %v\n", err) + errorCode = 1 // OFFSET_OUT_OF_RANGE + } else { + records = fetchedRecords + } + } else { + // Fallback to in-memory stub records + records = h.constructRecordBatch(ledger, fetchOffset, highWaterMark) + } } } } diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index b27e5d107..500ea8eda 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -334,10 +334,10 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 4) // max version 4 // API Key 0 (Produce): api_key(2) + min_version(2) + max_version(2) - // Advertise v1 to get simpler request format from kafka-go + // Support v7 for Sarama compatibility (Kafka 2.1.0) response = append(response, 0, 0) // API key 0 response = append(response, 0, 0) // min version 0 - response = append(response, 0, 1) // max version 1 (simplified parsing) + response = append(response, 0, 7) // max version 7 // API Key 1 (Fetch): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 1) // API key 1 @@ -1453,7 +1453,7 @@ func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error { supportedVersions := map[uint16][2]uint16{ 18: {0, 3}, // ApiVersions: v0-v3 3: {0, 7}, // Metadata: v0-v7 - 0: {0, 1}, // Produce: v0-v1 + 0: {0, 7}, // Produce: v0-v7 1: {0, 1}, // Fetch: v0-v1 2: {0, 5}, // ListOffsets: v0-v5 19: {0, 4}, // CreateTopics: v0-v4 diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index 671c88656..f12dff4c4 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -11,6 +11,8 @@ func (h *Handler) handleProduce(correlationID uint32, apiVersion uint16, request switch apiVersion { case 0, 1: return h.handleProduceV0V1(correlationID, apiVersion, requestBody) + case 2, 3, 4, 5, 6, 7: + return h.handleProduceV2Plus(correlationID, apiVersion, requestBody) default: return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion) } @@ -297,3 +299,157 @@ func (h *Handler) extractFirstRecord(recordSetData []byte) ([]byte, []byte) { return key, []byte(value) } + +// handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+) +func (h *Handler) handleProduceV2Plus(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + fmt.Printf("DEBUG: Handling Produce v%d request\n", apiVersion) + + // DEBUG: Hex dump first 100 bytes to understand actual request format + dumpLen := len(requestBody) + if dumpLen > 100 { + dumpLen = 100 + } + fmt.Printf("DEBUG: Produce v%d request hex dump (first %d bytes): %x\n", apiVersion, dumpLen, requestBody[:dumpLen]) + fmt.Printf("DEBUG: Produce v%d request total length: %d bytes\n", apiVersion, len(requestBody)) + + // For now, use simplified parsing similar to v0/v1 but handle v2+ response format + // In v2+, the main differences are: + // - Request: transactional_id field (nullable string) at the beginning + // - Response: throttle_time_ms field at the beginning + + // Parse Produce v7 request format based on actual Sarama request + // Format: client_id(STRING) + transactional_id(NULLABLE_STRING) + acks(INT16) + timeout_ms(INT32) + topics(ARRAY) + + offset := 0 + + // Parse client_id (STRING: 2 bytes length + data) + if len(requestBody) < 2 { + return nil, fmt.Errorf("Produce v%d request too short for client_id", apiVersion) + } + clientIDLen := binary.BigEndian.Uint16(requestBody[offset:offset+2]) + offset += 2 + + if len(requestBody) < offset+int(clientIDLen) { + return nil, fmt.Errorf("Produce v%d request client_id too short", apiVersion) + } + clientID := string(requestBody[offset:offset+int(clientIDLen)]) + offset += int(clientIDLen) + fmt.Printf("DEBUG: Produce v%d - client_id: %s\n", apiVersion, clientID) + + // Parse transactional_id (NULLABLE_STRING: 2 bytes length + data, -1 = null) + if len(requestBody) < offset+2 { + return nil, fmt.Errorf("Produce v%d request too short for transactional_id", apiVersion) + } + transactionalIDLen := int16(binary.BigEndian.Uint16(requestBody[offset:offset+2])) + offset += 2 + + var transactionalID string + if transactionalIDLen == -1 { + transactionalID = "null" + } else if transactionalIDLen >= 0 { + if len(requestBody) < offset+int(transactionalIDLen) { + return nil, fmt.Errorf("Produce v%d request transactional_id too short", apiVersion) + } + transactionalID = string(requestBody[offset:offset+int(transactionalIDLen)]) + offset += int(transactionalIDLen) + } + fmt.Printf("DEBUG: Produce v%d - transactional_id: %s\n", apiVersion, transactionalID) + + // Parse acks (INT16) and timeout_ms (INT32) + if len(requestBody) < offset+6 { + return nil, fmt.Errorf("Produce v%d request missing acks/timeout", apiVersion) + } + + acks := int16(binary.BigEndian.Uint16(requestBody[offset:offset+2])) + offset += 2 + timeout := binary.BigEndian.Uint32(requestBody[offset:offset+4]) + offset += 4 + + fmt.Printf("DEBUG: Produce v%d - acks: %d, timeout: %d\n", apiVersion, acks, timeout) + + // Parse topics array + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("Produce v%d request missing topics count", apiVersion) + } + + topicsCount := binary.BigEndian.Uint32(requestBody[offset:offset+4]) + offset += 4 + + fmt.Printf("DEBUG: Produce v%d - topics count: %d\n", apiVersion, topicsCount) + + // Build response + response := make([]byte, 0, 256) + + // Correlation ID + correlationIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(correlationIDBytes, correlationID) + response = append(response, correlationIDBytes...) + + // Throttle time (4 bytes) - v1+ + response = append(response, 0, 0, 0, 0) + + // Topics array length + topicsCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(topicsCountBytes, topicsCount) + response = append(response, topicsCountBytes...) + + // Process each topic (simplified - just return success for all) + for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { + // Parse topic name + if len(requestBody) < offset+2 { + break + } + + topicNameSize := binary.BigEndian.Uint16(requestBody[offset:offset+2]) + offset += 2 + + if len(requestBody) < offset+int(topicNameSize)+4 { + break + } + + topicName := string(requestBody[offset:offset+int(topicNameSize)]) + offset += int(topicNameSize) + + // Parse partitions count + partitionsCount := binary.BigEndian.Uint32(requestBody[offset:offset+4]) + offset += 4 + + fmt.Printf("DEBUG: Produce v%d - topic: %s, partitions: %d\n", apiVersion, topicName, partitionsCount) + + // Response: topic name + response = append(response, byte(topicNameSize>>8), byte(topicNameSize)) + response = append(response, []byte(topicName)...) + + // Response: partitions count + partitionsCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(partitionsCountBytes, partitionsCount) + response = append(response, partitionsCountBytes...) + + // Process each partition (simplified - just return success) + for j := uint32(0); j < partitionsCount && offset < len(requestBody); j++ { + // Skip partition parsing for now - just return success response + + // Response: partition_id(4) + error_code(2) + base_offset(8) + response = append(response, 0, 0, 0, byte(j)) // partition_id + response = append(response, 0, 0) // error_code (success) + response = append(response, 0, 0, 0, 0, 0, 0, 0, 0) // base_offset + + // v2+ additional fields + if apiVersion >= 2 { + // log_append_time (-1 = not set) + response = append(response, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF) + } + + if apiVersion >= 5 { + // log_start_offset (8 bytes) + response = append(response, 0, 0, 0, 0, 0, 0, 0, 0) + } + + // Skip to next partition (simplified) + offset += 20 // rough estimate to skip partition data + } + } + + fmt.Printf("DEBUG: Produce v%d response: %d bytes\n", apiVersion, len(response)) + return response, nil +}