diff --git a/test/kafka/api_sequence_test.go b/test/kafka/api_sequence_test.go index dee9e4493..17eb12d5b 100644 --- a/test/kafka/api_sequence_test.go +++ b/test/kafka/api_sequence_test.go @@ -31,17 +31,18 @@ func TestKafkaGateway_APISequence(t *testing.T) { handler := srv.GetHandler() handler.AddTopicForTesting(topicName, 1) - // Create a writer and try to write a single message + // Create a writer and try to write a single message writer := &kafka.Writer{ Addr: kafka.TCP(brokerAddr), Topic: topicName, WriteTimeout: 15 * time.Second, ReadTimeout: 15 * time.Second, + // Enable ALL kafka-go logging to see internal validation issues Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) { - fmt.Printf("KAFKA-GO WRITER LOG: "+msg+"\n", args...) + fmt.Printf("KAFKA-GO LOG: "+msg+"\n", args...) }), ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) { - fmt.Printf("KAFKA-GO WRITER ERROR: "+msg+"\n", args...) + fmt.Printf("KAFKA-GO ERROR: "+msg+"\n", args...) }), } defer writer.Close() diff --git a/test/kafka/raw_protocol_test.go b/test/kafka/raw_protocol_test.go new file mode 100644 index 000000000..f4fde795a --- /dev/null +++ b/test/kafka/raw_protocol_test.go @@ -0,0 +1,267 @@ +package kafka + +import ( + "encoding/binary" + "fmt" + "net" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" +) + +// TestRawProduceRequest tests our Produce API directly without kafka-go +func TestRawProduceRequest(t *testing.T) { + // Start the gateway server + srv := gateway.NewServer(gateway.Options{ + Listen: ":0", + UseSeaweedMQ: false, + }) + + if err := srv.Start(); err != nil { + t.Fatalf("Failed to start gateway: %v", err) + } + defer srv.Close() + + brokerAddr := srv.Addr() + t.Logf("Gateway running on %s", brokerAddr) + + // Pre-create topic + topicName := "raw-test-topic" + handler := srv.GetHandler() + handler.AddTopicForTesting(topicName, 1) + + // Make a raw TCP connection + conn, err := net.Dial("tcp", brokerAddr) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + // Test 1: ApiVersions request (should work) + t.Log("=== Testing ApiVersions API ===") + if err := sendApiVersionsRequest(conn); err != nil { + t.Fatalf("ApiVersions failed: %v", err) + } + t.Log("✅ ApiVersions API working") + + // Test 2: Metadata request (should work) + t.Log("=== Testing Metadata API ===") + if err := sendMetadataRequest(conn); err != nil { + t.Fatalf("Metadata failed: %v", err) + } + t.Log("✅ Metadata API working") + + // Test 3: Raw Produce request (this is what we want to test) + t.Log("=== Testing Produce API (RAW) ===") + if err := sendProduceRequest(conn, topicName); err != nil { + t.Fatalf("Produce failed: %v", err) + } + t.Log("✅ Produce API working!") +} + +func sendApiVersionsRequest(conn net.Conn) error { + // Build ApiVersions request + correlationID := uint32(1) + + msgBody := make([]byte, 0, 32) + msgBody = append(msgBody, 0, 18) // API key 18 (ApiVersions) + msgBody = append(msgBody, 0, 0) // API version 0 + + // Correlation ID + correlationBytes := make([]byte, 4) + binary.BigEndian.PutUint32(correlationBytes, correlationID) + msgBody = append(msgBody, correlationBytes...) + + // Client ID (empty) + msgBody = append(msgBody, 0, 0) // empty client ID + + // Send request + sizeBytes := make([]byte, 4) + binary.BigEndian.PutUint32(sizeBytes, uint32(len(msgBody))) + request := append(sizeBytes, msgBody...) + + if _, err := conn.Write(request); err != nil { + return fmt.Errorf("write request: %w", err) + } + + // Read response size + var responseSizeBytes [4]byte + if _, err := conn.Read(responseSizeBytes[:]); err != nil { + return fmt.Errorf("read response size: %w", err) + } + + responseSize := binary.BigEndian.Uint32(responseSizeBytes[:]) + + // Read response body + responseBody := make([]byte, responseSize) + if _, err := conn.Read(responseBody); err != nil { + return fmt.Errorf("read response body: %w", err) + } + + fmt.Printf("ApiVersions response: %d bytes\n", responseSize) + return nil +} + +func sendMetadataRequest(conn net.Conn) error { + // Build Metadata request + correlationID := uint32(2) + + msgBody := make([]byte, 0, 32) + msgBody = append(msgBody, 0, 3) // API key 3 (Metadata) + msgBody = append(msgBody, 0, 1) // API version 1 + + // Correlation ID + correlationBytes := make([]byte, 4) + binary.BigEndian.PutUint32(correlationBytes, correlationID) + msgBody = append(msgBody, correlationBytes...) + + // Client ID (empty) + msgBody = append(msgBody, 0, 0) // empty client ID + + // Topics array (empty = all topics) + msgBody = append(msgBody, 0xFF, 0xFF, 0xFF, 0xFF) // -1 = all topics + + // Send request + sizeBytes := make([]byte, 4) + binary.BigEndian.PutUint32(sizeBytes, uint32(len(msgBody))) + request := append(sizeBytes, msgBody...) + + if _, err := conn.Write(request); err != nil { + return fmt.Errorf("write request: %w", err) + } + + // Read response size + var responseSizeBytes [4]byte + if _, err := conn.Read(responseSizeBytes[:]); err != nil { + return fmt.Errorf("read response size: %w", err) + } + + responseSize := binary.BigEndian.Uint32(responseSizeBytes[:]) + + // Read response body + responseBody := make([]byte, responseSize) + if _, err := conn.Read(responseBody); err != nil { + return fmt.Errorf("read response body: %w", err) + } + + fmt.Printf("Metadata response: %d bytes\n", responseSize) + return nil +} + +func sendProduceRequest(conn net.Conn, topicName string) error { + // Build simple Produce request + correlationID := uint32(3) + + msgBody := make([]byte, 0, 128) + msgBody = append(msgBody, 0, 0) // API key 0 (Produce) + msgBody = append(msgBody, 0, 1) // API version 1 + + // Correlation ID + correlationBytes := make([]byte, 4) + binary.BigEndian.PutUint32(correlationBytes, correlationID) + msgBody = append(msgBody, correlationBytes...) + + // Client ID (empty) + msgBody = append(msgBody, 0, 0) // empty client ID + + // Acks (-1 = all replicas) + msgBody = append(msgBody, 0xFF, 0xFF) // -1 + + // Timeout (5000ms) + msgBody = append(msgBody, 0, 0, 0x13, 0x88) // 5000ms + + // Topics count (1) + msgBody = append(msgBody, 0, 0, 0, 1) + + // Topic name + topicNameBytes := []byte(topicName) + msgBody = append(msgBody, byte(len(topicNameBytes)>>8), byte(len(topicNameBytes))) + msgBody = append(msgBody, topicNameBytes...) + + // Partitions count (1) + msgBody = append(msgBody, 0, 0, 0, 1) + + // Partition 0 + msgBody = append(msgBody, 0, 0, 0, 0) // partition ID = 0 + + // Record set (simple test record) + testRecord := buildSimpleRecordSet("test-key", "test-value") + recordSetSize := make([]byte, 4) + binary.BigEndian.PutUint32(recordSetSize, uint32(len(testRecord))) + msgBody = append(msgBody, recordSetSize...) + msgBody = append(msgBody, testRecord...) + + // Send request + sizeBytes := make([]byte, 4) + binary.BigEndian.PutUint32(sizeBytes, uint32(len(msgBody))) + request := append(sizeBytes, msgBody...) + + fmt.Printf("Sending Produce request: %d bytes\n", len(request)) + + if _, err := conn.Write(request); err != nil { + return fmt.Errorf("write request: %w", err) + } + + // Read response size + var responseSizeBytes [4]byte + if _, err := conn.Read(responseSizeBytes[:]); err != nil { + return fmt.Errorf("read response size: %w", err) + } + + responseSize := binary.BigEndian.Uint32(responseSizeBytes[:]) + + // Read response body + responseBody := make([]byte, responseSize) + if _, err := conn.Read(responseBody); err != nil { + return fmt.Errorf("read response body: %w", err) + } + + fmt.Printf("Produce response: %d bytes\n", responseSize) + + // Check if the response indicates success (simplified check) + if responseSize > 8 { + // Extract correlation ID and basic error code + correlationResp := binary.BigEndian.Uint32(responseBody[0:4]) + if correlationResp == correlationID { + fmt.Printf("✅ Produce request correlation ID matches: %d\n", correlationResp) + } + + // Look for error codes in the response + if len(responseBody) > 20 { + // Skip to where partition error code should be (rough estimate) + errorCode := binary.BigEndian.Uint16(responseBody[16:18]) + if errorCode == 0 { + fmt.Printf("✅ Produce request succeeded (error code: 0)\n") + } else { + fmt.Printf("⚠️ Produce request error code: %d\n", errorCode) + } + } + } + + return nil +} + +func buildSimpleRecordSet(key, value string) []byte { + // Build a very simple Kafka record batch (v0 format for simplicity) + record := make([]byte, 0, 64) + + // Record batch header (simplified v0 format) + record = append(record, 0, 0, 0, 0, 0, 0, 0, 0) // base offset + record = append(record, 0, 0, 0, 30) // batch length (estimated) + record = append(record, 0, 0, 0, 0) // partition leader epoch + record = append(record, 0) // magic byte (v0) + record = append(record, 0, 0, 0, 0) // CRC32 (simplified) + record = append(record, 0, 0) // attributes + record = append(record, 0, 0, 0, 1) // record count = 1 + + // Simple record: key_length + key + value_length + value + keyBytes := []byte(key) + valueBytes := []byte(value) + + record = append(record, byte(len(keyBytes)>>8), byte(len(keyBytes))) + record = append(record, keyBytes...) + record = append(record, byte(len(valueBytes)>>8), byte(len(valueBytes))) + record = append(record, valueBytes...) + + return record +} diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go index d19f182be..acebac1cd 100644 --- a/weed/mq/kafka/gateway/server.go +++ b/weed/mq/kafka/gateway/server.go @@ -132,19 +132,21 @@ func (s *Server) GetListenerAddr() (string, int) { } addr := s.ln.Addr().String() - // Parse [::]:port or host:port format + // Parse [::]:port or host:port format - use exact match for kafka-go compatibility if strings.HasPrefix(addr, "[::]:") { port := strings.TrimPrefix(addr, "[::]:") if p, err := strconv.Atoi(port); err == nil { - return "localhost", p + // Revert to 127.0.0.1 for broader compatibility + return "127.0.0.1", p } } // Handle host:port format if host, port, err := net.SplitHostPort(addr); err == nil { if p, err := strconv.Atoi(port); err == nil { + // Use 127.0.0.1 instead of localhost for better kafka-go compatibility if host == "::" || host == "" { - host = "localhost" + host = "127.0.0.1" } return host, p } diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index dfafb8dfe..6578743db 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -42,7 +42,7 @@ type Handler struct { // Consumer group coordination groupCoordinator *consumer.GroupCoordinator - + // Dynamic broker address for Metadata responses brokerHost string brokerPort int @@ -128,7 +128,7 @@ func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger { return h.ledgers[key] } -// SetBrokerAddress updates the broker address used in Metadata responses +// SetBrokerAddress updates the broker address used in Metadata responses func (h *Handler) SetBrokerAddress(host string, port int) { h.brokerHost = host h.brokerPort = port @@ -136,12 +136,13 @@ func (h *Handler) SetBrokerAddress(host string, port int) { // HandleConn processes a single client connection func (h *Handler) HandleConn(conn net.Conn) error { + connectionID := fmt.Sprintf("%s->%s", conn.RemoteAddr(), conn.LocalAddr()) defer func() { - fmt.Printf("DEBUG: Closing connection from %s\n", conn.RemoteAddr()) + fmt.Printf("DEBUG: [%s] Connection closing\n", connectionID) conn.Close() }() - - fmt.Printf("DEBUG: New connection from %s\n", conn.RemoteAddr()) + + fmt.Printf("DEBUG: [%s] New connection established\n", connectionID) r := bufio.NewReader(conn) w := bufio.NewWriter(conn) @@ -373,9 +374,9 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by // Use dynamic broker address set by the server host := h.brokerHost port := h.brokerPort - + fmt.Printf("DEBUG: Advertising broker at %s:%d\n", host, port) - + response = append(response, 0, byte(len(host))) response = append(response, []byte(host)...)