From 6c390851e7bb8e25080ebf81ea0e1064ee9018af Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Wed, 20 Jul 2022 18:08:12 +0500 Subject: [PATCH] fix design --- weed/server/master_server.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index e75c4df54..7f9bff389 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -11,6 +11,7 @@ import ( "regexp" "strings" "sync" + "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/cluster" @@ -65,8 +66,8 @@ type MasterServer struct { boundedLeaderChan chan int - onPeerUpdateDoneCn chan string - onPeerUpdateDoneCnExist bool + onPeerUpdateDoneCn chan string + onPeerUpdateGoroutineCount uint32 // notifying clients clientChansLock sync.RWMutex @@ -366,15 +367,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } } - if ms.onPeerUpdateDoneCnExist { + if atomic.LoadUint32(&ms.onPeerUpdateGoroutineCount) > 0 { ms.onPeerUpdateDoneCn <- peerName } } else if isLeader { go func(peerName string) { raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) raftServerPingTicker := time.NewTicker(5 * time.Minute) + atomic.AddUint32(&ms.onPeerUpdateGoroutineCount, 1) defer func() { - ms.onPeerUpdateDoneCnExist = false + atomic.AddUint32(&ms.onPeerUpdateGoroutineCount, -1) }() for { select { @@ -415,6 +417,5 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF } }(peerName) glog.V(0).Infof("wait %v for raft server %s activity, otherwise delete", RaftServerRemovalTime, peerName) - ms.onPeerUpdateDoneCnExist = true } }