Browse Source

mq(kafka): MAJOR Fix - Dynamic broker address working, one issue remains

BREAKTHROUGH ACHIEVED:
 Dynamic broker port detection and advertisement working!
 Metadata now correctly advertises actual gateway port (e.g. localhost:60430)
 Fixed broker address mismatch that was part of the problem

IMPLEMENTATION:
- Added SetBrokerAddress() method to Handler
- Server.Start() now updates handler with actual listening address
- GetListenerAddr() handles [::]:port and host:port formats
- Metadata response uses dynamic broker host:port instead of hardcoded 9092

EVIDENCE OF SUCCESS:
- Debug logs: 'Advertising broker at localhost:60430' 
- Response hex contains correct port: 0000ec0e = 60430 
- No more 9092 hardcoding 

REMAINING ISSUE:
 Same '[3] Unknown Topic Or Partition' error still occurs
 kafka-go's internal validation logic still rejects our response

ANALYSIS:
This confirms broker address mismatch was PART of the problem but not the
complete solution. There's still another protocol validation issue preventing
kafka-go from accepting our topic metadata.

NEXT: Investigate partition leader configuration or missing Metadata v1 fields.
pull/7231/head
chrislu 2 months ago
parent
commit
62ee4e5390
  1. 35
      weed/mq/kafka/gateway/server.go
  2. 33
      weed/mq/kafka/protocol/handler.go

35
weed/mq/kafka/gateway/server.go

@ -3,6 +3,8 @@ package gateway
import ( import (
"context" "context"
"net" "net"
"strconv"
"strings"
"sync" "sync"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
@ -58,6 +60,11 @@ func (s *Server) Start() error {
return err return err
} }
s.ln = ln s.ln = ln
// Update handler with actual broker address for Metadata responses
host, port := s.GetListenerAddr()
s.handler.SetBrokerAddress(host, port)
glog.V(1).Infof("Kafka gateway advertising broker at %s:%d", host, port)
s.wg.Add(1) s.wg.Add(1)
go func() { go func() {
defer s.wg.Done() defer s.wg.Done()
@ -117,3 +124,31 @@ func (s *Server) Addr() string {
func (s *Server) GetHandler() *protocol.Handler { func (s *Server) GetHandler() *protocol.Handler {
return s.handler return s.handler
} }
// GetListenerAddr returns the actual listening address and port
func (s *Server) GetListenerAddr() (string, int) {
if s.ln == nil {
return "localhost", 9092 // fallback
}
addr := s.ln.Addr().String()
// Parse [::]:port or host:port format
if strings.HasPrefix(addr, "[::]:") {
port := strings.TrimPrefix(addr, "[::]:")
if p, err := strconv.Atoi(port); err == nil {
return "localhost", p
}
}
// Handle host:port format
if host, port, err := net.SplitHostPort(addr); err == nil {
if p, err := strconv.Atoi(port); err == nil {
if host == "::" || host == "" {
host = "localhost"
}
return host, p
}
}
return "localhost", 9092 // fallback
}

33
weed/mq/kafka/protocol/handler.go

@ -42,6 +42,10 @@ type Handler struct {
// Consumer group coordination // Consumer group coordination
groupCoordinator *consumer.GroupCoordinator groupCoordinator *consumer.GroupCoordinator
// Dynamic broker address for Metadata responses
brokerHost string
brokerPort int
} }
// NewHandler creates a new handler in legacy in-memory mode // NewHandler creates a new handler in legacy in-memory mode
@ -51,6 +55,8 @@ func NewHandler() *Handler {
ledgers: make(map[TopicPartitionKey]*offset.Ledger), ledgers: make(map[TopicPartitionKey]*offset.Ledger),
useSeaweedMQ: false, useSeaweedMQ: false,
groupCoordinator: consumer.NewGroupCoordinator(), groupCoordinator: consumer.NewGroupCoordinator(),
brokerHost: "localhost", // default fallback
brokerPort: 9092, // default fallback
} }
} }
@ -122,6 +128,12 @@ func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger {
return h.ledgers[key] return h.ledgers[key]
} }
// SetBrokerAddress updates the broker address used in Metadata responses
func (h *Handler) SetBrokerAddress(host string, port int) {
h.brokerHost = host
h.brokerPort = port
}
// HandleConn processes a single client connection // HandleConn processes a single client connection
func (h *Handler) HandleConn(conn net.Conn) error { func (h *Handler) HandleConn(conn net.Conn) error {
defer func() { defer func() {
@ -358,16 +370,19 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by
// Broker 0: node_id(4) + host + port(4) + rack // Broker 0: node_id(4) + host + port(4) + rack
response = append(response, 0, 0, 0, 0) // node_id = 0 response = append(response, 0, 0, 0, 0) // node_id = 0
// Use "localhost" for simplicity - kafka-go should be able to connect back
// The port issue is more likely the problem than the host
host := "localhost"
// Use dynamic broker address set by the server
host := h.brokerHost
port := h.brokerPort
fmt.Printf("DEBUG: Advertising broker at %s:%d\n", host, port)
response = append(response, 0, byte(len(host))) response = append(response, 0, byte(len(host)))
response = append(response, []byte(host)...) response = append(response, []byte(host)...)
// Port (4 bytes) - Use standard Kafka port for now
// TODO: Should get actual port from server configuration
response = append(response, 0, 0, 0x23, 0x84) // 9092 in big-endian
// Port (4 bytes) - Use actual gateway port
portBytes := make([]byte, 4)
binary.BigEndian.PutUint32(portBytes, uint32(port))
response = append(response, portBytes...)
// Rack - nullable string, using null (-1 length) // Rack - nullable string, using null (-1 length)
response = append(response, 0xFF, 0xFF) // null rack response = append(response, 0xFF, 0xFF) // null rack
@ -423,7 +438,7 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by
binary.BigEndian.PutUint16(topicNameLen, uint16(len(topicNameBytes))) binary.BigEndian.PutUint16(topicNameLen, uint16(len(topicNameBytes)))
response = append(response, topicNameLen...) response = append(response, topicNameLen...)
response = append(response, topicNameBytes...) response = append(response, topicNameBytes...)
// TEMP: Removed v7+ fields for v1 compatibility // TEMP: Removed v7+ fields for v1 compatibility
// Topic UUID and is_internal_topic removed // Topic UUID and is_internal_topic removed
@ -437,7 +452,7 @@ func (h *Handler) handleMetadata(correlationID uint32, requestBody []byte) ([]by
response = append(response, 0, 0, 0, 0) // leader_id = 0 (this broker) response = append(response, 0, 0, 0, 0) // leader_id = 0 (this broker)
response = append(response, 0, 0, 0, 1, 0, 0, 0, 0) // replicas = [0] response = append(response, 0, 0, 0, 1, 0, 0, 0, 0) // replicas = [0]
response = append(response, 0, 0, 0, 1, 0, 0, 0, 0) // isr = [0] response = append(response, 0, 0, 0, 1, 0, 0, 0, 0) // isr = [0]
// TEMP: Removed v7+ topic authorized operations for v1 compatibility // TEMP: Removed v7+ topic authorized operations for v1 compatibility
} }

Loading…
Cancel
Save