Browse Source

Merge branch 'master' into feat_acl

pull/5936/head
LHHDZ 5 months ago
parent
commit
577016b724
  1. 2
      weed/command/filer_remote_gateway.go
  2. 3
      weed/command/master.go
  3. 1
      weed/command/server.go
  4. 1
      weed/s3api/s3api_server.go
  5. 6
      weed/server/filer_server_handlers_read.go
  6. 2
      weed/server/master_grpc_server_volume.go
  7. 4
      weed/server/master_server.go
  8. 2
      weed/server/master_server_handlers_admin.go
  9. 2
      weed/shell/command_remote_mount_buckets.go
  10. 10
      weed/storage/super_block/replica_placement.go
  11. 4
      weed/topology/topology_event_handling.go
  12. 78
      weed/topology/topology_vacuum.go

2
weed/command/filer_remote_gateway.go

@ -71,7 +71,7 @@ var cmdFilerRemoteGateway = &Command{
filer.remote.gateway listens on filer local buckets update events.
If any bucket is created, deleted, or updated, it will mirror the changes to remote object store.
weed filer.remote.sync -createBucketAt=cloud1
weed filer.remote.gateway -createBucketAt=cloud1
`,
}

3
weed/command/master.go

@ -44,6 +44,7 @@ type MasterOptions struct {
peers *string
volumeSizeLimitMB *uint
volumePreallocate *bool
maxParallelVacuumPerServer *int
// pulseSeconds *int
defaultReplication *string
garbageThreshold *float64
@ -70,6 +71,7 @@ func init() {
m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095")
m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.")
m.maxParallelVacuumPerServer = cmdMaster.Flag.Int("maxParallelVacuumPerServer", 1, "maximum number of volumes to vacuum in parallel per volume server")
// m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
m.defaultReplication = cmdMaster.Flag.String("defaultReplication", "", "Default replication type if not specified.")
m.garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
@ -315,6 +317,7 @@ func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOp
MetaFolder: *m.metaFolder,
VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB),
VolumePreallocate: *m.volumePreallocate,
MaxParallelVacuumPerServer: *m.maxParallelVacuumPerServer,
// PulseSeconds: *m.pulseSeconds,
DefaultReplicaPlacement: *m.defaultReplication,
GarbageThreshold: *m.garbageThreshold,

1
weed/command/server.go

@ -92,6 +92,7 @@ func init() {
masterOptions.peers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list")
masterOptions.volumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
masterOptions.volumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.")
masterOptions.maxParallelVacuumPerServer = cmdServer.Flag.Int("master.maxParallelVacuumPerServer", 1, "maximum number of volumes to vacuum in parallel on one volume server")
masterOptions.defaultReplication = cmdServer.Flag.String("master.defaultReplication", "", "Default replication type if not specified.")
masterOptions.garbageThreshold = cmdServer.Flag.Float64("master.garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
masterOptions.metricsAddress = cmdServer.Flag.String("master.metrics.address", "", "Prometheus gateway address")

1
weed/s3api/s3api_server.go

@ -111,6 +111,7 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
// Readiness Probe
apiRouter.Methods(http.MethodGet).Path("/status").HandlerFunc(s3a.StatusHandler)
apiRouter.Methods(http.MethodGet).Path("/healthz").HandlerFunc(s3a.StatusHandler)
apiRouter.Methods(http.MethodOptions).HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {

6
weed/server/filer_server_handlers_read.go

@ -135,13 +135,15 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
if query.Get("metadata") == "true" {
writeJsonQuiet(w, r, http.StatusOK, entry)
return
}
if entry.Attr.Mime == "" || (entry.Attr.Mime == s3_constants.FolderMimeType && r.Header.Get(s3_constants.AmzIdentityId) == "") {
// Don't return directory meta if config value is set to true
if fs.option.ExposeDirectoryData == false {
writeJsonError(w, r, http.StatusForbidden, errors.New("directory listing is disabled"))
return
}
}
if entry.Attr.Mime == "" || (entry.Attr.Mime == s3_constants.FolderMimeType && r.Header.Get(s3_constants.AmzIdentityId) == "") {
// return index of directory for non s3 gateway
fs.listDirectoryHandler(w, r)
return

2
weed/server/master_grpc_server_volume.go

@ -221,7 +221,7 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV
resp := &master_pb.VacuumVolumeResponse{}
ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), req.VolumeId, req.Collection, ms.preallocateSize)
ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.option.MaxParallelVacuumPerServer, req.VolumeId, req.Collection, ms.preallocateSize)
return resp, nil
}

4
weed/server/master_server.go

@ -29,8 +29,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/shell"
"github.com/seaweedfs/seaweedfs/weed/topology"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
const (
@ -43,6 +43,7 @@ type MasterOption struct {
MetaFolder string
VolumeSizeLimitMB uint32
VolumePreallocate bool
MaxParallelVacuumPerServer int
// PulseSeconds int
DefaultReplicaPlacement string
GarbageThreshold float64
@ -158,6 +159,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
ms.Topo.StartRefreshWritableVolumes(
ms.grpcDialOption,
ms.option.GarbageThreshold,
ms.option.MaxParallelVacuumPerServer,
topology.VolumeGrowStrategy.Threshold,
ms.preallocateSize,
)

2
weed/server/master_server_handlers_admin.go

@ -66,7 +66,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
}
}
// glog.Infoln("garbageThreshold =", gcThreshold)
ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, 0, "", ms.preallocateSize)
ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.option.MaxParallelVacuumPerServer, 0, "", ms.preallocateSize)
ms.dirStatusHandler(w, r)
}

2
weed/shell/command_remote_mount_buckets.go

@ -33,7 +33,7 @@ func (c *commandRemoteMountBuckets) Help() string {
remote.mount.buckets -remote=cloud1
# after mount, start a separate process to write updates to remote storage
weed filer.remote.sync -filer=<filerHost>:<filerPort> -createBucketAt=cloud1
weed filer.remote.gateway -filer=<filerHost>:<filerPort> -createBucketAt=cloud1
`
}

