From 5ec751e2e31937dcfaaad7b770cb8aa09c7e37a9 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Sep 2025 14:58:58 -0700 Subject: [PATCH] feat: fix Sarama consumer compatibility by correcting record batch base offsets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🎉 MAJOR SUCCESS: Both kafka-go and Sarama now fully working! Root Cause: - Individual message batches (from Sarama) had base offset 0 in binary data - When Sarama requested offset 1, it received batch claiming offset 0 - Sarama ignored it as duplicate, never got actual message 1,2 Solution: - Correct base offset in record batch header during StoreRecordBatch - Update first 8 bytes (base_offset field) to match assigned offset - Each batch now has correct internal offset matching storage key Results: ✅ kafka-go: 3/3 produced, 3/3 consumed ✅ Sarama: 3/3 produced, 3/3 consumed Both clients now have full produce-consume compatibility --- weed/mq/kafka/protocol/fetch.go | 2 +- weed/mq/kafka/protocol/handler.go | 29 ++++++++++++++++++++++++----- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 0bbc38764..096cf7882 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -101,7 +101,7 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo if len(storedBatch) < hexLen { hexLen = len(storedBatch) } - fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes, first %d bytes: %x\n", + fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes, first %d bytes: %x\n", partition.FetchOffset, len(storedBatch), hexLen, storedBatch[:hexLen]) } else { fmt.Printf("DEBUG: No stored record batch found for offset %d, using synthetic batch\n", partition.FetchOffset) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index ba5c758c4..35e9a1ab6 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -152,9 +152,28 @@ func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger { // StoreRecordBatch stores a record batch for later retrieval during Fetch operations func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) { key := fmt.Sprintf("%s:%d:%d", topicName, partition, baseOffset) - h.recordBatchMu.Lock() - defer h.recordBatchMu.Unlock() - h.recordBatches[key] = recordBatch + + // Fix the base offset in the record batch binary data to match the assigned offset + // The base offset is stored in the first 8 bytes of the record batch + if len(recordBatch) >= 8 { + // Create a copy to avoid modifying the original + fixedBatch := make([]byte, len(recordBatch)) + copy(fixedBatch, recordBatch) + + // Update the base offset (first 8 bytes, big endian) + binary.BigEndian.PutUint64(fixedBatch[0:8], uint64(baseOffset)) + + h.recordBatchMu.Lock() + defer h.recordBatchMu.Unlock() + h.recordBatches[key] = fixedBatch + + fmt.Printf("DEBUG: Stored record batch with corrected base offset %d (was %d)\n", + baseOffset, binary.BigEndian.Uint64(recordBatch[0:8])) + } else { + h.recordBatchMu.Lock() + defer h.recordBatchMu.Unlock() + h.recordBatches[key] = recordBatch + } } // GetRecordBatch retrieves a stored record batch that contains the requested offset @@ -193,12 +212,12 @@ func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64 if recordCount > 0 && offset >= baseOffset && offset < baseOffset+int64(recordCount) { fmt.Printf("DEBUG: Found matching batch for offset %d in batch with baseOffset %d\n", offset, baseOffset) - + // If requesting the base offset, return the entire batch if offset == baseOffset { return batch, true } - + // For non-base offsets, we need to create a sub-batch starting from the requested offset // This is a complex operation, so for now return the entire batch // TODO: Implement proper sub-batch extraction