Browse Source

change to 3-digit replication types

pull/2/head
Chris Lu 12 years ago
parent
commit
6ce41e30a4
  1. 11
      weed-fs/src/cmd/weed/master.go
  2. 2
      weed-fs/src/cmd/weed/upload.go
  3. 5
      weed-fs/src/cmd/weed/volume.go
  4. 119
      weed-fs/src/pkg/replication/volume_growth.go
  5. 19
      weed-fs/src/pkg/replication/volume_growth_test.go
  6. 2
      weed-fs/src/pkg/storage/compact_map_test.go
  7. 2
      weed-fs/src/pkg/storage/store.go
  8. 4
      weed-fs/src/pkg/storage/volume.go
  9. 124
      weed-fs/src/pkg/storage/volume_info.go
  10. 4
      weed-fs/src/pkg/topology/node_list.go

11
weed-fs/src/cmd/weed/master.go

@ -33,7 +33,7 @@ var (
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes")
mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "00", "Default replication type if not specified.")
defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.")
mReadTimeout = cmdMaster.Flag.Int("readTimeout", 5, "connection read timeout in seconds") mReadTimeout = cmdMaster.Flag.Int("readTimeout", 5, "connection read timeout in seconds")
) )
@ -70,7 +70,7 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
if repType == "" { if repType == "" {
repType = *defaultRepType repType = *defaultRepType
} }
rt, err := storage.NewReplicationType(repType)
rt, err := storage.NewReplicationTypeFromString(repType)
if err != nil { if err != nil {
writeJson(w, r, map[string]string{"error": err.Error()}) writeJson(w, r, map[string]string{"error": err.Error()})
return return
@ -107,12 +107,15 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
} }
func dirStatusHandler(w http.ResponseWriter, r *http.Request) { func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, topo.ToMap())
m := make(map[string]interface{})
m["Version"] = VERSION
m["Topology"] = topo.ToMap()
writeJson(w, r, m)
} }
func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
count := 0 count := 0
rt, err := storage.NewReplicationType(r.FormValue("replication"))
rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication"))
if err == nil { if err == nil {
if count, err = strconv.Atoi(r.FormValue("count")); err == nil { if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
if topo.FreeSpace() < count*rt.GetCopyCount() { if topo.FreeSpace() < count*rt.GetCopyCount() {

2
weed-fs/src/cmd/weed/upload.go

@ -17,7 +17,7 @@ func init() {
cmdUpload.Run = runUpload // break init cycle cmdUpload.Run = runUpload // break init cycle
IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information") IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information")
server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location") server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location")
uploadReplication = cmdUpload.Flag.String("replication", "00", "replication type(00,01,10,11)")
uploadReplication = cmdUpload.Flag.String("replication", "000", "replication type(000,001,010,100,110,200)")
} }
var cmdUpload = &Command{ var cmdUpload = &Command{

5
weed-fs/src/cmd/weed/volume.go

@ -41,7 +41,10 @@ var (
) )
func statusHandler(w http.ResponseWriter, r *http.Request) { func statusHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, store.Status())
m := make(map[string]interface{})
m["Version"] = VERSION
m["Volumes"] = store.Status()
writeJson(w, r, m)
} }
func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType")) err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType"))

119
weed-fs/src/pkg/replication/volume_growth.go

