diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index dc75e0829..93b14f6a2 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -3921,6 +3921,14 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16, // v2+: transactional_id(NULLABLE_STRING) + transaction_timeout_ms(INT32) + producer_id(INT64) + producer_epoch(INT16) // v4+: Uses flexible format with tagged fields + glog.Warningf("🔴🔴🔴 InitProducerId HANDLER CALLED: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody)) + + maxBytes := len(requestBody) + if maxBytes > 64 { + maxBytes = 64 + } + glog.Warningf("🔴🔴🔴 InitProducerId request first %d bytes (hex): %x", maxBytes, requestBody[:maxBytes]) + offset := 0 // Parse transactional_id (NULLABLE_STRING or COMPACT_NULLABLE_STRING for flexible versions) @@ -4026,6 +4034,12 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16, response = append(response, 0x00) // Empty response body tagged fields } + glog.Warningf("🔴🔴🔴 InitProducerId HANDLER RETURNING: correlationID=%d, responseLen=%d bytes", correlationID, len(response)) + respPreview := len(response) + if respPreview > 32 { + respPreview = 32 + } + glog.Warningf("🔴🔴🔴 InitProducerId response (hex): %x", response[:respPreview]) return response, nil }