diff --git a/weed/mq/kafka/protocol/fetch_multibatch.go b/weed/mq/kafka/protocol/fetch_multibatch.go
index 0eff5da37..7f8fb8ff5 100644
--- a/weed/mq/kafka/protocol/fetch_multibatch.go
+++ b/weed/mq/kafka/protocol/fetch_multibatch.go
@@ -51,7 +51,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(topicName string, partitionID i
     currentOffset := startOffset
     totalSize := int32(0)
     batchCount := 0
-
+
     // Parameters for batch fetching - start smaller to respect maxBytes better
     recordsPerBatch := int32(10) // Start with smaller batch size
     maxBatchesPerFetch := 10     // Limit number of batches to avoid infinite loops
@@ -85,31 +85,19 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(topicName string, partitionID i
         // Estimate batch size before construction to better respect maxBytes
         estimatedBatchSize := f.estimateBatchSize(smqRecords)
-
-        // Check if this batch would exceed maxBytes BEFORE constructing it
-        if totalSize+estimatedBatchSize > maxBytes && batchCount > 0 {
-            fmt.Printf("DEBUG: MultiBatch - estimated batch would exceed limit (%d + %d > %d), stopping\n",
-                totalSize, estimatedBatchSize, maxBytes)
-            break
-        }
-
-        // Special case: If this is the first batch and it's already too big,
-        // we still need to include it (Kafka behavior - always return at least some data)
-        if batchCount == 0 && estimatedBatchSize > maxBytes {
-            fmt.Printf("DEBUG: MultiBatch - first batch estimated size %d exceeds maxBytes %d, but including anyway\n",
-                estimatedBatchSize, maxBytes)
-        }
+
+        // Note: we do not stop based on estimate; we will check actual size after constructing the batch
 
         // Construct record batch
         batch := f.constructSingleRecordBatch(currentOffset, smqRecords)
         batchSize := int32(len(batch))
-
-        fmt.Printf("DEBUG: MultiBatch - constructed batch %d: %d records, %d bytes (estimated %d), offset %d\n",
+
+        fmt.Printf("DEBUG: MultiBatch - constructed batch %d: %d records, %d bytes (estimated %d), offset %d\n",
             batchCount+1, len(smqRecords), batchSize, estimatedBatchSize, currentOffset)
 
         // Double-check actual size doesn't exceed maxBytes
         if totalSize+batchSize > maxBytes && batchCount > 0 {
-            fmt.Printf("DEBUG: MultiBatch - actual batch would exceed limit (%d + %d > %d), stopping\n",
+            fmt.Printf("DEBUG: MultiBatch - actual batch would exceed limit (%d + %d > %d), stopping\n",
                 totalSize, batchSize, maxBytes)
             break
         }
 
@@ -134,7 +122,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(topicName string, partitionID i
         BatchCount: batchCount,
     }
 
-    fmt.Printf("DEBUG: MultiBatch - completed: %d batches, %d total bytes, next offset %d\n",
+    fmt.Printf("DEBUG: MultiBatch - completed: %d batches, %d total bytes, next offset %d\n",
         result.BatchCount, result.TotalSize, result.NextOffset)
 
     return result, nil
@@ -450,42 +438,62 @@ func (f *MultiBatchFetcher) constructCompressedRecordBatch(baseOffset int64, com
 // estimateBatchSize estimates the size of a record batch before constructing it
 func (f *MultiBatchFetcher) estimateBatchSize(smqRecords []offset.SMQRecord) int32 {
     if len(smqRecords) == 0 {
-        return 61 // empty batch size
+        return 61 // empty batch header size
     }
 
-    // Record batch header: 61 bytes
+    // Record batch header: 61 bytes (base_offset + batch_length + leader_epoch + magic + crc + attributes +
+    // last_offset_delta + first_ts + max_ts + producer_id + producer_epoch + base_seq + record_count)
     headerSize := int32(61)
-
-    // Estimate records size
+
+    baseTs := smqRecords[0].GetTimestamp()
     recordsSize := int32(0)
-    for _, record := range smqRecords {
-        // Each record has overhead: attributes(1) + timestamp_delta(varint) + offset_delta(varint) + headers(varint)
-        recordOverhead := int32(10) // rough estimate for varints and overhead
-
-        keySize := int32(0)
-        if record.GetKey() != nil {
-            keySize = int32(len(record.GetKey())) + 5 // +5 for length varint
+    for i, rec := range smqRecords {
+        // attributes(1)
+        rb := int32(1)
+
+        // timestamp_delta(varint)
+        tsDelta := rec.GetTimestamp() - baseTs
+        rb += sizeOfVarint(tsDelta)
+
+        // offset_delta(varint)
+        rb += sizeOfVarint(int64(i))
+
+        // key length varint + data, or -1 for null
+        if k := rec.GetKey(); k != nil {
+            rb += sizeOfVarint(int64(len(k))) + int32(len(k))
         } else {
-            keySize = 1 // -1 encoded as varint
+            rb += sizeOfVarint(-1)
         }
-
-        valueSize := int32(0)
-        if record.GetValue() != nil {
-            valueSize = int32(len(record.GetValue())) + 5 // +5 for length varint
+
+        // value length varint + data, or -1 for null
+        if v := rec.GetValue(); v != nil {
+            rb += sizeOfVarint(int64(len(v))) + int32(len(v))
         } else {
-            valueSize = 1 // -1 encoded as varint
+            rb += sizeOfVarint(-1)
         }
-
-        // Record length itself is also encoded as varint
-        recordLength := recordOverhead + keySize + valueSize
-        recordLengthVarintSize := int32(5) // conservative estimate for varint
-
-        recordsSize += recordLengthVarintSize + recordLength
+
+        // headers count (varint = 0)
+        rb += sizeOfVarint(0)
+
+        // prepend record length varint
+        recordsSize += sizeOfVarint(int64(rb)) + rb
     }
-
+
     return headerSize + recordsSize
 }
 
+// sizeOfVarint returns the number of bytes encodeVarint would use for value
+func sizeOfVarint(value int64) int32 {
+    // ZigZag encode to match encodeVarint
+    u := uint64(value<<1) ^ uint64(value>>63)
+    size := int32(1)
+    for u >= 0x80 {
+        u >>= 7
+        size++
+    }
+    return size
+}
+
 // compressData compresses data using the specified codec (basic implementation)
 func (f *MultiBatchFetcher) compressData(data []byte, codec compression.CompressionCodec) ([]byte, error) {
     // For Phase 5, implement basic compression support
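Aside (not part of the diff): a minimal standalone sketch of the varint sizing the estimator now relies on. zigzagEncode below is a hypothetical stand-in for the codebase's encodeVarint, assumed (per the helper's own comment) to be a ZigZag base-128 varint encoder; the sketch checks that sizeOfVarint agrees with the length of an actual encoding across sign and length boundaries.

package main

import "fmt"

// zigzagEncode is a stand-in assumption for the codebase's encodeVarint:
// ZigZag mapping followed by base-128 varint bytes, as Kafka uses for
// record-level fields (timestamp_delta, offset_delta, key/value lengths).
func zigzagEncode(value int64) []byte {
    u := uint64(value<<1) ^ uint64(value>>63)
    var buf []byte
    for u >= 0x80 {
        buf = append(buf, byte(u)|0x80)
        u >>= 7
    }
    return append(buf, byte(u))
}

// sizeOfVarint mirrors the helper added in the diff: the encoded length
// computed without allocating the encoding itself.
func sizeOfVarint(value int64) int32 {
    u := uint64(value<<1) ^ uint64(value>>63)
    size := int32(1)
    for u >= 0x80 {
        u >>= 7
        size++
    }
    return size
}

func main() {
    // Values straddling the 1-byte/2-byte varint boundary, plus -1 (the null key/value marker).
    for _, v := range []int64{-65, -64, -1, 0, 1, 63, 64, 300, 1 << 40} {
        enc := zigzagEncode(v)
        fmt.Printf("v=%-6d encodedLen=%d sizeOfVarint=%d match=%v\n",
            v, len(enc), sizeOfVarint(v), int32(len(enc)) == sizeOfVarint(v))
    }
}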