Browse Source

refactor OnPeerUpdate

pull/2868/head
Konstantin Lebedev 3 years ago
parent
commit
7ff248d5cd
  1. 15
      weed/server/master_server.go

15
weed/server/master_server.go

@ -334,14 +334,15 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
} }
func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
glog.V(0).Infof("OnPeerUpdate: %+v", update)
glog.V(2).Infof("OnPeerUpdate: %+v", update)
if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil { if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
return return
} }
peerAddress := pb.ServerAddress(update.Address) peerAddress := pb.ServerAddress(update.Address)
peerName := string(peerAddress) peerName := string(peerAddress)
isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader
if update.IsAdd { if update.IsAdd {
if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
if isLeader {
raftServerFound := false raftServerFound := false
for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
if string(server.ID) == peerName { if string(server.ID) == peerName {
@ -349,16 +350,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
} }
} }
if !raftServerFound { if !raftServerFound {
glog.V(0).Infof("adding new raft server: %s", peerAddress.String())
glog.V(0).Infof("adding new raft server: %s", peerName)
ms.Topo.HashicorpRaft.AddVoter( ms.Topo.HashicorpRaft.AddVoter(
hashicorpRaft.ServerID(peerName), hashicorpRaft.ServerID(peerName),
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
} }
} }
if ms.onPeerUpdatDoneCnExist { if ms.onPeerUpdatDoneCnExist {
ms.onPeerUpdatDoneCn <- string(peerAddress)
ms.onPeerUpdatDoneCn <- peerName
} }
} else if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
} else if isLeader {
go func(peerName string) { go func(peerName string) {
for { for {
select { select {
@ -371,7 +372,7 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
return err return err
}) })
if err != nil { if err != nil {
glog.Warningf("failed removing old raft server: %v", err)
glog.Warningf("failed to removing old raft server %s: %v", peerName, err)
} }
return return
case peerDone := <-ms.onPeerUpdatDoneCn: case peerDone := <-ms.onPeerUpdatDoneCn:
@ -380,7 +381,7 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
} }
} }
} }
}(string(peerAddress))
}(peerName)
ms.onPeerUpdatDoneCnExist = true ms.onPeerUpdatDoneCnExist = true
} }
} }
Loading…
Cancel
Save