Browse Source

stop ongoing vacumming when volume.disable

pull/5903/head
Yang Wang 5 months ago
parent
commit
8199d8ba2a
  1. 2
      weed/server/master_grpc_server_volume.go
  2. 7
      weed/server/master_server_handlers_admin.go
  3. 3
      weed/storage/volume_write.go
  4. 2
      weed/topology/topology_event_handling.go
  5. 14
      weed/topology/topology_vacuum.go

2
weed/server/master_grpc_server_volume.go

@ -221,7 +221,7 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV
resp := &master_pb.VacuumVolumeResponse{} resp := &master_pb.VacuumVolumeResponse{}
ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), req.VolumeId, req.Collection, ms.preallocateSize)
ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), req.VolumeId, req.Collection, ms.preallocateSize, true)
return resp, nil return resp, nil
} }

7
weed/server/master_server_handlers_admin.go

@ -3,12 +3,13 @@ package weed_server
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"math/rand" "math/rand"
"net/http" "net/http"
"strconv" "strconv"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
@ -66,7 +67,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
} }
} }
// glog.Infoln("garbageThreshold =", gcThreshold) // glog.Infoln("garbageThreshold =", gcThreshold)
ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, 0, "", ms.preallocateSize)
ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, 0, "", ms.preallocateSize, true)
ms.dirStatusHandler(w, r) ms.dirStatusHandler(w, r)
} }

3
weed/storage/volume_write.go

@ -4,11 +4,12 @@ import (
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
"os"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/backend" "github.com/seaweedfs/seaweedfs/weed/storage/backend"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
. "github.com/seaweedfs/seaweedfs/weed/storage/types" . "github.com/seaweedfs/seaweedfs/weed/storage/types"
"os"
) )
var ErrorNotFound = errors.New("not found") var ErrorNotFound = errors.New("not found")

2
weed/topology/topology_event_handling.go

@ -27,7 +27,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
for { for {
if t.IsLeader() { if t.IsLeader() {
if !t.isDisableVacuum { if !t.isDisableVacuum {
t.Vacuum(grpcDialOption, garbageThreshold, 0, "", preallocate)
t.Vacuum(grpcDialOption, garbageThreshold, 0, "", preallocate, false)
} }
} else { } else {
stats.MasterReplicaPlacementMismatch.Reset() stats.MasterReplicaPlacementMismatch.Reset()

14
weed/topology/topology_vacuum.go

@ -213,7 +213,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *
} }
} }
func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64) {
func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64, force bool) {
// if there is vacuum going on, return immediately // if there is vacuum going on, return immediately
swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1) swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
@ -232,6 +232,9 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
continue continue
} }
for _, vl := range c.storageType2VolumeLayout.Items() { for _, vl := range c.storageType2VolumeLayout.Items() {
if !force && t.isDisableVacuum {
return
}
if vl != nil { if vl != nil {
volumeLayout := vl.(*VolumeLayout) volumeLayout := vl.(*VolumeLayout)
if volumeId > 0 { if volumeId > 0 {
@ -243,14 +246,14 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate) t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
} }
} else { } else {
t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate, force)
} }
} }
} }
} }
} }
func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64, force bool) {
volumeLayout.accessLock.RLock() volumeLayout.accessLock.RLock()
tmpMap := make(map[needle.VolumeId]*VolumeLocationList) tmpMap := make(map[needle.VolumeId]*VolumeLocationList)
@ -260,11 +263,16 @@ func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeL
volumeLayout.accessLock.RUnlock() volumeLayout.accessLock.RUnlock()
for vid, locationList := range tmpMap { for vid, locationList := range tmpMap {
if !force && t.isDisableVacuum {
return
}
t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate) t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
} }
} }
func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, locationList *VolumeLocationList, vid needle.VolumeId, preallocate int64) { func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, locationList *VolumeLocationList, vid needle.VolumeId, preallocate int64) {
volumeLayout.accessLock.RLock() volumeLayout.accessLock.RLock()
isReadOnly := volumeLayout.readonlyVolumes.IsTrue(vid) isReadOnly := volumeLayout.readonlyVolumes.IsTrue(vid)
isEnoughCopies := volumeLayout.enoughCopies(vid) isEnoughCopies := volumeLayout.enoughCopies(vid)

Loading…
Cancel
Save