From 0c6980182c6a1c57b51b7a44e5d8b443bf711642 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 10 Aug 2025 19:40:35 -0700 Subject: [PATCH] ec shards with generation --- weed/storage/disk_location.go | 4 +- weed/storage/disk_location_ec.go | 111 +++++++++++++++++++++++-------- 2 files changed, 84 insertions(+), 31 deletions(-) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 02f5f5923..401d65f15 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -31,7 +31,7 @@ type DiskLocation struct { volumesLock sync.RWMutex // erasure coding - ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume + ecVolumes map[EcVolumeGenerationKey]*erasure_coding.EcVolume ecVolumesLock sync.RWMutex isDiskSpaceLow bool @@ -88,7 +88,7 @@ func NewDiskLocation(dir string, maxVolumeCount int32, minFreeSpace util.MinFree MinFreeSpace: minFreeSpace, } location.volumes = make(map[needle.VolumeId]*Volume) - location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume) + location.ecVolumes = make(map[EcVolumeGenerationKey]*erasure_coding.EcVolume) location.closeCh = make(chan struct{}) go func() { location.CheckDiskSpace() diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index ace6ddc15..cdaf6ac10 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -19,13 +19,25 @@ var ( re = regexp.MustCompile(`\.ec[0-9][0-9]`) ) +// EcVolumeGenerationKey represents a unique key for EC volume with generation +type EcVolumeGenerationKey struct { + VolumeId needle.VolumeId + Generation uint32 +} + +func (k EcVolumeGenerationKey) String() string { + return fmt.Sprintf("v%d-g%d", k.VolumeId, k.Generation) +} + func (l *DiskLocation) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) { l.ecVolumesLock.RLock() defer l.ecVolumesLock.RUnlock() - ecVolume, ok := l.ecVolumes[vid] - if ok { - return ecVolume, true + // Search for any generation of this volume ID + for key, ecVolume := range l.ecVolumes { + if key.VolumeId == vid { + return ecVolume, true + } } return nil, false } @@ -34,10 +46,16 @@ func (l *DiskLocation) DestroyEcVolume(vid needle.VolumeId) { l.ecVolumesLock.Lock() defer l.ecVolumesLock.Unlock() - ecVolume, found := l.ecVolumes[vid] - if found { - ecVolume.Destroy() - delete(l.ecVolumes, vid) + // Find and destroy all generations of this volume + keysToDelete := make([]EcVolumeGenerationKey, 0) + for key, ecVolume := range l.ecVolumes { + if key.VolumeId == vid { + ecVolume.Destroy() + keysToDelete = append(keysToDelete, key) + } + } + for _, key := range keysToDelete { + delete(l.ecVolumes, key) } } @@ -45,7 +63,14 @@ func (l *DiskLocation) CollectEcShards(vid needle.VolumeId, shardFileNames []str l.ecVolumesLock.RLock() defer l.ecVolumesLock.RUnlock() - ecVolume, found = l.ecVolumes[vid] + // Search for any generation of this volume ID + for key, vol := range l.ecVolumes { + if key.VolumeId == vid { + ecVolume = vol + found = true + break + } + } if !found { return } @@ -61,7 +86,26 @@ func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.S l.ecVolumesLock.RLock() defer l.ecVolumesLock.RUnlock() - ecVolume, ok := l.ecVolumes[vid] + // Search for any generation of this volume ID + for key, ecVolume := range l.ecVolumes { + if key.VolumeId == vid { + for _, ecShard := range ecVolume.Shards { + if ecShard.ShardId == shardId { + return ecShard, true + } + } + } + } + return nil, false +} + +func (l *DiskLocation) FindEcShardWithGeneration(vid needle.VolumeId, shardId erasure_coding.ShardId, generation uint32) (*erasure_coding.EcVolumeShard, bool) { + l.ecVolumesLock.RLock() + defer l.ecVolumesLock.RUnlock() + + // Search for specific generation of this volume ID + key := EcVolumeGenerationKey{VolumeId: vid, Generation: generation} + ecVolume, ok := l.ecVolumes[key] if !ok { return nil, false } @@ -84,13 +128,14 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard } l.ecVolumesLock.Lock() defer l.ecVolumesLock.Unlock() - ecVolume, found := l.ecVolumes[vid] + key := EcVolumeGenerationKey{VolumeId: vid, Generation: generation} + ecVolume, found := l.ecVolumes[key] if !found { ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid, generation) if err != nil { return nil, fmt.Errorf("failed to create ec volume %d: %v", vid, err) } - l.ecVolumes[vid] = ecVolume + l.ecVolumes[key] = ecVolume } ecVolume.AddEcVolumeShard(ecVolumeShard) @@ -102,19 +147,20 @@ func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding l.ecVolumesLock.Lock() defer l.ecVolumesLock.Unlock() - ecVolume, found := l.ecVolumes[vid] - if !found { - return false - } - if _, deleted := ecVolume.DeleteEcVolumeShard(shardId); deleted { - if len(ecVolume.Shards) == 0 { - delete(l.ecVolumes, vid) - ecVolume.Close() + // Search for any generation of this volume ID + for key, ecVolume := range l.ecVolumes { + if key.VolumeId == vid { + if _, deleted := ecVolume.DeleteEcVolumeShard(shardId); deleted { + if len(ecVolume.Shards) == 0 { + delete(l.ecVolumes, key) + ecVolume.Close() + } + return true + } } - return true } - return true + return false } func (l *DiskLocation) loadEcShards(shards []string, collection string, vid needle.VolumeId, generation uint32) (err error) { @@ -210,25 +256,32 @@ func (l *DiskLocation) deleteEcVolumeById(vid needle.VolumeId) (e error) { l.ecVolumesLock.Lock() defer l.ecVolumesLock.Unlock() - ecVolume, ok := l.ecVolumes[vid] - if !ok { - return + // Find and delete all generations of this volume + keysToDelete := make([]EcVolumeGenerationKey, 0) + for key, ecVolume := range l.ecVolumes { + if key.VolumeId == vid { + ecVolume.Destroy() + keysToDelete = append(keysToDelete, key) + } + } + for _, key := range keysToDelete { + delete(l.ecVolumes, key) } - ecVolume.Destroy() - delete(l.ecVolumes, vid) return } func (l *DiskLocation) unmountEcVolumeByCollection(collectionName string) map[needle.VolumeId]*erasure_coding.EcVolume { deltaVols := make(map[needle.VolumeId]*erasure_coding.EcVolume, 0) + keysToDelete := make([]EcVolumeGenerationKey, 0) for k, v := range l.ecVolumes { if v.Collection == collectionName { - deltaVols[k] = v + deltaVols[k.VolumeId] = v + keysToDelete = append(keysToDelete, k) } } - for k, _ := range deltaVols { - delete(l.ecVolumes, k) + for _, key := range keysToDelete { + delete(l.ecVolumes, key) } return deltaVols }