@ -4,9 +4,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/rand" "math/rand"
"pkg/operation"
"pkg/storage" "pkg/storage"
"pkg/topology" "pkg/topology"
"pkg/operation"
) )
/* /*
@ -30,15 +30,17 @@ func NewDefaultVolumeGrowth() *VolumeGrowth {
func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) (int, error) { func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) (int, error) {
switch repType { switch repType {
case storage.Copy00:
case storage.Copy000:
return vg.GrowByCountAndType(vg.copy1factor, repType, topo) return vg.GrowByCountAndType(vg.copy1factor, repType, topo)
case storage.Copy10:
case storage.Copy001:
return vg.GrowByCountAndType(vg.copy2factor, repType, topo) return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
case storage.Copy20:
return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
case storage.Copy01:
case storage.Copy010:
return vg.GrowByCountAndType(vg.copy2factor, repType, topo) return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
case storage.Copy11:
case storage.Copy100:
return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
case storage.Copy110:
return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
case storage.Copy200:
return vg.GrowByCountAndType(vg.copy3factor, repType, topo) return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
} }
return 0, errors.New("Unknown Replication Type!") return 0, errors.New("Unknown Replication Type!")
@ -46,7 +48,7 @@ func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topolo
func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) { func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) {
counter = 0 counter = 0
switch repType { switch repType {
case storage.Copy00:
case storage.Copy000:
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
if ok, server, vid := topo.RandomlyReserveOneVolume(); ok { if ok, server, vid := topo.RandomlyReserveOneVolume(); ok {
if err = vg.grow(topo, *vid, repType, server); err == nil { if err = vg.grow(topo, *vid, repType, server); err == nil {
@ -54,10 +56,45 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
} }
} }
} }
case storage.Copy10:
case storage.Copy001:
for i := 0; i < count; i++ {
//randomly pick one server, and then choose from the same rack
if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
rack := server1.Parent()
exclusion := make(map[string]topology.Node)
exclusion[server1.String()] = server1
newNodeList := topology.NewNodeList(rack.Children(), exclusion)
if newNodeList.FreeSpace() > 0 {
if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
if err = vg.grow(topo, *vid, repType, server1, server2); err == nil {
counter++
}
}
}
}
}
case storage.Copy010:
for i := 0; i < count; i++ {
//randomly pick one server, and then choose from the same rack
if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
rack := server1.Parent()
dc := rack.Parent()
exclusion := make(map[string]topology.Node)
exclusion[rack.String()] = rack
newNodeList := topology.NewNodeList(dc.Children(), exclusion)
if newNodeList.FreeSpace() > 0 {
if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
if err = vg.grow(topo, *vid, repType, server1, server2); err == nil {
counter++
}
}
}
}
}
case storage.Copy100:
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
nl := topology.NewNodeList(topo.Children(), nil) nl := topology.NewNodeList(topo.Children(), nil)
picked, ret := nl.RandomlyPickN(2)
picked, ret := nl.RandomlyPickN(2, 1)
vid := topo.NextVolumeId() vid := topo.NextVolumeId()
if ret { if ret {
var servers []*topology.DataNode var servers []*topology.DataNode
@ -69,61 +106,77 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
} }
} }
if len(servers) == 2 { if len(servers) == 2 {
if err = vg.grow(topo, vid, repType, servers[0], servers[1]); err == nil {
if err = vg.grow(topo, vid, repType, servers...); err == nil {
counter++ counter++
} }
} }
} }
} }
case storage.Copy20:
case storage.Copy110:
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
nl := topology.NewNodeList(topo.Children(), nil) nl := topology.NewNodeList(topo.Children(), nil)
picked, ret := nl.RandomlyPickN(3)
picked, ret := nl.RandomlyPickN(2, 2)
vid := topo.NextVolumeId() vid := topo.NextVolumeId()
if ret { if ret {
var servers []*topology.DataNode var servers []*topology.DataNode
for _, n := range picked {
if n.FreeSpace() > 0 {
if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok {
servers = append(servers, server)
dc1, dc2 := picked[0], picked[1]
if dc2.FreeSpace() > dc1.FreeSpace() {
dc1, dc2 = dc2, dc1
}
if dc1.FreeSpace() > 0 {
if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid); ok {
servers = append(servers, server1)
rack := server1.Parent()
exclusion := make(map[string]topology.Node)
exclusion[rack.String()] = rack
newNodeList := topology.NewNodeList(dc1.Children(), exclusion)
if newNodeList.FreeSpace() > 0 {
if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), vid); ok2 {
servers = append(servers, server2)
}
} }
} }
} }
if dc2.FreeSpace() > 0 {
if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid); ok {
servers = append(servers, server)
}
}
if len(servers) == 3 { if len(servers) == 3 {
if err = vg.grow(topo, vid, repType, servers[0], servers[1], servers[2]); err == nil {
if err = vg.grow(topo, vid, repType, servers...); err == nil {
counter++ counter++
} }
} }
} }
} }
case storage.Copy01:
case storage.Copy200:
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
//randomly pick one server, and then choose from the same rack
if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
rack := server1.Parent()
exclusion := make(map[string]topology.Node)
exclusion[server1.String()] = server1
newNodeList := topology.NewNodeList(rack.Children(), exclusion)
if newNodeList.FreeSpace() > 0 {
if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
if err = vg.grow(topo, *vid, repType, server1, server2); err == nil {
counter++
nl := topology.NewNodeList(topo.Children(), nil)
picked, ret := nl.RandomlyPickN(3, 1)
vid := topo.NextVolumeId()
if ret {
var servers []*topology.DataNode
for _, n := range picked {
if n.FreeSpace() > 0 {
if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok {
servers = append(servers, server)
} }
} }
} }
if len(servers) == 3 {
if err = vg.grow(topo, vid, repType, servers...); err == nil {
counter++
}
} }
} }
case storage.Copy11:
for i := 0; i < count; i++ {
} }
err = errors.New("Replication Type Not Implemented Yet!")
} }
return return
} }
func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error {
for _, server := range servers { for _, server := range servers {
if err := operation.AllocateVolume(server, vid, repType); err == nil { if err := operation.AllocateVolume(server, vid, repType); err == nil {
vi := storage.VolumeInfo{Id: vid, Size: 0, RepType:repType}
vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType}
server.AddOrUpdateVolume(vi) server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(&vi, server) topo.RegisterVolumeLayout(&vi, server)
fmt.Println("Created Volume", vid, "on", server) fmt.Println("Created Volume", vid, "on", server)

19
weed-fs/src/pkg/replication/volume_growth_test.go

@ -80,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology {
fmt.Println("data:", data) fmt.Println("data:", data)
//need to connect all nodes first before server adding volumes //need to connect all nodes first before server adding volumes
topo := topology.NewTopology("mynetwork","/tmp","testing",32*1024, 5)
topo := topology.NewTopology("mynetwork","/etc/weedfs/weedfs.conf","/tmp","testing",32*1024, 5)
mTopology := data.(map[string]interface{}) mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology { for dcKey, dcValue := range mTopology {
dc := topology.NewDataCenter(dcKey) dc := topology.NewDataCenter(dcKey)
@ -96,7 +96,7 @@ func setup(topologyLayout string) *topology.Topology {
rack.LinkChildNode(server) rack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) { for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{}) m := v.(map[string]interface{})
vi := &storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64))}
vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64))}
server.AddOrUpdateVolume(vi) server.AddOrUpdateVolume(vi)
} }
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
@ -104,8 +104,6 @@ func setup(topologyLayout string) *topology.Topology {
} }
} }
fmt.Println("topology:", *topo)
return topo return topo
} }
@ -125,15 +123,8 @@ func TestReserveOneVolume(t *testing.T) {
topo := setup(topologyLayout) topo := setup(topologyLayout)
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
vg:=&VolumeGrowth{copy1factor:3,copy2factor:2,copy3factor:1,copyAll:4} vg:=&VolumeGrowth{copy1factor:3,copy2factor:2,copy3factor:1,copyAll:4}
vg.GrowByType(storage.Copy20,topo)
if c, e := vg.GrowByCountAndType(1,storage.Copy000,topo);e==nil{
t.Log("reserved", c)
}
} }
func TestXYZ(t *testing.T) {
dn := topology.NewDataNode("server1")
dn.Ip = "localhost"
dn.Port = 8080
vid, _:= storage.NewVolumeId("600")
out := AllocateVolume(dn,vid,storage.Copy00)
fmt.Println(out)
}

2
weed-fs/src/pkg/storage/compact_map_test.go

@ -44,8 +44,6 @@ func TestXYZ(t *testing.T) {
} }
} }
//println("cm.list =", len(m.list))
for i := uint32(10 * batch); i < 100*batch; i++ { for i := uint32(10 * batch); i < 100*batch; i++ {
v, ok := m.Get(Key(i)) v, ok := m.Get(Key(i))
if i%37 == 0 { if i%37 == 0 {

2
weed-fs/src/pkg/storage/store.go

@ -29,7 +29,7 @@ func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *S
return return
} }
func (s *Store) AddVolume(volumeListString string, replicationType string) error { func (s *Store) AddVolume(volumeListString string, replicationType string) error {
rt, e := NewReplicationType(replicationType)
rt, e := NewReplicationTypeFromString(replicationType)
if e != nil { if e != nil {
return e return e
} }

4
weed-fs/src/pkg/storage/volume.go

@ -61,7 +61,7 @@ func (v *Volume) maybeWriteSuperBlock() {
if stat.Size() == 0 { if stat.Size() == 0 {
header := make([]byte, SuperBlockSize) header := make([]byte, SuperBlockSize)
header[0] = 1 header[0] = 1
header[1] = byte(v.replicaType)
header[1] = v.replicaType.Byte()
v.dataFile.Write(header) v.dataFile.Write(header)
} }
} }
@ -69,7 +69,7 @@ func (v *Volume) readSuperBlock() {
v.dataFile.Seek(0, 0) v.dataFile.Seek(0, 0)
header := make([]byte, SuperBlockSize) header := make([]byte, SuperBlockSize)
if _, error := v.dataFile.Read(header); error == nil { if _, error := v.dataFile.Read(header); error == nil {
v.replicaType = ReplicationType(header[1])
v.replicaType, _ = NewReplicationTypeFromByte(header[1])
} }
} }

124
weed-fs/src/pkg/storage/volume_info.go

@ -9,75 +9,119 @@ type VolumeInfo struct {
Size int64 Size int64
RepType ReplicationType RepType ReplicationType
} }
type ReplicationType byte
type ReplicationType string
const ( const (
Copy00 = ReplicationType(00) // single copy
Copy01 = ReplicationType(01) // 2 copies, each on different racks, same data center
Copy10 = ReplicationType(10) // 2 copies, each on different data center
Copy11 = ReplicationType(11) // 3 copies, 2 on different racks and local data center, 1 on different data center
Copy20 = ReplicationType(20) // 3 copies, each on dffereint data center
LengthRelicationType = 5
Copy000 = ReplicationType("000") // single copy
Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
Copy100 = ReplicationType("100") // 2 copies, each on different data center
Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
LengthRelicationType = 6
CopyNil = ReplicationType(255) // nil value CopyNil = ReplicationType(255) // nil value
) )
func NewReplicationType(t string) (ReplicationType, error) {
func NewReplicationTypeFromString(t string) (ReplicationType, error) {
switch t { switch t {
case "00":
return Copy00, nil
case "01":
return Copy01, nil
case "10":
return Copy10, nil
case "11":
return Copy11, nil
case "20":
return Copy20, nil
case "000":
return Copy000, nil
case "001":
return Copy001, nil
case "010":
return Copy010, nil
case "100":
return Copy100, nil
case "110":
return Copy110, nil
case "200":
return Copy200, nil
} }
return Copy00, errors.New("Unknown Replication Type:"+t)
return Copy000, errors.New("Unknown Replication Type:"+t)
} }
func NewReplicationTypeFromByte(b byte) (ReplicationType, error) {
switch b {
case byte(000):
return Copy000, nil
case byte(001):
return Copy001, nil
case byte(010):
return Copy010, nil
case byte(100):
return Copy100, nil
case byte(110):
return Copy110, nil
case byte(200):
return Copy200, nil
}
return Copy000, errors.New("Unknown Replication Type:"+string(b))
}
func (r *ReplicationType) String() string { func (r *ReplicationType) String() string {
switch *r { switch *r {
case Copy00:
return "00"
case Copy01:
return "01"
case Copy10:
return "10"
case Copy11:
return "11"
case Copy20:
return "20"
case Copy000:
return "000"
case Copy001:
return "001"
case Copy010:
return "010"
case Copy100:
return "100"
case Copy110:
return "110"
case Copy200:
return "200"
} }
return "00"
return "000"
}
func (r *ReplicationType) Byte() byte {
switch *r {
case Copy000:
return byte(000)
case Copy001:
return byte(001)
case Copy010:
return byte(010)
case Copy100:
return byte(100)
case Copy110:
return byte(110)
case Copy200:
return byte(200)
}
return byte(000)
} }
func (repType ReplicationType)GetReplicationLevelIndex() int { func (repType ReplicationType)GetReplicationLevelIndex() int {
switch repType { switch repType {
case Copy00:
case Copy000:
return 0 return 0
case Copy01:
case Copy001:
return 1 return 1
case Copy10:
case Copy010:
return 2 return 2
case Copy11:
case Copy100:
return 3 return 3
case Copy20:
case Copy110:
return 4 return 4
case Copy200:
return 5
} }
return -1 return -1
} }
func (repType ReplicationType)GetCopyCount() int { func (repType ReplicationType)GetCopyCount() int {
switch repType { switch repType {
case Copy00:
case Copy000:
return 1 return 1
case Copy01:
case Copy001:
return 2
case Copy010:
return 2 return 2
case Copy10:
case Copy100:
return 2 return 2
case Copy11:
case Copy110:
return 3 return 3
case Copy20:
case Copy200:
return 3 return 3
} }
return 0 return 0

4
weed-fs/src/pkg/topology/node_list.go

@ -30,10 +30,10 @@ func (nl *NodeList) FreeSpace() int {
return freeSpace return freeSpace
} }
func (nl *NodeList) RandomlyPickN(n int) ([]Node, bool) {
func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) {
var list []Node var list []Node
for _, n := range nl.nodes { for _, n := range nl.nodes {
if n.FreeSpace() > 0 {
if n.FreeSpace() >= min {
list = append(list, n) list = append(list, n)
} }
} }

Loading…
Cancel
Save