Browse Source

update store SendHeartbeatToMaster

pull/279/head
tnextday 10 years ago
parent
commit
76655fc1bd
  1. 26
      go/security/guard.go
  2. 35
      go/storage/collection_settings.go
  3. 62
      go/storage/store.go
  4. 2
      go/storage/store_task_replication.go
  5. 4
      go/weed/weed_server/master_server_handlers_admin.go
  6. 40
      go/weed/weed_server/volume_server.go
  7. 2
      go/weed/weed_server/volume_server_handlers_ui.go
  8. 2
      go/weedpb/system_message.proto

26
go/security/guard.go

@ -41,19 +41,35 @@ Referenced:
https://github.com/pkieltyka/jwtauth/blob/master/jwtauth.go
*/
//TODO lock on data access?
type Guard struct {
whiteList []string
SecretKey Secret
secretKey Secret
isActive bool
}
func NewGuard(whiteList []string, secretKey string) *Guard {
g := &Guard{whiteList: whiteList, SecretKey: Secret(secretKey)}
g.isActive = len(g.whiteList) != 0 || len(g.SecretKey) != 0
g := &Guard{whiteList: whiteList, secretKey: Secret(secretKey)}
g.isActive = len(g.whiteList) != 0 || len(g.secretKey) != 0
return g
}
func (g *Guard) SetSecretKey(k string) {
g.secretKey = Secret(k)
g.isActive = len(g.whiteList) != 0 || len(g.secretKey) != 0
}
func (g *Guard) GetSecretKey() Secret {
return g.secretKey
}
func (g *Guard) SetWhiteList(l []string) {
g.whiteList = l
g.isActive = len(g.whiteList) != 0 || len(g.secretKey) != 0
}
func (g *Guard) WhiteList(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
if !g.isActive {
//if no security needed, just skip all checkings
@ -138,7 +154,7 @@ func (g *Guard) checkJwt(w http.ResponseWriter, r *http.Request) error {
return nil
}
if len(g.SecretKey) == 0 {
if len(g.secretKey) == 0 {
return nil
}
@ -149,7 +165,7 @@ func (g *Guard) checkJwt(w http.ResponseWriter, r *http.Request) error {
}
// Verify the token
token, err := DecodeJwt(g.SecretKey, tokenStr)
token, err := DecodeJwt(g.secretKey, tokenStr)
if err != nil {
glog.V(1).Infof("Token verification error from %s: %v", r.RemoteAddr, err)
return ErrUnauthorized

35
go/storage/collection_settings.go

@ -1,5 +1,7 @@
package storage
import "sync"
type SettingKey int
const (
@ -9,6 +11,7 @@ const (
type CollectionSettings struct {
settings map[string]map[SettingKey]interface{}
mutex sync.RWMutex
}
func NewCollectionSettings(defaultReplicatePlacement, defaultGarbageThreshold string) *CollectionSettings {
@ -24,23 +27,27 @@ func NewCollectionSettings(defaultReplicatePlacement, defaultGarbageThreshold st
return c
}
func (c *CollectionSettings) get(collection string, key SettingKey) interface{} {
if m, ok := c.settings[collection]; ok {
func (cs *CollectionSettings) get(collection string, key SettingKey) interface{} {
cs.mutex.RLock()
defer cs.mutex.RUnlock()
if m, ok := cs.settings[collection]; ok {
if v, ok := m[key]; ok {
return v
}
}
if m, ok := c.settings[""]; ok {
if m, ok := cs.settings[""]; ok {
return m[key]
}
return nil
}
func (c *CollectionSettings) set(collection string, key SettingKey, value interface{}) {
m := c.settings[collection]
func (cs *CollectionSettings) set(collection string, key SettingKey, value interface{}) {
cs.mutex.Lock()
defer cs.mutex.Unlock()
m := cs.settings[collection]
if m == nil {
m = make(map[SettingKey]interface{})
c.settings[collection] = m
cs.settings[collection] = m
}
if value == nil {
//mustn't delete default setting
@ -52,22 +59,22 @@ func (c *CollectionSettings) set(collection string, key SettingKey, value interf
}
}
func (c *CollectionSettings) GetGarbageThreshold(collection string) string {
return c.get(collection, keyGarbageThreshold).(string)
func (cs *CollectionSettings) GetGarbageThreshold(collection string) string {
return cs.get(collection, keyGarbageThreshold).(string)
}
func (c *CollectionSettings) SetGarbageThreshold(collection string, gt string) {
c.set(collection, keyGarbageThreshold, gt)
func (cs *CollectionSettings) SetGarbageThreshold(collection string, gt string) {
cs.set(collection, keyGarbageThreshold, gt)
}
func (c *CollectionSettings) GetReplicaPlacement(collection string) *ReplicaPlacement {
return c.get(collection, keyReplicatePlacement).(*ReplicaPlacement)
func (cs *CollectionSettings) GetReplicaPlacement(collection string) *ReplicaPlacement {
return cs.get(collection, keyReplicatePlacement).(*ReplicaPlacement)
}
func (c *CollectionSettings) SetReplicaPlacement(collection, t string) error {
func (cs *CollectionSettings) SetReplicaPlacement(collection, t string) error {
rp, e := NewReplicaPlacementFromString(t)
if e == nil {
c.set(collection, keyReplicatePlacement, rp)
cs.set(collection, keyReplicatePlacement, rp)
}
return e
}

62
go/storage/store.go

@ -10,9 +10,9 @@ import (
"sync"
"encoding/json"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/operation"
"github.com/chrislusf/seaweedfs/go/security"
"github.com/chrislusf/seaweedfs/go/util"
"github.com/chrislusf/seaweedfs/go/weedpb"
)
@ -22,30 +22,30 @@ const (
)
type MasterNodes struct {
nodes []string
lastNode int
nodes []string
master string
}
func (mn *MasterNodes) String() string {
return fmt.Sprintf("nodes:%v, lastNode:%d", mn.nodes, mn.lastNode)
return fmt.Sprintf("nodes:%v, master:%d", mn.nodes, mn.master)
}
func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1}
mn = &MasterNodes{nodes: []string{bootstrapNode}}
return
}
func (mn *MasterNodes) reset() {
glog.V(4).Infof("Resetting master nodes: %v", mn)
if len(mn.nodes) > 1 && mn.lastNode >= 0 {
glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes)
mn.lastNode = -mn.lastNode - 1
if len(mn.nodes) > 1 && mn.master != "" {
glog.V(0).Infof("Reset master %s from: %v", mn.master, mn.nodes)
mn.master = ""
}
}
func (mn *MasterNodes) findMaster() (string, error) {
if len(mn.nodes) == 0 {
return "", errors.New("No master node found!")
}
if mn.lastNode < 0 {
if mn.master == "" {
for _, m := range mn.nodes {
glog.V(4).Infof("Listing masters on %s", m)
if masters, e := operation.ListMasters(m); e == nil {
@ -53,7 +53,7 @@ func (mn *MasterNodes) findMaster() (string, error) {
continue
}
mn.nodes = append(masters, m)
mn.lastNode = rand.Intn(len(mn.nodes))
mn.master = mn.nodes[rand.Intn(len(mn.nodes))]
glog.V(2).Infof("current master nodes is %v", mn)
break
} else {
@ -61,10 +61,14 @@ func (mn *MasterNodes) findMaster() (string, error) {
}
}
}
if mn.lastNode < 0 {
if mn.master == "" {
return "", errors.New("No master node available!")
}
return mn.nodes[mn.lastNode], nil
return mn.master, nil
}
func (mn *MasterNodes) GetMaster() string {
return mn.master
}
/*
@ -212,12 +216,17 @@ func (s *Store) SetRack(rack string) {
}
func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.masterNodes = NewMasterNodes(bootstrapMaster)
}
func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
masterNode, e = s.masterNodes.findMaster()
if e != nil {
return
type SettingChanged func(s *weedpb.JoinResponse)
func (s *Store) SendHeartbeatToMaster(callback SettingChanged) error {
masterNode, err := s.masterNodes.findMaster()
if err != nil {
return err
}
var volumeMessages []*weedpb.VolumeInformationMessage
maxVolumeCount := 0
@ -270,15 +279,15 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
}
ret := &weedpb.JoinResponse{}
joinUrl := util.MkUrl(masterNode, "/dir/join2", nil)
glog.V(4).Infof("Connecting to %s ...", joinUrl)
if err := util.PostPbMsg(joinUrl, joinMsgV2, ret); err != nil {
glog.V(4).Infof("Sending heartbeat to %s ...", joinUrl)
if err = util.PostPbMsg(joinUrl, joinMsgV2, ret); err != nil {
s.masterNodes.reset()
return "", "", err
return err
}
if ret.Error != "" {
s.masterNodes.reset()
return masterNode, "", errors.New(ret.Error)
return errors.New(ret.Error)
}
if ret.JoinKey != s.GetJoinKey() {
if glog.V(4) {
@ -292,10 +301,11 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
if ret.VolumeSizeLimit != 0 {
s.SetVolumeSizeLimit(ret.VolumeSizeLimit)
}
if callback != nil {
callback(ret)
}
}
//todo
secretKey = security.Secret(ret.SecretKey)
return
return nil
}
func (s *Store) Close() {
for _, location := range s.Locations {
@ -315,7 +325,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
}
if s.GetVolumeSizeLimit() < v.ContentSize()+3*uint64(size) {
glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.GetVolumeSizeLimit())
if _, _, e := s.SendHeartbeatToMaster(); e != nil {
if e := s.SendHeartbeatToMaster(nil); e != nil {
glog.V(0).Infoln("error when reporting size:", e)
}
}
@ -390,3 +400,7 @@ func (s *Store) SetJoinKey(k string) {
defer s.mutex.Unlock()
s.joinKey = k
}
func (s *Store) GetMaster() string {
return s.masterNodes.GetMaster()
}

2
go/storage/store_task_replication.go

@ -89,7 +89,7 @@ func (t *ReplicaTask) Commit() error {
volume, e = NewVolume(t.location.Directory, t.Collection, t.VID, t.s.needleMapKind, nil)
if e == nil {
t.location.AddVolume(t.VID, volume)
t.s.SendHeartbeatToMaster()
t.s.SendHeartbeatToMaster(nil)
}
return e
}

4
go/weed/weed_server/master_server_handlers_admin.go

@ -74,7 +74,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
}
writeJsonQuiet(w, r, http.StatusOK, JoinResult{
VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
SecretKey: string(ms.guard.SecretKey),
SecretKey: string(ms.guard.GetSecretKey()),
})
}
@ -106,7 +106,7 @@ func (ms *MasterServer) dirJoin2Handler(w http.ResponseWriter, r *http.Request)
if joinMsgV2.JoinKey != joinResp.JoinKey {
joinResp.JoinIp = joinMsgV2.Ip
joinResp.VolumeSizeLimit = ms.Topo.GetVolumeSizeLimit()
joinResp.SecretKey = string(ms.guard.SecretKey)
joinResp.SecretKey = string(ms.guard.GetSecretKey())
}
writeObjResponse(w, r, http.StatusOK, joinResp)
}

40
go/weed/weed_server/volume_server.go

@ -3,20 +3,16 @@ package weed_server
import (
"math/rand"
"net/http"
"sync"
"time"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/security"
"github.com/chrislusf/seaweedfs/go/storage"
"github.com/chrislusf/seaweedfs/go/weedpb"
)
type VolumeServer struct {
masterNode string
mnLock sync.RWMutex
pulseSeconds int
dataCenter string
rack string
store *storage.Store
guard *security.Guard
@ -36,14 +32,14 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
readRedirect, readRemoteNeedle bool) *VolumeServer {
vs := &VolumeServer{
pulseSeconds: pulseSeconds,
dataCenter: dataCenter,
rack: rack,
FixJpgOrientation: fixJpgOrientation,
ReadRedirect: readRedirect,
ReadRemoteNeedle: readRemoteNeedle,
}
vs.SetMasterNode(masterNode)
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, needleMapKind)
vs.store.SetBootstrapMaster(masterNode)
vs.store.SetDataCenter(dataCenter)
vs.store.SetRack(rack)
vs.guard = security.NewGuard(whiteList, "")
@ -77,23 +73,19 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
go func() {
connected := true
glog.V(0).Infof("Volume server bootstraps with master %s", masterNode)
glog.V(0).Infof("Volume server bootstraps with master %s", vs.GetMasterNode())
vs.store.SetBootstrapMaster(vs.GetMasterNode())
vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack)
for {
glog.V(4).Infof("Volume server sending to master %s", vs.GetMasterNode())
master, secretKey, err := vs.store.SendHeartbeatToMaster()
err := vs.store.SendHeartbeatToMaster(func(s *weedpb.JoinResponse) {
vs.guard.SetSecretKey(s.SecretKey)
})
if err == nil {
if !connected {
connected = true
vs.SetMasterNode(master)
vs.guard.SecretKey = secretKey
glog.V(0).Infoln("Volume Server Connected with master at", master)
glog.V(0).Infoln("Volume Server Connected with master at", vs.GetMasterNode())
}
} else {
glog.V(1).Infof("Volume Server Failed to talk with master %s: %v", vs.masterNode, err)
glog.V(1).Infof("Volume Server Failed to talk with master %s: %v", vs.GetMasterNode(), err)
if connected {
connected = false
}
@ -110,15 +102,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
}
func (vs *VolumeServer) GetMasterNode() string {
vs.mnLock.RLock()
defer vs.mnLock.RUnlock()
return vs.masterNode
}
func (vs *VolumeServer) SetMasterNode(masterNode string) {
vs.mnLock.Lock()
defer vs.mnLock.Unlock()
vs.masterNode = masterNode
return vs.store.GetMaster()
}
func (vs *VolumeServer) Shutdown() {
@ -128,5 +112,5 @@ func (vs *VolumeServer) Shutdown() {
}
func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt {
return security.GenJwt(vs.guard.SecretKey, fileId)
return security.GenJwt(vs.guard.GetSecretKey(), fileId)
}

2
go/weed/weed_server/volume_server_handlers_ui.go

@ -28,7 +28,7 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
Counters *stats.ServerStats
}{
util.VERSION,
vs.masterNode,
vs.GetMasterNode(),
vs.store.Status(),
ds,
infos,

2
go/weedpb/system_message.proto

@ -54,8 +54,6 @@ message JoinResponse {
uint64 volume_size_limit = 4;
repeated CollectionSetting collection_settings = 5;
string secret_key = 6;
// repeated string master_peers = 7;
}
Loading…
Cancel
Save