Browse Source

fmt

pull/7231/head
chrislu 2 months ago
parent
commit
cd6a55533a
  1. 12
      weed/mq/kafka/gateway/server.go
  2. 144
      weed/mq/kafka/gateway/server_test.go

12
weed/mq/kafka/gateway/server.go

@ -96,9 +96,9 @@ func (s *Server) Start() error {
return err
}
s.ln = ln
// Update handler with actual broker address for Metadata responses
host, port := s.GetListenerAddr()
host, port := s.GetListenerAddr()
s.handler.SetBrokerAddress(host, port)
glog.V(1).Infof("Kafka gateway advertising broker at %s:%d", host, port)
s.wg.Add(1)
@ -166,17 +166,17 @@ func (s *Server) GetListenerAddr() (string, int) {
if s.ln == nil {
return "localhost", 9092 // fallback
}
addr := s.ln.Addr().String()
// Parse [::]:port or host:port format - use exact match for kafka-go compatibility
if strings.HasPrefix(addr, "[::]:") {
port := strings.TrimPrefix(addr, "[::]:")
port := strings.TrimPrefix(addr, "[::]:")
if p, err := strconv.Atoi(port); err == nil {
// Resolve appropriate address when bound to IPv6 all interfaces
return resolveAdvertisedAddress(), p
}
}
// Handle host:port format
if host, port, err := net.SplitHostPort(addr); err == nil {
if p, err := strconv.Atoi(port); err == nil {
@ -187,6 +187,6 @@ func (s *Server) GetListenerAddr() (string, int) {
return host, p
}
}
return "localhost", 9092 // fallback
}

144
weed/mq/kafka/gateway/server_test.go

@ -1,87 +1,85 @@
package gateway
import (
"net"
"strings"
"testing"
"time"
"net"
"strings"
"testing"
"time"
)
func TestServerStartAndClose(t *testing.T) {
srv := NewServer(Options{Listen: ":0"})
if err := srv.Start(); err != nil {
t.Fatalf("start: %v", err)
}
// ensure listener is open and accepting
// try to dial the actual chosen port
// Find the actual address
var addr string
if srv.ln == nil {
t.Fatalf("listener not set")
}
addr = srv.ln.Addr().String()
c, err := net.DialTimeout("tcp", addr, time.Second)
if err != nil {
t.Fatalf("dial: %v", err)
}
_ = c.Close()
if err := srv.Close(); err != nil {
t.Fatalf("close: %v", err)
}
srv := NewServer(Options{Listen: ":0"})
if err := srv.Start(); err != nil {
t.Fatalf("start: %v", err)
}
// ensure listener is open and accepting
// try to dial the actual chosen port
// Find the actual address
var addr string
if srv.ln == nil {
t.Fatalf("listener not set")
}
addr = srv.ln.Addr().String()
c, err := net.DialTimeout("tcp", addr, time.Second)
if err != nil {
t.Fatalf("dial: %v", err)
}
_ = c.Close()
if err := srv.Close(); err != nil {
t.Fatalf("close: %v", err)
}
}
func TestGetListenerAddr(t *testing.T) {
// Test with localhost binding - should return the actual address
srv := NewServer(Options{Listen: "127.0.0.1:0"})
if err := srv.Start(); err != nil {
t.Fatalf("start: %v", err)
}
defer srv.Close()
host, port := srv.GetListenerAddr()
if host != "127.0.0.1" {
t.Errorf("expected 127.0.0.1, got %s", host)
}
if port <= 0 {
t.Errorf("expected valid port, got %d", port)
}
// Test IPv6 all interfaces binding - should resolve to non-loopback IP
srv6 := NewServer(Options{Listen: "[::]:0"})
if err := srv6.Start(); err != nil {
t.Fatalf("start IPv6: %v", err)
}
defer srv6.Close()
host6, port6 := srv6.GetListenerAddr()
// Should not be localhost when bound to all interfaces
if host6 == "localhost" {
t.Errorf("IPv6 all interfaces should not resolve to localhost, got %s", host6)
}
if port6 <= 0 {
t.Errorf("expected valid port, got %d", port6)
}
t.Logf("IPv6 all interfaces resolved to: %s:%d", host6, port6)
// Test with localhost binding - should return the actual address
srv := NewServer(Options{Listen: "127.0.0.1:0"})
if err := srv.Start(); err != nil {
t.Fatalf("start: %v", err)
}
defer srv.Close()
host, port := srv.GetListenerAddr()
if host != "127.0.0.1" {
t.Errorf("expected 127.0.0.1, got %s", host)
}
if port <= 0 {
t.Errorf("expected valid port, got %d", port)
}
// Test IPv6 all interfaces binding - should resolve to non-loopback IP
srv6 := NewServer(Options{Listen: "[::]:0"})
if err := srv6.Start(); err != nil {
t.Fatalf("start IPv6: %v", err)
}
defer srv6.Close()
host6, port6 := srv6.GetListenerAddr()
// Should not be localhost when bound to all interfaces
if host6 == "localhost" {
t.Errorf("IPv6 all interfaces should not resolve to localhost, got %s", host6)
}
if port6 <= 0 {
t.Errorf("expected valid port, got %d", port6)
}
t.Logf("IPv6 all interfaces resolved to: %s:%d", host6, port6)
}
func TestResolveAdvertisedAddress(t *testing.T) {
addr := resolveAdvertisedAddress()
if addr == "" {
t.Error("resolveAdvertisedAddress returned empty string")
}
// Should be a valid IP address
ip := net.ParseIP(addr)
if ip == nil {
t.Errorf("resolveAdvertisedAddress returned invalid IP: %s", addr)
}
// Should not be IPv6 (we prefer IPv4 for Kafka compatibility)
if strings.Contains(addr, ":") {
t.Errorf("Expected IPv4 address, got IPv6: %s", addr)
}
t.Logf("Resolved advertised address: %s", addr)
}
addr := resolveAdvertisedAddress()
if addr == "" {
t.Error("resolveAdvertisedAddress returned empty string")
}
// Should be a valid IP address
ip := net.ParseIP(addr)
if ip == nil {
t.Errorf("resolveAdvertisedAddress returned invalid IP: %s", addr)
}
// Should not be IPv6 (we prefer IPv4 for Kafka compatibility)
if strings.Contains(addr, ":") {
t.Errorf("Expected IPv4 address, got IPv6: %s", addr)
}
t.Logf("Resolved advertised address: %s", addr)
}
Loading…
Cancel
Save