Browse Source

schedule new volume by free volume number of nodes

pull/1219/head
zhangsong 5 years ago
parent
commit
40f70481cd
  1. 78
      weed/topology/node.go
  2. 6
      weed/topology/volume_growth.go
  3. 87
      weed/topology/volume_growth_test.go

78
weed/topology/node.go

@ -62,56 +62,64 @@ type NodeImpl struct {
} }
// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
candidates := make([]Node, 0, len(n.children))
func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
var totalWeights int64
var errs []string var errs []string
n.RLock() n.RLock()
candidates := make([]Node, 0, len(n.children))
candidatesWeights := make([]int64, 0, len(n.children))
//pick nodes which has enough free volumes as candidates, and use free volumes number as node weight.
for _, node := range n.children { for _, node := range n.children {
if err := filterFirstNodeFn(node); err == nil {
candidates = append(candidates, node)
} else {
errs = append(errs, string(node.Id())+":"+err.Error())
if node.FreeSpace() <= 0 {
continue
} }
totalWeights += node.FreeSpace()
candidates = append(candidates, node)
candidatesWeights = append(candidatesWeights, node.FreeSpace())
} }
n.RUnlock() n.RUnlock()
if len(candidates) == 0 {
return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
if len(candidates) < numberOfNodes {
glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes, "from ", len(candidates), "node candidates")
return nil, nil, errors.New("No enough data node found!")
} }
firstNode = candidates[rand.Intn(len(candidates))]
glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
restNodes = make([]Node, numberOfNodes-1)
candidates = candidates[:0]
n.RLock()
for _, node := range n.children {
if node.Id() == firstNode.Id() {
continue
}
if node.FreeSpace() <= 0 {
continue
//pick nodes randomly by weights, the node picked earlier has higher final weights
sortedCandidates := make([]Node, 0, len(candidates))
for i:=0; i<len(candidates); i++ {
weightsInterval := rand.Int63n(totalWeights)
lastWeights := int64(0)
for k, weights := range candidatesWeights {
if (weightsInterval>=lastWeights) && (weightsInterval<lastWeights + weights) {
sortedCandidates = append(sortedCandidates, candidates[k])
candidatesWeights[k] = 0
totalWeights -= weights
break
}
lastWeights += weights
} }
glog.V(2).Infoln("select rest node candidate:", node.Id())
candidates = append(candidates, node)
} }
n.RUnlock()
glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
ret := len(restNodes) == 0
for k, node := range candidates {
if k < len(restNodes) {
restNodes[k] = node
if k == len(restNodes)-1 {
ret = true
restNodes = make([]Node, 0, numberOfNodes-1)
ret := false
n.RLock()
for k, node := range sortedCandidates {
if err := filterFirstNodeFn(node); err == nil {
firstNode = node
if k >= numberOfNodes-1 {
restNodes = sortedCandidates[:numberOfNodes-1]
} else {
restNodes = append(restNodes, sortedCandidates[:k]...)
restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
} }
ret = true
break
} else { } else {
r := rand.Intn(k + 1)
if r < len(restNodes) {
restNodes[r] = node
}
errs = append(errs, string(node.Id())+":"+err.Error())
} }
} }
n.RUnlock()
if !ret { if !ret {
glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
err = errors.New("No enough data node found!")
return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
} }
return return
} }

6
weed/topology/volume_growth.go

@ -112,7 +112,7 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo
func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
//find main datacenter and other data centers //find main datacenter and other data centers
rp := option.ReplicaPlacement rp := option.ReplicaPlacement
mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, func(node Node) error {
if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
} }
@ -144,7 +144,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
} }
//find main rack and other racks //find main rack and other racks
mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).PickNodesByWeight(rp.DiffRackCount+1, func(node Node) error {
if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
return fmt.Errorf("Not matching preferred rack:%s", option.Rack) return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
} }
@ -171,7 +171,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
} }
//find main rack and other racks //find main rack and other racks
mainServer, otherServers, serverErr := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
mainServer, otherServers, serverErr := mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, func(node Node) error {
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
} }

87
weed/topology/volume_growth_test.go

@ -253,3 +253,90 @@ func TestReplication011(t *testing.T) {
fmt.Println("assigned node :", server.Id()) fmt.Println("assigned node :", server.Id())
} }
} }
var topologyLayout3 = `
{
"dc1":{
"rack1":{
"server111":{
"volumes":[],
"limit":2000
}
}
},
"dc2":{
"rack2":{
"server222":{
"volumes":[],
"limit":2000
}
}
},
"dc3":{
"rack3":{
"server333":{
"volumes":[],
"limit":1000
}
}
},
"dc4":{
"rack4":{
"server444":{
"volumes":[],
"limit":1000
}
}
},
"dc5":{
"rack5":{
"server555":{
"volumes":[],
"limit":500
}
}
},
"dc6":{
"rack6":{
"server666":{
"volumes":[],
"limit":500
}
}
}
}
`
func TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) {
topo := setup(topologyLayout3)
vg := NewDefaultVolumeGrowth()
rp, _ := super_block.NewReplicaPlacementFromString("100")
volumeGrowOption := &VolumeGrowOption{
Collection: "Weight",
ReplicaPlacement: rp,
DataCenter: "",
Rack: "",
DataNode: "",
}
distribution := map[NodeId]int{}
// assign 1000 volumes
for i:=0;i<1000 ;i++ {
servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
if err != nil {
fmt.Println("finding empty slots error :", err)
t.Fail()
}
for _, server := range servers {
fmt.Println("assigned node :", server.Id())
if _, ok := distribution[server.id]; !ok {
distribution[server.id] = 0
}
distribution[server.id] += 1
}
}
for k, v := range distribution {
fmt.Println(k, "%s : %d", k, v)
}
}
Loading…
Cancel
Save