From e7c4ee1c64a0b523d7fcfd9f1620c85bf3b642f4 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 14 Sep 2012 01:17:13 -0700 Subject: [PATCH] register reported topology --- weed-fs/note/replication.txt | 1 + weed-fs/src/pkg/admin/storage.go | 16 +++++++++++ weed-fs/src/pkg/topology/data_center.go | 37 +++++++++++++++++++------ weed-fs/src/pkg/topology/data_node.go | 6 ++-- weed-fs/src/pkg/topology/ip_range.go | 10 +++++++ weed-fs/src/pkg/topology/rack.go | 25 ++++++++++++++++- weed-fs/src/pkg/topology/topology.go | 32 +++++++++++++++++---- 7 files changed, 109 insertions(+), 18 deletions(-) diff --git a/weed-fs/note/replication.txt b/weed-fs/note/replication.txt index 398b9b411..0dd73fd90 100644 --- a/weed-fs/note/replication.txt +++ b/weed-fs/note/replication.txt @@ -80,5 +80,6 @@ For the above operations, here are the todo list: 4. NOT_IMPLEMENTING: if dead/stale data nodes are found, for the affected volumes, send stale info to other data nodes. BECAUSE the master will stop sending writes to these data nodes 5. accept lookup for volume locations ALREADY EXISTS /dir/lookup + 6. read topology/datacenter/rack layout diff --git a/weed-fs/src/pkg/admin/storage.go b/weed-fs/src/pkg/admin/storage.go index ca4de3095..8e78b8697 100644 --- a/weed-fs/src/pkg/admin/storage.go +++ b/weed-fs/src/pkg/admin/storage.go @@ -29,3 +29,19 @@ func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage } return nil } + +func SendVolumeLocationsList(t *topology.Topology, vid storage.VolumeId) error{ +// values := make(url.Values) +// values.Add("volumeLocationsList", vid.String()) +// volumeLocations:= []map[string]string{} +// list := t.GetVolumeLocations(vid) +// m := make(map[string]interface{}) +// m["Vid"] = vid.String() +// for _, dn := range list { +// m["Locations"] = append(m["Locations"], dn) +// } +// for _, dn := range list { +// util.Post("http://"+dn.Ip+":"+strconv.Itoa(dn.Port)+"/admin/set_volume_locations_list", values) +// } + return nil +} diff --git a/weed-fs/src/pkg/topology/data_center.go b/weed-fs/src/pkg/topology/data_center.go index 8dced9c44..48466e258 100644 --- a/weed-fs/src/pkg/topology/data_center.go +++ b/weed-fs/src/pkg/topology/data_center.go @@ -1,16 +1,35 @@ package topology -import ( -) +import () type DataCenter struct { NodeImpl - ipRange IpRange + ipRange *IpRange } -func NewDataCenter(id string) *DataCenter{ - dc := &DataCenter{} - dc.id = NodeId(id) - dc.nodeType = "DataCenter" - dc.children = make(map[NodeId]Node) - return dc + +func NewDataCenter(id string) *DataCenter { + dc := &DataCenter{} + dc.id = NodeId(id) + dc.nodeType = "DataCenter" + dc.children = make(map[NodeId]Node) + return dc +} + +func (dc *DataCenter) MatchLocationRange(ip string) bool { + if dc.ipRange == nil { + return true + } + return dc.ipRange.Match(ip) +} + +func (dc *DataCenter) GetOrCreateRack(ip string) *Rack { + for _, c := range dc.Children() { + rack := c.(*Rack) + if rack.MatchLocationRange(ip) { + return rack + } + } + rack := NewRack("DefaultRack") + dc.LinkChildNode(rack) + return rack } diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go index 4d6f18bea..254754ccd 100644 --- a/weed-fs/src/pkg/topology/data_node.go +++ b/weed-fs/src/pkg/topology/data_node.go @@ -22,14 +22,13 @@ func NewDataNode(id string) *DataNode { return s } func (dn *DataNode) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId { - dn.AddVolume(&storage.VolumeInfo{Id: vid, Size: 32 * 1024 * 1024 * 1024}) + dn.AddVolume(&storage.VolumeInfo{Id: vid}) return vid } func (dn *DataNode) AddVolume(v *storage.VolumeInfo) { dn.volumes[v.Id] = v dn.UpAdjustActiveVolumeCountDelta(1) dn.UpAdjustMaxVolumeId(v.Id) - dn.GetTopology().RegisterVolume(v,dn) } func (dn *DataNode) GetTopology() *Topology { p := dn.parent @@ -39,3 +38,6 @@ func (dn *DataNode) GetTopology() *Topology { t := p.(*Topology) return t } +func (dn *DataNode) MatchLocation(ip string, port int) bool { + return dn.Ip == ip && dn.Port == port +} diff --git a/weed-fs/src/pkg/topology/ip_range.go b/weed-fs/src/pkg/topology/ip_range.go index e7bd5a0fe..06dab74ef 100644 --- a/weed-fs/src/pkg/topology/ip_range.go +++ b/weed-fs/src/pkg/topology/ip_range.go @@ -9,3 +9,13 @@ type IpRange struct { inclusives []string exclusives []string } + +func (r *IpRange) Match(ip string) bool { +// TODO +// for _, exc := range r.exclusives { +// if exc +// } +// for _, inc := range r.inclusives { +// } + return true +} \ No newline at end of file diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go index 46e7780dc..a52223887 100644 --- a/weed-fs/src/pkg/topology/rack.go +++ b/weed-fs/src/pkg/topology/rack.go @@ -1,11 +1,12 @@ package topology import ( + "strconv" ) type Rack struct { NodeImpl - ipRange IpRange + ipRange *IpRange } func NewRack(id string) *Rack { @@ -15,3 +16,25 @@ func NewRack(id string) *Rack { r.children = make(map[NodeId]Node) return r } + +func (r *Rack) MatchLocationRange(ip string) bool{ + if r.ipRange == nil { + return true + } + return r.ipRange.Match(ip) +} + +func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string) *DataNode{ + for _, c := range r.Children() { + dn := c.(*DataNode) + if dn.MatchLocation(ip,port) { + return dn + } + } + dn := NewDataNode("DataNode"+ip+":"+strconv.Itoa(port)) + dn.Ip = ip + dn.Port = port + dn.PublicUrl = publicUrl + r.LinkChildNode(dn) + return dn +} diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go index 2e617c927..2366c29d0 100644 --- a/weed-fs/src/pkg/topology/topology.go +++ b/weed-fs/src/pkg/topology/topology.go @@ -52,10 +52,30 @@ func (t *Topology) NextVolumeId() storage.VolumeId { return vid.Next() } -func (t *Topology) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { - replicationTypeIndex := storage.GetReplicationLevelIndex(v) - if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { - t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(t.volumeSizeLimit, t.pulse) - } - t.replicaType2VolumeLayout[replicationTypeIndex].RegisterVolume(v, dn) +func (t *Topology) registerVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { + replicationTypeIndex := storage.GetReplicationLevelIndex(v) + if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { + t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(t.volumeSizeLimit, t.pulse) + } + t.replicaType2VolumeLayout[replicationTypeIndex].RegisterVolume(v, dn) +} + +func (t *Topology) RegisterVolume(v *storage.VolumeInfo, ip string, port int, publicUrl string) { + dc := t.GetOrCreateDataCenter(ip) + rack := dc.GetOrCreateRack(ip) + dn := rack.GetOrCreateDataNode(ip, port, publicUrl) + dn.AddVolume(v) + t.registerVolumeLayout(v,dn) +} + +func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter{ + for _, c := range t.Children() { + dc := c.(*DataCenter) + if dc.MatchLocationRange(ip) { + return dc + } + } + dc := NewDataCenter("DefaultDataCenter") + t.LinkChildNode(dc) + return dc }