diff --git a/weed/command/master.go b/weed/command/master.go index 914853d88..1a6db1945 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -36,14 +36,15 @@ var ( ) type MasterOptions struct { - port *int - portGrpc *int - ip *string - ipBind *string - metaFolder *string - peers *string - volumeSizeLimitMB *uint - volumePreallocate *bool + port *int + portGrpc *int + ip *string + ipBind *string + metaFolder *string + peers *string + volumeSizeLimitMB *uint + volumePreallocate *bool + maxParallelVacuumPerServer *int // pulseSeconds *int defaultReplication *string garbageThreshold *float64 @@ -70,6 +71,7 @@ func init() { m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095") m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.") + m.maxParallelVacuumPerServer = cmdMaster.Flag.Int("maxParallelVacuumPerServer", 1, "maximum number of volumes to vacuum in parallel per volume server") // m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") m.defaultReplication = cmdMaster.Flag.String("defaultReplication", "", "Default replication type if not specified.") m.garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") @@ -311,10 +313,11 @@ func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool { func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption { masterAddress := pb.NewServerAddress(*m.ip, *m.port, *m.portGrpc) return &weed_server.MasterOption{ - Master: masterAddress, - MetaFolder: *m.metaFolder, - VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB), - VolumePreallocate: *m.volumePreallocate, + Master: masterAddress, + MetaFolder: *m.metaFolder, + VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB), + VolumePreallocate: *m.volumePreallocate, + MaxParallelVacuumPerServer: *m.maxParallelVacuumPerServer, // PulseSeconds: *m.pulseSeconds, DefaultReplicaPlacement: *m.defaultReplication, GarbageThreshold: *m.garbageThreshold, diff --git a/weed/command/server.go b/weed/command/server.go index ddcaf1f7e..13d06a56b 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -92,6 +92,7 @@ func init() { masterOptions.peers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list") masterOptions.volumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") masterOptions.volumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.") + masterOptions.maxParallelVacuumPerServer = cmdServer.Flag.Int("master.maxParallelVacuumPerServer", 1, "maximum number of volumes to vacuum in parallel on one volume server") masterOptions.defaultReplication = cmdServer.Flag.String("master.defaultReplication", "", "Default replication type if not specified.") masterOptions.garbageThreshold = cmdServer.Flag.Float64("master.garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") masterOptions.metricsAddress = cmdServer.Flag.String("master.metrics.address", "", "Prometheus gateway address") diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 3cad627db..73f2b24cd 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -221,7 +221,7 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV resp := &master_pb.VacuumVolumeResponse{} - ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), req.VolumeId, req.Collection, ms.preallocateSize) + ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.option.MaxParallelVacuumPerServer, req.VolumeId, req.Collection, ms.preallocateSize) return resp, nil } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index ee28f3386..aefae7126 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -29,8 +29,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/shell" "github.com/seaweedfs/seaweedfs/weed/topology" "github.com/seaweedfs/seaweedfs/weed/util" - "github.com/seaweedfs/seaweedfs/weed/wdclient" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "github.com/seaweedfs/seaweedfs/weed/wdclient" ) const ( @@ -39,10 +39,11 @@ const ( ) type MasterOption struct { - Master pb.ServerAddress - MetaFolder string - VolumeSizeLimitMB uint32 - VolumePreallocate bool + Master pb.ServerAddress + MetaFolder string + VolumeSizeLimitMB uint32 + VolumePreallocate bool + MaxParallelVacuumPerServer int // PulseSeconds int DefaultReplicaPlacement string GarbageThreshold float64 @@ -158,6 +159,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se ms.Topo.StartRefreshWritableVolumes( ms.grpcDialOption, ms.option.GarbageThreshold, + ms.option.MaxParallelVacuumPerServer, topology.VolumeGrowStrategy.Threshold, ms.preallocateSize, ) diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index e003d7d9f..47bc1918a 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -66,7 +66,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque } } // glog.Infoln("garbageThreshold =", gcThreshold) - ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, 0, "", ms.preallocateSize) + ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.option.MaxParallelVacuumPerServer, 0, "", ms.preallocateSize) ms.dirStatusHandler(w, r) } diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index d0ecd089a..60459d7b7 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -13,7 +13,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage" ) -func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, garbageThreshold float64, growThreshold float64, preallocate int64) { +func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, garbageThreshold float64, concurrentVacuumLimitPerVolumeServer int, growThreshold float64, preallocate int64) { go func() { for { if t.IsLeader() { @@ -27,7 +27,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g for { if t.IsLeader() { if !t.isDisableVacuum { - t.Vacuum(grpcDialOption, garbageThreshold, 0, "", preallocate) + t.Vacuum(grpcDialOption, garbageThreshold, concurrentVacuumLimitPerVolumeServer, 0, "", preallocate) } } else { stats.MasterReplicaPlacementMismatch.Reset() diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index b1a8b66d4..eb2ca8c3b 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -2,7 +2,9 @@ package topology import ( "context" + "github.com/seaweedfs/seaweedfs/weed/util" "io" + "sync" "sync/atomic" "time" @@ -213,7 +215,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl * } } -func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64) { +func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, maxParallelVacuumPerServer int, volumeId uint32, collection string, preallocate int64) { // if there is vacuum going on, return immediately swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1) @@ -243,25 +245,83 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate) } } else { - t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate) + t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, maxParallelVacuumPerServer, preallocate) } } } } } -func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) { +func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, maxParallelVacuumPerServer int, preallocate int64) { volumeLayout.accessLock.RLock() - tmpMap := make(map[needle.VolumeId]*VolumeLocationList) + todoVolumeMap := make(map[needle.VolumeId]*VolumeLocationList) for vid, locationList := range volumeLayout.vid2location { - tmpMap[vid] = locationList.Copy() + todoVolumeMap[vid] = locationList.Copy() } volumeLayout.accessLock.RUnlock() - for vid, locationList := range tmpMap { - t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate) + // limiter for each volume server + limiter := make(map[NodeId]int) + var limiterLock sync.Mutex + for _, locationList := range todoVolumeMap { + for _, dn := range locationList.list { + if _, ok := limiter[dn.Id()]; !ok { + limiter[dn.Id()] = maxParallelVacuumPerServer + } + } } + + executor := util.NewLimitedConcurrentExecutor(100) + + var wg sync.WaitGroup + + for len(todoVolumeMap) > 0 { + pendingVolumeMap := make(map[needle.VolumeId]*VolumeLocationList) + for vid, locationList := range todoVolumeMap { + hasEnoughQuota := true + for _, dn := range locationList.list { + limiterLock.Lock() + quota := limiter[dn.Id()] + limiterLock.Unlock() + if quota <= 0 { + hasEnoughQuota = false + break + } + } + if !hasEnoughQuota { + pendingVolumeMap[vid] = locationList + continue + } + + // debit the quota + for _, dn := range locationList.list { + limiterLock.Lock() + limiter[dn.Id()]-- + limiterLock.Unlock() + } + + wg.Add(1) + executor.Execute(func() { + defer wg.Done() + t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate) + // credit the quota + for _, dn := range locationList.list { + limiterLock.Lock() + limiter[dn.Id()]++ + limiterLock.Unlock() + } + }) + } + + if len(todoVolumeMap) == len(pendingVolumeMap) { + time.Sleep(10 * time.Second) + } + todoVolumeMap = pendingVolumeMap + } + + wg.Wait() + } func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, locationList *VolumeLocationList, vid needle.VolumeId, preallocate int64) {