Browse Source

use two flags: v.isCompacting and v.isCommitCompacting

pull/2982/head
chrislu 3 years ago
parent
commit
37ab8909b0
  1. 2
      weed/storage/disk_location.go
  2. 3
      weed/storage/volume.go
  3. 4
      weed/storage/volume_vacuum.go
  4. 2
      weed/storage/volume_write.go

2
weed/storage/disk_location.go

@ -283,7 +283,7 @@ func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
deltaVols := make(map[needle.VolumeId]*Volume, 0)
for k, v := range l.volumes {
if v.Collection == collectionName && !v.isCompacting {
if v.Collection == collectionName && !v.isCompacting && !v.isCommitCompacting {
deltaVols[k] = v
}
}

3
weed/storage/volume.go

@ -42,7 +42,8 @@ type Volume struct {
lastCompactIndexOffset uint64
lastCompactRevision uint16
isCompacting bool
isCompacting bool
isCommitCompacting bool
volumeInfo *volume_server_pb.VolumeInfo
location *DiskLocation

4
weed/storage/volume_vacuum.go

@ -94,9 +94,9 @@ func (v *Volume) CommitCompact() error {
}
glog.V(0).Infof("Committing volume %d vacuuming...", v.Id)
v.isCompacting = true
v.isCommitCompacting = true
defer func() {
v.isCompacting = false
v.isCommitCompacting = false
}()
v.dataFileAccessLock.Lock()

2
weed/storage/volume_write.go

@ -54,7 +54,7 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
// Destroy removes everything related to this volume
func (v *Volume) Destroy() (err error) {
if v.isCompacting {
if v.isCompacting || v.isCommitCompacting {
err = fmt.Errorf("volume %d is compacting", v.Id)
return
}

Loading…
Cancel
Save