10
weed/storage/super_block/replica_placement.go

@ -1,7 +1,6 @@
package super_block
import (
"errors"
"fmt"
)
@ -15,7 +14,9 @@ func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) {
rp := &ReplicaPlacement{}
for i, c := range t {
count := int(c - '0')
if 0 <= count && count <= 2 {
if count < 0 {
return rp, fmt.Errorf("unknown replication type: %s", t)
}
switch i {
case 0:
rp.DiffDataCenterCount = count
@ -24,9 +25,10 @@ func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) {
case 2:
rp.SameRackCount = count
}
} else {
return rp, errors.New("Unknown Replication Type:" + t)
}
value := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount
if value > 255 {
return rp, fmt.Errorf("unexpected replication type: %s", t)
}
return rp, nil
}

4
weed/topology/topology_event_handling.go

@ -13,7 +13,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage"
)
func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, garbageThreshold float64, growThreshold float64, preallocate int64) {
func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, garbageThreshold float64, concurrentVacuumLimitPerVolumeServer int, growThreshold float64, preallocate int64) {
go func() {
for {
if t.IsLeader() {
@ -27,7 +27,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
for {
if t.IsLeader() {
if !t.isDisableVacuum {
t.Vacuum(grpcDialOption, garbageThreshold, 0, "", preallocate)
t.Vacuum(grpcDialOption, garbageThreshold, concurrentVacuumLimitPerVolumeServer, 0, "", preallocate)
}
} else {
stats.MasterReplicaPlacementMismatch.Reset()

78
weed/topology/topology_vacuum.go

@ -2,7 +2,9 @@ package topology
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
"sync"
"sync/atomic"
"time"
@ -213,7 +215,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *
}
}
func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64) {
func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, maxParallelVacuumPerServer int, volumeId uint32, collection string, preallocate int64) {
// if there is vacuum going on, return immediately
swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
@ -243,25 +245,83 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
}
} else {
t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, maxParallelVacuumPerServer, preallocate)
}
}
}
}
}
func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, maxParallelVacuumPerServer int, preallocate int64) {
volumeLayout.accessLock.RLock()
tmpMap := make(map[needle.VolumeId]*VolumeLocationList)
todoVolumeMap := make(map[needle.VolumeId]*VolumeLocationList)
for vid, locationList := range volumeLayout.vid2location {
tmpMap[vid] = locationList.Copy()
todoVolumeMap[vid] = locationList.Copy()
}
volumeLayout.accessLock.RUnlock()
for vid, locationList := range tmpMap {
// limiter for each volume server
limiter := make(map[NodeId]int)
var limiterLock sync.Mutex
for _, locationList := range todoVolumeMap {
for _, dn := range locationList.list {
if _, ok := limiter[dn.Id()]; !ok {
limiter[dn.Id()] = maxParallelVacuumPerServer
}
}
}
executor := util.NewLimitedConcurrentExecutor(100)
var wg sync.WaitGroup
for len(todoVolumeMap) > 0 {
pendingVolumeMap := make(map[needle.VolumeId]*VolumeLocationList)
for vid, locationList := range todoVolumeMap {
hasEnoughQuota := true
for _, dn := range locationList.list {
limiterLock.Lock()
quota := limiter[dn.Id()]
limiterLock.Unlock()
if quota <= 0 {
hasEnoughQuota = false
break
}
}
if !hasEnoughQuota {
pendingVolumeMap[vid] = locationList
continue
}
// debit the quota
for _, dn := range locationList.list {
limiterLock.Lock()
limiter[dn.Id()]--
limiterLock.Unlock()
}
wg.Add(1)
executor.Execute(func() {
defer wg.Done()
t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
// credit the quota
for _, dn := range locationList.list {
limiterLock.Lock()
limiter[dn.Id()]++
limiterLock.Unlock()
}
})
}
if len(todoVolumeMap) == len(pendingVolumeMap) {
time.Sleep(10 * time.Second)
}
todoVolumeMap = pendingVolumeMap
}
wg.Wait()
}
func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, locationList *VolumeLocationList, vid needle.VolumeId, preallocate int64) {
@ -270,7 +330,11 @@ func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayou
isEnoughCopies := volumeLayout.enoughCopies(vid)
volumeLayout.accessLock.RUnlock()
if isReadOnly || !isEnoughCopies {
if isReadOnly {
return
}
if !isEnoughCopies {
glog.Warningf("skip vacuuming: not enough copies for volume:%d", vid)
return
}

Loading…
Cancel
Save