Browse Source

filer could detect master nodes healthy status and choose a working one when encountering SPOF

pull/312/head
霍晓栋 9 years ago
parent
commit
1ef81ac518
  1. 78
      weed/server/filer_server.go
  2. 12
      weed/storage/store.go

78
weed/server/filer_server.go

@ -1,8 +1,11 @@
package weed_server package weed_server
import ( import (
"math/rand"
"net/http" "net/http"
"strconv" "strconv"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/cassandra_store" "github.com/chrislusf/seaweedfs/weed/filer/cassandra_store"
@ -11,17 +14,21 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer/redis_store" "github.com/chrislusf/seaweedfs/weed/filer/redis_store"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
) )
type FilerServer struct { type FilerServer struct {
port string port string
master string master string
mnLock sync.RWMutex
collection string collection string
defaultReplication string defaultReplication string
redirectOnRead bool redirectOnRead bool
disableDirListing bool disableDirListing bool
secret security.Secret secret security.Secret
filer filer.Filer filer filer.Filer
masterNodes *storage.MasterNodes
} }
func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string, func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string,
@ -59,9 +66,80 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
r.HandleFunc("/", fs.filerHandler) r.HandleFunc("/", fs.filerHandler)
go func() {
connected := true
fs.masterNodes = storage.NewMasterNodes(fs.master)
glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode())
//force initialize with all available master nodes
fs.masterNodes.FindMaster()
for {
glog.V(4).Infof("Filer server sending to master %s", fs.getMasterNode())
master, err := fs.detectHealthyMaster(fs.getMasterNode())
if err == nil {
if !connected {
connected = true
if fs.getMasterNode() != master {
fs.setMasterNode(master)
}
glog.V(0).Infoln("Filer Server Connected with master at", master)
}
} else {
glog.V(1).Infof("Filer Server Failed to talk with master %s: %v", fs.getMasterNode(), err)
if connected {
connected = false
}
}
if connected {
time.Sleep(time.Duration(float32(10*1e3)*(1+rand.Float32())) * time.Millisecond)
} else {
time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond)
}
}
}()
return fs, nil return fs, nil
} }
func (fs *FilerServer) jwt(fileId string) security.EncodedJwt { func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
return security.GenJwt(fs.secret, fileId) return security.GenJwt(fs.secret, fileId)
} }
func (fs *FilerServer) getMasterNode() string {
fs.mnLock.RLock()
defer fs.mnLock.RUnlock()
return fs.master
}
func (fs *FilerServer) setMasterNode(masterNode string) {
fs.mnLock.Lock()
defer fs.mnLock.Unlock()
fs.master = masterNode
}
func (fs *FilerServer) detectHealthyMaster(masterNode string) (master string, e error) {
statUrl := "http://" + masterNode + "/stats"
glog.V(4).Infof("Connecting to %s ...", statUrl)
_, e = util.Get(statUrl)
if e != nil {
fs.masterNodes.Reset()
for i := 0; i <= 3; i++ {
master, e = fs.masterNodes.FindMaster()
if e != nil {
continue
} else {
statUrl := "http://" + master + "/stats"
glog.V(4).Infof("Connecting to %s ...", statUrl)
_, e = util.Get(statUrl)
if e == nil {
break
}
}
}
} else {
master = masterNode
}
return
}

12
weed/storage/store.go

@ -32,14 +32,14 @@ func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1} mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1}
return return
} }
func (mn *MasterNodes) reset() {
func (mn *MasterNodes) Reset() {
glog.V(4).Infof("Resetting master nodes: %v", mn) glog.V(4).Infof("Resetting master nodes: %v", mn)
if len(mn.nodes) > 1 && mn.lastNode >= 0 { if len(mn.nodes) > 1 && mn.lastNode >= 0 {
glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes) glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes)
mn.lastNode = -mn.lastNode - 1 mn.lastNode = -mn.lastNode - 1
} }
} }
func (mn *MasterNodes) findMaster() (string, error) {
func (mn *MasterNodes) FindMaster() (string, error) {
if len(mn.nodes) == 0 { if len(mn.nodes) == 0 {
return "", errors.New("No master node found!") return "", errors.New("No master node found!")
} }
@ -210,7 +210,7 @@ func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
s.masterNodes = NewMasterNodes(bootstrapMaster) s.masterNodes = NewMasterNodes(bootstrapMaster)
} }
func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) { func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
masterNode, e = s.masterNodes.findMaster()
masterNode, e = s.masterNodes.FindMaster()
if e != nil { if e != nil {
return return
} }
@ -270,17 +270,17 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
jsonBlob, err := util.PostBytes(joinUrl, data) jsonBlob, err := util.PostBytes(joinUrl, data)
if err != nil { if err != nil {
s.masterNodes.reset()
s.masterNodes.Reset()
return "", "", err return "", "", err
} }
var ret operation.JoinResult var ret operation.JoinResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil { if err := json.Unmarshal(jsonBlob, &ret); err != nil {
glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob)) glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
s.masterNodes.reset()
s.masterNodes.Reset()
return masterNode, "", err return masterNode, "", err
} }
if ret.Error != "" { if ret.Error != "" {
s.masterNodes.reset()
s.masterNodes.Reset()
return masterNode, "", errors.New(ret.Error) return masterNode, "", errors.New(ret.Error)
} }
s.volumeSizeLimit = ret.VolumeSizeLimit s.volumeSizeLimit = ret.VolumeSizeLimit

Loading…
Cancel
Save