From e7585548a4daa2033896f2c4390fd19d0ffce066 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 5 Feb 2014 01:54:52 -0800 Subject: [PATCH] working auto fail-over master node --- go/weed/server.go | 5 ++-- go/weed/weed_server/common.go | 1 - go/weed/weed_server/master_server.go | 45 ++++++++++++++++++++++------ 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/go/weed/server.go b/go/weed/server.go index 4a5c32a7a..b1f5dc049 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -94,7 +94,7 @@ func runServer(cmd *Command, args []string) bool { go func() { r := mux.NewRouter() - weed_server.NewMasterServer(r, VERSION, *masterPort, *masterMetaFolder, + ms := weed_server.NewMasterServer(r, VERSION, *masterPort, *masterMetaFolder, *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultRepType, *garbageThreshold, serverWhiteList, ) @@ -111,7 +111,8 @@ func runServer(cmd *Command, args []string) bool { if *serverPeers != "" { peers = strings.Split(*serverPeers, ",") } - weed_server.NewRaftServer(r, VERSION, peers, *serverIp+":"+strconv.Itoa(*masterPort), *masterMetaFolder) + raftServer := weed_server.NewRaftServer(r, VERSION, peers, *serverIp+":"+strconv.Itoa(*masterPort), *masterMetaFolder) + ms.SetRaftServer(raftServer) }() e := masterServer.ListenAndServe() diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index 677a941e8..d0d52f488 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -14,7 +14,6 @@ import ( "strings" ) - func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) (err error) { w.Header().Set("Content-Type", "application/javascript") var bytes []byte diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index e13358105..e0fa677b9 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -6,6 +6,9 @@ import ( "code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/topology" "github.com/gorilla/mux" + "net/http" + "net/http/httputil" + "net/url" "path" "sync" ) @@ -23,6 +26,8 @@ type MasterServer struct { topo *topology.Topology vg *replication.VolumeGrowth vgLock sync.Mutex + + raftServer *RaftServer } func NewMasterServer(r *mux.Router, version string, port int, metaFolder string, @@ -31,7 +36,8 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string, confFile string, defaultRepType string, garbageThreshold string, - whiteList []string) *MasterServer { + whiteList []string, +) *MasterServer { ms := &MasterServer{ version: version, volumeSizeLimitMB: volumeSizeLimitMB, @@ -49,17 +55,38 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string, ms.vg = replication.NewDefaultVolumeGrowth() glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") - r.HandleFunc("/dir/assign", secure(ms.whiteList, ms.dirAssignHandler)) - r.HandleFunc("/dir/lookup", secure(ms.whiteList, ms.dirLookupHandler)) - r.HandleFunc("/dir/join", secure(ms.whiteList, ms.dirJoinHandler)) - r.HandleFunc("/dir/status", secure(ms.whiteList, ms.dirStatusHandler)) - r.HandleFunc("/vol/grow", secure(ms.whiteList, ms.volumeGrowHandler)) - r.HandleFunc("/vol/status", secure(ms.whiteList, ms.volumeStatusHandler)) - r.HandleFunc("/vol/vacuum", secure(ms.whiteList, ms.volumeVacuumHandler)) - r.HandleFunc("/submit", secure(ms.whiteList, ms.submitFromMasterServerHandler)) + r.HandleFunc("/dir/assign", ms.proxyToLeader(secure(ms.whiteList, ms.dirAssignHandler))) + r.HandleFunc("/dir/lookup", ms.proxyToLeader(secure(ms.whiteList, ms.dirLookupHandler))) + r.HandleFunc("/dir/join", ms.proxyToLeader(secure(ms.whiteList, ms.dirJoinHandler))) + r.HandleFunc("/dir/status", ms.proxyToLeader(secure(ms.whiteList, ms.dirStatusHandler))) + r.HandleFunc("/vol/grow", ms.proxyToLeader(secure(ms.whiteList, ms.volumeGrowHandler))) + r.HandleFunc("/vol/status", ms.proxyToLeader(secure(ms.whiteList, ms.volumeStatusHandler))) + r.HandleFunc("/vol/vacuum", ms.proxyToLeader(secure(ms.whiteList, ms.volumeVacuumHandler))) + r.HandleFunc("/submit", ms.proxyToLeader(secure(ms.whiteList, ms.submitFromMasterServerHandler))) r.HandleFunc("/", ms.redirectHandler) ms.topo.StartRefreshWritableVolumes(garbageThreshold) return ms } + +func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { + ms.raftServer = raftServer +} + +func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + if ms.raftServer == nil || ms.raftServer.IsLeader() { + f(w, r) + } else { + targetUrl, err := url.Parse("http://" + ms.raftServer.Leader()) + if err != nil { + writeJsonQuiet(w, r, map[string]interface{}{"error": "Leader URL Parse Error " + err.Error()}) + return + } + glog.V(4).Infoln("proxying to leader", ms.raftServer.Leader()) + proxy := httputil.NewSingleHostReverseProxy(targetUrl) + proxy.ServeHTTP(w, r) + } + } +}