diff --git a/weed/topology/node.go b/weed/topology/node.go index 572a89d4d..501a22623 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -62,56 +62,64 @@ type NodeImpl struct { } // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot -func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { - candidates := make([]Node, 0, len(n.children)) +func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { + var totalWeights int64 var errs []string n.RLock() + candidates := make([]Node, 0, len(n.children)) + candidatesWeights := make([]int64, 0, len(n.children)) + //pick nodes which has enough free volumes as candidates, and use free volumes number as node weight. for _, node := range n.children { - if err := filterFirstNodeFn(node); err == nil { - candidates = append(candidates, node) - } else { - errs = append(errs, string(node.Id())+":"+err.Error()) + if node.FreeSpace() <= 0 { + continue } + totalWeights += node.FreeSpace() + candidates = append(candidates, node) + candidatesWeights = append(candidatesWeights, node.FreeSpace()) } n.RUnlock() - if len(candidates) == 0 { - return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n")) + if len(candidates) < numberOfNodes { + glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates") + return nil, nil, errors.New("No enough data node found!") } - firstNode = candidates[rand.Intn(len(candidates))] - glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id()) - restNodes = make([]Node, numberOfNodes-1) - candidates = candidates[:0] - n.RLock() - for _, node := range n.children { - if node.Id() == firstNode.Id() { - continue - } - if node.FreeSpace() <= 0 { - continue + //pick nodes randomly by weights, the node picked earlier has higher final weights + sortedCandidates := make([]Node, 0, len(candidates)) + for i:=0; i=lastWeights) && (weightsInterval= numberOfNodes-1 { + restNodes = sortedCandidates[:numberOfNodes-1] + } else { + restNodes = append(restNodes, sortedCandidates[:k]...) + restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...) } + ret = true + break } else { - r := rand.Intn(k + 1) - if r < len(restNodes) { - restNodes[r] = node - } + errs = append(errs, string(node.Id())+":"+err.Error()) } } + n.RUnlock() if !ret { - glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates") - err = errors.New("No enough data node found!") + return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n")) } return } diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index 781a34ba3..446c88f60 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -112,7 +112,7 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { //find main datacenter and other data centers rp := option.ReplicaPlacement - mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error { + mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, func(node Node) error { if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) } @@ -144,7 +144,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } //find main rack and other racks - mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error { + mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).PickNodesByWeight(rp.DiffRackCount+1, func(node Node) error { if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { return fmt.Errorf("Not matching preferred rack:%s", option.Rack) } @@ -171,7 +171,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } //find main rack and other racks - mainServer, otherServers, serverErr := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error { + mainServer, otherServers, serverErr := mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, func(node Node) error { if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) } diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index a004dc210..bca6a8fc7 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -253,3 +253,90 @@ func TestReplication011(t *testing.T) { fmt.Println("assigned node :", server.Id()) } } + +var topologyLayout3 = ` +{ + "dc1":{ + "rack1":{ + "server111":{ + "volumes":[], + "limit":2000 + } + } + }, + "dc2":{ + "rack2":{ + "server222":{ + "volumes":[], + "limit":2000 + } + } + }, + "dc3":{ + "rack3":{ + "server333":{ + "volumes":[], + "limit":1000 + } + } + }, + "dc4":{ + "rack4":{ + "server444":{ + "volumes":[], + "limit":1000 + } + } + }, + "dc5":{ + "rack5":{ + "server555":{ + "volumes":[], + "limit":500 + } + } + }, + "dc6":{ + "rack6":{ + "server666":{ + "volumes":[], + "limit":500 + } + } + } +} +` + +func TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) { + topo := setup(topologyLayout3) + vg := NewDefaultVolumeGrowth() + rp, _ := super_block.NewReplicaPlacementFromString("100") + volumeGrowOption := &VolumeGrowOption{ + Collection: "Weight", + ReplicaPlacement: rp, + DataCenter: "", + Rack: "", + DataNode: "", + } + + distribution := map[NodeId]int{} + // assign 1000 volumes + for i:=0;i<1000 ;i++ { + servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption) + if err != nil { + fmt.Println("finding empty slots error :", err) + t.Fail() + } + for _, server := range servers { + fmt.Println("assigned node :", server.Id()) + if _, ok := distribution[server.id]; !ok { + distribution[server.id] = 0 + } + distribution[server.id] += 1 + } + } + + for k, v := range distribution { + fmt.Println(k, "%s : %d", k, v) + } +} \ No newline at end of file