diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index ee7eaf886..767db3aea 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -1,8 +1,11 @@ package weed_server import ( + "math/rand" "net/http" "strconv" + "sync" + "time" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer/cassandra_store" @@ -11,17 +14,21 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer/redis_store" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" ) type FilerServer struct { port string master string + mnLock sync.RWMutex collection string defaultReplication string redirectOnRead bool disableDirListing bool secret security.Secret filer filer.Filer + masterNodes *storage.MasterNodes } func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string, @@ -59,9 +66,80 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st r.HandleFunc("/", fs.filerHandler) + go func() { + connected := true + + fs.masterNodes = storage.NewMasterNodes(fs.master) + glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode()) + + //force initialize with all available master nodes + fs.masterNodes.FindMaster() + + for { + glog.V(4).Infof("Filer server sending to master %s", fs.getMasterNode()) + master, err := fs.detectHealthyMaster(fs.getMasterNode()) + if err == nil { + if !connected { + connected = true + if fs.getMasterNode() != master { + fs.setMasterNode(master) + } + glog.V(0).Infoln("Filer Server Connected with master at", master) + } + } else { + glog.V(1).Infof("Filer Server Failed to talk with master %s: %v", fs.getMasterNode(), err) + if connected { + connected = false + } + } + if connected { + time.Sleep(time.Duration(float32(10*1e3)*(1+rand.Float32())) * time.Millisecond) + } else { + time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond) + } + } + }() + return fs, nil } func (fs *FilerServer) jwt(fileId string) security.EncodedJwt { return security.GenJwt(fs.secret, fileId) } + +func (fs *FilerServer) getMasterNode() string { + fs.mnLock.RLock() + defer fs.mnLock.RUnlock() + return fs.master +} + +func (fs *FilerServer) setMasterNode(masterNode string) { + fs.mnLock.Lock() + defer fs.mnLock.Unlock() + fs.master = masterNode +} + +func (fs *FilerServer) detectHealthyMaster(masterNode string) (master string, e error) { + statUrl := "http://" + masterNode + "/stats" + glog.V(4).Infof("Connecting to %s ...", statUrl) + _, e = util.Get(statUrl) + if e != nil { + fs.masterNodes.Reset() + for i := 0; i <= 3; i++ { + master, e = fs.masterNodes.FindMaster() + if e != nil { + continue + } else { + statUrl := "http://" + master + "/stats" + glog.V(4).Infof("Connecting to %s ...", statUrl) + _, e = util.Get(statUrl) + if e == nil { + break + } + } + } + } else { + master = masterNode + } + return +} diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index ec61b0ab0..7b134581a 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -58,7 +58,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, return } - urlLocation, err := operation.LookupFileId(fs.master, fileId) + urlLocation, err := operation.LookupFileId(fs.getMasterNode(), fileId) if err != nil { glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error()) w.WriteHeader(http.StatusNotFound) diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index b446704f2..2aad20980 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -70,7 +70,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } else if fileId != "" && err == nil { var le error - urlLocation, le = operation.LookupFileId(fs.master, fileId) + urlLocation, le = operation.LookupFileId(fs.getMasterNode(), fileId) if le != nil { glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, le.Error()) w.WriteHeader(http.StatusNotFound) @@ -78,7 +78,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } } } else { - assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl")) + assignResult, ae := operation.Assign(fs.getMasterNode(), 1, replication, collection, query.Get("ttl")) if ae != nil { glog.V(0).Infoln("failing to assign a file id", ae.Error()) writeJsonError(w, r, http.StatusInternalServerError, ae) @@ -132,7 +132,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if ret.Name != "" { path += ret.Name } else { - operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up + operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") writeJsonError(w, r, http.StatusInternalServerError, errors.New("Can not to write to folder "+path+" without a file name")) @@ -143,13 +143,13 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { // also delete the old fid unless PUT operation if r.Method != "PUT" { if oldFid, err := fs.filer.FindFile(path); err == nil { - operation.DeleteFile(fs.master, oldFid, fs.jwt(oldFid)) + operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) } } glog.V(4).Infoln("saving", path, "=>", fileId) if db_err := fs.filer.CreateFile(path, fileId); db_err != nil { - operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up + operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err) return @@ -176,7 +176,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { } else { fid, err = fs.filer.DeleteFile(r.URL.Path) if err == nil && fid != "" { - err = operation.DeleteFile(fs.master, fid, fs.jwt(fid)) + err = operation.DeleteFile(fs.getMasterNode(), fid, fs.jwt(fid)) } } if err == nil { diff --git a/weed/storage/store.go b/weed/storage/store.go index d44d6a863..485ed437f 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -32,14 +32,14 @@ func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) { mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1} return } -func (mn *MasterNodes) reset() { +func (mn *MasterNodes) Reset() { glog.V(4).Infof("Resetting master nodes: %v", mn) if len(mn.nodes) > 1 && mn.lastNode >= 0 { glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes) mn.lastNode = -mn.lastNode - 1 } } -func (mn *MasterNodes) findMaster() (string, error) { +func (mn *MasterNodes) FindMaster() (string, error) { if len(mn.nodes) == 0 { return "", errors.New("No master node found!") } @@ -210,7 +210,7 @@ func (s *Store) SetBootstrapMaster(bootstrapMaster string) { s.masterNodes = NewMasterNodes(bootstrapMaster) } func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) { - masterNode, e = s.masterNodes.findMaster() + masterNode, e = s.masterNodes.FindMaster() if e != nil { return } @@ -270,17 +270,17 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S jsonBlob, err := util.PostBytes(joinUrl, data) if err != nil { - s.masterNodes.reset() + s.masterNodes.Reset() return "", "", err } var ret operation.JoinResult if err := json.Unmarshal(jsonBlob, &ret); err != nil { glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob)) - s.masterNodes.reset() + s.masterNodes.Reset() return masterNode, "", err } if ret.Error != "" { - s.masterNodes.reset() + s.masterNodes.Reset() return masterNode, "", errors.New(ret.Error) } s.volumeSizeLimit = ret.VolumeSizeLimit