Browse Source

refactor: replace TotalShards field with Total() method

- Remove TotalShards field from ECContext to avoid field drift
- Add Total() method that computes DataShards + ParityShards
- Update all references to use ctx.Total() instead of ctx.TotalShards
- Read EC config from VolumeInfo when loading EC volumes
- Read data shard count from .vif in VolumeEcShardsToVolume
- Use >= instead of > for exact boundary handling in encoding loops
pull/7396/head
chrislu 1 month ago
parent
commit
0cda3b9dc8
  1. 30
      weed/server/volume_grpc_erasure_coding.go
  2. 9
      weed/storage/erasure_coding/ec_context.go
  3. 22
      weed/storage/erasure_coding/ec_encoder.go
  4. 2
      weed/storage/erasure_coding/ec_test.go
  5. 17
      weed/storage/erasure_coding/ec_volume.go

30
weed/server/volume_grpc_erasure_coding.go

@ -59,7 +59,7 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
if !shouldCleanup {
return
}
for i := 0; i < ecCtx.TotalShards; i++ {
for i := 0; i < ecCtx.Total(); i++ {
os.Remove(baseFileName + ecCtx.ToExt(i))
}
os.Remove(v.IndexFileName() + ".ecx")
@ -453,9 +453,27 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
// collect .ec00 ~ .ec09 files
shardFileNames := make([]string, erasure_coding.DataShardsCount)
v, found := vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), shardFileNames)
// collect data shard files; derive shard count from .vif if present
dataShards := erasure_coding.DataShardsCount
// Try to get the volume first to determine data base filename
tempShards := make([]string, erasure_coding.TotalShardsCount)
v, found := vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), tempShards)
if !found {
return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
}
dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
// Read EC config from .vif if available
if vi, _, found, _ := volume_info.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found && vi.EcShardConfig != nil {
dataShards = int(vi.EcShardConfig.DataShards)
glog.V(1).Infof("Using EC config from VolumeInfo: %d data shards", dataShards)
}
// Collect only the data shards needed
shardFileNames := make([]string, dataShards)
v, found = vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), shardFileNames)
if !found {
return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
}
@ -464,13 +482,11 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
}
for shardId := 0; shardId < erasure_coding.DataShardsCount; shardId++ {
for shardId := 0; shardId < dataShards; shardId++ {
if shardFileNames[shardId] == "" {
return nil, fmt.Errorf("ec volume %d missing shard %d", req.VolumeId, shardId)
}
}
dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
// calculate .dat file size
datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
if err != nil {

9
weed/storage/erasure_coding/ec_context.go

@ -11,17 +11,20 @@ import (
type ECContext struct {
DataShards int
ParityShards int
TotalShards int
Collection string
VolumeId needle.VolumeId
}
// Total returns the total number of shards (data + parity) for this context.
// The value is computed on each call as DataShards + ParityShards instead of
// being read from a stored field, so it always reflects the current counts.
func (ctx *ECContext) Total() int {
return ctx.DataShards + ctx.ParityShards
}
// NewDefaultECContext creates a context with default 10+4 shard configuration
func NewDefaultECContext(collection string, volumeId needle.VolumeId) *ECContext {
return &ECContext{
DataShards: DataShardsCount,
ParityShards: ParityShardsCount,
TotalShards: TotalShardsCount,
Collection: collection,
VolumeId: volumeId,
}
@ -39,5 +42,5 @@ func (ctx *ECContext) ToExt(shardIndex int) string {
// String returns a human-readable representation of the EC configuration
func (ctx *ECContext) String() string {
return fmt.Sprintf("%d+%d (total: %d)", ctx.DataShards, ctx.ParityShards, ctx.TotalShards)
return fmt.Sprintf("%d+%d (total: %d)", ctx.DataShards, ctx.ParityShards, ctx.Total())
}

22
weed/storage/erasure_coding/ec_encoder.go

@ -101,10 +101,10 @@ func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64,
func generateMissingEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64, ctx *ECContext) (generatedShardIds []uint32, err error) {
shardHasData := make([]bool, ctx.TotalShards)
inputFiles := make([]*os.File, ctx.TotalShards)
outputFiles := make([]*os.File, ctx.TotalShards)
for shardId := 0; shardId < ctx.TotalShards; shardId++ {
shardHasData := make([]bool, ctx.Total())
inputFiles := make([]*os.File, ctx.Total())
outputFiles := make([]*os.File, ctx.Total())
for shardId := 0; shardId < ctx.Total(); shardId++ {
shardFileName := baseFileName + ctx.ToExt(shardId)
if util.FileExists(shardFileName) {
shardHasData[shardId] = true
@ -153,7 +153,7 @@ func encodeData(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize i
}
func openEcFiles(baseFileName string, forRead bool, ctx *ECContext) (files []*os.File, err error) {
for i := 0; i < ctx.TotalShards; i++ {
for i := 0; i < ctx.Total(); i++ {
fname := baseFileName + ctx.ToExt(i)
openOption := os.O_TRUNC | os.O_CREATE | os.O_WRONLY
if forRead {
@ -198,7 +198,7 @@ func encodeDataOneBatch(file *os.File, enc reedsolomon.Encoder, startOffset, blo
return err
}
for i := 0; i < ctx.TotalShards; i++ {
for i := 0; i < ctx.Total(); i++ {
_, err := outputs[i].Write(buffers[i])
if err != nil {
return err
@ -217,7 +217,7 @@ func encodeDatFile(remainingSize int64, baseFileName string, bufferSize int, lar
return fmt.Errorf("failed to create encoder: %w", err)
}
buffers := make([][]byte, ctx.TotalShards)
buffers := make([][]byte, ctx.Total())
for i := range buffers {
buffers[i] = make([]byte, bufferSize)
}
@ -232,7 +232,7 @@ func encodeDatFile(remainingSize int64, baseFileName string, bufferSize int, lar
largeRowSize := largeBlockSize * int64(ctx.DataShards)
smallRowSize := smallBlockSize * int64(ctx.DataShards)
for remainingSize > largeRowSize {
for remainingSize >= largeRowSize {
err = encodeData(file, enc, processedSize, largeBlockSize, buffers, outputs, ctx)
if err != nil {
return fmt.Errorf("failed to encode large chunk data: %w", err)
@ -258,7 +258,7 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
return fmt.Errorf("failed to create encoder: %w", err)
}
buffers := make([][]byte, ctx.TotalShards)
buffers := make([][]byte, ctx.Total())
for i := range buffers {
if shardHasData[i] {
buffers[i] = make([]byte, ErasureCodingSmallBlockSize)
@ -270,7 +270,7 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
for {
// read the input data from files
for i := 0; i < ctx.TotalShards; i++ {
for i := 0; i < ctx.Total(); i++ {
if shardHasData[i] {
n, _ := inputFiles[i].ReadAt(buffers[i], startOffset)
if n == 0 {
@ -294,7 +294,7 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
}
// write the data to output files
for i := 0; i < ctx.TotalShards; i++ {
for i := 0; i < ctx.Total(); i++ {
if !shardHasData[i] {
n, _ := outputFiles[i].WriteAt(buffers[i][:inputBufferDataSize], startOffset)
if inputBufferDataSize != n {

2
weed/storage/erasure_coding/ec_test.go

@ -188,7 +188,7 @@ func readFromFile(file *os.File, data []byte, ecFileOffset int64) (err error) {
}
func removeGeneratedFiles(baseFileName string, ctx *ECContext) {
for i := 0; i < ctx.TotalShards; i++ {
for i := 0; i < ctx.Total(); i++ {
fname := baseFileName + ctx.ToExt(i)
os.Remove(fname)
}

17
weed/storage/erasure_coding/ec_volume.go

@ -74,14 +74,25 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
ev.Version = needle.Version(volumeInfo.Version)
ev.datFileSize = volumeInfo.DatFileSize
ev.ExpireAtSec = volumeInfo.ExpireAtSec
// Initialize EC context from .vif if present; fallback to defaults
if volumeInfo.EcShardConfig != nil {
ev.ECContext = &ECContext{
Collection: collection,
VolumeId: vid,
DataShards: int(volumeInfo.EcShardConfig.DataShards),
ParityShards: int(volumeInfo.EcShardConfig.ParityShards),
}
glog.V(1).Infof("Loaded EC config from VolumeInfo for volume %d: %s", vid, ev.ECContext.String())
} else {
ev.ECContext = NewDefaultECContext(collection, vid)
}
} else {
glog.Warningf("vif file not found,volumeId:%d, filename:%s", vid, dataBaseFileName)
volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
ev.ECContext = NewDefaultECContext(collection, vid)
}
// Initialize EC context with default configuration (Phase 1: always 10+4)
ev.ECContext = NewDefaultECContext(collection, vid)
ev.ShardLocations = make(map[ShardId][]pb.ServerAddress)
return

Loading…
Cancel
Save