diff --git a/test/kafka/produce_consume_cycle_test.go b/test/kafka/produce_consume_cycle_test.go new file mode 100644 index 000000000..0be37e94b --- /dev/null +++ b/test/kafka/produce_consume_cycle_test.go @@ -0,0 +1,114 @@ +package kafka + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/segmentio/kafka-go" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" +) + +func TestKafkaProduceConsumeE2E(t *testing.T) { + // Start gateway server + gatewayServer := gateway.NewServer(gateway.Options{ + Listen: ":0", // random port + }) + + go func() { + if err := gatewayServer.Start(); err != nil { + t.Errorf("Gateway server error: %v", err) + } + }() + defer gatewayServer.Close() + + // Wait for server to start + time.Sleep(100 * time.Millisecond) + + // Get the actual listening address + host, port := gatewayServer.GetListenerAddr() + brokerAddr := fmt.Sprintf("%s:%d", host, port) + t.Logf("Gateway running on %s", brokerAddr) + + // Get handler and configure it + handler := gatewayServer.GetHandler() + handler.SetBrokerAddress(host, port) + + // Add test topic + topicName := "e2e-test-topic" + handler.AddTopicForTesting(topicName, 1) + + // Test messages + testMessages := []string{ + "Hello Kafka Gateway!", + "This is message 2", + "Final test message", + } + + // === PRODUCE PHASE === + t.Log("=== STARTING PRODUCE PHASE ===") + + writer := &kafka.Writer{ + Addr: kafka.TCP(brokerAddr), + Topic: topicName, + Balancer: &kafka.LeastBytes{}, + } + defer writer.Close() + + // Produce messages + for i, msg := range testMessages { + err := writer.WriteMessages(context.Background(), kafka.Message{ + Key: []byte(fmt.Sprintf("key-%d", i)), + Value: []byte(msg), + }) + if err != nil { + t.Fatalf("Failed to produce message %d: %v", i, err) + } + t.Logf("Produced message %d: %s", i, msg) + } + + t.Log("=== PRODUCE PHASE COMPLETED ===") + + // === CONSUME PHASE === + t.Log("=== STARTING CONSUME PHASE ===") + + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{brokerAddr}, + Topic: topicName, + GroupID: "test-consumer-group", + }) + defer reader.Close() + + // Consume messages + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + consumedMessages := make([]string, 0, len(testMessages)) + + for i := 0; i < len(testMessages); i++ { + msg, err := reader.ReadMessage(ctx) + if err != nil { + t.Fatalf("Failed to consume message %d: %v", i, err) + } + + consumedMsg := string(msg.Value) + consumedMessages = append(consumedMessages, consumedMsg) + t.Logf("Consumed message %d: %s (offset: %d)", i, consumedMsg, msg.Offset) + } + + t.Log("=== CONSUME PHASE COMPLETED ===") + + // === VERIFICATION === + if len(consumedMessages) != len(testMessages) { + t.Fatalf("Expected %d messages, got %d", len(testMessages), len(consumedMessages)) + } + + for i, expected := range testMessages { + if consumedMessages[i] != expected { + t.Errorf("Message %d mismatch: expected %q, got %q", i, expected, consumedMessages[i]) + } + } + + t.Log("✅ SUCCESS: Complete produce/consume cycle working!") +} diff --git a/weed/mq/kafka/protocol/find_coordinator.go b/weed/mq/kafka/protocol/find_coordinator.go new file mode 100644 index 000000000..155be2eac --- /dev/null +++ b/weed/mq/kafka/protocol/find_coordinator.go @@ -0,0 +1,81 @@ +package protocol + +import ( + "encoding/binary" + "fmt" +) + +func (h *Handler) handleFindCoordinator(correlationID uint32, requestBody []byte) ([]byte, error) { + // Parse FindCoordinator request + // Request format: client_id + coordinator_key + coordinator_type(1) + + if len(requestBody) < 2 { // client_id_size(2) + return nil, fmt.Errorf("FindCoordinator request too short") + } + + // Skip client_id + clientIDSize := binary.BigEndian.Uint16(requestBody[0:2]) + offset := 2 + int(clientIDSize) + + if len(requestBody) < offset+3 { // coordinator_key_size(2) + coordinator_type(1) + return nil, fmt.Errorf("FindCoordinator request missing data") + } + + // Parse coordinator key (group ID for consumer groups) + coordinatorKeySize := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + + if len(requestBody) < offset+int(coordinatorKeySize)+1 { + return nil, fmt.Errorf("FindCoordinator request missing coordinator key") + } + + coordinatorKey := string(requestBody[offset : offset+int(coordinatorKeySize)]) + offset += int(coordinatorKeySize) + + coordinatorType := requestBody[offset] + _ = coordinatorType // 0 = group coordinator, 1 = transaction coordinator + + fmt.Printf("DEBUG: FindCoordinator request for key '%s' (type: %d)\n", coordinatorKey, coordinatorType) + + // DEBUG: Hex dump the request to understand format + dumpLen := len(requestBody) + if dumpLen > 50 { + dumpLen = 50 + } + fmt.Printf("DEBUG: FindCoordinator request hex dump (first %d bytes): %x\n", dumpLen, requestBody[:dumpLen]) + + response := make([]byte, 0, 64) + + // Correlation ID + correlationIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(correlationIDBytes, correlationID) + response = append(response, correlationIDBytes...) + + // Throttle time (4 bytes, 0 = no throttling) + response = append(response, 0, 0, 0, 0) + + // Error code (2 bytes, 0 = no error) + response = append(response, 0, 0) + + // Error message (nullable string) - null (-1 length) + response = append(response, 0xFF, 0xFF) + + // Coordinator node_id (4 bytes) - use broker 0 (this gateway) + response = append(response, 0, 0, 0, 0) + + // Coordinator host (string) + host := h.brokerHost + hostLen := uint16(len(host)) + response = append(response, byte(hostLen>>8), byte(hostLen)) + response = append(response, []byte(host)...) + + // Coordinator port (4 bytes) + portBytes := make([]byte, 4) + binary.BigEndian.PutUint32(portBytes, uint32(h.brokerPort)) + response = append(response, portBytes...) + + fmt.Printf("DEBUG: FindCoordinator response: coordinator at %s:%d\n", host, h.brokerPort) + fmt.Printf("DEBUG: FindCoordinator response hex dump (%d bytes): %x\n", len(response), response) + + return response, nil +} diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 804dcde69..e50e17736 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -233,6 +233,8 @@ func (h *Handler) HandleConn(conn net.Conn) error { response, err = h.handleOffsetCommit(correlationID, messageBuf[8:]) // skip header case 9: // OffsetFetch response, err = h.handleOffsetFetch(correlationID, messageBuf[8:]) // skip header + case 10: // FindCoordinator + response, err = h.handleFindCoordinator(correlationID, messageBuf[8:]) // skip header case 12: // Heartbeat response, err = h.handleHeartbeat(correlationID, messageBuf[8:]) // skip header case 13: // LeaveGroup @@ -286,7 +288,7 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 0) // Number of API keys (compact array format in newer versions, but using basic format for simplicity) - response = append(response, 0, 0, 0, 13) // 13 API keys + response = append(response, 0, 0, 0, 14) // 14 API keys // API Key 18 (ApiVersions): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 18) // API key 18 @@ -346,6 +348,11 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) { response = append(response, 0, 0) // min version 0 response = append(response, 0, 8) // max version 8 + // API Key 10 (FindCoordinator): api_key(2) + min_version(2) + max_version(2) + response = append(response, 0, 10) // API key 10 + response = append(response, 0, 0) // min version 0 + response = append(response, 0, 4) // max version 4 + // API Key 12 (Heartbeat): api_key(2) + min_version(2) + max_version(2) response = append(response, 0, 12) // API key 12 response = append(response, 0, 0) // min version 0 @@ -487,7 +494,7 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] // Rack (NULLABLE_STRING) - null (-1 length, 2 bytes) response = append(response, 0xFF, 0xFF) - // Cluster ID (NULLABLE_STRING) - null (-1 length, 2 bytes) + // Cluster ID (NULLABLE_STRING) - null (-1 length, 2 bytes) response = append(response, 0xFF, 0xFF) // Controller ID (4 bytes) - -1 (no controller) @@ -1026,6 +1033,7 @@ func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error { 2: {0, 5}, // ListOffsets: v0-v5 19: {0, 4}, // CreateTopics: v0-v4 20: {0, 4}, // DeleteTopics: v0-v4 + 10: {0, 4}, // FindCoordinator: v0-v4 11: {0, 7}, // JoinGroup: v0-v7 14: {0, 5}, // SyncGroup: v0-v5 8: {0, 8}, // OffsetCommit: v0-v8 @@ -1094,6 +1102,8 @@ func getAPIName(apiKey uint16) string { return "OffsetCommit" case 9: return "OffsetFetch" + case 10: + return "FindCoordinator" case 11: return "JoinGroup" case 12: