diff --git a/go/weed/server.go b/go/weed/server.go index f4752992b..ade040b15 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -41,7 +41,7 @@ var ( serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name") serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") - serverMembers = cmdServer.Flag.String("members", "", "comma separated ip:masterPort list") + serverPeers = cmdServer.Flag.String("peers", "", "comma separated ip:masterPort list") masterPort = cmdServer.Flag.Int("masterPort", 9333, "master server http listen port") masterMetaFolder = cmdServer.Flag.String("mdir", os.TempDir(), "data directory to store meta data") masterVolumeSizeLimitMB = cmdServer.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") @@ -107,11 +107,11 @@ func runServer(cmd *Command, args []string) bool { go func() { time.Sleep(100 * time.Millisecond) - var members []string - if *serverMembers != "" { - members = strings.Split(*serverMembers, ",") + var peers []string + if *serverPeers != "" { + peers = strings.Split(*serverPeers, ",") } - weed_server.NewRaftServer(r, VERSION, members, *serverIp+":"+strconv.Itoa(*masterPort), *masterMetaFolder) + weed_server.NewRaftServer(r, VERSION, peers, *serverIp+":"+strconv.Itoa(*masterPort), *masterMetaFolder) }() e := masterServer.ListenAndServe() diff --git a/go/weed/weed_server/raft_server.go b/go/weed/weed_server/raft_server.go new file mode 100644 index 000000000..daafe32af --- /dev/null +++ b/go/weed/weed_server/raft_server.go @@ -0,0 +1,137 @@ +package weed_server + +import ( + "bytes" + "code.google.com/p/weed-fs/go/glog" + "encoding/json" + "errors" + "fmt" + "github.com/goraft/raft" + "github.com/gorilla/mux" + "net/http" + "net/url" + "strings" + "time" +) + +type RaftServer struct { + peers []string // initial peers to join with + raftServer raft.Server + dataDir string + httpAddr string + version string + router *mux.Router +} + +func NewRaftServer(r *mux.Router, version string, peers []string, httpAddr string, dataDir string) *RaftServer { + s := &RaftServer{ + version: version, + peers: peers, + httpAddr: httpAddr, + dataDir: dataDir, + router: r, + } + + raft.SetLogLevel(2) + + var err error + transporter := raft.NewHTTPTransporter("/raft") + s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, nil, "") + if err != nil { + glog.V(0).Infoln(err) + return nil + } + transporter.Install(s.raftServer, s) + s.raftServer.SetHeartbeatTimeout(1 * time.Second) + s.raftServer.SetElectionTimeout(1500 * time.Millisecond) + s.raftServer.Start() + + s.router.HandleFunc("/raft/join", s.joinHandler).Methods("POST") + + // Join to leader if specified. + if len(s.peers) > 0 { + glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ",")) + + if !s.raftServer.IsLogEmpty() { + glog.V(0).Infoln("Cannot join with an existing log") + } + + if err := s.Join(s.peers); err != nil { + return nil + } + + glog.V(0).Infoln("Joined cluster") + + // Initialize the server by joining itself. + } else if s.raftServer.IsLogEmpty() { + glog.V(0).Infoln("Initializing new cluster") + + _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ + Name: s.raftServer.Name(), + ConnectionString: "http://" + s.httpAddr, + }) + + if err != nil { + glog.V(0).Infoln(err) + return nil + } + + } else { + glog.V(0).Infoln("Recovered from log") + } + + return s +} + +func (s *RaftServer) Leader() string { + l := s.raftServer.Leader() + + if l == "" { + // We are a single node cluster, we are the leader + return s.raftServer.Name() + } + + return l +} + +func (s *RaftServer) Members() (members []string) { + peers := s.raftServer.Peers() + + for _, p := range peers { + members = append(members, strings.TrimPrefix(p.ConnectionString, "http://")) + } + + return +} + +// Join joins an existing cluster. +func (s *RaftServer) Join(peers []string) error { + command := &raft.DefaultJoinCommand{ + Name: s.raftServer.Name(), + ConnectionString: "http://" + s.httpAddr, + } + + var b bytes.Buffer + json.NewEncoder(&b).Encode(command) + + for _, m := range peers { + glog.V(0).Infoln("Attempting to connect to:", m) + + resp, err := http.Post(fmt.Sprintf("http://%s/raft/join", strings.TrimSpace(m)), "application/json", &b) + glog.V(0).Infoln("Post returned: ", err) + + if err != nil { + if _, ok := err.(*url.Error); ok { + // If we receive a network error try the next member + continue + } + + return err + } + + resp.Body.Close() + return nil + } + + return errors.New("Could not connect to any cluster peers") +} diff --git a/go/weed/weed_server/raft_server_handlers.go b/go/weed/weed_server/raft_server_handlers.go new file mode 100644 index 000000000..f1cb1a6c9 --- /dev/null +++ b/go/weed/weed_server/raft_server_handlers.go @@ -0,0 +1,46 @@ +package weed_server + +import ( + "code.google.com/p/weed-fs/go/glog" + "encoding/json" + "github.com/goraft/raft" + "net/http" +) + +// Handles incoming RAFT joins. +func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { + glog.V(0).Infoln("Processing incoming join. Current Leader", s.raftServer.Leader(), "Self", s.raftServer.Name(), "Peers", s.raftServer.Peers()) + command := &raft.DefaultJoinCommand{} + + if err := json.NewDecoder(req.Body).Decode(&command); err != nil { + glog.V(0).Infoln("Error decoding json message:", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + glog.V(0).Infoln("join command from Name", command.Name, "Connection", command.ConnectionString) + + if _, err := s.raftServer.Do(command); err != nil { + switch err { + case raft.NotLeaderError: + s.redirectToLeader(w, req) + default: + glog.V(0).Infoln("Error processing join:", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + } + } +} + +func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) { + s.router.HandleFunc(pattern, handler) +} + +func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request) { + if s.Leader() != "" { + glog.V(0).Infoln("Redirecting to", "http://"+s.Leader()+req.URL.Path) + http.Redirect(w, req, "http://"+s.Leader()+req.URL.Path, http.StatusMovedPermanently) + } else { + glog.V(0).Infoln("Error: Leader Unknown") + http.Error(w, "Leader unknown", http.StatusInternalServerError) + } +}