Browse Source

Merge pull request #1168 from stlpmo-jn/optimize_DeleteCollectionFromDiskLocation

decouple the volume.Destroy() from the operation of unmountVolume()
pull/1171/head
Chris Lu 5 years ago
committed by GitHub
parent
commit
409a3fe41f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 63
      weed/storage/disk_location.go
  2. 14
      weed/storage/disk_location_ec.go

63
weed/storage/disk_location.go

@ -71,7 +71,6 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
} else {
glog.V(0).Infof("new volume %s error %s", name, e)
}
}
}
}
@ -116,28 +115,46 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
l.volumesLock.Lock()
for k, v := range l.volumes {
if v.Collection == collection {
e = l.deleteVolumeById(k)
if e != nil {
l.volumesLock.Unlock()
return
}
}
}
delVolsMap := l.unmountVolumeByCollection(collection)
l.volumesLock.Unlock()
l.ecVolumesLock.Lock()
for k, v := range l.ecVolumes {
if v.Collection == collection {
e = l.deleteEcVolumeById(k)
if e != nil {
delEcVolsMap := l.unmountEcVolumeByCollection(collection)
l.ecVolumesLock.Unlock()
return
errChain := make(chan error, 2)
var wg sync.WaitGroup
wg.Add(2)
go func() {
for _, v := range delVolsMap {
if err := v.Destroy(); err != nil {
errChain <- err
}
}
wg.Done()
}()
go func() {
for _, v := range delEcVolsMap {
v.Destroy()
}
wg.Done()
}()
go func() {
wg.Wait()
close(errChain)
}()
errBuilder := strings.Builder{}
for err := range errChain {
errBuilder.WriteString(err.Error())
errBuilder.WriteString("; ")
}
if errBuilder.Len() > 0 {
e = fmt.Errorf(errBuilder.String())
}
l.ecVolumesLock.Unlock()
return
}
@ -193,6 +210,20 @@ func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
return nil
}
func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
deltaVols := make(map[needle.VolumeId]*Volume, 0)
for k, v := range l.volumes {
if v.Collection == collectionName && !v.isCompacting {
deltaVols[k] = v
}
}
for k, _ := range deltaVols {
delete(l.volumes, k)
}
return deltaVols
}
func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
l.volumesLock.Lock()
defer l.volumesLock.Unlock()

14
weed/storage/disk_location_ec.go

@ -169,3 +169,17 @@ func (l *DiskLocation) deleteEcVolumeById(vid needle.VolumeId) (e error) {
delete(l.ecVolumes, vid)
return
}
func (l *DiskLocation) unmountEcVolumeByCollection(collectionName string) map[needle.VolumeId]*erasure_coding.EcVolume {
deltaVols := make(map[needle.VolumeId]*erasure_coding.EcVolume, 0)
for k, v := range l.ecVolumes {
if v.Collection == collectionName {
deltaVols[k] = v
}
}
for k, _ := range deltaVols {
delete(l.ecVolumes, k)
}
return deltaVols
}
Loading…
Cancel
Save