Browse Source

fix v7 samara

pull/7231/head
chrislu 2 months ago
parent
commit
445d7343d7
  1. 4
      weed/mq/kafka/protocol/fetch.go
  2. 8
      weed/mq/kafka/protocol/fetch_test.go
  3. 10
      weed/mq/kafka/protocol/handler.go

4
weed/mq/kafka/protocol/fetch.go

@ -264,8 +264,8 @@ func (h *Handler) parseFetchRequest(apiVersion uint16, requestBody []byte) (*Fet
request.Topics[i].Partitions[j].PartitionID = int32(binary.BigEndian.Uint32(requestBody[offset : offset+4]))
offset += 4
// Current leader epoch (4 bytes) - only in v5+
if apiVersion >= 5 {
// Current leader epoch (4 bytes) - only in v9+
if apiVersion >= 9 {
if offset+4 > len(requestBody) {
return nil, fmt.Errorf("insufficient data for current leader epoch")
}

8
weed/mq/kafka/protocol/fetch_test.go

@ -61,8 +61,8 @@ func TestHandler_handleFetch(t *testing.T) {
requestBody = append(requestBody, 0, 0, 0, 1)
// Partition 0
requestBody = append(requestBody, 0, 0, 0, 0) // partition ID
requestBody = append(requestBody, 0, 0, 0, 0) // current leader epoch
requestBody = append(requestBody, 0, 0, 0, 0) // partition ID
// NOTE: current leader epoch only in v9+, not v7
requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, byte(baseOffset)) // fetch offset
requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, 0) // log start offset
requestBody = append(requestBody, 0, 0, 0x10, 0) // partition max bytes (1MB)
@ -248,8 +248,8 @@ func TestHandler_handleFetch_EmptyPartition(t *testing.T) {
requestBody = append(requestBody, 0, 0, 0, 1)
// Partition 0 - fetch from offset 0
requestBody = append(requestBody, 0, 0, 0, 0) // partition ID
requestBody = append(requestBody, 0, 0, 0, 0) // current leader epoch
requestBody = append(requestBody, 0, 0, 0, 0) // partition ID
// NOTE: current leader epoch only in v9+, not v7
requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, 0) // fetch offset
requestBody = append(requestBody, 0, 0, 0, 0, 0, 0, 0, 0) // log start offset
requestBody = append(requestBody, 0, 0, 0x10, 0) // partition max bytes

10
weed/mq/kafka/protocol/handler.go

@ -734,15 +734,15 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]
// partition: error_code(2) + partition_id(4) + leader(4)
response = append(response, 0, 0) // error_code
response = append(response, 0, 0, 0, 0) // partition_id = 0
response = append(response, 0, 0, 0, 0) // leader = 0 (this broker)
response = append(response, 0, 0, 0, 1) // leader = 1 (this broker)
// replicas: array length(4) + one broker id (0)
// replicas: array length(4) + one broker id (1)
response = append(response, 0, 0, 0, 1)
response = append(response, 0, 0, 0, 1)
response = append(response, 0, 0, 0, 0)
// isr: array length(4) + one broker id (0)
// isr: array length(4) + one broker id (1)
response = append(response, 0, 0, 0, 1)
response = append(response, 0, 0, 0, 1)
response = append(response, 0, 0, 0, 0)
}
fmt.Printf("DEBUG: Metadata v0 response for %d topics: %v\n", len(topicsToReturn), topicsToReturn)

Loading…
Cancel
Save