From 9f3ba7c15371bb2039cd2494460370668461ff9c Mon Sep 17 00:00:00 2001 From: tnextday Date: Thu, 31 Dec 2015 11:50:37 +0800 Subject: [PATCH] *:temp commit --- go/topology/node.go | 29 +++-- go/topology/topology_replicate.go | 17 ++- go/topology/volume_growth.go | 183 +++++++++++++++++----------- go/topology/volume_growth_test.go | 30 ++++- go/topology/volume_location_list.go | 77 ++++++++++++ 5 files changed, 251 insertions(+), 85 deletions(-) diff --git a/go/topology/node.go b/go/topology/node.go index 242f60b6f..292e88fad 100644 --- a/go/topology/node.go +++ b/go/topology/node.go @@ -2,12 +2,14 @@ package topology import ( "errors" + "fmt" "math/rand" "strings" + "sort" + "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/storage" - "sort" ) type NodeId string @@ -52,25 +54,34 @@ type NodeImpl struct { value interface{} } +type NodePicker interface { + PickNodes(numberOfNodes int, filterNodeFn FilterNodeFn, pickFn PickNodesFn) (nodes []Node, err error) +} + + +var ErrFilterContinue = errors.New("continue") + type FilterNodeFn func(dn Node) error type PickNodesFn func(nodes []Node, count int) []Node // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot -func (n *NodeImpl) PickNodes(numberOfNodes int, filterFirstNodeFn FilterNodeFn, pickFn PickNodesFn) (firstNode Node, restNodes []Node, err error) { +func (n *NodeImpl) PickNodes(numberOfNodes int, filterNodeFn FilterNodeFn, pickFn PickNodesFn) (nodes []Node, err error) { candidates := make([]Node, 0, len(n.children)) var errs []string for _, node := range n.children { - if err := filterFirstNodeFn(node); err == nil { + if err := filterNodeFn(node); err == nil { candidates = append(candidates, node) + }else if err == ErrFilterContinue{ + continue } else { errs = append(errs, string(node.Id())+":"+err.Error()) } } - ns := pickFn(candidates, 1) - if ns == nil { - return nil, nil, errors.New("No matching data node found! 
\n" + strings.Join(errs, "\n")) + if len(candidates) < numberOfNodes{ + return nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n")) } - firstNode = ns[0] + return pickFn(candidates, numberOfNodes), nil + glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id()) @@ -163,7 +174,7 @@ func (n *NodeImpl) GetValue() interface{} { func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { for _, node := range n.children { freeSpace := node.FreeSpace() - // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) + fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) if freeSpace <= 0 { continue } @@ -171,7 +182,7 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { r -= freeSpace } else { if node.IsDataNode() && node.FreeSpace() > 0 { - // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) + fmt.Println("assigned to node =", node, ", freeSpace =", node.FreeSpace()) return node.(*DataNode), nil } assignedNode, err = node.ReserveOneVolume(r) diff --git a/go/topology/topology_replicate.go b/go/topology/topology_replicate.go index 9346ca743..000d6ef4f 100644 --- a/go/topology/topology_replicate.go +++ b/go/topology/topology_replicate.go @@ -1,8 +1,11 @@ package topology -import "github.com/chrislusf/seaweedfs/go/glog" +import ( + "github.com/chrislusf/seaweedfs/go/glog" + "github.com/chrislusf/seaweedfs/go/storage" +) -func (t *Topology) Replicate() int { +func (t *Topology) CheckReplicate() int { glog.V(0).Infoln("Start replicate checker on demand") for _, col := range t.collectionMap.Items { c := col.(*Collection) @@ -15,6 +18,7 @@ func (t *Topology) Replicate() int { if locationList.Length() < copyCount { //set volume readonly glog.V(0).Infoln("replicate volume :", vid) + SetVolumeReadonly(locationList, vid.String(), true) } } @@ -23,3 +27,12 @@ func (t *Topology) Replicate() int { } return 0 } + +func (t *Topology) 
doReplicate(vl *VolumeLayout, vid storage.VolumeId) { + locationList := vl.vid2location[vid] + if !SetVolumeReadonly(locationList, vid.String(), true) { + return + } + defer SetVolumeReadonly(locationList, vid.String(), false) + +} diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go index ed3f8fee9..b8e3ca450 100644 --- a/go/topology/volume_growth.go +++ b/go/topology/volume_growth.go @@ -76,7 +76,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOp } func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) { - servers, e := vg.findEmptySlotsForOneVolume(topo, option) + servers, e := vg.findEmptySlotsForOneVolume(topo, option, nil) if e != nil { return 0, e } @@ -85,105 +85,150 @@ func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (i return len(servers), err } +func filterMainDataCenter(option *VolumeGrowOption, node Node) error { + if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { + return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) + } + rp := option.ReplicaPlacement + if len(node.Children()) < rp.DiffRackCount+1 { + return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1) + } + if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 { + return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1) + } + possibleRacksCount := 0 + for _, rack := range node.Children() { + possibleDataNodesCount := 0 + for _, n := range rack.Children() { + if n.FreeSpace() >= 1 { + possibleDataNodesCount++ + } + } + if possibleDataNodesCount >= rp.SameRackCount+1 { + possibleRacksCount++ + } + } + if possibleRacksCount < rp.DiffRackCount+1 { + return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1) + } + return nil +} + 
+func filterMainRack(option *VolumeGrowOption, node Node) error {
+	if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
+		return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
+	}
+	rp := option.ReplicaPlacement
+	if node.FreeSpace() < rp.SameRackCount+1 {
+		return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
+	}
+	if len(node.Children()) < rp.SameRackCount+1 {
+		// a bit faster way to test free racks
+		return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1)
+	}
+	possibleDataNodesCount := 0
+	for _, n := range node.Children() {
+		if n.FreeSpace() >= 1 {
+			possibleDataNodesCount++
+		}
+	}
+	if possibleDataNodesCount < rp.SameRackCount+1 {
+		return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1)
+	}
+	return nil
+}
+
+func makeExceptNodeFilter(nodes []Node) FilterNodeFn {
+	m := make(map[NodeId]bool)
+	for _, n := range nodes {
+		m[n.Id()] = true
+	}
+	return func(dn Node) error {
+		if dn.FreeSpace() <= 0 {
+			return ErrFilterContinue
+		}
+		if _, ok := m[dn.Id()]; ok {
+			return ErrFilterContinue
+		}
+		return nil
+	}
+}
+
 // 1. find the main data node
 // 1.1 collect all data nodes that have 1 slots
 // 2.2 collect all racks that have rp.SameRackCount+1
 // 2.2 collect all data centers that have DiffRackCount+rp.SameRackCount+1
 // 2. 
