
fix some concurrent bug in topology

Branch: pull/279/head
Author: tnextday, 10 years ago
Commit: 72b497debc
Changed files:

  1. go/storage/volume_info.go (4 lines changed)
  2. go/topology/data_center.go (8 lines changed)
  3. go/topology/data_node.go (61 lines changed)
  4. go/topology/node.go (92 lines changed)
  5. go/topology/rack.go (17 lines changed)
  6. go/topology/topology.go (18 lines changed)
  7. go/topology/topology_event_handling.go (6 lines changed)
  8. go/topology/topology_map.go (2 lines changed)
  9. go/topology/volume_growth.go (2 lines changed)

go/storage/volume_info.go (4 lines changed)

@@ -19,8 +19,8 @@ type VolumeInfo struct {
     ReadOnly bool
 }
-func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err error) {
-    vi = VolumeInfo{
+func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi *VolumeInfo, err error) {
+    vi = &VolumeInfo{
         Id:         VolumeId(*m.Id),
         Size:       *m.Size,
         Collection: *m.Collection,
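
Returning *VolumeInfo here, combined with the pointer-valued volumes map introduced below, means every layer of the topology shares a single record per volume instead of passing copies around. The behavioral difference is easy to see in isolation; a minimal standalone sketch (illustrative only, not code from this commit):

    package main

    import "fmt"

    type info struct{ size uint64 }

    func main() {
        byValue := map[int]info{1: {size: 10}}
        v := byValue[1]
        v.size = 99 // mutates a copy; the map entry still has size 10

        byPtr := map[int]*info{1: {size: 10}}
        byPtr[1].size = 99 // mutates the shared record; every holder sees 99

        fmt.Println(byValue[1].size, byPtr[1].size) // 10 99
    }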

go/topology/data_center.go (8 lines changed)

@@ -14,13 +14,11 @@ func NewDataCenter(id string) *DataCenter {
 }
 func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
-    for _, c := range dc.Children() {
-        rack := c.(*Rack)
-        if string(rack.Id()) == rackName {
+    rack := dc.GetChildren(NodeId(rackName)).(*Rack)
+    if rack != nil {
         return rack
     }
-    }
-    rack := NewRack(rackName)
+    rack = NewRack(rackName)
     dc.LinkChildNode(rack)
     return rack
 }
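
Replacing the linear scan over Children() with a direct GetChildren lookup is simpler and cheaper. One caveat worth knowing: GetChildren returns the Node interface, and in Go a plain type assertion on a nil interface value panics, so a missing rack would panic on the assertion before the nil check ever runs. The comma-ok form sidesteps that; a self-contained sketch (hypothetical types, not this commit's code):

    package main

    import "fmt"

    type Node interface{ Name() string }

    type Rack struct{ name string }

    func (r *Rack) Name() string { return r.name }

    // lookup uses the comma-ok assertion, so a missing key yields nil, not a panic.
    func lookup(children map[string]Node, key string) *Rack {
        if rack, ok := children[key].(*Rack); ok {
            return rack
        }
        return nil
    }

    func main() {
        fmt.Println(lookup(map[string]Node{}, "rack1")) // <nil>
    }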

go/topology/data_node.go (61 lines changed)

@@ -11,7 +11,7 @@ import (
 type DataNode struct {
     NodeImpl
-    volumes map[storage.VolumeId]storage.VolumeInfo
+    volumes map[storage.VolumeId]*storage.VolumeInfo
     Ip        string
     Port      int
     PublicUrl string

@@ -23,7 +23,7 @@ func NewDataNode(id string) *DataNode {
     s := &DataNode{}
     s.id = NodeId(id)
     s.nodeType = "DataNode"
-    s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
+    s.volumes = make(map[storage.VolumeId]*storage.VolumeInfo)
     s.NodeImpl.value = s
     return s
 }

@@ -32,34 +32,69 @@ func (dn *DataNode) String() string {
     return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
 }
-func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
-    if _, ok := dn.volumes[v.Id]; !ok {
-        dn.volumes[v.Id] = v
+func (dn *DataNode) AddOrUpdateVolume(v *storage.VolumeInfo) {
+    if dn.GetVolume(v.Id) == nil {
+        dn.SetVolume(v)
         dn.UpAdjustVolumeCountDelta(1)
         if !v.ReadOnly {
             dn.UpAdjustActiveVolumeCountDelta(1)
         }
         dn.UpAdjustMaxVolumeId(v.Id)
     } else {
-        dn.volumes[v.Id] = v
+        dn.SetVolume(v)
     }
     return
 }
-func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
-    actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
+func (dn *DataNode) SetVolume(v *storage.VolumeInfo) {
+    dn.mutex.Lock()
+    defer dn.mutex.Unlock()
+    dn.volumes[v.Id] = v
+}
+func (dn *DataNode) GetVolume(vid storage.VolumeId) (v *storage.VolumeInfo) {
+    dn.mutex.RLock()
+    defer dn.mutex.RUnlock()
+    return dn.volumes[vid]
+}
+func (dn *DataNode) DeleteVolume(vid storage.VolumeId) {
+    dn.mutex.Lock()
+    defer dn.mutex.Unlock()
+    delete(dn.volumes, vid)
+}
+func (dn *DataNode) Volumes() (list []*storage.VolumeInfo) {
+    dn.mutex.RLock()
+    defer dn.mutex.RUnlock()
+    list = make([]*storage.VolumeInfo, 0, len(dn.volumes))
+    for _, v := range dn.volumes {
+        list = append(list, v)
+    }
+    return list
+}
+func (dn *DataNode) UpdateVolumes(actualVolumes []*storage.VolumeInfo) (deletedVolumes []*storage.VolumeInfo) {
+    actualVolumeMap := make(map[storage.VolumeId]*storage.VolumeInfo)
     for _, v := range actualVolumes {
         actualVolumeMap[v.Id] = v
     }
     for vid, v := range dn.volumes {
         if _, ok := actualVolumeMap[vid]; !ok {
-            glog.V(0).Infoln("Deleting volume id:", vid)
-            delete(dn.volumes, vid)
             deletedVolumes = append(deletedVolumes, v)
+        }
+    }
+    dn.mutex.Lock()
+    for _, v := range deletedVolumes {
+        glog.V(0).Infoln("Deleting volume id:", v.Id)
+        dn.DeleteVolume(v.Id)
         dn.UpAdjustVolumeCountDelta(-1)
         dn.UpAdjustActiveVolumeCountDelta(-1)
     }
-    } //TODO: adjust max volume id, if need to reclaim volume ids
+    dn.mutex.Unlock()
+    //TODO: adjust max volume id, if need to reclaim volume ids
     for _, v := range actualVolumes {
         dn.AddOrUpdateVolume(v)
     }

@@ -67,11 +102,11 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
 }
 func (dn *DataNode) GetDataCenter() *DataCenter {
-    return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
+    return dn.Parent().Parent().GetValue().(*DataCenter)
 }
 func (dn *DataNode) GetRack() *Rack {
-    return dn.Parent().(*NodeImpl).value.(*Rack)
+    return dn.Parent().GetValue().(*Rack)
 }
 func (dn *DataNode) GetTopology() *Topology {
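
The new SetVolume/GetVolume/DeleteVolume/Volumes accessors route every touch of dn.volumes through the embedded NodeImpl mutex. One discipline this imposes: sync.RWMutex is not reentrant, so a helper that takes the lock must never run while the same lock is already held, which is worth double-checking in UpdateVolumes above, where DeleteVolume and the UpAdjust* calls sit between dn.mutex.Lock() and Unlock(). The usual way to keep such helpers composable is to pair a locking public method with an unlocked internal one; a sketch of that convention (an assumed pattern, not this commit's code):

    package main

    import "sync"

    type store struct {
        mu   sync.RWMutex
        data map[int]string
    }

    // Delete is the public entry point; it takes the lock itself.
    func (s *store) Delete(k int) {
        s.mu.Lock()
        defer s.mu.Unlock()
        s.deleteLocked(k)
    }

    // deleteLocked assumes s.mu is already held by the caller.
    func (s *store) deleteLocked(k int) {
        delete(s.data, k)
    }

    // Prune holds the lock once and reuses the helper without re-locking.
    func (s *store) Prune(keys []int) {
        s.mu.Lock()
        defer s.mu.Unlock()
        for _, k := range keys {
            s.deleteLocked(k)
        }
    }

    func main() {
        s := &store{data: map[int]string{1: "a", 2: "b"}}
        s.Prune([]int{1, 2})
    }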

go/topology/node.go (92 lines changed)

@@ -6,6 +6,8 @@ import (
+    "sort"
+    "sync"
     "github.com/chrislusf/seaweedfs/go/glog"
     "github.com/chrislusf/seaweedfs/go/storage"
 )

@@ -35,7 +37,7 @@ type Node interface {
     IsDataNode() bool
     IsRack() bool
     IsDataCenter() bool
-    Children() map[NodeId]Node
+    Children() []Node
     Parent() Node
     GetValue() interface{} //get reference to the topology,dc,rack,datanode

@@ -53,6 +55,7 @@
     //for rack, data center, topology
     nodeType string
     value    interface{}
+    mutex    sync.RWMutex
 }
 type NodePicker interface {

@@ -66,6 +69,8 @@ type PickNodesFn func(nodes []Node, count int) []Node
 // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
 func (n *NodeImpl) PickNodes(numberOfNodes int, filterNodeFn FilterNodeFn, pickFn PickNodesFn) (nodes []Node, err error) {
+    n.mutex.RLock()
+    defer n.mutex.RUnlock()
     candidates := make([]Node, 0, len(n.children))
     var errs []string
     for _, node := range n.children {

@@ -127,6 +132,8 @@ func (n *NodeImpl) IsDataCenter() bool {
     return n.nodeType == "DataCenter"
 }
 func (n *NodeImpl) String() string {
+    n.mutex.RLock()
+    defer n.mutex.RUnlock()
     if n.parent != nil {
         return n.parent.String() + ":" + string(n.id)
     }

@@ -136,21 +143,68 @@ func (n *NodeImpl) Id() NodeId {
     return n.id
 }
 func (n *NodeImpl) FreeSpace() int {
+    n.mutex.RLock()
+    defer n.mutex.RUnlock()
     return n.maxVolumeCount - n.volumeCount - n.plannedVolumeCount
 }
 func (n *NodeImpl) SetParent(node Node) {
+    n.mutex.Lock()
+    defer n.mutex.Unlock()
     n.parent = node
 }
-func (n *NodeImpl) Children() map[NodeId]Node {
-    return n.children
+func (n *NodeImpl) GetChildren(id NodeId) Node {
+    n.mutex.RLock()
+    defer n.mutex.RUnlock()
+    return n.children[id]
+}
+func (n *NodeImpl) SetChildren(c Node) {
+    n.mutex.Lock()
+    defer n.mutex.Unlock()
+    n.children[c.Id()] = c
+}
+func (n *NodeImpl) DeleteChildren(id NodeId) {
+    n.mutex.Lock()
+    defer n.mutex.Unlock()
+    delete(n.children, id)
+}
+func (n *NodeImpl) FindChildren(filter func(Node) bool) Node {
+    n.mutex.RLock()
+    defer n.mutex.RUnlock()
+    for _, c := range n.children {
+        if filter(c) {
+            return c
+        }
+    }
+    return nil
+}
+func (n *NodeImpl) Children() (children []Node) {
+    n.mutex.RLock()
+    defer n.mutex.RUnlock()
+    children = make([]Node, 0, len(n.children))
+    for _, c := range n.children {
+        children = append(children, c)
+    }
+    return
 }
 func (n *NodeImpl) Parent() Node {
+    n.mutex.RLock()
+    defer n.mutex.RUnlock()
     return n.parent
 }
 func (n *NodeImpl) GetValue() interface{} {
+    n.mutex.RLock()
+    defer n.mutex.RUnlock()
     return n.value
 }
 func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
+    n.mutex.RLock()
+    defer n.mutex.RUnlock()
     for _, node := range n.children {
         freeSpace := node.FreeSpace()
         // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)

@@ -174,18 +228,24 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
 }
 func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
+    n.mutex.Lock()
+    defer n.mutex.Unlock()
     n.maxVolumeCount += maxVolumeCountDelta
     if n.parent != nil {
         n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
     }
 }
 func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
+    n.mutex.Lock()
+    defer n.mutex.Unlock()
     n.volumeCount += volumeCountDelta
     if n.parent != nil {
         n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
     }
 }
 func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
+    n.mutex.Lock()
+    defer n.mutex.Unlock()
     n.activeVolumeCount += activeVolumeCountDelta
     if n.parent != nil {
         n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)

@@ -193,6 +253,8 @@ func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) {
     }
 }
 func (n *NodeImpl) UpAdjustPlannedVolumeCountDelta(delta int) { //can be negative
+    n.mutex.Lock()
+    defer n.mutex.Unlock()
     n.plannedVolumeCount += delta
     if n.parent != nil {
         n.parent.UpAdjustPlannedVolumeCountDelta(delta)

@@ -200,6 +262,8 @@ func (n *NodeImpl) UpAdjustPlannedVolumeCountDelta(delta int) { //can be negative
     }
 }
 func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
+    n.mutex.Lock()
+    defer n.mutex.Unlock()
     if n.maxVolumeId < vid {
         n.maxVolumeId = vid
         if n.parent != nil {

@@ -208,25 +272,35 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
         }
     }
 }
 func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
+    n.mutex.RLock()
+    defer n.mutex.RUnlock()
     return n.maxVolumeId
 }
 func (n *NodeImpl) GetVolumeCount() int {
+    n.mutex.RLock()
+    defer n.mutex.RUnlock()
     return n.volumeCount
 }
 func (n *NodeImpl) GetActiveVolumeCount() int {
+    n.mutex.RLock()
+    defer n.mutex.RUnlock()
     return n.activeVolumeCount
 }
 func (n *NodeImpl) GetMaxVolumeCount() int {
+    n.mutex.RLock()
+    defer n.mutex.RUnlock()
     return n.maxVolumeCount
 }
 func (n *NodeImpl) GetPlannedVolumeCount() int {
+    n.mutex.RLock()
+    defer n.mutex.RUnlock()
     return n.plannedVolumeCount
 }
 func (n *NodeImpl) LinkChildNode(node Node) {
-    if n.children[node.Id()] == nil {
-        n.children[node.Id()] = node
+    if n.GetChildren(node.Id()) == nil {
+        n.SetChildren(node)
         n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
         n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
         n.UpAdjustVolumeCountDelta(node.GetVolumeCount())

@@ -237,10 +311,10 @@ func (n *NodeImpl) LinkChildNode(node Node) {
 }
 func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
-    node := n.children[nodeId]
+    node := n.GetChildren(nodeId)
     node.SetParent(nil)
     if node != nil {
-        delete(n.children, node.Id())
+        n.DeleteChildren(node.Id())
         n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
         n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
         n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())

@@ -258,10 +332,10 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
             n.GetTopology().chanDeadDataNodes <- dn
         }
     }
-    for _, v := range dn.volumes {
+    for _, v := range dn.Volumes() {
         if uint64(v.Size) >= volumeSizeLimit {
             //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
-            n.GetTopology().chanFullVolumes <- v
+            n.GetTopology().chanFullVolumes <- *v
         }
     }
 }
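
Two patterns in this file carry most of the fix. Children() now returns a freshly built slice instead of exposing the internal map, so callers iterate a snapshot while the map stays guarded. And each UpAdjust* method locks only its own node before recursing into the parent, which locks itself: fine-grained locking with a fixed child-to-parent lock order. Any path that acquires locks the other way around (ReserveOneVolume holds a parent's RLock while calling into its children) has to be kept in mind to avoid inversion. A reduced sketch of the upward-propagating counter (simplified fields, not the commit's code):

    package main

    import (
        "fmt"
        "sync"
    )

    type counter struct {
        mu     sync.Mutex
        count  int
        parent *counter
    }

    // add locks this level only; the recursive call makes the parent lock
    // itself, so the chain always acquires locks child -> parent.
    func (c *counter) add(delta int) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.count += delta
        if c.parent != nil {
            c.parent.add(delta)
        }
    }

    func main() {
        root := &counter{}
        leaf := &counter{parent: root}
        leaf.add(3)
        fmt.Println(leaf.count, root.count) // 3 3
    }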

go/topology/rack.go (17 lines changed)

@@ -20,18 +20,15 @@ func NewRack(id string) *Rack {
 }
 func (r *Rack) FindDataNode(ip string, port int) *DataNode {
-    for _, c := range r.Children() {
+    n := r.FindChildren(func(c Node) bool {
         dn := c.(*DataNode)
-        if dn.MatchLocation(ip, port) {
-            return dn
-        }
-    }
-    return nil
+        return dn.MatchLocation(ip, port)
+    })
+    return n.(*DataNode)
 }
 func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
-    for _, c := range r.Children() {
-        dn := c.(*DataNode)
-        if dn.MatchLocation(ip, port) {
+    if dn := r.FindDataNode(ip, port); dn != nil {
         dn.LastSeen = time.Now().Unix()
         if dn.Dead {
             dn.Dead = false

@@ -40,7 +37,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
         }
         return dn
     }
-    }
     dn := NewDataNode(net.JoinHostPort(ip, strconv.Itoa(port)))
     dn.Ip = ip
     dn.Port = port
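
GetOrCreateDataNode now delegates to FindDataNode, which is tidier, but find-then-create remains two separate critical sections: two volume servers joining with the same address at the same instant could both miss the lookup and both create a node (the same applies to GetOrCreateRack and GetOrCreateDataCenter). Also note that FindDataNode ends with return n.(*DataNode): when no child matches, FindChildren returns a nil interface and that assertion panics rather than returning nil, so the comma-ok form from the data_center note applies here too. Closing the create race means doing the whole get-or-create inside one lock; a sketch of the shape (hypothetical types):

    package main

    import "sync"

    type node struct{ addr string }

    type rack struct {
        mu    sync.Mutex
        nodes map[string]*node
    }

    // getOrCreate performs lookup and insert in a single critical section,
    // so concurrent callers with the same addr always get the same *node.
    func (r *rack) getOrCreate(addr string) *node {
        r.mu.Lock()
        defer r.mu.Unlock()
        if n, ok := r.nodes[addr]; ok {
            return n
        }
        n := &node{addr: addr}
        r.nodes[addr] = n
        return n
    }

    func main() {
        r := &rack{nodes: map[string]*node{}}
        _ = r.getOrCreate("10.0.0.1:8080")
    }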

go/topology/topology.go (18 lines changed)

@@ -139,12 +139,12 @@ func (t *Topology) DeleteCollection(collectionName string) {
     t.collectionMap.Delete(collectionName)
 }
-func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
-    t.GetVolumeLayout(v.Collection, v.Ttl).RegisterVolume(&v, dn)
+func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
+    t.GetVolumeLayout(v.Collection, v.Ttl).RegisterVolume(v, dn)
 }
-func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
+func (t *Topology) UnRegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
     glog.Infof("removing volume info:%+v", v)
-    t.GetVolumeLayout(v.Collection, v.Ttl).UnRegisterVolume(&v, dn)
+    t.GetVolumeLayout(v.Collection, v.Ttl).UnRegisterVolume(v, dn)
 }

@@ -159,7 +159,7 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
     dn = rack.GetOrCreateDataNode(*joinMessage.Ip,
         int(*joinMessage.Port), *joinMessage.PublicUrl,
         int(*joinMessage.MaxVolumeCount))
-    var volumeInfos []storage.VolumeInfo
+    var volumeInfos []*storage.VolumeInfo
     for _, v := range joinMessage.Volumes {
         if vi, err := storage.NewVolumeInfo(v); err == nil {
             volumeInfos = append(volumeInfos, vi)

@@ -179,13 +179,11 @@ func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
 }
 func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
-    for _, c := range t.Children() {
-        dc := c.(*DataCenter)
-        if string(dc.Id()) == dcName {
+    dc := t.GetChildren(NodeId(dcName)).(*DataCenter)
+    if dc != nil {
         return dc
     }
-    }
-    dc := NewDataCenter(dcName)
+    dc = NewDataCenter(dcName)
     t.LinkChildNode(dc)
     return dc
 }
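
The pointer-typed signatures also retire a subtle issue: the old RegisterVolumeLayout took VolumeInfo by value and passed &v down, the address of the function's own local copy, so each call registered a pointer to a distinct short-lived struct rather than the record everyone else sees. In miniature (illustrative only):

    package main

    import "fmt"

    type volumeInfo struct{ id int }

    var registered []*volumeInfo

    // byValue stores the address of its local copy of v.
    func byValue(v volumeInfo) { registered = append(registered, &v) }

    // byPointer stores the caller's record itself.
    func byPointer(v *volumeInfo) { registered = append(registered, v) }

    func main() {
        vi := &volumeInfo{id: 7}
        byValue(*vi)
        byPointer(vi)
        fmt.Println(registered[0] == vi, registered[1] == vi) // false true
    }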

go/topology/topology_event_handling.go (6 lines changed)

@@ -59,7 +59,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
     return true
 }
 func (t *Topology) UnRegisterDataNode(dn *DataNode) {
-    for _, v := range dn.volumes {
+    for _, v := range dn.Volumes() {
         glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
         vl := t.GetVolumeLayout(v.Collection, v.Ttl)
         vl.SetVolumeUnavailable(dn, v.Id)

@@ -70,9 +70,9 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
     dn.Parent().UnlinkChildNode(dn.Id())
 }
 func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
-    for _, v := range dn.volumes {
+    for _, v := range dn.Volumes() {
         vl := t.GetVolumeLayout(v.Collection, v.Ttl)
-        if vl.IsWritable(&v) {
+        if vl.IsWritable(v) {
             vl.SetVolumeAvailable(dn, v.Id)
         }
     }
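
Switching these loops from dn.volumes to dn.Volumes() buys more than map safety: the DataNode lock is held only inside Volumes(), so the subsequent calls into the volume layouts run with no node lock held, keeping foreign code out of the critical section. The general shape of the pattern (self-contained sketch, simplified types):

    package main

    import (
        "fmt"
        "sync"
    )

    type server struct {
        mu      sync.RWMutex
        volumes map[int]string
    }

    // Volumes returns a snapshot built under the read lock.
    func (s *server) Volumes() []string {
        s.mu.RLock()
        defer s.mu.RUnlock()
        out := make([]string, 0, len(s.volumes))
        for _, v := range s.volumes {
            out = append(out, v)
        }
        return out
    }

    func notifyLayout(v string) { fmt.Println("removing", v) }

    func main() {
        s := &server{volumes: map[int]string{1: "v1", 2: "v2"}}
        // The external call runs lock-free: it cannot deadlock against
        // other methods of s, and the map may keep changing concurrently.
        for _, v := range s.Volumes() {
            notifyLayout(v)
        }
    }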

go/topology/topology_map.go (2 lines changed)

@@ -40,7 +40,7 @@ func (t *Topology) ToVolumeMap() interface{} {
             for _, d := range rack.Children() {
                 dn := d.(*DataNode)
                 var volumes []interface{}
-                for _, v := range dn.volumes {
+                for _, v := range dn.Volumes() {
                     volumes = append(volumes, v)
                 }
                 dataNodes[d.Id()] = volumes

go/topology/volume_growth.go (2 lines changed)

@@ -88,7 +88,7 @@ func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (i
 func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
     for _, server := range servers {
         if err := AllocateVolume(server, vid, option); err == nil {
-            vi := storage.VolumeInfo{
+            vi := &storage.VolumeInfo{
                 Id:         vid,
                 Size:       0,
                 Collection: option.Collection,
