|
@ -16,6 +16,54 @@ import ( |
|
|
"time" |
|
|
"time" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) { |
|
|
|
|
|
for _, peer := range s.peers { |
|
|
|
|
|
cfg.Servers = append(cfg.Servers, raft.Server{ |
|
|
|
|
|
Suffrage: raft.Voter, |
|
|
|
|
|
ID: raft.ServerID(peer.String()), |
|
|
|
|
|
Address: raft.ServerAddress(peer.ToGrpcAddress()), |
|
|
|
|
|
}) |
|
|
|
|
|
} |
|
|
|
|
|
return cfg |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (s *RaftServer) UpdatePeers() { |
|
|
|
|
|
for { |
|
|
|
|
|
select { |
|
|
|
|
|
case isLeader := <-s.RaftHashicorp.LeaderCh(): |
|
|
|
|
|
if isLeader { |
|
|
|
|
|
peerLeader := s.serverAddr.String() |
|
|
|
|
|
existsPeerName := make(map[string]bool) |
|
|
|
|
|
for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers { |
|
|
|
|
|
if string(server.ID) == peerLeader { |
|
|
|
|
|
continue |
|
|
|
|
|
} |
|
|
|
|
|
existsPeerName[string(server.ID)] = true |
|
|
|
|
|
} |
|
|
|
|
|
for _, peer := range s.peers { |
|
|
|
|
|
if peer.String() == peerLeader || existsPeerName[peer.String()] { |
|
|
|
|
|
continue |
|
|
|
|
|
} |
|
|
|
|
|
glog.V(0).Infof("adding new peer: %s", peer.String()) |
|
|
|
|
|
s.RaftHashicorp.AddVoter( |
|
|
|
|
|
raft.ServerID(peer.String()), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0) |
|
|
|
|
|
} |
|
|
|
|
|
for peer, _ := range existsPeerName { |
|
|
|
|
|
if _, found := s.peers[peer]; !found { |
|
|
|
|
|
glog.V(0).Infof("removing old peer: %s", peer) |
|
|
|
|
|
s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
if _, found := s.peers[peerLeader]; !found { |
|
|
|
|
|
glog.V(0).Infof("removing old leader peer: %s", peerLeader) |
|
|
|
|
|
s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
break |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { |
|
|
func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { |
|
|
s := &RaftServer{ |
|
|
s := &RaftServer{ |
|
|
peers: option.Peers, |
|
|
peers: option.Peers, |
|
@ -25,7 +73,7 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
c := raft.DefaultConfig() |
|
|
c := raft.DefaultConfig() |
|
|
c.LocalID = raft.ServerID(s.serverAddr) // TODO maybee the IP:port address will change
|
|
|
|
|
|
|
|
|
c.LocalID = raft.ServerID(s.serverAddr.String()) // TODO maybee the IP:port address will change
|
|
|
c.NoSnapshotRestoreOnStart = option.RaftResumeState |
|
|
c.NoSnapshotRestoreOnStart = option.RaftResumeState |
|
|
c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) |
|
|
c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) |
|
|
c.ElectionTimeout = option.ElectionTimeout |
|
|
c.ElectionTimeout = option.ElectionTimeout |
|
@ -66,32 +114,17 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return nil, fmt.Errorf("raft.NewRaft: %v", err) |
|
|
return nil, fmt.Errorf("raft.NewRaft: %v", err) |
|
|
} |
|
|
} |
|
|
if option.RaftBootstrap { |
|
|
|
|
|
cfg := raft.Configuration{ |
|
|
|
|
|
Servers: []raft.Server{ |
|
|
|
|
|
{ |
|
|
|
|
|
Suffrage: raft.Voter, |
|
|
|
|
|
ID: c.LocalID, |
|
|
|
|
|
Address: raft.ServerAddress(s.serverAddr.ToGrpcAddress()), |
|
|
|
|
|
}, |
|
|
|
|
|
}, |
|
|
|
|
|
} |
|
|
|
|
|
// Add known peers to bootstrap
|
|
|
|
|
|
for _, peer := range option.Peers { |
|
|
|
|
|
if peer == option.ServerAddr { |
|
|
|
|
|
continue |
|
|
|
|
|
} |
|
|
|
|
|
cfg.Servers = append(cfg.Servers, raft.Server{ |
|
|
|
|
|
Suffrage: raft.Voter, |
|
|
|
|
|
ID: raft.ServerID(peer), |
|
|
|
|
|
Address: raft.ServerAddress(peer.ToGrpcAddress()), |
|
|
|
|
|
}) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 { |
|
|
|
|
|
cfg := s.AddPeersConfiguration() |
|
|
|
|
|
glog.V(0).Infoln("Bootstrapping new cluster %+v", cfg) |
|
|
f := s.RaftHashicorp.BootstrapCluster(cfg) |
|
|
f := s.RaftHashicorp.BootstrapCluster(cfg) |
|
|
if err := f.Error(); err != nil { |
|
|
if err := f.Error(); err != nil { |
|
|
return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) |
|
|
return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) |
|
|
} |
|
|
} |
|
|
|
|
|
} else { |
|
|
|
|
|
go s.UpdatePeers() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
ticker := time.NewTicker(c.HeartbeatTimeout * 10) |
|
|
ticker := time.NewTicker(c.HeartbeatTimeout * 10) |
|
|
if glog.V(4) { |
|
|
if glog.V(4) { |
|
|
go func() { |
|
|
go func() { |
|
|