Browse Source

Merge pull request #312 from hxiaodon/master

filer server concern the lead change
pull/313/head
Chris Lu 9 years ago
parent
commit
04380d6a36
  1. 78
      weed/server/filer_server.go
  2. 2
      weed/server/filer_server_handlers_read.go
  3. 12
      weed/server/filer_server_handlers_write.go
  4. 12
      weed/storage/store.go

78
weed/server/filer_server.go

@ -1,8 +1,11 @@
package weed_server package weed_server
import ( import (
"math/rand"
"net/http" "net/http"
"strconv" "strconv"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/cassandra_store" "github.com/chrislusf/seaweedfs/weed/filer/cassandra_store"
@ -11,17 +14,21 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer/redis_store" "github.com/chrislusf/seaweedfs/weed/filer/redis_store"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
) )
type FilerServer struct { type FilerServer struct {
port string port string
master string master string
mnLock sync.RWMutex
collection string collection string
defaultReplication string defaultReplication string
redirectOnRead bool redirectOnRead bool
disableDirListing bool disableDirListing bool
secret security.Secret secret security.Secret
filer filer.Filer filer filer.Filer
masterNodes *storage.MasterNodes
} }
func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string, func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string,
@ -59,9 +66,80 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
r.HandleFunc("/", fs.filerHandler) r.HandleFunc("/", fs.filerHandler)
go func() {
connected := true
fs.masterNodes = storage.NewMasterNodes(fs.master)
glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode())
//force initialize with all available master nodes
fs.masterNodes.FindMaster()
for {
glog.V(4).Infof("Filer server sending to master %s", fs.getMasterNode())
master, err := fs.detectHealthyMaster(fs.getMasterNode())
if err == nil {
if !connected {
connected = true
if fs.getMasterNode() != master {
fs.setMasterNode(master)
}
glog.V(0).Infoln("Filer Server Connected with master at", master)
}
} else {
glog.V(1).Infof("Filer Server Failed to talk with master %s: %v", fs.getMasterNode(), err)
if connected {
connected = false
}
}
if connected {
time.Sleep(time.Duration(float32(10*1e3)*(1+rand.Float32())) * time.Millisecond)
} else {
time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond)
}
}
}()
return fs, nil return fs, nil
} }
func (fs *FilerServer) jwt(fileId string) security.EncodedJwt { func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
return security.GenJwt(fs.secret, fileId) return security.GenJwt(fs.secret, fileId)
} }
func (fs *FilerServer) getMasterNode() string {
fs.mnLock.RLock()
defer fs.mnLock.RUnlock()
return fs.master
}
func (fs *FilerServer) setMasterNode(masterNode string) {
fs.mnLock.Lock()
defer fs.mnLock.Unlock()
fs.master = masterNode
}
func (fs *FilerServer) detectHealthyMaster(masterNode string) (master string, e error) {
statUrl := "http://" + masterNode + "/stats"
glog.V(4).Infof("Connecting to %s ...", statUrl)
_, e = util.Get(statUrl)
if e != nil {
fs.masterNodes.Reset()
for i := 0; i <= 3; i++ {
master, e = fs.masterNodes.FindMaster()
if e != nil {
continue
} else {
statUrl := "http://" + master + "/stats"
glog.V(4).Infof("Connecting to %s ...", statUrl)
_, e = util.Get(statUrl)
if e == nil {
break
}
}
}
} else {
master = masterNode
}
return
}

2
weed/server/filer_server_handlers_read.go

@ -58,7 +58,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return return
} }
urlLocation, err := operation.LookupFileId(fs.master, fileId)
urlLocation, err := operation.LookupFileId(fs.getMasterNode(), fileId)
if err != nil { if err != nil {
glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error()) glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error())
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)

12
weed/server/filer_server_handlers_write.go

