Browse Source

fix design

pull/3338/head
Konstantin Lebedev 2 years ago
parent
commit
6c390851e7
  1. 11
      weed/server/master_server.go

11
weed/server/master_server.go

@ -11,6 +11,7 @@ import (
"regexp" "regexp"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/chrislusf/seaweedfs/weed/cluster" "github.com/chrislusf/seaweedfs/weed/cluster"
@ -65,8 +66,8 @@ type MasterServer struct {
boundedLeaderChan chan int boundedLeaderChan chan int
onPeerUpdateDoneCn chan string
onPeerUpdateDoneCnExist bool
onPeerUpdateDoneCn chan string
onPeerUpdateGoroutineCount uint32
// notifying clients // notifying clients
clientChansLock sync.RWMutex clientChansLock sync.RWMutex
@ -366,15 +367,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
} }
} }
if ms.onPeerUpdateDoneCnExist {
if atomic.LoadUint32(&ms.onPeerUpdateGoroutineCount) > 0 {
ms.onPeerUpdateDoneCn <- peerName ms.onPeerUpdateDoneCn <- peerName
} }
} else if isLeader { } else if isLeader {
go func(peerName string) { go func(peerName string) {
raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime)
raftServerPingTicker := time.NewTicker(5 * time.Minute) raftServerPingTicker := time.NewTicker(5 * time.Minute)
atomic.AddUint32(&ms.onPeerUpdateGoroutineCount, 1)
defer func() { defer func() {
ms.onPeerUpdateDoneCnExist = false
atomic.AddUint32(&ms.onPeerUpdateGoroutineCount, -1)
}() }()
for { for {
select { select {
@ -415,6 +417,5 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
} }
}(peerName) }(peerName)
glog.V(0).Infof("wait %v for raft server %s activity, otherwise delete", RaftServerRemovalTime, peerName) glog.V(0).Infof("wait %v for raft server %s activity, otherwise delete", RaftServerRemovalTime, peerName)
ms.onPeerUpdateDoneCnExist = true
} }
} }
Loading…
Cancel
Save