From 93c3e0c784cfd5c4b661c9a342fa847ad1707596 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sat, 13 Sep 2025 18:27:35 -0700 Subject: [PATCH] fix tests --- weed/mq/kafka/protocol/fetch.go | 36 +++++++++++++++++++++++-------- weed/mq/kafka/protocol/handler.go | 7 ++++-- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index bf28342e6..f2216ff95 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -123,8 +123,26 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo highWaterMark = ledger.GetHighWaterMark() } - fmt.Printf("DEBUG: Fetch v%d - topic: %s, partition: %d, fetchOffset: %d, highWaterMark: %d, maxBytes: %d\n", - apiVersion, topic.Name, partition.PartitionID, partition.FetchOffset, highWaterMark, partition.MaxBytes) + // Normalize special fetch offsets: -2 = earliest, -1 = latest + effectiveFetchOffset := partition.FetchOffset + if effectiveFetchOffset < 0 { + if effectiveFetchOffset == -2 { // earliest + if ledger != nil { + effectiveFetchOffset = ledger.GetEarliestOffset() + } else { + effectiveFetchOffset = 0 + } + } else if effectiveFetchOffset == -1 { // latest + if ledger != nil { + effectiveFetchOffset = ledger.GetLatestOffset() + } else { + effectiveFetchOffset = 0 + } + } + } + + fmt.Printf("DEBUG: Fetch v%d - topic: %s, partition: %d, fetchOffset: %d (effective: %d), highWaterMark: %d, maxBytes: %d\n", + apiVersion, topic.Name, partition.PartitionID, partition.FetchOffset, effectiveFetchOffset, highWaterMark, partition.MaxBytes) // High water mark (8 bytes) highWaterMarkBytes := make([]byte, 8) @@ -150,16 +168,16 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo // Records - get actual stored record batches using multi-batch fetcher var recordBatch []byte - if ledger != nil && highWaterMark > partition.FetchOffset { + if ledger != nil && highWaterMark > effectiveFetchOffset { fmt.Printf("DEBUG: Multi-batch fetch - topic:%s, partition:%d, offset:%d, maxBytes:%d\n", - topic.Name, partition.PartitionID, partition.FetchOffset, partition.MaxBytes) + topic.Name, partition.PartitionID, effectiveFetchOffset, partition.MaxBytes) // Use multi-batch fetcher for better MaxBytes compliance multiFetcher := NewMultiBatchFetcher(h) result, err := multiFetcher.FetchMultipleBatches( topic.Name, partition.PartitionID, - partition.FetchOffset, + effectiveFetchOffset, highWaterMark, partition.MaxBytes, ) @@ -171,17 +189,17 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo } else { fmt.Printf("DEBUG: Multi-batch failed or empty, falling back to single batch\n") // Fallback to original single batch logic - smqRecords, err := h.seaweedMQHandler.GetStoredRecords(topic.Name, partition.PartitionID, partition.FetchOffset, 10) + smqRecords, err := h.seaweedMQHandler.GetStoredRecords(topic.Name, partition.PartitionID, effectiveFetchOffset, 10) if err == nil && len(smqRecords) > 0 { - recordBatch = h.constructRecordBatchFromSMQ(partition.FetchOffset, smqRecords) + recordBatch = h.constructRecordBatchFromSMQ(effectiveFetchOffset, smqRecords) fmt.Printf("DEBUG: Fallback single batch size: %d bytes\n", len(recordBatch)) } else { - recordBatch = h.constructSimpleRecordBatch(partition.FetchOffset, highWaterMark) + recordBatch = h.constructSimpleRecordBatch(effectiveFetchOffset, highWaterMark) fmt.Printf("DEBUG: Fallback synthetic batch size: %d bytes\n", len(recordBatch)) } } } else { - fmt.Printf("DEBUG: No messages available - fetchOffset %d >= highWaterMark %d\n", partition.FetchOffset, highWaterMark) + fmt.Printf("DEBUG: No messages available - effective fetchOffset %d >= highWaterMark %d\n", effectiveFetchOffset, highWaterMark) recordBatch = []byte{} // No messages available } diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 69b63b27a..7c26c535b 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -1334,8 +1334,11 @@ func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, re switch apiVersion { case 0, 1: return h.handleCreateTopicsV0V1(correlationID, requestBody) - case 2, 3, 4, 5: - // Flexible versions (v2+) + case 2, 3, 4: + // Use non-flexible parser for v2-4 (kafka-go sends regular format) + return h.handleCreateTopicsV2To4(correlationID, requestBody) + case 5: + // v5+ uses flexible format return h.handleCreateTopicsV2Plus(correlationID, apiVersion, requestBody) default: return nil, fmt.Errorf("unsupported CreateTopics API version: %d", apiVersion)