Browse Source

volume growth: growth with exists data node

pull/279/head
tnextday 10 years ago
parent
commit
cc3b8c66ed
  1. 14
      go/storage/replica_placement.go
  2. 31
      go/topology/node.go
  3. 56
      go/topology/volume_growth.go
  4. 14
      go/topology/volume_growth_test.go
  5. 33
      go/topology/volume_location_list.go

14
go/storage/replica_placement.go

@ -52,8 +52,16 @@ func (rp *ReplicaPlacement) GetCopyCount() int {
return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
}
func (rp *ReplicaPlacement) Equal(rp1 *ReplicaPlacement) bool {
return rp.SameRackCount == rp1.SameRackCount &&
func (rp *ReplicaPlacement) Compare(rp1 *ReplicaPlacement) int {
if rp.SameRackCount == rp1.SameRackCount &&
rp.DiffRackCount == rp1.DiffRackCount &&
rp.DiffDataCenterCount == rp1.DiffDataCenterCount
rp.DiffDataCenterCount == rp1.DiffDataCenterCount {
return 0
} else if rp.SameRackCount < rp1.SameRackCount ||
rp.DiffRackCount < rp1.DiffRackCount ||
rp.DiffDataCenterCount < rp1.DiffDataCenterCount {
return -1
} else {
return 1
}
}

31
go/topology/node.go

@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"math/rand"
"strings"
"sort"
@ -58,7 +57,6 @@ type NodePicker interface {
PickNodes(numberOfNodes int, filterNodeFn FilterNodeFn, pickFn PickNodesFn) (nodes []Node, err error)
}
var ErrFilterContinue = errors.New("continue")
type FilterNodeFn func(dn Node) error
@ -78,31 +76,10 @@ func (n *NodeImpl) PickNodes(numberOfNodes int, filterNodeFn FilterNodeFn, pickF
}
}
if len(candidates) < numberOfNodes {
return nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
return nil, errors.New("Not enough data node found!")
// return nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
}
return pickFn(candidates, numberOfNodes), nil
glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
candidates = candidates[:0]
for _, node := range n.children {
if node.Id() == firstNode.Id() {
continue
}
if node.FreeSpace() <= 0 {
continue
}
glog.V(2).Infoln("select rest node candidate:", node.Id())
candidates = append(candidates, node)
}
glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
restNodes = pickFn(candidates, numberOfNodes-1)
if restNodes == nil {
glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
err = errors.New("Not enough data node found!")
}
return
}
func RandomlyPickNodeFn(nodes []Node, count int) []Node {
@ -116,7 +93,7 @@ func RandomlyPickNodeFn(nodes []Node, count int) []Node {
return nodes[:count]
}
func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn FilterNodeFn) (firstNode Node, restNodes []Node, err error) {
func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn FilterNodeFn) (nodes []Node, err error) {
return n.PickNodes(numberOfNodes, filterFirstNodeFn, RandomlyPickNodeFn)
}
@ -134,7 +111,7 @@ func PickLowUsageNodeFn(nodes []Node, count int) []Node {
return nodes[:count]
}
func (n *NodeImpl) PickLowUsageNodes(numberOfNodes int, filterFirstNodeFn FilterNodeFn) (firstNode Node, restNodes []Node, err error) {
func (n *NodeImpl) PickLowUsageNodes(numberOfNodes int, filterFirstNodeFn FilterNodeFn) (nodes []Node, err error) {
return n.PickNodes(numberOfNodes, filterFirstNodeFn, PickLowUsageNodeFn)
}

56
go/topology/volume_growth.go

@ -139,11 +139,11 @@ func filterMainRack(option *VolumeGrowOption, node Node) error {
}
func makeExceptNodeFilter(nodes []Node) FilterNodeFn {
m := make(map[string]bool)
m := make(map[NodeId]bool)
for _, n := range nodes {
m[n.Id()] = true
}
return func(dn Node) {
return func(dn Node) error {
if dn.FreeSpace() <= 0 {
return ErrFilterContinue
}
@ -164,38 +164,69 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
pickNodesFn := PickLowUsageNodeFn
rp := option.ReplicaPlacement
pickMainAndRestNodes := func(np NodePicker, restNodeCount int, filterNodeFn FilterNodeFn) (mainNode Node, restNodes []Node, e error) {
pickMainAndRestNodes := func(np NodePicker, totalNodeCount int, filterNodeFn FilterNodeFn, existsNodes []Node) (mainNode Node, restNodes []Node, e error) {
for _, n := range existsNodes {
if filterNodeFn(n) == nil {
mainNode = n
break
}
}
if mainNode == nil {
mainNodes, err := np.PickNodes(1, filterNodeFn, pickNodesFn)
if err != nil {
return nil, err
return nil, nil, err
}
mainNode = mainNodes[0]
existsNodes = append(existsNodes, mainNode)
}
restNodes, err := np.PickNodes(restNodeCount,
makeExceptNodeFilter(mainNodes), pickNodesFn)
glog.V(2).Infoln(mainNode.Id(), "picked main node:", mainNode.Id())
restCount := totalNodeCount - len(existsNodes)
if restCount > 0 {
restNodes, err = np.PickNodes(restCount,
makeExceptNodeFilter(existsNodes), pickNodesFn)
if err != nil {
return nil, err
return nil, nil, err
}
return mainNodes[0], restNodes
}
mainDataCenter, otherDataCenters, dc_err := pickMainAndRestNodes(topo, rp.DiffDataCenterCount,
return mainNode, restNodes, nil
}
var existsNode []Node
if existsServer != nil {
existsNode = existsServer.DiffDataCenters()
}
mainDataCenter, otherDataCenters, dc_err := pickMainAndRestNodes(topo, rp.DiffDataCenterCount+1,
func(node Node) error {
return filterMainDataCenter(option, node)
})
}, existsNode)
if dc_err != nil {
return nil, dc_err
}
//find main rack and other racks
mainRack, otherRacks, rack_err := pickMainAndRestNodes(mainDataCenter.(*DataCenter), rp.DiffRackCount,
if existsServer != nil {
existsNode = existsServer.DiffRacks(mainDataCenter.(*DataCenter))
} else {
existsNode = nil
}
mainRack, otherRacks, rack_err := pickMainAndRestNodes(mainDataCenter.(*DataCenter), rp.DiffRackCount+1,
func(node Node) error {
return filterMainRack(option, node)
},
existsNode,
)
if rack_err != nil {
return nil, rack_err
}
//find main server and other servers
mainServer, otherServers, server_err := pickMainAndRestNodes(mainRack.(*Rack), rp.SameRackCount,
if existsServer != nil {
existsNode = existsServer.SameServers(mainRack.(*Rack))
} else {
existsNode = nil
}
mainServer, otherServers, server_err := pickMainAndRestNodes(mainRack.(*Rack), rp.SameRackCount+1,
func(node Node) error {
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
@ -205,6 +236,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
}
return nil
},
existsNode,
)
if server_err != nil {

14
go/topology/volume_growth_test.go

@ -70,6 +70,16 @@ var topologyLayout = `
}
},
"dc2":{
"rack2":{
"server221":{
"volumes":[],
"limit":8
},
"server222":{
"volumes":[],
"limit":8
}
}
},
"dc3":{
"rack2":{
@ -137,7 +147,7 @@ func setup(topologyLayout string) *Topology {
func TestFindEmptySlotsForOneVolume(t *testing.T) {
topo := setup(topologyLayout)
vg := NewDefaultVolumeGrowth()
rp, _ := storage.NewReplicaPlacementFromString("011")
rp, _ := storage.NewReplicaPlacementFromString("111")
volumeGrowOption := &VolumeGrowOption{
Collection: "",
ReplicaPlacement: rp,
@ -145,7 +155,7 @@ func TestFindEmptySlotsForOneVolume(t *testing.T) {
Rack: "",
DataNode: "",
}
servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption, nil)
if err != nil {
fmt.Println("finding empty slots error :", err)
t.Fail()

33
go/topology/volume_location_list.go

@ -27,11 +27,6 @@ func (dnll *VolumeLocationList) Length() int {
return len(dnll.list)
}
func (dnll *VolumeLocationList) CalcReplicaPlacement() (rp *storage.ReplicaPlacement) {
//TODO CalcReplicaPlacement
return nil
}
func (dnll *VolumeLocationList) Set(loc *DataNode) {
for i := 0; i < len(dnll.list); i++ {
if loc.Ip == dnll.list[i].Ip && loc.Port == dnll.list[i].Port {
@ -72,7 +67,7 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
}
// return all data centers, first is main data center
func (dnll *VolumeLocationList) DataCenters() []*DataCenter {
func (dnll *VolumeLocationList) DiffDataCenters() []Node {
m := make(map[*DataCenter]int)
maxCount := 0
var mainDC *DataCenter
@ -87,7 +82,7 @@ func (dnll *VolumeLocationList) DataCenters() []*DataCenter {
maxCount = m[dc]
}
}
dataCenters := make([]*DataCenter, 0, len(m))
dataCenters := make([]Node, 0, len(m))
if mainDC != nil {
dataCenters = append(dataCenters, mainDC)
}
@ -100,12 +95,12 @@ func (dnll *VolumeLocationList) DataCenters() []*DataCenter {
}
// return all racks if data center set nil
func (dnll *VolumeLocationList) Racks(dc *DataCenter) []*Rack {
func (dnll *VolumeLocationList) DiffRacks(mainDC *DataCenter) []Node {
m := make(map[*Rack]int)
maxCount := 0
var mainRack *Rack
for _, dn := range dnll.list {
if dc != nil && dn.GetDataCenter() != dc {
if mainDC != nil && dn.GetDataCenter() != mainDC {
continue
}
var rack *Rack
@ -118,7 +113,7 @@ func (dnll *VolumeLocationList) Racks(dc *DataCenter) []*Rack {
maxCount = m[rack]
}
}
racks := make([]*Rack, 0, len(m))
racks := make([]Node, 0, len(m))
if mainRack != nil {
racks = append(racks, mainRack)
}
@ -128,11 +123,9 @@ func (dnll *VolumeLocationList) Racks(dc *DataCenter) []*Rack {
return racks
}
func (dnll *VolumeLocationList) Servers(rack *Rack) []*DataNode {
servers := make([]*DataNode)
func (dnll *VolumeLocationList) SameServers(mainRack *Rack) (servers []Node) {
for _, dn := range dnll.list {
if rack != nil && dn.GetRack() != rack {
if mainRack != nil && dn.GetRack() != mainRack {
continue
}
var rack *Rack
@ -144,6 +137,18 @@ func (dnll *VolumeLocationList) Servers(rack *Rack) []*DataNode {
return servers
}
func (dnll *VolumeLocationList) CalcReplicaPlacement() (rp *storage.ReplicaPlacement) {
dcs := dnll.DiffDataCenters()
rs := dnll.DiffRacks(dcs[0].(*DataCenter))
ss := dnll.SameServers(rs[0].(*Rack))
rp = &storage.ReplicaPlacement{
len(dcs) - 1,
len(rs) - 1,
len(ss) - 1,
}
return
}
//func (dnll *VolumeLocationList)ContainDataNode(nodeType, id string)bool {
//
//}
Loading…
Cancel
Save