diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index c3240310a..e411adc2c 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -264,8 +264,8 @@ func (h *Handler) parseFetchRequest(apiVersion uint16, requestBody []byte) (*Fet request.Topics[i].Partitions[j].PartitionID = int32(binary.BigEndian.Uint32(requestBody[offset : offset+4])) offset += 4 - // Current leader epoch (4 bytes) - only in v9+ - if apiVersion >= 9 { + // Current leader epoch (4 bytes) - only in v5+ + if apiVersion >= 5 { if offset+4 > len(requestBody) { return nil, fmt.Errorf("insufficient data for current leader epoch") } diff --git a/weed/mq/kafka/protocol/fetch_test.go b/weed/mq/kafka/protocol/fetch_test.go index bbe11c53d..af90a83b1 100644 --- a/weed/mq/kafka/protocol/fetch_test.go +++ b/weed/mq/kafka/protocol/fetch_test.go @@ -24,13 +24,10 @@ func TestHandler_handleFetch(t *testing.T) { ledger.AppendRecord(baseOffset+2, currentTime+2000, 150) // Build a Fetch request - clientID := "test-consumer" - requestBody := make([]byte, 0, 256) - // Client ID - requestBody = append(requestBody, 0, byte(len(clientID))) - requestBody = append(requestBody, []byte(clientID)...) + // NOTE: client_id is handled by HandleConn and stripped before reaching handler + // Start directly with Fetch-specific fields // Replica ID (-1 for consumer) requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF) @@ -166,14 +163,12 @@ func TestHandler_handleFetch_UnknownTopic(t *testing.T) { correlationID := uint32(777) // Build Fetch request for non-existent topic (don't create it) - clientID := "test-consumer" topicName := "non-existent-topic" requestBody := make([]byte, 0, 128) - // Client ID - requestBody = append(requestBody, 0, byte(len(clientID))) - requestBody = append(requestBody, []byte(clientID)...) + // NOTE: client_id is handled by HandleConn and stripped before reaching handler + // Start directly with Fetch-specific fields // Standard Fetch parameters requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF) // replica ID @@ -228,13 +223,10 @@ func TestHandler_handleFetch_EmptyPartition(t *testing.T) { _ = ledger // ledger exists but is empty // Build Fetch request - clientID := "test-consumer" - requestBody := make([]byte, 0, 128) - // Client ID - requestBody = append(requestBody, 0, byte(len(clientID))) - requestBody = append(requestBody, []byte(clientID)...) + // NOTE: client_id is handled by HandleConn and stripped before reaching handler + // Start directly with Fetch-specific fields // Standard parameters requestBody = append(requestBody, 0xFF, 0xFF, 0xFF, 0xFF) // replica ID