Browse Source

adjusting return messages for growing volumes

pull/2/head
Chris Lu 12 years ago
parent
commit
4846a7232e
  1. 22
      weed-fs/src/cmd/weed/master.go
  2. 50
      weed-fs/src/pkg/replication/volume_growth.go
  3. 10
      weed-fs/src/pkg/storage/store.go
  4. 22
      weed-fs/src/pkg/storage/volume_info.go
  5. 4
      weed-fs/src/pkg/topology/topology.go
  6. 2
      weed-fs/src/pkg/topology/volume_layout.go

22
weed-fs/src/cmd/weed/master.go

@ -69,7 +69,11 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
} }
func dirAssign2Handler(w http.ResponseWriter, r *http.Request) { func dirAssign2Handler(w http.ResponseWriter, r *http.Request) {
c, _ := strconv.Atoi(r.FormValue("count")) c, _ := strconv.Atoi(r.FormValue("count"))
rt := storage.NewReplicationType(r.FormValue("replication"))
rt, err := storage.NewReplicationType(r.FormValue("replication"))
if err!=nil {
writeJson(w, r, map[string]string{"error": err.Error()})
return
}
if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 { if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 {
if topo.FreeSpace() <= 0 { if topo.FreeSpace() <= 0 {
writeJson(w, r, map[string]string{"error": "No free volumes left!"}) writeJson(w, r, map[string]string{"error": "No free volumes left!"})
@ -107,12 +111,22 @@ func dirNewStatusHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, topo.ToMap()) writeJson(w, r, topo.ToMap())
} }
func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
rt := storage.NewReplicationType(r.FormValue("replication"))
rt, err := storage.NewReplicationType(r.FormValue("replication"))
if err!=nil {
writeJson(w, r, map[string]string{"error": err.Error()})
return
}
count, err := strconv.Atoi(r.FormValue("count")) count, err := strconv.Atoi(r.FormValue("count"))
if topo.FreeSpace() < count * rt.GetCopyCount() {
writeJson(w, r, map[string]string{"error": "Only "+strconv.Itoa(topo.FreeSpace())+" volumes left! Not enough for "+strconv.Itoa(count*rt.GetCopyCount())})
return
}
if err != nil { if err != nil {
vg.GrowByType(rt, topo)
count, err := vg.GrowByType(rt, topo)
writeJson(w, r, map[string]interface{}{"count": count, "error": err})
} else { } else {
vg.GrowByCountAndType(count, rt, topo)
count, err := vg.GrowByCountAndType(count, rt, topo)
writeJson(w, r, map[string]interface{}{"count": count, "error": err})
} }
} }

50
weed-fs/src/pkg/replication/volume_growth.go

@ -31,28 +31,30 @@ func NewDefaultVolumeGrowth() *VolumeGrowth {
return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3} return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3}
} }
func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) {
func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) (int, error) {
switch repType { switch repType {
case storage.Copy00: case storage.Copy00:
vg.GrowByCountAndType(vg.copy1factor, repType, topo)
return vg.GrowByCountAndType(vg.copy1factor, repType, topo)
case storage.Copy10: case storage.Copy10:
vg.GrowByCountAndType(vg.copy2factor, repType, topo)
return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
case storage.Copy20: case storage.Copy20:
vg.GrowByCountAndType(vg.copy3factor, repType, topo)
return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
case storage.Copy01: case storage.Copy01:
vg.GrowByCountAndType(vg.copy2factor, repType, topo)
return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
case storage.Copy11: case storage.Copy11:
vg.GrowByCountAndType(vg.copy3factor, repType, topo)
return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
} }
return 0, nil
} }
func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) {
func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) {
counter = 0
switch repType { switch repType {
case storage.Copy00: case storage.Copy00:
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
ret, server, vid := topo.RandomlyReserveOneVolume()
if ret {
vg.grow(topo, *vid, repType, server)
if ok, server, vid := topo.RandomlyReserveOneVolume(); ok {
if err = vg.grow(topo, *vid, repType, server); err == nil {
counter++
}
} }
} }
case storage.Copy10: case storage.Copy10:
@ -70,7 +72,9 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
} }
} }
if len(servers) == 2 { if len(servers) == 2 {
vg.grow(topo, vid, repType, servers[0], servers[1])
if err = vg.grow(topo, vid, repType, servers[0], servers[1]); err == nil {
counter++
}
} }
} }
} }
@ -89,23 +93,25 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
} }
} }
if len(servers) == 3 { if len(servers) == 3 {
vg.grow(topo, vid, repType, servers[0], servers[1], servers[2])
if err = vg.grow(topo, vid, repType, servers[0], servers[1], servers[2]); err == nil {
counter++
}
} }
} }
} }
case storage.Copy01: case storage.Copy01:
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
//randomly pick one server, and then choose from the same rack //randomly pick one server, and then choose from the same rack
ret, server1, vid := topo.RandomlyReserveOneVolume()
if ret {
if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
rack := server1.Parent() rack := server1.Parent()
exclusion := make(map[string]topology.Node) exclusion := make(map[string]topology.Node)
exclusion[server1.String()] = server1 exclusion[server1.String()] = server1
newNodeList := topology.NewNodeList(rack.Children(), exclusion) newNodeList := topology.NewNodeList(rack.Children(), exclusion)
if newNodeList.FreeSpace() > 0 { if newNodeList.FreeSpace() > 0 {
ret2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid)
if ret2 {
vg.grow(topo, *vid, repType, server1, server2)
if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
if err = vg.grow(topo, *vid, repType, server1, server2); err == nil {
counter++
}
} }
} }
} }
@ -113,10 +119,11 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
case storage.Copy11: case storage.Copy11:
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
} }
err = errors.New("Replication Type Not Implemented Yet!")
} }
return
} }
func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) {
func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error {
for _, server := range servers { for _, server := range servers {
if err := AllocateVolume(server, vid, repType); err == nil { if err := AllocateVolume(server, vid, repType); err == nil {
vi := &storage.VolumeInfo{Id: vid, Size: 0} vi := &storage.VolumeInfo{Id: vid, Size: 0}
@ -124,11 +131,12 @@ func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repT
topo.RegisterVolumeLayout(vi, server) topo.RegisterVolumeLayout(vi, server)
fmt.Println("added", vid, "to", server) fmt.Println("added", vid, "to", server)
} else { } else {
//TODO: need error handling
fmt.Println("Failed to assign", vid, "to", servers) fmt.Println("Failed to assign", vid, "to", servers)
return errors.New("Failed to assign " + vid.String())
} }
} }
fmt.Println("Assigning", vid, "to", servers) fmt.Println("Assigning", vid, "to", servers)
return nil
} }
type AllocateVolumeResult struct { type AllocateVolumeResult struct {

10
weed-fs/src/pkg/storage/store.go

@ -30,7 +30,11 @@ func NewStore(port int, publicUrl, dirname string, maxVolumeCount int, volumeLis
log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes", volumeListString) log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes", volumeListString)
return return
} }
func (s *Store) AddVolume(volumeListString string, replicationType string) (e error) {
func (s *Store) AddVolume(volumeListString string, replicationType string) (error) {
rt, e := NewReplicationType(replicationType)
if e!=nil {
return e
}
for _, range_string := range strings.Split(volumeListString, ",") { for _, range_string := range strings.Split(volumeListString, ",") {
if strings.Index(range_string, "-") < 0 { if strings.Index(range_string, "-") < 0 {
id_string := range_string id_string := range_string
@ -38,7 +42,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) (e er
if err != nil { if err != nil {
return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!") return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!")
} }
e = s.addVolume(VolumeId(id), NewReplicationType(replicationType))
e = s.addVolume(VolumeId(id), rt)
} else { } else {
pair := strings.Split(range_string, "-") pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64) start, start_err := strconv.ParseUint(pair[0], 10, 64)
@ -50,7 +54,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) (e er
return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!") return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!")
} }
for id := start; id <= end; id++ { for id := start; id <= end; id++ {
if err := s.addVolume(VolumeId(id), NewReplicationType(replicationType)); err != nil {
if err := s.addVolume(VolumeId(id), rt); err != nil {
e = err e = err
} }
} }

22
weed-fs/src/pkg/storage/volume_info.go

@ -1,6 +1,8 @@
package storage package storage
import ()
import (
"errors"
)
type VolumeInfo struct { type VolumeInfo struct {
Id VolumeId Id VolumeId
@ -19,20 +21,20 @@ const (
CopyNil = ReplicationType(255) // nil value CopyNil = ReplicationType(255) // nil value
) )
func NewReplicationType(t string) ReplicationType {
func NewReplicationType(t string) (ReplicationType, error) {
switch t { switch t {
case "00": case "00":
return Copy00
return Copy00, nil
case "01": case "01":
return Copy01
return Copy01, nil
case "10": case "10":
return Copy10
return Copy10, nil
case "11": case "11":
return Copy11
return Copy11, nil
case "20": case "20":
return Copy20
return Copy20, nil
} }
return Copy00
return Copy00, errors.New("Unknown Replication Type:"+t)
} }
func (r *ReplicationType) String() string { func (r *ReplicationType) String() string {
switch *r { switch *r {
@ -50,7 +52,7 @@ func (r *ReplicationType) String() string {
return "00" return "00"
} }
func GetReplicationLevelIndex(repType ReplicationType) int {
func (repType ReplicationType)GetReplicationLevelIndex() int {
switch repType { switch repType {
case Copy00: case Copy00:
return 0 return 0
@ -65,7 +67,7 @@ func GetReplicationLevelIndex(repType ReplicationType) int {
} }
return -1 return -1
} }
func GetCopyCount(repType ReplicationType) int {
func (repType ReplicationType)GetCopyCount() int {
switch repType { switch repType {
case Copy00: case Copy00:
return 1 return 1

4
weed-fs/src/pkg/topology/topology.go

@ -61,7 +61,7 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
} }
func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) { func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) {
replicationTypeIndex := storage.GetReplicationLevelIndex(repType)
replicationTypeIndex := repType.GetReplicationLevelIndex()
if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
} }
@ -74,7 +74,7 @@ func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (str
} }
func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout { func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
replicationTypeIndex := storage.GetReplicationLevelIndex(repType)
replicationTypeIndex := repType.GetReplicationLevelIndex()
if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
} }

2
weed-fs/src/pkg/topology/volume_layout.go

@ -30,7 +30,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.vid2location[v.Id] = NewDataNodeLocationList() vl.vid2location[v.Id] = NewDataNodeLocationList()
} }
if vl.vid2location[v.Id].Add(dn) { if vl.vid2location[v.Id].Add(dn) {
if len(vl.vid2location[v.Id].list) == storage.GetCopyCount(v.RepType) {
if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() {
if uint64(v.Size) < vl.volumeSizeLimit { if uint64(v.Size) < vl.volumeSizeLimit {
vl.writables = append(vl.writables, v.Id) vl.writables = append(vl.writables, v.Id)
} }

Loading…
Cancel
Save