diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go index acebac1cd..f9a88b264 100644 --- a/weed/mq/kafka/gateway/server.go +++ b/weed/mq/kafka/gateway/server.go @@ -11,6 +11,42 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol" ) +// resolveAdvertisedAddress resolves the appropriate address to advertise to Kafka clients +// when the server binds to all interfaces (:: or 0.0.0.0) +func resolveAdvertisedAddress() string { + // Try to find a non-loopback interface + interfaces, err := net.Interfaces() + if err != nil { + glog.V(1).Infof("Failed to get network interfaces, using localhost: %v", err) + return "127.0.0.1" + } + + for _, iface := range interfaces { + // Skip loopback and inactive interfaces + if iface.Flags&net.FlagLoopback != 0 || iface.Flags&net.FlagUp == 0 { + continue + } + + addrs, err := iface.Addrs() + if err != nil { + continue + } + + for _, addr := range addrs { + if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { + // Prefer IPv4 addresses for better Kafka client compatibility + if ipv4 := ipNet.IP.To4(); ipv4 != nil { + return ipv4.String() + } + } + } + } + + // Fallback to localhost if no suitable interface found + glog.V(1).Infof("No non-loopback interface found, using localhost") + return "127.0.0.1" +} + type Options struct { Listen string AgentAddress string // Optional: SeaweedMQ Agent address for production mode @@ -136,17 +172,17 @@ func (s *Server) GetListenerAddr() (string, int) { if strings.HasPrefix(addr, "[::]:") { port := strings.TrimPrefix(addr, "[::]:") if p, err := strconv.Atoi(port); err == nil { - // Revert to 127.0.0.1 for broader compatibility - return "127.0.0.1", p + // Resolve appropriate address when bound to IPv6 all interfaces + return resolveAdvertisedAddress(), p } } // Handle host:port format if host, port, err := net.SplitHostPort(addr); err == nil { if p, err := strconv.Atoi(port); err == nil { - // Use 127.0.0.1 instead of localhost for better kafka-go compatibility - if host == "::" || host == "" { - host = "127.0.0.1" + // Resolve appropriate address when bound to all interfaces + if host == "::" || host == "" || host == "0.0.0.0" { + host = resolveAdvertisedAddress() } return host, p } diff --git a/weed/mq/kafka/gateway/server_test.go b/weed/mq/kafka/gateway/server_test.go index 935c2b3a8..d2244df83 100644 --- a/weed/mq/kafka/gateway/server_test.go +++ b/weed/mq/kafka/gateway/server_test.go @@ -2,6 +2,7 @@ package gateway import ( "net" + "strings" "testing" "time" ) @@ -29,4 +30,58 @@ func TestServerStartAndClose(t *testing.T) { } } +func TestGetListenerAddr(t *testing.T) { + // Test with localhost binding - should return the actual address + srv := NewServer(Options{Listen: "127.0.0.1:0"}) + if err := srv.Start(); err != nil { + t.Fatalf("start: %v", err) + } + defer srv.Close() + + host, port := srv.GetListenerAddr() + if host != "127.0.0.1" { + t.Errorf("expected 127.0.0.1, got %s", host) + } + if port <= 0 { + t.Errorf("expected valid port, got %d", port) + } + + // Test IPv6 all interfaces binding - should resolve to non-loopback IP + srv6 := NewServer(Options{Listen: "[::]:0"}) + if err := srv6.Start(); err != nil { + t.Fatalf("start IPv6: %v", err) + } + defer srv6.Close() + + host6, port6 := srv6.GetListenerAddr() + // Should not be localhost when bound to all interfaces + if host6 == "localhost" { + t.Errorf("IPv6 all interfaces should not resolve to localhost, got %s", host6) + } + if port6 <= 0 { + t.Errorf("expected valid port, got %d", port6) + } + t.Logf("IPv6 all interfaces resolved to: %s:%d", host6, port6) +} + +func TestResolveAdvertisedAddress(t *testing.T) { + addr := resolveAdvertisedAddress() + if addr == "" { + t.Error("resolveAdvertisedAddress returned empty string") + } + + // Should be a valid IP address + ip := net.ParseIP(addr) + if ip == nil { + t.Errorf("resolveAdvertisedAddress returned invalid IP: %s", addr) + } + + // Should not be IPv6 (we prefer IPv4 for Kafka compatibility) + if strings.Contains(addr, ":") { + t.Errorf("Expected IPv4 address, got IPv6: %s", addr) + } + + t.Logf("Resolved advertised address: %s", addr) +} +