From d6fbd741fd0a41132cc53fff2c2ff31d5507c182 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 5 Feb 2014 00:25:23 -0800 Subject: [PATCH] a stable working clustering master node implementation --- go/weed/weed_server/raft_server.go | 50 ++++++++++++++++++--- go/weed/weed_server/raft_server_handlers.go | 12 +++-- 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/go/weed/weed_server/raft_server.go b/go/weed/weed_server/raft_server.go index d00b36588..a44936413 100644 --- a/go/weed/weed_server/raft_server.go +++ b/go/weed/weed_server/raft_server.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/goraft/raft" "github.com/gorilla/mux" + "io/ioutil" "net/http" "net/url" "strings" @@ -86,6 +87,10 @@ func NewRaftServer(r *mux.Router, version string, peers []string, httpAddr strin return s } +func (s *RaftServer) IsLeader() bool { + return s.Leader() == s.raftServer.Name() +} + func (s *RaftServer) Leader() string { l := s.raftServer.Leader() @@ -97,7 +102,7 @@ func (s *RaftServer) Leader() string { return l } -func (s *RaftServer) Members() (members []string) { +func (s *RaftServer) Peers() (members []string) { peers := s.raftServer.Peers() for _, p := range peers { @@ -118,12 +123,13 @@ func (s *RaftServer) Join(peers []string) error { json.NewEncoder(&b).Encode(command) for _, m := range peers { - glog.V(0).Infoln("Attempting to connect to:", m) + target := fmt.Sprintf("http://%s/cluster/join", strings.TrimSpace(m)) + glog.V(0).Infoln("Attempting to connect to:", target) - resp, err := http.Post(fmt.Sprintf("http://%s/cluster/join", strings.TrimSpace(m)), "application/json", &b) - glog.V(0).Infoln("Post returned: ", err) + err := postFollowingOneRedirect(target, "application/json", &b) if err != nil { + glog.V(0).Infoln("Post returned error: ", err) if _, ok := err.(*url.Error); ok { // If we receive a network error try the next member continue @@ -132,9 +138,43 @@ func (s *RaftServer) Join(peers []string) error { return err } - resp.Body.Close() return nil } return errors.New("Could not connect to any cluster peers") } + +// a workaround because http POST following redirection misses request body +func postFollowingOneRedirect(target string, contentType string, b *bytes.Buffer) error { + backupReader := bytes.NewReader(b.Bytes()) + resp, err := http.Post(target, contentType, b) + if err != nil { + return err + } + defer resp.Body.Close() + reply, _ := ioutil.ReadAll(resp.Body) + statusCode := resp.StatusCode + + if statusCode == http.StatusMovedPermanently { + var urlStr string + if urlStr = resp.Header.Get("Location"); urlStr == "" { + return errors.New(fmt.Sprintf("%d response missing Location header", resp.StatusCode)) + } + + glog.V(0).Infoln("Post redirected to ", urlStr) + resp2, err2 := http.Post(urlStr, contentType, backupReader) + if err2 != nil { + return err2 + } + defer resp2.Body.Close() + reply, _ = ioutil.ReadAll(resp2.Body) + statusCode = resp2.StatusCode + } + + glog.V(0).Infoln("Post returned status: ", statusCode, string(reply)) + if statusCode != http.StatusOK { + return errors.New(string(reply)) + } + + return nil +} diff --git a/go/weed/weed_server/raft_server_handlers.go b/go/weed/weed_server/raft_server_handlers.go index 585b6698a..4de52bf0a 100644 --- a/go/weed/weed_server/raft_server_handlers.go +++ b/go/weed/weed_server/raft_server_handlers.go @@ -4,7 +4,9 @@ import ( "code.google.com/p/weed-fs/go/glog" "encoding/json" "github.com/goraft/raft" + "io/ioutil" "net/http" + "strings" ) // Handles incoming RAFT joins. @@ -12,7 +14,9 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { glog.V(0).Infoln("Processing incoming join. Current Leader", s.raftServer.Leader(), "Self", s.raftServer.Name(), "Peers", s.raftServer.Peers()) command := &raft.DefaultJoinCommand{} - if err := json.NewDecoder(req.Body).Decode(&command); err != nil { + commandText, _ := ioutil.ReadAll(req.Body) + glog.V(0).Info("Command:", string(commandText)) + if err := json.NewDecoder(strings.NewReader(string(commandText))).Decode(&command); err != nil { glog.V(0).Infoln("Error decoding json message:", err) http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -37,7 +41,8 @@ func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request) { if s.Leader() != "" { - glog.V(0).Infoln("Redirecting to", "http://"+s.Leader()+req.URL.Path) + //http.StatusMovedPermanently does not cause http POST following redirection + glog.V(0).Infoln("Redirecting to", http.StatusMovedPermanently, "http://"+s.Leader()+req.URL.Path) http.Redirect(w, req, "http://"+s.Leader()+req.URL.Path, http.StatusMovedPermanently) } else { glog.V(0).Infoln("Error: Leader Unknown") @@ -47,7 +52,8 @@ func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request) func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) + m["IsLeader"] = s.IsLeader() m["Leader"] = s.Leader() - m["Members"] = s.Members() + m["Peers"] = s.Peers() writeJsonQuiet(w, r, m) }