|
@ -65,8 +65,8 @@ type MasterServer struct { |
|
|
|
|
|
|
|
|
boundedLeaderChan chan int |
|
|
boundedLeaderChan chan int |
|
|
|
|
|
|
|
|
onPeerUpdatDoneCn chan string |
|
|
|
|
|
onPeerUpdatDoneCnExist bool |
|
|
|
|
|
|
|
|
onPeerUpdateDoneCn chan string |
|
|
|
|
|
onPeerUpdateDoneCnExist bool |
|
|
|
|
|
|
|
|
// notifying clients
|
|
|
// notifying clients
|
|
|
clientChansLock sync.RWMutex |
|
|
clientChansLock sync.RWMutex |
|
@ -118,7 +118,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se |
|
|
Cluster: cluster.NewCluster(), |
|
|
Cluster: cluster.NewCluster(), |
|
|
} |
|
|
} |
|
|
ms.boundedLeaderChan = make(chan int, 16) |
|
|
ms.boundedLeaderChan = make(chan int, 16) |
|
|
ms.onPeerUpdatDoneCn = make(chan string) |
|
|
|
|
|
|
|
|
ms.onPeerUpdateDoneCn = make(chan string) |
|
|
|
|
|
|
|
|
ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate |
|
|
ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate |
|
|
|
|
|
|
|
@ -366,14 +366,15 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF |
|
|
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) |
|
|
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
if ms.onPeerUpdatDoneCnExist { |
|
|
|
|
|
ms.onPeerUpdatDoneCn <- peerName |
|
|
|
|
|
|
|
|
if ms.onPeerUpdateDoneCnExist { |
|
|
|
|
|
ms.onPeerUpdateDoneCn <- peerName |
|
|
} |
|
|
} |
|
|
} else if isLeader { |
|
|
} else if isLeader { |
|
|
go func(peerName string) { |
|
|
go func(peerName string) { |
|
|
|
|
|
raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) |
|
|
for { |
|
|
for { |
|
|
select { |
|
|
select { |
|
|
case <-time.After(RaftServerRemovalTime): |
|
|
|
|
|
|
|
|
case <-raftServerRemovalTimeAfter: |
|
|
err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { |
|
|
err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { |
|
|
_, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ |
|
|
_, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ |
|
|
Id: peerName, |
|
|
Id: peerName, |
|
@ -384,14 +385,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
glog.Warningf("failed to removing old raft server %s: %v", peerName, err) |
|
|
glog.Warningf("failed to removing old raft server %s: %v", peerName, err) |
|
|
} |
|
|
} |
|
|
|
|
|
glog.V(0).Infof("old raft server %s removed", peerName) |
|
|
return |
|
|
return |
|
|
case peerDone := <-ms.onPeerUpdatDoneCn: |
|
|
|
|
|
|
|
|
case peerDone := <-ms.onPeerUpdateDoneCn: |
|
|
if peerName == peerDone { |
|
|
if peerName == peerDone { |
|
|
|
|
|
glog.V(0).Infof("raft server %s remove canceled", peerName) |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
}(peerName) |
|
|
}(peerName) |
|
|
ms.onPeerUpdatDoneCnExist = true |
|
|
|
|
|
|
|
|
ms.onPeerUpdateDoneCnExist = true |
|
|
} |
|
|
} |
|
|
} |
|
|
} |