From 8c31d5e3317ba1237d99cc020a9d9ab92e879f27 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 10 Aug 2025 13:45:08 -0700 Subject: [PATCH] EcVolume creation properly refactored --- weed/server/volume_grpc_erasure_coding.go | 2 +- weed/storage/disk_location_ec.go | 19 +++++++---- weed/storage/erasure_coding/ec_shard.go | 25 ++++++++++++-- weed/storage/erasure_coding/ec_volume.go | 40 ++++++++++++++++++----- weed/storage/store_ec.go | 4 +-- 5 files changed, 70 insertions(+), 20 deletions(-) diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 88e94115d..a90747ba4 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -303,7 +303,7 @@ func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_ser glog.V(0).Infof("VolumeEcShardsMount: %v", req) for _, shardId := range req.ShardIds { - err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId)) + err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId), req.Generation) if err != nil { glog.Errorf("ec shard mount %v: %v", req, err) diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index e46480060..983865424 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -72,9 +72,9 @@ func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.S return nil, false } -func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolume, error) { +func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, generation uint32) (*erasure_coding.EcVolume, error) { - ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.DiskType, l.Directory, collection, vid, shardId) + ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.DiskType, l.Directory, collection, vid, shardId, generation) if err != nil { if err == os.ErrNotExist { return nil, os.ErrNotExist @@ -85,7 +85,7 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard defer l.ecVolumesLock.Unlock() ecVolume, found := l.ecVolumes[vid] if !found { - ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid) + ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid, generation) if err != nil { return nil, fmt.Errorf("failed to create ec volume %d: %v", vid, err) } @@ -116,7 +116,7 @@ func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding return true } -func (l *DiskLocation) loadEcShards(shards []string, collection string, vid needle.VolumeId) (err error) { +func (l *DiskLocation) loadEcShards(shards []string, collection string, vid needle.VolumeId, generation uint32) (err error) { for _, shard := range shards { shardId, err := strconv.ParseInt(path.Ext(shard)[3:], 10, 64) @@ -124,7 +124,7 @@ func (l *DiskLocation) loadEcShards(shards []string, collection string, vid need return fmt.Errorf("failed to parse ec shard name %v: %w", shard, err) } - _, err = l.LoadEcShard(collection, vid, erasure_coding.ShardId(shardId)) + _, err = l.LoadEcShard(collection, vid, erasure_coding.ShardId(shardId), generation) if err != nil { return fmt.Errorf("failed to load ec shard %v: %w", shard, err) } @@ -183,8 +183,13 @@ func (l *DiskLocation) loadAllEcShards() (err error) { } if ext == ".ecx" && volumeId == prevVolumeId { - if err = l.loadEcShards(sameVolumeShards, collection, volumeId); err != nil { - return fmt.Errorf("loadEcShards collection:%v volumeId:%d : %v", collection, volumeId, err) + // Parse generation from the first shard filename + generation := uint32(0) + if len(sameVolumeShards) > 0 { + generation = erasure_coding.ParseGenerationFromFileName(sameVolumeShards[0]) + } + if err = l.loadEcShards(sameVolumeShards, collection, volumeId, generation); err != nil { + return fmt.Errorf("loadEcShards collection:%v volumeId:%d generation:%d : %v", collection, volumeId, generation, err) } prevVolumeId = volumeId continue diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index 559fd4f4a..58b878da8 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -25,11 +25,11 @@ type EcVolumeShard struct { DiskType types.DiskType } -func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) { +func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string, id needle.VolumeId, shardId ShardId, generation uint32) (v *EcVolumeShard, e error) { v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId, DiskType: diskType} - baseFileName := v.FileName() + baseFileName := v.FileNameWithGeneration(generation) // open ecd file if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil { @@ -151,3 +151,24 @@ func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) { return n, err } + +// ParseGenerationFromFileName extracts generation from EC volume filename +// Returns 0 for files without generation suffix (backward compatibility) +func ParseGenerationFromFileName(fileName string) uint32 { + // Remove extension first + baseName := fileName + if lastDot := strings.LastIndex(fileName, "."); lastDot >= 0 { + baseName = fileName[:lastDot] + } + + // Look for _g{N} pattern at the end + if gIndex := strings.LastIndex(baseName, "_g"); gIndex >= 0 { + generationStr := baseName[gIndex+2:] + if generation, err := strconv.ParseUint(generationStr, 10, 32); err == nil { + return uint32(generation) + } + } + + // No generation suffix found, return 0 for backward compatibility + return 0 +} diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index c18835119..33cb9041f 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -27,6 +27,7 @@ var ( type EcVolume struct { VolumeId needle.VolumeId Collection string + Generation uint32 // generation of this EC volume, defaults to 0 for backward compatibility dir string dirIdx string ecxFile *os.File @@ -44,11 +45,19 @@ type EcVolume struct { ExpireAtSec uint64 //ec volume destroy time, calculated from the ec volume was created } -func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { - ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType} +func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId, generation uint32) (ev *EcVolume, err error) { + ev = &EcVolume{ + dir: dir, + dirIdx: dirIdx, + Collection: collection, + VolumeId: vid, + Generation: generation, + diskType: diskType, + } - dataBaseFileName := EcShardFileName(collection, dir, int(vid)) - indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid)) + // Use generation-aware filenames + dataBaseFileName := EcShardFileNameWithGeneration(collection, dir, int(vid), generation) + indexBaseFileName := EcShardFileNameWithGeneration(collection, dirIdx, int(vid), generation) // open ecx file if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil { @@ -74,7 +83,7 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection ev.datFileSize = volumeInfo.DatFileSize ev.ExpireAtSec = volumeInfo.ExpireAtSec } else { - glog.Warningf("vif file not found,volumeId:%d, filename:%s", vid, dataBaseFileName) + glog.V(1).Infof("vif file not found for volume %d generation %d, creating new one: %s", vid, generation, dataBaseFileName) volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)}) } @@ -155,7 +164,7 @@ func (ev *EcVolume) Destroy() { } func (ev *EcVolume) FileName(ext string) string { - return ev.FileNameWithGeneration(ext, 0) + return ev.FileNameWithGeneration(ext, ev.Generation) } func (ev *EcVolume) FileNameWithGeneration(ext string, generation uint32) string { @@ -168,7 +177,7 @@ func (ev *EcVolume) FileNameWithGeneration(ext string, generation uint32) string } func (ev *EcVolume) DataBaseFileName() string { - return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId)) + return EcShardFileNameWithGeneration(ev.Collection, ev.dir, int(ev.VolumeId), ev.Generation) } func (ev *EcVolume) DataBaseFileNameWithGeneration(generation uint32) string { @@ -176,7 +185,7 @@ func (ev *EcVolume) DataBaseFileNameWithGeneration(generation uint32) string { } func (ev *EcVolume) IndexBaseFileName() string { - return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId)) + return EcShardFileNameWithGeneration(ev.Collection, ev.dirIdx, int(ev.VolumeId), ev.Generation) } func (ev *EcVolume) IndexBaseFileNameWithGeneration(generation uint32) string { @@ -190,6 +199,20 @@ func (ev *EcVolume) ShardSize() uint64 { return 0 } +// String returns a string representation of the EC volume including generation +func (ev *EcVolume) String() string { + return fmt.Sprintf("EcVolume{Id:%d, Collection:%s, Generation:%d, Shards:%d}", + ev.VolumeId, ev.Collection, ev.Generation, len(ev.Shards)) +} + +// Key returns a unique key for this EC volume including generation +func (ev *EcVolume) Key() string { + if ev.Collection == "" { + return fmt.Sprintf("%d_g%d", ev.VolumeId, ev.Generation) + } + return fmt.Sprintf("%s_%d_g%d", ev.Collection, ev.VolumeId, ev.Generation) +} + func (ev *EcVolume) Size() (size uint64) { for _, shard := range ev.Shards { if shardSize := shard.Size(); shardSize > 0 { @@ -239,6 +262,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage(diskId uint32) (messages [ DiskType: string(ev.diskType), ExpireAtSec: ev.ExpireAtSec, DiskId: diskId, + Generation: ev.Generation, // include generation in heartbeat message } messages = append(messages, m) } diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 0126ad9d4..b255ee80d 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -48,9 +48,9 @@ func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat { } -func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error { +func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, generation uint32) error { for diskId, location := range s.Locations { - if ecVolume, err := location.LoadEcShard(collection, vid, shardId); err == nil { + if ecVolume, err := location.LoadEcShard(collection, vid, shardId, generation); err == nil { glog.V(0).Infof("MountEcShards %d.%d on disk ID %d", vid, shardId, diskId) var shardBits erasure_coding.ShardBits