Browse Source

interface and default implementation for node

pull/2/head
Chris Lu 13 years ago
parent
commit
b95c9860ed
  1. 1
      weed-fs/bin/.gitignore
  2. 5
      weed-fs/pkg/linux_amd64/pkg/.gitignore
  3. 9
      weed-fs/src/pkg/topology/data_center.go
  4. 136
      weed-fs/src/pkg/topology/node.go
  5. 9
      weed-fs/src/pkg/topology/rack.go
  6. 17
      weed-fs/src/pkg/topology/server.go
  7. 48
      weed-fs/src/pkg/topology/topo_test.go
  8. 22
      weed-fs/src/pkg/topology/topology.go

1
weed-fs/bin/.gitignore

@ -0,0 +1 @@
/weed

5
weed-fs/pkg/linux_amd64/pkg/.gitignore

@ -0,0 +1,5 @@
/directory.a
/replication.a
/storage.a
/topology.a
/util.a

9
weed-fs/src/pkg/topology/data_center.go

@ -4,12 +4,13 @@ import (
) )
type DataCenter struct { type DataCenter struct {
Node
NodeImpl
ipRange IpRange ipRange IpRange
} }
func NewDataCenter(id NodeId) *DataCenter{
func NewDataCenter(id string) *DataCenter{
dc := &DataCenter{} dc := &DataCenter{}
dc.Node = *NewNode()
dc.Node.Id = id
dc.id = NodeId(id)
dc.nodeType = "DataCenter"
dc.children = make(map[NodeId]Node)
return dc return dc
} }

136
weed-fs/src/pkg/topology/node.go

@ -6,35 +6,64 @@ import (
) )
type NodeId string type NodeId string
type Node struct {
Id NodeId
type Node interface {
Id() NodeId
String() string
FreeSpace() int
ReserveOneVolume(r int, vid storage.VolumeId) (bool, *Server)
UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
UpAdjustMaxVolumeId(vid storage.VolumeId)
GetActiveVolumeCount() int
GetMaxVolumeCount() int
GetMaxVolumeId() storage.VolumeId
setParent(Node)
LinkChildNode(node Node)
UnlinkChildNode(nodeId NodeId)
IsServer() bool
}
type NodeImpl struct {
id NodeId
activeVolumeCount int activeVolumeCount int
maxVolumeCount int maxVolumeCount int
parent *Node
children map[NodeId]*Node
parent Node
children map[NodeId]Node
maxVolumeId storage.VolumeId maxVolumeId storage.VolumeId
}
func NewNode() *Node {
n := &Node{}
n.children = make(map[NodeId]*Node)
return n
}
func (n *Node) String() string {
if n.parent!=nil {
return n.parent.String()+":"+string(n.Id)
}
return string(n.Id)
//for rack, data center, topology
nodeType string
} }
func (n *Node) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *Node) {
if n.children == nil {
return true, n
func (n *NodeImpl) IsServer() bool {
return n.nodeType == "Server"
}
func (n *NodeImpl) IsRack() bool {
return n.nodeType == "Rack"
}
func (n *NodeImpl) IsDataCenter() bool {
return n.nodeType == "DataCenter"
}
func (n *NodeImpl) String() string {
if n.parent != nil {
return n.parent.String() + ":" + string(n.id)
} }
return string(n.id)
}
func (n *NodeImpl) Id() NodeId {
return n.id
}
func (n *NodeImpl) FreeSpace() int {
return n.maxVolumeCount - n.activeVolumeCount
}
func (n *NodeImpl) setParent(node Node) {
n.parent = node
}
func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *Server) {
ret := false ret := false
var assignedNode *Node
var assignedNode *Server
for _, node := range n.children { for _, node := range n.children {
freeSpace := node.maxVolumeCount - node.activeVolumeCount
freeSpace := node.FreeSpace()
fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
if freeSpace <= 0 { if freeSpace <= 0 {
continue continue
@ -42,6 +71,10 @@ func (n *Node) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *Node) {
if r >= freeSpace { if r >= freeSpace {
r -= freeSpace r -= freeSpace
} else { } else {
if node.IsServer() && node.FreeSpace()>0 {
fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
return true, node.(*Server)
}
ret, assignedNode = node.ReserveOneVolume(r, vid) ret, assignedNode = node.ReserveOneVolume(r, vid)
if ret { if ret {
break break
@ -51,48 +84,57 @@ func (n *Node) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *Node) {
return ret, assignedNode return ret, assignedNode
} }
func (n *Node) AddVolume(v *storage.VolumeInfo) {
if n.maxVolumeId < v.Id {
n.maxVolumeId = v.Id
func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
n.maxVolumeCount += maxVolumeCountDelta
if n.parent != nil {
n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
} }
n.activeVolumeCount++
fmt.Println(n.Id, "adds 1, volumeCount =", n.activeVolumeCount)
}
func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
n.activeVolumeCount += activeVolumeCountDelta
if n.parent != nil { if n.parent != nil {
n.parent.AddVolume(v)
n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
} }
} }
func (n *Node) AddMaxVolumeCount(maxVolumeCount int) {//can be negative
n.maxVolumeCount += maxVolumeCount
func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
if n.maxVolumeId < vid {
n.maxVolumeId = vid
if n.parent != nil { if n.parent != nil {
n.parent.AddMaxVolumeCount(maxVolumeCount)
n.parent.UpAdjustMaxVolumeId(vid)
}
} }
} }
func (n *Node) GetMaxVolumeId() storage.VolumeId {
func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
return n.maxVolumeId return n.maxVolumeId
} }
func (n *NodeImpl) GetActiveVolumeCount() int {
return n.activeVolumeCount
}
func (n *NodeImpl) GetMaxVolumeCount() int {
return n.maxVolumeCount
}
func (n *Node) AddNode(node *Node) {
if n.children[node.Id] == nil {
n.children[node.Id] = node
n.activeVolumeCount += node.activeVolumeCount
n.maxVolumeCount += node.maxVolumeCount
fmt.Println(n.Id, "adds", node.Id, "volumeCount =", n.activeVolumeCount)
func (n *NodeImpl) LinkChildNode(node Node) {
if n.children[node.Id()] == nil {
n.children[node.Id()] = node
n.activeVolumeCount += node.GetActiveVolumeCount()
n.maxVolumeCount += node.GetMaxVolumeCount()
node.setParent(n)
if n.maxVolumeId < node.GetMaxVolumeId() {
n.maxVolumeId = node.GetMaxVolumeId()
}
fmt.Println(n, "adds", node, "volumeCount =", n.activeVolumeCount)
} }
} }
func (n *Node) RemoveNode(nodeId NodeId) {
func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
node := n.children[nodeId] node := n.children[nodeId]
node.setParent(nil)
if node != nil { if node != nil {
delete(n.children, node.Id)
n.activeVolumeCount -= node.activeVolumeCount
n.maxVolumeCount -= node.maxVolumeCount
p := n.parent
for p != nil {
p.activeVolumeCount -= node.activeVolumeCount
p.maxVolumeCount -= node.maxVolumeCount
p = p.parent
}
fmt.Println(n.Id, "removes", node.Id, "volumeCount =", n.activeVolumeCount)
delete(n.children, node.Id())
n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount)
} }
} }

9
weed-fs/src/pkg/topology/rack.go

@ -4,13 +4,14 @@ import (
) )
type Rack struct { type Rack struct {
Node
NodeImpl
ipRange IpRange ipRange IpRange
} }
func NewRack(id NodeId) *Rack {
func NewRack(id string) *Rack {
r := &Rack{} r := &Rack{}
r.Node = *NewNode()
r.Node.Id = id
r.id = NodeId(id)
r.nodeType = "Rack"
r.children = make(map[NodeId]Node)
return r return r
} }

