diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go index bd8f03922..d19f182be 100644 --- a/weed/mq/kafka/gateway/server.go +++ b/weed/mq/kafka/gateway/server.go @@ -3,6 +3,8 @@ package gateway import ( "context" "net" + "strconv" + "strings" "sync" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -58,6 +60,11 @@ func (s *Server) Start() error { return err } s.ln = ln + + // Update handler with actual broker address for Metadata responses + host, port := s.GetListenerAddr() + s.handler.SetBrokerAddress(host, port) + glog.V(1).Infof("Kafka gateway advertising broker at %s:%d", host, port) s.wg.Add(1) go func() { defer s.wg.Done() @@ -117,3 +124,31 @@ func (s *Server) Addr() string { func (s *Server) GetHandler() *protocol.Handler { return s.handler } + +// GetListenerAddr returns the actual listening address and port +func (s *Server) GetListenerAddr() (string, int) { + if s.ln == nil { + return "localhost", 9092 // fallback + } + + addr := s.ln.Addr().String() + // Parse [::]:port or host:port format + if strings.HasPrefix(addr, "[::]:") { + port := strings.TrimPrefix(addr, "[::]:") + if p, err := strconv.Atoi(port); err == nil { + return "localhost", p + } + } + + // Handle host:port format + if host, port, err := net.SplitHostPort(addr); err == nil { + if p, err := strconv.Atoi(port); err == nil { + if host == "::" || host == "" { + host = "localhost" + } + return host, p + } + } + + return "localhost", 9092 // fallback +} diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 5f2e5b813..dfafb8dfe 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -42,6 +42,10 @@ type Handler struct { // Consumer group coordination groupCoordinator *consumer.GroupCoordinator + + // Dynamic broker address for Metadata responses + brokerHost string + brokerPort int } // NewHandler creates a new handler in legacy in-memory mode @@ -51,6 +55,8 @@ func NewHandler() *Handler { ledgers: make(map[TopicPartitionKey]*offset.Ledger), useSeaweedMQ: false, groupCoordinator: consumer.NewGroupCoordinator(), + brokerHost: "localhost", // default fallback + brokerPort: 9092, // default fallback } } @@ -122,6 +128,12 @@ func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger { return h.ledgers[key] } +// SetBrokerAddress updates the broker address used in Metadata responses +func (h *Handler) SetBrokerAddress(host string, port int) { + h.brokerHost = host + h.brokerPort = port +} + // HandleConn processes a single client connection func (h *Handler) HandleConn(conn net.Conn) error { defer func() { @@ -358,16 +370,19 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by // Broker 0: node_id(4) + host + port(4) + rack response = append(response, 0, 0, 0, 0) // node_id = 0 - // Use "localhost" for simplicity - kafka-go should be able to connect back - // The port issue is more likely the problem than the host - host := "localhost" - + // Use dynamic broker address set by the server + host := h.brokerHost + port := h.brokerPort + + fmt.Printf("DEBUG: Advertising broker at %s:%d\n", host, port) + response = append(response, 0, byte(len(host))) response = append(response, []byte(host)...) - // Port (4 bytes) - Use standard Kafka port for now - // TODO: Should get actual port from server configuration - response = append(response, 0, 0, 0x23, 0x84) // 9092 in big-endian + // Port (4 bytes) - Use actual gateway port + portBytes := make([]byte, 4) + binary.BigEndian.PutUint32(portBytes, uint32(port)) + response = append(response, portBytes...) // Rack - nullable string, using null (-1 length) response = append(response, 0xFF, 0xFF) // null rack @@ -423,7 +438,7 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by binary.BigEndian.PutUint16(topicNameLen, uint16(len(topicNameBytes))) response = append(response, topicNameLen...) response = append(response, topicNameBytes...) - + // TEMP: Removed v7+ fields for v1 compatibility // Topic UUID and is_internal_topic removed @@ -437,7 +452,7 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by response = append(response, 0, 0, 0, 0) // leader_id = 0 (this broker) response = append(response, 0, 0, 0, 1, 0, 0, 0, 0) // replicas = [0] response = append(response, 0, 0, 0, 1, 0, 0, 0, 0) // isr = [0] - + // TEMP: Removed v7+ topic authorized operations for v1 compatibility }