diff --git a/weed/command/mq_kafka_gateway.go b/weed/command/mq_kafka_gateway.go index 041690b62..b878d4f26 100644 --- a/weed/command/mq_kafka_gateway.go +++ b/weed/command/mq_kafka_gateway.go @@ -10,40 +10,44 @@ var ( ) type mqKafkaGatewayOpts struct { - listen *string - agentAddress *string + listen *string + masters *string + filerGroup *string } func init() { cmdMqKafkaGateway.Run = runMqKafkaGateway mqKafkaGatewayOptions.listen = cmdMqKafkaGateway.Flag.String("listen", ":9092", "Kafka gateway listen address") - mqKafkaGatewayOptions.agentAddress = cmdMqKafkaGateway.Flag.String("agent", "localhost:17777", "SeaweedMQ Agent address (required)") + mqKafkaGatewayOptions.masters = cmdMqKafkaGateway.Flag.String("masters", "localhost:9333", "SeaweedFS master servers") + mqKafkaGatewayOptions.filerGroup = cmdMqKafkaGateway.Flag.String("filerGroup", "", "filer group name") } var cmdMqKafkaGateway = &Command{ - UsageLine: "mq.kafka.gateway [-listen=:9092] [-agent=localhost:17777]", + UsageLine: "mq.kafka.gateway [-listen=:9092] [-masters=localhost:9333] [-filerGroup=]", Short: "start a Kafka wire-protocol gateway for SeaweedMQ", Long: `Start a Kafka wire-protocol gateway translating Kafka client requests to SeaweedMQ. -Requires a running SeaweedMQ Agent. Use -agent=
to specify the agent location. +Connects to SeaweedFS master servers to discover available brokers. Use -masters= +to specify comma-separated master locations. This is experimental and currently supports a minimal subset for development. `, } func runMqKafkaGateway(cmd *Command, args []string) bool { - // Validate options - agent address is now required - if *mqKafkaGatewayOptions.agentAddress == "" { - glog.Fatalf("SeaweedMQ Agent address is required (-agent)") + // Validate options - masters address is now required + if *mqKafkaGatewayOptions.masters == "" { + glog.Fatalf("SeaweedFS masters address is required (-masters)") return false } srv := gateway.NewServer(gateway.Options{ - Listen: *mqKafkaGatewayOptions.listen, - AgentAddress: *mqKafkaGatewayOptions.agentAddress, + Listen: *mqKafkaGatewayOptions.listen, + Masters: *mqKafkaGatewayOptions.masters, + FilerGroup: *mqKafkaGatewayOptions.filerGroup, }) - glog.V(0).Infof("Starting MQ Kafka Gateway on %s with SeaweedMQ backend (%s)", *mqKafkaGatewayOptions.listen, *mqKafkaGatewayOptions.agentAddress) + glog.V(0).Infof("Starting MQ Kafka Gateway on %s with SeaweedMQ brokers from masters (%s)", *mqKafkaGatewayOptions.listen, *mqKafkaGatewayOptions.masters) if err := srv.Start(); err != nil { glog.Fatalf("mq kafka gateway start: %v", err) return false diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go index b8b83c40d..3ea73f365 100644 --- a/weed/mq/kafka/gateway/server.go +++ b/weed/mq/kafka/gateway/server.go @@ -48,8 +48,9 @@ func resolveAdvertisedAddress() string { } type Options struct { - Listen string - AgentAddress string // SeaweedMQ Agent address (required) + Listen string + Masters string // SeaweedFS master servers (required) + FilerGroup string // filer group name (optional) } type Server struct { @@ -64,12 +65,12 @@ type Server struct { func NewServer(opts Options) *Server { ctx, cancel := context.WithCancel(context.Background()) - // Always use SeaweedMQ handler - handler, err := protocol.NewSeaweedMQHandler(opts.AgentAddress) + // Create broker-based SeaweedMQ handler + handler, err := protocol.NewSeaweedMQBrokerHandler(opts.Masters, opts.FilerGroup) if err != nil { - glog.Fatalf("Failed to create SeaweedMQ handler: %v", err) + glog.Fatalf("Failed to create SeaweedMQ broker handler: %v", err) } - glog.V(1).Infof("Created Kafka gateway with SeaweedMQ backend at %s", opts.AgentAddress) + glog.V(1).Infof("Created Kafka gateway with SeaweedMQ brokers via masters %s", opts.Masters) return &Server{ opts: opts, diff --git a/weed/mq/kafka/integration/seaweedmq_handler.go b/weed/mq/kafka/integration/seaweedmq_handler.go index 69895433e..bff598dd8 100644 --- a/weed/mq/kafka/integration/seaweedmq_handler.go +++ b/weed/mq/kafka/integration/seaweedmq_handler.go @@ -1,12 +1,19 @@ package integration import ( + "context" "encoding/binary" "fmt" + "strings" "sync" "time" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/offset" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) @@ -540,3 +547,89 @@ func (h *SeaweedMQHandler) convertSingleSeaweedRecord(seaweedRecord *SeaweedReco return record } + +// NewSeaweedMQBrokerHandler creates a new handler with SeaweedMQ broker integration +func NewSeaweedMQBrokerHandler(masters string, filerGroup string) (*SeaweedMQHandler, error) { + // Parse master addresses + masterAddresses := strings.Split(masters, ",") + if len(masterAddresses) == 0 { + return nil, fmt.Errorf("no master addresses provided") + } + + // Discover brokers from masters + brokerAddresses, err := discoverBrokers(masterAddresses, filerGroup) + if err != nil { + return nil, fmt.Errorf("failed to discover brokers: %v", err) + } + + if len(brokerAddresses) == 0 { + return nil, fmt.Errorf("no brokers discovered from masters") + } + + // For now, use the first broker (can be enhanced later for load balancing) + brokerAddress := brokerAddresses[0] + + // Create broker client (reuse AgentClient structure but connect to broker) + brokerClient, err := NewBrokerClient(brokerAddress) + if err != nil { + return nil, fmt.Errorf("failed to create broker client: %v", err) + } + + // Test the connection + if err := brokerClient.HealthCheck(); err != nil { + brokerClient.Close() + return nil, fmt.Errorf("broker health check failed: %v", err) + } + + return &SeaweedMQHandler{ + agentClient: brokerClient, + topics: make(map[string]*KafkaTopicInfo), + ledgers: make(map[TopicPartitionKey]*offset.Ledger), + }, nil +} + +// discoverBrokers queries masters for available brokers +func discoverBrokers(masterAddresses []string, filerGroup string) ([]string, error) { + var brokers []string + + // Try each master until we get a response + for _, masterAddr := range masterAddresses { + conn, err := grpc.Dial(masterAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + continue // Try next master + } + defer conn.Close() + + client := master_pb.NewSeaweedClient(conn) + + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: cluster.BrokerType, + FilerGroup: filerGroup, + Limit: 100, + }) + if err != nil { + continue // Try next master + } + + // Extract broker addresses from response + for _, node := range resp.ClusterNodes { + if node.Address != "" { + brokers = append(brokers, node.Address) + } + } + + if len(brokers) > 0 { + break // Found brokers, no need to try other masters + } + } + + return brokers, nil +} + +// NewBrokerClient creates a client that connects to a SeaweedMQ broker +// This reuses the AgentClient structure but connects to a broker instead +func NewBrokerClient(brokerAddress string) (*AgentClient, error) { + // For now, reuse the AgentClient implementation + // In the future, this could be enhanced to use broker-specific protocols + return NewAgentClient(brokerAddress) +} diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index e0903138d..50dc6b458 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -90,6 +90,22 @@ func NewSeaweedMQHandler(agentAddress string) (*Handler, error) { }, nil } +// NewSeaweedMQBrokerHandler creates a new handler with SeaweedMQ broker integration +func NewSeaweedMQBrokerHandler(masters string, filerGroup string) (*Handler, error) { + smqHandler, err := integration.NewSeaweedMQBrokerHandler(masters, filerGroup) + if err != nil { + return nil, err + } + + return &Handler{ + topics: make(map[string]*TopicInfo), // Keep for compatibility + ledgers: make(map[TopicPartitionKey]*offset.Ledger), // Keep for compatibility + seaweedMQHandler: smqHandler, + useSeaweedMQ: true, + groupCoordinator: consumer.NewGroupCoordinator(), + }, nil +} + // Close shuts down the handler and all connections func (h *Handler) Close() error { // Close group coordinator