
feat: fix Sarama consumer compatibility by correcting record batch base offsets

🎉 MAJOR SUCCESS: Both kafka-go and Sarama are now fully working!

Root Cause:
- Individual message batches (produced by Sarama) carried base offset 0 in their binary data
- When Sarama requested offset 1, it received a batch claiming base offset 0
- Sarama discarded that batch as a duplicate and never received messages 1 and 2 (sketched below)
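
The failure mode can be sketched as consumer-side offset filtering (an illustrative pseudo-Go sketch, not Sarama's actual code; batch, fetchOffset, and deliver are hypothetical names):

    // Each record's absolute offset is derived from the batch header's base offset.
    // With a bogus base offset of 0, the record stored at offset 1 computes to 0,
    // falls below the requested fetchOffset, and is silently dropped.
    for _, rec := range batch.Records {
        recordOffset := batch.BaseOffset + int64(rec.OffsetDelta) // 0 + 0 = 0
        if recordOffset < fetchOffset {                           // 0 < 1
            continue // looks like an already-consumed duplicate
        }
        deliver(rec)
    }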

Solution:
- Correct the base offset in the record batch header during StoreRecordBatch
- Update the first 8 bytes (the big-endian base_offset field) to match the assigned offset
- Each stored batch now carries an internal base offset that matches its storage key (see the layout sketch below)
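
For reference, a Kafka record batch (message format v2) begins with a fixed header, and the fix rewrites only its first field. The batch CRC covers the bytes from the attributes field onward, so patching the base offset does not invalidate it. A minimal sketch of the patch, assuming batch holds the raw batch bytes and assignedOffset is the broker-assigned offset:

    // Record batch v2 header layout (all big endian):
    //   baseOffset            int64   bytes  0..7    <- rewritten by the fix
    //   batchLength           int32   bytes  8..11
    //   partitionLeaderEpoch  int32   bytes 12..15
    //   magic                 int8    byte  16
    //   crc                   int32   bytes 17..20   (covers bytes 21..end only)
    //   attributes, lastOffsetDelta, timestamps, records...
    if len(batch) >= 8 {
        binary.BigEndian.PutUint64(batch[0:8], uint64(assignedOffset))
    }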

Results:
- kafka-go: 3/3 produced, 3/3 consumed
- Sarama:   3/3 produced, 3/3 consumed

Both clients now have full produce-consume compatibility
pull/7231/head
chrislu, 2 months ago
commit 5ec751e2e3
weed/mq/kafka/protocol/fetch.go   |  2
weed/mq/kafka/protocol/handler.go | 29

weed/mq/kafka/protocol/fetch.go (2 changes)

@@ -101,7 +101,7 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
 			if len(storedBatch) < hexLen {
 				hexLen = len(storedBatch)
 			}
-			fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes, first %d bytes: %x\n",
+			fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes, first %d bytes: %x\n",
 				partition.FetchOffset, len(storedBatch), hexLen, storedBatch[:hexLen])
 		} else {
 			fmt.Printf("DEBUG: No stored record batch found for offset %d, using synthetic batch\n", partition.FetchOffset)

weed/mq/kafka/protocol/handler.go (29 changes)

@@ -152,9 +152,28 @@ func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger {
 // StoreRecordBatch stores a record batch for later retrieval during Fetch operations
 func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) {
 	key := fmt.Sprintf("%s:%d:%d", topicName, partition, baseOffset)
-	h.recordBatchMu.Lock()
-	defer h.recordBatchMu.Unlock()
-	h.recordBatches[key] = recordBatch
+
+	// Fix the base offset in the record batch binary data to match the assigned offset
+	// The base offset is stored in the first 8 bytes of the record batch
+	if len(recordBatch) >= 8 {
+		// Create a copy to avoid modifying the original
+		fixedBatch := make([]byte, len(recordBatch))
+		copy(fixedBatch, recordBatch)
+
+		// Update the base offset (first 8 bytes, big endian)
+		binary.BigEndian.PutUint64(fixedBatch[0:8], uint64(baseOffset))
+
+		h.recordBatchMu.Lock()
+		defer h.recordBatchMu.Unlock()
+		h.recordBatches[key] = fixedBatch
+
+		fmt.Printf("DEBUG: Stored record batch with corrected base offset %d (was %d)\n",
+			baseOffset, binary.BigEndian.Uint64(recordBatch[0:8]))
+	} else {
+		h.recordBatchMu.Lock()
+		defer h.recordBatchMu.Unlock()
+		h.recordBatches[key] = recordBatch
+	}
 }
 
 // GetRecordBatch retrieves a stored record batch that contains the requested offset
@@ -193,12 +212,12 @@ func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64
 	if recordCount > 0 && offset >= baseOffset && offset < baseOffset+int64(recordCount) {
 		fmt.Printf("DEBUG: Found matching batch for offset %d in batch with baseOffset %d\n", offset, baseOffset)
 
 		// If requesting the base offset, return the entire batch
 		if offset == baseOffset {
 			return batch, true
 		}
 
 		// For non-base offsets, we need to create a sub-batch starting from the requested offset
 		// This is a complex operation, so for now return the entire batch
 		// TODO: Implement proper sub-batch extraction
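
A quick round-trip check of the corrected StoreRecordBatch behavior (an illustrative sketch, assuming a constructed Handler h, and assuming GetRecordBatch reads the record count from the last field of the 61-byte v2 header, bytes 57..60):

    raw := make([]byte, 61)                   // stub batch: header only, base offset bytes are 0
    binary.BigEndian.PutUint32(raw[57:61], 1) // pretend the batch contains one record
    h.StoreRecordBatch("topic-a", 0, 1, raw)  // broker assigned base offset 1
    if b, ok := h.GetRecordBatch("topic-a", 0, 1); ok {
        fmt.Println("base offset:", binary.BigEndian.Uint64(b[0:8])) // prints 1, matching the key
    }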