17
weed-fs/src/pkg/topology/server.go

@ -1,28 +1,31 @@
package topology package topology
import ( import (
"pkg/storage"
_ "fmt" _ "fmt"
"pkg/storage"
) )
type Server struct { type Server struct {
Node
NodeImpl
volumes map[storage.VolumeId]*storage.VolumeInfo volumes map[storage.VolumeId]*storage.VolumeInfo
Ip NodeId Ip NodeId
Port int Port int
PublicUrl string PublicUrl string
} }
func NewServer(id NodeId) *Server{
func NewServer(id string) *Server {
s := &Server{} s := &Server{}
s.Node.Id = id
s.id = NodeId(id)
s.nodeType = "Server"
s.volumes = make(map[storage.VolumeId]*storage.VolumeInfo) s.volumes = make(map[storage.VolumeId]*storage.VolumeInfo)
return s return s
} }
func (s *Server) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId { func (s *Server) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId {
s.AddVolume(&storage.VolumeInfo{Id:vid, Size: 32*1024*1024*1024})
s.AddVolume(&storage.VolumeInfo{Id: vid, Size: 32 * 1024 * 1024 * 1024})
return vid return vid
} }
func (s *Server) AddVolume(v *storage.VolumeInfo){
func (s *Server) AddVolume(v *storage.VolumeInfo) {
s.volumes[v.Id] = v s.volumes[v.Id] = v
s.Node.AddVolume(v)
s.UpAdjustActiveVolumeCountDelta(1)
s.UpAdjustMaxVolumeId(v.Id)
} }

48
weed-fs/src/pkg/topology/topo_test.go

