Browse Source

[master] process grow request with must grow (#5999)

process grow request with must grow
pull/6006/head
Konstantin Lebedev 3 months ago
committed by GitHub
parent
commit
34bbaa2cdd
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 52
      weed/server/master_grpc_server_volume.go
  2. 16
      weed/topology/volume_layout.go

52
weed/server/master_grpc_server_volume.go

@ -3,6 +3,7 @@ package weed_server
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/stats"
"math/rand/v2" "math/rand/v2"
"strings" "strings"
"sync" "sync"
@ -20,6 +21,10 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/storage/types"
) )
const (
volumeGrowStepCount = 2
)
func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) { func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) {
glog.V(1).Infoln("starting automatic volume grow") glog.V(1).Infoln("starting automatic volume grow")
start := time.Now() start := time.Now()
@ -36,37 +41,47 @@ func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) {
func (ms *MasterServer) ProcessGrowRequest() { func (ms *MasterServer) ProcessGrowRequest() {
go func() { go func() {
ctx := context.Background()
for { for {
if !ms.Topo.IsLeader() { if !ms.Topo.IsLeader() {
continue continue
} }
dcs := ms.Topo.ListDataCenters() dcs := ms.Topo.ListDataCenters()
var err error
for _, vlc := range ms.Topo.ListVolumeLayoutCollections() { for _, vlc := range ms.Topo.ListVolumeLayoutCollections() {
vl := vlc.VolumeLayout vl := vlc.VolumeLayout
lastGrowCount := vl.GetLastGrowCount()
if vl.HasGrowRequest() { if vl.HasGrowRequest() {
continue continue
} }
if vl.ShouldGrowVolumes(vlc.Collection) {
vl.AddGrowRequest()
ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{
Option: vlc.ToGrowOption(),
Count: vl.GetLastGrowCount(),
Reason: "collection autogrow",
}
} else {
writable, crowded := vl.GetWritableVolumeCount()
mustGrow := int(lastGrowCount) - writable
vgr := vlc.ToVolumeGrowRequest()
stats.MasterVolumeLayoutWritable.WithLabelValues(vlc.Collection, vgr.DiskType, vgr.Replication, vgr.Ttl).Set(float64(writable))
stats.MasterVolumeLayoutCrowded.WithLabelValues(vlc.Collection, vgr.DiskType, vgr.Replication, vgr.Ttl).Set(float64(crowded))
switch {
case mustGrow > 0:
vgr.WritableVolumeCount = uint32(mustGrow)
_, err = ms.VolumeGrow(ctx, vgr)
case crowded+volumeGrowStepCount >= writable:
vgr.WritableVolumeCount = volumeGrowStepCount
_, err = ms.VolumeGrow(ctx, vgr)
default:
for _, dc := range dcs { for _, dc := range dcs {
if vl.ShouldGrowVolumesByDataNode("DataCenter", dc) { if vl.ShouldGrowVolumesByDataNode("DataCenter", dc) {
vl.AddGrowRequest()
volumeGrowOption := vlc.ToGrowOption()
volumeGrowOption.DataCenter = dc
ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{
Option: volumeGrowOption,
Count: vl.GetLastGrowCount(),
Force: true,
Reason: "per-dc autogrow",
vgr.DataCenter = dc
if lastGrowCount > 0 {
vgr.WritableVolumeCount = uint32(int(lastGrowCount) / len(dcs))
} else {
vgr.WritableVolumeCount = volumeGrowStepCount
}
_, err = ms.VolumeGrow(ctx, vgr)
} }
} }
} }
if err != nil {
glog.V(0).Infof("volume grow request failed: %+v", err)
} }
} }
time.Sleep(14*time.Minute + time.Duration(120*rand.Float32())*time.Second) time.Sleep(14*time.Minute + time.Duration(120*rand.Float32())*time.Second)
@ -101,7 +116,7 @@ func (ms *MasterServer) ProcessGrowRequest() {
}) })
// not atomic but it's okay // not atomic but it's okay
if found || (!req.Force && !vl.ShouldGrowVolumes(req.Option.Collection)) {
if found || (!req.Force && !vl.ShouldGrowVolumes()) {
glog.V(4).Infoln("discard volume grow request") glog.V(4).Infoln("discard volume grow request")
time.Sleep(time.Millisecond * 211) time.Sleep(time.Millisecond * 211)
vl.DoneGrowRequest() vl.DoneGrowRequest()
@ -302,6 +317,9 @@ func (ms *MasterServer) VolumeGrow(ctx context.Context, req *master_pb.VolumeGro
if err != nil { if err != nil {
return nil, err return nil, err
} }
if req.DataCenter != "" && !ms.Topo.DataCenterExists(req.DataCenter) {
return nil, fmt.Errorf("data center not exists")
}
volumeGrowOption := topology.VolumeGrowOption{ volumeGrowOption := topology.VolumeGrowOption{
Collection: req.Collection, Collection: req.Collection,
ReplicaPlacement: replicaPlacement, ReplicaPlacement: replicaPlacement,

16
weed/topology/volume_layout.go

@ -2,7 +2,7 @@ package topology
import ( import (
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"math/rand" "math/rand"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -359,10 +359,8 @@ func (vl *VolumeLayout) GetLastGrowCount() uint32 {
return vl.lastGrowCount.Load() return vl.lastGrowCount.Load()
} }
func (vl *VolumeLayout) ShouldGrowVolumes(collection string) bool {
func (vl *VolumeLayout) ShouldGrowVolumes() bool {
writable, crowded := vl.GetWritableVolumeCount() writable, crowded := vl.GetWritableVolumeCount()
stats.MasterVolumeLayoutWritable.WithLabelValues(collection, vl.diskType.String(), vl.rp.String(), vl.ttl.String()).Set(float64(writable))
stats.MasterVolumeLayoutCrowded.WithLabelValues(collection, vl.diskType.String(), vl.rp.String(), vl.ttl.String()).Set(float64(crowded))
return writable <= crowded return writable <= crowded
} }
@ -544,12 +542,12 @@ func (vl *VolumeLayout) ToInfo() (info VolumeLayoutInfo) {
return return
} }
func (vlc *VolumeLayoutCollection) ToGrowOption() (option *VolumeGrowOption) {
return &VolumeGrowOption{
func (vlc *VolumeLayoutCollection) ToVolumeGrowRequest() *master_pb.VolumeGrowRequest {
return &master_pb.VolumeGrowRequest{
Collection: vlc.Collection, Collection: vlc.Collection,
ReplicaPlacement: vlc.VolumeLayout.rp,
Ttl: vlc.VolumeLayout.ttl,
DiskType: vlc.VolumeLayout.diskType,
Replication: vlc.VolumeLayout.rp.String(),
Ttl: vlc.VolumeLayout.ttl.String(),
DiskType: vlc.VolumeLayout.diskType.String(),
} }
} }

Loading…
Cancel
Save