From 8836fa19b697429d09359897fc2aff52e626947b Mon Sep 17 00:00:00 2001
From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>
Date: Mon, 25 Nov 2024 22:30:37 +0500
Subject: [PATCH] use ShouldGrowVolumesByDcAndRack (#6280)

---
 weed/server/master_grpc_server_volume.go | 27 +++++++++++++--------
 weed/topology/topology.go                | 13 ++++++++++
 weed/topology/volume_layout.go           | 31 +++++++++---------------
 3 files changed, 42 insertions(+), 29 deletions(-)

diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index cc9bc3c51..24e9c058c 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -3,6 +3,7 @@ package weed_server
 import (
 	"context"
 	"fmt"
+	"math"
 	"math/rand/v2"
 	"strings"
 	"sync"
@@ -53,7 +54,7 @@ func (ms *MasterServer) ProcessGrowRequest() {
 			if !ms.Topo.IsLeader() {
 				continue
 			}
-			dcs := ms.Topo.ListDataCenters()
+			dcs := ms.Topo.ListDCAndRacks()
 			var err error
 			for _, vlc := range ms.Topo.ListVolumeLayoutCollections() {
 				vl := vlc.VolumeLayout
@@ -74,22 +75,28 @@ func (ms *MasterServer) ProcessGrowRequest() {
 				case lastGrowCount > 0 && writable < int(lastGrowCount*2) && float64(crowded+volumeGrowStepCount) > float64(writable)*topology.VolumeGrowStrategy.Threshold:
 					vgr.WritableVolumeCount = volumeGrowStepCount
 					_, err = ms.VolumeGrow(ctx, vgr)
-				default:
-					for _, dc := range dcs {
-						if vl.ShouldGrowVolumesByDataNode("DataCenter", dc) {
-							vgr.DataCenter = dc
+				}
+				if err != nil {
+					glog.V(0).Infof("volume grow request failed: %+v", err)
+				}
+				writableVolumes := vl.CloneWritableVolumes()
+				for dcId, racks := range dcs {
+					for _, rackId := range racks {
+						if vl.ShouldGrowVolumesByDcAndRack(&writableVolumes, dcId, rackId) {
+							vgr.DataCenter = string(dcId)
+							vgr.Rack = string(rackId)
 							if lastGrowCount > 0 {
-								vgr.WritableVolumeCount = uint32(int(lastGrowCount) / len(dcs))
+								vgr.WritableVolumeCount = uint32(math.Ceil(float64(lastGrowCount) / float64(len(dcs)*len(racks))))
 							} else {
 								vgr.WritableVolumeCount = volumeGrowStepCount
 							}
-							_, err = ms.VolumeGrow(ctx, vgr)
+
+							if _, err = ms.VolumeGrow(ctx, vgr); err != nil {
+								glog.V(0).Infof("volume grow request for dc:%s rack:%s failed: %+v", dcId, rackId, err)
+							}
 						}
 					}
 				}
-				if err != nil {
-					glog.V(0).Infof("volume grow request failed: %+v", err)
-				}
 			}
 		}
 	}()
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index e436b453a..be50eecdf 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -369,6 +369,19 @@ func (t *Topology) ListDataCenters() (dcs []string) {
 	return dcs
 }
 
+func (t *Topology) ListDCAndRacks() (dcs map[NodeId][]NodeId) {
+	t.RLock()
+	defer t.RUnlock()
+	dcs = make(map[NodeId][]NodeId)
+	for _, dcNode := range t.children {
+		dcNodeId := dcNode.(*DataCenter).Id()
+		for _, rackNode := range dcNode.Children() {
+			dcs[dcNodeId] = append(dcs[dcNodeId], rackNode.(*Rack).Id())
+		}
+	}
+	return dcs
+}
+
 func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) (newVolumes, deletedVolumes []storage.VolumeInfo) {
 	// convert into in memory struct storage.VolumeInfo
 	var volumeInfos []storage.VolumeInfo
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 3a360ff99..94493a177 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -365,25 +365,10 @@ func (vl *VolumeLayout) ShouldGrowVolumes() bool {
 	return writable <= crowded
 }
 
-func (vl *VolumeLayout) ShouldGrowVolumesByDataNode(nodeType string, dataNode string) bool {
-	vl.accessLock.RLock()
-	writables := make([]needle.VolumeId, len(vl.writables))
-	copy(writables, vl.writables)
-	vl.accessLock.RUnlock()
-
-	dataNodeId := NodeId(dataNode)
-	for _, v := range writables {
-		for _, dn := range vl.vid2location[v].list {
-			dataNodeFound := false
-			switch nodeType {
-			case "DataCenter":
-				dataNodeFound = dn.GetDataCenter().Id() == dataNodeId
-			case "Rack":
-				dataNodeFound = dn.GetRack().Id() == dataNodeId
-			case "DataNode":
-				dataNodeFound = dn.Id() == dataNodeId
-			}
-			if dataNodeFound {
+func (vl *VolumeLayout) ShouldGrowVolumesByDcAndRack(writables *[]needle.VolumeId, dcId NodeId, rackId NodeId) bool {
+	for _, v := range *writables {
+		for _, dn := range vl.Lookup(v) {
+			if dn.GetDataCenter().Id() == dcId && dn.GetRack().Id() == rackId {
 				if info, err := dn.GetVolumesById(v); err == nil && !vl.isCrowdedVolume(&info) {
 					return false
 				}
@@ -399,6 +384,14 @@ func (vl *VolumeLayout) GetWritableVolumeCount() (active, crowded int) {
 	return len(vl.writables), len(vl.crowded)
 }
 
+func (vl *VolumeLayout) CloneWritableVolumes() (writables []needle.VolumeId) {
+	vl.accessLock.RLock()
+	writables = make([]needle.VolumeId, len(vl.writables))
+	copy(writables, vl.writables)
+	vl.accessLock.RUnlock()
+	return writables
+}
+
 func (vl *VolumeLayout) removeFromWritable(vid needle.VolumeId) bool {
 	toDeleteIndex := -1
 	for k, id := range vl.writables {
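
Note: the following is an editor's sketch, not part of the patch. It models, with hypothetical simplified types (only NodeId mirrors the real topology package), the per-datacenter/per-rack growth decision that ShouldGrowVolumesByDcAndRack and the reworked ProcessGrowRequest loop implement: a (dc, rack) pair is grown only when it holds no non-crowded replica of any writable volume, and the previous grow count is split across the dc/rack pairs with math.Ceil.

package main

import (
	"fmt"
	"math"
)

// NodeId mirrors the topology package's string-based node identifier.
type NodeId string

// replica is a hypothetical stand-in for one location of a writable volume:
// which data center and rack hold it, and whether it is already crowded.
type replica struct {
	dc, rack NodeId
	crowded  bool
}

// shouldGrowByDcAndRack models ShouldGrowVolumesByDcAndRack: return false as
// soon as some writable volume still has a non-crowded replica in (dc, rack);
// otherwise the pair has no healthy capacity left and should be grown.
func shouldGrowByDcAndRack(writables map[uint32][]replica, dc, rack NodeId) bool {
	for _, replicas := range writables {
		for _, r := range replicas {
			if r.dc == dc && r.rack == rack && !r.crowded {
				return false
			}
		}
	}
	return true
}

func main() {
	// dc -> racks, the shape returned by the new ListDCAndRacks helper.
	dcs := map[NodeId][]NodeId{"dc1": {"rack1", "rack2"}}

	// volume id -> replica locations (a snapshot, as CloneWritableVolumes provides).
	writables := map[uint32][]replica{
		1: {{dc: "dc1", rack: "rack1", crowded: true}},
		2: {{dc: "dc1", rack: "rack2", crowded: false}},
	}

	lastGrowCount := 3
	for dc, racks := range dcs {
		for _, rack := range racks {
			if !shouldGrowByDcAndRack(writables, dc, rack) {
				continue // this rack still has a non-crowded writable replica
			}
			// Split the previous grow count across dc/rack pairs, rounding up,
			// as the patch does with math.Ceil.
			count := int(math.Ceil(float64(lastGrowCount) / float64(len(dcs)*len(racks))))
			fmt.Printf("grow %d volumes in dc=%s rack=%s\n", count, dc, rack)
		}
	}
}

Running the sketch requests growth only for rack1 (2 volumes, ceil(3/2)), while rack2 is skipped because volume 2 still has a non-crowded replica there.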
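
A second editor's note on the design: the removed ShouldGrowVolumesByDataNode re-acquired vl.accessLock and copied vl.writables on every call, once per data center, whereas the patch factors that copy into CloneWritableVolumes so the master takes the read lock once per layout and reuses the snapshot for every (dc, rack) check. Below is a minimal sketch of that snapshot-under-read-lock pattern, using an illustrative struct rather than the real VolumeLayout.

package main

import (
	"fmt"
	"sync"
)

// layout is an illustrative stand-in for VolumeLayout: a read-write mutex
// guarding a slice of writable volume ids.
type layout struct {
	accessLock sync.RWMutex
	writables  []uint32 // stand-in for []needle.VolumeId
}

// cloneWritableVolumes copies the writable list under the read lock, like
// CloneWritableVolumes, so callers can iterate the copy without the lock.
func (l *layout) cloneWritableVolumes() []uint32 {
	l.accessLock.RLock()
	defer l.accessLock.RUnlock()
	writables := make([]uint32, len(l.writables))
	copy(writables, l.writables)
	return writables
}

func main() {
	l := &layout{writables: []uint32{1, 2, 3}}

	// Take the snapshot once, then feed it to repeated per-(dc, rack) checks,
	// which is what ProcessGrowRequest now does with ShouldGrowVolumesByDcAndRack.
	snapshot := l.cloneWritableVolumes()
	fmt.Println("snapshot of writable volumes:", snapshot)
}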