From 491404b3f6b4dcdcbd6a13e89216c18d76206ee1 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 12 Sep 2025 14:54:13 -0700 Subject: [PATCH] debug: add detailed logging for Sarama Fetch v5 issue - Added hex dump of record batch content for each offset - Confirmed we're returning different batches correctly (98 bytes each) - Sarama requests offsets 0,1,2 individually but only consumes offset 0 - Issue identified: Fetch v5 (Sarama) vs v10 (kafka-go) response format difference - kafka-go: fully working, Sarama: 1/3 messages consumed Next: Investigate Fetch v5 response format requirements --- weed/mq/kafka/protocol/fetch.go | 12 +++++++++--- weed/mq/kafka/protocol/handler.go | 11 ++++++++++- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index e8c639eca..0bbc38764 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -72,8 +72,8 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo ledger := h.GetOrCreateLedger(topic.Name, partition.PartitionID) highWaterMark := ledger.GetHighWaterMark() - fmt.Printf("DEBUG: Fetch - topic: %s, partition: %d, fetchOffset: %d, highWaterMark: %d\n", - topic.Name, partition.PartitionID, partition.FetchOffset, highWaterMark) + fmt.Printf("DEBUG: Fetch v%d - topic: %s, partition: %d, fetchOffset: %d, highWaterMark: %d, maxBytes: %d\n", + apiVersion, topic.Name, partition.PartitionID, partition.FetchOffset, highWaterMark, partition.MaxBytes) // High water mark (8 bytes) highWaterMarkBytes := make([]byte, 8) @@ -97,7 +97,12 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo // Try to get the actual stored record batch first if storedBatch, exists := h.GetRecordBatch(topic.Name, partition.PartitionID, partition.FetchOffset); exists { recordBatch = storedBatch - fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(storedBatch)) + hexLen := 20 + if len(storedBatch) < hexLen { + hexLen = len(storedBatch) + } + fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes, first %d bytes: %x\n", + partition.FetchOffset, len(storedBatch), hexLen, storedBatch[:hexLen]) } else { fmt.Printf("DEBUG: No stored record batch found for offset %d, using synthetic batch\n", partition.FetchOffset) // Fallback to synthetic batch if no stored batch found @@ -122,6 +127,7 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo } } + fmt.Printf("DEBUG: Fetch v%d response constructed, size: %d bytes\n", apiVersion, len(response)) return response, nil } diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 1626bb50f..ba5c758c4 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -193,6 +193,16 @@ func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64 if recordCount > 0 && offset >= baseOffset && offset < baseOffset+int64(recordCount) { fmt.Printf("DEBUG: Found matching batch for offset %d in batch with baseOffset %d\n", offset, baseOffset) + + // If requesting the base offset, return the entire batch + if offset == baseOffset { + return batch, true + } + + // For non-base offsets, we need to create a sub-batch starting from the requested offset + // This is a complex operation, so for now return the entire batch + // TODO: Implement proper sub-batch extraction + fmt.Printf("DEBUG: WARNING: Returning entire batch for offset %d (baseOffset=%d) - may cause client issues\n", offset, baseOffset) return batch, true } } @@ -270,7 +280,6 @@ func (h *Handler) HandleConn(conn net.Conn) error { apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) - apiName := getAPIName(apiKey) // Validate API version against what we support