From 6cfbfb084941e213f38a83a763fd359cc6611108 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 20 Jul 2022 00:04:12 +0500 Subject: [PATCH 01/22] check for ping before deleting raft server https://github.com/chrislusf/seaweedfs/issues/3083 --- weed/server/master_server.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 0fdc3944f..0e95fa91f 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -372,8 +372,26 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF } else if isLeader { go func(peerName string) { raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) + raftServerPingTicker := time.NewTicker(5 * time.Minute) + defer func() { + ms.onPeerUpdateDoneCnExist = false + }() for { select { + case <-raftServerPingTicker.C: + err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.Ping(context.Background(), &master_pb.PingRequest{ + Target: peerName, + TargetType: cluster.MasterType, + }) + return err + }) + if err != nil { + glog.Warningf("raft server %s ping failed %+v", peerName, err) + } else { + glog.V(0).Infof("raft server %s remove canceled on ping success", peerName) + return + } case <-raftServerRemovalTimeAfter: err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ @@ -384,12 +402,13 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF }) if err != nil { glog.Warningf("failed to removing old raft server %s: %v", peerName, err) + return } glog.V(0).Infof("old raft server %s removed", peerName) return case peerDone := <-ms.onPeerUpdateDoneCn: if peerName == peerDone { - glog.V(0).Infof("raft server %s remove canceled", peerName) + glog.V(0).Infof("raft server %s remove canceled on onPeerUpdate", peerName) return } } From f6a966b4fc43addad8b2ebe01970f15957b8cc02 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 20 Jul 2022 00:31:57 +0500 Subject: [PATCH 02/22] add waiting log message --- weed/server/master_server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 0e95fa91f..e75c4df54 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -414,6 +414,7 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF } } }(peerName) + glog.V(0).Infof("wait %v for raft server %s activity, otherwise delete", RaftServerRemovalTime, peerName) ms.onPeerUpdateDoneCnExist = true } } From 6c390851e7bb8e25080ebf81ea0e1064ee9018af Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 20 Jul 2022 18:08:12 +0500 Subject: [PATCH 03/22] fix design --- weed/server/master_server.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index e75c4df54..7f9bff389 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -11,6 +11,7 @@ import ( "regexp" "strings" "sync" + "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/cluster" @@ -65,8 +66,8 @@ type MasterServer struct { boundedLeaderChan chan int - onPeerUpdateDoneCn chan string - onPeerUpdateDoneCnExist bool + onPeerUpdateDoneCn chan string + onPeerUpdateGoroutineCount uint32 // notifying clients clientChansLock sync.RWMutex @@ -366,15 +367,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } } - if ms.onPeerUpdateDoneCnExist { + if atomic.LoadUint32(&ms.onPeerUpdateGoroutineCount) > 0 { ms.onPeerUpdateDoneCn <- peerName } } else if isLeader { go func(peerName string) { raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) raftServerPingTicker := time.NewTicker(5 * time.Minute) + atomic.AddUint32(&ms.onPeerUpdateGoroutineCount, 1) defer func() { - ms.onPeerUpdateDoneCnExist = false + atomic.AddUint32(&ms.onPeerUpdateGoroutineCount, -1) }() for { select { @@ -415,6 +417,5 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF } }(peerName) glog.V(0).Infof("wait %v for raft server %s activity, otherwise delete", RaftServerRemovalTime, peerName) - ms.onPeerUpdateDoneCnExist = true } } From b6471ecd754dbcd26c7ccc767be16c8acc6337b9 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 20 Jul 2022 00:45:13 +0500 Subject: [PATCH 04/22] err msg with duplicated local subscription detected move to log level 1 https://github.com/chrislusf/seaweedfs/issues/3320 --- weed/filer/meta_aggregator.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index c672ce342..5799e247e 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util" "io" + "strings" "sync" "time" @@ -99,7 +100,11 @@ func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddres return } if err != nil { - glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err) + errLvl := glog.Level(0) + if strings.Contains(err.Error(), "duplicated local subscription detected") { + errLvl = glog.Level(1) + } + glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err) } if lastTsNs < nextLastTsNs { lastTsNs = nextLastTsNs From ba0e3ce5fa4290be5d619345a22e8bbd61a757ee Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 11 Jul 2022 16:58:15 +0500 Subject: [PATCH 05/22] volume server evacuate to target server --- weed/shell/command_volume_server_evacuate.go | 29 ++++++++++++++------ 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index ffbee0302..f2c24a8b4 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -47,7 +47,7 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeServer := vsEvacuateCommand.String("node", "", ": of the volume server") - c.targetServer = *vsEvacuateCommand.String("target", "", ": of target volume") + targetServer := vsEvacuateCommand.String("target", "", ": of target volume") skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved") applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes") retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry") @@ -63,6 +63,9 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, if *volumeServer == "" { return fmt.Errorf("need to specify volume server by -node=:") } + if *targetServer != "" { + c.targetServer = *targetServer + } for i := 0; i < *retryCount+1; i++ { if err = c.volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil { return nil @@ -103,14 +106,27 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE if thisNode == nil { return fmt.Errorf("%s is not found in this cluster", volumeServer) } + if c.targetServer != "" { + targetServerFound := false + for _, otherNode := range otherNodes { + if otherNode.info.Id == c.targetServer { + otherNodes = []*Node{otherNode} + targetServerFound = true + break + } + } + if !targetServerFound { + return fmt.Errorf("target %s is not found in this cluster", c.targetServer) + } + } // move away normal volumes volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) for _, diskInfo := range thisNode.info.DiskInfos { for _, vol := range diskInfo.VolumeInfos { - hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) + hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) if err != nil { - return fmt.Errorf("move away volume %d from %s: %v", vol.Id, volumeServer, err) + fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) } if !hasMoved { if skipNonMoveable { @@ -138,7 +154,7 @@ func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, for _, ecShardInfo := range diskInfo.EcShardInfos { hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange) if err != nil { - return fmt.Errorf("move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err) + fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err) } if !hasMoved { if skipNonMoveable { @@ -160,9 +176,6 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv }) for i := 0; i < len(otherNodes); i++ { emptyNode := otherNodes[i] - if c.targetServer != "" && c.targetServer != emptyNode.info.Id { - continue - } collectionPrefix := "" if ecShardInfo.Collection != "" { collectionPrefix = ecShardInfo.Collection + "_" @@ -184,7 +197,7 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv return } -func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { +func (c *commandVolumeServerEvacuate) moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType)) for _, n := range otherNodes { n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { From 6f764e101454ba85e50fcdfab056fcb5d8c66584 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Tue, 12 Jul 2022 11:33:08 +0500 Subject: [PATCH 06/22] volume server evacuate from rack --- weed/shell/command_volume_server_evacuate.go | 103 ++++++++++--------- 1 file changed, 56 insertions(+), 47 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index f2c24a8b4..37fb29b14 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -19,6 +19,7 @@ func init() { type commandVolumeServerEvacuate struct { targetServer string + volumeRack string } func (c *commandVolumeServerEvacuate) Name() string { @@ -47,6 +48,7 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeServer := vsEvacuateCommand.String("node", "", ": of the volume server") + volumeRack := vsEvacuateCommand.String("rack", "", "rack for then volume servers") targetServer := vsEvacuateCommand.String("target", "", ": of target volume") skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved") applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes") @@ -66,6 +68,9 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, if *targetServer != "" { c.targetServer = *targetServer } + if *volumeRack != "" { + c.volumeRack = *volumeRack + } for i := 0; i < *retryCount+1; i++ { if err = c.volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil { return nil @@ -102,41 +107,31 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this volume server volumeServers := collectVolumeServersByDc(topologyInfo, "") - thisNode, otherNodes := nodesOtherThan(volumeServers, volumeServer) - if thisNode == nil { + thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer) + if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster", volumeServer) } - if c.targetServer != "" { - targetServerFound := false - for _, otherNode := range otherNodes { - if otherNode.info.Id == c.targetServer { - otherNodes = []*Node{otherNode} - targetServerFound = true - break - } - } - if !targetServerFound { - return fmt.Errorf("target %s is not found in this cluster", c.targetServer) - } - } // move away normal volumes - volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) - for _, diskInfo := range thisNode.info.DiskInfos { - for _, vol := range diskInfo.VolumeInfos { - hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) - if err != nil { - fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) - } - if !hasMoved { - if skipNonMoveable { - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement)) - fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String()) - } else { - return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer) + for _, thisNode := range thisNodes { + for _, diskInfo := range thisNode.info.DiskInfos { + volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) + for _, vol := range diskInfo.VolumeInfos { + hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) + if err != nil { + fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) + } + if !hasMoved { + if skipNonMoveable { + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement)) + fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String()) + } else { + return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer) + } } } } + } return nil } @@ -144,23 +139,25 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this ec volume server ecNodes, _ := collectEcVolumeServersByDc(topologyInfo, "") - thisNode, otherNodes := ecNodesOtherThan(ecNodes, volumeServer) - if thisNode == nil { + thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer) + if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster\n", volumeServer) } // move away ec volumes - for _, diskInfo := range thisNode.info.DiskInfos { - for _, ecShardInfo := range diskInfo.EcShardInfos { - hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange) - if err != nil { - fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err) - } - if !hasMoved { - if skipNonMoveable { - fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer) - } else { - return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer) + for _, thisNode := range thisNodes { + for _, diskInfo := range thisNode.info.DiskInfos { + for _, ecShardInfo := range diskInfo.EcShardInfos { + hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange) + if err != nil { + fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err) + } + if !hasMoved { + if skipNonMoveable { + fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer) + } else { + return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer) + } } } } @@ -220,10 +217,16 @@ func (c *commandVolumeServerEvacuate) moveAwayOneNormalVolume(commandEnv *Comman return } -func nodesOtherThan(volumeServers []*Node, thisServer string) (thisNode *Node, otherNodes []*Node) { +func (c *commandVolumeServerEvacuate) nodesOtherThan(volumeServers []*Node, thisServer string) (thisNodes []*Node, otherNodes []*Node) { for _, node := range volumeServers { - if node.info.Id == thisServer { - thisNode = node + if node.info.Id == thisServer || (c.volumeRack != "" && node.rack == c.volumeRack) { + thisNodes = append(thisNodes, node) + continue + } + if c.volumeRack != "" && c.volumeRack == node.rack { + continue + } + if c.targetServer != "" && c.targetServer != node.info.Id { continue } otherNodes = append(otherNodes, node) @@ -231,10 +234,16 @@ func nodesOtherThan(volumeServers []*Node, thisServer string) (thisNode *Node, o return } -func ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNode *EcNode, otherNodes []*EcNode) { +func (c *commandVolumeServerEvacuate) ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNodes []*EcNode, otherNodes []*EcNode) { for _, node := range volumeServers { - if node.info.Id == thisServer { - thisNode = node + if node.info.Id == thisServer || (c.volumeRack != "" && string(node.rack) == c.volumeRack) { + thisNodes = append(thisNodes, node) + continue + } + if c.volumeRack != "" && c.volumeRack == string(node.rack) { + continue + } + if c.targetServer != "" && c.targetServer != node.info.Id { continue } otherNodes = append(otherNodes, node) From 867269cdcf6ba79d80886ee769c44742f68092b6 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Tue, 12 Jul 2022 11:56:58 +0500 Subject: [PATCH 07/22] help rack --- weed/shell/command_volume_server_evacuate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 37fb29b14..dad8d8626 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -48,7 +48,7 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeServer := vsEvacuateCommand.String("node", "", ": of the volume server") - volumeRack := vsEvacuateCommand.String("rack", "", "rack for then volume servers") + volumeRack := vsEvacuateCommand.String("rack", "", "source rack for the volume servers") targetServer := vsEvacuateCommand.String("target", "", ": of target volume") skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved") applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes") From b5e5f6f55ad6d8fab838f5735867d1457c5aa420 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Tue, 12 Jul 2022 13:47:21 +0500 Subject: [PATCH 08/22] update topologyInfo --- weed/shell/command_volume_server_evacuate.go | 32 +++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index dad8d8626..195cc2699 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -11,6 +11,7 @@ import ( "golang.org/x/exp/slices" "io" "os" + "time" ) func init() { @@ -18,6 +19,7 @@ func init() { } type commandVolumeServerEvacuate struct { + topologyInfo *master_pb.TopologyInfo targetServer string volumeRack string } @@ -58,12 +60,12 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, } infoAboutSimulationMode(writer, *applyChange, "-force") - if err = commandEnv.confirmIsLocked(args); err != nil { + if err = commandEnv.confirmIsLocked(args); err != nil && *applyChange { return } - if *volumeServer == "" { - return fmt.Errorf("need to specify volume server by -node=:") + if *volumeServer == "" && *volumeRack == "" { + return fmt.Errorf("need to specify volume server by -node=: or source rack") } if *targetServer != "" { c.targetServer = *targetServer @@ -88,25 +90,33 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn // list all the volumes // collect topology information - topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) + c.topologyInfo, _, err = collectTopologyInfo(commandEnv, 0) if err != nil { return err } - if err := c.evacuateNormalVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil { + go func() { + for { + if topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Minute); err != nil { + c.topologyInfo = topologyInfo + } + } + }() + + if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err } - if err := c.evacuateEcVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil { + if err := c.evacuateEcVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err } return nil } -func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { +func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this volume server - volumeServers := collectVolumeServersByDc(topologyInfo, "") + volumeServers := collectVolumeServersByDc(c.topologyInfo, "") thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster", volumeServer) @@ -115,7 +125,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE // move away normal volumes for _, thisNode := range thisNodes { for _, diskInfo := range thisNode.info.DiskInfos { - volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) + volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) for _, vol := range diskInfo.VolumeInfos { hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) if err != nil { @@ -136,9 +146,9 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE return nil } -func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { +func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this ec volume server - ecNodes, _ := collectEcVolumeServersByDc(topologyInfo, "") + ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "") thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster\n", volumeServer) From 39eaf426f828ff84559c591b755e9c365516487f Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Tue, 12 Jul 2022 14:56:34 +0500 Subject: [PATCH 09/22] fix TestVolumeServerEvacuate --- weed/shell/command_volume_server_evacuate_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate_test.go b/weed/shell/command_volume_server_evacuate_test.go index 2cdb94a60..4563f38ba 100644 --- a/weed/shell/command_volume_server_evacuate_test.go +++ b/weed/shell/command_volume_server_evacuate_test.go @@ -6,12 +6,11 @@ import ( ) func TestVolumeServerEvacuate(t *testing.T) { - topologyInfo := parseOutput(topoData) + c := commandVolumeServerEvacuate{} + c.topologyInfo = parseOutput(topoData) volumeServer := "192.168.1.4:8080" - - c := commandVolumeServerEvacuate{} - if err := c.evacuateNormalVolumes(nil, topologyInfo, volumeServer, true, false, os.Stdout); err != nil { + if err := c.evacuateNormalVolumes(nil, volumeServer, true, false, os.Stdout); err != nil { t.Errorf("evacuate: %v", err) } From 884ffbafeedf380c5d1f81208577eddae0bc6311 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Fri, 15 Jul 2022 13:51:08 +0500 Subject: [PATCH 10/22] clouse background update --- weed/shell/command_volume_server_evacuate.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 195cc2699..3b0c8381b 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -14,6 +14,8 @@ import ( "time" ) +const topologyInfoUpdateInterval = 5 * time.Minute + func init() { Commands = append(Commands, &commandVolumeServerEvacuate{}) } @@ -95,13 +97,22 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn return err } + stopchan := make(chan struct{}) go func() { for { - if topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Minute); err != nil { - c.topologyInfo = topologyInfo + select { + default: + if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { + c.topologyInfo = topologyInfo + } else { + fmt.Fprintf(writer, "update topologyInfo %v", err) + } + case <-stopchan: + return } } }() + defer close(stopchan) if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err @@ -127,7 +138,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE for _, diskInfo := range thisNode.info.DiskInfos { volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) for _, vol := range diskInfo.VolumeInfos { - hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) + hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) if err != nil { fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) } @@ -204,7 +215,7 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv return } -func (c *commandVolumeServerEvacuate) moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { +func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType)) for _, n := range otherNodes { n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { From 72dca31cfa64b6a60633578f8ba924b645db74ba Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 18 Jul 2022 01:46:31 +0500 Subject: [PATCH 11/22] fix update topologyInfo --- weed/shell/command_volume_server_evacuate.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 3b0c8381b..0595ef308 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -103,9 +103,9 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn select { default: if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { - c.topologyInfo = topologyInfo - } else { fmt.Fprintf(writer, "update topologyInfo %v", err) + } else { + c.topologyInfo = topologyInfo } case <-stopchan: return From fa88dff7ce9ffad1974698aaaca0fad762ffc4b5 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 18 Jul 2022 11:32:28 +0500 Subject: [PATCH 12/22] update otherNodes --- weed/shell/command_volume_server_evacuate.go | 40 +++++++++++--------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 0595ef308..7d50b7f81 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -24,6 +24,7 @@ type commandVolumeServerEvacuate struct { topologyInfo *master_pb.TopologyInfo targetServer string volumeRack string + otherNodes []*Node } func (c *commandVolumeServerEvacuate) Name() string { @@ -97,22 +98,27 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn return err } - stopchan := make(chan struct{}) - go func() { - for { - select { - default: - if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { - fmt.Fprintf(writer, "update topologyInfo %v", err) - } else { - c.topologyInfo = topologyInfo + if applyChange { + stopchan := make(chan struct{}) + go func() { + for { + select { + default: + if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { + fmt.Fprintf(writer, "update topologyInfo %v", err) + } else { + c.topologyInfo = topologyInfo + _, c.otherNodes = c.nodesOtherThan( + collectVolumeServersByDc(c.topologyInfo, ""), volumeServer) + fmt.Fprintf(writer, "topologyInfo updated %v\n", len(c.otherNodes)) + } + case <-stopchan: + return } - case <-stopchan: - return } - } - }() - defer close(stopchan) + }() + defer close(stopchan) + } if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err @@ -128,7 +134,8 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this volume server volumeServers := collectVolumeServersByDc(c.topologyInfo, "") - thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer) + var thisNodes []*Node + thisNodes, c.otherNodes = c.nodesOtherThan(volumeServers, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster", volumeServer) } @@ -138,7 +145,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE for _, diskInfo := range thisNode.info.DiskInfos { volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) for _, vol := range diskInfo.VolumeInfos { - hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) + hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, c.otherNodes, applyChange) if err != nil { fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) } @@ -152,7 +159,6 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE } } } - } return nil } From de4fcc0e2c3578cae784ba4bbcccd8b7d16ffc80 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 18 Jul 2022 16:27:02 +0500 Subject: [PATCH 13/22] sync update topologyInfo --- weed/shell/command_volume_server_evacuate.go | 45 +++++++++----------- 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 7d50b7f81..d1c474a76 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -24,7 +24,6 @@ type commandVolumeServerEvacuate struct { topologyInfo *master_pb.TopologyInfo targetServer string volumeRack string - otherNodes []*Node } func (c *commandVolumeServerEvacuate) Name() string { @@ -98,28 +97,6 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn return err } - if applyChange { - stopchan := make(chan struct{}) - go func() { - for { - select { - default: - if topologyInfo, _, err := collectTopologyInfo(commandEnv, topologyInfoUpdateInterval); err != nil { - fmt.Fprintf(writer, "update topologyInfo %v", err) - } else { - c.topologyInfo = topologyInfo - _, c.otherNodes = c.nodesOtherThan( - collectVolumeServersByDc(c.topologyInfo, ""), volumeServer) - fmt.Fprintf(writer, "topologyInfo updated %v\n", len(c.otherNodes)) - } - case <-stopchan: - return - } - } - }() - defer close(stopchan) - } - if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err } @@ -134,18 +111,34 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this volume server volumeServers := collectVolumeServersByDc(c.topologyInfo, "") - var thisNodes []*Node - thisNodes, c.otherNodes = c.nodesOtherThan(volumeServers, volumeServer) + thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster", volumeServer) } // move away normal volumes + ticker := time.NewTicker(topologyInfoUpdateInterval) for _, thisNode := range thisNodes { for _, diskInfo := range thisNode.info.DiskInfos { + if applyChange { + select { + case <-ticker.C: + if topologyInfo, _, err := collectTopologyInfo(commandEnv, 0); err != nil { + fmt.Fprintf(writer, "update topologyInfo %v", err) + } else { + _, otherNodesNew := c.nodesOtherThan( + collectVolumeServersByDc(topologyInfo, ""), volumeServer) + if len(otherNodesNew) > 0 { + otherNodes = otherNodesNew + c.topologyInfo = topologyInfo + fmt.Fprintf(writer, "topologyInfo updated %v\n", len(otherNodes)) + } + } + } + } volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) for _, vol := range diskInfo.VolumeInfos { - hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, c.otherNodes, applyChange) + hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) if err != nil { fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err) } From e2d991d8d0529dbd221ea686ca73b9c87234eedb Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 18 Jul 2022 16:38:19 +0500 Subject: [PATCH 14/22] ticker.Stop --- weed/shell/command_volume_server_evacuate.go | 1 + 1 file changed, 1 insertion(+) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index d1c474a76..c9df2c79a 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -118,6 +118,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE // move away normal volumes ticker := time.NewTicker(topologyInfoUpdateInterval) + defer ticker.Stop() for _, thisNode := range thisNodes { for _, diskInfo := range thisNode.info.DiskInfos { if applyChange { From c5189c343baa0d7b36349bbca669f8bba12d491b Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 20 Jul 2022 00:54:23 +0500 Subject: [PATCH 15/22] remove ticker update the topology before each file --- weed/shell/command_volume_server_evacuate.go | 26 +++++++------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index c9df2c79a..f72d73230 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -11,11 +11,8 @@ import ( "golang.org/x/exp/slices" "io" "os" - "time" ) -const topologyInfoUpdateInterval = 5 * time.Minute - func init() { Commands = append(Commands, &commandVolumeServerEvacuate{}) } @@ -117,23 +114,18 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE } // move away normal volumes - ticker := time.NewTicker(topologyInfoUpdateInterval) - defer ticker.Stop() for _, thisNode := range thisNodes { for _, diskInfo := range thisNode.info.DiskInfos { if applyChange { - select { - case <-ticker.C: - if topologyInfo, _, err := collectTopologyInfo(commandEnv, 0); err != nil { - fmt.Fprintf(writer, "update topologyInfo %v", err) - } else { - _, otherNodesNew := c.nodesOtherThan( - collectVolumeServersByDc(topologyInfo, ""), volumeServer) - if len(otherNodesNew) > 0 { - otherNodes = otherNodesNew - c.topologyInfo = topologyInfo - fmt.Fprintf(writer, "topologyInfo updated %v\n", len(otherNodes)) - } + if topologyInfo, _, err := collectTopologyInfo(commandEnv, 0); err != nil { + fmt.Fprintf(writer, "update topologyInfo %v", err) + } else { + _, otherNodesNew := c.nodesOtherThan( + collectVolumeServersByDc(topologyInfo, ""), volumeServer) + if len(otherNodesNew) > 0 { + otherNodes = otherNodesNew + c.topologyInfo = topologyInfo + fmt.Fprintf(writer, "topologyInfo updated %v\n", len(otherNodes)) } } } From 7875470e74022e2b4262e51a349c4e3c15459a33 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 20 Jul 2022 18:40:35 +0500 Subject: [PATCH 16/22] onPeerUpdateGoroutineCount use int32 --- weed/server/master_server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 7f9bff389..98fb6aab1 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -67,7 +67,7 @@ type MasterServer struct { boundedLeaderChan chan int onPeerUpdateDoneCn chan string - onPeerUpdateGoroutineCount uint32 + onPeerUpdateGoroutineCount int32 // notifying clients clientChansLock sync.RWMutex @@ -367,16 +367,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } } - if atomic.LoadUint32(&ms.onPeerUpdateGoroutineCount) > 0 { + if atomic.LoadInt32(&ms.onPeerUpdateGoroutineCount) > 0 { ms.onPeerUpdateDoneCn <- peerName } } else if isLeader { go func(peerName string) { raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) raftServerPingTicker := time.NewTicker(5 * time.Minute) - atomic.AddUint32(&ms.onPeerUpdateGoroutineCount, 1) + atomic.AddInt32(&ms.onPeerUpdateGoroutineCount, 1) defer func() { - atomic.AddUint32(&ms.onPeerUpdateGoroutineCount, -1) + atomic.AddInt32(&ms.onPeerUpdateGoroutineCount, -1) }() for { select { From 93ca87b7cbf53e6b17a8a6d3b5dec32af94cd00c Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Thu, 21 Jul 2022 15:51:14 +0500 Subject: [PATCH 17/22] use safe onPeerUpdateDoneCns --- weed/server/master_server.go | 44 ++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 98fb6aab1..03a3f74d1 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -11,7 +11,6 @@ import ( "regexp" "strings" "sync" - "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/cluster" @@ -33,9 +32,10 @@ import ( ) const ( - SequencerType = "master.sequencer.type" - SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" - RaftServerRemovalTime = 72 * time.Minute + SequencerType = "master.sequencer.type" + SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + RaftServerRemovalTime = 72 * time.Minute + ResetRaftServerRemovalTimeMsg = "ResetRaftServerRemovalTime" ) type MasterOption struct { @@ -66,8 +66,8 @@ type MasterServer struct { boundedLeaderChan chan int - onPeerUpdateDoneCn chan string - onPeerUpdateGoroutineCount int32 + onPeerUpdateDoneCns map[string]*chan string + onPeerUpdateLock sync.RWMutex // notifying clients clientChansLock sync.RWMutex @@ -119,9 +119,9 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se Cluster: cluster.NewCluster(), } ms.boundedLeaderChan = make(chan int, 16) - ms.onPeerUpdateDoneCn = make(chan string) ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate + ms.onPeerUpdateDoneCns = make(map[string]*chan string) seq := ms.createSequencer(option) if nil == seq { @@ -367,16 +367,31 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } } - if atomic.LoadInt32(&ms.onPeerUpdateGoroutineCount) > 0 { - ms.onPeerUpdateDoneCn <- peerName + ms.onPeerUpdateLock.RLock() + if len(ms.onPeerUpdateDoneCns) > 0 { + for _, onPeerUpdateDoneCn := range ms.onPeerUpdateDoneCns { + *onPeerUpdateDoneCn <- peerName + } } + ms.onPeerUpdateLock.RUnlock() } else if isLeader { + if onPeerUpdateDoneCnPrev, ok := ms.onPeerUpdateDoneCns[peerName]; ok { + *onPeerUpdateDoneCnPrev <- ResetRaftServerRemovalTimeMsg + return + } + onPeerUpdateDoneCn := make(chan string) + ms.onPeerUpdateLock.Lock() + ms.onPeerUpdateDoneCns[peerName] = &onPeerUpdateDoneCn + ms.onPeerUpdateLock.Unlock() + go func(peerName string) { raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) raftServerPingTicker := time.NewTicker(5 * time.Minute) - atomic.AddInt32(&ms.onPeerUpdateGoroutineCount, 1) defer func() { - atomic.AddInt32(&ms.onPeerUpdateGoroutineCount, -1) + ms.onPeerUpdateLock.Lock() + delete(ms.onPeerUpdateDoneCns, peerName) + ms.onPeerUpdateLock.Unlock() + close(onPeerUpdateDoneCn) }() for { select { @@ -408,11 +423,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF } glog.V(0).Infof("old raft server %s removed", peerName) return - case peerDone := <-ms.onPeerUpdateDoneCn: + case peerDone := <-onPeerUpdateDoneCn: if peerName == peerDone { glog.V(0).Infof("raft server %s remove canceled on onPeerUpdate", peerName) return } + if peerDone == ResetRaftServerRemovalTimeMsg { + raftServerRemovalTimeAfter = time.After(RaftServerRemovalTime) + glog.V(0).Infof("rest wait %v for raft server %s activity, otherwise delete", + RaftServerRemovalTime, peerName) + } } } }(peerName) From 3c42814b5885b0ae10631c6a8a66b858c7923380 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Thu, 21 Jul 2022 17:15:10 +0500 Subject: [PATCH 18/22] avoid deadlock --- weed/server/master_server.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 03a3f74d1..6c0b6652b 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -368,8 +368,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF } } ms.onPeerUpdateLock.RLock() - if len(ms.onPeerUpdateDoneCns) > 0 { - for _, onPeerUpdateDoneCn := range ms.onPeerUpdateDoneCns { + isGtZero := len(ms.onPeerUpdateDoneCns) > 0 + ms.onPeerUpdateLock.RUnlock() + if isGtZero { + var chanPtrs []*chan string + ms.onPeerUpdateLock.RLock() + for _, cn := range ms.onPeerUpdateDoneCns { + chanPtrs = append(chanPtrs, cn) + } + ms.onPeerUpdateLock.RUnlock() + for _, onPeerUpdateDoneCn := range chanPtrs { *onPeerUpdateDoneCn <- peerName } } From c88ea31f62315bca22cb0457dca92376b032f8b2 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Tue, 26 Jul 2022 12:57:07 +0500 Subject: [PATCH 19/22] fix RUnlock of unlocked RWMutex --- weed/server/master_server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 6c0b6652b..d2e98c2a2 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -381,7 +381,6 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF *onPeerUpdateDoneCn <- peerName } } - ms.onPeerUpdateLock.RUnlock() } else if isLeader { if onPeerUpdateDoneCnPrev, ok := ms.onPeerUpdateDoneCns[peerName]; ok { *onPeerUpdateDoneCnPrev <- ResetRaftServerRemovalTimeMsg From a98f6d66a38b8dd7231191361a4333b46182a407 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Mon, 1 Aug 2022 12:51:41 +0500 Subject: [PATCH 20/22] rollback over onPeerupdate implementation of automatic clean-up of failed servers in favor of synchronous ping --- weed/filer/meta_aggregator.go | 2 +- weed/server/master_server.go | 110 ++++------------------------------ 2 files changed, 13 insertions(+), 99 deletions(-) diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 5799e247e..c78dcac95 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -102,7 +102,7 @@ func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddres if err != nil { errLvl := glog.Level(0) if strings.Contains(err.Error(), "duplicated local subscription detected") { - errLvl = glog.Level(1) + errLvl = glog.Level(4) } glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err) } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index d2e98c2a2..1c623388c 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,7 +1,6 @@ package weed_server import ( - "context" "fmt" "github.com/chrislusf/seaweedfs/weed/stats" "net/http" @@ -32,10 +31,8 @@ import ( ) const ( - SequencerType = "master.sequencer.type" - SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" - RaftServerRemovalTime = 72 * time.Minute - ResetRaftServerRemovalTimeMsg = "ResetRaftServerRemovalTime" + SequencerType = "master.sequencer.type" + SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" ) type MasterOption struct { @@ -66,9 +63,6 @@ type MasterServer struct { boundedLeaderChan chan int - onPeerUpdateDoneCns map[string]*chan string - onPeerUpdateLock sync.RWMutex - // notifying clients clientChansLock sync.RWMutex clientChans map[string]chan *master_pb.KeepConnectedResponse @@ -121,7 +115,6 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se ms.boundedLeaderChan = make(chan int, 16) ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate - ms.onPeerUpdateDoneCns = make(map[string]*chan string) seq := ms.createSequencer(option) if nil == seq { @@ -352,97 +345,18 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF peerAddress := pb.ServerAddress(update.Address) peerName := string(peerAddress) isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader - if update.IsAdd { - if isLeader { - raftServerFound := false - for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { - if string(server.ID) == peerName { - raftServerFound = true - } - } - if !raftServerFound { - glog.V(0).Infof("adding new raft server: %s", peerName) - ms.Topo.HashicorpRaft.AddVoter( - hashicorpRaft.ServerID(peerName), - hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) - } - } - ms.onPeerUpdateLock.RLock() - isGtZero := len(ms.onPeerUpdateDoneCns) > 0 - ms.onPeerUpdateLock.RUnlock() - if isGtZero { - var chanPtrs []*chan string - ms.onPeerUpdateLock.RLock() - for _, cn := range ms.onPeerUpdateDoneCns { - chanPtrs = append(chanPtrs, cn) - } - ms.onPeerUpdateLock.RUnlock() - for _, onPeerUpdateDoneCn := range chanPtrs { - *onPeerUpdateDoneCn <- peerName + if update.IsAdd && isLeader { + raftServerFound := false + for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerName { + raftServerFound = true } } - } else if isLeader { - if onPeerUpdateDoneCnPrev, ok := ms.onPeerUpdateDoneCns[peerName]; ok { - *onPeerUpdateDoneCnPrev <- ResetRaftServerRemovalTimeMsg - return + if !raftServerFound { + glog.V(0).Infof("adding new raft server: %s", peerName) + ms.Topo.HashicorpRaft.AddVoter( + hashicorpRaft.ServerID(peerName), + hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } - onPeerUpdateDoneCn := make(chan string) - ms.onPeerUpdateLock.Lock() - ms.onPeerUpdateDoneCns[peerName] = &onPeerUpdateDoneCn - ms.onPeerUpdateLock.Unlock() - - go func(peerName string) { - raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) - raftServerPingTicker := time.NewTicker(5 * time.Minute) - defer func() { - ms.onPeerUpdateLock.Lock() - delete(ms.onPeerUpdateDoneCns, peerName) - ms.onPeerUpdateLock.Unlock() - close(onPeerUpdateDoneCn) - }() - for { - select { - case <-raftServerPingTicker.C: - err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { - _, err := client.Ping(context.Background(), &master_pb.PingRequest{ - Target: peerName, - TargetType: cluster.MasterType, - }) - return err - }) - if err != nil { - glog.Warningf("raft server %s ping failed %+v", peerName, err) - } else { - glog.V(0).Infof("raft server %s remove canceled on ping success", peerName) - return - } - case <-raftServerRemovalTimeAfter: - err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { - _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ - Id: peerName, - Force: false, - }) - return err - }) - if err != nil { - glog.Warningf("failed to removing old raft server %s: %v", peerName, err) - return - } - glog.V(0).Infof("old raft server %s removed", peerName) - return - case peerDone := <-onPeerUpdateDoneCn: - if peerName == peerDone { - glog.V(0).Infof("raft server %s remove canceled on onPeerUpdate", peerName) - return - } - if peerDone == ResetRaftServerRemovalTimeMsg { - raftServerRemovalTimeAfter = time.After(RaftServerRemovalTime) - glog.V(0).Infof("rest wait %v for raft server %s activity, otherwise delete", - RaftServerRemovalTime, peerName) - } - } - } - }(peerName) - glog.V(0).Infof("wait %v for raft server %s activity, otherwise delete", RaftServerRemovalTime, peerName) } } From 7235410b57ab982c32ef1c08b1f8d32fd4b502ca Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 1 Aug 2022 15:37:55 +0000 Subject: [PATCH 21/22] Bump github.com/fclairamb/ftpserverlib from 0.18.0 to 0.19.0 Bumps [github.com/fclairamb/ftpserverlib](https://github.com/fclairamb/ftpserverlib) from 0.18.0 to 0.19.0. - [Release notes](https://github.com/fclairamb/ftpserverlib/releases) - [Commits](https://github.com/fclairamb/ftpserverlib/compare/v0.18.0...v0.19.0) --- updated-dependencies: - dependency-name: github.com/fclairamb/ftpserverlib dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- go.mod | 6 +++--- go.sum | 13 +++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 3be03ef8f..8505a1e6d 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect github.com/facebookgo/stats v0.0.0-20151006221625-1b76add642e4 github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect - github.com/fclairamb/ftpserverlib v0.18.0 + github.com/fclairamb/ftpserverlib v0.19.0 github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/go-errors/errors v1.1.1 // indirect github.com/go-redis/redis/v8 v8.11.5 @@ -86,7 +86,7 @@ require ( github.com/seaweedfs/goexif v2.0.0+incompatible github.com/seaweedfs/raft v1.1.0 github.com/sirupsen/logrus v1.8.1 // indirect - github.com/spf13/afero v1.8.2 // indirect + github.com/spf13/afero v1.9.2 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/viper v1.12.0 @@ -118,7 +118,7 @@ require ( golang.org/x/image v0.0.0-20200119044424-58c23975cae1 golang.org/x/net v0.0.0-20220708220712-1185a9018129 golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2 // indirect - golang.org/x/sys v0.0.0-20220624220833-87e55d714810 + golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023 golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect diff --git a/go.sum b/go.sum index 578a8c965..874f38624 100644 --- a/go.sum +++ b/go.sum @@ -289,8 +289,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/fclairamb/ftpserverlib v0.18.0 h1:q/uz7jVFMoGEMswnA+nbaKEC5mzxXJOmhPE/Q3r7VZI= -github.com/fclairamb/ftpserverlib v0.18.0/go.mod h1:QhLRiCajhPG/2WwGgcsAqmlaYXX8KziNXtSe1BlRH+k= +github.com/fclairamb/ftpserverlib v0.19.0 h1:5QcSQ0OIJBlezIqmGehiL/AVsRb6dIkMxbkuhyPkESM= +github.com/fclairamb/ftpserverlib v0.19.0/go.mod h1:pmukdVOFKKUY9zjWRoxFW8JAljyulC/uK5FfusJzK2E= github.com/fclairamb/go-log v0.3.0 h1:oSC7Zjt0FZIYC5xXahUUycKGkypSdr2srFPLsp7CLd0= github.com/fclairamb/go-log v0.3.0/go.mod h1:XG61EiPlAXnPDN8SA4N3zeA+GyBJmVOCCo12WORx/gA= github.com/fluent/fluent-logger-golang v1.9.0 h1:zUdY44CHX2oIUc7VTNZc+4m+ORuO/mldQDA7czhWXEg= @@ -317,7 +317,7 @@ github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3I github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= -github.com/go-kit/log v0.2.0 h1:7i2K3eKTos3Vc0enKCfnVcgHh2olr/MyfboYq7cAcFw= +github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= @@ -835,8 +835,8 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= -github.com/spf13/afero v1.8.2 h1:xehSyVa0YnHWsJ49JFljMpg1HX19V6NDZ1fkm1Xznbo= -github.com/spf13/afero v1.8.2/go.mod h1:CtAatgMJh6bJEIs48Ay/FOnkljP3WeGUG0MC1RfAqwo= +github.com/spf13/afero v1.9.2 h1:j49Hj62F0n+DaZ1dDCvhABaPNSGNkt32oRFxI33IEMw= +github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y= github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU= github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= @@ -1247,8 +1247,9 @@ golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220624220833-87e55d714810 h1:rHZQSjJdAI4Xf5Qzeh2bBc5YJIkPFVM6oDtMFYmgws0= golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= From d764cefb18c3287d72ef163a2ebf00437ca25bf5 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 1 Aug 2022 15:38:02 +0000 Subject: [PATCH 22/22] Bump google.golang.org/protobuf from 1.28.0 to 1.28.1 Bumps [google.golang.org/protobuf](https://github.com/protocolbuffers/protobuf-go) from 1.28.0 to 1.28.1. - [Release notes](https://github.com/protocolbuffers/protobuf-go/releases) - [Changelog](https://github.com/protocolbuffers/protobuf-go/blob/master/release.bash) - [Commits](https://github.com/protocolbuffers/protobuf-go/compare/v1.28.0...v1.28.1) --- updated-dependencies: - dependency-name: google.golang.org/protobuf dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 3be03ef8f..56383a814 100644 --- a/go.mod +++ b/go.mod @@ -126,7 +126,7 @@ require ( google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f // indirect google.golang.org/grpc v1.48.0 - google.golang.org/protobuf v1.28.0 + google.golang.org/protobuf v1.28.1 gopkg.in/inf.v0 v0.9.1 // indirect modernc.org/b v1.0.0 // indirect modernc.org/cc/v3 v3.36.0 // indirect diff --git a/go.sum b/go.sum index 578a8c965..d4d48bb26 100644 --- a/go.sum +++ b/go.sum @@ -1559,8 +1559,9 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=