Browse Source

refactor: update ec_encoder.go to use ECContext

- Add WriteEcFilesWithContext() and RebuildEcFilesWithContext() functions
- Keep old functions for backward compatibility (call new versions)
- Update all internal functions to accept ECContext parameter
- Use ctx.DataShards, ctx.ParityShards, ctx.TotalShards consistently
- Use ctx.CreateEncoder() instead of hardcoded reedsolomon.New()
- Use ctx.ToExt() for shard file extensions
- No behavior change, still uses default 10+4 configuration
pull/7396/head
chrislu 1 month ago
parent
commit
b1de205d6c
  1. 86
      weed/storage/erasure_coding/ec_encoder.go

86
weed/storage/erasure_coding/ec_encoder.go

@ -54,20 +54,32 @@ func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
return nil
}
// WriteEcFiles generates .ec00 ~ .ec13 files
// WriteEcFiles generates .ec00 ~ .ec13 files using default EC context
//
// NOTE(review): this listing is a diff shown without +/- markers, so old and
// new lines are interleaved. The direct generateEcFiles call below (without a
// context argument) looks like the removed pre-refactor line; the two
// statements after it look like its replacement, which builds a default
// context with NewDefaultECContext("", 0) and delegates to
// WriteEcFilesWithContext. Confirm against the commit before copying.
func WriteEcFiles(baseFileName string) error {
return generateEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
ctx := NewDefaultECContext("", 0)
return WriteEcFilesWithContext(baseFileName, ctx)
}
// WriteEcFilesWithContext generates the EC shard files for baseFileName using
// the caller-supplied ctx. It hands the work to generateEcFiles with a 256 KiB
// buffer size and the package-level large and small block sizes, and returns
// whatever error that call reports.
func WriteEcFilesWithContext(baseFileName string, ctx *ECContext) error {
	const bufferSize = 256 * 1024
	return generateEcFiles(baseFileName, bufferSize, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, ctx)
}
// RebuildEcFiles rebuilds missing EC files for baseFileName using a default
// EC context and returns the shard ids reported by the rebuild.
//
// NOTE(review): this listing is a diff shown without +/- markers, so old and
// new lines are interleaved. The direct generateMissingEcFiles call below
// (without a context argument) looks like the removed pre-refactor line; the
// two statements after it look like its replacement, which builds a default
// context with NewDefaultECContext("", 0) and delegates to
// RebuildEcFilesWithContext. Confirm against the commit before copying.
func RebuildEcFiles(baseFileName string) ([]uint32, error) {
return generateMissingEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
ctx := NewDefaultECContext("", 0)
return RebuildEcFilesWithContext(baseFileName, ctx)
}
// RebuildEcFilesWithContext rebuilds the missing EC shard files for
// baseFileName using the caller-supplied ctx. It hands the work to
// generateMissingEcFiles with a 256 KiB buffer size and the package-level
// large and small block sizes, and returns that call's shard ids and error
// unchanged.
func RebuildEcFilesWithContext(baseFileName string, ctx *ECContext) ([]uint32, error) {
	const bufferSize = 256 * 1024
	return generateMissingEcFiles(baseFileName, bufferSize, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, ctx)
}
// ToExt formats ecIndex as a shard file extension: ".ec" followed by the
// index in decimal, zero-padded to a minimum width of two (for example
// ".ec00" or ".ec13").
func ToExt(ecIndex int) string {
	const shardExtFormat = ".ec%02d"
	return fmt.Sprintf(shardExtFormat, ecIndex)
}
func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64) error {
func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64, ctx *ECContext) error {
file, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0)
if err != nil {
return fmt.Errorf("failed to open dat file: %w", err)
@ -79,21 +91,21 @@ func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64,
return fmt.Errorf("failed to stat dat file: %w", err)
}
glog.V(0).Infof("encodeDatFile %s.dat size:%d", baseFileName, fi.Size())
err = encodeDatFile(fi.Size(), baseFileName, bufferSize, largeBlockSize, file, smallBlockSize)
glog.V(0).Infof("encodeDatFile %s.dat size:%d with EC context %s", baseFileName, fi.Size(), ctx.String())
err = encodeDatFile(fi.Size(), baseFileName, bufferSize, largeBlockSize, file, smallBlockSize, ctx)
if err != nil {
return fmt.Errorf("encodeDatFile: %w", err)
}
return nil
}
func generateMissingEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64) (generatedShardIds []uint32, err error) {
func generateMissingEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64, ctx *ECContext) (generatedShardIds []uint32, err error) {
shardHasData := make([]bool, TotalShardsCount)
inputFiles := make([]*os.File, TotalShardsCount)
outputFiles := make([]*os.File, TotalShardsCount)
for shardId := 0; shardId < TotalShardsCount; shardId++ {
shardFileName := baseFileName + ToExt(shardId)
shardHasData := make([]bool, ctx.TotalShards)
inputFiles := make([]*os.File, ctx.TotalShards)
outputFiles := make([]*os.File, ctx.TotalShards)
for shardId := 0; shardId < ctx.TotalShards; shardId++ {
shardFileName := baseFileName + ctx.ToExt(shardId)
if util.FileExists(shardFileName) {
shardHasData[shardId] = true
inputFiles[shardId], err = os.OpenFile(shardFileName, os.O_RDONLY, 0)
@ -111,14 +123,14 @@ func generateMissingEcFiles(baseFileName string, bufferSize int, largeBlockSize
}
}
err = rebuildEcFiles(shardHasData, inputFiles, outputFiles)
err = rebuildEcFiles(shardHasData, inputFiles, outputFiles, ctx)
if err != nil {
return nil, fmt.Errorf("rebuildEcFiles: %w", err)
}
return
}
func encodeData(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File) error {
func encodeData(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File, ctx *ECContext) error {
bufferSize := int64(len(buffers[0]))
if bufferSize == 0 {
@ -131,7 +143,7 @@ func encodeData(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize i
}
for b := int64(0); b < batchCount; b++ {
err := encodeDataOneBatch(file, enc, startOffset+b*bufferSize, blockSize, buffers, outputs)
err := encodeDataOneBatch(file, enc, startOffset+b*bufferSize, blockSize, buffers, outputs, ctx)
if err != nil {
return err
}
@ -140,9 +152,9 @@ func encodeData(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize i
return nil
}
func openEcFiles(baseFileName string, forRead bool) (files []*os.File, err error) {
for i := 0; i < TotalShardsCount; i++ {
fname := baseFileName + ToExt(i)
func openEcFiles(baseFileName string, forRead bool, ctx *ECContext) (files []*os.File, err error) {
for i := 0; i < ctx.TotalShards; i++ {
fname := baseFileName + ctx.ToExt(i)
openOption := os.O_TRUNC | os.O_CREATE | os.O_WRONLY
if forRead {
openOption = os.O_RDONLY
@ -164,10 +176,10 @@ func closeEcFiles(files []*os.File) {
}
}
func encodeDataOneBatch(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File) error {
func encodeDataOneBatch(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File, ctx *ECContext) error {
// read data into buffers
for i := 0; i < DataShardsCount; i++ {
for i := 0; i < ctx.DataShards; i++ {
n, err := file.ReadAt(buffers[i], startOffset+blockSize*int64(i))
if err != nil {
if err != io.EOF {
@ -186,7 +198,7 @@ func encodeDataOneBatch(file *os.File, enc reedsolomon.Encoder, startOffset, blo
return err
}
for i := 0; i < TotalShardsCount; i++ {
for i := 0; i < ctx.TotalShards; i++ {
_, err := outputs[i].Write(buffers[i])
if err != nil {
return err
@ -196,53 +208,53 @@ func encodeDataOneBatch(file *os.File, enc reedsolomon.Encoder, startOffset, blo
return nil
}
// encodeDatFile encodes the data readable through file into the EC shard
// files opened for baseFileName.
//
// remainingSize is the number of bytes still to encode. Data is consumed one
// row at a time, where a row is one block per data shard: rows of
// largeBlockSize blocks while more than one full large row remains, then rows
// of smallBlockSize blocks until remainingSize is no longer positive.
// processedSize tracks the offset of the next row and is passed to encodeData
// as its start offset. bufferSize is the length of each per-shard buffer.
//
// NOTE(review): this listing is a diff shown without +/- markers, so old and
// new lines are interleaved. Wherever two near-identical lines appear in a
// row, the first (using DataShardsCount / TotalShardsCount / reedsolomon.New,
// or lacking the ctx argument) looks like the removed line and the second
// (using ctx) looks like its replacement. Confirm against the commit.
func encodeDatFile(remainingSize int64, baseFileName string, bufferSize int, largeBlockSize int64, file *os.File, smallBlockSize int64) error {
func encodeDatFile(remainingSize int64, baseFileName string, bufferSize int, largeBlockSize int64, file *os.File, smallBlockSize int64, ctx *ECContext) error {
var processedSize int64
enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount)
enc, err := ctx.CreateEncoder()
if err != nil {
return fmt.Errorf("failed to create encoder: %w", err)
}
// One buffer per shard (data and parity), each bufferSize bytes long.
buffers := make([][]byte, TotalShardsCount)
buffers := make([][]byte, ctx.TotalShards)
for i := range buffers {
buffers[i] = make([]byte, bufferSize)
}
outputs, err := openEcFiles(baseFileName, false)
outputs, err := openEcFiles(baseFileName, false, ctx)
// The close is deferred before the error check, so closeEcFiles also runs on
// whatever openEcFiles returned alongside an error.
defer closeEcFiles(outputs)
if err != nil {
// NOTE(review): this message formats err with %v while the other errors in
// this function use %w, so this cause cannot be unwrapped by callers.
return fmt.Errorf("failed to open ec files %s: %v", baseFileName, err)
}
// Large-block phase: runs only while strictly more than one full large row
// remains.
for remainingSize > largeBlockSize*DataShardsCount {
err = encodeData(file, enc, processedSize, largeBlockSize, buffers, outputs)
for remainingSize > largeBlockSize*int64(ctx.DataShards) {
err = encodeData(file, enc, processedSize, largeBlockSize, buffers, outputs, ctx)
if err != nil {
return fmt.Errorf("failed to encode large chunk data: %w", err)
}
remainingSize -= largeBlockSize * DataShardsCount
processedSize += largeBlockSize * DataShardsCount
remainingSize -= largeBlockSize * int64(ctx.DataShards)
processedSize += largeBlockSize * int64(ctx.DataShards)
}
// Small-block phase: the final subtraction may take remainingSize below zero,
// which ends the loop.
for remainingSize > 0 {
err = encodeData(file, enc, processedSize, smallBlockSize, buffers, outputs)
err = encodeData(file, enc, processedSize, smallBlockSize, buffers, outputs, ctx)
if err != nil {
return fmt.Errorf("failed to encode small chunk data: %w", err)
}
remainingSize -= smallBlockSize * DataShardsCount
processedSize += smallBlockSize * DataShardsCount
remainingSize -= smallBlockSize * int64(ctx.DataShards)
processedSize += smallBlockSize * int64(ctx.DataShards)
}
return nil
}
func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*os.File) error {
func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*os.File, ctx *ECContext) error {
enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount)
enc, err := ctx.CreateEncoder()
if err != nil {
return fmt.Errorf("failed to create encoder: %w", err)
}
buffers := make([][]byte, TotalShardsCount)
buffers := make([][]byte, ctx.TotalShards)
for i := range buffers {
if shardHasData[i] {
buffers[i] = make([]byte, ErasureCodingSmallBlockSize)
@ -254,7 +266,7 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
for {
// read the input data from files
for i := 0; i < TotalShardsCount; i++ {
for i := 0; i < ctx.TotalShards; i++ {
if shardHasData[i] {
n, _ := inputFiles[i].ReadAt(buffers[i], startOffset)
if n == 0 {
@ -278,7 +290,7 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
}
// write the data to output files
for i := 0; i < TotalShardsCount; i++ {
for i := 0; i < ctx.TotalShards; i++ {
if !shardHasData[i] {
n, _ := outputFiles[i].WriteAt(buffers[i][:inputBufferDataSize], startOffset)
if inputBufferDataSize != n {

Loading…
Cancel
Save