From 03f50180f3f6bfca5ae61fab0fa1c3b1db939e05 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 12 Jun 2018 01:54:09 -0700 Subject: [PATCH] simplifying the leader election by raft fixing https://github.com/chrislusf/seaweedfs/issues/629 --- weed/server/master_server.go | 1 + weed/server/raft_server.go | 107 +++------------------------- weed/server/raft_server_handlers.go | 45 ------------ 3 files changed, 9 insertions(+), 144 deletions(-) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 9fd07072f..efa0ae104 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -88,6 +88,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { ms.Topo.RaftServer = raftServer.raftServer ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { + glog.V(0).Infof("event: %+v", e) if ms.Topo.RaftServer.Leader() != "" { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") } diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 61adcdc59..e2a091e83 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -1,14 +1,8 @@ package weed_server import ( - "bytes" "encoding/json" - "errors" - "fmt" "io/ioutil" - "math/rand" - "net/http" - "net/url" "os" "path" "reflect" @@ -49,7 +43,7 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin var err error transporter := raft.NewHTTPTransporter("/cluster", 0) transporter.Transport.MaxIdleConnsPerHost = 1024 - glog.V(1).Infof("Starting RaftServer with IP:%v:", httpAddr) + glog.V(0).Infof("Starting RaftServer with %v", httpAddr) // Clear old cluster configurations if peers are changed if oldPeers, changed := isPeersChanged(s.dataDir, httpAddr, s.peers); changed { @@ -69,31 +63,13 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond) s.raftServer.Start() - s.router.HandleFunc("/cluster/join", s.joinHandler).Methods("POST") s.router.HandleFunc("/cluster/status", s.statusHandler).Methods("GET") - if len(s.peers) > 0 { - // Join to leader if specified. - for { - glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ",")) - time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) - firstJoinError := s.Join(s.peers) - if firstJoinError != nil { - glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.") - _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ - Name: s.raftServer.Name(), - ConnectionString: "http://" + s.httpAddr, - }) - if err != nil { - glog.V(0).Infoln(err) - } else { - break - } - } else { - break - } - } - } else if s.raftServer.IsLogEmpty() { + for _, peer := range s.peers { + s.raftServer.AddPeer(peer, "http://"+peer) + } + time.Sleep(2 * time.Second) + if s.raftServer.IsLogEmpty() { // Initialize the server by joining itself. glog.V(0).Infoln("Initializing new cluster") @@ -106,11 +82,10 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin glog.V(0).Infoln(err) return nil } - - } else { - glog.V(0).Infoln("Old conf,log,snapshot should have been removed.") } + glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader()) + return s } @@ -151,69 +126,3 @@ func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, return oldPeers, !reflect.DeepEqual(peers, oldPeers) } - -// Join joins an existing cluster. -func (s *RaftServer) Join(peers []string) error { - command := &raft.DefaultJoinCommand{ - Name: s.raftServer.Name(), - ConnectionString: "http://" + s.httpAddr, - } - - var err error - var b bytes.Buffer - json.NewEncoder(&b).Encode(command) - for _, m := range peers { - if m == s.httpAddr { - continue - } - target := fmt.Sprintf("http://%s/cluster/join", strings.TrimSpace(m)) - glog.V(0).Infoln("Attempting to connect to:", target) - - err = postFollowingOneRedirect(target, "application/json", b) - - if err != nil { - glog.V(0).Infoln("Post returned error: ", err.Error()) - if _, ok := err.(*url.Error); ok { - // If we receive a network error try the next member - continue - } - } else { - return nil - } - } - - return errors.New("Could not connect to any cluster peers") -} - -// a workaround because http POST following redirection misses request body -func postFollowingOneRedirect(target string, contentType string, b bytes.Buffer) error { - backupReader := bytes.NewReader(b.Bytes()) - resp, err := http.Post(target, contentType, &b) - if err != nil { - return err - } - defer resp.Body.Close() - statusCode := resp.StatusCode - data, _ := ioutil.ReadAll(resp.Body) - reply := string(data) - - if strings.HasPrefix(reply, "\"http") { - urlStr := reply[1 : len(reply)-1] - - glog.V(0).Infoln("Post redirected to ", urlStr) - resp2, err2 := http.Post(urlStr, contentType, backupReader) - if err2 != nil { - return err2 - } - defer resp2.Body.Close() - data, _ = ioutil.ReadAll(resp2.Body) - statusCode = resp2.StatusCode - } - - glog.V(0).Infoln("Post returned status: ", statusCode, string(data)) - if statusCode != http.StatusOK { - return errors.New(string(data)) - } - - return nil -} diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go index 0a794ce6c..c91ab0407 100644 --- a/weed/server/raft_server_handlers.go +++ b/weed/server/raft_server_handlers.go @@ -1,59 +1,14 @@ package weed_server import ( - "encoding/json" - "io/ioutil" "net/http" - "strings" - - "github.com/chrislusf/raft" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" ) -// Handles incoming RAFT joins. -func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { - glog.V(0).Infoln("Processing incoming join. Current Leader", s.raftServer.Leader(), "Self", s.raftServer.Name(), "Peers", s.raftServer.Peers()) - command := &raft.DefaultJoinCommand{} - - commandText, _ := ioutil.ReadAll(req.Body) - glog.V(0).Info("Command:", string(commandText)) - if err := json.NewDecoder(strings.NewReader(string(commandText))).Decode(&command); err != nil { - glog.V(0).Infoln("Error decoding json message:", err, string(commandText)) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - glog.V(0).Infoln("join command from Name", command.Name, "Connection", command.ConnectionString) - - if _, err := s.raftServer.Do(command); err != nil { - switch err { - case raft.NotLeaderError: - s.redirectToLeader(w, req) - default: - glog.V(0).Infoln("Error processing join:", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - } - } -} - func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) { s.router.HandleFunc(pattern, handler) } -func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request) { - if leader, e := s.topo.Leader(); e == nil { - //http.StatusMovedPermanently does not cause http POST following redirection - learderLocation := "http://" + leader + req.URL.Path - glog.V(0).Infoln("Redirecting to", learderLocation) - writeJsonQuiet(w, req, http.StatusOK, learderLocation) - // http.Redirect(w, req, "http://"+leader+req.URL.Path, http.StatusFound) // not working any more - } else { - glog.V(0).Infoln("Error: Leader Unknown") - http.Error(w, "Leader unknown", http.StatusInternalServerError) - } -} - func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) { ret := operation.ClusterStatusResult{ IsLeader: s.topo.IsLeader(),