Browse Source

Add retry logic to wait for other peers during cluster bootstrapping.

pull/2/head
Chris Lu 11 years ago
parent
commit
008aee0dc1
  1. 13
      go/topology/topology.go
  2. 4
      go/weed/weed_server/master_server.go
  3. 24
      go/weed/weed_server/raft_server.go
  4. 12
      go/weed/weed_server/raft_server_handlers.go

13
go/topology/topology.go

@ -52,21 +52,26 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
} }
func (t *Topology) IsLeader() bool { func (t *Topology) IsLeader() bool {
return t.RaftServer == nil || t.Leader() == t.RaftServer.Name()
if leader, e := t.Leader(); e == nil {
return leader == t.RaftServer.Name()
}
return false
} }
func (t *Topology) Leader() string {
func (t *Topology) Leader() (string, error) {
l := "" l := ""
if t.RaftServer != nil { if t.RaftServer != nil {
l = t.RaftServer.Leader() l = t.RaftServer.Leader()
} else {
return "", errors.New("Raft Server not ready yet!")
} }
if l == "" { if l == "" {
// We are a single node cluster, we are the leader // We are a single node cluster, we are the leader
return t.RaftServer.Name()
return t.RaftServer.Name(), errors.New("Raft Server not initialized!")
} }
return l
return l, nil
} }
func (t *Topology) loadConfiguration(configurationFile string) error { func (t *Topology) loadConfiguration(configurationFile string) error {

4
go/weed/weed_server/master_server.go

@ -77,13 +77,17 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
ms.Topo.RaftServer = raftServer.raftServer ms.Topo.RaftServer = raftServer.raftServer
ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
if ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
}
}) })
if ms.Topo.IsLeader() { if ms.Topo.IsLeader() {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
} else { } else {
if ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.") glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
} }
}
} }
func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {

24
go/weed/weed_server/raft_server.go

@ -10,6 +10,7 @@ import (
"github.com/goraft/raft" "github.com/goraft/raft"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"io/ioutil" "io/ioutil"
"math/rand"
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
@ -59,14 +60,29 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
// Join to leader if specified. // Join to leader if specified.
if len(s.peers) > 0 { if len(s.peers) > 0 {
glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ","))
if !s.raftServer.IsLogEmpty() { if !s.raftServer.IsLogEmpty() {
glog.V(0).Infoln("Cannot join with an existing log")
glog.V(0).Infoln("Starting cluster with existing logs.")
} else { } else {
if err := s.Join(s.peers); err != nil {
glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ","))
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
firstJoinError := s.Join(s.peers)
if firstJoinError != nil {
glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.")
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
ConnectionString: "http://" + s.httpAddr,
})
if err != nil {
glog.V(0).Infoln(err)
return nil return nil
} }
}
var err error
for err != nil {
glog.V(0).Infoln("waiting for peers on", strings.Join(s.peers, ","), "...")
time.Sleep(time.Duration(1000+rand.Intn(2000)) * time.Millisecond)
err = s.Join(s.peers)
}
glog.V(0).Infoln("Joined cluster") glog.V(0).Infoln("Joined cluster")
} }

12
go/weed/weed_server/raft_server_handlers.go

@ -40,10 +40,10 @@ func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter
} }
func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request) { func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request) {
if s.topo.Leader() != "" {
if leader, e := s.topo.Leader(); e == nil {
//http.StatusMovedPermanently does not cause http POST following redirection //http.StatusMovedPermanently does not cause http POST following redirection
glog.V(0).Infoln("Redirecting to", http.StatusMovedPermanently, "http://"+s.topo.Leader()+req.URL.Path)
http.Redirect(w, req, "http://"+s.topo.Leader()+req.URL.Path, http.StatusMovedPermanently)
glog.V(0).Infoln("Redirecting to", http.StatusMovedPermanently, "http://"+leader+req.URL.Path)
http.Redirect(w, req, "http://"+leader+req.URL.Path, http.StatusMovedPermanently)
} else { } else {
glog.V(0).Infoln("Error: Leader Unknown") glog.V(0).Infoln("Error: Leader Unknown")
http.Error(w, "Leader unknown", http.StatusInternalServerError) http.Error(w, "Leader unknown", http.StatusInternalServerError)
@ -53,7 +53,11 @@ func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request)
func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) { func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{}) m := make(map[string]interface{})
m["IsLeader"] = s.topo.IsLeader() m["IsLeader"] = s.topo.IsLeader()
m["Leader"] = s.topo.Leader()
if leader, e := s.topo.Leader(); e == nil {
m["Leader"] = leader
} else {
m["Leader"] = ""
}
m["Peers"] = s.Peers() m["Peers"] = s.Peers()
writeJsonQuiet(w, r, m) writeJsonQuiet(w, r, m)
} }
Loading…
Cancel
Save