diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index e8c639eca..0bbc38764 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -72,8 +72,8 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo ledger := h.GetOrCreateLedger(topic.Name, partition.PartitionID) highWaterMark := ledger.GetHighWaterMark() - fmt.Printf("DEBUG: Fetch - topic: %s, partition: %d, fetchOffset: %d, highWaterMark: %d\n", - topic.Name, partition.PartitionID, partition.FetchOffset, highWaterMark) + fmt.Printf("DEBUG: Fetch v%d - topic: %s, partition: %d, fetchOffset: %d, highWaterMark: %d, maxBytes: %d\n", + apiVersion, topic.Name, partition.PartitionID, partition.FetchOffset, highWaterMark, partition.MaxBytes) // High water mark (8 bytes) highWaterMarkBytes := make([]byte, 8) @@ -97,7 +97,12 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo // Try to get the actual stored record batch first if storedBatch, exists := h.GetRecordBatch(topic.Name, partition.PartitionID, partition.FetchOffset); exists { recordBatch = storedBatch - fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(storedBatch)) + hexLen := 20 + if len(storedBatch) < hexLen { + hexLen = len(storedBatch) + } + fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes, first %d bytes: %x\n", + partition.FetchOffset, len(storedBatch), hexLen, storedBatch[:hexLen]) } else { fmt.Printf("DEBUG: No stored record batch found for offset %d, using synthetic batch\n", partition.FetchOffset) // Fallback to synthetic batch if no stored batch found @@ -122,6 +127,7 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo } } + fmt.Printf("DEBUG: Fetch v%d response constructed, size: %d bytes\n", apiVersion, len(response)) return response, nil } diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index 1626bb50f..ba5c758c4 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -193,6 +193,16 @@ func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64 if recordCount > 0 && offset >= baseOffset && offset < baseOffset+int64(recordCount) { fmt.Printf("DEBUG: Found matching batch for offset %d in batch with baseOffset %d\n", offset, baseOffset) + + // If requesting the base offset, return the entire batch + if offset == baseOffset { + return batch, true + } + + // For non-base offsets, we need to create a sub-batch starting from the requested offset + // This is a complex operation, so for now return the entire batch + // TODO: Implement proper sub-batch extraction + fmt.Printf("DEBUG: WARNING: Returning entire batch for offset %d (baseOffset=%d) - may cause client issues\n", offset, baseOffset) return batch, true } } @@ -270,7 +280,6 @@ func (h *Handler) HandleConn(conn net.Conn) error { apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) - apiName := getAPIName(apiKey) // Validate API version against what we support