find rest data nodes -func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { +func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption, existsServer *VolumeLocationList) (additionServers []*DataNode, err error) { //find main datacenter and other data centers pickNodesFn := PickLowUsageNodeFn rp := option.ReplicaPlacement - mainDataCenter, otherDataCenters, dc_err := topo.PickNodes(rp.DiffDataCenterCount+1, func(node Node) error { - if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { - return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) - } - if len(node.Children()) < rp.DiffRackCount+1 { - return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1) - } - if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 { - return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1) - } - possibleRacksCount := 0 - for _, rack := range node.Children() { - possibleDataNodesCount := 0 - for _, n := range rack.Children() { - if n.FreeSpace() >= 1 { - possibleDataNodesCount++ - } - } - if possibleDataNodesCount >= rp.SameRackCount+1 { - possibleRacksCount++ - } + + pickMainAndRestNodes := func(np NodePicker, restNodeCount int, filterNodeFn FilterNodeFn) (mainNode Node, restNodes []Node, e error) { + mainNodes, err := np.PickNodes(1, filterNodeFn, pickNodesFn) + if err != nil { + return nil, err } - if possibleRacksCount < rp.DiffRackCount+1 { - return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1) + restNodes, err := np.PickNodes(restNodeCount, + makeExceptNodeFilter(mainNodes), pickNodesFn) + if err != nil { + return nil, err } - return nil - }, pickNodesFn) + return mainNodes[0], restNodes + } + + mainDataCenter, 
otherDataCenters, dc_err := pickMainAndRestNodes(topo, rp.DiffDataCenterCount, + func(node Node) error { + return filterMainDataCenter(option, node) + }) if dc_err != nil { return nil, dc_err } - //find main rack and other racks - mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).PickNodes(rp.DiffRackCount+1, func(node Node) error { - if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { - return fmt.Errorf("Not matching preferred rack:%s", option.Rack) - } - if node.FreeSpace() < rp.SameRackCount+1 { - return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1) - } - if len(node.Children()) < rp.SameRackCount+1 { - // a bit faster way to test free racks - return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1) - } - possibleDataNodesCount := 0 - for _, n := range node.Children() { - if n.FreeSpace() >= 1 { - possibleDataNodesCount++ - } - } - if possibleDataNodesCount < rp.SameRackCount+1 { - return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1) - } - return nil - }, pickNodesFn) + mainRack, otherRacks, rack_err := pickMainAndRestNodes(mainDataCenter.(*DataCenter), rp.DiffRackCount, + func(node Node) error { + return filterMainRack(option, node) + }, + ) if rack_err != nil { return nil, rack_err } - //find main rack and other racks - mainServer, otherServers, server_err := mainRack.(*Rack).PickNodes(rp.SameRackCount+1, func(node Node) error { - if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { - return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) - } - if node.FreeSpace() < 1 { - return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1) - } - return nil - }, pickNodesFn) + //find main server and other servers + mainServer, otherServers, server_err := pickMainAndRestNodes(mainRack.(*Rack), rp.SameRackCount, + func(node Node) error 
{ + if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { + return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) + } + if node.FreeSpace() < 1 { + return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1) + } + return nil + }, + ) + if server_err != nil { return nil, server_err } - servers = append(servers, mainServer.(*DataNode)) + additionServers = append(additionServers, mainServer.(*DataNode)) for _, server := range otherServers { - servers = append(servers, server.(*DataNode)) + additionServers = append(additionServers, server.(*DataNode)) } for _, rack := range otherRacks { r := rand.Intn(rack.FreeSpace()) if server, e := rack.ReserveOneVolume(r); e == nil { - servers = append(servers, server) + additionServers = append(additionServers, server) } else { - return servers, e + return additionServers, e } } - for _, datacenter := range otherDataCenters { - r := rand.Intn(datacenter.FreeSpace()) - if server, e := datacenter.ReserveOneVolume(r); e == nil { - servers = append(servers, server) + for _, dc := range otherDataCenters { + r := rand.Intn(dc.FreeSpace()) + if server, e := dc.ReserveOneVolume(r); e == nil { + additionServers = append(additionServers, server) } else { - return servers, e + return additionServers, e } } return diff --git a/go/topology/volume_growth_test.go b/go/topology/volume_growth_test.go index 8f50a6f90..a89ad6986 100644 --- a/go/topology/volume_growth_test.go +++ b/go/topology/volume_growth_test.go @@ -19,7 +19,7 @@ var topologyLayout = ` {"id":2, "size":12312}, {"id":3, "size":12312} ], - "limit":3 + "limit":15 }, "server112":{ "volumes":[ @@ -28,6 +28,18 @@ var topologyLayout = ` {"id":6, "size":12312} ], "limit":10 + }, + "server113":{ + "volumes":[ + {"id":7, "size":12312}, + {"id":8, "size":12312}, + {"id":9, "size":12312} + ], + "limit":8 + }, + "server114":{ + "volumes":[], + "limit":8 } }, "rack2":{ @@ -37,11 +49,15 @@ var topologyLayout = ` {"id":5, "size":12312}, 
{"id":6, "size":12312} ], - "limit":4 + "limit":8 }, "server122":{ "volumes":[], - "limit":4 + "limit":8 + }, + "server124":{ + "volumes":[], + "limit":8 }, "server123":{ "volumes":[ @@ -63,7 +79,11 @@ var topologyLayout = ` {"id":3, "size":12312}, {"id":5, "size":12312} ], - "limit":4 + "limit":8 + }, + "server322":{ + "volumes":[], + "limit":7 } } } @@ -117,7 +137,7 @@ func setup(topologyLayout string) *Topology { func TestFindEmptySlotsForOneVolume(t *testing.T) { topo := setup(topologyLayout) vg := NewDefaultVolumeGrowth() - rp, _ := storage.NewReplicaPlacementFromString("002") + rp, _ := storage.NewReplicaPlacementFromString("011") volumeGrowOption := &VolumeGrowOption{ Collection: "", ReplicaPlacement: rp, diff --git a/go/topology/volume_location_list.go b/go/topology/volume_location_list.go index 7166a4add..fed2c1574 100644 --- a/go/topology/volume_location_list.go +++ b/go/topology/volume_location_list.go @@ -70,3 +70,80 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { dnll.list = l } } + +// return all data centers, first is main data center +func (dnll *VolumeLocationList) DataCenters() []*DataCenter { + m := make(map[*DataCenter]int) + maxCount := 0 + var mainDC *DataCenter + for _, dn := range dnll.list { + var dc *DataCenter + if dc = dn.GetDataCenter(); dc == nil { + continue + } + m[dc] = m[dc] + 1 + if m[dc] > maxCount { + mainDC = dc + maxCount = m[dc] + } + } + dataCenters := make([]*DataCenter, 0, len(m)) + if mainDC != nil { + dataCenters = append(dataCenters, mainDC) + } + for dc := range m { + if dc != mainDC { + dataCenters = append(dataCenters, dc) + } + } + return dataCenters +} + +// return all racks if data center set nil +func (dnll *VolumeLocationList) Racks(dc *DataCenter) []*Rack { + m := make(map[*Rack]int) + maxCount := 0 + var mainRack *Rack + for _, dn := range dnll.list { + if dc != nil && dn.GetDataCenter() != dc { + continue + } + var rack *Rack + if rack = dn.GetRack(); rack == nil { + continue + } + m[rack] 
= m[rack] + 1
+		if m[rack] > maxCount {
+			mainRack = rack
+			maxCount = m[rack]
+		}
+	}
+	racks := make([]*Rack, 0, len(m))
+	if mainRack != nil {
+		racks = append(racks, mainRack)
+	}
+	for rack := range m {
+		if rack != mainRack { racks = append(racks, rack) }
+	}
+	return racks
+}
+
+
+func (dnll *VolumeLocationList) Servers(rack *Rack) []*DataNode {
+	servers := make([]*DataNode, 0, len(dnll.list))
+	for _, dn := range dnll.list {
+		if rack != nil && dn.GetRack() != rack {
+			continue
+		}
+		var r *Rack
+		if r = dn.GetRack(); r == nil {
+			continue
+		}
+		servers = append(servers, dn)
+	}
+	return servers
+}
+
+//func (dnll *VolumeLocationList)ContainDataNode(nodeType, id string)bool {
+//
+//}