Browse Source

ec shards with generation

add-ec-vacuum
chrislu 4 months ago
parent
commit
0c6980182c
  1. 4
      weed/storage/disk_location.go
  2. 99
      weed/storage/disk_location_ec.go

4
weed/storage/disk_location.go

@ -31,7 +31,7 @@ type DiskLocation struct {
volumesLock sync.RWMutex volumesLock sync.RWMutex
// erasure coding // erasure coding
ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume
ecVolumes map[EcVolumeGenerationKey]*erasure_coding.EcVolume
ecVolumesLock sync.RWMutex ecVolumesLock sync.RWMutex
isDiskSpaceLow bool isDiskSpaceLow bool
@ -88,7 +88,7 @@ func NewDiskLocation(dir string, maxVolumeCount int32, minFreeSpace util.MinFree
MinFreeSpace: minFreeSpace, MinFreeSpace: minFreeSpace,
} }
location.volumes = make(map[needle.VolumeId]*Volume) location.volumes = make(map[needle.VolumeId]*Volume)
location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
location.ecVolumes = make(map[EcVolumeGenerationKey]*erasure_coding.EcVolume)
location.closeCh = make(chan struct{}) location.closeCh = make(chan struct{})
go func() { go func() {
location.CheckDiskSpace() location.CheckDiskSpace()

99
weed/storage/disk_location_ec.go

@ -19,14 +19,26 @@ var (
re = regexp.MustCompile(`\.ec[0-9][0-9]`) re = regexp.MustCompile(`\.ec[0-9][0-9]`)
) )
// EcVolumeGenerationKey represents a unique key for EC volume with generation
type EcVolumeGenerationKey struct {
VolumeId needle.VolumeId
Generation uint32
}
func (k EcVolumeGenerationKey) String() string {
return fmt.Sprintf("v%d-g%d", k.VolumeId, k.Generation)
}
func (l *DiskLocation) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) { func (l *DiskLocation) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
l.ecVolumesLock.RLock() l.ecVolumesLock.RLock()
defer l.ecVolumesLock.RUnlock() defer l.ecVolumesLock.RUnlock()
ecVolume, ok := l.ecVolumes[vid]
if ok {
// Search for any generation of this volume ID
for key, ecVolume := range l.ecVolumes {
if key.VolumeId == vid {
return ecVolume, true return ecVolume, true
} }
}
return nil, false return nil, false
} }
@ -34,10 +46,16 @@ func (l *DiskLocation) DestroyEcVolume(vid needle.VolumeId) {
l.ecVolumesLock.Lock() l.ecVolumesLock.Lock()
defer l.ecVolumesLock.Unlock() defer l.ecVolumesLock.Unlock()
ecVolume, found := l.ecVolumes[vid]
if found {
// Find and destroy all generations of this volume
keysToDelete := make([]EcVolumeGenerationKey, 0)
for key, ecVolume := range l.ecVolumes {
if key.VolumeId == vid {
ecVolume.Destroy() ecVolume.Destroy()
delete(l.ecVolumes, vid)
keysToDelete = append(keysToDelete, key)
}
}
for _, key := range keysToDelete {
delete(l.ecVolumes, key)
} }
} }
@ -45,7 +63,14 @@ func (l *DiskLocation) CollectEcShards(vid needle.VolumeId, shardFileNames []str
l.ecVolumesLock.RLock() l.ecVolumesLock.RLock()
defer l.ecVolumesLock.RUnlock() defer l.ecVolumesLock.RUnlock()
ecVolume, found = l.ecVolumes[vid]
// Search for any generation of this volume ID
for key, vol := range l.ecVolumes {
if key.VolumeId == vid {
ecVolume = vol
found = true
break
}
}
if !found { if !found {
return return
} }
@ -61,7 +86,26 @@ func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.S
l.ecVolumesLock.RLock() l.ecVolumesLock.RLock()
defer l.ecVolumesLock.RUnlock() defer l.ecVolumesLock.RUnlock()
ecVolume, ok := l.ecVolumes[vid]
// Search for any generation of this volume ID
for key, ecVolume := range l.ecVolumes {
if key.VolumeId == vid {
for _, ecShard := range ecVolume.Shards {
if ecShard.ShardId == shardId {
return ecShard, true
}
}
}
}
return nil, false
}
func (l *DiskLocation) FindEcShardWithGeneration(vid needle.VolumeId, shardId erasure_coding.ShardId, generation uint32) (*erasure_coding.EcVolumeShard, bool) {
l.ecVolumesLock.RLock()
defer l.ecVolumesLock.RUnlock()
// Search for specific generation of this volume ID
key := EcVolumeGenerationKey{VolumeId: vid, Generation: generation}
ecVolume, ok := l.ecVolumes[key]
if !ok { if !ok {
return nil, false return nil, false
} }
@ -84,13 +128,14 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard
} }
l.ecVolumesLock.Lock() l.ecVolumesLock.Lock()
defer l.ecVolumesLock.Unlock() defer l.ecVolumesLock.Unlock()
ecVolume, found := l.ecVolumes[vid]
key := EcVolumeGenerationKey{VolumeId: vid, Generation: generation}
ecVolume, found := l.ecVolumes[key]
if !found { if !found {
ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid, generation) ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid, generation)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create ec volume %d: %v", vid, err) return nil, fmt.Errorf("failed to create ec volume %d: %v", vid, err)
} }
l.ecVolumes[vid] = ecVolume
l.ecVolumes[key] = ecVolume
} }
ecVolume.AddEcVolumeShard(ecVolumeShard) ecVolume.AddEcVolumeShard(ecVolumeShard)
@ -102,19 +147,20 @@ func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding
l.ecVolumesLock.Lock() l.ecVolumesLock.Lock()
defer l.ecVolumesLock.Unlock() defer l.ecVolumesLock.Unlock()
ecVolume, found := l.ecVolumes[vid]
if !found {
return false
}
// Search for any generation of this volume ID
for key, ecVolume := range l.ecVolumes {
if key.VolumeId == vid {
if _, deleted := ecVolume.DeleteEcVolumeShard(shardId); deleted { if _, deleted := ecVolume.DeleteEcVolumeShard(shardId); deleted {
if len(ecVolume.Shards) == 0 { if len(ecVolume.Shards) == 0 {
delete(l.ecVolumes, vid)
delete(l.ecVolumes, key)
ecVolume.Close() ecVolume.Close()
} }
return true return true
} }
}
}
return true
return false
} }
func (l *DiskLocation) loadEcShards(shards []string, collection string, vid needle.VolumeId, generation uint32) (err error) { func (l *DiskLocation) loadEcShards(shards []string, collection string, vid needle.VolumeId, generation uint32) (err error) {
@ -210,25 +256,32 @@ func (l *DiskLocation) deleteEcVolumeById(vid needle.VolumeId) (e error) {
l.ecVolumesLock.Lock() l.ecVolumesLock.Lock()
defer l.ecVolumesLock.Unlock() defer l.ecVolumesLock.Unlock()
ecVolume, ok := l.ecVolumes[vid]
if !ok {
return
}
// Find and delete all generations of this volume
keysToDelete := make([]EcVolumeGenerationKey, 0)
for key, ecVolume := range l.ecVolumes {
if key.VolumeId == vid {
ecVolume.Destroy() ecVolume.Destroy()
delete(l.ecVolumes, vid)
keysToDelete = append(keysToDelete, key)
}
}
for _, key := range keysToDelete {
delete(l.ecVolumes, key)
}
return return
} }
func (l *DiskLocation) unmountEcVolumeByCollection(collectionName string) map[needle.VolumeId]*erasure_coding.EcVolume { func (l *DiskLocation) unmountEcVolumeByCollection(collectionName string) map[needle.VolumeId]*erasure_coding.EcVolume {
deltaVols := make(map[needle.VolumeId]*erasure_coding.EcVolume, 0) deltaVols := make(map[needle.VolumeId]*erasure_coding.EcVolume, 0)
keysToDelete := make([]EcVolumeGenerationKey, 0)
for k, v := range l.ecVolumes { for k, v := range l.ecVolumes {
if v.Collection == collectionName { if v.Collection == collectionName {
deltaVols[k] = v
deltaVols[k.VolumeId] = v
keysToDelete = append(keysToDelete, k)
} }
} }
for k, _ := range deltaVols {
delete(l.ecVolumes, k)
for _, key := range keysToDelete {
delete(l.ecVolumes, key)
} }
return deltaVols return deltaVols
} }

Loading…
Cancel
Save