Browse Source

fix host

pull/7231/head
chrislu 2 months ago
parent
commit
eeb5f62d74
  1. 46
      weed/mq/kafka/gateway/server.go
  2. 55
      weed/mq/kafka/gateway/server_test.go

46
weed/mq/kafka/gateway/server.go

@ -11,6 +11,42 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol"
) )
// resolveAdvertisedAddress resolves the appropriate address to advertise to Kafka clients
// when the server binds to all interfaces (:: or 0.0.0.0)
func resolveAdvertisedAddress() string {
// Try to find a non-loopback interface
interfaces, err := net.Interfaces()
if err != nil {
glog.V(1).Infof("Failed to get network interfaces, using localhost: %v", err)
return "127.0.0.1"
}
for _, iface := range interfaces {
// Skip loopback and inactive interfaces
if iface.Flags&net.FlagLoopback != 0 || iface.Flags&net.FlagUp == 0 {
continue
}
addrs, err := iface.Addrs()
if err != nil {
continue
}
for _, addr := range addrs {
if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
// Prefer IPv4 addresses for better Kafka client compatibility
if ipv4 := ipNet.IP.To4(); ipv4 != nil {
return ipv4.String()
}
}
}
}
// Fallback to localhost if no suitable interface found
glog.V(1).Infof("No non-loopback interface found, using localhost")
return "127.0.0.1"
}
type Options struct { type Options struct {
Listen string Listen string
AgentAddress string // Optional: SeaweedMQ Agent address for production mode AgentAddress string // Optional: SeaweedMQ Agent address for production mode
@ -136,17 +172,17 @@ func (s *Server) GetListenerAddr() (string, int) {
if strings.HasPrefix(addr, "[::]:") { if strings.HasPrefix(addr, "[::]:") {
port := strings.TrimPrefix(addr, "[::]:") port := strings.TrimPrefix(addr, "[::]:")
if p, err := strconv.Atoi(port); err == nil { if p, err := strconv.Atoi(port); err == nil {
// Revert to 127.0.0.1 for broader compatibility
return "127.0.0.1", p
// Resolve appropriate address when bound to IPv6 all interfaces
return resolveAdvertisedAddress(), p
} }
} }
// Handle host:port format // Handle host:port format
if host, port, err := net.SplitHostPort(addr); err == nil { if host, port, err := net.SplitHostPort(addr); err == nil {
if p, err := strconv.Atoi(port); err == nil { if p, err := strconv.Atoi(port); err == nil {
// Use 127.0.0.1 instead of localhost for better kafka-go compatibility
if host == "::" || host == "" {
host = "127.0.0.1"
// Resolve appropriate address when bound to all interfaces
if host == "::" || host == "" || host == "0.0.0.0" {
host = resolveAdvertisedAddress()
} }
return host, p return host, p
} }

55
weed/mq/kafka/gateway/server_test.go

@ -2,6 +2,7 @@ package gateway
import ( import (
"net" "net"
"strings"
"testing" "testing"
"time" "time"
) )
@ -29,4 +30,58 @@ func TestServerStartAndClose(t *testing.T) {
} }
} }
func TestGetListenerAddr(t *testing.T) {
// Test with localhost binding - should return the actual address
srv := NewServer(Options{Listen: "127.0.0.1:0"})
if err := srv.Start(); err != nil {
t.Fatalf("start: %v", err)
}
defer srv.Close()
host, port := srv.GetListenerAddr()
if host != "127.0.0.1" {
t.Errorf("expected 127.0.0.1, got %s", host)
}
if port <= 0 {
t.Errorf("expected valid port, got %d", port)
}
// Test IPv6 all interfaces binding - should resolve to non-loopback IP
srv6 := NewServer(Options{Listen: "[::]:0"})
if err := srv6.Start(); err != nil {
t.Fatalf("start IPv6: %v", err)
}
defer srv6.Close()
host6, port6 := srv6.GetListenerAddr()
// Should not be localhost when bound to all interfaces
if host6 == "localhost" {
t.Errorf("IPv6 all interfaces should not resolve to localhost, got %s", host6)
}
if port6 <= 0 {
t.Errorf("expected valid port, got %d", port6)
}
t.Logf("IPv6 all interfaces resolved to: %s:%d", host6, port6)
}
func TestResolveAdvertisedAddress(t *testing.T) {
addr := resolveAdvertisedAddress()
if addr == "" {
t.Error("resolveAdvertisedAddress returned empty string")
}
// Should be a valid IP address
ip := net.ParseIP(addr)
if ip == nil {
t.Errorf("resolveAdvertisedAddress returned invalid IP: %s", addr)
}
// Should not be IPv6 (we prefer IPv4 for Kafka compatibility)
if strings.Contains(addr, ":") {
t.Errorf("Expected IPv4 address, got IPv6: %s", addr)
}
t.Logf("Resolved advertised address: %s", addr)
}
Loading…
Cancel
Save