From 014db6f999b34d2b176dc4232fae7ac8e5229f9f Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Sep 2025 13:40:58 -0700 Subject: [PATCH] fix: correct ListOffsets v1 response format for kafka-go compatibility - Fixed throttle_time_ms field: only include in v2+, not v1 - Reduced kafka-go 'unread bytes' error from 60 to 56 bytes - Added comprehensive API request debugging to identify format mismatches - kafka-go now progresses further but still has a 56-byte format issue in some API response Progress: kafka-go client can now parse ListOffsets v1 responses correctly but still fails before making Fetch requests due to remaining API format issues. --- test/kafka/go.mod | 2 + test/kafka/go.sum | 2 - weed/mq/kafka/protocol/fetch.go | 36 +++++++++++++---- weed/mq/kafka/protocol/handler.go | 67 ++++++++++++++++++++++++++++--- 4 files changed, 91 insertions(+), 16 deletions(-) diff --git a/test/kafka/go.mod b/test/kafka/go.mod index c1c86d973..ca7658273 100644 --- a/test/kafka/go.mod +++ b/test/kafka/go.mod @@ -14,6 +14,8 @@ require ( replace github.com/seaweedfs/seaweedfs => ../../ +replace github.com/segmentio/kafka-go => /Users/chrislu/dev/kafka-go + require ( cloud.google.com/go/auth v0.16.5 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect diff --git a/test/kafka/go.sum b/test/kafka/go.sum index c2b28305c..44ec62173 100644 --- a/test/kafka/go.sum +++ b/test/kafka/go.sum @@ -552,8 +552,6 @@ github.com/samber/lo v1.51.0 h1:kysRYLbHy/MB7kQZf5DSN50JHmMsNEdeY24VzJFu7wI= github.com/samber/lo v1.51.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0= github.com/seaweedfs/goexif v1.0.3 h1:ve/OjI7dxPW8X9YQsv3JuVMaxEyF9Rvfd04ouL+Bz30= github.com/seaweedfs/goexif v1.0.3/go.mod h1:Oni780Z236sXpIQzk1XoJlTwqrJ02smEin9zQeff7Fk= -github.com/segmentio/kafka-go v0.4.49 h1:GJiNX1d/g+kG6ljyJEoi9++PUMdXGAxb7JGPiDCuNmk= -github.com/segmentio/kafka-go v0.4.49/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E= github.com/sergi/go-diff v1.0.0/go.mod 
h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI= github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk= diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 4441ab3a8..de81f3568 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -12,6 +12,7 @@ import ( ) func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + fmt.Printf("DEBUG: *** FETCH HANDLER CALLED *** Correlation: %d, Version: %d\n", correlationID, apiVersion) // Parse the Fetch request to get the requested topics and partitions fetchRequest, err := h.parseFetchRequest(apiVersion, requestBody) if err != nil { @@ -31,9 +32,6 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo response = append(response, 0, 0, 0, 0) // throttle_time_ms (4 bytes, 0 = no throttling) } - // Fetch v4+ has different format - no error_code and session_id at top level - // The session_id and error_code are handled differently in v4+ - // Topics count topicsCount := len(fetchRequest.Topics) topicsCountBytes := make([]byte, 4) @@ -82,11 +80,9 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo response = append(response, highWaterMarkBytes...) 
// Log start offset (8 bytes) - 0 for simplicity response = append(response, 0, 0, 0, 0, 0, 0, 0, 0) - } - - // Fetch v4+ has aborted_transactions - if apiVersion >= 4 { - response = append(response, 0, 0, 0, 0) // aborted_transactions count (4 bytes) = 0 + + // Aborted transactions count (4 bytes) = 0 + response = append(response, 0, 0, 0, 0) } // Records - get actual stored record batches @@ -97,11 +93,13 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo recordBatch = storedBatch fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(storedBatch)) } else { + fmt.Printf("DEBUG: No stored record batch found for offset %d, using synthetic batch\n", partition.FetchOffset) // Fallback to synthetic batch if no stored batch found recordBatch = h.constructSimpleRecordBatch(partition.FetchOffset, highWaterMark) fmt.Printf("DEBUG: Using synthetic record batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(recordBatch)) } } else { + fmt.Printf("DEBUG: No messages available - fetchOffset %d >= highWaterMark %d\n", partition.FetchOffset, highWaterMark) recordBatch = []byte{} // No messages available } @@ -681,6 +679,28 @@ func encodeVarint(value int64) []byte { return buf } +// getMultipleRecordBatches retrieves and combines multiple record batches starting from the given offset +func (h *Handler) getMultipleRecordBatches(topicName string, partitionID int32, startOffset, highWaterMark int64) []byte { + var combinedBatch []byte + + // Try to get all available record batches from startOffset to highWaterMark-1 + for offset := startOffset; offset < highWaterMark; offset++ { + if batch, exists := h.GetRecordBatch(topicName, partitionID, offset); exists { + // For the first batch, include the full record batch + if len(combinedBatch) == 0 { + combinedBatch = append(combinedBatch, batch...) 
+ } else { + // For subsequent batches, we need to append them properly + // For now, just return the first batch to avoid format issues + // TODO: Implement proper record batch concatenation + break + } + } + } + + return combinedBatch +} + // reconstructSchematizedMessage reconstructs a schematized message from SMQ RecordValue func (h *Handler) reconstructSchematizedMessage(recordValue *schema_pb.RecordValue, metadata map[string]string) ([]byte, error) { // Only reconstruct if schema management is enabled diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 1cce73e95..662490c7d 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -7,6 +7,8 @@ import ( "fmt" "io" "net" + "strconv" + "strings" "sync" "time" @@ -155,13 +157,64 @@ func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset h.recordBatches[key] = recordBatch } -// GetRecordBatch retrieves a stored record batch +// GetRecordBatch retrieves a stored record batch that contains the requested offset func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64) ([]byte, bool) { - key := fmt.Sprintf("%s:%d:%d", topicName, partition, offset) h.recordBatchMu.RLock() defer h.recordBatchMu.RUnlock() - batch, exists := h.recordBatches[key] - return batch, exists + + fmt.Printf("DEBUG: GetRecordBatch - looking for topic=%s, partition=%d, offset=%d\n", topicName, partition, offset) + fmt.Printf("DEBUG: Available record batches: %d\n", len(h.recordBatches)) + + // Look for a record batch that contains this offset + // Record batches are stored by their base offset, but may contain multiple records + topicPartitionPrefix := fmt.Sprintf("%s:%d:", topicName, partition) + + for key, batch := range h.recordBatches { + fmt.Printf("DEBUG: Checking key: %s\n", key) + if !strings.HasPrefix(key, topicPartitionPrefix) { + continue + } + + // Extract the base offset from the key + parts := strings.Split(key, ":") + 
if len(parts) != 3 { + continue + } + + baseOffset, err := strconv.ParseInt(parts[2], 10, 64) + if err != nil { + continue + } + + // Check if this batch could contain the requested offset + // We need to parse the batch to determine how many records it contains + recordCount := h.getRecordCountFromBatch(batch) + fmt.Printf("DEBUG: Batch key=%s, baseOffset=%d, recordCount=%d, requested offset=%d\n", key, baseOffset, recordCount, offset) + + if recordCount > 0 && offset >= baseOffset && offset < baseOffset+int64(recordCount) { + fmt.Printf("DEBUG: Found matching batch for offset %d in batch with baseOffset %d\n", offset, baseOffset) + return batch, true + } + } + + fmt.Printf("DEBUG: No matching batch found for offset %d\n", offset) + return nil, false +} + +// getRecordCountFromBatch extracts the record count from a Kafka record batch +func (h *Handler) getRecordCountFromBatch(batch []byte) int32 { + // Kafka record batch format: + // base_offset (8) + batch_length (4) + partition_leader_epoch (4) + magic (1) + crc (4) + + // attributes (2) + last_offset_delta (4) + first_timestamp (8) + max_timestamp (8) + + // producer_id (8) + producer_epoch (2) + base_sequence (4) + records_count (4) + records... 
+ + // The record count is at offset 57 (8+4+4+1+4+2+4+8+8+8+2+4 = 57) + if len(batch) < 61 { // 57 + 4 bytes for record count + return 0 + } + + recordCount := binary.BigEndian.Uint32(batch[57:61]) + return int32(recordCount) } // SetBrokerAddress updates the broker address used in Metadata responses @@ -216,6 +269,8 @@ func (h *Handler) HandleConn(conn net.Conn) error { apiKey := binary.BigEndian.Uint16(messageBuf[0:2]) apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) + + fmt.Printf("DEBUG: API Request - Key: %d, Version: %d, Correlation: %d\n", apiKey, apiVersion, correlationID) apiName := getAPIName(apiKey) @@ -1098,8 +1153,8 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req binary.BigEndian.PutUint32(correlationIDBytes, correlationID) response = append(response, correlationIDBytes...) - // Throttle time (4 bytes, 0 = no throttling) - v1+ only - if apiVersion >= 1 { + // Throttle time (4 bytes, 0 = no throttling) - v2+ only + if apiVersion >= 2 { response = append(response, 0, 0, 0, 0) }