diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 18534e446..509ed1330 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -908,8 +908,13 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { apiKey := binary.BigEndian.Uint16(messageBuf[0:2]) apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) - - glog.Warningf("🔴 REQUEST LOOP: Parsed apiKey=%d, apiVersion=%d, correlationID=%d from %s", apiKey, apiVersion, correlationID, connectionID) + + // LOG EVERY SINGLE REQUEST REGARDLESS OF apiKey + maxLen := len(messageBuf) + if maxLen > 16 { + maxLen = 16 + } + glog.Warningf("🔴 RAW REQUEST: correlationID=%d, firstTwoBytes=[%02x %02x] = apiKey=%d, first16bytes=%v", correlationID, messageBuf[0], messageBuf[1], apiKey, messageBuf[:maxLen]) // Validate API version against what we support if err := h.validateAPIVersion(apiKey, apiVersion); err != nil { @@ -2884,10 +2889,10 @@ func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey, // handleMetadata routes to the appropriate version-specific handler func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { glog.Warningf("🟡 handleMetadata ENTRY: apiVersion=%d, correlationID=%d, requestBodyLen=%d", apiVersion, correlationID, len(requestBody)) - + var response []byte var err error - + switch apiVersion { case 0: response, err = h.HandleMetadataV0(correlationID, requestBody) @@ -2909,7 +2914,7 @@ func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, reques err = fmt.Errorf("metadata version %d not implemented yet", apiVersion) } } - + if err != nil { glog.Warningf("🟡 handleMetadata EXIT ERROR: apiVersion=%d, correlationID=%d, err=%v", apiVersion, correlationID, err) } else {