diff --git a/go/security/guard.go b/go/security/guard.go index 3c7e3bce3..57b1f22d9 100644 --- a/go/security/guard.go +++ b/go/security/guard.go @@ -41,19 +41,35 @@ Referenced: https://github.com/pkieltyka/jwtauth/blob/master/jwtauth.go */ + +//TODO lock on data access? type Guard struct { whiteList []string - SecretKey Secret + secretKey Secret isActive bool } func NewGuard(whiteList []string, secretKey string) *Guard { - g := &Guard{whiteList: whiteList, SecretKey: Secret(secretKey)} - g.isActive = len(g.whiteList) != 0 || len(g.SecretKey) != 0 + g := &Guard{whiteList: whiteList, secretKey: Secret(secretKey)} + g.isActive = len(g.whiteList) != 0 || len(g.secretKey) != 0 return g } +func (g *Guard) SetSecretKey(k string) { + g.secretKey = Secret(k) + g.isActive = len(g.whiteList) != 0 || len(g.secretKey) != 0 +} + +func (g *Guard) GetSecretKey() Secret { + return g.secretKey +} + +func (g *Guard) SetWhiteList(l []string) { + g.whiteList = l + g.isActive = len(g.whiteList) != 0 || len(g.secretKey) != 0 +} + func (g *Guard) WhiteList(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { if !g.isActive { //if no security needed, just skip all checkings @@ -138,7 +154,7 @@ func (g *Guard) checkJwt(w http.ResponseWriter, r *http.Request) error { return nil } - if len(g.SecretKey) == 0 { + if len(g.secretKey) == 0 { return nil } @@ -149,7 +165,7 @@ func (g *Guard) checkJwt(w http.ResponseWriter, r *http.Request) error { } // Verify the token - token, err := DecodeJwt(g.SecretKey, tokenStr) + token, err := DecodeJwt(g.secretKey, tokenStr) if err != nil { glog.V(1).Infof("Token verification error from %s: %v", r.RemoteAddr, err) return ErrUnauthorized diff --git a/go/storage/collection_settings.go b/go/storage/collection_settings.go index ec98b5d9b..db303e7e5 100644 --- a/go/storage/collection_settings.go +++ b/go/storage/collection_settings.go @@ -1,5 +1,7 @@ package storage +import "sync" + type SettingKey int const ( @@ -9,6 +11,7 @@ const ( type CollectionSettings struct { settings map[string]map[SettingKey]interface{} + mutex sync.RWMutex } func NewCollectionSettings(defaultReplicatePlacement, defaultGarbageThreshold string) *CollectionSettings { @@ -24,23 +27,27 @@ func NewCollectionSettings(defaultReplicatePlacement, defaultGarbageThreshold st return c } -func (c *CollectionSettings) get(collection string, key SettingKey) interface{} { - if m, ok := c.settings[collection]; ok { +func (cs *CollectionSettings) get(collection string, key SettingKey) interface{} { + cs.mutex.RLock() + defer cs.mutex.RUnlock() + if m, ok := cs.settings[collection]; ok { if v, ok := m[key]; ok { return v } } - if m, ok := c.settings[""]; ok { + if m, ok := cs.settings[""]; ok { return m[key] } return nil } -func (c *CollectionSettings) set(collection string, key SettingKey, value interface{}) { - m := c.settings[collection] +func (cs *CollectionSettings) set(collection string, key SettingKey, value interface{}) { + cs.mutex.Lock() + defer cs.mutex.Unlock() + m := cs.settings[collection] if m == nil { m = make(map[SettingKey]interface{}) - c.settings[collection] = m + cs.settings[collection] = m } if value == nil { //mustn't delete default setting @@ -52,22 +59,22 @@ func (c *CollectionSettings) set(collection string, key SettingKey, value interf } } -func (c *CollectionSettings) GetGarbageThreshold(collection string) string { - return c.get(collection, keyGarbageThreshold).(string) +func (cs *CollectionSettings) GetGarbageThreshold(collection string) string { + return cs.get(collection, keyGarbageThreshold).(string) } -func (c *CollectionSettings) SetGarbageThreshold(collection string, gt string) { - c.set(collection, keyGarbageThreshold, gt) +func (cs *CollectionSettings) SetGarbageThreshold(collection string, gt string) { + cs.set(collection, keyGarbageThreshold, gt) } -func (c *CollectionSettings) GetReplicaPlacement(collection string) *ReplicaPlacement { - return c.get(collection, keyReplicatePlacement).(*ReplicaPlacement) +func (cs *CollectionSettings) GetReplicaPlacement(collection string) *ReplicaPlacement { + return cs.get(collection, keyReplicatePlacement).(*ReplicaPlacement) } -func (c *CollectionSettings) SetReplicaPlacement(collection, t string) error { +func (cs *CollectionSettings) SetReplicaPlacement(collection, t string) error { rp, e := NewReplicaPlacementFromString(t) if e == nil { - c.set(collection, keyReplicatePlacement, rp) + cs.set(collection, keyReplicatePlacement, rp) } return e } diff --git a/go/storage/store.go b/go/storage/store.go index 53f5edebf..35a8a0d6f 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -10,9 +10,9 @@ import ( "sync" "encoding/json" + "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/operation" - "github.com/chrislusf/seaweedfs/go/security" "github.com/chrislusf/seaweedfs/go/util" "github.com/chrislusf/seaweedfs/go/weedpb" ) @@ -22,30 +22,30 @@ const ( ) type MasterNodes struct { - nodes []string - lastNode int + nodes []string + master string } func (mn *MasterNodes) String() string { - return fmt.Sprintf("nodes:%v, lastNode:%d", mn.nodes, mn.lastNode) + return fmt.Sprintf("nodes:%v, master:%d", mn.nodes, mn.master) } func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) { - mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1} + mn = &MasterNodes{nodes: []string{bootstrapNode}} return } func (mn *MasterNodes) reset() { glog.V(4).Infof("Resetting master nodes: %v", mn) - if len(mn.nodes) > 1 && mn.lastNode >= 0 { - glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes) - mn.lastNode = -mn.lastNode - 1 + if len(mn.nodes) > 1 && mn.master != "" { + glog.V(0).Infof("Reset master %s from: %v", mn.master, mn.nodes) + mn.master = "" } } func (mn *MasterNodes) findMaster() (string, error) { if len(mn.nodes) == 0 { return "", errors.New("No master node found!") } - if mn.lastNode < 0 { + if mn.master == "" { for _, m := range mn.nodes { glog.V(4).Infof("Listing masters on %s", m) if masters, e := operation.ListMasters(m); e == nil { @@ -53,7 +53,7 @@ func (mn *MasterNodes) findMaster() (string, error) { continue } mn.nodes = append(masters, m) - mn.lastNode = rand.Intn(len(mn.nodes)) + mn.master = mn.nodes[rand.Intn(len(mn.nodes))] glog.V(2).Infof("current master nodes is %v", mn) break } else { @@ -61,10 +61,14 @@ func (mn *MasterNodes) findMaster() (string, error) { } } } - if mn.lastNode < 0 { + if mn.master == "" { return "", errors.New("No master node available!") } - return mn.nodes[mn.lastNode], nil + return mn.master, nil +} + +func (mn *MasterNodes) GetMaster() string { + return mn.master } /* @@ -212,12 +216,17 @@ func (s *Store) SetRack(rack string) { } func (s *Store) SetBootstrapMaster(bootstrapMaster string) { + s.mutex.Lock() + defer s.mutex.Unlock() s.masterNodes = NewMasterNodes(bootstrapMaster) } -func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) { - masterNode, e = s.masterNodes.findMaster() - if e != nil { - return + +type SettingChanged func(s *weedpb.JoinResponse) + +func (s *Store) SendHeartbeatToMaster(callback SettingChanged) error { + masterNode, err := s.masterNodes.findMaster() + if err != nil { + return err } var volumeMessages []*weedpb.VolumeInformationMessage maxVolumeCount := 0 @@ -270,15 +279,15 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S } ret := &weedpb.JoinResponse{} joinUrl := util.MkUrl(masterNode, "/dir/join2", nil) - glog.V(4).Infof("Connecting to %s ...", joinUrl) - if err := util.PostPbMsg(joinUrl, joinMsgV2, ret); err != nil { + glog.V(4).Infof("Sending heartbeat to %s ...", joinUrl) + if err = util.PostPbMsg(joinUrl, joinMsgV2, ret); err != nil { s.masterNodes.reset() - return "", "", err + return err } if ret.Error != "" { s.masterNodes.reset() - return masterNode, "", errors.New(ret.Error) + return errors.New(ret.Error) } if ret.JoinKey != s.GetJoinKey() { if glog.V(4) { @@ -292,10 +301,11 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S if ret.VolumeSizeLimit != 0 { s.SetVolumeSizeLimit(ret.VolumeSizeLimit) } + if callback != nil { + callback(ret) + } } - //todo - secretKey = security.Secret(ret.SecretKey) - return + return nil } func (s *Store) Close() { for _, location := range s.Locations { @@ -315,7 +325,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { } if s.GetVolumeSizeLimit() < v.ContentSize()+3*uint64(size) { glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.GetVolumeSizeLimit()) - if _, _, e := s.SendHeartbeatToMaster(); e != nil { + if e := s.SendHeartbeatToMaster(nil); e != nil { glog.V(0).Infoln("error when reporting size:", e) } } @@ -390,3 +400,7 @@ func (s *Store) SetJoinKey(k string) { defer s.mutex.Unlock() s.joinKey = k } + +func (s *Store) GetMaster() string { + return s.masterNodes.GetMaster() +} diff --git a/go/storage/store_task_replication.go b/go/storage/store_task_replication.go index aa01e79bf..4461b2911 100644 --- a/go/storage/store_task_replication.go +++ b/go/storage/store_task_replication.go @@ -89,7 +89,7 @@ func (t *ReplicaTask) Commit() error { volume, e = NewVolume(t.location.Directory, t.Collection, t.VID, t.s.needleMapKind, nil) if e == nil { t.location.AddVolume(t.VID, volume) - t.s.SendHeartbeatToMaster() + t.s.SendHeartbeatToMaster(nil) } return e } diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index 767ecb85d..058da427c 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -74,7 +74,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { } writeJsonQuiet(w, r, http.StatusOK, JoinResult{ VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, - SecretKey: string(ms.guard.SecretKey), + SecretKey: string(ms.guard.GetSecretKey()), }) } @@ -106,7 +106,7 @@ func (ms *MasterServer) dirJoin2Handler(w http.ResponseWriter, r *http.Request) if joinMsgV2.JoinKey != joinResp.JoinKey { joinResp.JoinIp = joinMsgV2.Ip joinResp.VolumeSizeLimit = ms.Topo.GetVolumeSizeLimit() - joinResp.SecretKey = string(ms.guard.SecretKey) + joinResp.SecretKey = string(ms.guard.GetSecretKey()) } writeObjResponse(w, r, http.StatusOK, joinResp) } diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index fd6832f7b..5d0fa35c8 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -3,20 +3,16 @@ package weed_server import ( "math/rand" "net/http" - "sync" "time" "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/security" "github.com/chrislusf/seaweedfs/go/storage" + "github.com/chrislusf/seaweedfs/go/weedpb" ) type VolumeServer struct { - masterNode string - mnLock sync.RWMutex pulseSeconds int - dataCenter string - rack string store *storage.Store guard *security.Guard @@ -36,14 +32,14 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, readRedirect, readRemoteNeedle bool) *VolumeServer { vs := &VolumeServer{ pulseSeconds: pulseSeconds, - dataCenter: dataCenter, - rack: rack, FixJpgOrientation: fixJpgOrientation, ReadRedirect: readRedirect, ReadRemoteNeedle: readRemoteNeedle, } - vs.SetMasterNode(masterNode) vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, needleMapKind) + vs.store.SetBootstrapMaster(masterNode) + vs.store.SetDataCenter(dataCenter) + vs.store.SetRack(rack) vs.guard = security.NewGuard(whiteList, "") @@ -77,23 +73,19 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, go func() { connected := true + glog.V(0).Infof("Volume server bootstraps with master %s", masterNode) - glog.V(0).Infof("Volume server bootstraps with master %s", vs.GetMasterNode()) - vs.store.SetBootstrapMaster(vs.GetMasterNode()) - vs.store.SetDataCenter(vs.dataCenter) - vs.store.SetRack(vs.rack) for { - glog.V(4).Infof("Volume server sending to master %s", vs.GetMasterNode()) - master, secretKey, err := vs.store.SendHeartbeatToMaster() + err := vs.store.SendHeartbeatToMaster(func(s *weedpb.JoinResponse) { + vs.guard.SetSecretKey(s.SecretKey) + }) if err == nil { if !connected { connected = true - vs.SetMasterNode(master) - vs.guard.SecretKey = secretKey - glog.V(0).Infoln("Volume Server Connected with master at", master) + glog.V(0).Infoln("Volume Server Connected with master at", vs.GetMasterNode()) } } else { - glog.V(1).Infof("Volume Server Failed to talk with master %s: %v", vs.masterNode, err) + glog.V(1).Infof("Volume Server Failed to talk with master %s: %v", vs.GetMasterNode(), err) if connected { connected = false } @@ -110,15 +102,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, } func (vs *VolumeServer) GetMasterNode() string { - vs.mnLock.RLock() - defer vs.mnLock.RUnlock() - return vs.masterNode -} - -func (vs *VolumeServer) SetMasterNode(masterNode string) { - vs.mnLock.Lock() - defer vs.mnLock.Unlock() - vs.masterNode = masterNode + return vs.store.GetMaster() } func (vs *VolumeServer) Shutdown() { @@ -128,5 +112,5 @@ func (vs *VolumeServer) Shutdown() { } func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt { - return security.GenJwt(vs.guard.SecretKey, fileId) + return security.GenJwt(vs.guard.GetSecretKey(), fileId) } diff --git a/go/weed/weed_server/volume_server_handlers_ui.go b/go/weed/weed_server/volume_server_handlers_ui.go index 5925b5a88..5e1bd6478 100644 --- a/go/weed/weed_server/volume_server_handlers_ui.go +++ b/go/weed/weed_server/volume_server_handlers_ui.go @@ -28,7 +28,7 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) Counters *stats.ServerStats }{ util.VERSION, - vs.masterNode, + vs.GetMasterNode(), vs.store.Status(), ds, infos, diff --git a/go/weedpb/system_message.proto b/go/weedpb/system_message.proto index b780bb929..52e0520e7 100644 --- a/go/weedpb/system_message.proto +++ b/go/weedpb/system_message.proto @@ -54,8 +54,6 @@ message JoinResponse { uint64 volume_size_limit = 4; repeated CollectionSetting collection_settings = 5; string secret_key = 6; -// repeated string master_peers = 7; - }