diff --git a/Makefile b/Makefile index 52c2ef8ca..14c07fa86 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ BINARY = weed -GO_FLAGS = #-v +GO_FLAGS = -race #-v SOURCE_DIR = ./go/weed/ all: build diff --git a/go/topology/data_center.go b/go/topology/data_center.go index 9d992a2cd..a2063f2f4 100644 --- a/go/topology/data_center.go +++ b/go/topology/data_center.go @@ -14,11 +14,11 @@ func NewDataCenter(id string) *DataCenter { } func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { - rack := dc.GetChildren(NodeId(rackName)).(*Rack) - if rack != nil { - return rack + n := dc.GetChildren(NodeId(rackName)) + if n != nil { + return n.(*Rack) } - rack = NewRack(rackName) + rack := NewRack(rackName) dc.LinkChildNode(rack) return rack } diff --git a/go/topology/data_node.go b/go/topology/data_node.go index b6090fb76..f0f261ea1 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -7,16 +7,17 @@ import ( "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/storage" "net" + "time" ) type DataNode struct { NodeImpl volumes map[storage.VolumeId]*storage.VolumeInfo + lastSeen int64 // unix time in seconds + dead bool Ip string Port int PublicUrl string - LastSeen int64 // unix time in seconds - Dead bool } func NewDataNode(id string) *DataNode { @@ -29,7 +30,33 @@ func NewDataNode(id string) *DataNode { } func (dn *DataNode) String() string { - return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead) + dn.mutex.RLock() + defer dn.mutex.RUnlock() + return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.dead) +} + +func (dn *DataNode) LastSeen() int64 { + dn.mutex.RLock() + defer dn.mutex.RUnlock() + return dn.lastSeen +} + +func (dn *DataNode) UpdateLastSeen() { + dn.mutex.Lock() + defer dn.mutex.Unlock() + dn.lastSeen = time.Now().Unix() +} + +func (dn *DataNode) IsDead() bool { + dn.mutex.RLock() + defer dn.mutex.RUnlock() + return dn.dead +} + +func (dn *DataNode) SetDead(b bool) { + dn.mutex.Lock() + defer dn.mutex.Unlock() + dn.dead = b } func (dn *DataNode) AddOrUpdateVolume(v *storage.VolumeInfo) { diff --git a/go/topology/node.go b/go/topology/node.go index d403ea68f..6d018fe39 100644 --- a/go/topology/node.go +++ b/go/topology/node.go @@ -326,9 +326,9 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi if n.IsRack() { for _, c := range n.Children() { dn := c.(*DataNode) //can not cast n to DataNode - if dn.LastSeen < freshThreshHold { - if !dn.Dead { - dn.Dead = true + if dn.LastSeen() < freshThreshHold { + if !dn.IsDead() { + dn.SetDead(true) n.GetTopology().chanDeadDataNodes <- dn } } diff --git a/go/topology/rack.go b/go/topology/rack.go index 4f4b3b573..66725fc2f 100644 --- a/go/topology/rack.go +++ b/go/topology/rack.go @@ -3,7 +3,6 @@ package topology import ( "net" "strconv" - "time" ) type Rack struct { @@ -24,14 +23,17 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode { dn := c.(*DataNode) return dn.MatchLocation(ip, port) }) + if n == nil { + return nil + } return n.(*DataNode) } func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { if dn := r.FindDataNode(ip, port); dn != nil { - dn.LastSeen = time.Now().Unix() - if dn.Dead { - dn.Dead = false + dn.UpdateLastSeen() + if dn.IsDead() { + dn.SetDead(false) r.GetTopology().chanRecoveredDataNodes <- dn dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount) } @@ -48,7 +50,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol } dn.PublicUrl = publicUrl dn.maxVolumeCount = maxVolumeCount - dn.LastSeen = time.Now().Unix() + dn.UpdateLastSeen() r.LinkChildNode(dn) return dn } diff --git a/go/topology/topology.go b/go/topology/topology.go index 04c69de9c..b8a22e94f 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -22,7 +22,7 @@ type Topology struct { Sequence sequence.Sequencer CollectionSettings *storage.CollectionSettings configuration *Configuration - RaftServer raft.Server + raftServer raft.Server chanDeadDataNodes chan *DataNode chanRecoveredDataNodes chan *DataNode @@ -51,24 +51,36 @@ func NewTopology(id string, confFile string, cs *storage.CollectionSettings, seq return t, err } +func (t *Topology) GetRaftServer() raft.Server { + t.mutex.RLock() + defer t.mutex.RUnlock() + return t.raftServer +} + +func (t *Topology) SetRaftServer(s raft.Server) { + t.mutex.Lock() + defer t.mutex.Unlock() + t.raftServer = s +} + func (t *Topology) IsLeader() bool { if leader, e := t.Leader(); e == nil { - return leader == t.RaftServer.Name() + return leader == t.GetRaftServer().Name() } return false } func (t *Topology) Leader() (string, error) { l := "" - if t.RaftServer != nil { - l = t.RaftServer.Leader() + if t.GetRaftServer() != nil { + l = t.GetRaftServer().Leader() } else { return "", errors.New("Raft Server not ready yet!") } if l == "" { // We are a single node cluster, we are the leader - return t.RaftServer.Name(), errors.New("Raft Server not initialized!") + return t.GetRaftServer().Name(), errors.New("Raft Server not initialized!") } return l, nil @@ -106,7 +118,7 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) (vl *VolumeLo func (t *Topology) NextVolumeId() storage.VolumeId { vid := t.GetMaxVolumeId() next := vid.Next() - go t.RaftServer.Do(NewMaxVolumeIdCommand(next)) + go t.GetRaftServer().Do(NewMaxVolumeIdCommand(next)) return next } @@ -179,11 +191,11 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { } func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { - dc := t.GetChildren(NodeId(dcName)).(*DataCenter) - if dc != nil { - return dc + n := t.GetChildren(NodeId(dcName)) + if n != nil { + return n.(*DataCenter) } - dc = NewDataCenter(dcName) + dc := NewDataCenter(dcName) t.LinkChildNode(dc) return dc } diff --git a/go/topology/volume_location_list.go b/go/topology/volume_location_list.go index 901d27ce0..ec71d6115 100644 --- a/go/topology/volume_location_list.go +++ b/go/topology/volume_location_list.go @@ -65,7 +65,7 @@ func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { var changed bool for _, dnl := range dnll.list { - if dnl.LastSeen < freshThreshHold { + if dnl.LastSeen() < freshThreshHold { changed = true break } @@ -73,7 +73,7 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { if changed { var l []*DataNode for _, dnl := range dnll.list { - if dnl.LastSeen >= freshThreshHold { + if dnl.LastSeen() >= freshThreshHold { l = append(l, dnl) } } diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index 62ec5c9aa..07d9a4722 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -86,17 +86,17 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, } func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { - ms.Topo.RaftServer = raftServer.raftServer - ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { - if ms.Topo.RaftServer.Leader() != "" { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") + ms.Topo.SetRaftServer(raftServer.raftServer) + ms.Topo.GetRaftServer().AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { + if ms.Topo.GetRaftServer().Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.GetRaftServer().Name(), "]", ms.Topo.GetRaftServer().Leader(), "becomes leader.") } }) if ms.Topo.IsLeader() { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") + glog.V(0).Infoln("[", ms.Topo.GetRaftServer().Name(), "]", "I am the leader!") } else { - if ms.Topo.RaftServer.Leader() != "" { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.") + if ms.Topo.GetRaftServer().Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.GetRaftServer().Name(), "]", ms.Topo.GetRaftServer().Leader(), "is the leader.") } } } @@ -105,16 +105,16 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ return func(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { f(w, r) - } else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { + } else if ms.Topo.GetRaftServer() != nil && ms.Topo.GetRaftServer().Leader() != "" { ms.bounedLeaderChan <- 1 defer func() { <-ms.bounedLeaderChan }() - targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader()) + targetUrl, err := url.Parse("http://" + ms.Topo.GetRaftServer().Leader()) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, - fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err)) + fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.GetRaftServer().Leader(), err)) return } - glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader()) + glog.V(4).Infoln("proxying to leader", ms.Topo.GetRaftServer().Leader()) proxy := httputil.NewSingleHostReverseProxy(targetUrl) director := proxy.Director proxy.Director = func(req *http.Request) { diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index 748c98fda..b06440d85 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -171,7 +171,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r * if ms.Topo.IsLeader() { deleteForClientHandler(w, r, ms.selfUrl(r)) } else { - deleteForClientHandler(w, r, ms.Topo.RaftServer.Leader()) + deleteForClientHandler(w, r, ms.Topo.GetRaftServer().Leader()) } } diff --git a/go/weed/weed_server/master_server_handlers_ui.go b/go/weed/weed_server/master_server_handlers_ui.go index af7261ab3..8bbc34461 100644 --- a/go/weed/weed_server/master_server_handlers_ui.go +++ b/go/weed/weed_server/master_server_handlers_ui.go @@ -21,8 +21,8 @@ func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) }{ util.VERSION, ms.Topo.ToMap(), - ms.Topo.RaftServer.Leader(), - ms.Topo.RaftServer.Peers(), + ms.Topo.GetRaftServer().Leader(), + ms.Topo.GetRaftServer().Peers(), infos, serverStats, }