diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index fe0926e85..01419a791 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -3,6 +3,7 @@ package topology
 import (
 	"fmt"
 	"strconv"
+	"sync"
 
 	"github.com/chrislusf/seaweedfs/go/glog"
 	"github.com/chrislusf/seaweedfs/go/storage"
@@ -10,6 +11,7 @@ import (
 
 type DataNode struct {
 	NodeImpl
+	sync.RWMutex
 	volumes   map[storage.VolumeId]storage.VolumeInfo
 	Ip        string
 	Port      int
@@ -28,10 +30,14 @@ func NewDataNode(id string) *DataNode {
 }
 
 func (dn *DataNode) String() string {
+	dn.RLock()
+	defer dn.RUnlock()
 	return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
 }
 
 func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
+	dn.Lock()
+	defer dn.Unlock()
 	if _, ok := dn.volumes[v.Id]; !ok {
 		dn.volumes[v.Id] = v
 		dn.UpAdjustVolumeCountDelta(1)
@@ -49,6 +55,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVo
 	for _, v := range actualVolumes {
 		actualVolumeMap[v.Id] = v
 	}
+	dn.RLock()
 	for vid, v := range dn.volumes {
 		if _, ok := actualVolumeMap[vid]; !ok {
 			glog.V(0).Infoln("Deleting volume id:", vid)
@@ -58,12 +65,22 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVo
 			dn.UpAdjustActiveVolumeCountDelta(-1)
 		}
 	} //TODO: adjust max volume id, if need to reclaim volume ids
+	dn.RUnlock()
 	for _, v := range actualVolumes {
 		dn.AddOrUpdateVolume(v)
 	}
 	return
 }
 
+func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
+	dn.RLock()
+	for _, v := range dn.volumes {
+		ret = append(ret, v)
+	}
+	dn.RUnlock()
+	return ret
+}
+
 func (dn *DataNode) GetDataCenter() *DataCenter {
 	return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
 }
diff --git a/go/topology/node.go b/go/topology/node.go
index f48f18236..356c0abeb 100644
--- a/go/topology/node.go
+++ b/go/topology/node.go
@@ -231,7 +231,7 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
 				n.GetTopology().chanDeadDataNodes <- dn
 			}
 		}
-		for _, v := range dn.volumes {
+		for _, v := range dn.GetVolumes() {
 			if uint64(v.Size) >= volumeSizeLimit {
 				//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
 				n.GetTopology().chanFullVolumes <- v
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 3879fbaba..8a3cc5a89 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -54,7 +54,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
 	return true
 }
 func (t *Topology) UnRegisterDataNode(dn *DataNode) {
-	for _, v := range dn.volumes {
+	for _, v := range dn.GetVolumes() {
 		glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
 		vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
 		vl.SetVolumeUnavailable(dn, v.Id)
@@ -65,7 +65,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
 	dn.Parent().UnlinkChildNode(dn.Id())
 }
 func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
-	for _, v := range dn.volumes {
+	for _, v := range dn.GetVolumes() {
 		vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
 		if vl.isWritable(&v) {
 			vl.SetVolumeAvailable(dn, v.Id)
diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go
index 6a1423ca8..dff11aaad 100644
--- a/go/topology/topology_map.go
+++ b/go/topology/topology_map.go
@@ -39,7 +39,7 @@ func (t *Topology) ToVolumeMap() interface{} {
 			for _, d := range rack.Children() {
 				dn := d.(*DataNode)
 				var volumes []interface{}
-				for _, v := range dn.volumes {
+				for _, v := range dn.GetVolumes() {
 					volumes = append(volumes, v)
 				}
 				dataNodes[d.Id()] = volumes
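
The core idea of this patch is that external callers stop ranging over `dn.volumes` directly and instead iterate over the copy returned by the new `GetVolumes()` accessor, so map reads never race with `AddOrUpdateVolume` writes. Below is a minimal, self-contained sketch of that copy-under-read-lock pattern; the trimmed-down `DataNode` and `VolumeInfo` types are simplified stand-ins for illustration, not the real SeaweedFS definitions.

```go
package main

import (
	"fmt"
	"sync"
)

// VolumeInfo is a simplified stand-in for storage.VolumeInfo.
type VolumeInfo struct {
	Id   int
	Size uint64
}

// DataNode mirrors the locking pattern in the patch: the volumes
// map is only touched while holding the embedded RWMutex.
type DataNode struct {
	sync.RWMutex
	volumes map[int]VolumeInfo
}

// AddOrUpdateVolume mutates the map under the write lock.
func (dn *DataNode) AddOrUpdateVolume(v VolumeInfo) {
	dn.Lock()
	defer dn.Unlock()
	if dn.volumes == nil {
		dn.volumes = make(map[int]VolumeInfo)
	}
	dn.volumes[v.Id] = v
}

// GetVolumes returns a snapshot copy taken under the read lock, so
// callers can range over the result without holding the lock and
// without racing concurrent writers.
func (dn *DataNode) GetVolumes() (ret []VolumeInfo) {
	dn.RLock()
	defer dn.RUnlock()
	for _, v := range dn.volumes {
		ret = append(ret, v)
	}
	return ret
}

func main() {
	dn := &DataNode{}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			dn.AddOrUpdateVolume(VolumeInfo{Id: i, Size: uint64(i) * 1024})
		}(i)
	}
	wg.Wait()
	for _, v := range dn.GetVolumes() {
		fmt.Println("volume", v.Id, "size", v.Size)
	}
}
```

Returning a snapshot trades a small allocation per call for a much smaller critical section: callers such as `UnRegisterDataNode`, which do channel sends and layout updates inside the loop, no longer hold the node's lock while doing that work.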