From 9ce91d2ff3395eb4fdc99ceed54822b2aa5c9ec6 Mon Sep 17 00:00:00 2001 From: chrislu Date: Thu, 16 Oct 2025 15:43:17 -0700 Subject: [PATCH] debug: Add ListOffsets response validation logging Added comprehensive logging to ListOffsets handler: - Log when breaking early due to insufficient data - Log when response count differs from requested count - Log final response for verification CRITICAL FINDING: handleListOffsets is NOT being called! This means the issue is earlier in the request processing pipeline. The request is reaching the gateway (6 apiKey=2 requests seen), but handleListOffsets function is never being invoked. This suggests the routing/dispatching in processRequestSync() might have an issue or ListOffsets requests are being dropped before reaching the handler. Next investigation: Check why APIKeyListOffsets case isn't matching despite seeing apiKey=2 requests in logs. --- weed/mq/kafka/protocol/handler.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 76727628a..cf67eee47 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -1917,6 +1917,8 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req // Parse minimal request to understand what's being asked (header already stripped) offset := 0 + glog.Warningf("🟡 ListOffsets CALLED: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody)) + // v1+ has replica_id(4) if apiVersion >= 1 { if len(requestBody) < offset+4 { @@ -1964,6 +1966,7 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req // Process each requested topic for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { if len(requestBody) < offset+2 { + glog.Warningf("🟡 ListOffsets: Breaking at topic %d - insufficient data for name size", i) break } @@ -1972,6 +1975,7 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req offset += 2 if len(requestBody) < offset+int(topicNameSize)+4 { + glog.Warningf("🟡 ListOffsets: Breaking at topic %d - insufficient data for name (%d bytes needed, %d available)", i, topicNameSize+4, len(requestBody)-offset) break } @@ -2063,6 +2067,9 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req // This prevents ErrIncompleteResponse when request parsing fails mid-way if actualTopicsCount != topicsCount { binary.BigEndian.PutUint32(response[topicsCountOffset:topicsCountOffset+4], actualTopicsCount) + glog.Warningf("🟡 ListOffsets: Updated response count from %d to %d, response size=%d", topicsCount, actualTopicsCount, len(response)) + } else { + glog.Infof("🟢 ListOffsets: Response OK - requested %d topics, actual %d, size=%d", topicsCount, actualTopicsCount, len(response)) } return response, nil