diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 2f967c272..a9048b8e1 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -1965,7 +1965,19 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req // Parse minimal request to understand what's being asked (header already stripped) offset := 0 - glog.Warningf("🟡 ListOffsets CALLED: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody)) + glog.Warningf("🔥🔥🔥 ListOffsets HANDLER CALLED: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody)) + + maxBytes := len(requestBody) + if maxBytes > 64 { + maxBytes = 64 + } + glog.Warningf("🔥🔥🔥 ListOffsets first %d bytes (hex): %x", maxBytes, requestBody[:maxBytes]) + + // Log the specific replica ID and topics being requested + if len(requestBody) >= 4 { + replicaID := int32(binary.BigEndian.Uint32(requestBody[0:4])) + glog.Warningf("🔥🔥🔥 ListOffsets replica_id=%d", replicaID) + } // v1+ has replica_id(4) if apiVersion >= 1 { @@ -2120,7 +2132,16 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req glog.Infof("🟢 ListOffsets: Response OK - requested %d topics, actual %d, size=%d", topicsCount, actualTopicsCount, len(response)) } + glog.Warningf("🔥🔥🔥 ListOffsets HANDLER RETURNING: correlationID=%d, responseLen=%d", correlationID, len(response)) + if len(response) > 0 { + respPreview := len(response) + if respPreview > 32 { + respPreview = 32 + } + glog.Warningf("🔥🔥🔥 ListOffsets response first %d bytes (hex): %x", respPreview, response[:respPreview]) + } return response, nil + } func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {