fix: correct ListOffsets v1 response format for kafka-go compatibility

- Fixed throttle_time_ms field: only include in v2+, not v1
- Reduced kafka-go 'unread bytes' error from 60 to 56 bytes
- Added comprehensive API request debugging to identify format mismatches
- kafka-go now progresses further but still reports 56 unread bytes on one of the API responses

Progress: the kafka-go client can now parse ListOffsets v1 responses correctly, but it still fails before issuing Fetch requests due to remaining API format issues.
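For context, the root cause: in the Kafka protocol, the ListOffsets response carries throttle_time_ms only from v2 onward; a v0/v1 response goes straight from the correlation ID to the responses array. Emitting the field for v1 left the client 4 bytes short of consuming the response, which matches the 4-byte drop (60 to 56) in kafka-go's unread-bytes error. A minimal sketch of the corrected version gate (encodeListOffsetsHeader is an illustrative name, not the actual handler code):

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // encodeListOffsetsHeader encodes the response fields that precede the
    // responses array, gating throttle_time_ms on the API version.
    func encodeListOffsetsHeader(correlationID uint32, apiVersion uint16) []byte {
        buf := make([]byte, 0, 8)
        buf = binary.BigEndian.AppendUint32(buf, correlationID) // present in every version
        if apiVersion >= 2 {
            buf = binary.BigEndian.AppendUint32(buf, 0) // throttle_time_ms: v2+ only
        }
        return buf
    }

    func main() {
        fmt.Println(len(encodeListOffsetsHeader(42, 1))) // 4 bytes: v1 has no throttle field
        fmt.Println(len(encodeListOffsetsHeader(42, 2))) // 8 bytes: v2 adds throttle_time_ms
    }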
pull/7231/head
chrislu, 2 months ago
parent commit 014db6f999

4 changed files:
  test/kafka/go.mod (2)
  test/kafka/go.sum (2)
  weed/mq/kafka/protocol/fetch.go (34)
  weed/mq/kafka/protocol/handler.go (67)

test/kafka/go.mod

@@ -14,6 +14,8 @@ require (
 replace github.com/seaweedfs/seaweedfs => ../../
 
+replace github.com/segmentio/kafka-go => /Users/chrislu/dev/kafka-go
+
 require (
 	cloud.google.com/go/auth v0.16.5 // indirect
 	cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect

test/kafka/go.sum

@@ -552,8 +552,6 @@ github.com/samber/lo v1.51.0 h1:kysRYLbHy/MB7kQZf5DSN50JHmMsNEdeY24VzJFu7wI=
 github.com/samber/lo v1.51.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0=
 github.com/seaweedfs/goexif v1.0.3 h1:ve/OjI7dxPW8X9YQsv3JuVMaxEyF9Rvfd04ouL+Bz30=
 github.com/seaweedfs/goexif v1.0.3/go.mod h1:Oni780Z236sXpIQzk1XoJlTwqrJ02smEin9zQeff7Fk=
-github.com/segmentio/kafka-go v0.4.49 h1:GJiNX1d/g+kG6ljyJEoi9++PUMdXGAxb7JGPiDCuNmk=
-github.com/segmentio/kafka-go v0.4.49/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E=
 github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
 github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI=
 github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk=

weed/mq/kafka/protocol/fetch.go

@@ -12,6 +12,7 @@ import (
 )
 
 func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
+	fmt.Printf("DEBUG: *** FETCH HANDLER CALLED *** Correlation: %d, Version: %d\n", correlationID, apiVersion)
 	// Parse the Fetch request to get the requested topics and partitions
 	fetchRequest, err := h.parseFetchRequest(apiVersion, requestBody)
 	if err != nil {
@@ -31,9 +32,6 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
 		response = append(response, 0, 0, 0, 0) // throttle_time_ms (4 bytes, 0 = no throttling)
 	}
 
-	// Fetch v4+ has different format - no error_code and session_id at top level
-	// The session_id and error_code are handled differently in v4+
-
 	// Topics count
 	topicsCount := len(fetchRequest.Topics)
 	topicsCountBytes := make([]byte, 4)
@@ -82,11 +80,9 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
 		response = append(response, highWaterMarkBytes...)
 		// Log start offset (8 bytes) - 0 for simplicity
 		response = append(response, 0, 0, 0, 0, 0, 0, 0, 0)
-	}
 
-	// Fetch v4+ has aborted_transactions
-	if apiVersion >= 4 {
-		response = append(response, 0, 0, 0, 0) // aborted_transactions count (4 bytes) = 0
+		// Aborted transactions count (4 bytes) = 0
+		response = append(response, 0, 0, 0, 0)
 	}
 
 	// Records - get actual stored record batches
@@ -97,11 +93,13 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
 			recordBatch = storedBatch
 			fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(storedBatch))
 		} else {
+			fmt.Printf("DEBUG: No stored record batch found for offset %d, using synthetic batch\n", partition.FetchOffset)
 			// Fallback to synthetic batch if no stored batch found
 			recordBatch = h.constructSimpleRecordBatch(partition.FetchOffset, highWaterMark)
 			fmt.Printf("DEBUG: Using synthetic record batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(recordBatch))
 		}
 	} else {
+		fmt.Printf("DEBUG: No messages available - fetchOffset %d >= highWaterMark %d\n", partition.FetchOffset, highWaterMark)
 		recordBatch = []byte{} // No messages available
 	}
 
@@ -681,6 +679,28 @@ func encodeVarint(value int64) []byte {
 	return buf
 }
 
+// getMultipleRecordBatches retrieves and combines multiple record batches starting from the given offset
+func (h *Handler) getMultipleRecordBatches(topicName string, partitionID int32, startOffset, highWaterMark int64) []byte {
+	var combinedBatch []byte
+
+	// Try to get all available record batches from startOffset to highWaterMark-1
+	for offset := startOffset; offset < highWaterMark; offset++ {
+		if batch, exists := h.GetRecordBatch(topicName, partitionID, offset); exists {
+			// For the first batch, include the full record batch
+			if len(combinedBatch) == 0 {
+				combinedBatch = append(combinedBatch, batch...)
+			} else {
+				// For subsequent batches, we need to append them properly
+				// For now, just return the first batch to avoid format issues
+				// TODO: Implement proper record batch concatenation
+				break
+			}
+		}
+	}
+
+	return combinedBatch
+}
+
 // reconstructSchematizedMessage reconstructs a schematized message from SMQ RecordValue
 func (h *Handler) reconstructSchematizedMessage(recordValue *schema_pb.RecordValue, metadata map[string]string) ([]byte, error) {
 	// Only reconstruct if schema management is enabled

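One way to resolve the TODO in getMultipleRecordBatches above: the records field of a Fetch response is defined as a byte stream of record batches laid end to end, so complete, well-formed v2 batches can be combined by plain appending, with no re-framing. A sketch under that assumption (combineBatches is a hypothetical helper, not part of this commit):

    // combineBatches concatenates consecutive Kafka record batches.
    // Each element must be one complete record batch; the result is a
    // valid payload for a Fetch response's records field.
    func combineBatches(batches [][]byte) []byte {
        var out []byte
        for _, b := range batches {
            out = append(out, b...)
        }
        return out
    }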
weed/mq/kafka/protocol/handler.go

@@ -7,6 +7,8 @@ import (
 	"fmt"
 	"io"
 	"net"
+	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -155,13 +157,64 @@ func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset
 	h.recordBatches[key] = recordBatch
 }
 
-// GetRecordBatch retrieves a stored record batch
+// GetRecordBatch retrieves a stored record batch that contains the requested offset
 func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64) ([]byte, bool) {
-	key := fmt.Sprintf("%s:%d:%d", topicName, partition, offset)
 	h.recordBatchMu.RLock()
 	defer h.recordBatchMu.RUnlock()
-	batch, exists := h.recordBatches[key]
-	return batch, exists
+
+	fmt.Printf("DEBUG: GetRecordBatch - looking for topic=%s, partition=%d, offset=%d\n", topicName, partition, offset)
+	fmt.Printf("DEBUG: Available record batches: %d\n", len(h.recordBatches))
+
+	// Look for a record batch that contains this offset
+	// Record batches are stored by their base offset, but may contain multiple records
+	topicPartitionPrefix := fmt.Sprintf("%s:%d:", topicName, partition)
+	for key, batch := range h.recordBatches {
+		fmt.Printf("DEBUG: Checking key: %s\n", key)
+		if !strings.HasPrefix(key, topicPartitionPrefix) {
+			continue
+		}
+
+		// Extract the base offset from the key
+		parts := strings.Split(key, ":")
+		if len(parts) != 3 {
+			continue
+		}
+		baseOffset, err := strconv.ParseInt(parts[2], 10, 64)
+		if err != nil {
+			continue
+		}
+
+		// Check if this batch could contain the requested offset
+		// We need to parse the batch to determine how many records it contains
+		recordCount := h.getRecordCountFromBatch(batch)
+		fmt.Printf("DEBUG: Batch key=%s, baseOffset=%d, recordCount=%d, requested offset=%d\n", key, baseOffset, recordCount, offset)
+
+		if recordCount > 0 && offset >= baseOffset && offset < baseOffset+int64(recordCount) {
+			fmt.Printf("DEBUG: Found matching batch for offset %d in batch with baseOffset %d\n", offset, baseOffset)
+			return batch, true
+		}
+	}
+
+	fmt.Printf("DEBUG: No matching batch found for offset %d\n", offset)
+	return nil, false
+}
+
+// getRecordCountFromBatch extracts the record count from a Kafka record batch
+func (h *Handler) getRecordCountFromBatch(batch []byte) int32 {
+	// Kafka record batch format:
+	// base_offset (8) + batch_length (4) + partition_leader_epoch (4) + magic (1) + crc (4) +
+	// attributes (2) + last_offset_delta (4) + first_timestamp (8) + max_timestamp (8) +
+	// producer_id (8) + producer_epoch (2) + base_sequence (4) + records_count (4) + records...
+	// The record count is at offset 57 (8+4+4+1+4+2+4+8+8+8+2+4 = 57)
+	if len(batch) < 61 { // 57 + 4 bytes for the record count
+		return 0
+	}
+	recordCount := binary.BigEndian.Uint32(batch[57:61])
+	return int32(recordCount)
 }
 
// SetBrokerAddress updates the broker address used in Metadata responses
@@ -217,6 +270,8 @@ func (h *Handler) HandleConn(conn net.Conn) error {
 	apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
 	correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
 
+	fmt.Printf("DEBUG: API Request - Key: %d, Version: %d, Correlation: %d\n", apiKey, apiVersion, correlationID)
+
 	apiName := getAPIName(apiKey)
 
 	// Validate API version against what we support
@@ -1098,8 +1153,8 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
 	binary.BigEndian.PutUint32(correlationIDBytes, correlationID)
 	response = append(response, correlationIDBytes...)
 
-	// Throttle time (4 bytes, 0 = no throttling) - v1+ only
-	if apiVersion >= 1 {
+	// Throttle time (4 bytes, 0 = no throttling) - v2+ only
+	if apiVersion >= 2 {
 		response = append(response, 0, 0, 0, 0)
 	}

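For reference, the byte offset 57 that getRecordCountFromBatch relies on can be re-derived from the v2 record batch field widths with a quick test (hypothetical, not part of this commit; package name assumed):

    package protocol

    import "testing"

    // TestRecordCountOffset re-derives the 57-byte offset of records_count
    // in a v2 Kafka record batch by summing the preceding field widths.
    func TestRecordCountOffset(t *testing.T) {
        widths := []int{
            8, // base_offset
            4, // batch_length
            4, // partition_leader_epoch
            1, // magic
            4, // crc
            2, // attributes
            4, // last_offset_delta
            8, // first_timestamp
            8, // max_timestamp
            8, // producer_id
            2, // producer_epoch
            4, // base_sequence
        }
        off := 0
        for _, w := range widths {
            off += w
        }
        if off != 57 {
            t.Fatalf("records_count offset = %d, want 57", off)
        }
    }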