From 8033ca6399a00c248b67d41c4086f3840bbed7d0 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Sep 2025 14:44:54 -0700 Subject: [PATCH] feat: fix Fetch v10 response format for kafka-go compatibility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Added missing error_code (2 bytes) and session_id (4 bytes) fields for Fetch v7+ - kafka-go now successfully produces and consumes all messages - Fixed both ListOffsets v1 and Fetch v10 protocol compatibility - Test shows: ✅ Consumed 3 messages successfully with correct keys/values/offsets Major breakthrough: kafka-go client now fully functional for produce-consume workflows --- weed/mq/kafka/protocol/fetch.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index b8283c739..40dfd0f1c 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -32,6 +32,12 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo response = append(response, 0, 0, 0, 0) // throttle_time_ms (4 bytes, 0 = no throttling) } + // Fetch v7+ has error_code and session_id + if apiVersion >= 7 { + response = append(response, 0, 0) // error_code (2 bytes, 0 = no error) + response = append(response, 0, 0, 0, 0) // session_id (4 bytes, 0 = no session) + } + // Topics count topicsCount := len(fetchRequest.Topics) topicsCountBytes := make([]byte, 4)