From 920e7c6b416e9226be67c154605e5babe19d6fe8 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 15:54:49 -0700 Subject: [PATCH] debug: Add request routing and control plane logging CRITICAL FINDING: ListOffsets (apiKey=2) is DROPPED before routing! Evidence: 1. REQUEST LOOP logs show apiKey=2 detected 2. REQUEST ROUTING logs show apiKey=18,3,19,60,22,32 but NO apiKey=2! 3. Requests are dropped between request parsing and routing decision This means the filter/drop happens in: - Lines 980-1050 in handler.go (between REQUEST LOOP and REQUEST QUEUE) - Likely a validation check or explicit filtering ListOffsets is being silently dropped at the request parsing level, never reaching the routing logic that would send it to control plane. Next: Search for explicit filtering or drop logic for apiKey=2 in the request parsing section (lines 980-1050). --- weed/mq/kafka/protocol/handler.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 44047441d..754f2e5fe 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -643,6 +643,9 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Channel closed, exit return } + if req.apiKey == 2 { // ListOffsets + glog.Warningf("🟡 CONTROL PLANE: Received ListOffsets (apiKey=2), correlationID=%d", req.correlationID) + } glog.V(4).Infof("[%s] Control plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey) // Wrap request processing with panic recovery to prevent deadlocks @@ -656,7 +659,13 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { err = fmt.Errorf("internal server error: panic in request handler: %v", r) } }() + if req.apiKey == 2 { // ListOffsets + glog.Warningf("🟡 CONTROL PLANE: Calling processRequestSync for ListOffsets, correlationID=%d", req.correlationID) + } response, err = h.processRequestSync(req) + if req.apiKey == 2 { // ListOffsets + glog.Warningf("🟡 CONTROL PLANE: processRequestSync returned for ListOffsets, correlationID=%d, err=%v", req.correlationID, err) + } }() glog.V(4).Infof("[%s] Control plane completed correlation=%d, sending to responseChan", connectionID, req.correlationID) @@ -1050,8 +1059,10 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { var targetChan chan *kafkaRequest if isDataPlaneAPI(apiKey) { targetChan = dataChan + glog.Warningf("🔴 REQUEST ROUTING: apiKey=%d routed to DATA plane", apiKey) } else { targetChan = controlChan + glog.Warningf("🔴 REQUEST ROUTING: apiKey=%d routed to CONTROL plane", apiKey) } // Only add to correlation queue AFTER successful channel send