diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 0bbc38764..096cf7882 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -101,7 +101,7 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo if len(storedBatch) < hexLen { hexLen = len(storedBatch) } - fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes, first %d bytes: %x\n", + fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes, first %d bytes: %x\n", partition.FetchOffset, len(storedBatch), hexLen, storedBatch[:hexLen]) } else { fmt.Printf("DEBUG: No stored record batch found for offset %d, using synthetic batch\n", partition.FetchOffset) diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index ba5c758c4..35e9a1ab6 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -152,9 +152,28 @@ func (h *Handler) GetLedger(topic string, partition int32) *offset.Ledger { // StoreRecordBatch stores a record batch for later retrieval during Fetch operations func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) { key := fmt.Sprintf("%s:%d:%d", topicName, partition, baseOffset) - h.recordBatchMu.Lock() - defer h.recordBatchMu.Unlock() - h.recordBatches[key] = recordBatch + + // Fix the base offset in the record batch binary data to match the assigned offset + // The base offset is stored in the first 8 bytes of the record batch + if len(recordBatch) >= 8 { + // Create a copy to avoid modifying the original + fixedBatch := make([]byte, len(recordBatch)) + copy(fixedBatch, recordBatch) + + // Update the base offset (first 8 bytes, big endian) + binary.BigEndian.PutUint64(fixedBatch[0:8], uint64(baseOffset)) + + h.recordBatchMu.Lock() + defer h.recordBatchMu.Unlock() + h.recordBatches[key] = fixedBatch + + fmt.Printf("DEBUG: Stored record batch with corrected base offset %d (was %d)\n", + baseOffset, binary.BigEndian.Uint64(recordBatch[0:8])) + } else { + h.recordBatchMu.Lock() + defer h.recordBatchMu.Unlock() + h.recordBatches[key] = recordBatch + } } // GetRecordBatch retrieves a stored record batch that contains the requested offset @@ -193,12 +212,12 @@ func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64 if recordCount > 0 && offset >= baseOffset && offset < baseOffset+int64(recordCount) { fmt.Printf("DEBUG: Found matching batch for offset %d in batch with baseOffset %d\n", offset, baseOffset) - + // If requesting the base offset, return the entire batch if offset == baseOffset { return batch, true } - + // For non-base offsets, we need to create a sub-batch starting from the requested offset // This is a complex operation, so for now return the entire batch // TODO: Implement proper sub-batch extraction