From 0cda3b9dc83053dbba201c99aa13545fe920943d Mon Sep 17 00:00:00 2001
From: chrislu
Date: Mon, 27 Oct 2025 19:22:13 -0700
Subject: [PATCH] refactor: replace TotalShards field with Total() method

- Remove TotalShards field from ECContext to avoid field drift
- Add Total() method that computes DataShards + ParityShards
- Update all references to use ctx.Total() instead of ctx.TotalShards
- Read EC config from VolumeInfo when loading EC volumes
- Read data shard count from .vif in VolumeEcShardsToVolume
- Use >= instead of > for exact boundary handling in encoding loops
---
 weed/server/volume_grpc_erasure_coding.go | 30 +++++++++++++++++------
 weed/storage/erasure_coding/ec_context.go |  9 ++++---
 weed/storage/erasure_coding/ec_encoder.go | 22 ++++++++---------
 weed/storage/erasure_coding/ec_test.go    |  2 +-
 weed/storage/erasure_coding/ec_volume.go  | 17 ++++++++++---
 5 files changed, 55 insertions(+), 25 deletions(-)
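
Reviewer note: below is a minimal usage sketch of how the new Total() accessor and a
.vif-derived shard layout are meant to compose. The helper ecContextFromVolumeInfo is
hypothetical (it mirrors the fallback logic added to NewEcVolume in this patch), and the
import paths are assumed from the current SeaweedFS module layout; ECContext, Total(),
ToExt(), NewDefaultECContext, and VolumeInfo.EcShardConfig are taken from the diff itself.

// Sketch only: hypothetical helper mirroring the .vif fallback added to NewEcVolume.
package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)

// ecContextFromVolumeInfo builds an ECContext from a loaded .vif, falling back to the
// default 10+4 layout when no EcShardConfig is present.
func ecContextFromVolumeInfo(collection string, vid needle.VolumeId, vi *volume_server_pb.VolumeInfo) *erasure_coding.ECContext {
	if vi != nil && vi.EcShardConfig != nil {
		return &erasure_coding.ECContext{
			Collection:   collection,
			VolumeId:     vid,
			DataShards:   int(vi.EcShardConfig.DataShards),
			ParityShards: int(vi.EcShardConfig.ParityShards),
		}
	}
	return erasure_coding.NewDefaultECContext(collection, vid)
}

func main() {
	ctx := ecContextFromVolumeInfo("col", needle.VolumeId(7), nil)
	// Total() is always DataShards + ParityShards, so it cannot drift from the
	// individual fields the way a stored TotalShards value could.
	fmt.Println("layout:", ctx.String())
	for i := 0; i < ctx.Total(); i++ {
		fmt.Println("shard extension:", ctx.ToExt(i))
	}
}

With a non-default layout recorded in the .vif, the same loop covers however many shards
the volume actually has, which is the point of computing the total on demand.
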
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 2e3eda7b6..f62ae1885 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -59,7 +59,7 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
 		if !shouldCleanup {
 			return
 		}
-		for i := 0; i < ecCtx.TotalShards; i++ {
+		for i := 0; i < ecCtx.Total(); i++ {
 			os.Remove(baseFileName + ecCtx.ToExt(i))
 		}
 		os.Remove(v.IndexFileName() + ".ecx")
@@ -453,9 +453,27 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
 
 	glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
 
-	// collect .ec00 ~ .ec09 files
-	shardFileNames := make([]string, erasure_coding.DataShardsCount)
-	v, found := vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), shardFileNames)
+	// collect data shard files; derive shard count from .vif if present
+	dataShards := erasure_coding.DataShardsCount
+
+	// Try to get the volume first to determine data base filename
+	tempShards := make([]string, erasure_coding.TotalShardsCount)
+	v, found := vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), tempShards)
+	if !found {
+		return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
+	}
+
+	dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
+
+	// Read EC config from .vif if available
+	if vi, _, found, _ := volume_info.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found && vi.EcShardConfig != nil {
+		dataShards = int(vi.EcShardConfig.DataShards)
+		glog.V(1).Infof("Using EC config from VolumeInfo: %d data shards", dataShards)
+	}
+
+	// Collect only the data shards needed
+	shardFileNames := make([]string, dataShards)
+	v, found = vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), shardFileNames)
 	if !found {
 		return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
 	}
@@ -464,13 +482,11 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
 		return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
 	}
 
-	for shardId := 0; shardId < erasure_coding.DataShardsCount; shardId++ {
+	for shardId := 0; shardId < dataShards; shardId++ {
 		if shardFileNames[shardId] == "" {
 			return nil, fmt.Errorf("ec volume %d missing shard %d", req.VolumeId, shardId)
 		}
 	}
-
-	dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
 	// calculate .dat file size
 	datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
 	if err != nil {
diff --git a/weed/storage/erasure_coding/ec_context.go b/weed/storage/erasure_coding/ec_context.go
index 1c8613371..770fe41af 100644
--- a/weed/storage/erasure_coding/ec_context.go
+++ b/weed/storage/erasure_coding/ec_context.go
@@ -11,17 +11,20 @@ import (
 type ECContext struct {
 	DataShards   int
 	ParityShards int
-	TotalShards  int
 	Collection   string
 	VolumeId     needle.VolumeId
 }
 
+// Total returns the total number of shards (data + parity)
+func (ctx *ECContext) Total() int {
+	return ctx.DataShards + ctx.ParityShards
+}
+
 // NewDefaultECContext creates a context with default 10+4 shard configuration
 func NewDefaultECContext(collection string, volumeId needle.VolumeId) *ECContext {
 	return &ECContext{
 		DataShards:   DataShardsCount,
 		ParityShards: ParityShardsCount,
-		TotalShards:  TotalShardsCount,
 		Collection:   collection,
 		VolumeId:     volumeId,
 	}
@@ -39,5 +42,5 @@ func (ctx *ECContext) ToExt(shardIndex int) string {
 
 // String returns a human-readable representation of the EC configuration
 func (ctx *ECContext) String() string {
-	return fmt.Sprintf("%d+%d (total: %d)", ctx.DataShards, ctx.ParityShards, ctx.TotalShards)
+	return fmt.Sprintf("%d+%d (total: %d)", ctx.DataShards, ctx.ParityShards, ctx.Total())
 }
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index 68b6a3fa9..67d634d28 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -101,10 +101,10 @@ func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64,
 
 func generateMissingEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64, ctx *ECContext) (generatedShardIds []uint32, err error) {
 
-	shardHasData := make([]bool, ctx.TotalShards)
-	inputFiles := make([]*os.File, ctx.TotalShards)
-	outputFiles := make([]*os.File, ctx.TotalShards)
-	for shardId := 0; shardId < ctx.TotalShards; shardId++ {
+	shardHasData := make([]bool, ctx.Total())
+	inputFiles := make([]*os.File, ctx.Total())
+	outputFiles := make([]*os.File, ctx.Total())
+	for shardId := 0; shardId < ctx.Total(); shardId++ {
 		shardFileName := baseFileName + ctx.ToExt(shardId)
 		if util.FileExists(shardFileName) {
 			shardHasData[shardId] = true
@@ -153,7 +153,7 @@ func encodeData(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize i
 }
 
 func openEcFiles(baseFileName string, forRead bool, ctx *ECContext) (files []*os.File, err error) {
-	for i := 0; i < ctx.TotalShards; i++ {
+	for i := 0; i < ctx.Total(); i++ {
 		fname := baseFileName + ctx.ToExt(i)
 		openOption := os.O_TRUNC | os.O_CREATE | os.O_WRONLY
 		if forRead {
@@ -198,7 +198,7 @@ func encodeDataOneBatch(file *os.File, enc reedsolomon.Encoder, startOffset, blo
 		return err
 	}
 
-	for i := 0; i < ctx.TotalShards; i++ {
+	for i := 0; i < ctx.Total(); i++ {
 		_, err := outputs[i].Write(buffers[i])
 		if err != nil {
 			return err
@@ -217,7 +217,7 @@ func encodeDatFile(remainingSize int64, baseFileName string, bufferSize int, lar
 		return fmt.Errorf("failed to create encoder: %w", err)
 	}
 
-	buffers := make([][]byte, ctx.TotalShards)
+	buffers := make([][]byte, ctx.Total())
 	for i := range buffers {
 		buffers[i] = make([]byte, bufferSize)
 	}
@@ -232,7 +232,7 @@ func encodeDatFile(remainingSize int64, baseFileName string, bufferSize int, lar
 	largeRowSize := largeBlockSize * int64(ctx.DataShards)
 	smallRowSize := smallBlockSize * int64(ctx.DataShards)
 
-	for remainingSize > largeRowSize {
+	for remainingSize >= largeRowSize {
 		err = encodeData(file, enc, processedSize, largeBlockSize, buffers, outputs, ctx)
 		if err != nil {
 			return fmt.Errorf("failed to encode large chunk data: %w", err)
@@ -258,7 +258,7 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
 		return fmt.Errorf("failed to create encoder: %w", err)
 	}
 
-	buffers := make([][]byte, ctx.TotalShards)
+	buffers := make([][]byte, ctx.Total())
 	for i := range buffers {
 		if shardHasData[i] {
 			buffers[i] = make([]byte, ErasureCodingSmallBlockSize)
@@ -270,7 +270,7 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
 
 	for {
 		// read the input data from files
-		for i := 0; i < ctx.TotalShards; i++ {
+		for i := 0; i < ctx.Total(); i++ {
 			if shardHasData[i] {
 				n, _ := inputFiles[i].ReadAt(buffers[i], startOffset)
 				if n == 0 {
@@ -294,7 +294,7 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
 		}
 
 		// write the data to output files
-		for i := 0; i < ctx.TotalShards; i++ {
+		for i := 0; i < ctx.Total(); i++ {
 			if !shardHasData[i] {
 				n, _ := outputFiles[i].WriteAt(buffers[i][:inputBufferDataSize], startOffset)
 				if inputBufferDataSize != n {
diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go
index a3bf6e61b..cbb20832c 100644
--- a/weed/storage/erasure_coding/ec_test.go
+++ b/weed/storage/erasure_coding/ec_test.go
@@ -188,7 +188,7 @@ func readFromFile(file *os.File, data []byte, ecFileOffset int64) (err error) {
 }
 
 func removeGeneratedFiles(baseFileName string, ctx *ECContext) {
-	for i := 0; i < ctx.TotalShards; i++ {
+	for i := 0; i < ctx.Total(); i++ {
 		fname := baseFileName + ctx.ToExt(i)
 		os.Remove(fname)
 	}
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index ce94d741b..9074bd9ad 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -74,14 +74,25 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
 		ev.Version = needle.Version(volumeInfo.Version)
 		ev.datFileSize = volumeInfo.DatFileSize
 		ev.ExpireAtSec = volumeInfo.ExpireAtSec
+
+		// Initialize EC context from .vif if present; fallback to defaults
+		if volumeInfo.EcShardConfig != nil {
+			ev.ECContext = &ECContext{
+				Collection:   collection,
+				VolumeId:     vid,
+				DataShards:   int(volumeInfo.EcShardConfig.DataShards),
+				ParityShards: int(volumeInfo.EcShardConfig.ParityShards),
+			}
+			glog.V(1).Infof("Loaded EC config from VolumeInfo for volume %d: %s", vid, ev.ECContext.String())
+		} else {
+			ev.ECContext = NewDefaultECContext(collection, vid)
+		}
 	} else {
 		glog.Warningf("vif file not found,volumeId:%d, filename:%s", vid, dataBaseFileName)
 		volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
+		ev.ECContext = NewDefaultECContext(collection, vid)
 	}
 
-	// Initialize EC context with default configuration (Phase 1: always 10+4)
-	ev.ECContext = NewDefaultECContext(collection, vid)
-
 	ev.ShardLocations = make(map[ShardId][]pb.ServerAddress)
 
 	return