Browse Source

fix: correct ListOffsets v1 request parsing for kafka-go compatibility

- Fixed ListOffsets v1 to parse replica_id field (present in v1+, not v2+)
- Fixed ListOffsets v1 response format - now 55 bytes instead of 64
- kafka-go now successfully passes ListOffsets and makes Fetch requests
- Identified next issue: Fetch response format has incorrect topic count

Progress: kafka-go client now progresses to Fetch API but fails due to Fetch response format mismatch.
pull/7231/head
chrislu 2 months ago
parent
commit
0670ea4690
  1. 6
      weed/mq/kafka/protocol/fetch.go
  2. 50
      weed/mq/kafka/protocol/handler.go

6
weed/mq/kafka/protocol/fetch.go

@ -80,7 +80,7 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
response = append(response, highWaterMarkBytes...)
// Log start offset (8 bytes) - 0 for simplicity
response = append(response, 0, 0, 0, 0, 0, 0, 0, 0)
// Aborted transactions count (4 bytes) = 0
response = append(response, 0, 0, 0, 0)
}
@ -682,7 +682,7 @@ func encodeVarint(value int64) []byte {
// getMultipleRecordBatches retrieves and combines multiple record batches starting from the given offset
func (h *Handler) getMultipleRecordBatches(topicName string, partitionID int32, startOffset, highWaterMark int64) []byte {
var combinedBatch []byte
// Try to get all available record batches from startOffset to highWaterMark-1
for offset := startOffset; offset < highWaterMark; offset++ {
if batch, exists := h.GetRecordBatch(topicName, partitionID, offset); exists {
@ -697,7 +697,7 @@ func (h *Handler) getMultipleRecordBatches(topicName string, partitionID int32,
}
}
}
return combinedBatch
}

50
weed/mq/kafka/protocol/handler.go

@ -161,42 +161,42 @@ func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset
func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64) ([]byte, bool) {
h.recordBatchMu.RLock()
defer h.recordBatchMu.RUnlock()
fmt.Printf("DEBUG: GetRecordBatch - looking for topic=%s, partition=%d, offset=%d\n", topicName, partition, offset)
fmt.Printf("DEBUG: Available record batches: %d\n", len(h.recordBatches))
// Look for a record batch that contains this offset
// Record batches are stored by their base offset, but may contain multiple records
topicPartitionPrefix := fmt.Sprintf("%s:%d:", topicName, partition)
for key, batch := range h.recordBatches {
fmt.Printf("DEBUG: Checking key: %s\n", key)
if !strings.HasPrefix(key, topicPartitionPrefix) {
continue
}
// Extract the base offset from the key
parts := strings.Split(key, ":")
if len(parts) != 3 {
continue
}
baseOffset, err := strconv.ParseInt(parts[2], 10, 64)
if err != nil {
continue
}
// Check if this batch could contain the requested offset
// We need to parse the batch to determine how many records it contains
recordCount := h.getRecordCountFromBatch(batch)
fmt.Printf("DEBUG: Batch key=%s, baseOffset=%d, recordCount=%d, requested offset=%d\n", key, baseOffset, recordCount, offset)
if recordCount > 0 && offset >= baseOffset && offset < baseOffset+int64(recordCount) {
fmt.Printf("DEBUG: Found matching batch for offset %d in batch with baseOffset %d\n", offset, baseOffset)
return batch, true
}
}
fmt.Printf("DEBUG: No matching batch found for offset %d\n", offset)
return nil, false
}
@ -204,15 +204,15 @@ func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64
// getRecordCountFromBatch extracts the record count from a Kafka record batch
func (h *Handler) getRecordCountFromBatch(batch []byte) int32 {
// Kafka record batch format:
// base_offset (8) + batch_length (4) + partition_leader_epoch (4) + magic (1) + crc (4) +
// base_offset (8) + batch_length (4) + partition_leader_epoch (4) + magic (1) + crc (4) +
// attributes (2) + last_offset_delta (4) + first_timestamp (8) + max_timestamp (8) +
// producer_id (8) + producer_epoch (2) + base_sequence (4) + records_count (4) + records...
// The record count is at offset 57 (8+4+4+1+4+2+4+8+8+8+2+4 = 57)
if len(batch) < 61 { // 57 + 4 bytes for record count
return 0
}
recordCount := binary.BigEndian.Uint32(batch[57:61])
return int32(recordCount)
}
@ -269,7 +269,7 @@ func (h *Handler) HandleConn(conn net.Conn) error {
apiKey := binary.BigEndian.Uint16(messageBuf[0:2])
apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
fmt.Printf("DEBUG: API Request - Key: %d, Version: %d, Correlation: %d\n", apiKey, apiVersion, correlationID)
apiName := getAPIName(apiKey)
@ -465,6 +465,7 @@ func (h *Handler) handleApiVersions(correlationID uint32) ([]byte, error) {
response = append(response, 0, 0) // min version 0
response = append(response, 0, 4) // max version 4
fmt.Printf("DEBUG: ApiVersions v0 response: %d bytes\n", len(response))
return response, nil
}
@ -851,6 +852,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
}
response := buf.Bytes()
fmt.Printf("DEBUG: Metadata v4 response size: %d bytes, hex: %x\n", len(response), response)
return response, nil
}
@ -1128,15 +1130,23 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])
offset := 2 + int(clientIDSize)
// ListOffsets v2+ has additional fields: replica_id(4) + isolation_level(1)
if apiVersion >= 2 {
if len(requestBody) < offset+5 {
return nil, fmt.Errorf("ListOffsets v%d request missing replica_id/isolation_level", apiVersion)
// ListOffsets v1+ has replica_id(4), v2+ adds isolation_level(1)
if apiVersion >= 1 {
if len(requestBody) < offset+4 {
return nil, fmt.Errorf("ListOffsets v%d request missing replica_id", apiVersion)
}
replicaID := int32(binary.BigEndian.Uint32(requestBody[offset : offset+4]))
isolationLevel := requestBody[offset+4]
offset += 5
fmt.Printf("DEBUG: ListOffsets v%d - replica_id: %d, isolation_level: %d\n", apiVersion, replicaID, isolationLevel)
offset += 4
fmt.Printf("DEBUG: ListOffsets v%d - replica_id: %d\n", apiVersion, replicaID)
if apiVersion >= 2 {
if len(requestBody) < offset+1 {
return nil, fmt.Errorf("ListOffsets v%d request missing isolation_level", apiVersion)
}
isolationLevel := requestBody[offset]
offset += 1
fmt.Printf("DEBUG: ListOffsets v%d - isolation_level: %d\n", apiVersion, isolationLevel)
}
}
if len(requestBody) < offset+4 {
@ -1256,7 +1266,7 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
}
}
fmt.Printf("DEBUG: ListOffsets v%d response: %d bytes\n", apiVersion, len(response))
fmt.Printf("DEBUG: ListOffsets v%d response: %d bytes, hex: %x\n", apiVersion, len(response), response)
return response, nil
}

Loading…
Cancel
Save