
Update fetch_multibatch.go

pull/7231/head
chrislu 2 months ago
commit ec7adc5a36

94 weed/mq/kafka/protocol/fetch_multibatch.go

@@ -51,7 +51,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(topicName string, partitionID i
 	currentOffset := startOffset
 	totalSize := int32(0)
 	batchCount := 0
 	// Parameters for batch fetching - start smaller to respect maxBytes better
 	recordsPerBatch := int32(10) // Start with smaller batch size
 	maxBatchesPerFetch := 10     // Limit number of batches to avoid infinite loops
@@ -85,31 +85,19 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(topicName string, partitionID i
 		// Estimate batch size before construction to better respect maxBytes
 		estimatedBatchSize := f.estimateBatchSize(smqRecords)
-		// Check if this batch would exceed maxBytes BEFORE constructing it
-		if totalSize+estimatedBatchSize > maxBytes && batchCount > 0 {
-			fmt.Printf("DEBUG: MultiBatch - estimated batch would exceed limit (%d + %d > %d), stopping\n",
-				totalSize, estimatedBatchSize, maxBytes)
-			break
-		}
-		// Special case: If this is the first batch and it's already too big,
-		// we still need to include it (Kafka behavior - always return at least some data)
-		if batchCount == 0 && estimatedBatchSize > maxBytes {
-			fmt.Printf("DEBUG: MultiBatch - first batch estimated size %d exceeds maxBytes %d, but including anyway\n",
-				estimatedBatchSize, maxBytes)
-		}
+		// Note: we do not stop based on estimate; we will check actual size after constructing the batch
 		// Construct record batch
 		batch := f.constructSingleRecordBatch(currentOffset, smqRecords)
 		batchSize := int32(len(batch))
 		fmt.Printf("DEBUG: MultiBatch - constructed batch %d: %d records, %d bytes (estimated %d), offset %d\n",
 			batchCount+1, len(smqRecords), batchSize, estimatedBatchSize, currentOffset)
 		// Double-check actual size doesn't exceed maxBytes
 		if totalSize+batchSize > maxBytes && batchCount > 0 {
 			fmt.Printf("DEBUG: MultiBatch - actual batch would exceed limit (%d + %d > %d), stopping\n",
 				totalSize, batchSize, maxBytes)
 			break
 		}
@@ -134,7 +122,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(topicName string, partitionID i
 		BatchCount: batchCount,
 	}
 	fmt.Printf("DEBUG: MultiBatch - completed: %d batches, %d total bytes, next offset %d\n",
 		result.BatchCount, result.TotalSize, result.NextOffset)
 	return result, nil
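The change above drops the estimate-based early exit and keeps only the post-construction size check, preserving Kafka's fetch contract: a response always carries at least the first batch, even when that batch alone exceeds maxBytes, so consumers can always make progress; later batches are appended only while the running total stays within the limit. A minimal sketch of that rule (shouldAppend is a hypothetical helper, not code from this commit):

package main

import "fmt"

// shouldAppend mirrors the size check in the loop above (a sketch, not the
// exact SeaweedFS code): the first batch is always accepted; later batches
// only while the running total stays within maxBytes.
func shouldAppend(totalSize, batchSize, maxBytes int32, batchCount int) bool {
	if batchCount == 0 {
		return true // Kafka semantics: always return at least one batch
	}
	return totalSize+batchSize <= maxBytes
}

func main() {
	fmt.Println(shouldAppend(0, 2048, 1024, 0))  // true: oversized first batch still sent
	fmt.Println(shouldAppend(900, 200, 1024, 3)) // false: would exceed maxBytes, stop
}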
@@ -450,42 +438,62 @@ func (f *MultiBatchFetcher) constructCompressedRecordBatch(baseOffset int64, com
 // estimateBatchSize estimates the size of a record batch before constructing it
 func (f *MultiBatchFetcher) estimateBatchSize(smqRecords []offset.SMQRecord) int32 {
 	if len(smqRecords) == 0 {
-		return 61 // empty batch size
+		return 61 // empty batch header size
 	}
-	// Record batch header: 61 bytes
+	// Record batch header: 61 bytes (base_offset + batch_length + leader_epoch + magic + crc + attributes +
+	// last_offset_delta + first_ts + max_ts + producer_id + producer_epoch + base_seq + record_count)
 	headerSize := int32(61)
 	// Estimate records size
+	baseTs := smqRecords[0].GetTimestamp()
 	recordsSize := int32(0)
-	for _, record := range smqRecords {
-		// Each record has overhead: attributes(1) + timestamp_delta(varint) + offset_delta(varint) + headers(varint)
-		recordOverhead := int32(10) // rough estimate for varints and overhead
-		keySize := int32(0)
-		if record.GetKey() != nil {
-			keySize = int32(len(record.GetKey())) + 5 // +5 for length varint
+	for i, rec := range smqRecords {
+		// attributes(1)
+		rb := int32(1)
+		// timestamp_delta(varint)
+		tsDelta := rec.GetTimestamp() - baseTs
+		rb += int32(len(encodeVarint(tsDelta)))
+		// offset_delta(varint)
+		rb += int32(len(encodeVarint(int64(i))))
+		// key length varint + data or -1
+		if k := rec.GetKey(); k != nil {
+			rb += int32(len(encodeVarint(int64(len(k))))) + int32(len(k))
 		} else {
-			keySize = 1 // -1 encoded as varint
+			rb += int32(len(encodeVarint(-1)))
 		}
-		valueSize := int32(0)
-		if record.GetValue() != nil {
-			valueSize = int32(len(record.GetValue())) + 5 // +5 for length varint
+		// value length varint + data or -1
+		if v := rec.GetValue(); v != nil {
+			rb += int32(len(encodeVarint(int64(len(v))))) + int32(len(v))
 		} else {
-			valueSize = 1 // -1 encoded as varint
+			rb += int32(len(encodeVarint(-1)))
 		}
-		// Record length itself is also encoded as varint
-		recordLength := recordOverhead + keySize + valueSize
-		recordLengthVarintSize := int32(5) // conservative estimate for varint
-		recordsSize += recordLengthVarintSize + recordLength
+		// headers count (varint = 0)
+		rb += int32(len(encodeVarint(0)))
+		// prepend record length varint
+		recordsSize += int32(len(encodeVarint(int64(rb)))) + rb
 	}
 	return headerSize + recordsSize
 }
+
+// sizeOfVarint returns the number of bytes encodeVarint would use for value
+func sizeOfVarint(value int64) int32 {
+	// ZigZag encode to match encodeVarint
+	u := uint64(uint64(value<<1) ^ uint64(value>>63))
+	size := int32(1)
+	for u >= 0x80 {
+		u >>= 7
+		size++
+	}
+	return size
+}
 // compressData compresses data using the specified codec (basic implementation)
 func (f *MultiBatchFetcher) compressData(data []byte, codec compression.CompressionCodec) ([]byte, error) {
 	// For Phase 5, implement basic compression support
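Two details of the hunk above are worth spelling out. First, the 61-byte constant matches the fixed Kafka record batch (v2) header layout named in the new comment: base_offset(8) + batch_length(4) + partition_leader_epoch(4) + magic(1) + crc(4) + attributes(2) + last_offset_delta(4) + first_timestamp(8) + max_timestamp(8) + producer_id(8) + producer_epoch(2) + base_sequence(4) + record_count(4) = 61 bytes. Second, encodeVarint itself is not shown in this hunk; a minimal sketch of the ZigZag varint encoding it is assumed to implement (the name encodeVarintSketch is illustrative, not from the commit):

package main

import "fmt"

// encodeVarintSketch is an illustrative stand-in for the encodeVarint used
// above (assumed behavior: ZigZag-encode the value, then emit 7-bit groups
// with a continuation bit, as in the Kafka record wire format).
func encodeVarintSketch(value int64) []byte {
	u := uint64(value<<1) ^ uint64(value>>63) // ZigZag: -1 -> 1, 1 -> 2, ...
	buf := make([]byte, 0, 10)                // a 64-bit value needs at most 10 bytes
	for u >= 0x80 {
		buf = append(buf, byte(u)|0x80) // low 7 bits, continuation bit set
		u >>= 7
	}
	return append(buf, byte(u))
}

func main() {
	fmt.Println(len(encodeVarintSketch(-1)))  // 1 (ZigZag maps -1 to 1)
	fmt.Println(len(encodeVarintSketch(300))) // 2
}

Under that assumption, len(encodeVarint(v)) and sizeOfVarint(v) agree for every v, which is presumably why the helper carries the comment "ZigZag encode to match encodeVarint": it lets callers measure a varint's size without allocating an encoded buffer.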

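The body of compressData is truncated in this view. Purely as an illustration, a basic gzip path (one codec a function like this might support) could look like the following sketch; gzipCompress is a hypothetical helper using only the Go standard library, not code from this commit:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// gzipCompress compresses data with gzip and returns the compressed bytes.
// The writer must be closed before reading the buffer so the gzip footer
// (CRC and length) is flushed.
func gzipCompress(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	out, err := gzipCompress([]byte("hello kafka batch"))
	fmt.Println(len(out), err)
}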