Browse Source

adjusting refresh topology writable volumes(not finished yet)

Need to refreshWritableVolumes for each replication type
pull/2/head
Chris Lu 12 years ago
parent
commit
b0e250d437
  1. 33
      weed-fs/src/cmd/weed/master.go
  2. 2
      weed-fs/src/pkg/directory/volume_mapping.go
  3. 2
      weed-fs/src/pkg/replication/volume_growth.go
  4. 2
      weed-fs/src/pkg/topology/data_node.go
  5. 2
      weed-fs/src/pkg/topology/node.go
  6. 5
      weed-fs/src/pkg/topology/rack.go
  7. 17
      weed-fs/src/pkg/topology/topology.go
  8. 4
      weed-fs/src/pkg/topology/volume_location.go

33
weed-fs/src/cmd/weed/master.go

@ -2,6 +2,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"errors"
"log" "log"
"net/http" "net/http"
"pkg/directory" "pkg/directory"
@ -70,9 +71,9 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
func dirAssign2Handler(w http.ResponseWriter, r *http.Request) { func dirAssign2Handler(w http.ResponseWriter, r *http.Request) {
c, _ := strconv.Atoi(r.FormValue("count")) c, _ := strconv.Atoi(r.FormValue("count"))
rt, err := storage.NewReplicationType(r.FormValue("replication")) rt, err := storage.NewReplicationType(r.FormValue("replication"))
if err!=nil {
writeJson(w, r, map[string]string{"error": err.Error()})
return
if err != nil {
writeJson(w, r, map[string]string{"error": err.Error()})
return
} }
if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 { if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 {
if topo.FreeSpace() <= 0 { if topo.FreeSpace() <= 0 {
@ -111,22 +112,21 @@ func dirNewStatusHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, topo.ToMap()) writeJson(w, r, topo.ToMap())
} }
func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
rt, err := storage.NewReplicationType(r.FormValue("replication"))
if err!=nil {
writeJson(w, r, map[string]string{"error": err.Error()})
return
}
count, err := strconv.Atoi(r.FormValue("count"))
if topo.FreeSpace() < count * rt.GetCopyCount() {
writeJson(w, r, map[string]string{"error": "Only "+strconv.Itoa(topo.FreeSpace())+" volumes left! Not enough for "+strconv.Itoa(count*rt.GetCopyCount())})
return
count := 0
rt, err := storage.NewReplicationType(r.FormValue("replication"))
if err == nil {
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
if topo.FreeSpace() < count*rt.GetCopyCount() {
err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
} else {
count, err = vg.GrowByCountAndType(count, rt, topo)
}
}
} }
if err != nil { if err != nil {
count, err := vg.GrowByType(rt, topo)
writeJson(w, r, map[string]interface{}{"count": count, "error": err})
writeJson(w, r, map[string]string{"error": err.Error()})
} else { } else {
count, err := vg.GrowByCountAndType(count, rt, topo)
writeJson(w, r, map[string]interface{}{"count": count, "error": err})
writeJson(w, r, map[string]interface{}{"count": count})
} }
} }
@ -144,6 +144,7 @@ func runMaster(cmd *Command, args []string) bool {
http.HandleFunc("/vol/grow", volumeGrowHandler) http.HandleFunc("/vol/grow", volumeGrowHandler)
mapper.StartRefreshWritableVolumes() mapper.StartRefreshWritableVolumes()
topo.StartRefreshWritableVolumes()
log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*mport)) log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*mport))
e := http.ListenAndServe(":"+strconv.Itoa(*mport), nil) e := http.ListenAndServe(":"+strconv.Itoa(*mport), nil)

2
weed-fs/src/pkg/directory/volume_mapping.go

