Browse Source

channel based visitor pattern

pull/2/head
Chris Lu 13 years ago
parent
commit
f3da0906ed
  1. 11
      weed-fs/src/pkg/directory/volume_mapping.go
  2. 1
      weed-fs/src/pkg/topology/data_node.go
  3. 23
      weed-fs/src/pkg/topology/node.go
  4. 4
      weed-fs/src/pkg/topology/rack.go
  5. 38
      weed-fs/src/pkg/topology/topology.go

11
weed-fs/src/pkg/directory/volume_mapping.go

@ -97,18 +97,9 @@ func (m *Mapper) remove(machine *Machine) {
foundIndex = index foundIndex = index
} }
} }
m.vid2machines[v.Id] = deleteFromSlice(foundIndex,m.vid2machines[v.Id])
m.vid2machines[v.Id] = append(m.vid2machines[v.Id][:foundIndex], m.vid2machines[v.Id][foundIndex+1:]...)
} }
} }
func deleteFromSlice(i int, slice []*Machine) []*Machine{
switch i {
case -1://do nothing
case 0: slice = slice[1:]
case len(slice)-1: slice = slice[:len(slice)-1]
default: slice = append(slice[:i], slice[i+1:]...)
}
return slice
}
func (m *Mapper) StartRefreshWritableVolumes() { func (m *Mapper) StartRefreshWritableVolumes() {
go func() { go func() {

1
weed-fs/src/pkg/topology/data_node.go

@ -12,6 +12,7 @@ type DataNode struct {
Port int Port int
PublicUrl string PublicUrl string
LastSeen int64 // unix time in seconds LastSeen int64 // unix time in seconds
Dead bool
} }
func NewDataNode(id string) *DataNode { func NewDataNode(id string) *DataNode {

23
weed-fs/src/pkg/topology/node.go

@ -20,7 +20,7 @@ type Node interface {
setParent(Node) setParent(Node)
LinkChildNode(node Node) LinkChildNode(node Node)
UnlinkChildNode(nodeId NodeId) UnlinkChildNode(nodeId NodeId)
CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit uint64) []storage.VolumeId
CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
IsDataNode() bool IsDataNode() bool
Children() map[NodeId]Node Children() map[NodeId]Node
@ -146,25 +146,34 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
} }
} }
func (n *NodeImpl) CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit uint64) []storage.VolumeId {
var ret []storage.VolumeId
func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
if n.IsRack() { if n.IsRack() {
for _, c := range n.Children() { for _, c := range n.Children() {
dn := c.(*DataNode) //can not cast n to DataNode dn := c.(*DataNode) //can not cast n to DataNode
if dn.LastSeen > freshThreshHold { if dn.LastSeen > freshThreshHold {
continue
if !dn.Dead {
dn.Dead = true
n.GetTopology().chanDeadDataNodes <- dn
}
} }
for _, v := range dn.volumes { for _, v := range dn.volumes {
if uint64(v.Size) < volumeSizeLimit { if uint64(v.Size) < volumeSizeLimit {
ret = append(ret, v.Id)
n.GetTopology().chanFullVolumes <- v
} }
} }
} }
} else { } else {
for _, c := range n.Children() { for _, c := range n.Children() {
ret = append(ret, c.CollectWritableVolumes(freshThreshHold, volumeSizeLimit)...)
c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
}
} }
} }
return ret
func (n *NodeImpl) GetTopology() *Topology{
var p Node
p = n
for p.Parent() != nil {
p = p.Parent()
}
return p.(*Topology)
} }

4
weed-fs/src/pkg/topology/rack.go

@ -30,6 +30,10 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
dn := c.(*DataNode) dn := c.(*DataNode)
if dn.MatchLocation(ip, port) { if dn.MatchLocation(ip, port) {
dn.LastSeen = time.Now().Unix() dn.LastSeen = time.Now().Unix()
if dn.Dead {
dn.Dead = false
r.GetTopology().chanRecoveredDataNodes <- dn
}
dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount) dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
return dn return dn
} }

38
weed-fs/src/pkg/topology/topology.go

@ -20,6 +20,12 @@ type Topology struct {
volumeSizeLimit uint64 volumeSizeLimit uint64
sequence sequence.Sequencer sequence sequence.Sequencer
chanDeadDataNodes chan *DataNode
chanRecoveredDataNodes chan *DataNode
chanFullVolumes chan *storage.VolumeInfo
chanIncomplemteVolumes chan *storage.VolumeInfo
chanRecoveredVolumes chan *storage.VolumeInfo
} }
func NewTopology(id string, dirname string, filename string, volumeSizeLimit uint64, pulse int) *Topology { func NewTopology(id string, dirname string, filename string, volumeSizeLimit uint64, pulse int) *Topology {
@ -31,6 +37,11 @@ func NewTopology(id string, dirname string, filename string, volumeSizeLimit uin
t.pulse = int64(pulse) t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit t.volumeSizeLimit = volumeSizeLimit
t.sequence = sequence.NewSequencer(dirname, filename) t.sequence = sequence.NewSequencer(dirname, filename)
t.chanDeadDataNodes = make(chan *DataNode)
t.chanRecoveredDataNodes = make(chan *DataNode)
t.chanFullVolumes = make(chan *storage.VolumeInfo)
t.chanIncomplemteVolumes = make(chan *storage.VolumeInfo)
t.chanRecoveredVolumes = make(chan *storage.VolumeInfo)
return t return t
} }
@ -95,6 +106,12 @@ func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string,
t.RegisterVolumeLayout(&v, dn) t.RegisterVolumeLayout(&v, dn)
} }
} }
func (t *Topology) SetVolumeReadOnly(volumeInfo *storage.VolumeInfo) {
//TODO
}
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
//TODO
}
func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter { func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter {
for _, c := range t.Children() { for _, c := range t.Children() {
@ -130,15 +147,22 @@ func (t *Topology) ToMap() interface{} {
func (t *Topology) StartRefreshWritableVolumes() { func (t *Topology) StartRefreshWritableVolumes() {
go func() { go func() {
for { for {
t.refreshWritableVolumes()
freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval
t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
} }
}() }()
go func() {
for {
select {
case <-t.chanIncomplemteVolumes:
case <-t.chanRecoveredVolumes:
case fv := <-t.chanFullVolumes:
t.SetVolumeReadOnly(fv)
case <-t.chanRecoveredDataNodes:
case dn := <-t.chanDeadDataNodes:
t.UnRegisterDataNode(dn)
} }
func (t *Topology) refreshWritableVolumes() {
freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval
//setting Writers, copy-on-write because of possible updating, this needs some future work!
t.CollectWritableVolumes(freshThreshHold, t.volumeSizeLimit)
//TODO: collect writable columes for each replication type
}
}()
} }
Loading…
Cancel
Save