diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go index 28dca68b4..41d7bd18c 100644 --- a/weed-fs/src/pkg/replication/volume_growth.go +++ b/weed-fs/src/pkg/replication/volume_growth.go @@ -126,9 +126,9 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { for _, server := range servers { if err := AllocateVolume(server, vid, repType); err == nil { - vi := &storage.VolumeInfo{Id: vid, Size: 0} + vi := storage.VolumeInfo{Id: vid, Size: 0} server.AddOrUpdateVolume(vi) - topo.RegisterVolumeLayout(vi, server) + topo.RegisterVolumeLayout(&vi, server) fmt.Println("Created Volume", vid, "on", server) } else { fmt.Println("Failed to assign", vid, "to", servers) diff --git a/weed-fs/src/pkg/topology/data_center.go b/weed-fs/src/pkg/topology/data_center.go index 5edf7c6eb..c661090e8 100644 --- a/weed-fs/src/pkg/topology/data_center.go +++ b/weed-fs/src/pkg/topology/data_center.go @@ -13,6 +13,7 @@ func NewDataCenter(id string) *DataCenter { dc.id = NodeId(id) dc.nodeType = "DataCenter" dc.children = make(map[NodeId]Node) + dc.NodeImpl.value = dc return dc } diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go index 2305dddd2..cb625a41b 100644 --- a/weed-fs/src/pkg/topology/data_node.go +++ b/weed-fs/src/pkg/topology/data_node.go @@ -7,7 +7,7 @@ import ( type DataNode struct { NodeImpl - volumes map[storage.VolumeId]*storage.VolumeInfo + volumes map[storage.VolumeId]storage.VolumeInfo Ip string Port int PublicUrl string @@ -19,15 +19,12 @@ func NewDataNode(id string) *DataNode { s := &DataNode{} s.id = NodeId(id) s.nodeType = "DataNode" - s.volumes = make(map[storage.VolumeId]*storage.VolumeInfo) + s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) + s.NodeImpl.value = s return s } -func (dn *DataNode) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId { - dn.AddOrUpdateVolume(&storage.VolumeInfo{Id: vid}) - return vid -} -func (dn *DataNode) AddOrUpdateVolume(v *storage.VolumeInfo) { - if dn.volumes[v.Id] == nil { +func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { + if _, ok := dn.volumes[v.Id]; !ok { dn.volumes[v.Id] = v dn.UpAdjustActiveVolumeCountDelta(1) dn.UpAdjustMaxVolumeId(v.Id) diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go index 52315ca2f..044b832ef 100644 --- a/weed-fs/src/pkg/topology/node.go +++ b/weed-fs/src/pkg/topology/node.go @@ -17,7 +17,7 @@ type Node interface { GetActiveVolumeCount() int GetMaxVolumeCount() int GetMaxVolumeId() storage.VolumeId - setParent(Node) + SetParent(Node) LinkChildNode(node Node) UnlinkChildNode(nodeId NodeId) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) @@ -25,6 +25,8 @@ type Node interface { IsDataNode() bool Children() map[NodeId]Node Parent() Node + + GetValue()interface{} //get reference to the topology,dc,rack,datanode } type NodeImpl struct { id NodeId @@ -36,6 +38,7 @@ type NodeImpl struct { //for rack, data center, topology nodeType string + value interface{} } func (n *NodeImpl) IsDataNode() bool { @@ -59,7 +62,7 @@ func (n *NodeImpl) Id() NodeId { func (n *NodeImpl) FreeSpace() int { return n.maxVolumeCount - n.activeVolumeCount } -func (n *NodeImpl) setParent(node Node) { +func (n *NodeImpl) SetParent(node Node) { n.parent = node } func (n *NodeImpl) Children() map[NodeId]Node { @@ -68,6 +71,9 @@ func (n *NodeImpl) Children() map[NodeId]Node { func (n *NodeImpl) Parent() Node { return n.parent } +func (n *NodeImpl) GetValue()interface{}{ + return n.value +} func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) { ret := false var assignedNode *DataNode @@ -130,14 +136,14 @@ func (n *NodeImpl) LinkChildNode(node Node) { n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) - node.setParent(n) + node.SetParent(n) fmt.Println(n, "adds child", node) } } func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { node := n.children[nodeId] - node.setParent(nil) + node.SetParent(nil) if node != nil { delete(n.children, node.Id()) n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) @@ -150,15 +156,15 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi if n.IsRack() { for _, c := range n.Children() { dn := c.(*DataNode) //can not cast n to DataNode - if dn.LastSeen > freshThreshHold { + if dn.LastSeen < freshThreshHold { if !dn.Dead { dn.Dead = true n.GetTopology().chanDeadDataNodes <- dn } } for _, v := range dn.volumes { - if uint64(v.Size) < volumeSizeLimit { - n.GetTopology().chanFullVolumes <- v + if uint64(v.Size) >= volumeSizeLimit { + n.GetTopology().chanFullVolumes <- &v } } } @@ -175,5 +181,5 @@ func (n *NodeImpl) GetTopology() *Topology{ for p.Parent() != nil { p = p.Parent() } - return p.(*Topology) + return p.GetValue().(*Topology) } diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go index bbcd594a2..16520d14a 100644 --- a/weed-fs/src/pkg/topology/rack.go +++ b/weed-fs/src/pkg/topology/rack.go @@ -15,6 +15,7 @@ func NewRack(id string) *Rack { r.id = NodeId(id) r.nodeType = "Rack" r.children = make(map[NodeId]Node) + r.NodeImpl.value = r return r } diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go index 3943f9555..fc2c902b0 100644 --- a/weed-fs/src/pkg/topology/topology.go +++ b/weed-fs/src/pkg/topology/topology.go @@ -2,6 +2,7 @@ package topology import ( "errors" + "fmt" "math/rand" "pkg/directory" "pkg/sequence" @@ -32,16 +33,20 @@ func NewTopology(id string, dirname string, filename string, volumeSizeLimit uin t := &Topology{} t.id = NodeId(id) t.nodeType = "Topology" + t.NodeImpl.value = t t.children = make(map[NodeId]Node) t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType) t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit + t.sequence = sequence.NewSequencer(dirname, filename) + t.chanDeadDataNodes = make(chan *DataNode) t.chanRecoveredDataNodes = make(chan *DataNode) t.chanFullVolumes = make(chan *storage.VolumeInfo) t.chanIncomplemteVolumes = make(chan *storage.VolumeInfo) - t.chanRecoveredVolumes = make(chan *storage.VolumeInfo) + t.chanRecoveredVolumes = make(chan *storage.VolumeInfo) + return t } @@ -102,16 +107,10 @@ func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string, rack := dc.GetOrCreateRack(ip) dn := rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount) for _, v := range volumeInfos { - dn.AddOrUpdateVolume(&v) + dn.AddOrUpdateVolume(v) t.RegisterVolumeLayout(&v, dn) } } -func (t *Topology) SetVolumeReadOnly(volumeInfo *storage.VolumeInfo) { - //TODO -} -func (t *Topology) UnRegisterDataNode(dn *DataNode) { - //TODO -} func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter { for _, c := range t.Children() { @@ -155,14 +154,41 @@ func (t *Topology) StartRefreshWritableVolumes() { go func() { for { select { - case <-t.chanIncomplemteVolumes: - case <-t.chanRecoveredVolumes: - case fv := <-t.chanFullVolumes: - t.SetVolumeReadOnly(fv) - case <-t.chanRecoveredDataNodes: + case v := <-t.chanIncomplemteVolumes: + fmt.Println("Volume", v, "is incomplete!") + case v := <-t.chanRecoveredVolumes: + fmt.Println("Volume", v, "is recovered!") + case v := <-t.chanFullVolumes: + t.SetVolumeReadOnly(v) + fmt.Println("Volume", v, "is full!") + case dn := <-t.chanRecoveredDataNodes: + t.RegisterRecoveredDataNode(dn) + fmt.Println("DataNode", dn, "is back alive!") case dn := <-t.chanDeadDataNodes: t.UnRegisterDataNode(dn) + fmt.Println("DataNode", dn, "is dead!") } } }() } +func (t *Topology) SetVolumeReadOnly(volumeInfo *storage.VolumeInfo) { + vl := t.GetVolumeLayout(volumeInfo.RepType) + vl.SetVolumeReadOnly(volumeInfo.Id) +} +func (t *Topology) SetVolumeWritable(volumeInfo *storage.VolumeInfo) { + vl := t.GetVolumeLayout(volumeInfo.RepType) + vl.SetVolumeWritable(volumeInfo.Id) +} +func (t *Topology) UnRegisterDataNode(dn *DataNode) { + for _, v := range dn.volumes { + fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn) + t.SetVolumeReadOnly(&v) + } +} +func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { + for _, v := range dn.volumes { + if uint64(v.Size) < t.volumeSizeLimit { + t.SetVolumeWritable(&v) + } + } +} diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go index 031840e88..0c3841c72 100644 --- a/weed-fs/src/pkg/topology/volume_layout.go +++ b/weed-fs/src/pkg/topology/volume_layout.go @@ -56,6 +56,26 @@ func (vl *VolumeLayout) GetActiveVolumeCount() int { return len(vl.writables) } +func (vl *VolumeLayout) SetVolumeReadOnly(vid storage.VolumeId) bool { + for i, v := range vl.writables{ + if v == vid { + vl.writables = append(vl.writables[:i],vl.writables[i+1:]...) + return true + } + } + return false +} + +func (vl *VolumeLayout) SetVolumeWritable(vid storage.VolumeId) bool { + for _, v := range vl.writables{ + if v == vid { + return false + } + } + vl.writables = append(vl.writables, vid) + return true +} + func (vl *VolumeLayout) ToMap() interface{} { m := make(map[string]interface{}) m["replication"] = vl.repType.String()