diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go index 78357cc04..3e52e8d3f 100644 --- a/weed/command/filer_remote_gateway.go +++ b/weed/command/filer_remote_gateway.go @@ -71,7 +71,7 @@ var cmdFilerRemoteGateway = &Command{ filer.remote.gateway listens on filer local buckets update events. If any bucket is created, deleted, or updated, it will mirror the changes to remote object store. - weed filer.remote.sync -createBucketAt=cloud1 + weed filer.remote.gateway -createBucketAt=cloud1 `, } diff --git a/weed/command/master.go b/weed/command/master.go index 914853d88..1a6db1945 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -36,14 +36,15 @@ var ( ) type MasterOptions struct { - port *int - portGrpc *int - ip *string - ipBind *string - metaFolder *string - peers *string - volumeSizeLimitMB *uint - volumePreallocate *bool + port *int + portGrpc *int + ip *string + ipBind *string + metaFolder *string + peers *string + volumeSizeLimitMB *uint + volumePreallocate *bool + maxParallelVacuumPerServer *int // pulseSeconds *int defaultReplication *string garbageThreshold *float64 @@ -70,6 +71,7 @@ func init() { m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095") m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.") + m.maxParallelVacuumPerServer = cmdMaster.Flag.Int("maxParallelVacuumPerServer", 1, "maximum number of volumes to vacuum in parallel per volume server") // m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") m.defaultReplication = cmdMaster.Flag.String("defaultReplication", "", "Default replication type if not specified.") m.garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") @@ -311,10 +313,11 @@ func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool { func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption { masterAddress := pb.NewServerAddress(*m.ip, *m.port, *m.portGrpc) return &weed_server.MasterOption{ - Master: masterAddress, - MetaFolder: *m.metaFolder, - VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB), - VolumePreallocate: *m.volumePreallocate, + Master: masterAddress, + MetaFolder: *m.metaFolder, + VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB), + VolumePreallocate: *m.volumePreallocate, + MaxParallelVacuumPerServer: *m.maxParallelVacuumPerServer, // PulseSeconds: *m.pulseSeconds, DefaultReplicaPlacement: *m.defaultReplication, GarbageThreshold: *m.garbageThreshold, diff --git a/weed/command/server.go b/weed/command/server.go index ddcaf1f7e..13d06a56b 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -92,6 +92,7 @@ func init() { masterOptions.peers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list") masterOptions.volumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") masterOptions.volumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.") + masterOptions.maxParallelVacuumPerServer = cmdServer.Flag.Int("master.maxParallelVacuumPerServer", 1, "maximum number of volumes to vacuum in parallel on one volume server") masterOptions.defaultReplication = cmdServer.Flag.String("master.defaultReplication", "", "Default replication type if not specified.") masterOptions.garbageThreshold = cmdServer.Flag.Float64("master.garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") masterOptions.metricsAddress = cmdServer.Flag.String("master.metrics.address", "", "Prometheus gateway address") diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 14b6de36a..28f3607f0 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -111,6 +111,7 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) { // Readiness Probe apiRouter.Methods(http.MethodGet).Path("/status").HandlerFunc(s3a.StatusHandler) + apiRouter.Methods(http.MethodGet).Path("/healthz").HandlerFunc(s3a.StatusHandler) apiRouter.Methods(http.MethodOptions).HandlerFunc( func(w http.ResponseWriter, r *http.Request) { diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 6d1079ff4..c68d87151 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -135,13 +135,15 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } if query.Get("metadata") == "true" { + writeJsonQuiet(w, r, http.StatusOK, entry) + return + } + if entry.Attr.Mime == "" || (entry.Attr.Mime == s3_constants.FolderMimeType && r.Header.Get(s3_constants.AmzIdentityId) == "") { // Don't return directory meta if config value is set to true if fs.option.ExposeDirectoryData == false { writeJsonError(w, r, http.StatusForbidden, errors.New("directory listing is disabled")) return } - } - if entry.Attr.Mime == "" || (entry.Attr.Mime == s3_constants.FolderMimeType && r.Header.Get(s3_constants.AmzIdentityId) == "") { // return index of directory for non s3 gateway fs.listDirectoryHandler(w, r) return diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 3cad627db..73f2b24cd 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -221,7 +221,7 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV resp := &master_pb.VacuumVolumeResponse{} - ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), req.VolumeId, req.Collection, ms.preallocateSize) + ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.option.MaxParallelVacuumPerServer, req.VolumeId, req.Collection, ms.preallocateSize) return resp, nil } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index ee28f3386..aefae7126 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -29,8 +29,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/shell" "github.com/seaweedfs/seaweedfs/weed/topology" "github.com/seaweedfs/seaweedfs/weed/util" - "github.com/seaweedfs/seaweedfs/weed/wdclient" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "github.com/seaweedfs/seaweedfs/weed/wdclient" ) const ( @@ -39,10 +39,11 @@ const ( ) type MasterOption struct { - Master pb.ServerAddress - MetaFolder string - VolumeSizeLimitMB uint32 - VolumePreallocate bool + Master pb.ServerAddress + MetaFolder string + VolumeSizeLimitMB uint32 + VolumePreallocate bool + MaxParallelVacuumPerServer int // PulseSeconds int DefaultReplicaPlacement string GarbageThreshold float64 @@ -158,6 +159,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se ms.Topo.StartRefreshWritableVolumes( ms.grpcDialOption, ms.option.GarbageThreshold, + ms.option.MaxParallelVacuumPerServer, topology.VolumeGrowStrategy.Threshold, ms.preallocateSize, ) diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index e003d7d9f..47bc1918a 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -66,7 +66,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque } } // glog.Infoln("garbageThreshold =", gcThreshold) - ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, 0, "", ms.preallocateSize) + ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.option.MaxParallelVacuumPerServer, 0, "", ms.preallocateSize) ms.dirStatusHandler(w, r) } diff --git a/weed/shell/command_remote_mount_buckets.go b/weed/shell/command_remote_mount_buckets.go index 78843e121..91375d2d2 100644 --- a/weed/shell/command_remote_mount_buckets.go +++ b/weed/shell/command_remote_mount_buckets.go @@ -33,7 +33,7 @@ func (c *commandRemoteMountBuckets) Help() string { remote.mount.buckets -remote=cloud1 # after mount, start a separate process to write updates to remote storage - weed filer.remote.sync -filer=: -createBucketAt=cloud1 + weed filer.remote.gateway -filer=: -createBucketAt=cloud1 ` } diff --git a/weed/storage/super_block/replica_placement.go b/weed/storage/super_block/replica_placement.go index a263e6669..9a59eb258 100644 --- a/weed/storage/super_block/replica_placement.go +++ b/weed/storage/super_block/replica_placement.go @@ -1,7 +1,6 @@ package super_block import ( - "errors" "fmt" ) @@ -15,18 +14,21 @@ func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) { rp := &ReplicaPlacement{} for i, c := range t { count := int(c - '0') - if 0 <= count && count <= 2 { - switch i { - case 0: - rp.DiffDataCenterCount = count - case 1: - rp.DiffRackCount = count - case 2: - rp.SameRackCount = count - } - } else { - return rp, errors.New("Unknown Replication Type:" + t) + if count < 0 { + return rp, fmt.Errorf("unknown replication type: %s", t) } + switch i { + case 0: + rp.DiffDataCenterCount = count + case 1: + rp.DiffRackCount = count + case 2: + rp.SameRackCount = count + } + } + value := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount + if value > 255 { + return rp, fmt.Errorf("unexpected replication type: %s", t) } return rp, nil } diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index d0ecd089a..60459d7b7 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -13,7 +13,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage" ) -func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, garbageThreshold float64, growThreshold float64, preallocate int64) { +func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, garbageThreshold float64, concurrentVacuumLimitPerVolumeServer int, growThreshold float64, preallocate int64) { go func() { for { if t.IsLeader() { @@ -27,7 +27,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g for { if t.IsLeader() { if !t.isDisableVacuum { - t.Vacuum(grpcDialOption, garbageThreshold, 0, "", preallocate) + t.Vacuum(grpcDialOption, garbageThreshold, concurrentVacuumLimitPerVolumeServer, 0, "", preallocate) } } else { stats.MasterReplicaPlacementMismatch.Reset() diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index f5b1eaaff..eb2ca8c3b 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -2,7 +2,9 @@ package topology import ( "context" + "github.com/seaweedfs/seaweedfs/weed/util" "io" + "sync" "sync/atomic" "time" @@ -213,7 +215,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl * } } -func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64) { +func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, maxParallelVacuumPerServer int, volumeId uint32, collection string, preallocate int64) { // if there is vacuum going on, return immediately swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1) @@ -243,25 +245,83 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate) } } else { - t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate) + t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, maxParallelVacuumPerServer, preallocate) } } } } } -func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) { +func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, maxParallelVacuumPerServer int, preallocate int64) { volumeLayout.accessLock.RLock() - tmpMap := make(map[needle.VolumeId]*VolumeLocationList) + todoVolumeMap := make(map[needle.VolumeId]*VolumeLocationList) for vid, locationList := range volumeLayout.vid2location { - tmpMap[vid] = locationList.Copy() + todoVolumeMap[vid] = locationList.Copy() } volumeLayout.accessLock.RUnlock() - for vid, locationList := range tmpMap { - t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate) + // limiter for each volume server + limiter := make(map[NodeId]int) + var limiterLock sync.Mutex + for _, locationList := range todoVolumeMap { + for _, dn := range locationList.list { + if _, ok := limiter[dn.Id()]; !ok { + limiter[dn.Id()] = maxParallelVacuumPerServer + } + } } + + executor := util.NewLimitedConcurrentExecutor(100) + + var wg sync.WaitGroup + + for len(todoVolumeMap) > 0 { + pendingVolumeMap := make(map[needle.VolumeId]*VolumeLocationList) + for vid, locationList := range todoVolumeMap { + hasEnoughQuota := true + for _, dn := range locationList.list { + limiterLock.Lock() + quota := limiter[dn.Id()] + limiterLock.Unlock() + if quota <= 0 { + hasEnoughQuota = false + break + } + } + if !hasEnoughQuota { + pendingVolumeMap[vid] = locationList + continue + } + + // debit the quota + for _, dn := range locationList.list { + limiterLock.Lock() + limiter[dn.Id()]-- + limiterLock.Unlock() + } + + wg.Add(1) + executor.Execute(func() { + defer wg.Done() + t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate) + // credit the quota + for _, dn := range locationList.list { + limiterLock.Lock() + limiter[dn.Id()]++ + limiterLock.Unlock() + } + }) + } + + if len(todoVolumeMap) == len(pendingVolumeMap) { + time.Sleep(10 * time.Second) + } + todoVolumeMap = pendingVolumeMap + } + + wg.Wait() + } func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, locationList *VolumeLocationList, vid needle.VolumeId, preallocate int64) { @@ -270,7 +330,11 @@ func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayou isEnoughCopies := volumeLayout.enoughCopies(vid) volumeLayout.accessLock.RUnlock() - if isReadOnly || !isEnoughCopies { + if isReadOnly { + return + } + if !isEnoughCopies { + glog.Warningf("skip vacuuming: not enough copies for volume:%d", vid) return }