From b0e250d43753a0c983fd574700a0c6d726f30360 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 17 Sep 2012 01:48:09 -0700 Subject: [PATCH] adjusting refresh topology writable volumes(not finished yet) Need to refreshWritableVolumes for each replication type --- weed-fs/src/cmd/weed/master.go | 33 ++++++++++---------- weed-fs/src/pkg/directory/volume_mapping.go | 2 +- weed-fs/src/pkg/replication/volume_growth.go | 2 +- weed-fs/src/pkg/topology/data_node.go | 2 +- weed-fs/src/pkg/topology/node.go | 2 +- weed-fs/src/pkg/topology/rack.go | 5 ++- weed-fs/src/pkg/topology/topology.go | 17 ++++++++++ weed-fs/src/pkg/topology/volume_location.go | 4 +-- 8 files changed, 44 insertions(+), 23 deletions(-) diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go index a61c65f39..d2e58eb5b 100644 --- a/weed-fs/src/cmd/weed/master.go +++ b/weed-fs/src/cmd/weed/master.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "errors" "log" "net/http" "pkg/directory" @@ -70,9 +71,9 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) { func dirAssign2Handler(w http.ResponseWriter, r *http.Request) { c, _ := strconv.Atoi(r.FormValue("count")) rt, err := storage.NewReplicationType(r.FormValue("replication")) - if err!=nil { - writeJson(w, r, map[string]string{"error": err.Error()}) - return + if err != nil { + writeJson(w, r, map[string]string{"error": err.Error()}) + return } if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 { if topo.FreeSpace() <= 0 { @@ -111,22 +112,21 @@ func dirNewStatusHandler(w http.ResponseWriter, r *http.Request) { writeJson(w, r, topo.ToMap()) } func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { - rt, err := storage.NewReplicationType(r.FormValue("replication")) - if err!=nil { - writeJson(w, r, map[string]string{"error": err.Error()}) - return - } - count, err := strconv.Atoi(r.FormValue("count")) - if topo.FreeSpace() < count * rt.GetCopyCount() { - writeJson(w, r, map[string]string{"error": "Only "+strconv.Itoa(topo.FreeSpace())+" volumes left! Not enough for "+strconv.Itoa(count*rt.GetCopyCount())}) - return + count := 0 + rt, err := storage.NewReplicationType(r.FormValue("replication")) + if err == nil { + if count, err = strconv.Atoi(r.FormValue("count")); err == nil { + if topo.FreeSpace() < count*rt.GetCopyCount() { + err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) + } else { + count, err = vg.GrowByCountAndType(count, rt, topo) + } + } } if err != nil { - count, err := vg.GrowByType(rt, topo) - writeJson(w, r, map[string]interface{}{"count": count, "error": err}) + writeJson(w, r, map[string]string{"error": err.Error()}) } else { - count, err := vg.GrowByCountAndType(count, rt, topo) - writeJson(w, r, map[string]interface{}{"count": count, "error": err}) + writeJson(w, r, map[string]interface{}{"count": count}) } } @@ -144,6 +144,7 @@ func runMaster(cmd *Command, args []string) bool { http.HandleFunc("/vol/grow", volumeGrowHandler) mapper.StartRefreshWritableVolumes() + topo.StartRefreshWritableVolumes() log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*mport)) e := http.ListenAndServe(":"+strconv.Itoa(*mport), nil) diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index 495c57930..da41b3510 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -120,7 +120,7 @@ func (m *Mapper) StartRefreshWritableVolumes() { } func (m *Mapper) refreshWritableVolumes() { - freshThreshHold := time.Now().Unix() - 5*m.pulse //5 times of sleep interval + freshThreshHold := time.Now().Unix() - 3*m.pulse //3 times of sleep interval //setting Writers, copy-on-write because of possible updating, this needs some future work! var writers []storage.VolumeId for _, machine_entry := range m.Machines { diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go index 8519921f5..28dca68b4 100644 --- a/weed-fs/src/pkg/replication/volume_growth.go +++ b/weed-fs/src/pkg/replication/volume_growth.go @@ -44,7 +44,7 @@ func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topolo case storage.Copy11: return vg.GrowByCountAndType(vg.copy3factor, repType, topo) } - return 0, nil + return 0, errors.New("Unknown Replication Type!") } func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) { counter = 0 diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go index 1516572fd..04c7cf111 100644 --- a/weed-fs/src/pkg/topology/data_node.go +++ b/weed-fs/src/pkg/topology/data_node.go @@ -11,7 +11,7 @@ type DataNode struct { Ip string Port int PublicUrl string - lastSeen int64 // unix time in seconds + LastSeen int64 // unix time in seconds } func NewDataNode(id string) *DataNode { diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go index e73719794..ddaf9f0b2 100644 --- a/weed-fs/src/pkg/topology/node.go +++ b/weed-fs/src/pkg/topology/node.go @@ -151,7 +151,7 @@ func (n *NodeImpl) CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit if n.IsRack() { for _, c := range n.Children() { dn := c.(*DataNode) //can not cast n to DataNode - if dn.lastSeen > freshThreshHold { + if dn.LastSeen > freshThreshHold { continue } for _, v := range dn.volumes { diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go index f685f1a82..c819feb00 100644 --- a/weed-fs/src/pkg/topology/rack.go +++ b/weed-fs/src/pkg/topology/rack.go @@ -2,6 +2,7 @@ package topology import ( "strconv" + "time" ) type Rack struct { @@ -28,7 +29,8 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol for _, c := range r.Children() { dn := c.(*DataNode) if dn.MatchLocation(ip, port) { - dn.NodeImpl.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount) + dn.LastSeen = time.Now().Unix() + dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount) return dn } } @@ -37,6 +39,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol dn.Port = port dn.PublicUrl = publicUrl dn.maxVolumeCount = maxVolumeCount + dn.LastSeen = time.Now().Unix() r.LinkChildNode(dn) return dn } diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go index 079fb54d9..a60768f14 100644 --- a/weed-fs/src/pkg/topology/topology.go +++ b/weed-fs/src/pkg/topology/topology.go @@ -6,6 +6,7 @@ import ( "pkg/directory" "pkg/sequence" "pkg/storage" + "time" ) type Topology struct { @@ -125,3 +126,19 @@ func (t *Topology) ToMap() interface{} { m["layouts"] = layouts return m } + +func (t *Topology) StartRefreshWritableVolumes() { + go func() { + for { + t.refreshWritableVolumes() + time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) + } + }() +} + +func (t *Topology) refreshWritableVolumes() { + freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval + //setting Writers, copy-on-write because of possible updating, this needs some future work! + t.CollectWritableVolumes(freshThreshHold, t.volumeSizeLimit) + //TODO: collect writable columes for each replication type +} diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go index 92d89ae46..f2e5dd894 100644 --- a/weed-fs/src/pkg/topology/volume_location.go +++ b/weed-fs/src/pkg/topology/volume_location.go @@ -27,7 +27,7 @@ func (dnll *DataNodeLocationList) Add(loc *DataNode) bool { func (dnll *DataNodeLocationList) Refresh(freshThreshHold int64) { var changed bool for _, dnl := range dnll.list { - if dnl.lastSeen < freshThreshHold { + if dnl.LastSeen < freshThreshHold { changed = true break } @@ -35,7 +35,7 @@ func (dnll *DataNodeLocationList) Refresh(freshThreshHold int64) { if changed { var l []*DataNode for _, dnl := range dnll.list { - if dnl.lastSeen >= freshThreshHold { + if dnl.LastSeen >= freshThreshHold { l = append(l, dnl) } }