diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 76727628a..cf67eee47 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -1917,6 +1917,8 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req // Parse minimal request to understand what's being asked (header already stripped) offset := 0 + glog.Warningf("🟡 ListOffsets CALLED: correlationID=%d, apiVersion=%d, requestBodyLen=%d", correlationID, apiVersion, len(requestBody)) + // v1+ has replica_id(4) if apiVersion >= 1 { if len(requestBody) < offset+4 { @@ -1964,6 +1966,7 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req // Process each requested topic for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { if len(requestBody) < offset+2 { + glog.Warningf("🟡 ListOffsets: Breaking at topic %d - insufficient data for name size", i) break } @@ -1972,6 +1975,7 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req offset += 2 if len(requestBody) < offset+int(topicNameSize)+4 { + glog.Warningf("🟡 ListOffsets: Breaking at topic %d - insufficient data for name (%d bytes needed, %d available)", i, topicNameSize+4, len(requestBody)-offset) break } @@ -2063,6 +2067,9 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req // This prevents ErrIncompleteResponse when request parsing fails mid-way if actualTopicsCount != topicsCount { binary.BigEndian.PutUint32(response[topicsCountOffset:topicsCountOffset+4], actualTopicsCount) + glog.Warningf("🟡 ListOffsets: Updated response count from %d to %d, response size=%d", topicsCount, actualTopicsCount, len(response)) + } else { + glog.Infof("🟢 ListOffsets: Response OK - requested %d topics, actual %d, size=%d", topicsCount, actualTopicsCount, len(response)) } return response, nil