diff --git a/test/kafka/client_integration_test.go b/test/kafka/client_integration_test.go
index 9a23cb324..d9d98e688 100644
--- a/test/kafka/client_integration_test.go
+++ b/test/kafka/client_integration_test.go
@@ -8,15 +8,15 @@ import (
     "testing"
     "time"
 
-    "github.com/segmentio/kafka-go"
     "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
+    "github.com/segmentio/kafka-go"
 )
 
 // TestKafkaGoClient_BasicProduceConsume tests our gateway with real kafka-go client
 func TestKafkaGoClient_BasicProduceConsume(t *testing.T) {
     // Start the gateway server
     srv := gateway.NewServer(gateway.Options{
-        Listen:       ":0", // Use random port
+        Listen:       ":0",   // Use random port
         UseSeaweedMQ: false, // Use in-memory mode for testing
     })
 
@@ -188,7 +188,7 @@ func TestKafkaGoClient_OffsetManagement(t *testing.T) {
         t.Errorf("Expected 2 remaining messages, got %d", len(remainingMessages))
     }
 
-    t.Logf("Offset management test passed: consumed %d + %d messages", 
+    t.Logf("Offset management test passed: consumed %d + %d messages",
         len(partialMessages), len(remainingMessages))
 }
 
@@ -200,7 +200,7 @@ func createTopicWithKafkaGo(brokerAddr, topicName string) error {
         Timeout:   5 * time.Second,
         DualStack: true,
     }
-    
+
     conn, err := dialer.Dial("tcp", brokerAddr)
     if err != nil {
         return fmt.Errorf("dial broker: %w", err)
@@ -214,8 +214,8 @@ func createTopicWithKafkaGo(brokerAddr, topicName string) error {
 
     topicConfigs := []kafka.TopicConfig{
         {
-            Topic:         topicName,
-            NumPartitions: 1,
+            Topic:             topicName,
+            NumPartitions:     1,
             ReplicationFactor: 1,
         },
     }
@@ -236,7 +236,7 @@ func produceMessages(brokerAddr, topicName string, messages []kafka.Message) err
         Topic:    topicName,
         Balancer: &kafka.LeastBytes{},
         // Enable detailed logging for debugging protocol issues
-        Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) { 
+        Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
             if strings.Contains(msg, "error") || strings.Contains(msg, "failed") {
                 fmt.Printf("PRODUCER ERROR: "+msg+"\n", args...)
             }
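The hunks above only touch formatting inside createTopicWithKafkaGo, so the full shape of that helper is not visible here. For orientation, topic creation through kafka-go's low-level Conn API looks roughly like this (a simplified sketch; error wrapping and the controller-lookup step a real cluster needs are elided, and names not shown in the diff are assumptions):

package kafka

import (
    "time"

    "github.com/segmentio/kafka-go"
)

// createTopicSketch mirrors the Conn-based creation pattern used by
// createTopicWithKafkaGo above (hypothetical, simplified variant).
func createTopicSketch(brokerAddr, topicName string) error {
    dialer := &kafka.Dialer{Timeout: 5 * time.Second, DualStack: true}

    conn, err := dialer.Dial("tcp", brokerAddr)
    if err != nil {
        return err
    }
    defer conn.Close()

    // Against a real cluster this call should target the controller broker;
    // with the single-broker gateway the distinction is moot.
    return conn.CreateTopics(kafka.TopicConfig{
        Topic:             topicName,
        NumPartitions:     1,
        ReplicationFactor: 1,
    })
}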
diff --git a/test/kafka/debug_connection_test.go b/test/kafka/debug_connection_test.go
index 9e99b1632..8a4534c8e 100644
--- a/test/kafka/debug_connection_test.go
+++ b/test/kafka/debug_connection_test.go
@@ -67,17 +67,17 @@ func TestGateway_ApiVersionsRequest(t *testing.T) {
     clientID := "debug-client"
 
     request := make([]byte, 0, 64)
-    
+
     // Build message body first (without size)
     msgBody := make([]byte, 0, 32)
-    msgBody = append(msgBody, 0, 18) // API key 18 (ApiVersions)
-    msgBody = append(msgBody, 0, 0)  // API version 0
-    
+    msgBody = append(msgBody, 0, 18) // API key 18 (ApiVersions)
+    msgBody = append(msgBody, 0, 0)  // API version 0
+
     // Correlation ID
     correlationBytes := make([]byte, 4)
     binary.BigEndian.PutUint32(correlationBytes, correlationID)
     msgBody = append(msgBody, correlationBytes...)
-    
+
     // Client ID string
     clientIDBytes := []byte(clientID)
     msgBody = append(msgBody, byte(len(clientIDBytes)>>8), byte(len(clientIDBytes)))
     msgBody = append(msgBody, clientIDBytes...)
@@ -170,37 +170,37 @@ func TestGateway_CreateTopicsRequest(t *testing.T) {
 
     // Build message body
     msgBody := make([]byte, 0, 128)
-    msgBody = append(msgBody, 0, 19) // API key 19 (CreateTopics)
-    msgBody = append(msgBody, 0, 0)  // API version 0
-    
+    msgBody = append(msgBody, 0, 19) // API key 19 (CreateTopics)
+    msgBody = append(msgBody, 0, 0)  // API version 0
+
     // Correlation ID
     correlationBytes := make([]byte, 4)
     binary.BigEndian.PutUint32(correlationBytes, correlationID)
     msgBody = append(msgBody, correlationBytes...)
-    
+
     // Client ID string
     clientIDBytes := []byte(clientID)
     msgBody = append(msgBody, byte(len(clientIDBytes)>>8), byte(len(clientIDBytes)))
     msgBody = append(msgBody, clientIDBytes...)
-    
+
     // Topics array - count (4 bytes)
     msgBody = append(msgBody, 0, 0, 0, 1) // 1 topic
-    
+
     // Topic name
     topicNameBytes := []byte(topicName)
     msgBody = append(msgBody, byte(len(topicNameBytes)>>8), byte(len(topicNameBytes)))
     msgBody = append(msgBody, topicNameBytes...)
-    
+
     // Num partitions (4 bytes)
     msgBody = append(msgBody, 0, 0, 0, 1) // 1 partition
-    
-    // Replication factor (2 bytes) 
+
+    // Replication factor (2 bytes)
     msgBody = append(msgBody, 0, 1) // replication factor 1
-    
+
     // Configs count (4 bytes)
     msgBody = append(msgBody, 0, 0, 0, 0) // 0 configs
-    
-    // Timeout (4 bytes) 
+
+    // Timeout (4 bytes)
     msgBody = append(msgBody, 0, 0, 0x75, 0x30) // 30 seconds
 
     // Message size + message body
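Both debug tests build raw requests byte by byte. The framing they implement is the standard Kafka request envelope: a 4-byte big-endian size prefix, then api_key(2) + api_version(2) + correlation_id(4) + client_id (2-byte length plus bytes), then the API-specific body. A hypothetical helper could fold the repeated boilerplate (a sketch in the same style as the tests, not part of the test files):

package kafka

import "encoding/binary"

// buildRequest frames a Kafka protocol request: size prefix followed by the
// common request header and the API-specific body. Hypothetical helper.
func buildRequest(apiKey, apiVersion uint16, correlationID uint32, clientID string, body []byte) []byte {
    msg := make([]byte, 0, 12+len(clientID)+len(body))
    msg = append(msg, byte(apiKey>>8), byte(apiKey))         // api_key(2)
    msg = append(msg, byte(apiVersion>>8), byte(apiVersion)) // api_version(2)

    corr := make([]byte, 4)
    binary.BigEndian.PutUint32(corr, correlationID) // correlation_id(4)
    msg = append(msg, corr...)

    msg = append(msg, byte(len(clientID)>>8), byte(len(clientID))) // client_id length(2)
    msg = append(msg, clientID...)                                 // client_id bytes
    msg = append(msg, body...)                                     // API-specific body

    size := make([]byte, 4)
    binary.BigEndian.PutUint32(size, uint32(len(msg))) // size prefix(4)
    return append(size, msg...)
}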
diff --git a/test/kafka/produce_consume_test.go b/test/kafka/produce_consume_test.go
new file mode 100644
index 000000000..cbb14b966
--- /dev/null
+++ b/test/kafka/produce_consume_test.go
@@ -0,0 +1,187 @@
+package kafka
+
+import (
+    "context"
+    "fmt"
+    "strings"
+    "testing"
+    "time"
+
+    "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
+    "github.com/segmentio/kafka-go"
+)
+
+// TestKafkaGoClient_DirectProduceConsume bypasses CreateTopics and tests produce/consume directly
+func TestKafkaGoClient_DirectProduceConsume(t *testing.T) {
+    // Start the gateway server
+    srv := gateway.NewServer(gateway.Options{
+        Listen:       ":0",
+        UseSeaweedMQ: false,
+    })
+
+    if err := srv.Start(); err != nil {
+        t.Fatalf("Failed to start gateway: %v", err)
+    }
+    defer srv.Close()
+
+    brokerAddr := srv.Addr()
+    t.Logf("Gateway running on %s", brokerAddr)
+
+    topicName := "direct-test-topic"
+
+    // Pre-create the topic by making a direct call to our gateway's topic registry
+    // This simulates topic already existing (like pre-created topics)
+    if err := createTopicDirectly(srv, topicName); err != nil {
+        t.Fatalf("Failed to create topic: %v", err)
+    }
+
+    // Test basic produce without kafka-go's CreateTopics API
+    messages := []kafka.Message{
+        {
+            Key:   []byte("test-key-1"),
+            Value: []byte("Hello from direct produce test!"),
+        },
+        {
+            Key:   []byte("test-key-2"),
+            Value: []byte("Second test message"),
+        },
+    }
+
+    t.Logf("Testing direct produce to topic %s", topicName)
+
+    // Produce messages
+    if err := produceMessagesDirect(brokerAddr, topicName, messages); err != nil {
+        t.Fatalf("Failed to produce messages: %v", err)
+    }
+
+    t.Logf("Successfully produced %d messages", len(messages))
+
+    // Consume messages
+    t.Logf("Testing direct consume from topic %s", topicName)
+
+    consumedMessages, err := consumeMessagesDirect(brokerAddr, topicName, len(messages))
+    if err != nil {
+        t.Fatalf("Failed to consume messages: %v", err)
+    }
+
+    // Validate consumed messages
+    if len(consumedMessages) != len(messages) {
+        t.Errorf("Expected %d messages, got %d", len(messages), len(consumedMessages))
+    }
+
+    for i, msg := range consumedMessages {
+        if i < len(messages) {
+            expectedValue := string(messages[i].Value)
+            actualValue := string(msg.Value)
+            t.Logf("Message %d: key=%s, value=%s", i, string(msg.Key), actualValue)
+
+            if actualValue != expectedValue {
+                t.Errorf("Message %d: expected value %q, got %q", i, expectedValue, actualValue)
+            }
+        }
+    }
+
+    t.Logf("✅ Direct produce/consume test PASSED with %d messages", len(consumedMessages))
+}
+
+// createTopicDirectly creates a topic by directly adding it to the handler's registry
+func createTopicDirectly(srv interface{}, topicName string) error {
+    gatewayServer, ok := srv.(*gateway.Server)
+    if !ok {
+        return fmt.Errorf("invalid server type")
+    }
+
+    // Get the handler and directly add the topic
+    handler := gatewayServer.GetHandler()
+    if handler == nil {
+        return fmt.Errorf("handler is nil")
+    }
+
+    // Add the topic with 1 partition
+    handler.AddTopicForTesting(topicName, 1)
+
+    fmt.Printf("DEBUG: Topic %s created directly in handler registry\n", topicName)
+    return nil
+}
+
+func produceMessagesDirect(brokerAddr, topicName string, messages []kafka.Message) error {
+    // Use kafka-go Writer which should use Produce API directly
+    writer := &kafka.Writer{
+        Addr:     kafka.TCP(brokerAddr),
+        Topic:    topicName,
+        Balancer: &kafka.LeastBytes{},
+
+        // Reduce timeouts for faster debugging
+        WriteTimeout: 10 * time.Second,
+        ReadTimeout:  10 * time.Second,
+
+        // Enable detailed logging
+        Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+            if strings.Contains(msg, "produce") || strings.Contains(msg, "Produce") {
+                fmt.Printf("PRODUCER: "+msg+"\n", args...)
+            }
+        }),
+        ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+            fmt.Printf("PRODUCER ERROR: "+msg+"\n", args...)
+        }),
+    }
+    defer writer.Close()
+
+    ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+    defer cancel()
+
+    fmt.Printf("DEBUG: Writing %d messages to topic %s\n", len(messages), topicName)
+
+    err := writer.WriteMessages(ctx, messages...)
+    if err != nil {
+        fmt.Printf("DEBUG: WriteMessages failed: %v\n", err)
+        return err
+    }
+
+    fmt.Printf("DEBUG: WriteMessages completed successfully\n")
+    return nil
+}
+
+func consumeMessagesDirect(brokerAddr, topicName string, expectedCount int) ([]kafka.Message, error) {
+    // Use kafka-go Reader which should use Fetch API directly
+    reader := kafka.NewReader(kafka.ReaderConfig{
+        Brokers: []string{brokerAddr},
+        Topic:   topicName,
+
+        // Start from the beginning
+        StartOffset: kafka.FirstOffset,
+
+        // Enable detailed logging
+        Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+            if strings.Contains(msg, "fetch") || strings.Contains(msg, "Fetch") {
+                fmt.Printf("CONSUMER: "+msg+"\n", args...)
+            }
+        }),
+        ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) {
+            fmt.Printf("CONSUMER ERROR: "+msg+"\n", args...)
+        }),
+    })
+    defer reader.Close()
+
+    ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
+    defer cancel()
+
+    fmt.Printf("DEBUG: Reading up to %d messages from topic %s\n", expectedCount, topicName)
+
+    var messages []kafka.Message
+    for i := 0; i < expectedCount; i++ {
+        fmt.Printf("DEBUG: Reading message %d/%d\n", i+1, expectedCount)
+
+        msg, err := reader.ReadMessage(ctx)
+        if err != nil {
+            fmt.Printf("DEBUG: ReadMessage %d failed: %v\n", i+1, err)
+            return messages, fmt.Errorf("read message %d: %w", i+1, err)
+        }
+
+        fmt.Printf("DEBUG: Successfully read message %d: %d bytes\n", i+1, len(msg.Value))
+        messages = append(messages, msg)
+    }
+
+    fmt.Printf("DEBUG: Successfully read all %d messages\n", len(messages))
+    return messages, nil
+}
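The test pre-creates the topic through the gateway's internals. Given that handleProduce (see the produce.go hunk below) now auto-creates unknown topics, a variant of produceMessagesDirect could skip that step; kafka-go's Writer also exposes an AllowAutoTopicCreation flag for brokers configured with auto.create.topics.enable=true. A sketch, untested against the gateway:

package kafka

import (
    "context"
    "time"

    "github.com/segmentio/kafka-go"
)

// produceWithAutoCreate relies on broker-side topic auto-creation instead of
// pre-seeding the handler's registry (hypothetical variant).
func produceWithAutoCreate(brokerAddr, topicName string, msgs ...kafka.Message) error {
    w := &kafka.Writer{
        Addr:                   kafka.TCP(brokerAddr),
        Topic:                  topicName,
        Balancer:               &kafka.LeastBytes{},
        AllowAutoTopicCreation: true, // ride out UNKNOWN_TOPIC_OR_PARTITION while the topic appears
    }
    defer w.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
    defer cancel()
    return w.WriteMessages(ctx, msgs...)
}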
+ }), + }) + defer reader.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + fmt.Printf("DEBUG: Reading up to %d messages from topic %s\n", expectedCount, topicName) + + var messages []kafka.Message + for i := 0; i < expectedCount; i++ { + fmt.Printf("DEBUG: Reading message %d/%d\n", i+1, expectedCount) + + msg, err := reader.ReadMessage(ctx) + if err != nil { + fmt.Printf("DEBUG: ReadMessage %d failed: %v\n", i+1, err) + return messages, fmt.Errorf("read message %d: %w", i+1, err) + } + + fmt.Printf("DEBUG: Successfully read message %d: %d bytes\n", i+1, len(msg.Value)) + messages = append(messages, msg) + } + + fmt.Printf("DEBUG: Successfully read all %d messages\n", len(messages)) + return messages, nil +} diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go index 91a31a005..bd8f03922 100644 --- a/weed/mq/kafka/gateway/server.go +++ b/weed/mq/kafka/gateway/server.go @@ -112,3 +112,8 @@ func (s *Server) Addr() string { } return s.ln.Addr().String() } + +// GetHandler returns the protocol handler (for testing) +func (s *Server) GetHandler() *protocol.Handler { + return s.handler +} diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 9c0d74cdc..ddd955889 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -162,11 +162,11 @@ func (h *Handler) HandleConn(conn net.Conn) error { apiKey := binary.BigEndian.Uint16(messageBuf[0:2]) apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) - + // DEBUG: Log all incoming requests for debugging client compatibility - fmt.Printf("DEBUG: Received request - API Key: %d, Version: %d, Correlation: %d, Size: %d\n", + fmt.Printf("DEBUG: Received request - API Key: %d, Version: %d, Correlation: %d, Size: %d\n", apiKey, apiVersion, correlationID, size) - + // TODO: IMPORTANT - API version validation is missing // Different API versions have different request/response formats // Need to validate apiVersion against supported versions for each API @@ -320,8 +320,9 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { } func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]byte, error) { - // For now, ignore the request body content (topics filter, etc.) 
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index 9c0d74cdc..ddd955889 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -162,11 +162,11 @@ func (h *Handler) HandleConn(conn net.Conn) error {
         apiKey := binary.BigEndian.Uint16(messageBuf[0:2])
         apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
         correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
-        
+
         // DEBUG: Log all incoming requests for debugging client compatibility
-        fmt.Printf("DEBUG: Received request - API Key: %d, Version: %d, Correlation: %d, Size: %d\n", 
+        fmt.Printf("DEBUG: Received request - API Key: %d, Version: %d, Correlation: %d, Size: %d\n",
             apiKey, apiVersion, correlationID, size)
-        
+
         // TODO: IMPORTANT - API version validation is missing
         // Different API versions have different request/response formats
         // Need to validate apiVersion against supported versions for each API
@@ -320,8 +320,9 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
 }
 
 func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]byte, error) {
-    // For now, ignore the request body content (topics filter, etc.)
-    // Build minimal Metadata response
+    // Parse the Metadata request to extract the requested topics
+    // Together with auto-creation on Produce, this emulates auto.create.topics.enable=true
+    // Request format: client_id + topics_array (if topics_count > 0)
     // Response format: correlation_id(4) + throttle_time(4) + brokers + cluster_id + controller_id + topics
     response := make([]byte, 0, 256)
@@ -357,12 +358,122 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by
     // Controller ID (4 bytes) - -1 (no controller)
     response = append(response, 0xFF, 0xFF, 0xFF, 0xFF)
 
-    // Topics array length (4 bytes) - 0 topics for now
-    response = append(response, 0, 0, 0, 0)
-    
+    // Parse topics from request (for metadata discovery)
+    requestedTopics := h.parseMetadataTopics(requestBody)
+    fmt.Printf("DEBUG: Metadata request for topics: %v (empty=all topics)\n", requestedTopics)
+
+    // Build topics array response - return existing topics only
+    h.topicsMu.RLock()
+    var topicsToReturn []string
+    if len(requestedTopics) == 0 {
+        // If no specific topics requested, return all existing topics
+        for topicName := range h.topics {
+            topicsToReturn = append(topicsToReturn, topicName)
+        }
+        fmt.Printf("DEBUG: Returning all existing topics: %v\n", topicsToReturn)
+    } else {
+        // Return only requested topics that exist
+        for _, topicName := range requestedTopics {
+            if _, exists := h.topics[topicName]; exists {
+                topicsToReturn = append(topicsToReturn, topicName)
+            }
+        }
+        fmt.Printf("DEBUG: Returning requested existing topics: %v\n", topicsToReturn)
+    }
+    h.topicsMu.RUnlock()
+
+    // Topics array length (4 bytes)
+    topicsCountBytes := make([]byte, 4)
+    binary.BigEndian.PutUint32(topicsCountBytes, uint32(len(topicsToReturn)))
+    response = append(response, topicsCountBytes...)
+
+    // Build each topic response
+    for _, topicName := range topicsToReturn {
+        // Topic error code (2 bytes) - 0 = no error
+        response = append(response, 0, 0)
+
+        // Topic name
+        topicNameBytes := []byte(topicName)
+        topicNameLen := make([]byte, 2)
+        binary.BigEndian.PutUint16(topicNameLen, uint16(len(topicNameBytes)))
+        response = append(response, topicNameLen...)
+        response = append(response, topicNameBytes...)
+
+        // Partitions array length (4 bytes) - 1 partition
+        response = append(response, 0, 0, 0, 1)
+
+        // Partition 0: error_code(2) + partition_id(4) + leader_id(4) + replicas + isr
+        response = append(response, 0, 0)                   // no error
+        response = append(response, 0, 0, 0, 0)             // partition_id = 0
+        response = append(response, 0, 0, 0, 0)             // leader_id = 0 (this broker)
+        response = append(response, 0, 0, 0, 1, 0, 0, 0, 0) // replicas = [0]
+        response = append(response, 0, 0, 0, 1, 0, 0, 0, 0) // isr = [0]
+    }
+
+    fmt.Printf("DEBUG: Metadata response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)
     return response, nil
 }
 
+func (h *Handler) parseMetadataTopics(requestBody []byte) []string {
+    // Parse Metadata request to extract requested topics
+    // Format: client_id + topics_array
+
+    fmt.Printf("DEBUG: parseMetadataTopics - request body length: %d\n", len(requestBody))
+    if len(requestBody) > 0 {
+        dumpLen := len(requestBody)
+        if dumpLen > 30 {
+            dumpLen = 30
+        }
+        fmt.Printf("DEBUG: parseMetadataTopics - hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen])
+    }
+
+    if len(requestBody) < 6 { // at minimum: client_id_size(2) + topics_count(4)
+        fmt.Printf("DEBUG: parseMetadataTopics - request too short (%d bytes), returning empty\n", len(requestBody))
+        return []string{} // Return empty - means "all topics"
+    }
+
+    // Skip client_id
+    clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
+    offset := 2 + int(clientIDSize)
+    fmt.Printf("DEBUG: parseMetadataTopics - client ID size: %d, offset after client: %d\n", clientIDSize, offset)
+
+    if len(requestBody) < offset+4 {
+        fmt.Printf("DEBUG: parseMetadataTopics - not enough bytes for topics count, returning empty\n")
+        return []string{}
+    }
+
+    // Parse topics count
+    topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
+    offset += 4
+    fmt.Printf("DEBUG: parseMetadataTopics - topics count: %d, offset after count: %d\n", topicsCount, offset)
+
+    if topicsCount == 0 {
+        fmt.Printf("DEBUG: parseMetadataTopics - topics count is 0, returning empty (means 'all topics')\n")
+        return []string{} // Return empty - means "all topics"
+    }
+
+    // Parse each requested topic name
+    topics := make([]string, 0, topicsCount)
+    for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
+        if len(requestBody) < offset+2 {
+            break
+        }
+
+        topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
+        offset += 2
+
+        if len(requestBody) < offset+int(topicNameSize) {
+            break
+        }
+
+        topicName := string(requestBody[offset : offset+int(topicNameSize)])
+        topics = append(topics, topicName)
+        offset += int(topicNameSize)
+    }
+
+    return topics
+}
+
 func (h *Handler) handleListOffsets(correlationID uint32, requestBody []byte) ([]byte, error) {
     // Parse minimal request to understand what's being asked
     // For this stub, we'll just return stub responses for any requested topic/partition
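parseMetadataTopics is easy to pin down with a hand-built request body. A sketch of such a test, assuming the protocol package exposes a handler constructor (NewHandler here is an assumption):

package protocol

import "testing"

// TestParseMetadataTopics_Sketch feeds a minimal Metadata v0-style body:
// client_id "go" followed by a one-element topics array containing "t1".
func TestParseMetadataTopics_Sketch(t *testing.T) {
    h := NewHandler() // assumed constructor

    body := []byte{0, 2, 'g', 'o'}      // client_id: size=2, "go"
    body = append(body, 0, 0, 0, 1)     // topics count = 1
    body = append(body, 0, 2, 't', '1') // topic: size=2, "t1"

    got := h.parseMetadataTopics(body)
    if len(got) != 1 || got[0] != "t1" {
        t.Fatalf("expected [t1], got %v", got)
    }

    // An empty topics array means "all topics" and should yield an empty slice.
    if got := h.parseMetadataTopics([]byte{0, 2, 'g', 'o', 0, 0, 0, 0}); len(got) != 0 {
        t.Fatalf("expected no topics, got %v", got)
    }
}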
@@ -499,7 +610,7 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) (
     // kafka-go uses v2 which has a different request structure!
     // The bogus topics count (1274981) shows we're parsing from the wrong offset
     // Need to implement proper v2 request parsing or negotiate API version
-    
+
     // Parse minimal CreateTopics request
     // Request format: client_id + timeout(4) + topics_array
 
@@ -510,14 +621,14 @@ func (h *Handler) handleCreateTopics(correlationID uint32, requestBody []byte) (
     // Skip client_id
     clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
     offset := 2 + int(clientIDSize)
-    
+
     fmt.Printf("DEBUG: Client ID size: %d, client ID: %s\n", clientIDSize, string(requestBody[2:2+clientIDSize]))
 
     // CreateTopics v2 has different format than v0
     // v2 format: client_id + topics_array + timeout(4) + validate_only(1)
     // (no separate timeout field before topics like in v0)
-    
-    if len(requestBody) < offset+4 { 
+
+    if len(requestBody) < offset+4 {
         return nil, fmt.Errorf("CreateTopics request missing topics array")
     }
 
@@ -777,3 +888,24 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) (
 
     return response, nil
 }
+
+// AddTopicForTesting adds a topic directly to the handler (for testing only)
+func (h *Handler) AddTopicForTesting(topicName string, partitions int32) {
+    h.topicsMu.Lock()
+    defer h.topicsMu.Unlock()
+
+    if _, exists := h.topics[topicName]; !exists {
+        h.topics[topicName] = &TopicInfo{
+            Name:       topicName,
+            Partitions: partitions,
+            CreatedAt:  time.Now().UnixNano(),
+        }
+
+        // Initialize ledgers for all partitions
+        for partitionID := int32(0); partitionID < partitions; partitionID++ {
+            h.GetOrCreateLedger(topicName, partitionID)
+        }
+
+        fmt.Printf("DEBUG: Added topic for testing: %s with %d partitions\n", topicName, partitions)
+    }
+}
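The comments in handleCreateTopics and the TODO in HandleConn point at the same gap: requests are parsed as v0 regardless of the version the client negotiated. One conventional fix is a per-API supported-version table consulted at dispatch, answering out-of-range requests with Kafka error code 35 (UNSUPPORTED_VERSION). A sketch only; the version ranges below are placeholders, not what the gateway actually advertises:

package protocol

import "fmt"

// supportedVersions maps API key -> [min, max] supported request version.
// Placeholder ranges; a real table must match what handleApiVersions advertises.
var supportedVersions = map[uint16][2]uint16{
    0:  {0, 0}, // Produce
    1:  {0, 0}, // Fetch
    3:  {0, 0}, // Metadata
    18: {0, 0}, // ApiVersions
    19: {0, 0}, // CreateTopics (v2 would need its own parser first)
}

// checkApiVersion would run right after the request header is decoded in HandleConn.
func checkApiVersion(apiKey, apiVersion uint16) error {
    r, ok := supportedVersions[apiKey]
    if !ok {
        return fmt.Errorf("unsupported API key %d", apiKey)
    }
    if apiVersion < r[0] || apiVersion > r[1] {
        return fmt.Errorf("API key %d: version %d outside supported range [%d, %d]",
            apiKey, apiVersion, r[0], r[1])
    }
    return nil
}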
diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go
index c401a3178..b858dc590 100644
--- a/weed/mq/kafka/protocol/produce.go
+++ b/weed/mq/kafka/protocol/produce.go
@@ -65,10 +65,21 @@ func (h *Handler) handleProduce(correlationID uint32, requestBody []byte) ([]byt
         partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
         offset += 4
 
-        // Check if topic exists
-        h.topicsMu.RLock()
+        // Check if topic exists, auto-create if it doesn't (simulates auto.create.topics.enable=true)
+        h.topicsMu.Lock()
         _, topicExists := h.topics[topicName]
-        h.topicsMu.RUnlock()
+        if !topicExists {
+            fmt.Printf("DEBUG: Auto-creating topic during Produce: %s\n", topicName)
+            h.topics[topicName] = &TopicInfo{
+                Name:       topicName,
+                Partitions: 1, // Default to 1 partition
+                CreatedAt:  time.Now().UnixNano(),
+            }
+            // Initialize ledger for partition 0
+            h.GetOrCreateLedger(topicName, 0)
+            topicExists = true
+        }
+        h.topicsMu.Unlock()
 
         // Response: topic_name_size(2) + topic_name + partitions_array
         response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
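With this change the topic-creation logic lives in two places: inline in handleProduce above and, with per-partition ledger setup, in AddTopicForTesting. A possible consolidation, sketched against the TopicInfo fields, topicsMu, and GetOrCreateLedger calls visible in the hunks (hypothetical helper, not in the diff):

// getOrCreateTopic returns the topic's info, creating the topic and its
// per-partition ledgers on first use. Locking and ledger setup follow the
// same pattern AddTopicForTesting uses.
func (h *Handler) getOrCreateTopic(name string, partitions int32) *TopicInfo {
    h.topicsMu.Lock()
    defer h.topicsMu.Unlock()

    if info, exists := h.topics[name]; exists {
        return info
    }
    info := &TopicInfo{
        Name:       name,
        Partitions: partitions,
        CreatedAt:  time.Now().UnixNano(),
    }
    h.topics[name] = info
    for p := int32(0); p < partitions; p++ {
        h.GetOrCreateLedger(name, p)
    }
    return info
}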