Browse Source

fix tests

pull/7231/head
chrislu 2 months ago
parent
commit
93c3e0c784
  1. 36
      weed/mq/kafka/protocol/fetch.go
  2. 7
      weed/mq/kafka/protocol/handler.go

36
weed/mq/kafka/protocol/fetch.go

@ -123,8 +123,26 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
highWaterMark = ledger.GetHighWaterMark() highWaterMark = ledger.GetHighWaterMark()
} }
fmt.Printf("DEBUG: Fetch v%d - topic: %s, partition: %d, fetchOffset: %d, highWaterMark: %d, maxBytes: %d\n",
apiVersion, topic.Name, partition.PartitionID, partition.FetchOffset, highWaterMark, partition.MaxBytes)
// Normalize special fetch offsets: -2 = earliest, -1 = latest
effectiveFetchOffset := partition.FetchOffset
if effectiveFetchOffset < 0 {
if effectiveFetchOffset == -2 { // earliest
if ledger != nil {
effectiveFetchOffset = ledger.GetEarliestOffset()
} else {
effectiveFetchOffset = 0
}
} else if effectiveFetchOffset == -1 { // latest
if ledger != nil {
effectiveFetchOffset = ledger.GetLatestOffset()
} else {
effectiveFetchOffset = 0
}
}
}
fmt.Printf("DEBUG: Fetch v%d - topic: %s, partition: %d, fetchOffset: %d (effective: %d), highWaterMark: %d, maxBytes: %d\n",
apiVersion, topic.Name, partition.PartitionID, partition.FetchOffset, effectiveFetchOffset, highWaterMark, partition.MaxBytes)
// High water mark (8 bytes) // High water mark (8 bytes)
highWaterMarkBytes := make([]byte, 8) highWaterMarkBytes := make([]byte, 8)
@ -150,16 +168,16 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
// Records - get actual stored record batches using multi-batch fetcher // Records - get actual stored record batches using multi-batch fetcher
var recordBatch []byte var recordBatch []byte
if ledger != nil && highWaterMark > partition.FetchOffset {
if ledger != nil && highWaterMark > effectiveFetchOffset {
fmt.Printf("DEBUG: Multi-batch fetch - topic:%s, partition:%d, offset:%d, maxBytes:%d\n", fmt.Printf("DEBUG: Multi-batch fetch - topic:%s, partition:%d, offset:%d, maxBytes:%d\n",
topic.Name, partition.PartitionID, partition.FetchOffset, partition.MaxBytes)
topic.Name, partition.PartitionID, effectiveFetchOffset, partition.MaxBytes)
// Use multi-batch fetcher for better MaxBytes compliance // Use multi-batch fetcher for better MaxBytes compliance
multiFetcher := NewMultiBatchFetcher(h) multiFetcher := NewMultiBatchFetcher(h)
result, err := multiFetcher.FetchMultipleBatches( result, err := multiFetcher.FetchMultipleBatches(
topic.Name, topic.Name,
partition.PartitionID, partition.PartitionID,
partition.FetchOffset,
effectiveFetchOffset,
highWaterMark, highWaterMark,
partition.MaxBytes, partition.MaxBytes,
) )
@ -171,17 +189,17 @@ func (h *Handler) handleFetch(correlationID uint32, apiVersion uint16, requestBo
} else { } else {
fmt.Printf("DEBUG: Multi-batch failed or empty, falling back to single batch\n") fmt.Printf("DEBUG: Multi-batch failed or empty, falling back to single batch\n")
// Fallback to original single batch logic // Fallback to original single batch logic
smqRecords, err := h.seaweedMQHandler.GetStoredRecords(topic.Name, partition.PartitionID, partition.FetchOffset, 10)
smqRecords, err := h.seaweedMQHandler.GetStoredRecords(topic.Name, partition.PartitionID, effectiveFetchOffset, 10)
if err == nil && len(smqRecords) > 0 { if err == nil && len(smqRecords) > 0 {
recordBatch = h.constructRecordBatchFromSMQ(partition.FetchOffset, smqRecords)
recordBatch = h.constructRecordBatchFromSMQ(effectiveFetchOffset, smqRecords)
fmt.Printf("DEBUG: Fallback single batch size: %d bytes\n", len(recordBatch)) fmt.Printf("DEBUG: Fallback single batch size: %d bytes\n", len(recordBatch))
} else { } else {
recordBatch = h.constructSimpleRecordBatch(partition.FetchOffset, highWaterMark)
recordBatch = h.constructSimpleRecordBatch(effectiveFetchOffset, highWaterMark)
fmt.Printf("DEBUG: Fallback synthetic batch size: %d bytes\n", len(recordBatch)) fmt.Printf("DEBUG: Fallback synthetic batch size: %d bytes\n", len(recordBatch))
} }
} }
} else { } else {
fmt.Printf("DEBUG: No messages available - fetchOffset %d >= highWaterMark %d\n", partition.FetchOffset, highWaterMark)
fmt.Printf("DEBUG: No messages available - effective fetchOffset %d >= highWaterMark %d\n", effectiveFetchOffset, highWaterMark)
recordBatch = []byte{} // No messages available recordBatch = []byte{} // No messages available
} }

7
weed/mq/kafka/protocol/handler.go

@ -1334,8 +1334,11 @@ func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, re
switch apiVersion { switch apiVersion {
case 0, 1: case 0, 1:
return h.handleCreateTopicsV0V1(correlationID, requestBody) return h.handleCreateTopicsV0V1(correlationID, requestBody)
case 2, 3, 4, 5:
// Flexible versions (v2+)
case 2, 3, 4:
// Use non-flexible parser for v2-4 (kafka-go sends regular format)
return h.handleCreateTopicsV2To4(correlationID, requestBody)
case 5:
// v5+ uses flexible format
return h.handleCreateTopicsV2Plus(correlationID, apiVersion, requestBody) return h.handleCreateTopicsV2Plus(correlationID, apiVersion, requestBody)
default: default:
return nil, fmt.Errorf("unsupported CreateTopics API version: %d", apiVersion) return nil, fmt.Errorf("unsupported CreateTopics API version: %d", apiVersion)

Loading…
Cancel
Save