From 72b497debc5eb5de2b185f997a10db9af522e7a3 Mon Sep 17 00:00:00 2001
From: tnextday
Date: Sat, 12 Mar 2016 17:39:56 +0800
Subject: [PATCH] fix some concurrency bugs in topology

---
 go/storage/volume_info.go              |  4 +-
 go/topology/data_center.go             | 10 ++-
 go/topology/data_node.go               | 65 +++++++++++++-----
 go/topology/node.go                    | 92 +++++++++++++++++++++++---
 go/topology/rack.go                    | 29 ++++----
 go/topology/topology.go                | 20 +++---
 go/topology/topology_event_handling.go |  6 +-
 go/topology/topology_map.go            |  2 +-
 go/topology/volume_growth.go           |  2 +-
 9 files changed, 166 insertions(+), 64 deletions(-)

diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go
index 659faf213..1dab44cb2 100644
--- a/go/storage/volume_info.go
+++ b/go/storage/volume_info.go
@@ -19,8 +19,8 @@ type VolumeInfo struct {
 	ReadOnly bool
 }
 
-func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err error) {
-	vi = VolumeInfo{
+func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi *VolumeInfo, err error) {
+	vi = &VolumeInfo{
 		Id:         VolumeId(*m.Id),
 		Size:       *m.Size,
 		Collection: *m.Collection,
diff --git a/go/topology/data_center.go b/go/topology/data_center.go
index bcf2dfd31..9d992a2cd 100644
--- a/go/topology/data_center.go
+++ b/go/topology/data_center.go
@@ -14,13 +14,11 @@ func NewDataCenter(id string) *DataCenter {
 }
 
 func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
-	for _, c := range dc.Children() {
-		rack := c.(*Rack)
-		if string(rack.Id()) == rackName {
-			return rack
-		}
-	}
-	rack := NewRack(rackName)
+	// comma-ok form: a one-valued assertion panics when the child is absent (nil Node)
+	if rack, ok := dc.GetChildren(NodeId(rackName)).(*Rack); ok {
+		return rack
+	}
+	rack := NewRack(rackName)
 	dc.LinkChildNode(rack)
 	return rack
 }
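Review note, not part of the patch: GetOrCreateRack looks up a child that may be absent, and a one-valued type assertion on a nil Node interface panics at runtime, so the comma-ok form above is load-bearing. A minimal standalone sketch of that behavior; the Node and Rack types here are illustrative stand-ins, not the topology types:

package main

import "fmt"

type Node interface{ Id() string }

type Rack struct{ id string }

func (r *Rack) Id() string { return r.id }

func main() {
	var child Node // nil interface: the child was not found in the map
	if rack, ok := child.(*Rack); ok {
		fmt.Println("found rack", rack.Id())
	} else {
		fmt.Println("no such rack, create one") // taken: ok is false, no panic
	}
	// rack := child.(*Rack) would panic here: the interface value is nil
}

The same comma-ok pattern applies to GetOrCreateDataCenter in topology.go further down.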
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index 4fea289f8..b6090fb76 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -11,7 +11,7 @@ import (
 
 type DataNode struct {
 	NodeImpl
-	volumes map[storage.VolumeId]storage.VolumeInfo
+	volumes map[storage.VolumeId]*storage.VolumeInfo
 	Ip string
 	Port int
 	PublicUrl string
@@ -23,7 +23,7 @@ func NewDataNode(id string) *DataNode {
 	s := &DataNode{}
 	s.id = NodeId(id)
 	s.nodeType = "DataNode"
-	s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
+	s.volumes = make(map[storage.VolumeId]*storage.VolumeInfo)
 	s.NodeImpl.value = s
 	return s
 }
@@ -32,34 +32,69 @@ func (dn *DataNode) String() string {
 	return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
 }
 
-func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
-	if _, ok := dn.volumes[v.Id]; !ok {
-		dn.volumes[v.Id] = v
+func (dn *DataNode) AddOrUpdateVolume(v *storage.VolumeInfo) {
+	if dn.GetVolume(v.Id) == nil {
+		dn.SetVolume(v)
 		dn.UpAdjustVolumeCountDelta(1)
 		if !v.ReadOnly {
 			dn.UpAdjustActiveVolumeCountDelta(1)
 		}
 		dn.UpAdjustMaxVolumeId(v.Id)
 	} else {
-		dn.volumes[v.Id] = v
+		dn.SetVolume(v)
 	}
 	return
 }
 
+func (dn *DataNode) SetVolume(v *storage.VolumeInfo) {
+	dn.mutex.Lock()
+	defer dn.mutex.Unlock()
+	dn.volumes[v.Id] = v
+}
+
+func (dn *DataNode) GetVolume(vid storage.VolumeId) (v *storage.VolumeInfo) {
+	dn.mutex.RLock()
+	defer dn.mutex.RUnlock()
+	return dn.volumes[vid]
+}
+
+func (dn *DataNode) DeleteVolume(vid storage.VolumeId) {
+	dn.mutex.Lock()
+	defer dn.mutex.Unlock()
+	delete(dn.volumes, vid)
+}
+
+// Volumes returns a snapshot slice so callers can iterate without the lock.
+func (dn *DataNode) Volumes() (list []*storage.VolumeInfo) {
+	dn.mutex.RLock()
+	defer dn.mutex.RUnlock()
+	list = make([]*storage.VolumeInfo, 0, len(dn.volumes))
+	for _, v := range dn.volumes {
+		list = append(list, v)
+	}
+	return list
+}
+
-func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
-	actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
+func (dn *DataNode) UpdateVolumes(actualVolumes []*storage.VolumeInfo) (deletedVolumes []*storage.VolumeInfo) {
+	actualVolumeMap := make(map[storage.VolumeId]*storage.VolumeInfo)
 	for _, v := range actualVolumes {
 		actualVolumeMap[v.Id] = v
 	}
-	for vid, v := range dn.volumes {
-		if _, ok := actualVolumeMap[vid]; !ok {
-			glog.V(0).Infoln("Deleting volume id:", vid)
-			delete(dn.volumes, vid)
+	// range over a snapshot; reading dn.volumes directly would race with writers
+	for _, v := range dn.Volumes() {
+		if _, ok := actualVolumeMap[v.Id]; !ok {
 			deletedVolumes = append(deletedVolumes, v)
-			dn.UpAdjustVolumeCountDelta(-1)
-			dn.UpAdjustActiveVolumeCountDelta(-1)
 		}
-	} //TODO: adjust max volume id, if need to reclaim volume ids
+	}
+	// DeleteVolume and the UpAdjust* helpers take the mutex themselves;
+	// sync.RWMutex is not reentrant, so it must not be held around these calls
+	for _, v := range deletedVolumes {
+		glog.V(0).Infoln("Deleting volume id:", v.Id)
+		dn.DeleteVolume(v.Id)
+		dn.UpAdjustVolumeCountDelta(-1)
+		dn.UpAdjustActiveVolumeCountDelta(-1)
+	}
+
+	//TODO: adjust max volume id, if need to reclaim volume ids
+
 	for _, v := range actualVolumes {
 		dn.AddOrUpdateVolume(v)
 	}
@@ -67,11 +102,11 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVo
 }
 
 func (dn *DataNode) GetDataCenter() *DataCenter {
-	return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
+	return dn.Parent().Parent().GetValue().(*DataCenter)
 }
 
 func (dn *DataNode) GetRack() *Rack {
-	return dn.Parent().(*NodeImpl).value.(*Rack)
+	return dn.Parent().GetValue().(*Rack)
 }
 
 func (dn *DataNode) GetTopology() *Topology {
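Review note, not part of the patch: the DataNode accessors above (SetVolume, GetVolume, DeleteVolume, Volumes) each take the mutex internally, so callers such as UpdateVolumes must not hold the lock around them; Go's sync.RWMutex is not reentrant. A standalone sketch of the snapshot-then-mutate pattern; the store and Volume names are illustrative, not from the patch:

package main

import (
	"fmt"
	"sync"
)

type Volume struct {
	Id   int
	Size uint64
}

type store struct {
	mu      sync.RWMutex
	volumes map[int]*Volume
}

func (s *store) Set(v *Volume) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.volumes[v.Id] = v
}

func (s *store) Delete(id int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.volumes, id)
}

// Snapshot copies the values out so callers can range without the lock,
// mirroring DataNode.Volumes() in the patch.
func (s *store) Snapshot() []*Volume {
	s.mu.RLock()
	defer s.mu.RUnlock()
	list := make([]*Volume, 0, len(s.volumes))
	for _, v := range s.volumes {
		list = append(list, v)
	}
	return list
}

func main() {
	s := &store{volumes: make(map[int]*Volume)}
	s.Set(&Volume{Id: 1, Size: 8})
	s.Set(&Volume{Id: 2, Size: 8})
	for _, v := range s.Snapshot() {
		// safe: Snapshot's RLock is already released; holding it across
		// this call would self-deadlock, since RWMutex is not reentrant
		s.Delete(v.Id)
	}
	fmt.Println("remaining:", len(s.Snapshot())) // remaining: 0
}

Copying out a slice keeps the critical section short and lets the caller mutate the map mid-iteration without racing.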
diff --git a/go/topology/node.go b/go/topology/node.go
index 655e496b1..d403ea68f 100644
--- a/go/topology/node.go
+++ b/go/topology/node.go
@@ -6,6 +6,8 @@ import (
 
 	"sort"
 
+	"sync"
+
 	"github.com/chrislusf/seaweedfs/go/glog"
 	"github.com/chrislusf/seaweedfs/go/storage"
 )
@@ -35,7 +37,7 @@ type Node interface {
 	IsDataNode() bool
 	IsRack() bool
 	IsDataCenter() bool
-	Children() map[NodeId]Node
+	Children() []Node
 	Parent() Node
 
 	GetValue() interface{} //get reference to the topology,dc,rack,datanode
@@ -53,6 +55,7 @@ type NodeImpl struct {
 	//for rack, data center, topology
 	nodeType string
 	value    interface{}
+	mutex    sync.RWMutex
}
 
 type NodePicker interface {
@@ -66,6 +69,8 @@ type PickNodesFn func(nodes []Node, count int) []Node
 
 // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
 func (n *NodeImpl) PickNodes(numberOfNodes int, filterNodeFn FilterNodeFn, pickFn PickNodesFn) (nodes []Node, err error) {
+	n.mutex.RLock()
+	defer n.mutex.RUnlock()
 	candidates := make([]Node, 0, len(n.children))
 	var errs []string
 	for _, node := range n.children {
@@ -127,6 +132,8 @@ func (n *NodeImpl) IsDataCenter() bool {
 	return n.nodeType == "DataCenter"
 }
 func (n *NodeImpl) String() string {
-	if n.parent != nil {
-		return n.parent.String() + ":" + string(n.id)
+	n.mutex.RLock()
+	parent := n.parent
+	n.mutex.RUnlock()
+	// unlock before calling up so only one node's lock is held at a time
+	if parent != nil {
+		return parent.String() + ":" + string(n.id)
 	}
@@ -136,21 +143,68 @@ func (n *NodeImpl) Id() NodeId {
 	return n.id
 }
 func (n *NodeImpl) FreeSpace() int {
+	n.mutex.RLock()
+	defer n.mutex.RUnlock()
 	return n.maxVolumeCount - n.volumeCount - n.plannedVolumeCount
 }
+
 func (n *NodeImpl) SetParent(node Node) {
+	n.mutex.Lock()
+	defer n.mutex.Unlock()
 	n.parent = node
 }
-func (n *NodeImpl) Children() map[NodeId]Node {
-	return n.children
+
+func (n *NodeImpl) GetChildren(id NodeId) Node {
+	n.mutex.RLock()
+	defer n.mutex.RUnlock()
+	return n.children[id]
+}
+
+func (n *NodeImpl) SetChildren(c Node) {
+	n.mutex.Lock()
+	defer n.mutex.Unlock()
+	n.children[c.Id()] = c
+}
+
+func (n *NodeImpl) DeleteChildren(id NodeId) {
+	n.mutex.Lock()
+	defer n.mutex.Unlock()
+	delete(n.children, id)
+}
+
+func (n *NodeImpl) FindChildren(filter func(Node) bool) Node {
+	n.mutex.RLock()
+	defer n.mutex.RUnlock()
+	for _, c := range n.children {
+		if filter(c) {
+			return c
+		}
+	}
+	return nil
+}
+
+// Children returns a snapshot slice instead of the live map, so callers
+// never range over the map without the lock.
+func (n *NodeImpl) Children() (children []Node) {
+	n.mutex.RLock()
+	defer n.mutex.RUnlock()
+	children = make([]Node, 0, len(n.children))
+	for _, c := range n.children {
+		children = append(children, c)
+	}
+	return
 }
 func (n *NodeImpl) Parent() Node {
+	n.mutex.RLock()
+	defer n.mutex.RUnlock()
 	return n.parent
 }
 func (n *NodeImpl) GetValue() interface{} {
+	n.mutex.RLock()
+	defer n.mutex.RUnlock()
 	return n.value
 }
 func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
+	n.mutex.RLock()
+	defer n.mutex.RUnlock()
 	for _, node := range n.children {
 		freeSpace := node.FreeSpace()
 		// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
@@ -174,18 +228,24 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
 }
 
 func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
-	n.maxVolumeCount += maxVolumeCountDelta
-	if n.parent != nil {
-		n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
+	n.mutex.Lock()
+	n.maxVolumeCount += maxVolumeCountDelta
+	parent := n.parent
+	// unlock before walking up: holding a child lock while taking the parent
+	// lock inverts the parent->child order used by PickNodes/ReserveOneVolume
+	n.mutex.Unlock()
+	if parent != nil {
+		parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
 	}
 }
 func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
-	n.volumeCount += volumeCountDelta
-	if n.parent != nil {
-		n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
+	n.mutex.Lock()
+	n.volumeCount += volumeCountDelta
+	parent := n.parent
+	n.mutex.Unlock()
+	if parent != nil {
+		parent.UpAdjustVolumeCountDelta(volumeCountDelta)
 	}
 }
 func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
-	n.activeVolumeCount += activeVolumeCountDelta
-	if n.parent != nil {
-		n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
+	n.mutex.Lock()
+	n.activeVolumeCount += activeVolumeCountDelta
+	parent := n.parent
+	n.mutex.Unlock()
+	if parent != nil {
+		parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
@@ -193,6 +253,8 @@ func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) {
 	}
 }
 
 func (n *NodeImpl) UpAdjustPlannedVolumeCountDelta(delta int) { //can be negative
-	n.plannedVolumeCount += delta
-	if n.parent != nil {
-		n.parent.UpAdjustPlannedVolumeCountDelta(delta)
+	n.mutex.Lock()
+	n.plannedVolumeCount += delta
+	parent := n.parent
+	n.mutex.Unlock()
+	if parent != nil {
+		parent.UpAdjustPlannedVolumeCountDelta(delta)
@@ -200,6 +262,8 @@ func (n *NodeImpl) UpAdjustPlannedVolumeCountDelta(delta int) { //can be negativ
 	}
 }
 
-func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
-	if n.maxVolumeId < vid {
-		n.maxVolumeId = vid
-		if n.parent != nil {
-			n.parent.UpAdjustMaxVolumeId(vid)
-		}
+func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) {
+	n.mutex.Lock()
+	changed := n.maxVolumeId < vid
+	if changed {
+		n.maxVolumeId = vid
+	}
+	parent := n.parent
+	n.mutex.Unlock()
+	if changed && parent != nil {
+		parent.UpAdjustMaxVolumeId(vid)
 	}
 }
 func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
+	n.mutex.RLock()
+	defer n.mutex.RUnlock()
 	return n.maxVolumeId
 }
 func (n *NodeImpl) GetVolumeCount() int {
+	n.mutex.RLock()
+	defer n.mutex.RUnlock()
 	return n.volumeCount
 }
 func (n *NodeImpl) GetActiveVolumeCount() int {
+	n.mutex.RLock()
+	defer n.mutex.RUnlock()
 	return n.activeVolumeCount
 }
 func (n *NodeImpl) GetMaxVolumeCount() int {
+	n.mutex.RLock()
+	defer n.mutex.RUnlock()
 	return n.maxVolumeCount
 }
 func (n *NodeImpl) GetPlannedVolumeCount() int {
+	n.mutex.RLock()
+	defer n.mutex.RUnlock()
 	return n.plannedVolumeCount
 }
 
 func (n *NodeImpl) LinkChildNode(node Node) {
-	if n.children[node.Id()] == nil {
-		n.children[node.Id()] = node
+	if n.GetChildren(node.Id()) == nil {
+		n.SetChildren(node)
 		n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
 		n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
 		n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
@@ -237,10 +311,10 @@ func (n *NodeImpl) LinkChildNode(node Node) {
 }
 
 func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
-	node := n.children[nodeId]
-	node.SetParent(nil)
+	node := n.GetChildren(nodeId)
 	if node != nil {
-		delete(n.children, node.Id())
+		// detach only after the nil check: calling SetParent on a nil Node panics
+		node.SetParent(nil)
+		n.DeleteChildren(node.Id())
 		n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
 		n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
 		n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
@@ -258,10 +332,10 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
 			n.GetTopology().chanDeadDataNodes <- dn
 		}
 	}
-	for _, v := range dn.volumes {
+	for _, v := range dn.Volumes() {
 		if uint64(v.Size) >= volumeSizeLimit {
 			//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
-			n.GetTopology().chanFullVolumes <- v
+			n.GetTopology().chanFullVolumes <- *v
 		}
 	}
 }
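Review note, not part of the patch: the UpAdjust* helpers climb from child to parent while PickNodes and ReserveOneVolume descend from parent to child; if each level's lock were held across the call to the next level, the two directions could deadlock against each other. A standalone sketch of the copy-then-release shape used above; the node type and names are illustrative:

package main

import (
	"fmt"
	"sync"
)

type node struct {
	mu     sync.RWMutex
	count  int
	parent *node
}

// upAdjust mirrors the shape of the UpAdjust* helpers: mutate and copy the
// parent pointer under the lock, release, then recurse upward unlocked.
func (n *node) upAdjust(delta int) {
	n.mu.Lock()
	n.count += delta
	parent := n.parent
	n.mu.Unlock() // released before the parent's lock is taken
	if parent != nil {
		parent.upAdjust(delta)
	}
}

func main() {
	root := &node{}
	child := &node{parent: root}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			child.upAdjust(1)
		}()
	}
	wg.Wait()
	fmt.Println(root.count, child.count) // 100 100
}

Only one node's lock is ever held at a time, so no lock-ordering cycle can form between a climbing adjuster and a descending reader.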
diff --git a/go/topology/rack.go b/go/topology/rack.go
index 634103f95..4f4b3b573 100644
--- a/go/topology/rack.go
+++ b/go/topology/rack.go
@@ -20,27 +20,24 @@ func NewRack(id string) *Rack {
 }
 
 func (r *Rack) FindDataNode(ip string, port int) *DataNode {
-	for _, c := range r.Children() {
-		dn := c.(*DataNode)
-		if dn.MatchLocation(ip, port) {
-			return dn
-		}
-	}
-	return nil
+	n := r.FindChildren(func(c Node) bool {
+		dn := c.(*DataNode)
+		return dn.MatchLocation(ip, port)
+	})
+	// a miss must return nil rather than panic on the type assertion;
+	// GetOrCreateDataNode relies on the nil to take its create path
+	if n == nil {
+		return nil
+	}
+	return n.(*DataNode)
 }
 
 func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
-	for _, c := range r.Children() {
-		dn := c.(*DataNode)
-		if dn.MatchLocation(ip, port) {
-			dn.LastSeen = time.Now().Unix()
-			if dn.Dead {
-				dn.Dead = false
-				r.GetTopology().chanRecoveredDataNodes <- dn
-				dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
-			}
-			return dn
+	if dn := r.FindDataNode(ip, port); dn != nil {
+		dn.LastSeen = time.Now().Unix()
+		if dn.Dead {
+			dn.Dead = false
+			r.GetTopology().chanRecoveredDataNodes <- dn
+			// use the locked accessor instead of reading dn.maxVolumeCount directly
+			dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.GetMaxVolumeCount())
 		}
+		return dn
 	}
+
 	dn := NewDataNode(net.JoinHostPort(ip, strconv.Itoa(port)))
 	dn.Ip = ip
 	dn.Port = port
diff --git a/go/topology/topology.go b/go/topology/topology.go
index b310c465c..04c69de9c 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -139,12 +139,12 @@ func (t *Topology) DeleteCollection(collectionName string) {
 	t.collectionMap.Delete(collectionName)
 }
 
-func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
-	t.GetVolumeLayout(v.Collection, v.Ttl).RegisterVolume(&v, dn)
+func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
+	t.GetVolumeLayout(v.Collection, v.Ttl).RegisterVolume(v, dn)
 }
-func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
+func (t *Topology) UnRegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
 	glog.Infof("removing volume info:%+v", v)
-	t.GetVolumeLayout(v.Collection, v.Ttl).UnRegisterVolume(&v, dn)
+	t.GetVolumeLayout(v.Collection, v.Ttl).UnRegisterVolume(v, dn)
 }
 
 func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
@@ -159,7 +159,7 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
 	dn = rack.GetOrCreateDataNode(*joinMessage.Ip,
 		int(*joinMessage.Port), *joinMessage.PublicUrl,
 		int(*joinMessage.MaxVolumeCount))
-	var volumeInfos []storage.VolumeInfo
+	var volumeInfos []*storage.VolumeInfo
 	for _, v := range joinMessage.Volumes {
 		if vi, err := storage.NewVolumeInfo(v); err == nil {
 			volumeInfos = append(volumeInfos, vi)
@@ -179,13 +179,11 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
 }
 
 func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
-	for _, c := range t.Children() {
-		dc := c.(*DataCenter)
-		if string(dc.Id()) == dcName {
-			return dc
-		}
-	}
-	dc := NewDataCenter(dcName)
+	// comma-ok form, as in GetOrCreateRack: a plain assertion panics on a nil child
+	if dc, ok := t.GetChildren(NodeId(dcName)).(*DataCenter); ok {
+		return dc
+	}
+	dc := NewDataCenter(dcName)
 	t.LinkChildNode(dc)
 	return dc
 }
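Review note, not part of the patch: GetOrCreateDataNode can only take its create path if FindDataNode returns nil on a miss, which is why the nil check before the n.(*DataNode) assertion above matters. A standalone sketch of that find-or-create contract; the types and the findChildren helper are illustrative stand-ins:

package main

import "fmt"

type Node interface{ Id() string }

type DataNode struct{ id string }

func (d *DataNode) Id() string { return d.id }

// findChildren mirrors NodeImpl.FindChildren: it returns the Node interface's
// zero value (nil) when no child matches the filter.
func findChildren(children []Node, filter func(Node) bool) Node {
	for _, c := range children {
		if filter(c) {
			return c
		}
	}
	return nil
}

func findDataNode(children []Node, id string) *DataNode {
	n := findChildren(children, func(c Node) bool { return c.Id() == id })
	if n == nil {
		return nil // without this check, n.(*DataNode) would panic on a miss
	}
	return n.(*DataNode)
}

func main() {
	children := []Node{&DataNode{id: "10.0.0.1:8080"}}
	fmt.Println(findDataNode(children, "10.0.0.1:8080") != nil) // true: hit
	fmt.Println(findDataNode(children, "10.0.0.2:8080") == nil) // true: miss returns nil
}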
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 8dc808039..6fa62a20b 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -59,7 +59,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
 	return true
 }
 func (t *Topology) UnRegisterDataNode(dn *DataNode) {
-	for _, v := range dn.volumes {
+	for _, v := range dn.Volumes() {
 		glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
 		vl := t.GetVolumeLayout(v.Collection, v.Ttl)
 		vl.SetVolumeUnavailable(dn, v.Id)
@@ -70,9 +70,9 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
 	dn.Parent().UnlinkChildNode(dn.Id())
 }
 func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
-	for _, v := range dn.volumes {
+	for _, v := range dn.Volumes() {
 		vl := t.GetVolumeLayout(v.Collection, v.Ttl)
-		if vl.IsWritable(&v) {
+		if vl.IsWritable(v) {
 			vl.SetVolumeAvailable(dn, v.Id)
 		}
 	}
diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go
index 66ec1593f..c75cb1e41 100644
--- a/go/topology/topology_map.go
+++ b/go/topology/topology_map.go
@@ -40,7 +40,7 @@ func (t *Topology) ToVolumeMap() interface{} {
 			for _, d := range rack.Children() {
 				dn := d.(*DataNode)
 				var volumes []interface{}
-				for _, v := range dn.volumes {
+				for _, v := range dn.Volumes() {
 					volumes = append(volumes, v)
 				}
 				dataNodes[d.Id()] = volumes
diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go
index 00826faee..d475b3aad 100644
--- a/go/topology/volume_growth.go
+++ b/go/topology/volume_growth.go
@@ -88,7 +88,7 @@ func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (i
 func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
 	for _, server := range servers {
 		if err := AllocateVolume(server, vid, option); err == nil {
-			vi := storage.VolumeInfo{
+			vi := &storage.VolumeInfo{
 				Id:         vid,
 				Size:       0,
 				Collection: option.Collection,
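Review note, not part of the patch: with dn.volumes now holding *storage.VolumeInfo, the send in CollectDeadNodeAndFullVolumes dereferences the pointer (chanFullVolumes <- *v), so the receiver gets its own copy that later writers cannot mutate. A standalone sketch of why that dereference matters; the VolumeInfo type here is an illustrative stand-in:

package main

import "fmt"

type VolumeInfo struct {
	Id   int
	Size uint64
}

func main() {
	ch := make(chan VolumeInfo, 1)
	v := &VolumeInfo{Id: 7, Size: 32 << 20}
	ch <- *v   // send a copy of the struct, not the shared pointer
	v.Size = 0 // a later mutation through the shared pointer...
	got := <-ch
	fmt.Println(got.Size) // ...does not affect the copy: 33554432
}

Since the whole point of this patch is concurrency safety, running the package tests with Go's race detector enabled (go test -race) is the natural way to validate these changes.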