From 445d7343d71de368e286367b7cc8d5a5ad3fbd01 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 13 Sep 2025 09:06:02 -0700 Subject: [PATCH] fix v7 samara --- weed/mq/kafka/protocol/fetch.go | 4 ++-- weed/mq/kafka/protocol/fetch_test.go | 8 ++++---- weed/mq/kafka/protocol/handler.go | 10 +++++----- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index e411adc2c..c3240310a 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -264,8 +264,8 @@ func (h *Handler) parseFetchRequest(apiVersion uint16, requestBody []byte) (*Fet request.Topics[i].Partitions[j].PartitionID = int32(binary.BigEndian.Uint32(requestBody[offset : offset+4])) offset += 4 - // Current leader epoch (4 bytes) - only in v5+ - if apiVersion >= 5 { + // Current leader epoch (4 bytes) - only in v9+ + if apiVersion >= 9 { if offset+4 > len(requestBody) { return nil, fmt.Errorf("insufficient data for current leader epoch") } diff --git a/weed/mq/kafka/protocol/fetch_test.go b/weed/mq/kafka/protocol/fetch_test.go index af90a83b1..50ba259f0 100644 --- a/weed/mq/kafka/protocol/fetch_test.go +++ b/weed/mq/kafka/protocol/fetch_test.go @@ -61,8 +61,8 @@ func TestHandler_handleFetch(t *testing.T) { requestBody = append(requestBody, 0, 0, 0, 1) // Partition 0 - requestBody = append(requestBody, 0, 0, 0, 0) // partition ID - requestBody = append(requestBody, 0, 0, 0, 0) // current leader epoch + requestBody = append(requestBody, 0, 0, 0, 0) // partition ID + // NOTE: current leader epoch only in v9+, not v7 requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, byte(baseOffset)) // fetch offset requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, 0) // log start offset requestBody = append(requestBody, 0, 0, 0x10, 0) // partition max bytes (1MB) @@ -248,8 +248,8 @@ func TestHandler_handleFetch_EmptyPartition(t *testing.T) { requestBody = append(requestBody, 0, 0, 0, 1) // Partition 0 - fetch from offset 0 - requestBody = append(requestBody, 0, 0, 0, 0) // partition ID - requestBody = append(requestBody, 0, 0, 0, 0) // current leader epoch + requestBody = append(requestBody, 0, 0, 0, 0) // partition ID + // NOTE: current leader epoch only in v9+, not v7 requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, 0) // fetch offset requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, 0) // log start offset requestBody = append(requestBody, 0, 0, 0x10, 0) // partition max bytes diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index e29ec3770..d35c5bc2d 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -734,15 +734,15 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([] // partition: error_code(2) + partition_id(4) + leader(4) response = append(response, 0, 0) // error_code response = append(response, 0, 0, 0, 0) // partition_id = 0 - response = append(response, 0, 0, 0, 0) // leader = 0 (this broker) + response = append(response, 0, 0, 0, 1) // leader = 1 (this broker) - // replicas: array length(4) + one broker id (0) + // replicas: array length(4) + one broker id (1) + response = append(response, 0, 0, 0, 1) response = append(response, 0, 0, 0, 1) - response = append(response, 0, 0, 0, 0) - // isr: array length(4) + one broker id (0) + // isr: array length(4) + one broker id (1) + response = append(response, 0, 0, 0, 1) response = append(response, 0, 0, 0, 1) - response = append(response, 0, 0, 0, 0) } fmt.Printf("DEBUG: Metadata v0 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)