
Major:

change replication_type to ReplicaPlacement, hopefully cleaner code
works for all 9 possible ReplicaPlacement settings, written as "xyz":
  x: number of copies on other data centers
  y: number of copies on other racks
  z: number of copies on the current rack
  x, y, and z can each be 0, 1, or 2
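
As a minimal sketch of how a caller would use the API this commit introduces (a hypothetical program; it relies only on NewReplicaPlacementFromString and GetCopyCount, both shown in the go/storage/replica_placement.go diff below):

package main

import (
	"fmt"

	"code.google.com/p/weed-fs/go/storage"
)

func main() {
	// "110": x=1 copy on another data center, y=1 on another rack, z=0 on the same rack
	rp, err := storage.NewReplicaPlacementFromString("110")
	if err != nil {
		panic(err)
	}
	// copy count = 1 (the original copy) + x + y + z
	fmt.Println(rp.String(), "needs", rp.GetCopyCount(), "copies") // prints: 110 needs 3 copies
}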

Minor:
weed server "-mdir" defaults to "-dir" if empty
Chris Lu committed 27c74a7e66 (11 years ago)
Changed files:

  1. go/operation/list_masters.go (6 lines changed)
  2. go/replication/allocate_volume.go (4 lines changed)
  3. go/replication/volume_growth.go (246 lines changed)
  4. go/replication/volume_growth_test.go (28 lines changed)
  5. go/storage/cdb_map.go (4 lines changed)
  6. go/storage/compact_map_perf_test.go (2 lines changed)
  7. go/storage/replica_placement.go (61 lines changed, new file)
  8. go/storage/replication_type.go (123 lines changed, deleted)
  9. go/storage/store.go (18 lines changed)
  10. go/storage/volume.go (18 lines changed)
  11. go/storage/volume_info.go (2 lines changed)
  12. go/topology/collection.go (14 lines changed)
  13. go/topology/data_center.go (1 line changed)
  14. go/topology/node.go (69 lines changed)
  15. go/topology/node_list.go (83 lines changed, deleted)
  16. go/topology/node_list_test.go (60 lines changed, deleted)
  17. go/topology/rack.go (1 line changed)
  18. go/topology/topo_test.go (14 lines changed)
  19. go/topology/topology.go (20 lines changed)
  20. go/topology/topology_event_handling.go (6 lines changed)
  21. go/topology/volume_layout.go (16 lines changed)
  22. go/util/file_util.go (2 lines changed)
  23. go/weed/compact.go (2 lines changed)
  24. go/weed/download.go (2 lines changed)
  25. go/weed/export.go (4 lines changed)
  26. go/weed/master.go (26 lines changed)
  27. go/weed/server.go (39 lines changed)
  28. go/weed/version.go (2 lines changed)
  29. go/weed/weed_server/master_server.go (30 lines changed)
  30. go/weed/weed_server/master_server_handlers.go (24 lines changed)
  31. go/weed/weed_server/volume_server.go (2 lines changed)
  32. note/replication.txt (37 lines changed)

go/operation/list_masters.go (6 lines changed)

@@ -23,5 +23,9 @@ func ListMasters(server string) ([]string, error) {
   if err != nil {
     return nil, err
   }
-  return ret.Peers, nil
+  masters := ret.Peers
+  if ret.IsLeader {
+    masters = append(masters, ret.Leader)
+  }
+  return masters, nil
 }

go/replication/allocate_volume.go (4 lines changed)

@@ -13,11 +13,11 @@ type AllocateVolumeResult struct {
   Error string
 }
-func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, collection string, repType storage.ReplicationType) error {
+func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error {
   values := make(url.Values)
   values.Add("volume", vid.String())
   values.Add("collection", collection)
-  values.Add("replication", repType.String())
+  values.Add("replication", rp.String())
   jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values)
   if err != nil {
     return err

go/replication/volume_growth.go (246 lines changed)

@@ -5,7 +5,6 @@ import (
   "code.google.com/p/weed-fs/go/storage"
   "code.google.com/p/weed-fs/go/topology"
   "errors"
-  "fmt"
   "math/rand"
   "sync"
 )
@@ -19,188 +18,115 @@ This package is created to resolve these replica placement issues:
 */
 type VolumeGrowth struct {
-  copy1factor int
-  copy2factor int
-  copy3factor int
-  copyAll     int
   accessLock sync.Mutex
 }
 func NewDefaultVolumeGrowth() *VolumeGrowth {
-  return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3}
+  return &VolumeGrowth{}
 }
-func (vg *VolumeGrowth) AutomaticGrowByType(collection string, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (count int, err error) {
-  factor := 1
-  switch repType {
-  case storage.Copy000:
-    factor = 1
-    count, err = vg.GrowByCountAndType(vg.copy1factor, collection, repType, dataCenter, topo)
-  case storage.Copy001:
-    factor = 2
-    count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo)
-  case storage.Copy010:
-    factor = 2
-    count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo)
-  case storage.Copy100:
-    factor = 2
-    count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo)
-  case storage.Copy110:
-    factor = 3
-    count, err = vg.GrowByCountAndType(vg.copy3factor, collection, repType, dataCenter, topo)
-  case storage.Copy200:
-    factor = 3
-    count, err = vg.GrowByCountAndType(vg.copy3factor, collection, repType, dataCenter, topo)
+// one replication type may need rp.GetCopyCount() actual volumes
+// given copyCount, how many logical volumes to create
+func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
+  switch copyCount {
+  case 1:
+    count = 7
+  case 2:
+    count = 6
+  case 3:
+    count = 3
   default:
-    err = errors.New("Unknown Replication Type!")
+    count = 1
   }
-  if count > 0 && count%factor == 0 {
+  return
+}
+func (vg *VolumeGrowth) AutomaticGrowByType(collection string, rp *storage.ReplicaPlacement, preferredDataCenter string, topo *topology.Topology) (count int, err error) {
+  count, err = vg.GrowByCountAndType(vg.findVolumeCount(rp.GetCopyCount()), collection, rp, preferredDataCenter, topo)
+  if count > 0 && count%rp.GetCopyCount() == 0 {
     return count, nil
   }
   return count, err
 }
-func (vg *VolumeGrowth) GrowByCountAndType(count int, collection string, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (counter int, err error) {
+func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, collection string, rp *storage.ReplicaPlacement, preferredDataCenter string, topo *topology.Topology) (counter int, err error) {
   vg.accessLock.Lock()
   defer vg.accessLock.Unlock()
-  counter = 0
-  switch repType {
-  case storage.Copy000:
-    for i := 0; i < count; i++ {
-      if ok, server, vid := topo.RandomlyReserveOneVolume(dataCenter); ok {
-        if err = vg.grow(topo, *vid, collection, repType, server); err == nil {
-          counter++
-        } else {
-          return counter, err
-        }
-      } else {
-        return counter, fmt.Errorf("Failed to grown volume for data center %s", dataCenter)
-      }
-    }
-  case storage.Copy001:
-    for i := 0; i < count; i++ {
-      //randomly pick one server from the datacenter, and then choose from the same rack
-      if ok, server1, vid := topo.RandomlyReserveOneVolume(dataCenter); ok {
-        rack := server1.Parent()
-        exclusion := make(map[string]topology.Node)
-        exclusion[server1.String()] = server1
-        newNodeList := topology.NewNodeList(rack.Children(), exclusion)
-        if newNodeList.FreeSpace() > 0 {
-          if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
-            if err = vg.grow(topo, *vid, collection, repType, server1, server2); err == nil {
-              counter++
-            }
-          }
-        }
-      }
-    }
-  case storage.Copy010:
-    for i := 0; i < count; i++ {
-      //randomly pick one server from the datacenter, and then choose from the a different rack
-      if ok, server1, vid := topo.RandomlyReserveOneVolume(dataCenter); ok {
-        rack := server1.Parent()
-        dc := rack.Parent()
-        exclusion := make(map[string]topology.Node)
-        exclusion[rack.String()] = rack
-        newNodeList := topology.NewNodeList(dc.Children(), exclusion)
-        if newNodeList.FreeSpace() > 0 {
-          if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
-            if err = vg.grow(topo, *vid, collection, repType, server1, server2); err == nil {
-              counter++
-            }
-          }
-        }
-      }
-    }
-  case storage.Copy100:
-    for i := 0; i < count; i++ {
-      nl := topology.NewNodeList(topo.Children(), nil)
-      picked, ret := nl.RandomlyPickN(2, 1, dataCenter)
-      vid := topo.NextVolumeId()
-      if ret {
-        var servers []*topology.DataNode
-        for _, n := range picked {
-          if n.FreeSpace() > 0 {
-            if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid, ""); ok {
-              servers = append(servers, server)
-            }
-          }
-        }
-        if len(servers) == 2 {
-          if err = vg.grow(topo, vid, collection, repType, servers...); err == nil {
-            counter++
-          }
-        }
-      } else {
-        return counter, fmt.Errorf("Failed to grown volume on data center %s and another data center", dataCenter)
-      }
-    }
-  case storage.Copy110:
-    for i := 0; i < count; i++ {
-      nl := topology.NewNodeList(topo.Children(), nil)
-      picked, ret := nl.RandomlyPickN(2, 2, dataCenter)
-      vid := topo.NextVolumeId()
-      if ret {
-        var servers []*topology.DataNode
-        dc1, dc2 := picked[0], picked[1]
-        if dc2.FreeSpace() > dc1.FreeSpace() {
-          dc1, dc2 = dc2, dc1
-        }
-        if dc1.FreeSpace() > 0 {
-          if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid, ""); ok {
-            servers = append(servers, server1)
-            rack := server1.Parent()
-            exclusion := make(map[string]topology.Node)
-            exclusion[rack.String()] = rack
-            newNodeList := topology.NewNodeList(dc1.Children(), exclusion)
-            if newNodeList.FreeSpace() > 0 {
-              if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), vid); ok2 {
-                servers = append(servers, server2)
-              }
-            }
-          }
-        }
-        if dc2.FreeSpace() > 0 {
-          if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid, ""); ok {
-            servers = append(servers, server)
-          }
-        }
-        if len(servers) == 3 {
-          if err = vg.grow(topo, vid, collection, repType, servers...); err == nil {
-            counter++
-          }
-        }
-      }
-    }
-  case storage.Copy200:
-    for i := 0; i < count; i++ {
-      nl := topology.NewNodeList(topo.Children(), nil)
-      picked, ret := nl.RandomlyPickN(3, 1, dataCenter)
-      vid := topo.NextVolumeId()
-      if ret {
-        var servers []*topology.DataNode
-        for _, n := range picked {
-          if n.FreeSpace() > 0 {
-            if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid, ""); ok {
-              servers = append(servers, server)
-            }
-          }
-        }
-        if len(servers) == 3 {
-          if err = vg.grow(topo, vid, collection, repType, servers...); err == nil {
-            counter++
-          }
-        }
-      }
-    }
-  }
+  for i := 0; i < targetCount; i++ {
+    if c, e := vg.findAndGrow(topo, preferredDataCenter, collection, rp); e == nil {
+      counter += c
+    } else {
+      return counter, e
+    }
+  }
   return
 }
+func (vg *VolumeGrowth) findAndGrow(topo *topology.Topology, preferredDataCenter string, collection string, rp *storage.ReplicaPlacement) (int, error) {
+  servers, e := vg.findEmptySlotsForOneVolume(topo, preferredDataCenter, rp)
+  if e != nil {
+    return 0, e
+  }
+  vid := topo.NextVolumeId()
+  err := vg.grow(topo, vid, collection, rp, servers...)
+  return len(servers), err
+}
+func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, preferredDataCenter string, rp *storage.ReplicaPlacement) (servers []*topology.DataNode, err error) {
+  //find main datacenter and other data centers
+  mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node topology.Node) bool {
+    if preferredDataCenter != "" && node.IsDataCenter() && node.Id() != topology.NodeId(preferredDataCenter) {
+      return false
+    }
+    return node.FreeSpace() > rp.DiffRackCount+rp.SameRackCount+1
+  })
+  if dc_err != nil {
+    return nil, dc_err
+  }
+  //find main rack and other racks
+  mainRack, otherRacks, rack_err := mainDataCenter.(*topology.DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node topology.Node) bool {
+    return node.FreeSpace() > rp.SameRackCount+1
+  })
+  if rack_err != nil {
+    return nil, rack_err
+  }
+  //find main rack and other racks
+  mainServer, otherServers, server_err := mainRack.(*topology.Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node topology.Node) bool {
+    return node.FreeSpace() > 1
+  })
+  if server_err != nil {
+    return nil, server_err
+  }
+  servers = append(servers, mainServer.(*topology.DataNode))
+  for _, server := range otherServers {
+    servers = append(servers, server.(*topology.DataNode))
+  }
+  for _, rack := range otherRacks {
+    r := rand.Intn(rack.FreeSpace())
+    if server, e := rack.ReserveOneVolume(r); e == nil {
+      servers = append(servers, server)
+    } else {
+      return servers, e
+    }
+  }
+  for _, datacenter := range otherDataCenters {
+    r := rand.Intn(datacenter.FreeSpace())
+    if server, e := datacenter.ReserveOneVolume(r); e == nil {
+      servers = append(servers, server)
+    } else {
+      return servers, e
+    }
+  }
+  return
+}
-func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, repType storage.ReplicationType, servers ...*topology.DataNode) error {
+func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement, servers ...*topology.DataNode) error {
   for _, server := range servers {
-    if err := AllocateVolume(server, vid, collection, repType); err == nil {
-      vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, RepType: repType, Version: storage.CurrentVersion}
+    if err := AllocateVolume(server, vid, collection, rp); err == nil {
+      vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, ReplicaPlacement: rp, Version: storage.CurrentVersion}
       server.AddOrUpdateVolume(vi)
       topo.RegisterVolumeLayout(&vi, server)
       glog.V(0).Infoln("Created Volume", vid, "on", server)

go/replication/volume_growth_test.go (28 lines changed)

@@ -13,7 +13,7 @@ var topologyLayout = `
 {
   "dc1":{
     "rack1":{
-      "server1":{
+      "server111":{
        "volumes":[
          {"id":1, "size":12312},
          {"id":2, "size":12312},
@@ -21,7 +21,7 @@ var topologyLayout = `
        ],
        "limit":3
      },
-      "server2":{
+      "server112":{
        "volumes":[
          {"id":4, "size":12312},
          {"id":5, "size":12312},
@@ -31,7 +31,7 @@ var topologyLayout = `
      }
    },
    "rack2":{
-      "server1":{
+      "server121":{
        "volumes":[
          {"id":4, "size":12312},
          {"id":5, "size":12312},
@@ -39,17 +39,17 @@ var topologyLayout = `
        ],
        "limit":4
      },
-      "server2":{
+      "server122":{
        "volumes":[],
        "limit":4
      },
-      "server3":{
+      "server123":{
        "volumes":[
          {"id":2, "size":12312},
          {"id":3, "size":12312},
          {"id":4, "size":12312}
        ],
-        "limit":2
+        "limit":5
      }
    }
  },
@@ -57,7 +57,7 @@ var topologyLayout = `
  },
  "dc3":{
    "rack2":{
-      "server1":{
+      "server321":{
        "volumes":[
          {"id":1, "size":12312},
          {"id":3, "size":12312},
@@ -113,14 +113,16 @@ func setup(topologyLayout string) *topology.Topology {
   return topo
 }
-func TestRemoveDataCenter(t *testing.T) {
+func TestFindEmptySlotsForOneVolume(t *testing.T) {
   topo := setup(topologyLayout)
-  topo.UnlinkChildNode(topology.NodeId("dc2"))
-  if topo.GetActiveVolumeCount() != 15 {
+  vg := NewDefaultVolumeGrowth()
+  rp, _ := storage.NewReplicaPlacementFromString("002")
+  servers, err := vg.findEmptySlotsForOneVolume(topo, "dc1", rp)
+  if err != nil {
+    fmt.Println("finding empty slots error :", err)
     t.Fail()
   }
-  topo.UnlinkChildNode(topology.NodeId("dc3"))
-  if topo.GetActiveVolumeCount() != 12 {
-    t.Fail()
+  for _, server := range servers {
+    fmt.Println("assigned node :", server.Id())
   }
 }
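
The server renames appear to encode topology coordinates (for example, server121 for dc1, rack2, server 1), which keeps the expected placements in the new test easy to check by eye.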

go/storage/cdb_map.go (4 lines changed)

@@ -76,8 +76,8 @@ func (m cdbMap) FileCount() int {
 func (m *cdbMap) DeletedCount() int {
   return m.DeletionCounter
 }
-func (m *cdbMap) NextFileKey(count int) (uint64) {
-  return 0
+func (m *cdbMap) NextFileKey(count int) uint64 {
+  return 0
 }
 func getMetric(c *cdb.Cdb, m *mapMetric) error {

go/storage/compact_map_perf_test.go (2 lines changed)

@@ -3,8 +3,8 @@ package storage
 import (
   "code.google.com/p/weed-fs/go/glog"
   "code.google.com/p/weed-fs/go/util"
-  "os"
   "log"
+  "os"
   "testing"
 )

go/storage/replica_placement.go (61 lines changed, new file)

@@ -0,0 +1,61 @@
+package storage
+import (
+  "errors"
+  "fmt"
+)
+const (
+  ReplicaPlacementCount = 9
+)
+type ReplicaPlacement struct {
+  SameRackCount       int
+  DiffRackCount       int
+  DiffDataCenterCount int
+}
+func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) {
+  rp := &ReplicaPlacement{}
+  for i, c := range t {
+    count := int(c - '0')
+    if 0 <= count && count <= 2 {
+      switch i {
+      case 0:
+        rp.DiffDataCenterCount = count
+      case 1:
+        rp.DiffRackCount = count
+      case 2:
+        rp.SameRackCount = count
+      }
+    } else {
+      return rp, errors.New("Unknown Replication Type:" + t)
+    }
+  }
+  return rp, nil
+}
+func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) {
+  return NewReplicaPlacementFromString(fmt.Sprintf("%d", b))
+}
+func (rp *ReplicaPlacement) Byte() byte {
+  ret := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount
+  return byte(ret)
+}
+func (rp *ReplicaPlacement) String() string {
+  b := make([]byte, 3)
+  b[0] = byte(rp.DiffDataCenterCount + '0')
+  b[1] = byte(rp.DiffRackCount + '0')
+  b[2] = byte(rp.SameRackCount + '0')
+  return string(b)
+}
+func (rp *ReplicaPlacement) GetCopyCount() int {
+  return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
+}
+func (rp *ReplicaPlacement) GetReplicationLevelIndex() int {
+  return rp.DiffDataCenterCount*3 + rp.DiffRackCount*3 + rp.SameRackCount
+}
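
One caveat worth noting in the byte encoding above, shown with a hypothetical round trip (this program is an illustration, not part of the commit): Byte() packs the three digits as a plain decimal number and NewReplicaPlacementFromByte formats it back with %d, so a placement whose data-center digit is zero does not survive the round trip.

package main

import (
	"fmt"

	"code.google.com/p/weed-fs/go/storage"
)

func main() {
	rp, _ := storage.NewReplicaPlacementFromString("001")
	b := rp.Byte() // 0*100 + 0*10 + 1 = 1
	back, _ := storage.NewReplicaPlacementFromByte(b)
	// fmt.Sprintf("%d", 1) yields "1", whose first character sets
	// DiffDataCenterCount=1, so "001" comes back as "100".
	fmt.Println(rp.String(), "->", b, "->", back.String()) // prints: 001 -> 1 -> 100
}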

go/storage/replication_type.go (123 lines changed, deleted)

@@ -1,123 +0,0 @@
-package storage
-import (
-  "errors"
-)
-type ReplicationType string
-const (
-  Copy000 = ReplicationType("000") // single copy
-  Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
-  Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
-  Copy100 = ReplicationType("100") // 2 copies, each on different data center
-  Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
-  Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
-  LengthRelicationType = 6
-  CopyNil = ReplicationType(255) // nil value
-)
-func NewReplicationTypeFromString(t string) (ReplicationType, error) {
-  switch t {
-  case "000":
-    return Copy000, nil
-  case "001":
-    return Copy001, nil
-  case "010":
-    return Copy010, nil
-  case "100":
-    return Copy100, nil
-  case "110":
-    return Copy110, nil
-  case "200":
-    return Copy200, nil
-  }
-  return Copy000, errors.New("Unknown Replication Type:" + t)
-}
-func NewReplicationTypeFromByte(b byte) (ReplicationType, error) {
-  switch b {
-  case byte(000):
-    return Copy000, nil
-  case byte(001):
-    return Copy001, nil
-  case byte(010):
-    return Copy010, nil
-  case byte(100):
-    return Copy100, nil
-  case byte(110):
-    return Copy110, nil
-  case byte(200):
-    return Copy200, nil
-  }
-  return Copy000, errors.New("Unknown Replication Type:" + string(b))
-}
-func (r *ReplicationType) String() string {
-  switch *r {
-  case Copy000:
-    return "000"
-  case Copy001:
-    return "001"
-  case Copy010:
-    return "010"
-  case Copy100:
-    return "100"
-  case Copy110:
-    return "110"
-  case Copy200:
-    return "200"
-  }
-  return "000"
-}
-func (r *ReplicationType) Byte() byte {
-  switch *r {
-  case Copy000:
-    return byte(000)
-  case Copy001:
-    return byte(001)
-  case Copy010:
-    return byte(010)
-  case Copy100:
-    return byte(100)
-  case Copy110:
-    return byte(110)
-  case Copy200:
-    return byte(200)
-  }
-  return byte(000)
-}
-func (repType ReplicationType) GetReplicationLevelIndex() int {
-  switch repType {
-  case Copy000:
-    return 0
-  case Copy001:
-    return 1
-  case Copy010:
-    return 2
-  case Copy100:
-    return 3
-  case Copy110:
-    return 4
-  case Copy200:
-    return 5
-  }
-  return -1
-}
-func (repType ReplicationType) GetCopyCount() int {
-  switch repType {
-  case Copy000:
-    return 1
-  case Copy001:
-    return 2
-  case Copy010:
-    return 2
-  case Copy100:
-    return 2
-  case Copy110:
-    return 3
-  case Copy200:
-    return 3
-  }
-  return 0
-}

go/storage/store.go (18 lines changed)

@@ -79,8 +79,8 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
   }
   return
 }
-func (s *Store) AddVolume(volumeListString string, collection string, replicationType string) error {
-  rt, e := NewReplicationTypeFromString(replicationType)
+func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string) error {
+  rt, e := NewReplicaPlacementFromString(replicaPlacement)
   if e != nil {
     return e
   }
@@ -130,13 +130,13 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
   }
   return ret
 }
-func (s *Store) addVolume(vid VolumeId, collection string, replicationType ReplicationType) error {
+func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement) error {
   if s.findVolume(vid) != nil {
     return fmt.Errorf("Volume Id %s already exists!", vid)
   }
   if location := s.findFreeLocation(); location != nil {
-    glog.V(0).Infoln("In dir", location.directory, "adds volume =", vid, ", collection =", collection, ", replicationType =", replicationType)
-    if volume, err := NewVolume(location.directory, collection, vid, replicationType); err == nil {
+    glog.V(0).Infoln("In dir", location.directory, "adds volume =", vid, ", collection =", collection, ", replicaPlacement =", replicaPlacement)
+    if volume, err := NewVolume(location.directory, collection, vid, replicaPlacement); err == nil {
       location.volumes[vid] = volume
       return nil
     } else {
@@ -206,9 +206,9 @@ func (l *DiskLocation) loadExistingVolumes() {
       }
       if vid, err := NewVolumeId(base); err == nil {
         if l.volumes[vid] == nil {
-          if v, e := NewVolume(l.directory, collection, vid, CopyNil); e == nil {
+          if v, e := NewVolume(l.directory, collection, vid, nil); e == nil {
             l.volumes[vid] = v
-            glog.V(0).Infoln("data file", l.directory+"/"+name, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
+            glog.V(0).Infoln("data file", l.directory+"/"+name, "replicaPlacement =", v.ReplicaPlacement, "version =", v.Version(), "size =", v.Size())
           }
         }
       }
@@ -223,7 +223,7 @@ func (s *Store) Status() []*VolumeInfo {
     for k, v := range location.volumes {
       s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
         Collection:       v.Collection,
-        RepType:          v.ReplicaType,
+        ReplicaPlacement: v.ReplicaPlacement,
         Version:          v.Version(),
         FileCount:        v.nm.FileCount(),
        DeleteCount:      v.nm.DeletedCount(),
@@ -261,7 +261,7 @@ func (s *Store) Join() error {
     for k, v := range location.volumes {
       s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
        Collection:       v.Collection,
-        RepType:          v.ReplicaType,
+        ReplicaPlacement: v.ReplicaPlacement,
        Version:          v.Version(),
        FileCount:        v.nm.FileCount(),
        DeleteCount:      v.nm.DeletedCount(),

go/storage/volume.go (18 lines changed)

@@ -17,14 +17,14 @@ const (
 )
 type SuperBlock struct {
-  Version     Version
-  ReplicaType ReplicationType
+  Version          Version
+  ReplicaPlacement *ReplicaPlacement
 }
 func (s *SuperBlock) Bytes() []byte {
   header := make([]byte, SuperBlockSize)
   header[0] = byte(s.Version)
-  header[1] = s.ReplicaType.Byte()
+  header[1] = s.ReplicaPlacement.Byte()
   return header
 }
@@ -41,15 +41,15 @@ type Volume struct {
   accessLock sync.Mutex
 }
-func NewVolume(dirname string, collection string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) {
   v = &Volume{dir: dirname, Collection: collection, Id: id}
-  v.SuperBlock = SuperBlock{ReplicaType: replicationType}
+  v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement}
   e = v.load(true, true)
   return
 }
 func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
   v = &Volume{dir: dirname, Collection: collection, Id: id}
-  v.SuperBlock = SuperBlock{ReplicaType: CopyNil}
+  v.SuperBlock = SuperBlock{}
   e = v.load(false, false)
   return
 }
@@ -90,7 +90,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
     }
   }
-  if v.ReplicaType == CopyNil {
+  if v.ReplicaPlacement == nil {
     e = v.readSuperBlock()
   } else {
     e = v.maybeWriteSuperBlock()
@@ -173,13 +173,13 @@ func (v *Volume) readSuperBlock() (err error) {
 }
 func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
   superBlock.Version = Version(header[0])
-  if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
+  if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
     err = fmt.Errorf("cannot read replica type: %s", err.Error())
   }
   return
 }
 func (v *Volume) NeedToReplicate() bool {
-  return v.ReplicaType.GetCopyCount() > 1
+  return v.ReplicaPlacement.GetCopyCount() > 1
 }
 func (v *Volume) isFileUnchanged(n *Needle) bool {

go/storage/volume_info.go (2 lines changed)

@@ -5,7 +5,7 @@ import ()
 type VolumeInfo struct {
   Id               VolumeId
   Size             uint64
-  RepType          ReplicationType
+  ReplicaPlacement *ReplicaPlacement
   Collection       string
   Version          Version
   FileCount        int

go/topology/collection.go (14 lines changed)

@@ -13,17 +13,17 @@ type Collection struct {
 func NewCollection(name string, volumeSizeLimit uint64) *Collection {
   c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
-  c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
+  c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.ReplicaPlacementCount)
   return c
 }
-func (c *Collection) GetOrCreateVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
-  replicationTypeIndex := repType.GetReplicationLevelIndex()
-  if c.replicaType2VolumeLayout[replicationTypeIndex] == nil {
-    glog.V(0).Infoln("collection", c.Name, "adding replication type", repType)
-    c.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, c.volumeSizeLimit)
+func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement) *VolumeLayout {
+  replicaPlacementIndex := rp.GetReplicationLevelIndex()
+  if c.replicaType2VolumeLayout[replicaPlacementIndex] == nil {
+    glog.V(0).Infoln("collection", c.Name, "adding replication type", rp)
+    c.replicaType2VolumeLayout[replicaPlacementIndex] = NewVolumeLayout(rp, c.volumeSizeLimit)
   }
-  return c.replicaType2VolumeLayout[replicationTypeIndex]
+  return c.replicaType2VolumeLayout[replicaPlacementIndex]
 }
 func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {

go/topology/data_center.go (1 line changed)

@@ -29,6 +29,7 @@ func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
 func (dc *DataCenter) ToMap() interface{} {
   m := make(map[string]interface{})
+  m["Id"] = dc.Id()
   m["Max"] = dc.GetMaxVolumeCount()
   m["Free"] = dc.FreeSpace()
   var racks []interface{}

go/topology/node.go (69 lines changed)

@@ -3,6 +3,8 @@ package topology
 import (
   "code.google.com/p/weed-fs/go/glog"
   "code.google.com/p/weed-fs/go/storage"
+  "errors"
+  "math/rand"
 )
 type NodeId string
@@ -10,7 +12,7 @@ type Node interface {
   Id() NodeId
   String() string
   FreeSpace() int
-  ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode)
+  ReserveOneVolume(r int) (*DataNode, error)
   UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
   UpAdjustVolumeCountDelta(volumeCountDelta int)
   UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
@@ -47,6 +49,54 @@ type NodeImpl struct {
   value interface{}
 }
+// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
+func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) bool) (firstNode Node, restNodes []Node, err error) {
+  candidates := make([]Node, 0, len(n.children))
+  for _, node := range n.children {
+    if filterFirstNodeFn(node) {
+      candidates = append(candidates, node)
+    }
+  }
+  if len(candidates) == 0 {
+    return nil, nil, errors.New("No matching data node found!")
+  }
+  firstNode = candidates[rand.Intn(len(candidates))]
+  glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
+  restNodes = make([]Node, numberOfNodes-1)
+  candidates = candidates[:0]
+  for _, node := range n.children {
+    if node.Id() == firstNode.Id() {
+      continue
+    }
+    if node.FreeSpace() <= 0 {
+      continue
+    }
+    glog.V(2).Infoln("select rest node candidate:", node.Id())
+    candidates = append(candidates, node)
+  }
+  glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
+  ret := len(restNodes) == 0
+  for k, node := range candidates {
+    if k < len(restNodes) {
+      restNodes[k] = node
+      if k == len(restNodes)-1 {
+        ret = true
+      }
+    } else {
+      r := rand.Intn(k + 1)
+      if r < len(restNodes) {
+        restNodes[r] = node
+      }
+    }
+  }
+  if !ret {
+    glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
+    err = errors.New("Not enough data node found!")
+  }
+  return
+}
 func (n *NodeImpl) IsDataNode() bool {
   return n.nodeType == "DataNode"
 }
@@ -80,32 +130,27 @@ func (n *NodeImpl) Parent() Node {
 func (n *NodeImpl) GetValue() interface{} {
   return n.value
 }
-func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode) {
-  ret := false
-  var assignedNode *DataNode
+func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
   for _, node := range n.children {
     freeSpace := node.FreeSpace()
     // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
     if freeSpace <= 0 {
       continue
     }
-    if dataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(dataCenter) {
-      continue
-    }
     if r >= freeSpace {
       r -= freeSpace
     } else {
       if node.IsDataNode() && node.FreeSpace() > 0 {
         // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
-        return true, node.(*DataNode)
+        return node.(*DataNode), nil
       }
-      ret, assignedNode = node.ReserveOneVolume(r, vid, dataCenter)
-      if ret {
-        break
+      assignedNode, err = node.ReserveOneVolume(r)
+      if err != nil {
+        return
       }
     }
   }
-  return ret, assignedNode
+  return
 }
 func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
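
The rest-node selection in RandomlyPickNodes is a reservoir sample: the first numberOfNodes-1 eligible children fill the result slots directly, and each later candidate k replaces a random slot with probability len(restNodes)/(k+1), so every eligible child ends up picked with equal probability without shuffling the whole list.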

go/topology/node_list.go (83 lines changed, deleted)

@@ -1,83 +0,0 @@
-package topology
-import (
-  "code.google.com/p/weed-fs/go/glog"
-  "code.google.com/p/weed-fs/go/storage"
-  "math/rand"
-)
-type NodeList struct {
-  nodes  map[NodeId]Node
-  except map[string]Node
-}
-func NewNodeList(nodes map[NodeId]Node, except map[string]Node) *NodeList {
-  m := make(map[NodeId]Node, len(nodes)-len(except))
-  for _, n := range nodes {
-    if except[n.String()] == nil {
-      m[n.Id()] = n
-    }
-  }
-  nl := &NodeList{nodes: m}
-  return nl
-}
-func (nl *NodeList) FreeSpace() int {
-  freeSpace := 0
-  for _, n := range nl.nodes {
-    freeSpace += n.FreeSpace()
-  }
-  return freeSpace
-}
-func (nl *NodeList) RandomlyPickN(count int, minSpace int, firstNodeName string) ([]Node, bool) {
-  var list []Node
-  var preferredNode *Node
-  if firstNodeName != "" {
-    for _, n := range nl.nodes {
-      if n.Id() == NodeId(firstNodeName) && n.FreeSpace() >= minSpace {
-        preferredNode = &n
-        break
-      }
-    }
-    if preferredNode == nil {
-      return list, false
-    }
-  }
-  for _, n := range nl.nodes {
-    if n.FreeSpace() >= minSpace && n.Id() != NodeId(firstNodeName) {
-      list = append(list, n)
-    }
-  }
-  if count > len(list) || count == len(list) && firstNodeName != "" {
-    return nil, false
-  }
-  for i := len(list); i > 0; i-- {
-    r := rand.Intn(i)
-    list[r], list[i-1] = list[i-1], list[r]
-  }
-  if firstNodeName != "" {
-    list[0] = *preferredNode
-  }
-  return list[:count], true
-}
-func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) {
-  for _, node := range nl.nodes {
-    freeSpace := node.FreeSpace()
-    if randomVolumeIndex >= freeSpace {
-      randomVolumeIndex -= freeSpace
-    } else {
-      if node.IsDataNode() && node.FreeSpace() > 0 {
-        glog.V(0).Infoln("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
-        return true, node.(*DataNode)
-      }
-      children := node.Children()
-      newNodeList := NewNodeList(children, nl.except)
-      return newNodeList.ReserveOneVolume(randomVolumeIndex, vid)
-    }
-  }
-  return false, nil
-}

go/topology/node_list_test.go (60 lines changed, deleted)

@@ -1,60 +0,0 @@
-package topology
-import (
-  "code.google.com/p/weed-fs/go/sequence"
-  _ "fmt"
-  "strconv"
-  "testing"
-)
-func TestXYZ(t *testing.T) {
-  topo, err := NewTopology("topo", "/etc/weed.conf", sequence.NewMemorySequencer(), 234, 5)
-  if err != nil {
-    t.Error("cannot create new topology:", err)
-    t.FailNow()
-  }
-  for i := 0; i < 5; i++ {
-    dc := NewDataCenter("dc" + strconv.Itoa(i))
-    dc.activeVolumeCount = i
-    dc.maxVolumeCount = 5
-    topo.LinkChildNode(dc)
-  }
-  nl := NewNodeList(topo.Children(), nil)
-  picked, ret := nl.RandomlyPickN(1, 0, "")
-  if !ret || len(picked) != 1 {
-    t.Error("need to randomly pick 1 node")
-  }
-  picked, ret = nl.RandomlyPickN(1, 0, "dc1")
-  if !ret || len(picked) != 1 {
-    t.Error("need to randomly pick 1 node")
-  }
-  if picked[0].Id() != "dc1" {
-    t.Error("need to randomly pick 1 dc1 node")
-  }
-  picked, ret = nl.RandomlyPickN(2, 0, "dc1")
-  if !ret || len(picked) != 2 {
-    t.Error("need to randomly pick 1 node")
-  }
-  if picked[0].Id() != "dc1" {
-    t.Error("need to randomly pick 2 with one dc1 node")
-  }
-  picked, ret = nl.RandomlyPickN(4, 0, "")
-  if !ret || len(picked) != 4 {
-    t.Error("need to randomly pick 4 nodes")
-  }
-  picked, ret = nl.RandomlyPickN(5, 0, "")
-  if !ret || len(picked) != 5 {
-    t.Error("need to randomly pick 5 nodes")
-  }
-  picked, ret = nl.RandomlyPickN(6, 0, "")
-  if ret || len(picked) != 0 {
-    t.Error("can not randomly pick 6 nodes:", ret, picked)
-  }
-}

go/topology/rack.go (1 line changed)

@@ -52,6 +52,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
 func (rack *Rack) ToMap() interface{} {
   m := make(map[string]interface{})
+  m["Id"] = rack.Id()
   m["Max"] = rack.GetMaxVolumeCount()
   m["Free"] = rack.FreeSpace()
   var dns []interface{}

go/topology/topo_test.go (14 lines changed)

@@ -5,9 +5,7 @@ import (
   "code.google.com/p/weed-fs/go/storage"
   "encoding/json"
   "fmt"
-  "math/rand"
   "testing"
-  "time"
 )
 var topologyLayout = `
@@ -124,15 +122,3 @@ func TestRemoveDataCenter(t *testing.T) {
     t.Fail()
   }
 }
-func TestReserveOneVolume(t *testing.T) {
-  topo := setup(topologyLayout)
-  rand.Seed(time.Now().UnixNano())
-  rand.Seed(1)
-  ret, node, vid := topo.RandomlyReserveOneVolume("dc1")
-  if node.Parent().Parent().Id() != NodeId("dc1") {
-    t.Fail()
-  }
-  fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid)
-}

go/topology/topology.go (20 lines changed)

@@ -77,23 +77,13 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
   return nil
 }
-func (t *Topology) RandomlyReserveOneVolume(dataCenter string) (bool, *DataNode, *storage.VolumeId) {
-  if t.FreeSpace() <= 0 {
-    glog.V(0).Infoln("Topology does not have free space left!")
-    return false, nil, nil
-  }
-  vid := t.NextVolumeId()
-  ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid, dataCenter)
-  return ret, node, &vid
-}
 func (t *Topology) NextVolumeId() storage.VolumeId {
   vid := t.GetMaxVolumeId()
   return vid.Next()
 }
-func (t *Topology) PickForWrite(collectionName string, repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) {
-  vid, count, datanodes, err := t.GetVolumeLayout(collectionName, repType).PickForWrite(count, dataCenter)
+func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) {
+  vid, count, datanodes, err := t.GetVolumeLayout(collectionName, rp).PickForWrite(count, dataCenter)
   if err != nil || datanodes.Length() == 0 {
     return "", 0, nil, errors.New("No writable volumes avalable!")
   }
@@ -101,16 +91,16 @@ func (t *Topology) PickForWrite(collectionName string, repType storage.Replicati
   return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
 }
-func (t *Topology) GetVolumeLayout(collectionName string, repType storage.ReplicationType) *VolumeLayout {
+func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout {
   _, ok := t.collectionMap[collectionName]
   if !ok {
     t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
   }
-  return t.collectionMap[collectionName].GetOrCreateVolumeLayout(repType)
+  return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp)
 }
 func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
-  t.GetVolumeLayout(v.Collection, v.RepType).RegisterVolume(v, dn)
+  t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(v, dn)
 }
 func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) {

go/topology/topology_event_handling.go (6 lines changed)

@@ -41,7 +41,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
   }()
 }
 func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
-  vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.RepType)
+  vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement)
   if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
     return false
   }
@@ -53,7 +53,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
 func (t *Topology) UnRegisterDataNode(dn *DataNode) {
   for _, v := range dn.volumes {
     glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
-    vl := t.GetVolumeLayout(v.Collection, v.RepType)
+    vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement)
     vl.SetVolumeUnavailable(dn, v.Id)
   }
   dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
@@ -63,7 +63,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
 }
 func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
   for _, v := range dn.volumes {
-    vl := t.GetVolumeLayout(v.Collection, v.RepType)
+    vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement)
     if vl.isWritable(&v) {
       vl.SetVolumeAvailable(dn, v.Id)
     }

go/topology/volume_layout.go (16 lines changed)

@@ -9,16 +9,16 @@ import (
 )
 type VolumeLayout struct {
-  repType         storage.ReplicationType
+  rp              *storage.ReplicaPlacement
   vid2location    map[storage.VolumeId]*VolumeLocationList
   writables       []storage.VolumeId // transient array of writable volume id
   volumeSizeLimit uint64
   accessLock      sync.Mutex
 }
-func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64) *VolumeLayout {
+func NewVolumeLayout(rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *VolumeLayout {
   return &VolumeLayout{
-    repType:         repType,
+    rp:              rp,
     vid2location:    make(map[storage.VolumeId]*VolumeLocationList),
     writables:       *new([]storage.VolumeId),
    volumeSizeLimit: volumeSizeLimit,
@@ -33,7 +33,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
     vl.vid2location[v.Id] = NewVolumeLocationList()
   }
   if vl.vid2location[v.Id].Add(dn) {
-    if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() {
+    if len(vl.vid2location[v.Id].list) == v.ReplicaPlacement.GetCopyCount() {
       if vl.isWritable(v) {
         vl.writables = append(vl.writables, v.Id)
       } else {
@@ -135,8 +135,8 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId)
   defer vl.accessLock.Unlock()
   if vl.vid2location[vid].Remove(dn) {
-    if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() {
-      glog.V(0).Infoln("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount())
+    if vl.vid2location[vid].Length() < vl.rp.GetCopyCount() {
+      glog.V(0).Infoln("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.rp.GetCopyCount())
       return vl.removeFromWritable(vid)
     }
   }
@@ -147,7 +147,7 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) b
   defer vl.accessLock.Unlock()
   if vl.vid2location[vid].Add(dn) {
-    if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() {
+    if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
       return vl.setVolumeWritable(vid)
     }
   }
@@ -164,7 +164,7 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
 func (vl *VolumeLayout) ToMap() map[string]interface{} {
   m := make(map[string]interface{})
-  m["replication"] = vl.repType.String()
+  m["replication"] = vl.rp.String()
   m["writables"] = vl.writables
   //m["locations"] = vl.vid2location
   return m

go/util/file_util.go (2 lines changed)

@@ -1,7 +1,7 @@
 package util
 import (
-  "code.google.com/p/weed-fs/go/glog"
+  "code.google.com/p/weed-fs/go/glog"
   "errors"
   "os"
 )

go/weed/compact.go (2 lines changed)

@@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool {
   }
   vid := storage.VolumeId(*compactVolumeId)
-  v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, storage.CopyNil)
+  v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil)
   if err != nil {
     glog.Fatalf("Load Volume [ERROR] %s\n", err)
   }

go/weed/download.go (2 lines changed)

@@ -49,7 +49,7 @@ func runDownload(cmd *Command, args []string) bool {
     filename = fid
   }
   if strings.HasSuffix(filename, "-list") {
-    filename = filename[0:len(filename)-len("-list")]
+    filename = filename[0 : len(filename)-len("-list")]
     fids := strings.Split(string(content), "\n")
     f, err := os.OpenFile(path.Join(*downloadDir, filename), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
     if err != nil {

go/weed/export.go (4 lines changed)

@@ -82,8 +82,8 @@ func runExport(cmd *Command, args []string) bool {
   }
   fileName := strconv.Itoa(*exportVolumeId)
-  if *exportCollection!=""{
-    fileName = *exportCollection + "_" + fileName
+  if *exportCollection != "" {
+    fileName = *exportCollection + "_" + fileName
   }
   vid := storage.VolumeId(*exportVolumeId)
   indexFile, err := os.OpenFile(path.Join(*exportVolumePath, fileName+".idx"), os.O_RDONLY, 0644)

go/weed/master.go (26 lines changed)

@@ -27,18 +27,18 @@ var cmdMaster = &Command{
 }
 var (
-  mport                 = cmdMaster.Flag.Int("port", 9333, "http listen port")
-  masterIp              = cmdMaster.Flag.String("ip", "", "master ip address")
-  metaFolder            = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
-  masterPeers           = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list")
-  volumeSizeLimitMB     = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes")
-  mpulse                = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
-  confFile              = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
-  defaultRepType        = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.")
-  mReadTimeout          = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
-  mMaxCpu               = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
-  garbageThreshold      = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
-  masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
+  mport                   = cmdMaster.Flag.Int("port", 9333, "http listen port")
+  masterIp                = cmdMaster.Flag.String("ip", "", "master ip address")
+  metaFolder              = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
+  masterPeers             = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list")
+  volumeSizeLimitMB       = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes")
+  mpulse                  = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
+  confFile                = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
+  defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.")
+  mReadTimeout            = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
+  mMaxCpu                 = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+  garbageThreshold        = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
+  masterWhiteListOption   = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
   masterWhiteList []string
 )
@@ -57,7 +57,7 @@ func runMaster(cmd *Command, args []string) bool {
   r := mux.NewRouter()
   ms := weed_server.NewMasterServer(r, VERSION, *mport, *metaFolder,
-    *volumeSizeLimitMB, *mpulse, *confFile, *defaultRepType, *garbageThreshold, masterWhiteList,
+    *volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, masterWhiteList,
   )
   glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *masterIp+":"+strconv.Itoa(*mport))

go/weed/server.go (39 lines changed)

@@ -35,23 +35,23 @@ var cmdServer = &Command{
 }
 var (
-  serverIp                  = cmdServer.Flag.String("ip", "localhost", "ip or server name")
-  serverMaxCpu              = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
-  serverReadTimeout         = cmdServer.Flag.Int("readTimeout", 3, "connection read timeout in seconds. Increase this if uploading large files.")
-  serverDataCenter          = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
-  serverRack                = cmdServer.Flag.String("rack", "", "current volume server's rack name")
-  serverWhiteListOption     = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
-  serverPeers               = cmdServer.Flag.String("peers", "", "other master nodes in comma separated ip:masterPort list")
-  masterPort                = cmdServer.Flag.Int("masterPort", 9333, "master server http listen port")
-  masterMetaFolder          = cmdServer.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
-  masterVolumeSizeLimitMB   = cmdServer.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes")
-  masterConfFile            = cmdServer.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
-  masterDefaultRepType      = cmdServer.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.")
-  volumePort                = cmdServer.Flag.Int("port", 8080, "volume server http listen port")
-  volumePublicUrl           = cmdServer.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
-  volumeDataFolders         = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
-  volumeMaxDataVolumeCounts = cmdServer.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
-  volumePulse               = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
+  serverIp                      = cmdServer.Flag.String("ip", "localhost", "ip or server name")
+  serverMaxCpu                  = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+  serverReadTimeout             = cmdServer.Flag.Int("readTimeout", 3, "connection read timeout in seconds. Increase this if uploading large files.")
+  serverDataCenter              = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
+  serverRack                    = cmdServer.Flag.String("rack", "", "current volume server's rack name")
+  serverWhiteListOption         = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
+  serverPeers                   = cmdServer.Flag.String("peers", "", "other master nodes in comma separated ip:masterPort list")
+  masterPort                    = cmdServer.Flag.Int("masterPort", 9333, "master server http listen port")
+  masterMetaFolder              = cmdServer.Flag.String("mdir", "", "data directory to store meta data, default to same as -dir specified")
+  masterVolumeSizeLimitMB       = cmdServer.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes")
+  masterConfFile                = cmdServer.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
+  masterDefaultReplicaPlacement = cmdServer.Flag.String("defaultReplicaPlacement", "000", "Default replication type if not specified.")
+  volumePort                    = cmdServer.Flag.Int("port", 8080, "volume server http listen port")
+  volumePublicUrl               = cmdServer.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
+  volumeDataFolders             = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
+  volumeMaxDataVolumeCounts     = cmdServer.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
+  volumePulse                   = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
   serverWhiteList []string
 )
@@ -62,6 +62,9 @@ func runServer(cmd *Command, args []string) bool {
   }
   runtime.GOMAXPROCS(*serverMaxCpu)
+  if *masterMetaFolder == "" {
+    *masterMetaFolder = *volumeDataFolders
+  }
   if err := util.TestFolderWritable(*masterMetaFolder); err != nil {
     glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *masterMetaFolder, err)
   }
@@ -95,7 +98,7 @@ func runServer(cmd *Command, args []string) bool {
   go func() {
     r := mux.NewRouter()
     ms := weed_server.NewMasterServer(r, VERSION, *masterPort, *masterMetaFolder,
-      *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultRepType, *garbageThreshold, serverWhiteList,
+      *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *garbageThreshold, serverWhiteList,
     )
     glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *serverIp+":"+strconv.Itoa(*masterPort))

go/weed/version.go (2 lines changed)

@@ -6,7 +6,7 @@ import (
 )
 const (
-  VERSION = "0.46"
+  VERSION = "0.47"
 )
 var cmdVersion = &Command{

go/weed/weed_server/master_server.go (30 lines changed)

@@ -15,14 +15,14 @@ import (
 )
 type MasterServer struct {
-  port              int
-  metaFolder        string
-  volumeSizeLimitMB uint
-  pulseSeconds      int
-  defaultRepType    string
-  garbageThreshold  string
-  whiteList         []string
-  version           string
+  port                    int
+  metaFolder              string
+  volumeSizeLimitMB       uint
+  pulseSeconds            int
+  defaultReplicaPlacement string
+  garbageThreshold        string
+  whiteList               []string
+  version                 string
   topo *topology.Topology
   vg   *replication.VolumeGrowth
@@ -35,17 +35,17 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
   volumeSizeLimitMB uint,
   pulseSeconds int,
   confFile string,
-  defaultRepType string,
+  defaultReplicaPlacement string,
   garbageThreshold string,
   whiteList []string,
 ) *MasterServer {
   ms := &MasterServer{
-    version:           version,
-    volumeSizeLimitMB: volumeSizeLimitMB,
-    pulseSeconds:      pulseSeconds,
-    defaultRepType:    defaultRepType,
-    garbageThreshold:  garbageThreshold,
-    whiteList:         whiteList,
+    version:                 version,
+    volumeSizeLimitMB:       volumeSizeLimitMB,
+    pulseSeconds:            pulseSeconds,
+    defaultReplicaPlacement: defaultReplicaPlacement,
+    garbageThreshold:        garbageThreshold,
+    whiteList:               whiteList,
   }
   seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq"))
   var e error

go/weed/weed_server/master_server_handlers.go (24 lines changed)

@@ -40,20 +40,20 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
   if e != nil {
     c = 1
   }
-  repType := r.FormValue("replication")
-  if repType == "" {
-    repType = ms.defaultRepType
+  replication := r.FormValue("replication")
+  if replication == "" {
+    replication = ms.defaultReplicaPlacement
   }
   collection := r.FormValue("collection")
   dataCenter := r.FormValue("dataCenter")
-  rt, err := storage.NewReplicationTypeFromString(repType)
+  replicaPlacement, err := storage.NewReplicaPlacementFromString(replication)
   if err != nil {
     w.WriteHeader(http.StatusNotAcceptable)
     writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
     return
   }
-  if ms.topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 {
+  if ms.topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
     if ms.topo.FreeSpace() <= 0 {
       w.WriteHeader(http.StatusNotFound)
       writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"})
@@ -61,15 +61,15 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
     } else {
       ms.vgLock.Lock()
       defer ms.vgLock.Unlock()
-      if ms.topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 {
-        if _, err = ms.vg.AutomaticGrowByType(collection, rt, dataCenter, ms.topo); err != nil {
+      if ms.topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
+        if _, err = ms.vg.AutomaticGrowByType(collection, replicaPlacement, dataCenter, ms.topo); err != nil {
          writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()})
          return
        }
      }
    }
   }
-  fid, count, dn, err := ms.topo.PickForWrite(collection, rt, c, dataCenter)
+  fid, count, dn, err := ms.topo.PickForWrite(collection, replicaPlacement, c, dataCenter)
   if err == nil {
     writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
   } else {
@@ -119,13 +119,13 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
 func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
   count := 0
-  rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication"))
+  replicaPlacement, err := storage.NewReplicaPlacementFromString(r.FormValue("replication"))
   if err == nil {
     if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
-      if ms.topo.FreeSpace() < count*rt.GetCopyCount() {
-        err = errors.New("Only " + strconv.Itoa(ms.topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
+      if ms.topo.FreeSpace() < count*replicaPlacement.GetCopyCount() {
+        err = errors.New("Only " + strconv.Itoa(ms.topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*replicaPlacement.GetCopyCount()))
       } else {
-        count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), rt, r.FormValue("dataCneter"), ms.topo)
+        count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), replicaPlacement, r.FormValue("dataCneter"), ms.topo)
       }
     } else {
       err = errors.New("parameter count is not found")

go/weed/weed_server/volume_server.go (2 lines changed)

@@ -61,7 +61,7 @@ func NewVolumeServer(r *http.ServeMux, version string, ip string, port int, publ
       if connected {
         time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond)
       } else {
-        time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)* 0.25) * time.Millisecond)
+        time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*0.25) * time.Millisecond)
       }
     }
   }()

note/replication.txt (37 lines changed)

@@ -59,11 +59,6 @@ If any "assign" request comes in
   3. return a writable volume to the user
-Plan:
-  Step 1. implement one copy(no replication), automatically assign volume ids
-  Step 2. add replication
-For the above operations, here are the todo list:
 for data node:
   0. detect existing volumes DONE
   1. onStartUp, and periodically, send existing volumes and maxVolumeCount store.Join(), DONE
@@ -77,10 +72,38 @@ For the above operations, here are the todo list:
   1. accept data node's report of existing volumes and maxVolumeCount ALREADY EXISTS /dir/join
   2. periodically refresh for active data nodes, and adjust writable volumes
   3. send command to grow a volume(id + replication level) DONE
-  4. NOT_IMPLEMENTING: if dead/stale data nodes are found, for the affected volumes, send stale info
-     to other data nodes. BECAUSE the master will stop sending writes to these data nodes
   5. accept lookup for volume locations ALREADY EXISTS /dir/lookup
   6. read topology/datacenter/rack layout
+An algorithm to allocate volumes evenly, but may be inefficient if free volumes are plenty:
+  input: replication=xyz
+  algorithm:
+    ret_dcs = []
+    foreach dc that has y+z+1 volumes {
+      ret_racks = []
+      foreach rack with z+1 volumes {
+        ret = select z+1 servers with 1 volume
+        if ret.size() == z+1 {
+          ret_racks.append(ret)
+        }
+      }
+      randomly pick one rack from ret_racks
+      ret += select y racks with 1 volume each
+      if ret.size() == y+z+1 {
+        ret_dcs.append(ret)
+      }
+    }
+    randomly pick one dc from ret_dcs
+    ret += select x data centers with 1 volume each
+A simple replica placement algorithm, but may fail when free volume slots are not plenty:
+  ret := []volumes
+  dc = randomly pick 1 data center with y+z+1 volumes
+    rack = randomly pick 1 rack with z+1 volumes
+      ret = ret.append(randomly pick z+1 volumes)
+    ret = ret.append(randomly pick y racks with 1 volume)
+  ret = ret.append(randomly pick x data centers with 1 volume)
 TODO:
   1. replicate content to the other server if the replication type needs replicas
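
To make the simple algorithm concrete (an illustrative walk-through, not part of the note itself): with replication=110, x=1, y=1, z=0. It picks one data center with y+z+1 = 2 free volume slots, inside it one rack with z+1 = 1 free slot, and reserves z+1 = 1 copy there; it then adds y = 1 more rack with one copy and x = 1 more data center with one copy, for 3 copies total, matching GetCopyCount() = 1+x+y+z = 3. This is exactly the order findEmptySlotsForOneVolume follows in the volume_growth.go diff above.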