@ -70,7 +70,7 @@ var topologyLayout = `
} }
` `
func setup() *Topology {
func setup(topologyLayout string) *Topology {
var data interface{} var data interface{}
err := json.Unmarshal([]byte(topologyLayout), &data) err := json.Unmarshal([]byte(topologyLayout), &data)
if err != nil { if err != nil {
@ -80,36 +80,33 @@ func setup() *Topology {
printMap(data) printMap(data)
//need to connect all nodes first before server adding volumes //need to connect all nodes first before server adding volumes
topo := NewTopology(NodeId("mynetwork"))
topo := NewTopology("mynetwork")
mTopology := data.(map[string]interface{}) mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology { for dcKey, dcValue := range mTopology {
dc := NewDataCenter(NodeId(dcKey))
dc.Node.parent = &topo.Node
dc := NewDataCenter(dcKey)
dcMap := dcValue.(map[string]interface{}) dcMap := dcValue.(map[string]interface{})
topo.Node.AddNode(&dc.Node)
topo.LinkChildNode(dc)
for rackKey, rackValue := range dcMap { for rackKey, rackValue := range dcMap {
rack := NewRack(NodeId(rackKey))
rack.Node.parent = &dc.Node
rack := NewRack(rackKey)
rackMap := rackValue.(map[string]interface{}) rackMap := rackValue.(map[string]interface{})
dc.Node.AddNode(&rack.Node)
dc.LinkChildNode(rack)
for serverKey, serverValue := range rackMap { for serverKey, serverValue := range rackMap {
server := NewServer(NodeId(serverKey))
server.Node.parent = &rack.Node
server := NewServer(serverKey)
serverMap := serverValue.(map[string]interface{}) serverMap := serverValue.(map[string]interface{})
rack.Node.AddNode(&server.Node)
rack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) { for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{}) m := v.(map[string]interface{})
vi := &storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64))} vi := &storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64))}
server.AddVolume(vi) server.AddVolume(vi)
} }
server.Node.AddMaxVolumeCount(int(serverMap["limit"].(float64)))
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
} }
} }
} }
fmt.Println("topology:", *topo) fmt.Println("topology:", *topo)
bytes, err := json.Marshal(topo.Node.children)
bytes, err := json.Marshal(topo.children)
if err != nil { if err != nil {
fmt.Println("json error:", err) fmt.Println("json error:", err)
} }
@ -140,30 +137,23 @@ func printMap(mm interface{}) {
} }
} }
func TestAddVolume(t *testing.T) {
topo := setup()
v := &storage.VolumeInfo{}
topo.AddVolume(v)
}
func TestRemoveDataCenter(t *testing.T) { func TestRemoveDataCenter(t *testing.T) {
topo := setup()
topo.RemoveNode(NodeId("dc2"))
if topo.activeVolumeCount != 15 {
topo := setup(topologyLayout)
topo.UnlinkChildNode(NodeId("dc2"))
if topo.GetActiveVolumeCount() != 15 {
t.Fail() t.Fail()
} }
topo.RemoveNode(NodeId("dc3"))
if topo.activeVolumeCount != 12 {
topo.UnlinkChildNode(NodeId("dc3"))
if topo.GetActiveVolumeCount() != 12 {
t.Fail() t.Fail()
} }
} }
func TestReserveOneVolume(t *testing.T) { func TestReserveOneVolume(t *testing.T) {
topo := setup()
topo := setup(topologyLayout)
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
rand.Seed(1)
ret, node, vid := topo.RandomlyReserveOneVolume() ret, node, vid := topo.RandomlyReserveOneVolume()
fmt.Println("topology:", topo.Node)
fmt.Println("assigned :", ret)
fmt.Println("assigned node :", node)
fmt.Println("assigned volume id:", vid)
fmt.Println("topology:", topo)
fmt.Println("assigned :", ret, ", node :", node,", volume id:", vid)
} }

22
weed-fs/src/pkg/topology/topology.go

@ -1,30 +1,30 @@
package topology package topology
import ( import (
"fmt"
"math/rand" "math/rand"
"pkg/storage" "pkg/storage"
_ "fmt"
) )
type Topology struct { type Topology struct {
Node
NodeImpl
} }
func NewTopology(id NodeId) *Topology{
func NewTopology(id string) *Topology {
t := &Topology{} t := &Topology{}
t.Node = *NewNode()
t.Node.Id = id
t.id = NodeId(id)
t.nodeType = "Topology"
t.children = make(map[NodeId]Node)
return t return t
} }
func (t *Topology) RandomlyReserveOneVolume() (bool, *Node, storage.VolumeId) {
slots := t.Node.maxVolumeCount-t.Node.activeVolumeCount
r := rand.Intn(slots)
func (t *Topology) RandomlyReserveOneVolume() (bool, *Server, storage.VolumeId) {
vid := t.nextVolumeId() vid := t.nextVolumeId()
ret, node := t.Node.ReserveOneVolume(r,vid)
ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid)
fmt.Println("node.IsServer", node.IsServer())
return ret, node, vid return ret, node, vid
} }
func (t *Topology) nextVolumeId() storage.VolumeId { func (t *Topology) nextVolumeId() storage.VolumeId {
vid := t.Node.GetMaxVolumeId()
vid := t.GetMaxVolumeId()
return vid.Next() return vid.Next()
} }
Loading…
Cancel
Save