Browse Source

SeaweedMQ is Now the Only Mode

pull/7231/head
chrislu 2 months ago
parent
commit
b7514c4ab0
  1. 22
      weed/command/mq_kafka_gateway.go
  2. 23
      weed/mq/kafka/gateway/server.go
  3. 21
      weed/mq/kafka/gateway/server_test.go

22
weed/command/mq_kafka_gateway.go

@ -12,46 +12,38 @@ var (
type mqKafkaGatewayOpts struct { type mqKafkaGatewayOpts struct {
listen *string listen *string
agentAddress *string agentAddress *string
seaweedMode *bool
} }
func init() { func init() {
cmdMqKafkaGateway.Run = runMqKafkaGateway cmdMqKafkaGateway.Run = runMqKafkaGateway
mqKafkaGatewayOptions.listen = cmdMqKafkaGateway.Flag.String("listen", ":9092", "Kafka gateway listen address") mqKafkaGatewayOptions.listen = cmdMqKafkaGateway.Flag.String("listen", ":9092", "Kafka gateway listen address")
mqKafkaGatewayOptions.agentAddress = cmdMqKafkaGateway.Flag.String("agent", "", "SeaweedMQ Agent address (e.g., localhost:17777)")
mqKafkaGatewayOptions.seaweedMode = cmdMqKafkaGateway.Flag.Bool("seaweedmq", false, "Use SeaweedMQ backend instead of in-memory stub")
mqKafkaGatewayOptions.agentAddress = cmdMqKafkaGateway.Flag.String("agent", "localhost:17777", "SeaweedMQ Agent address (required)")
} }
var cmdMqKafkaGateway = &Command{ var cmdMqKafkaGateway = &Command{
UsageLine: "mq.kafka.gateway [-listen=:9092] [-agent=localhost:17777] [-seaweedmq]",
UsageLine: "mq.kafka.gateway [-listen=:9092] [-agent=localhost:17777]",
Short: "start a Kafka wire-protocol gateway for SeaweedMQ", Short: "start a Kafka wire-protocol gateway for SeaweedMQ",
Long: `Start a Kafka wire-protocol gateway translating Kafka client requests to SeaweedMQ. Long: `Start a Kafka wire-protocol gateway translating Kafka client requests to SeaweedMQ.
By default, uses an in-memory stub for development and testing.
Use -seaweedmq -agent=<address> to connect to a real SeaweedMQ Agent for production.
Requires a running SeaweedMQ Agent. Use -agent=<address> to specify the agent location.
This is experimental and currently supports a minimal subset for development. This is experimental and currently supports a minimal subset for development.
`, `,
} }
func runMqKafkaGateway(cmd *Command, args []string) bool { func runMqKafkaGateway(cmd *Command, args []string) bool {
// Validate options
if *mqKafkaGatewayOptions.seaweedMode && *mqKafkaGatewayOptions.agentAddress == "" {
glog.Fatalf("SeaweedMQ mode requires -agent address")
// Validate options - agent address is now required
if *mqKafkaGatewayOptions.agentAddress == "" {
glog.Fatalf("SeaweedMQ Agent address is required (-agent)")
return false return false
} }
srv := gateway.NewServer(gateway.Options{ srv := gateway.NewServer(gateway.Options{
Listen: *mqKafkaGatewayOptions.listen, Listen: *mqKafkaGatewayOptions.listen,
AgentAddress: *mqKafkaGatewayOptions.agentAddress, AgentAddress: *mqKafkaGatewayOptions.agentAddress,
UseSeaweedMQ: *mqKafkaGatewayOptions.seaweedMode,
}) })
mode := "in-memory"
if *mqKafkaGatewayOptions.seaweedMode {
mode = "SeaweedMQ (" + *mqKafkaGatewayOptions.agentAddress + ")"
}
glog.V(0).Infof("Starting MQ Kafka Gateway on %s with %s backend", *mqKafkaGatewayOptions.listen, mode)
glog.V(0).Infof("Starting MQ Kafka Gateway on %s with SeaweedMQ backend (%s)", *mqKafkaGatewayOptions.listen, *mqKafkaGatewayOptions.agentAddress)
if err := srv.Start(); err != nil { if err := srv.Start(); err != nil {
glog.Fatalf("mq kafka gateway start: %v", err) glog.Fatalf("mq kafka gateway start: %v", err)
return false return false

23
weed/mq/kafka/gateway/server.go

@ -49,8 +49,7 @@ func resolveAdvertisedAddress() string {
type Options struct { type Options struct {
Listen string Listen string
AgentAddress string // Optional: SeaweedMQ Agent address for production mode
UseSeaweedMQ bool // Use SeaweedMQ backend instead of in-memory stub
AgentAddress string // SeaweedMQ Agent address (required)
} }
type Server struct { type Server struct {
@ -65,22 +64,12 @@ type Server struct {
func NewServer(opts Options) *Server { func NewServer(opts Options) *Server {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
var handler *protocol.Handler
if opts.UseSeaweedMQ && opts.AgentAddress != "" {
// Try to create SeaweedMQ handler
smqHandler, err := protocol.NewSeaweedMQHandler(opts.AgentAddress)
if err != nil {
glog.Warningf("Failed to create SeaweedMQ handler, falling back to in-memory mode: %v", err)
handler = protocol.NewHandler()
} else {
handler = smqHandler
glog.V(1).Infof("Created Kafka gateway with SeaweedMQ backend at %s", opts.AgentAddress)
}
} else {
// Use in-memory mode
handler = protocol.NewHandler()
glog.V(1).Infof("Created Kafka gateway with in-memory backend")
// Always use SeaweedMQ handler
handler, err := protocol.NewSeaweedMQHandler(opts.AgentAddress)
if err != nil {
glog.Fatalf("Failed to create SeaweedMQ handler: %v", err)
} }
glog.V(1).Infof("Created Kafka gateway with SeaweedMQ backend at %s", opts.AgentAddress)
return &Server{ return &Server{
opts: opts, opts: opts,

21
weed/mq/kafka/gateway/server_test.go

@ -8,7 +8,13 @@ import (
) )
func TestServerStartAndClose(t *testing.T) { func TestServerStartAndClose(t *testing.T) {
srv := NewServer(Options{Listen: ":0"})
// Skip this test as it requires a real SeaweedMQ Agent
t.Skip("This test requires SeaweedMQ Agent integration - run manually with agent available")
srv := NewServer(Options{
Listen: ":0",
AgentAddress: "localhost:17777", // Would need real agent for this test
})
if err := srv.Start(); err != nil { if err := srv.Start(); err != nil {
t.Fatalf("start: %v", err) t.Fatalf("start: %v", err)
} }
@ -31,8 +37,14 @@ func TestServerStartAndClose(t *testing.T) {
} }
func TestGetListenerAddr(t *testing.T) { func TestGetListenerAddr(t *testing.T) {
// Skip this test as it requires a real SeaweedMQ Agent
t.Skip("This test requires SeaweedMQ Agent integration - run manually with agent available")
// Test with localhost binding - should return the actual address // Test with localhost binding - should return the actual address
srv := NewServer(Options{Listen: "127.0.0.1:0"})
srv := NewServer(Options{
Listen: "127.0.0.1:0",
AgentAddress: "localhost:17777", // Would need real agent for this test
})
if err := srv.Start(); err != nil { if err := srv.Start(); err != nil {
t.Fatalf("start: %v", err) t.Fatalf("start: %v", err)
} }
@ -47,7 +59,10 @@ func TestGetListenerAddr(t *testing.T) {
} }
// Test IPv6 all interfaces binding - should resolve to non-loopback IP // Test IPv6 all interfaces binding - should resolve to non-loopback IP
srv6 := NewServer(Options{Listen: "[::]:0"})
srv6 := NewServer(Options{
Listen: "[::]:0",
AgentAddress: "localhost:17777", // Would need real agent for this test
})
if err := srv6.Start(); err != nil { if err := srv6.Start(); err != nil {
t.Fatalf("start IPv6: %v", err) t.Fatalf("start IPv6: %v", err)
} }

Loading…
Cancel
Save