@ -120,7 +120,7 @@ func (m *Mapper) StartRefreshWritableVolumes() {
} }
func (m *Mapper) refreshWritableVolumes() { func (m *Mapper) refreshWritableVolumes() {
freshThreshHold := time.Now().Unix() - 5*m.pulse //5 times of sleep interval
freshThreshHold := time.Now().Unix() - 3*m.pulse //3 times of sleep interval
//setting Writers, copy-on-write because of possible updating, this needs some future work! //setting Writers, copy-on-write because of possible updating, this needs some future work!
var writers []storage.VolumeId var writers []storage.VolumeId
for _, machine_entry := range m.Machines { for _, machine_entry := range m.Machines {

2
weed-fs/src/pkg/replication/volume_growth.go

@ -44,7 +44,7 @@ func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topolo
case storage.Copy11: case storage.Copy11:
return vg.GrowByCountAndType(vg.copy3factor, repType, topo) return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
} }
return 0, nil
return 0, errors.New("Unknown Replication Type!")
} }
func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) { func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) {
counter = 0 counter = 0

2
weed-fs/src/pkg/topology/data_node.go

@ -11,7 +11,7 @@ type DataNode struct {
Ip string Ip string
Port int Port int
PublicUrl string PublicUrl string
lastSeen int64 // unix time in seconds
LastSeen int64 // unix time in seconds
} }
func NewDataNode(id string) *DataNode { func NewDataNode(id string) *DataNode {

2
weed-fs/src/pkg/topology/node.go

@ -151,7 +151,7 @@ func (n *NodeImpl) CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit
if n.IsRack() { if n.IsRack() {
for _, c := range n.Children() { for _, c := range n.Children() {
dn := c.(*DataNode) //can not cast n to DataNode dn := c.(*DataNode) //can not cast n to DataNode
if dn.lastSeen > freshThreshHold {
if dn.LastSeen > freshThreshHold {
continue continue
} }
for _, v := range dn.volumes { for _, v := range dn.volumes {

5
weed-fs/src/pkg/topology/rack.go

@ -2,6 +2,7 @@ package topology
import ( import (
"strconv" "strconv"
"time"
) )
type Rack struct { type Rack struct {
@ -28,7 +29,8 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
for _, c := range r.Children() { for _, c := range r.Children() {
dn := c.(*DataNode) dn := c.(*DataNode)
if dn.MatchLocation(ip, port) { if dn.MatchLocation(ip, port) {
dn.NodeImpl.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
dn.LastSeen = time.Now().Unix()
dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
return dn return dn
} }
} }
@ -37,6 +39,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
dn.Port = port dn.Port = port
dn.PublicUrl = publicUrl dn.PublicUrl = publicUrl
dn.maxVolumeCount = maxVolumeCount dn.maxVolumeCount = maxVolumeCount
dn.LastSeen = time.Now().Unix()
r.LinkChildNode(dn) r.LinkChildNode(dn)
return dn return dn
} }

17
weed-fs/src/pkg/topology/topology.go

@ -6,6 +6,7 @@ import (
"pkg/directory" "pkg/directory"
"pkg/sequence" "pkg/sequence"
"pkg/storage" "pkg/storage"
"time"
) )
type Topology struct { type Topology struct {
@ -125,3 +126,19 @@ func (t *Topology) ToMap() interface{} {
m["layouts"] = layouts m["layouts"] = layouts
return m return m
} }
func (t *Topology) StartRefreshWritableVolumes() {
go func() {
for {
t.refreshWritableVolumes()
time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
}
}()
}
func (t *Topology) refreshWritableVolumes() {
freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval
//setting Writers, copy-on-write because of possible updating, this needs some future work!
t.CollectWritableVolumes(freshThreshHold, t.volumeSizeLimit)
//TODO: collect writable columes for each replication type
}

4
weed-fs/src/pkg/topology/volume_location.go

@ -27,7 +27,7 @@ func (dnll *DataNodeLocationList) Add(loc *DataNode) bool {
func (dnll *DataNodeLocationList) Refresh(freshThreshHold int64) { func (dnll *DataNodeLocationList) Refresh(freshThreshHold int64) {
var changed bool var changed bool
for _, dnl := range dnll.list { for _, dnl := range dnll.list {
if dnl.lastSeen < freshThreshHold {
if dnl.LastSeen < freshThreshHold {
changed = true changed = true
break break
} }
@ -35,7 +35,7 @@ func (dnll *DataNodeLocationList) Refresh(freshThreshHold int64) {
if changed { if changed {
var l []*DataNode var l []*DataNode
for _, dnl := range dnll.list { for _, dnl := range dnll.list {
if dnl.lastSeen >= freshThreshHold {
if dnl.LastSeen >= freshThreshHold {
l = append(l, dnl) l = append(l, dnl)
} }
} }

Loading…
Cancel
Save