Browse Source

debug: add detailed logging for Sarama Fetch v5 issue

- Added hex dump of record batch content for each offset
- Confirmed we're returning different batches correctly (98 bytes each)
- Sarama requests offsets 0,1,2 individually but only consumes offset 0
- Issue identified: Fetch v5 (Sarama) vs v10 (kafka-go) response format difference
- kafka-go: fully working, Sarama: 1/3 messages consumed

Next: Investigate Fetch v5 response format requirements
pull/7231/head
chrislu 2 months ago
parent
commit
491404b3f6
  1. 12
      weed/mq/kafka/protocol/fetch.go
  2. 11
      weed/mq/kafka/protocol/handler.go

12
weed/mq/kafka/protocol/fetch.go

@ -72,8 +72,8 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
ledger := h.GetOrCreateLedger(topic.Name, partition.PartitionID) ledger := h.GetOrCreateLedger(topic.Name, partition.PartitionID)
highWaterMark := ledger.GetHighWaterMark() highWaterMark := ledger.GetHighWaterMark()
fmt.Printf("DEBUG: Fetch - topic: %s, partition: %d, fetchOffset: %d, highWaterMark: %d\n",
topic.Name, partition.PartitionID, partition.FetchOffset, highWaterMark)
fmt.Printf("DEBUG: Fetch v%d - topic: %s, partition: %d, fetchOffset: %d, highWaterMark: %d, maxBytes: %d\n",
apiVersion, topic.Name, partition.PartitionID, partition.FetchOffset, highWaterMark, partition.MaxBytes)
// High water mark (8 bytes) // High water mark (8 bytes)
highWaterMarkBytes := make([]byte, 8) highWaterMarkBytes := make([]byte, 8)
@ -97,7 +97,12 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
// Try to get the actual stored record batch first // Try to get the actual stored record batch first
if storedBatch, exists := h.GetRecordBatch(topic.Name, partition.PartitionID, partition.FetchOffset); exists { if storedBatch, exists := h.GetRecordBatch(topic.Name, partition.PartitionID, partition.FetchOffset); exists {
recordBatch = storedBatch recordBatch = storedBatch
fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes\n", partition.FetchOffset, len(storedBatch))
hexLen := 20
if len(storedBatch) < hexLen {
hexLen = len(storedBatch)
}
fmt.Printf("DEBUG: Using stored record batch for offset %d, size: %d bytes, first %d bytes: %x\n",
partition.FetchOffset, len(storedBatch), hexLen, storedBatch[:hexLen])
} else { } else {
fmt.Printf("DEBUG: No stored record batch found for offset %d, using synthetic batch\n", partition.FetchOffset) fmt.Printf("DEBUG: No stored record batch found for offset %d, using synthetic batch\n", partition.FetchOffset)
// Fallback to synthetic batch if no stored batch found // Fallback to synthetic batch if no stored batch found
@ -122,6 +127,7 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
} }
} }
fmt.Printf("DEBUG: Fetch v%d response constructed, size: %d bytes\n", apiVersion, len(response))
return response, nil return response, nil
} }

11
weed/mq/kafka/protocol/handler.go

@ -193,6 +193,16 @@ func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64
if recordCount > 0 && offset >= baseOffset && offset < baseOffset+int64(recordCount) { if recordCount > 0 && offset >= baseOffset && offset < baseOffset+int64(recordCount) {
fmt.Printf("DEBUG: Found matching batch for offset %d in batch with baseOffset %d\n", offset, baseOffset) fmt.Printf("DEBUG: Found matching batch for offset %d in batch with baseOffset %d\n", offset, baseOffset)
// If requesting the base offset, return the entire batch
if offset == baseOffset {
return batch, true
}
// For non-base offsets, we need to create a sub-batch starting from the requested offset
// This is a complex operation, so for now return the entire batch
// TODO: Implement proper sub-batch extraction
fmt.Printf("DEBUG: WARNING: Returning entire batch for offset %d (baseOffset=%d) - may cause client issues\n", offset, baseOffset)
return batch, true return batch, true
} }
} }
@ -270,7 +280,6 @@ func (h *Handler) HandleConn(conn net.Conn) error {
apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
apiName := getAPIName(apiKey) apiName := getAPIName(apiKey)
// Validate API version against what we support // Validate API version against what we support

Loading…
Cancel
Save