From 27c74a7e66558a4f9ce0d10621606dfed98a3abb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 2 Mar 2014 22:16:54 -0800 Subject: [PATCH] Major: change replication_type to ReplicaPlacement, hopefully cleaner code works for 9 possible ReplicaPlacement xyz x : number of copies on other data centers y : number of copies on other racks z : number of copies on current rack x y z each can be 0,1,2 Minor: weed server "-mdir" default to "-dir" if empty --- go/operation/list_masters.go | 6 +- go/replication/allocate_volume.go | 4 +- go/replication/volume_growth.go | 246 ++++++------------ go/replication/volume_growth_test.go | 28 +- go/storage/cdb_map.go | 4 +- go/storage/compact_map_perf_test.go | 2 +- go/storage/replica_placement.go | 61 +++++ go/storage/replication_type.go | 123 --------- go/storage/store.go | 18 +- go/storage/volume.go | 18 +- go/storage/volume_info.go | 2 +- go/topology/collection.go | 14 +- go/topology/data_center.go | 1 + go/topology/node.go | 69 ++++- go/topology/node_list.go | 83 ------ go/topology/node_list_test.go | 60 ----- go/topology/rack.go | 1 + go/topology/topo_test.go | 14 - go/topology/topology.go | 20 +- go/topology/topology_event_handling.go | 6 +- go/topology/volume_layout.go | 16 +- go/util/file_util.go | 2 +- go/weed/compact.go | 2 +- go/weed/download.go | 2 +- go/weed/export.go | 4 +- go/weed/master.go | 26 +- go/weed/server.go | 39 +-- go/weed/version.go | 2 +- go/weed/weed_server/master_server.go | 30 +-- go/weed/weed_server/master_server_handlers.go | 24 +- go/weed/weed_server/volume_server.go | 2 +- note/replication.txt | 37 ++- 32 files changed, 371 insertions(+), 595 deletions(-) create mode 100644 go/storage/replica_placement.go delete mode 100644 go/storage/replication_type.go delete mode 100644 go/topology/node_list.go delete mode 100644 go/topology/node_list_test.go diff --git a/go/operation/list_masters.go b/go/operation/list_masters.go index 05235aed0..ade975c71 100644 --- a/go/operation/list_masters.go +++ b/go/operation/list_masters.go @@ -23,5 +23,9 @@ func ListMasters(server string) ([]string, error) { if err != nil { return nil, err } - return ret.Peers, nil + masters := ret.Peers + if ret.IsLeader { + masters = append(masters, ret.Leader) + } + return masters, nil } diff --git a/go/replication/allocate_volume.go b/go/replication/allocate_volume.go index 0f5ebc00f..fb40c6353 100644 --- a/go/replication/allocate_volume.go +++ b/go/replication/allocate_volume.go @@ -13,11 +13,11 @@ type AllocateVolumeResult struct { Error string } -func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, collection string, repType storage.ReplicationType) error { +func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error { values := make(url.Values) values.Add("volume", vid.String()) values.Add("collection", collection) - values.Add("replication", repType.String()) + values.Add("replication", rp.String()) jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values) if err != nil { return err diff --git a/go/replication/volume_growth.go b/go/replication/volume_growth.go index d7d1c90bd..8466b149f 100644 --- a/go/replication/volume_growth.go +++ b/go/replication/volume_growth.go @@ -5,7 +5,6 @@ import ( "code.google.com/p/weed-fs/go/storage" "code.google.com/p/weed-fs/go/topology" "errors" - "fmt" "math/rand" "sync" ) @@ -19,188 +18,115 @@ This package is created to resolve these replica placement issues: */ type VolumeGrowth struct { - copy1factor int - copy2factor int 
- copy3factor int - copyAll int - accessLock sync.Mutex } func NewDefaultVolumeGrowth() *VolumeGrowth { - return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3} + return &VolumeGrowth{} } -func (vg *VolumeGrowth) AutomaticGrowByType(collection string, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (count int, err error) { - factor := 1 - switch repType { - case storage.Copy000: - factor = 1 - count, err = vg.GrowByCountAndType(vg.copy1factor, collection, repType, dataCenter, topo) - case storage.Copy001: - factor = 2 - count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo) - case storage.Copy010: - factor = 2 - count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo) - case storage.Copy100: - factor = 2 - count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo) - case storage.Copy110: - factor = 3 - count, err = vg.GrowByCountAndType(vg.copy3factor, collection, repType, dataCenter, topo) - case storage.Copy200: - factor = 3 - count, err = vg.GrowByCountAndType(vg.copy3factor, collection, repType, dataCenter, topo) +// one replication type may need rp.GetCopyCount() actual volumes +// given copyCount, how many logical volumes to create +func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) { + switch copyCount { + case 1: + count = 7 + case 2: + count = 6 + case 3: + count = 3 default: - err = errors.New("Unknown Replication Type!") + count = 1 } - if count > 0 && count%factor == 0 { + return +} + +func (vg *VolumeGrowth) AutomaticGrowByType(collection string, rp *storage.ReplicaPlacement, preferredDataCenter string, topo *topology.Topology) (count int, err error) { + count, err = vg.GrowByCountAndType(vg.findVolumeCount(rp.GetCopyCount()), collection, rp, preferredDataCenter, topo) + if count > 0 && count%rp.GetCopyCount() == 0 { return count, nil } return count, err } -func (vg *VolumeGrowth) GrowByCountAndType(count int, collection string, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (counter int, err error) { +func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, collection string, rp *storage.ReplicaPlacement, preferredDataCenter string, topo *topology.Topology) (counter int, err error) { vg.accessLock.Lock() defer vg.accessLock.Unlock() - counter = 0 - switch repType { - case storage.Copy000: - for i := 0; i < count; i++ { - if ok, server, vid := topo.RandomlyReserveOneVolume(dataCenter); ok { - if err = vg.grow(topo, *vid, collection, repType, server); err == nil { - counter++ - } else { - return counter, err - } - } else { - return counter, fmt.Errorf("Failed to grown volume for data center %s", dataCenter) - } - } - case storage.Copy001: - for i := 0; i < count; i++ { - //randomly pick one server from the datacenter, and then choose from the same rack - if ok, server1, vid := topo.RandomlyReserveOneVolume(dataCenter); ok { - rack := server1.Parent() - exclusion := make(map[string]topology.Node) - exclusion[server1.String()] = server1 - newNodeList := topology.NewNodeList(rack.Children(), exclusion) - if newNodeList.FreeSpace() > 0 { - if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 { - if err = vg.grow(topo, *vid, collection, repType, server1, server2); err == nil { - counter++ - } - } - } - } - } - case storage.Copy010: - for i := 0; i < count; i++ { - //randomly pick one server from the datacenter, and then choose from the a different rack - 
if ok, server1, vid := topo.RandomlyReserveOneVolume(dataCenter); ok { - rack := server1.Parent() - dc := rack.Parent() - exclusion := make(map[string]topology.Node) - exclusion[rack.String()] = rack - newNodeList := topology.NewNodeList(dc.Children(), exclusion) - if newNodeList.FreeSpace() > 0 { - if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 { - if err = vg.grow(topo, *vid, collection, repType, server1, server2); err == nil { - counter++ - } - } - } - } + for i := 0; i < targetCount; i++ { + if c, e := vg.findAndGrow(topo, preferredDataCenter, collection, rp); e == nil { + counter += c + } else { + return counter, e } - case storage.Copy100: - for i := 0; i < count; i++ { - nl := topology.NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(2, 1, dataCenter) - vid := topo.NextVolumeId() - if ret { - var servers []*topology.DataNode - for _, n := range picked { - if n.FreeSpace() > 0 { - if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid, ""); ok { - servers = append(servers, server) - } - } - } - if len(servers) == 2 { - if err = vg.grow(topo, vid, collection, repType, servers...); err == nil { - counter++ - } - } - } else { - return counter, fmt.Errorf("Failed to grown volume on data center %s and another data center", dataCenter) - } + } + return +} + +func (vg *VolumeGrowth) findAndGrow(topo *topology.Topology, preferredDataCenter string, collection string, rp *storage.ReplicaPlacement) (int, error) { + servers, e := vg.findEmptySlotsForOneVolume(topo, preferredDataCenter, rp) + if e != nil { + return 0, e + } + vid := topo.NextVolumeId() + err := vg.grow(topo, vid, collection, rp, servers...) + return len(servers), err +} + +func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, preferredDataCenter string, rp *storage.ReplicaPlacement) (servers []*topology.DataNode, err error) { + //find main datacenter and other data centers + mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node topology.Node) bool { + if preferredDataCenter != "" && node.IsDataCenter() && node.Id() != topology.NodeId(preferredDataCenter) { + return false } - case storage.Copy110: - for i := 0; i < count; i++ { - nl := topology.NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(2, 2, dataCenter) - vid := topo.NextVolumeId() - if ret { - var servers []*topology.DataNode - dc1, dc2 := picked[0], picked[1] - if dc2.FreeSpace() > dc1.FreeSpace() { - dc1, dc2 = dc2, dc1 - } - if dc1.FreeSpace() > 0 { - if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid, ""); ok { - servers = append(servers, server1) - rack := server1.Parent() - exclusion := make(map[string]topology.Node) - exclusion[rack.String()] = rack - newNodeList := topology.NewNodeList(dc1.Children(), exclusion) - if newNodeList.FreeSpace() > 0 { - if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), vid); ok2 { - servers = append(servers, server2) - } - } - } - } - if dc2.FreeSpace() > 0 { - if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid, ""); ok { - servers = append(servers, server) - } - } - if len(servers) == 3 { - if err = vg.grow(topo, vid, collection, repType, servers...); err == nil { - counter++ - } - } - } + return node.FreeSpace() > rp.DiffRackCount+rp.SameRackCount+1 + }) + if dc_err != nil { + return nil, dc_err + } + + //find main rack and other racks + mainRack, otherRacks, rack_err := 
mainDataCenter.(*topology.DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node topology.Node) bool { + return node.FreeSpace() > rp.SameRackCount+1 + }) + if rack_err != nil { + return nil, rack_err + } + + //find main rack and other racks + mainServer, otherServers, server_err := mainRack.(*topology.Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node topology.Node) bool { + return node.FreeSpace() > 1 + }) + if server_err != nil { + return nil, server_err + } + + servers = append(servers, mainServer.(*topology.DataNode)) + for _, server := range otherServers { + servers = append(servers, server.(*topology.DataNode)) + } + for _, rack := range otherRacks { + r := rand.Intn(rack.FreeSpace()) + if server, e := rack.ReserveOneVolume(r); e == nil { + servers = append(servers, server) + } else { + return servers, e } - case storage.Copy200: - for i := 0; i < count; i++ { - nl := topology.NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(3, 1, dataCenter) - vid := topo.NextVolumeId() - if ret { - var servers []*topology.DataNode - for _, n := range picked { - if n.FreeSpace() > 0 { - if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid, ""); ok { - servers = append(servers, server) - } - } - } - if len(servers) == 3 { - if err = vg.grow(topo, vid, collection, repType, servers...); err == nil { - counter++ - } - } - } + } + for _, datacenter := range otherDataCenters { + r := rand.Intn(datacenter.FreeSpace()) + if server, e := datacenter.ReserveOneVolume(r); e == nil { + servers = append(servers, server) + } else { + return servers, e } } return } -func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, repType storage.ReplicationType, servers ...*topology.DataNode) error { + +func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement, servers ...*topology.DataNode) error { for _, server := range servers { - if err := AllocateVolume(server, vid, collection, repType); err == nil { - vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, RepType: repType, Version: storage.CurrentVersion} + if err := AllocateVolume(server, vid, collection, rp); err == nil { + vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, ReplicaPlacement: rp, Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(&vi, server) glog.V(0).Infoln("Created Volume", vid, "on", server) diff --git a/go/replication/volume_growth_test.go b/go/replication/volume_growth_test.go index 99f82a7fa..bb6cbe90e 100644 --- a/go/replication/volume_growth_test.go +++ b/go/replication/volume_growth_test.go @@ -13,7 +13,7 @@ var topologyLayout = ` { "dc1":{ "rack1":{ - "server1":{ + "server111":{ "volumes":[ {"id":1, "size":12312}, {"id":2, "size":12312}, @@ -21,7 +21,7 @@ var topologyLayout = ` ], "limit":3 }, - "server2":{ + "server112":{ "volumes":[ {"id":4, "size":12312}, {"id":5, "size":12312}, @@ -31,7 +31,7 @@ var topologyLayout = ` } }, "rack2":{ - "server1":{ + "server121":{ "volumes":[ {"id":4, "size":12312}, {"id":5, "size":12312}, @@ -39,17 +39,17 @@ var topologyLayout = ` ], "limit":4 }, - "server2":{ + "server122":{ "volumes":[], "limit":4 }, - "server3":{ + "server123":{ "volumes":[ {"id":2, "size":12312}, {"id":3, "size":12312}, {"id":4, "size":12312} ], - "limit":2 + "limit":5 } } }, @@ -57,7 +57,7 @@ var topologyLayout = ` }, "dc3":{ "rack2":{ - "server1":{ + "server321":{ "volumes":[ {"id":1, "size":12312}, {"id":3, "size":12312}, 
@@ -113,14 +113,16 @@ func setup(topologyLayout string) *topology.Topology { return topo } -func TestRemoveDataCenter(t *testing.T) { +func TestFindEmptySlotsForOneVolume(t *testing.T) { topo := setup(topologyLayout) - topo.UnlinkChildNode(topology.NodeId("dc2")) - if topo.GetActiveVolumeCount() != 15 { + vg := NewDefaultVolumeGrowth() + rp, _ := storage.NewReplicaPlacementFromString("002") + servers, err := vg.findEmptySlotsForOneVolume(topo, "dc1", rp) + if err != nil { + fmt.Println("finding empty slots error :", err) t.Fail() } - topo.UnlinkChildNode(topology.NodeId("dc3")) - if topo.GetActiveVolumeCount() != 12 { - t.Fail() + for _, server := range servers { + fmt.Println("assigned node :", server.Id()) } } diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go index 3e38b0bdb..0d790cc0f 100644 --- a/go/storage/cdb_map.go +++ b/go/storage/cdb_map.go @@ -76,8 +76,8 @@ func (m cdbMap) FileCount() int { func (m *cdbMap) DeletedCount() int { return m.DeletionCounter } -func (m *cdbMap) NextFileKey(count int) (uint64) { - return 0 +func (m *cdbMap) NextFileKey(count int) uint64 { + return 0 } func getMetric(c *cdb.Cdb, m *mapMetric) error { diff --git a/go/storage/compact_map_perf_test.go b/go/storage/compact_map_perf_test.go index e940310c0..37b23a59f 100644 --- a/go/storage/compact_map_perf_test.go +++ b/go/storage/compact_map_perf_test.go @@ -3,8 +3,8 @@ package storage import ( "code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/util" - "os" "log" + "os" "testing" ) diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go new file mode 100644 index 000000000..55428749b --- /dev/null +++ b/go/storage/replica_placement.go @@ -0,0 +1,61 @@ +package storage + +import ( + "errors" + "fmt" +) + +const ( + ReplicaPlacementCount = 9 +) + +type ReplicaPlacement struct { + SameRackCount int + DiffRackCount int + DiffDataCenterCount int +} + +func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) { + rp := &ReplicaPlacement{} + for i, c := range t { + count := int(c - '0') + if 0 <= count && count <= 2 { + switch i { + case 0: + rp.DiffDataCenterCount = count + case 1: + rp.DiffRackCount = count + case 2: + rp.SameRackCount = count + } + } else { + return rp, errors.New("Unknown Replication Type:" + t) + } + } + return rp, nil +} + +func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) { + return NewReplicaPlacementFromString(fmt.Sprintf("%d", b)) +} + +func (rp *ReplicaPlacement) Byte() byte { + ret := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount + return byte(ret) +} + +func (rp *ReplicaPlacement) String() string { + b := make([]byte, 3) + b[0] = byte(rp.DiffDataCenterCount + '0') + b[1] = byte(rp.DiffRackCount + '0') + b[2] = byte(rp.SameRackCount + '0') + return string(b) +} + +func (rp *ReplicaPlacement) GetCopyCount() int { + return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1 +} + +func (rp *ReplicaPlacement) GetReplicationLevelIndex() int { + return rp.DiffDataCenterCount*3 + rp.DiffRackCount*3 + rp.SameRackCount +} diff --git a/go/storage/replication_type.go b/go/storage/replication_type.go deleted file mode 100644 index 0902d1016..000000000 --- a/go/storage/replication_type.go +++ /dev/null @@ -1,123 +0,0 @@ -package storage - -import ( - "errors" -) - -type ReplicationType string - -const ( - Copy000 = ReplicationType("000") // single copy - Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center - Copy010 = ReplicationType("010") // 2 
copies, both on different racks, but same data center - Copy100 = ReplicationType("100") // 2 copies, each on different data center - Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center - Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center - LengthRelicationType = 6 - CopyNil = ReplicationType(255) // nil value -) - -func NewReplicationTypeFromString(t string) (ReplicationType, error) { - switch t { - case "000": - return Copy000, nil - case "001": - return Copy001, nil - case "010": - return Copy010, nil - case "100": - return Copy100, nil - case "110": - return Copy110, nil - case "200": - return Copy200, nil - } - return Copy000, errors.New("Unknown Replication Type:" + t) -} -func NewReplicationTypeFromByte(b byte) (ReplicationType, error) { - switch b { - case byte(000): - return Copy000, nil - case byte(001): - return Copy001, nil - case byte(010): - return Copy010, nil - case byte(100): - return Copy100, nil - case byte(110): - return Copy110, nil - case byte(200): - return Copy200, nil - } - return Copy000, errors.New("Unknown Replication Type:" + string(b)) -} - -func (r *ReplicationType) String() string { - switch *r { - case Copy000: - return "000" - case Copy001: - return "001" - case Copy010: - return "010" - case Copy100: - return "100" - case Copy110: - return "110" - case Copy200: - return "200" - } - return "000" -} -func (r *ReplicationType) Byte() byte { - switch *r { - case Copy000: - return byte(000) - case Copy001: - return byte(001) - case Copy010: - return byte(010) - case Copy100: - return byte(100) - case Copy110: - return byte(110) - case Copy200: - return byte(200) - } - return byte(000) -} - -func (repType ReplicationType) GetReplicationLevelIndex() int { - switch repType { - case Copy000: - return 0 - case Copy001: - return 1 - case Copy010: - return 2 - case Copy100: - return 3 - case Copy110: - return 4 - case Copy200: - return 5 - } - return -1 -} -func (repType ReplicationType) GetCopyCount() int { - switch repType { - case Copy000: - return 1 - case Copy001: - return 2 - case Copy010: - return 2 - case Copy100: - return 2 - case Copy110: - return 3 - case Copy200: - return 3 - } - return 0 -} diff --git a/go/storage/store.go b/go/storage/store.go index 52e78d27d..2df0e6cb7 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -79,8 +79,8 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts } return } -func (s *Store) AddVolume(volumeListString string, collection string, replicationType string) error { - rt, e := NewReplicationTypeFromString(replicationType) +func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string) error { + rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e } @@ -130,13 +130,13 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid VolumeId, collection string, replicationType ReplicationType) error { +func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %s already exists!", vid) } if location := s.findFreeLocation(); location != nil { - glog.V(0).Infoln("In dir", location.directory, "adds volume =", vid, ", collection =", collection, ", replicationType =", replicationType) - if volume, err := NewVolume(location.directory, collection, vid, replicationType); err == nil { + 
glog.V(0).Infoln("In dir", location.directory, "adds volume =", vid, ", collection =", collection, ", replicaPlacement =", replicaPlacement) + if volume, err := NewVolume(location.directory, collection, vid, replicaPlacement); err == nil { location.volumes[vid] = volume return nil } else { @@ -206,9 +206,9 @@ func (l *DiskLocation) loadExistingVolumes() { } if vid, err := NewVolumeId(base); err == nil { if l.volumes[vid] == nil { - if v, e := NewVolume(l.directory, collection, vid, CopyNil); e == nil { + if v, e := NewVolume(l.directory, collection, vid, nil); e == nil { l.volumes[vid] = v - glog.V(0).Infoln("data file", l.directory+"/"+name, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size()) + glog.V(0).Infoln("data file", l.directory+"/"+name, "replicaPlacement =", v.ReplicaPlacement, "version =", v.Version(), "size =", v.Size()) } } } @@ -223,7 +223,7 @@ func (s *Store) Status() []*VolumeInfo { for k, v := range location.volumes { s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(), Collection: v.Collection, - RepType: v.ReplicaType, + ReplicaPlacement: v.ReplicaPlacement, Version: v.Version(), FileCount: v.nm.FileCount(), DeleteCount: v.nm.DeletedCount(), @@ -261,7 +261,7 @@ func (s *Store) Join() error { for k, v := range location.volumes { s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()), Collection: v.Collection, - RepType: v.ReplicaType, + ReplicaPlacement: v.ReplicaPlacement, Version: v.Version(), FileCount: v.nm.FileCount(), DeleteCount: v.nm.DeletedCount(), diff --git a/go/storage/volume.go b/go/storage/volume.go index a8d8f9a58..59c3055e3 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -17,14 +17,14 @@ const ( ) type SuperBlock struct { - Version Version - ReplicaType ReplicationType + Version Version + ReplicaPlacement *ReplicaPlacement } func (s *SuperBlock) Bytes() []byte { header := make([]byte, SuperBlockSize) header[0] = byte(s.Version) - header[1] = s.ReplicaType.Byte() + header[1] = s.ReplicaPlacement.Byte() return header } @@ -41,15 +41,15 @@ type Volume struct { accessLock sync.Mutex } -func NewVolume(dirname string, collection string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} - v.SuperBlock = SuperBlock{ReplicaType: replicationType} + v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement} e = v.load(true, true) return } func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} - v.SuperBlock = SuperBlock{ReplicaType: CopyNil} + v.SuperBlock = SuperBlock{} e = v.load(false, false) return } @@ -90,7 +90,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { } } - if v.ReplicaType == CopyNil { + if v.ReplicaPlacement == nil { e = v.readSuperBlock() } else { e = v.maybeWriteSuperBlock() @@ -173,13 +173,13 @@ func (v *Volume) readSuperBlock() (err error) { } func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { superBlock.Version = Version(header[0]) - if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { + if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { err = fmt.Errorf("cannot read replica type: %s", err.Error()) } return } func (v *Volume) NeedToReplicate() bool { - return 
v.ReplicaType.GetCopyCount() > 1 + return v.ReplicaPlacement.GetCopyCount() > 1 } func (v *Volume) isFileUnchanged(n *Needle) bool { diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go index c8eb7612e..1dfb3dcae 100644 --- a/go/storage/volume_info.go +++ b/go/storage/volume_info.go @@ -5,7 +5,7 @@ import () type VolumeInfo struct { Id VolumeId Size uint64 - RepType ReplicationType + ReplicaPlacement *ReplicaPlacement Collection string Version Version FileCount int diff --git a/go/topology/collection.go b/go/topology/collection.go index 0a7971424..8042369a9 100644 --- a/go/topology/collection.go +++ b/go/topology/collection.go @@ -13,17 +13,17 @@ type Collection struct { func NewCollection(name string, volumeSizeLimit uint64) *Collection { c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} - c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType) + c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.ReplicaPlacementCount) return c } -func (c *Collection) GetOrCreateVolumeLayout(repType storage.ReplicationType) *VolumeLayout { - replicationTypeIndex := repType.GetReplicationLevelIndex() - if c.replicaType2VolumeLayout[replicationTypeIndex] == nil { - glog.V(0).Infoln("collection", c.Name, "adding replication type", repType) - c.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, c.volumeSizeLimit) +func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement) *VolumeLayout { + replicaPlacementIndex := rp.GetReplicationLevelIndex() + if c.replicaType2VolumeLayout[replicaPlacementIndex] == nil { + glog.V(0).Infoln("collection", c.Name, "adding replication type", rp) + c.replicaType2VolumeLayout[replicaPlacementIndex] = NewVolumeLayout(rp, c.volumeSizeLimit) } - return c.replicaType2VolumeLayout[replicationTypeIndex] + return c.replicaType2VolumeLayout[replicaPlacementIndex] } func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { diff --git a/go/topology/data_center.go b/go/topology/data_center.go index a3b2b7d13..ebd07803b 100644 --- a/go/topology/data_center.go +++ b/go/topology/data_center.go @@ -29,6 +29,7 @@ func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { func (dc *DataCenter) ToMap() interface{} { m := make(map[string]interface{}) + m["Id"] = dc.Id() m["Max"] = dc.GetMaxVolumeCount() m["Free"] = dc.FreeSpace() var racks []interface{} diff --git a/go/topology/node.go b/go/topology/node.go index cfd6f6489..abe363b39 100644 --- a/go/topology/node.go +++ b/go/topology/node.go @@ -3,6 +3,8 @@ package topology import ( "code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/storage" + "errors" + "math/rand" ) type NodeId string @@ -10,7 +12,7 @@ type Node interface { Id() NodeId String() string FreeSpace() int - ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode) + ReserveOneVolume(r int) (*DataNode, error) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) UpAdjustVolumeCountDelta(volumeCountDelta int) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) @@ -47,6 +49,54 @@ type NodeImpl struct { value interface{} } +// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot +func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) bool) (firstNode Node, restNodes []Node, err error) { + candidates := make([]Node, 0, len(n.children)) + for _, node := range n.children { + if filterFirstNodeFn(node) { + candidates = append(candidates, node) + } + } + if len(candidates) 
== 0 { + return nil, nil, errors.New("No matching data node found!") + } + firstNode = candidates[rand.Intn(len(candidates))] + glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id()) + + restNodes = make([]Node, numberOfNodes-1) + candidates = candidates[:0] + for _, node := range n.children { + if node.Id() == firstNode.Id() { + continue + } + if node.FreeSpace() <= 0 { + continue + } + glog.V(2).Infoln("select rest node candidate:", node.Id()) + candidates = append(candidates, node) + } + glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates") + ret := len(restNodes) == 0 + for k, node := range candidates { + if k < len(restNodes) { + restNodes[k] = node + if k == len(restNodes)-1 { + ret = true + } + } else { + r := rand.Intn(k + 1) + if r < len(restNodes) { + restNodes[r] = node + } + } + } + if !ret { + glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates") + err = errors.New("Not enough data node found!") + } + return +} + func (n *NodeImpl) IsDataNode() bool { return n.nodeType == "DataNode" } @@ -80,32 +130,27 @@ func (n *NodeImpl) Parent() Node { func (n *NodeImpl) GetValue() interface{} { return n.value } -func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode) { - ret := false - var assignedNode *DataNode +func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { for _, node := range n.children { freeSpace := node.FreeSpace() // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) if freeSpace <= 0 { continue } - if dataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(dataCenter) { - continue - } if r >= freeSpace { r -= freeSpace } else { if node.IsDataNode() && node.FreeSpace() > 0 { // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) - return true, node.(*DataNode) + return node.(*DataNode), nil } - ret, assignedNode = node.ReserveOneVolume(r, vid, dataCenter) - if ret { - break + assignedNode, err = node.ReserveOneVolume(r) + if err != nil { + return } } } - return ret, assignedNode + return } func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative diff --git a/go/topology/node_list.go b/go/topology/node_list.go deleted file mode 100644 index bed151b54..000000000 --- a/go/topology/node_list.go +++ /dev/null @@ -1,83 +0,0 @@ -package topology - -import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/storage" - "math/rand" -) - -type NodeList struct { - nodes map[NodeId]Node - except map[string]Node -} - -func NewNodeList(nodes map[NodeId]Node, except map[string]Node) *NodeList { - m := make(map[NodeId]Node, len(nodes)-len(except)) - for _, n := range nodes { - if except[n.String()] == nil { - m[n.Id()] = n - } - } - nl := &NodeList{nodes: m} - return nl -} - -func (nl *NodeList) FreeSpace() int { - freeSpace := 0 - for _, n := range nl.nodes { - freeSpace += n.FreeSpace() - } - return freeSpace -} - -func (nl *NodeList) RandomlyPickN(count int, minSpace int, firstNodeName string) ([]Node, bool) { - var list []Node - var preferredNode *Node - if firstNodeName != "" { - for _, n := range nl.nodes { - if n.Id() == NodeId(firstNodeName) && n.FreeSpace() >= minSpace { - preferredNode = &n - break - } - } - if preferredNode == nil { - return list, false - } - } - - for _, n := range nl.nodes { - if n.FreeSpace() >= minSpace && n.Id() != NodeId(firstNodeName) { - list = append(list, n) 
- } - } - if count > len(list) || count == len(list) && firstNodeName != "" { - return nil, false - } - for i := len(list); i > 0; i-- { - r := rand.Intn(i) - list[r], list[i-1] = list[i-1], list[r] - } - if firstNodeName != "" { - list[0] = *preferredNode - } - return list[:count], true -} - -func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) { - for _, node := range nl.nodes { - freeSpace := node.FreeSpace() - if randomVolumeIndex >= freeSpace { - randomVolumeIndex -= freeSpace - } else { - if node.IsDataNode() && node.FreeSpace() > 0 { - glog.V(0).Infoln("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) - return true, node.(*DataNode) - } - children := node.Children() - newNodeList := NewNodeList(children, nl.except) - return newNodeList.ReserveOneVolume(randomVolumeIndex, vid) - } - } - return false, nil - -} diff --git a/go/topology/node_list_test.go b/go/topology/node_list_test.go deleted file mode 100644 index c526f55f8..000000000 --- a/go/topology/node_list_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package topology - -import ( - "code.google.com/p/weed-fs/go/sequence" - _ "fmt" - "strconv" - "testing" -) - -func TestXYZ(t *testing.T) { - topo, err := NewTopology("topo", "/etc/weed.conf", sequence.NewMemorySequencer(), 234, 5) - if err != nil { - t.Error("cannot create new topology:", err) - t.FailNow() - } - for i := 0; i < 5; i++ { - dc := NewDataCenter("dc" + strconv.Itoa(i)) - dc.activeVolumeCount = i - dc.maxVolumeCount = 5 - topo.LinkChildNode(dc) - } - nl := NewNodeList(topo.Children(), nil) - - picked, ret := nl.RandomlyPickN(1, 0, "") - if !ret || len(picked) != 1 { - t.Error("need to randomly pick 1 node") - } - - picked, ret = nl.RandomlyPickN(1, 0, "dc1") - if !ret || len(picked) != 1 { - t.Error("need to randomly pick 1 node") - } - if picked[0].Id() != "dc1" { - t.Error("need to randomly pick 1 dc1 node") - } - - picked, ret = nl.RandomlyPickN(2, 0, "dc1") - if !ret || len(picked) != 2 { - t.Error("need to randomly pick 1 node") - } - if picked[0].Id() != "dc1" { - t.Error("need to randomly pick 2 with one dc1 node") - } - - picked, ret = nl.RandomlyPickN(4, 0, "") - if !ret || len(picked) != 4 { - t.Error("need to randomly pick 4 nodes") - } - - picked, ret = nl.RandomlyPickN(5, 0, "") - if !ret || len(picked) != 5 { - t.Error("need to randomly pick 5 nodes") - } - - picked, ret = nl.RandomlyPickN(6, 0, "") - if ret || len(picked) != 0 { - t.Error("can not randomly pick 6 nodes:", ret, picked) - } - -} diff --git a/go/topology/rack.go b/go/topology/rack.go index acc34417a..40e19dd0d 100644 --- a/go/topology/rack.go +++ b/go/topology/rack.go @@ -52,6 +52,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol func (rack *Rack) ToMap() interface{} { m := make(map[string]interface{}) + m["Id"] = rack.Id() m["Max"] = rack.GetMaxVolumeCount() m["Free"] = rack.FreeSpace() var dns []interface{} diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go index c0edca7c1..f3ae2096b 100644 --- a/go/topology/topo_test.go +++ b/go/topology/topo_test.go @@ -5,9 +5,7 @@ import ( "code.google.com/p/weed-fs/go/storage" "encoding/json" "fmt" - "math/rand" "testing" - "time" ) var topologyLayout = ` @@ -124,15 +122,3 @@ func TestRemoveDataCenter(t *testing.T) { t.Fail() } } - -func TestReserveOneVolume(t *testing.T) { - topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - rand.Seed(1) - ret, node, vid := topo.RandomlyReserveOneVolume("dc1") - if 
node.Parent().Parent().Id() != NodeId("dc1") { - t.Fail() - } - fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid) - -} diff --git a/go/topology/topology.go b/go/topology/topology.go index 24b3ab337..1426f7a12 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -77,23 +77,13 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { return nil } -func (t *Topology) RandomlyReserveOneVolume(dataCenter string) (bool, *DataNode, *storage.VolumeId) { - if t.FreeSpace() <= 0 { - glog.V(0).Infoln("Topology does not have free space left!") - return false, nil, nil - } - vid := t.NextVolumeId() - ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid, dataCenter) - return ret, node, &vid -} - func (t *Topology) NextVolumeId() storage.VolumeId { vid := t.GetMaxVolumeId() return vid.Next() } -func (t *Topology) PickForWrite(collectionName string, repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) { - vid, count, datanodes, err := t.GetVolumeLayout(collectionName, repType).PickForWrite(count, dataCenter) +func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) { + vid, count, datanodes, err := t.GetVolumeLayout(collectionName, rp).PickForWrite(count, dataCenter) if err != nil || datanodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes avalable!") } @@ -101,16 +91,16 @@ func (t *Topology) PickForWrite(collectionName string, repType storage.Replicati return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, repType storage.ReplicationType) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout { _, ok := t.collectionMap[collectionName] if !ok { t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit) } - return t.collectionMap[collectionName].GetOrCreateVolumeLayout(repType) + return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp) } func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { - t.GetVolumeLayout(v.Collection, v.RepType).RegisterVolume(v, dn) + t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(v, dn) } func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) { diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index 5097e9874..5740c9a03 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -41,7 +41,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { }() } func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { - vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.RepType) + vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement) if !vl.SetVolumeCapacityFull(volumeInfo.Id) { return false } @@ -53,7 +53,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { func (t *Topology) UnRegisterDataNode(dn *DataNode) { for _, v := range dn.volumes { glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn) - vl := t.GetVolumeLayout(v.Collection, v.RepType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement) 
vl.SetVolumeUnavailable(dn, v.Id) } dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) @@ -63,7 +63,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { } func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.Collection, v.RepType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement) if vl.isWritable(&v) { vl.SetVolumeAvailable(dn, v.Id) } diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 8877d7ccf..40628b4a0 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -9,16 +9,16 @@ import ( ) type VolumeLayout struct { - repType storage.ReplicationType + rp *storage.ReplicaPlacement vid2location map[storage.VolumeId]*VolumeLocationList writables []storage.VolumeId // transient array of writable volume id volumeSizeLimit uint64 accessLock sync.Mutex } -func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64) *VolumeLayout { +func NewVolumeLayout(rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *VolumeLayout { return &VolumeLayout{ - repType: repType, + rp: rp, vid2location: make(map[storage.VolumeId]*VolumeLocationList), writables: *new([]storage.VolumeId), volumeSizeLimit: volumeSizeLimit, @@ -33,7 +33,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.vid2location[v.Id] = NewVolumeLocationList() } if vl.vid2location[v.Id].Add(dn) { - if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() { + if len(vl.vid2location[v.Id].list) == v.ReplicaPlacement.GetCopyCount() { if vl.isWritable(v) { vl.writables = append(vl.writables, v.Id) } else { @@ -135,8 +135,8 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) defer vl.accessLock.Unlock() if vl.vid2location[vid].Remove(dn) { - if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() { - glog.V(0).Infoln("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount()) + if vl.vid2location[vid].Length() < vl.rp.GetCopyCount() { + glog.V(0).Infoln("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.rp.GetCopyCount()) return vl.removeFromWritable(vid) } } @@ -147,7 +147,7 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) b defer vl.accessLock.Unlock() if vl.vid2location[vid].Add(dn) { - if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() { + if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() { return vl.setVolumeWritable(vid) } } @@ -164,7 +164,7 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { func (vl *VolumeLayout) ToMap() map[string]interface{} { m := make(map[string]interface{}) - m["replication"] = vl.repType.String() + m["replication"] = vl.rp.String() m["writables"] = vl.writables //m["locations"] = vl.vid2location return m diff --git a/go/util/file_util.go b/go/util/file_util.go index 46e404851..8444296d3 100644 --- a/go/util/file_util.go +++ b/go/util/file_util.go @@ -1,7 +1,7 @@ package util import ( - "code.google.com/p/weed-fs/go/glog" + "code.google.com/p/weed-fs/go/glog" "errors" "os" ) diff --git a/go/weed/compact.go b/go/weed/compact.go index 2600b3362..580f3f98d 100644 --- a/go/weed/compact.go +++ b/go/weed/compact.go @@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool { } vid := storage.VolumeId(*compactVolumeId) - v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, storage.CopyNil) + v, err := 
storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } diff --git a/go/weed/download.go b/go/weed/download.go index 4f332bd2e..2e32f3ae9 100644 --- a/go/weed/download.go +++ b/go/weed/download.go @@ -49,7 +49,7 @@ func runDownload(cmd *Command, args []string) bool { filename = fid } if strings.HasSuffix(filename, "-list") { - filename = filename[0:len(filename)-len("-list")] + filename = filename[0 : len(filename)-len("-list")] fids := strings.Split(string(content), "\n") f, err := os.OpenFile(path.Join(*downloadDir, filename), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm) if err != nil { diff --git a/go/weed/export.go b/go/weed/export.go index 2ab197652..02452950d 100644 --- a/go/weed/export.go +++ b/go/weed/export.go @@ -82,8 +82,8 @@ func runExport(cmd *Command, args []string) bool { } fileName := strconv.Itoa(*exportVolumeId) - if *exportCollection!=""{ - fileName = *exportCollection + "_" + fileName + if *exportCollection != "" { + fileName = *exportCollection + "_" + fileName } vid := storage.VolumeId(*exportVolumeId) indexFile, err := os.OpenFile(path.Join(*exportVolumePath, fileName+".idx"), os.O_RDONLY, 0644) diff --git a/go/weed/master.go b/go/weed/master.go index 97def1948..0d2866c68 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -27,18 +27,18 @@ var cmdMaster = &Command{ } var ( - mport = cmdMaster.Flag.Int("port", 9333, "http listen port") - masterIp = cmdMaster.Flag.String("ip", "", "master ip address") - metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") - masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list") - volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes") - mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") - defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.") - mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds") - mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") - masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + mport = cmdMaster.Flag.Int("port", 9333, "http listen port") + masterIp = cmdMaster.Flag.String("ip", "", "master ip address") + metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") + masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list") + volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes") + mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") + defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.") + mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds") + mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 
0 means all available CPUs") + garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") + masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") masterWhiteList []string ) @@ -57,7 +57,7 @@ func runMaster(cmd *Command, args []string) bool { r := mux.NewRouter() ms := weed_server.NewMasterServer(r, VERSION, *mport, *metaFolder, - *volumeSizeLimitMB, *mpulse, *confFile, *defaultRepType, *garbageThreshold, masterWhiteList, + *volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, masterWhiteList, ) glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *masterIp+":"+strconv.Itoa(*mport)) diff --git a/go/weed/server.go b/go/weed/server.go index b1f5dc049..4cd800142 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -35,23 +35,23 @@ var cmdServer = &Command{ } var ( - serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name") - serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - serverReadTimeout = cmdServer.Flag.Int("readTimeout", 3, "connection read timeout in seconds. Increase this if uploading large files.") - serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name") - serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") - serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") - serverPeers = cmdServer.Flag.String("peers", "", "other master nodes in comma separated ip:masterPort list") - masterPort = cmdServer.Flag.Int("masterPort", 9333, "master server http listen port") - masterMetaFolder = cmdServer.Flag.String("mdir", os.TempDir(), "data directory to store meta data") - masterVolumeSizeLimitMB = cmdServer.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes") - masterConfFile = cmdServer.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") - masterDefaultRepType = cmdServer.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.") - volumePort = cmdServer.Flag.Int("port", 8080, "volume server http listen port") - volumePublicUrl = cmdServer.Flag.String("publicUrl", "", "Publicly accessible :") - volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") - volumeMaxDataVolumeCounts = cmdServer.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...") - volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting") + serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name") + serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + serverReadTimeout = cmdServer.Flag.Int("readTimeout", 3, "connection read timeout in seconds. Increase this if uploading large files.") + serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name") + serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") + serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. 
No limit if empty.") + serverPeers = cmdServer.Flag.String("peers", "", "other master nodes in comma separated ip:masterPort list") + masterPort = cmdServer.Flag.Int("masterPort", 9333, "master server http listen port") + masterMetaFolder = cmdServer.Flag.String("mdir", "", "data directory to store meta data, default to same as -dir specified") + masterVolumeSizeLimitMB = cmdServer.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes") + masterConfFile = cmdServer.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") + masterDefaultReplicaPlacement = cmdServer.Flag.String("defaultReplicaPlacement", "000", "Default replication type if not specified.") + volumePort = cmdServer.Flag.Int("port", 8080, "volume server http listen port") + volumePublicUrl = cmdServer.Flag.String("publicUrl", "", "Publicly accessible :") + volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") + volumeMaxDataVolumeCounts = cmdServer.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...") + volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting") serverWhiteList []string ) @@ -62,6 +62,9 @@ func runServer(cmd *Command, args []string) bool { } runtime.GOMAXPROCS(*serverMaxCpu) + if *masterMetaFolder == "" { + *masterMetaFolder = *volumeDataFolders + } if err := util.TestFolderWritable(*masterMetaFolder); err != nil { glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *masterMetaFolder, err) } @@ -95,7 +98,7 @@ func runServer(cmd *Command, args []string) bool { go func() { r := mux.NewRouter() ms := weed_server.NewMasterServer(r, VERSION, *masterPort, *masterMetaFolder, - *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultRepType, *garbageThreshold, serverWhiteList, + *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *garbageThreshold, serverWhiteList, ) glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *serverIp+":"+strconv.Itoa(*masterPort)) diff --git a/go/weed/version.go b/go/weed/version.go index 561057778..9b6ca63e1 100644 --- a/go/weed/version.go +++ b/go/weed/version.go @@ -6,7 +6,7 @@ import ( ) const ( - VERSION = "0.46" + VERSION = "0.47" ) var cmdVersion = &Command{ diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index 738484ff0..c8a9bbbed 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -15,14 +15,14 @@ import ( ) type MasterServer struct { - port int - metaFolder string - volumeSizeLimitMB uint - pulseSeconds int - defaultRepType string - garbageThreshold string - whiteList []string - version string + port int + metaFolder string + volumeSizeLimitMB uint + pulseSeconds int + defaultReplicaPlacement string + garbageThreshold string + whiteList []string + version string topo *topology.Topology vg *replication.VolumeGrowth @@ -35,17 +35,17 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string, volumeSizeLimitMB uint, pulseSeconds int, confFile string, - defaultRepType string, + defaultReplicaPlacement string, garbageThreshold string, whiteList []string, ) *MasterServer { ms := &MasterServer{ - version: version, - volumeSizeLimitMB: volumeSizeLimitMB, - pulseSeconds: pulseSeconds, - defaultRepType: defaultRepType, - garbageThreshold: garbageThreshold, - whiteList: whiteList, + version: version, + volumeSizeLimitMB: 
volumeSizeLimitMB, + pulseSeconds: pulseSeconds, + defaultReplicaPlacement: defaultReplicaPlacement, + garbageThreshold: garbageThreshold, + whiteList: whiteList, } seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq")) var e error diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go index 99e8fd1ef..f53d2b8ed 100644 --- a/go/weed/weed_server/master_server_handlers.go +++ b/go/weed/weed_server/master_server_handlers.go @@ -40,20 +40,20 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) if e != nil { c = 1 } - repType := r.FormValue("replication") - if repType == "" { - repType = ms.defaultRepType + replication := r.FormValue("replication") + if replication == "" { + replication = ms.defaultReplicaPlacement } collection := r.FormValue("collection") dataCenter := r.FormValue("dataCenter") - rt, err := storage.NewReplicationTypeFromString(repType) + replicaPlacement, err := storage.NewReplicaPlacementFromString(replication) if err != nil { w.WriteHeader(http.StatusNotAcceptable) writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) return } - if ms.topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 { + if ms.topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 { if ms.topo.FreeSpace() <= 0 { w.WriteHeader(http.StatusNotFound) writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"}) @@ -61,15 +61,15 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) } else { ms.vgLock.Lock() defer ms.vgLock.Unlock() - if ms.topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 { - if _, err = ms.vg.AutomaticGrowByType(collection, rt, dataCenter, ms.topo); err != nil { + if ms.topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 { + if _, err = ms.vg.AutomaticGrowByType(collection, replicaPlacement, dataCenter, ms.topo); err != nil { writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()}) return } } } } - fid, count, dn, err := ms.topo.PickForWrite(collection, rt, c, dataCenter) + fid, count, dn, err := ms.topo.PickForWrite(collection, replicaPlacement, c, dataCenter) if err == nil { writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) } else { @@ -119,13 +119,13 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) { count := 0 - rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication")) + replicaPlacement, err := storage.NewReplicaPlacementFromString(r.FormValue("replication")) if err == nil { if count, err = strconv.Atoi(r.FormValue("count")); err == nil { - if ms.topo.FreeSpace() < count*rt.GetCopyCount() { - err = errors.New("Only " + strconv.Itoa(ms.topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) + if ms.topo.FreeSpace() < count*replicaPlacement.GetCopyCount() { + err = errors.New("Only " + strconv.Itoa(ms.topo.FreeSpace()) + " volumes left! 
Not enough for " + strconv.Itoa(count*replicaPlacement.GetCopyCount())) } else { - count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), rt, r.FormValue("dataCneter"), ms.topo) + count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), replicaPlacement, r.FormValue("dataCneter"), ms.topo) } } else { err = errors.New("parameter count is not found") diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index f42585da2..a2939f848 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -61,7 +61,7 @@ func NewVolumeServer(r *http.ServeMux, version string, ip string, port int, publ if connected { time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond) } else { - time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)* 0.25) * time.Millisecond) + time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*0.25) * time.Millisecond) } } }() diff --git a/note/replication.txt b/note/replication.txt index c4bf46044..a151e80c3 100644 --- a/note/replication.txt +++ b/note/replication.txt @@ -59,11 +59,6 @@ If any "assign" request comes in 3. return a writable volume to the user -Plan: - Step 1. implement one copy(no replication), automatically assign volume ids - Step 2. add replication - -For the above operations, here are the todo list: for data node: 0. detect existing volumes DONE 1. onStartUp, and periodically, send existing volumes and maxVolumeCount store.Join(), DONE @@ -77,10 +72,38 @@ For the above operations, here are the todo list: 1. accept data node's report of existing volumes and maxVolumeCount ALREADY EXISTS /dir/join 2. periodically refresh for active data nodes, and adjust writable volumes 3. send command to grow a volume(id + replication level) DONE - 4. NOT_IMPLEMENTING: if dead/stale data nodes are found, for the affected volumes, send stale info - to other data nodes. BECAUSE the master will stop sending writes to these data nodes 5. accept lookup for volume locations ALREADY EXISTS /dir/lookup 6. read topology/datacenter/rack layout +An algorithm to allocate volumes evenly, but may be inefficient if free volumes are plenty: +input: replication=xyz +algorithm: +ret_dcs = [] +foreach dc that has y+z+1 volumes{ + ret_racks = [] + foreach rack with z+1 volumes{ + ret = select z+1 servers with 1 volume + if ret.size()==z+1 { + ret_racks.append(ret) + } + } + randomly pick one rack from ret_racks + ret += select y racks with 1 volume each + if ret.size()==y+z+1{ + ret_dcs.append(ret) + } +} +randomly pick one dc from ret_dcs +ret += select x data centers with 1 volume each + +A simple replica placement algorithm, but may fail when free volume slots are not plenty: +ret := []volumes +dc = randomly pick 1 data center with y+z+1 volumes + rack = randomly pick 1 rack with z+1 volumes + ret = ret.append(randomly pick z+1 volumes) + ret = ret.append(randomly pick y racks with 1 volume) +ret = ret.append(randomly pick x data centers with 1 volume) + + TODO: 1. replicate content to the other server if the replication type needs replicas
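
A rough, self-contained Go sketch of the "simple replica placement algorithm" described above, run against a toy in-memory topology. The server/rack/dataCenter types are hypothetical stand-ins for go/topology's DataNode/Rack/DataCenter, and the randomization and slot-reservation bookkeeping that findEmptySlotsForOneVolume performs are intentionally left out; this is an illustration of the note's pseudocode, not the patch's implementation:

// sketch only: toy types, no randomization, no slot reservation
package main

import "fmt"

type server struct{ id string; free int }          // free = free volume slots
type rack struct{ id string; servers []*server }
type dataCenter struct{ id string; racks []*rack }

func rackFree(r *rack) (n int) {
	for _, s := range r.servers {
		n += s.free
	}
	return
}
func dcFree(dc *dataCenter) (n int) {
	for _, r := range dc.racks {
		n += rackFree(r)
	}
	return
}

// pickOne returns one server in the rack that still has a free slot.
func pickOne(r *rack) *server {
	for _, s := range r.servers {
		if s.free > 0 {
			return s
		}
	}
	return nil
}
func pickOneInDC(dc *dataCenter) *server {
	for _, r := range dc.racks {
		if s := pickOne(r); s != nil {
			return s
		}
	}
	return nil
}

// place follows the simple algorithm for replication xyz: z+1 servers on one
// rack, one server on each of y other racks in the same data center, and one
// server in each of x other data centers.
func place(dcs []*dataCenter, x, y, z int) []*server {
	for _, dc := range dcs {
		if dcFree(dc) < y+z+1 {
			continue
		}
		for _, mainRack := range dc.racks {
			if rackFree(mainRack) < z+1 {
				continue
			}
			var picked []*server
			for _, s := range mainRack.servers { // z+1 copies on the main rack
				if s.free > 0 && len(picked) < z+1 {
					picked = append(picked, s)
				}
			}
			if len(picked) < z+1 {
				continue
			}
			for _, r := range dc.racks { // y copies on other racks, one each
				if r != mainRack && len(picked) < y+z+1 {
					if s := pickOne(r); s != nil {
						picked = append(picked, s)
					}
				}
			}
			for _, other := range dcs { // x copies on other data centers, one each
				if other != dc && len(picked) < x+y+z+1 {
					if s := pickOneInDC(other); s != nil {
						picked = append(picked, s)
					}
				}
			}
			if len(picked) == x+y+z+1 {
				return picked
			}
		}
	}
	return nil // may fail when free volume slots are not plenty, as noted above
}

func main() {
	dcs := []*dataCenter{
		{id: "dc1", racks: []*rack{
			{id: "rack11", servers: []*server{{"s111", 2}, {"s112", 1}}},
			{id: "rack12", servers: []*server{{"s121", 1}}},
		}},
		{id: "dc2", racks: []*rack{{id: "rack21", servers: []*server{{"s211", 1}}}}},
	}
	for _, s := range place(dcs, 1, 1, 1) { // replication "111" needs 4 copies
		fmt.Println("picked", s.id)
	}
}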
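
A small client-side sketch of exercising the replication parameter on /dir/assign once this patch is applied, assuming a master running on the default localhost:9333. The JSON field names mirror what dirAssignHandler writes (fid, url, publicUrl, count, or error); the helper and type names here are only illustrative, not part of the weed-fs API:

// sketch only: stdlib HTTP client against the master's /dir/assign endpoint
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// assignResult mirrors the JSON written by dirAssignHandler in this patch.
type assignResult struct {
	Fid       string `json:"fid"`
	Url       string `json:"url"`
	PublicUrl string `json:"publicUrl"`
	Count     int    `json:"count"`
	Error     string `json:"error"`
}

func assign(master, replication string) (*assignResult, error) {
	v := url.Values{}
	v.Set("replication", replication) // e.g. "010": one extra copy on a different rack
	resp, err := http.Get("http://" + master + "/dir/assign?" + v.Encode())
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	var r assignResult
	if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
		return nil, err
	}
	if r.Error != "" {
		return nil, fmt.Errorf("assign: %s", r.Error)
	}
	return &r, nil
}

func main() {
	// assumes a weed master started with this patch's defaults (port 9333)
	r, err := assign("localhost:9333", "010")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("write to", r.Url, "with fid", r.Fid)
}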