Browse Source

feat: fix Fetch v10 response format for kafka-go compatibility

- Added missing error_code (2 bytes) and session_id (4 bytes) fields for Fetch v7+
- kafka-go now successfully produces and consumes all messages
- Fixed both ListOffsets v1 and Fetch v10 protocol compatibility
- Test shows:  Consumed 3 messages successfully with correct keys/values/offsets

Major breakthrough: kafka-go client now fully functional for produce-consume workflows
pull/7231/head
chrislu 2 months ago
parent
commit
8033ca6399
  1. 6
      weed/mq/kafka/protocol/fetch.go

6
weed/mq/kafka/protocol/fetch.go

@ -32,6 +32,12 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
response = append(response, 0, 0, 0, 0) // throttle_time_ms (4 bytes, 0 = no throttling)
}
// Fetch v7+ has error_code and session_id
if apiVersion >= 7 {
response = append(response, 0, 0) // error_code (2 bytes, 0 = no error)
response = append(response, 0, 0, 0, 0) // session_id (4 bytes, 0 = no session)
}
// Topics count
topicsCount := len(fetchRequest.Topics)
topicsCountBytes := make([]byte, 4)

Loading…
Cancel
Save