diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go index f9a88b264..f3c1a6860 100644 --- a/weed/mq/kafka/gateway/server.go +++ b/weed/mq/kafka/gateway/server.go @@ -96,9 +96,9 @@ func (s *Server) Start() error { return err } s.ln = ln - + // Update handler with actual broker address for Metadata responses - host, port := s.GetListenerAddr() + host, port := s.GetListenerAddr() s.handler.SetBrokerAddress(host, port) glog.V(1).Infof("Kafka gateway advertising broker at %s:%d", host, port) s.wg.Add(1) @@ -166,17 +166,17 @@ func (s *Server) GetListenerAddr() (string, int) { if s.ln == nil { return "localhost", 9092 // fallback } - + addr := s.ln.Addr().String() // Parse [::]:port or host:port format - use exact match for kafka-go compatibility if strings.HasPrefix(addr, "[::]:") { - port := strings.TrimPrefix(addr, "[::]:") + port := strings.TrimPrefix(addr, "[::]:") if p, err := strconv.Atoi(port); err == nil { // Resolve appropriate address when bound to IPv6 all interfaces return resolveAdvertisedAddress(), p } } - + // Handle host:port format if host, port, err := net.SplitHostPort(addr); err == nil { if p, err := strconv.Atoi(port); err == nil { @@ -187,6 +187,6 @@ func (s *Server) GetListenerAddr() (string, int) { return host, p } } - + return "localhost", 9092 // fallback } diff --git a/weed/mq/kafka/gateway/server_test.go b/weed/mq/kafka/gateway/server_test.go index d2244df83..53df8b2f4 100644 --- a/weed/mq/kafka/gateway/server_test.go +++ b/weed/mq/kafka/gateway/server_test.go @@ -1,87 +1,85 @@ package gateway import ( - "net" - "strings" - "testing" - "time" + "net" + "strings" + "testing" + "time" ) func TestServerStartAndClose(t *testing.T) { - srv := NewServer(Options{Listen: ":0"}) - if err := srv.Start(); err != nil { - t.Fatalf("start: %v", err) - } - // ensure listener is open and accepting - // try to dial the actual chosen port - // Find the actual address - var addr string - if srv.ln == nil { - t.Fatalf("listener not set") - } - addr = srv.ln.Addr().String() - c, err := net.DialTimeout("tcp", addr, time.Second) - if err != nil { - t.Fatalf("dial: %v", err) - } - _ = c.Close() - if err := srv.Close(); err != nil { - t.Fatalf("close: %v", err) - } + srv := NewServer(Options{Listen: ":0"}) + if err := srv.Start(); err != nil { + t.Fatalf("start: %v", err) + } + // ensure listener is open and accepting + // try to dial the actual chosen port + // Find the actual address + var addr string + if srv.ln == nil { + t.Fatalf("listener not set") + } + addr = srv.ln.Addr().String() + c, err := net.DialTimeout("tcp", addr, time.Second) + if err != nil { + t.Fatalf("dial: %v", err) + } + _ = c.Close() + if err := srv.Close(); err != nil { + t.Fatalf("close: %v", err) + } } func TestGetListenerAddr(t *testing.T) { - // Test with localhost binding - should return the actual address - srv := NewServer(Options{Listen: "127.0.0.1:0"}) - if err := srv.Start(); err != nil { - t.Fatalf("start: %v", err) - } - defer srv.Close() - - host, port := srv.GetListenerAddr() - if host != "127.0.0.1" { - t.Errorf("expected 127.0.0.1, got %s", host) - } - if port <= 0 { - t.Errorf("expected valid port, got %d", port) - } - - // Test IPv6 all interfaces binding - should resolve to non-loopback IP - srv6 := NewServer(Options{Listen: "[::]:0"}) - if err := srv6.Start(); err != nil { - t.Fatalf("start IPv6: %v", err) - } - defer srv6.Close() - - host6, port6 := srv6.GetListenerAddr() - // Should not be localhost when bound to all interfaces - if host6 == "localhost" { - t.Errorf("IPv6 all interfaces should not resolve to localhost, got %s", host6) - } - if port6 <= 0 { - t.Errorf("expected valid port, got %d", port6) - } - t.Logf("IPv6 all interfaces resolved to: %s:%d", host6, port6) + // Test with localhost binding - should return the actual address + srv := NewServer(Options{Listen: "127.0.0.1:0"}) + if err := srv.Start(); err != nil { + t.Fatalf("start: %v", err) + } + defer srv.Close() + + host, port := srv.GetListenerAddr() + if host != "127.0.0.1" { + t.Errorf("expected 127.0.0.1, got %s", host) + } + if port <= 0 { + t.Errorf("expected valid port, got %d", port) + } + + // Test IPv6 all interfaces binding - should resolve to non-loopback IP + srv6 := NewServer(Options{Listen: "[::]:0"}) + if err := srv6.Start(); err != nil { + t.Fatalf("start IPv6: %v", err) + } + defer srv6.Close() + + host6, port6 := srv6.GetListenerAddr() + // Should not be localhost when bound to all interfaces + if host6 == "localhost" { + t.Errorf("IPv6 all interfaces should not resolve to localhost, got %s", host6) + } + if port6 <= 0 { + t.Errorf("expected valid port, got %d", port6) + } + t.Logf("IPv6 all interfaces resolved to: %s:%d", host6, port6) } func TestResolveAdvertisedAddress(t *testing.T) { - addr := resolveAdvertisedAddress() - if addr == "" { - t.Error("resolveAdvertisedAddress returned empty string") - } - - // Should be a valid IP address - ip := net.ParseIP(addr) - if ip == nil { - t.Errorf("resolveAdvertisedAddress returned invalid IP: %s", addr) - } - - // Should not be IPv6 (we prefer IPv4 for Kafka compatibility) - if strings.Contains(addr, ":") { - t.Errorf("Expected IPv4 address, got IPv6: %s", addr) - } - - t.Logf("Resolved advertised address: %s", addr) -} + addr := resolveAdvertisedAddress() + if addr == "" { + t.Error("resolveAdvertisedAddress returned empty string") + } + + // Should be a valid IP address + ip := net.ParseIP(addr) + if ip == nil { + t.Errorf("resolveAdvertisedAddress returned invalid IP: %s", addr) + } + // Should not be IPv6 (we prefer IPv4 for Kafka compatibility) + if strings.Contains(addr, ":") { + t.Errorf("Expected IPv4 address, got IPv6: %s", addr) + } + t.Logf("Resolved advertised address: %s", addr) +}