From f3da0906edf63b9634cf0c2864d88ad4587587ad Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 18 Sep 2012 14:05:12 -0700 Subject: [PATCH] channel based visitor pattern --- weed-fs/src/pkg/directory/volume_mapping.go | 11 +---- weed-fs/src/pkg/topology/data_node.go | 5 ++- weed-fs/src/pkg/topology/node.go | 23 +++++++--- weed-fs/src/pkg/topology/rack.go | 4 ++ weed-fs/src/pkg/topology/topology.go | 50 +++++++++++++++------ 5 files changed, 61 insertions(+), 32 deletions(-) diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index da41b3510..99aa90a5f 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -97,18 +97,9 @@ func (m *Mapper) remove(machine *Machine) { foundIndex = index } } - m.vid2machines[v.Id] = deleteFromSlice(foundIndex,m.vid2machines[v.Id]) + m.vid2machines[v.Id] = append(m.vid2machines[v.Id][:foundIndex], m.vid2machines[v.Id][foundIndex+1:]...) } } -func deleteFromSlice(i int, slice []*Machine) []*Machine{ - switch i { - case -1://do nothing - case 0: slice = slice[1:] - case len(slice)-1: slice = slice[:len(slice)-1] - default: slice = append(slice[:i], slice[i+1:]...) - } - return slice -} func (m *Mapper) StartRefreshWritableVolumes() { go func() { diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go index 04c7cf111..2305dddd2 100644 --- a/weed-fs/src/pkg/topology/data_node.go +++ b/weed-fs/src/pkg/topology/data_node.go @@ -12,6 +12,7 @@ type DataNode struct { Port int PublicUrl string LastSeen int64 // unix time in seconds + Dead bool } func NewDataNode(id string) *DataNode { @@ -30,8 +31,8 @@ func (dn *DataNode) AddOrUpdateVolume(v *storage.VolumeInfo) { dn.volumes[v.Id] = v dn.UpAdjustActiveVolumeCountDelta(1) dn.UpAdjustMaxVolumeId(v.Id) - }else{ - dn.volumes[v.Id] = v + } else { + dn.volumes[v.Id] = v } } func (dn *DataNode) GetTopology() *Topology { diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go index ddaf9f0b2..52315ca2f 100644 --- a/weed-fs/src/pkg/topology/node.go +++ b/weed-fs/src/pkg/topology/node.go @@ -20,7 +20,7 @@ type Node interface { setParent(Node) LinkChildNode(node Node) UnlinkChildNode(nodeId NodeId) - CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit uint64) []storage.VolumeId + CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) IsDataNode() bool Children() map[NodeId]Node @@ -146,25 +146,34 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { } } -func (n *NodeImpl) CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit uint64) []storage.VolumeId { - var ret []storage.VolumeId +func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) { if n.IsRack() { for _, c := range n.Children() { dn := c.(*DataNode) //can not cast n to DataNode if dn.LastSeen > freshThreshHold { - continue + if !dn.Dead { + dn.Dead = true + n.GetTopology().chanDeadDataNodes <- dn + } } for _, v := range dn.volumes { if uint64(v.Size) < volumeSizeLimit { - ret = append(ret, v.Id) + n.GetTopology().chanFullVolumes <- v } } } } else { for _, c := range n.Children() { - ret = append(ret, c.CollectWritableVolumes(freshThreshHold, volumeSizeLimit)...) + c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit) } } +} - return ret +func (n *NodeImpl) GetTopology() *Topology{ + var p Node + p = n + for p.Parent() != nil { + p = p.Parent() + } + return p.(*Topology) } diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go index c819feb00..bbcd594a2 100644 --- a/weed-fs/src/pkg/topology/rack.go +++ b/weed-fs/src/pkg/topology/rack.go @@ -30,6 +30,10 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol dn := c.(*DataNode) if dn.MatchLocation(ip, port) { dn.LastSeen = time.Now().Unix() + if dn.Dead { + dn.Dead = false + r.GetTopology().chanRecoveredDataNodes <- dn + } dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount) return dn } diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go index a60768f14..3943f9555 100644 --- a/weed-fs/src/pkg/topology/topology.go +++ b/weed-fs/src/pkg/topology/topology.go @@ -20,6 +20,12 @@ type Topology struct { volumeSizeLimit uint64 sequence sequence.Sequencer + + chanDeadDataNodes chan *DataNode + chanRecoveredDataNodes chan *DataNode + chanFullVolumes chan *storage.VolumeInfo + chanIncomplemteVolumes chan *storage.VolumeInfo + chanRecoveredVolumes chan *storage.VolumeInfo } func NewTopology(id string, dirname string, filename string, volumeSizeLimit uint64, pulse int) *Topology { @@ -31,6 +37,11 @@ func NewTopology(id string, dirname string, filename string, volumeSizeLimit uin t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit t.sequence = sequence.NewSequencer(dirname, filename) + t.chanDeadDataNodes = make(chan *DataNode) + t.chanRecoveredDataNodes = make(chan *DataNode) + t.chanFullVolumes = make(chan *storage.VolumeInfo) + t.chanIncomplemteVolumes = make(chan *storage.VolumeInfo) + t.chanRecoveredVolumes = make(chan *storage.VolumeInfo) return t } @@ -95,6 +106,12 @@ func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string, t.RegisterVolumeLayout(&v, dn) } } +func (t *Topology) SetVolumeReadOnly(volumeInfo *storage.VolumeInfo) { + //TODO +} +func (t *Topology) UnRegisterDataNode(dn *DataNode) { + //TODO +} func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter { for _, c := range t.Children() { @@ -128,17 +145,24 @@ func (t *Topology) ToMap() interface{} { } func (t *Topology) StartRefreshWritableVolumes() { - go func() { - for { - t.refreshWritableVolumes() - time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) - } - }() -} - -func (t *Topology) refreshWritableVolumes() { - freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval - //setting Writers, copy-on-write because of possible updating, this needs some future work! - t.CollectWritableVolumes(freshThreshHold, t.volumeSizeLimit) - //TODO: collect writable columes for each replication type + go func() { + for { + freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval + t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit) + time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) + } + }() + go func() { + for { + select { + case <-t.chanIncomplemteVolumes: + case <-t.chanRecoveredVolumes: + case fv := <-t.chanFullVolumes: + t.SetVolumeReadOnly(fv) + case <-t.chanRecoveredDataNodes: + case dn := <-t.chanDeadDataNodes: + t.UnRegisterDataNode(dn) + } + } + }() }