@ -70,7 +70,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return return
} else if fileId != "" && err == nil { } else if fileId != "" && err == nil {
var le error var le error
urlLocation, le = operation.LookupFileId(fs.master, fileId)
urlLocation, le = operation.LookupFileId(fs.getMasterNode(), fileId)
if le != nil { if le != nil {
glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, le.Error()) glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, le.Error())
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
@ -78,7 +78,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} }
} }
} else { } else {
assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl"))
assignResult, ae := operation.Assign(fs.getMasterNode(), 1, replication, collection, query.Get("ttl"))
if ae != nil { if ae != nil {
glog.V(0).Infoln("failing to assign a file id", ae.Error()) glog.V(0).Infoln("failing to assign a file id", ae.Error())
writeJsonError(w, r, http.StatusInternalServerError, ae) writeJsonError(w, r, http.StatusInternalServerError, ae)
@ -132,7 +132,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if ret.Name != "" { if ret.Name != "" {
path += ret.Name path += ret.Name
} else { } else {
operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up
operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up
glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
writeJsonError(w, r, http.StatusInternalServerError, writeJsonError(w, r, http.StatusInternalServerError,
errors.New("Can not to write to folder "+path+" without a file name")) errors.New("Can not to write to folder "+path+" without a file name"))
@ -143,13 +143,13 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
// also delete the old fid unless PUT operation // also delete the old fid unless PUT operation
if r.Method != "PUT" { if r.Method != "PUT" {
if oldFid, err := fs.filer.FindFile(path); err == nil { if oldFid, err := fs.filer.FindFile(path); err == nil {
operation.DeleteFile(fs.master, oldFid, fs.jwt(oldFid))
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
} }
} }
glog.V(4).Infoln("saving", path, "=>", fileId) glog.V(4).Infoln("saving", path, "=>", fileId)
if db_err := fs.filer.CreateFile(path, fileId); db_err != nil { if db_err := fs.filer.CreateFile(path, fileId); db_err != nil {
operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up
operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
writeJsonError(w, r, http.StatusInternalServerError, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err)
return return
@ -176,7 +176,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
} else { } else {
fid, err = fs.filer.DeleteFile(r.URL.Path) fid, err = fs.filer.DeleteFile(r.URL.Path)
if err == nil && fid != "" { if err == nil && fid != "" {
err = operation.DeleteFile(fs.master, fid, fs.jwt(fid))
err = operation.DeleteFile(fs.getMasterNode(), fid, fs.jwt(fid))
} }
} }
if err == nil { if err == nil {

12
weed/storage/store.go

@ -32,14 +32,14 @@ func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1} mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1}
return return
} }
func (mn *MasterNodes) reset() {
func (mn *MasterNodes) Reset() {
glog.V(4).Infof("Resetting master nodes: %v", mn) glog.V(4).Infof("Resetting master nodes: %v", mn)
if len(mn.nodes) > 1 && mn.lastNode >= 0 { if len(mn.nodes) > 1 && mn.lastNode >= 0 {
glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes) glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes)
mn.lastNode = -mn.lastNode - 1 mn.lastNode = -mn.lastNode - 1
} }
} }
func (mn *MasterNodes) findMaster() (string, error) {
func (mn *MasterNodes) FindMaster() (string, error) {
if len(mn.nodes) == 0 { if len(mn.nodes) == 0 {
return "", errors.New("No master node found!") return "", errors.New("No master node found!")
} }
@ -210,7 +210,7 @@ func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
s.masterNodes = NewMasterNodes(bootstrapMaster) s.masterNodes = NewMasterNodes(bootstrapMaster)
} }
func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) { func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
masterNode, e = s.masterNodes.findMaster()
masterNode, e = s.masterNodes.FindMaster()
if e != nil { if e != nil {
return return
} }
@ -270,17 +270,17 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
jsonBlob, err := util.PostBytes(joinUrl, data) jsonBlob, err := util.PostBytes(joinUrl, data)
if err != nil { if err != nil {
s.masterNodes.reset()
s.masterNodes.Reset()
return "", "", err return "", "", err
} }
var ret operation.JoinResult var ret operation.JoinResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil { if err := json.Unmarshal(jsonBlob, &ret); err != nil {
glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob)) glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
s.masterNodes.reset()
s.masterNodes.Reset()
return masterNode, "", err return masterNode, "", err
} }
if ret.Error != "" { if ret.Error != "" {
s.masterNodes.reset()
s.masterNodes.Reset()
return masterNode, "", errors.New(ret.Error) return masterNode, "", errors.New(ret.Error)
} }
s.volumeSizeLimit = ret.VolumeSizeLimit s.volumeSizeLimit = ret.VolumeSizeLimit

Loading…
Cancel
Save