Browse Source

register reported topology

pull/2/head
Chris Lu 12 years ago
parent
commit
e7c4ee1c64
  1. 1
      weed-fs/note/replication.txt
  2. 16
      weed-fs/src/pkg/admin/storage.go
  3. 37
      weed-fs/src/pkg/topology/data_center.go
  4. 6
      weed-fs/src/pkg/topology/data_node.go
  5. 10
      weed-fs/src/pkg/topology/ip_range.go
  6. 25
      weed-fs/src/pkg/topology/rack.go
  7. 32
      weed-fs/src/pkg/topology/topology.go

1
weed-fs/note/replication.txt

@ -80,5 +80,6 @@ For the above operations, here are the todo list:
4. NOT_IMPLEMENTING: if dead/stale data nodes are found, for the affected volumes, send stale info 4. NOT_IMPLEMENTING: if dead/stale data nodes are found, for the affected volumes, send stale info
to other data nodes. BECAUSE the master will stop sending writes to these data nodes to other data nodes. BECAUSE the master will stop sending writes to these data nodes
5. accept lookup for volume locations ALREADY EXISTS /dir/lookup 5. accept lookup for volume locations ALREADY EXISTS /dir/lookup
6. read topology/datacenter/rack layout

16
weed-fs/src/pkg/admin/storage.go

@ -29,3 +29,19 @@ func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage
} }
return nil return nil
} }
func SendVolumeLocationsList(t *topology.Topology, vid storage.VolumeId) error{
// values := make(url.Values)
// values.Add("volumeLocationsList", vid.String())
// volumeLocations:= []map[string]string{}
// list := t.GetVolumeLocations(vid)
// m := make(map[string]interface{})
// m["Vid"] = vid.String()
// for _, dn := range list {
// m["Locations"] = append(m["Locations"], dn)
// }
// for _, dn := range list {
// util.Post("http://"+dn.Ip+":"+strconv.Itoa(dn.Port)+"/admin/set_volume_locations_list", values)
// }
return nil
}

37
weed-fs/src/pkg/topology/data_center.go

@ -1,16 +1,35 @@
package topology package topology
import (
)
import ()
type DataCenter struct { type DataCenter struct {
NodeImpl NodeImpl
ipRange IpRange
ipRange *IpRange
} }
func NewDataCenter(id string) *DataCenter{
dc := &DataCenter{}
dc.id = NodeId(id)
dc.nodeType = "DataCenter"
dc.children = make(map[NodeId]Node)
return dc
func NewDataCenter(id string) *DataCenter {
dc := &DataCenter{}
dc.id = NodeId(id)
dc.nodeType = "DataCenter"
dc.children = make(map[NodeId]Node)
return dc
}
func (dc *DataCenter) MatchLocationRange(ip string) bool {
if dc.ipRange == nil {
return true
}
return dc.ipRange.Match(ip)
}
func (dc *DataCenter) GetOrCreateRack(ip string) *Rack {
for _, c := range dc.Children() {
rack := c.(*Rack)
if rack.MatchLocationRange(ip) {
return rack
}
}
rack := NewRack("DefaultRack")
dc.LinkChildNode(rack)
return rack
} }

6
weed-fs/src/pkg/topology/data_node.go

@ -22,14 +22,13 @@ func NewDataNode(id string) *DataNode {
return s return s
} }
func (dn *DataNode) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId { func (dn *DataNode) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId {
dn.AddVolume(&storage.VolumeInfo{Id: vid, Size: 32 * 1024 * 1024 * 1024})
dn.AddVolume(&storage.VolumeInfo{Id: vid})
return vid return vid
} }
func (dn *DataNode) AddVolume(v *storage.VolumeInfo) { func (dn *DataNode) AddVolume(v *storage.VolumeInfo) {
dn.volumes[v.Id] = v dn.volumes[v.Id] = v
dn.UpAdjustActiveVolumeCountDelta(1) dn.UpAdjustActiveVolumeCountDelta(1)
dn.UpAdjustMaxVolumeId(v.Id) dn.UpAdjustMaxVolumeId(v.Id)
dn.GetTopology().RegisterVolume(v,dn)
} }
func (dn *DataNode) GetTopology() *Topology { func (dn *DataNode) GetTopology() *Topology {
p := dn.parent p := dn.parent
@ -39,3 +38,6 @@ func (dn *DataNode) GetTopology() *Topology {
t := p.(*Topology) t := p.(*Topology)
return t return t
} }
func (dn *DataNode) MatchLocation(ip string, port int) bool {
return dn.Ip == ip && dn.Port == port
}

10
weed-fs/src/pkg/topology/ip_range.go

@ -9,3 +9,13 @@ type IpRange struct {
inclusives []string inclusives []string
exclusives []string exclusives []string
} }
func (r *IpRange) Match(ip string) bool {
// TODO
// for _, exc := range r.exclusives {
// if exc
// }
// for _, inc := range r.inclusives {
// }
return true
}

25
weed-fs/src/pkg/topology/rack.go

@ -1,11 +1,12 @@
package topology package topology
import ( import (
"strconv"
) )
type Rack struct { type Rack struct {
NodeImpl NodeImpl
ipRange IpRange
ipRange *IpRange
} }
func NewRack(id string) *Rack { func NewRack(id string) *Rack {
@ -15,3 +16,25 @@ func NewRack(id string) *Rack {
r.children = make(map[NodeId]Node) r.children = make(map[NodeId]Node)
return r return r
} }
func (r *Rack) MatchLocationRange(ip string) bool{
if r.ipRange == nil {
return true
}
return r.ipRange.Match(ip)
}
func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string) *DataNode{
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip,port) {
return dn
}
}
dn := NewDataNode("DataNode"+ip+":"+strconv.Itoa(port))
dn.Ip = ip
dn.Port = port
dn.PublicUrl = publicUrl
r.LinkChildNode(dn)
return dn
}

32
weed-fs/src/pkg/topology/topology.go

@ -52,10 +52,30 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
return vid.Next() return vid.Next()
} }
func (t *Topology) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
replicationTypeIndex := storage.GetReplicationLevelIndex(v)
if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(t.volumeSizeLimit, t.pulse)
}
t.replicaType2VolumeLayout[replicationTypeIndex].RegisterVolume(v, dn)
func (t *Topology) registerVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
replicationTypeIndex := storage.GetReplicationLevelIndex(v)
if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(t.volumeSizeLimit, t.pulse)
}
t.replicaType2VolumeLayout[replicationTypeIndex].RegisterVolume(v, dn)
}
func (t *Topology) RegisterVolume(v *storage.VolumeInfo, ip string, port int, publicUrl string) {
dc := t.GetOrCreateDataCenter(ip)
rack := dc.GetOrCreateRack(ip)
dn := rack.GetOrCreateDataNode(ip, port, publicUrl)
dn.AddVolume(v)
t.registerVolumeLayout(v,dn)
}
func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter{
for _, c := range t.Children() {
dc := c.(*DataCenter)
if dc.MatchLocationRange(ip) {
return dc
}
}
dc := NewDataCenter("DefaultDataCenter")
t.LinkChildNode(dc)
return dc
} }
Loading…
Cancel
Save