diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 44047441d..754f2e5fe 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -643,6 +643,9 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Channel closed, exit return } + if req.apiKey == 2 { // ListOffsets + glog.Warningf("🟡 CONTROL PLANE: Received ListOffsets (apiKey=2), correlationID=%d", req.correlationID) + } glog.V(4).Infof("[%s] Control plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey) // Wrap request processing with panic recovery to prevent deadlocks @@ -656,7 +659,13 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { err = fmt.Errorf("internal server error: panic in request handler: %v", r) } }() + if req.apiKey == 2 { // ListOffsets + glog.Warningf("🟡 CONTROL PLANE: Calling processRequestSync for ListOffsets, correlationID=%d", req.correlationID) + } response, err = h.processRequestSync(req) + if req.apiKey == 2 { // ListOffsets + glog.Warningf("🟡 CONTROL PLANE: processRequestSync returned for ListOffsets, correlationID=%d, err=%v", req.correlationID, err) + } }() glog.V(4).Infof("[%s] Control plane completed correlation=%d, sending to responseChan", connectionID, req.correlationID) @@ -1050,8 +1059,10 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { var targetChan chan *kafkaRequest if isDataPlaneAPI(apiKey) { targetChan = dataChan + glog.Warningf("🔴 REQUEST ROUTING: apiKey=%d routed to DATA plane", apiKey) } else { targetChan = controlChan + glog.Warningf("🔴 REQUEST ROUTING: apiKey=%d routed to CONTROL plane", apiKey) } // Only add to correlation queue AFTER successful channel send