|
|
@ -11,6 +11,8 @@ import ( |
|
|
|
"net/url" |
|
|
|
"os" |
|
|
|
"path" |
|
|
|
"reflect" |
|
|
|
"sort" |
|
|
|
"strings" |
|
|
|
"time" |
|
|
|
|
|
|
@ -49,8 +51,9 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin |
|
|
|
transporter.Transport.MaxIdleConnsPerHost = 1024 |
|
|
|
glog.V(1).Infof("Starting RaftServer with IP:%v:", httpAddr) |
|
|
|
|
|
|
|
// Clear old cluster configurations if peers are set
|
|
|
|
if len(s.peers) > 0 { |
|
|
|
// Clear old cluster configurations if peers are changed
|
|
|
|
if oldPeers, changed := isPeersChanged(s.dataDir, httpAddr, s.peers); changed { |
|
|
|
glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers) |
|
|
|
os.RemoveAll(path.Join(s.dataDir, "conf")) |
|
|
|
os.RemoveAll(path.Join(s.dataDir, "log")) |
|
|
|
os.RemoveAll(path.Join(s.dataDir, "snapshot")) |
|
|
@ -71,18 +74,23 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin |
|
|
|
|
|
|
|
if len(s.peers) > 0 { |
|
|
|
// Join to leader if specified.
|
|
|
|
glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ",")) |
|
|
|
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) |
|
|
|
firstJoinError := s.Join(s.peers) |
|
|
|
if firstJoinError != nil { |
|
|
|
glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.") |
|
|
|
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{ |
|
|
|
Name: s.raftServer.Name(), |
|
|
|
ConnectionString: "http://" + s.httpAddr, |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
glog.V(0).Infoln(err) |
|
|
|
return nil |
|
|
|
for { |
|
|
|
glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ",")) |
|
|
|
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) |
|
|
|
firstJoinError := s.Join(s.peers) |
|
|
|
if firstJoinError != nil { |
|
|
|
glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.") |
|
|
|
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{ |
|
|
|
Name: s.raftServer.Name(), |
|
|
|
ConnectionString: "http://" + s.httpAddr, |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
glog.V(0).Infoln(err) |
|
|
|
} else { |
|
|
|
break |
|
|
|
} |
|
|
|
} else { |
|
|
|
break |
|
|
|
} |
|
|
|
} |
|
|
|
} else if s.raftServer.IsLogEmpty() { |
|
|
@ -116,6 +124,30 @@ func (s *RaftServer) Peers() (members []string) { |
|
|
|
return |
|
|
|
} |
|
|
|
|
|
|
|
func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, changed bool) { |
|
|
|
confPath := path.Join(dir, "conf") |
|
|
|
// open conf file
|
|
|
|
b, err := ioutil.ReadFile(confPath) |
|
|
|
if err != nil { |
|
|
|
return oldPeers, true |
|
|
|
} |
|
|
|
conf := &raft.Config{} |
|
|
|
if err = json.Unmarshal(b, conf); err != nil { |
|
|
|
return oldPeers, true |
|
|
|
} |
|
|
|
|
|
|
|
for _, p := range conf.Peers { |
|
|
|
oldPeers = append(oldPeers, strings.TrimPrefix(p.ConnectionString, "http://")) |
|
|
|
} |
|
|
|
oldPeers = append(oldPeers, self) |
|
|
|
|
|
|
|
sort.Strings(peers) |
|
|
|
sort.Strings(oldPeers) |
|
|
|
|
|
|
|
return oldPeers, reflect.DeepEqual(peers, oldPeers) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// Join joins an existing cluster.
|
|
|
|
func (s *RaftServer) Join(peers []string) error { |
|
|
|
command := &raft.DefaultJoinCommand{ |
|
|
|