
*:temp commit

pull/279/head
tnextday 10 years ago
commit 9f3ba7c153
5 changed files:
1. go/topology/node.go (29 changed lines)
2. go/topology/topology_replicate.go (17 changed lines)
3. go/topology/volume_growth.go (183 changed lines)
4. go/topology/volume_growth_test.go (30 changed lines)
5. go/topology/volume_location_list.go (77 changed lines)

go/topology/node.go (29 changed lines)

@@ -2,12 +2,14 @@ package topology
 
 import (
 	"errors"
+	"fmt"
 	"math/rand"
 	"strings"
+	"sort"
 
 	"github.com/chrislusf/seaweedfs/go/glog"
 	"github.com/chrislusf/seaweedfs/go/storage"
-	"sort"
 )
 
 type NodeId string
@@ -52,25 +54,34 @@ type NodeImpl struct {
 	value interface{}
 }
 
+type NodePicker interface {
+	PickNodes(numberOfNodes int, filterNodeFn FilterNodeFn, pickFn PickNodesFn) (nodes []Node, err error)
+}
+
+var ErrFilterContinue = errors.New("continue")
+
 type FilterNodeFn func(dn Node) error
 type PickNodesFn func(nodes []Node, count int) []Node
 
 // the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
-func (n *NodeImpl) PickNodes(numberOfNodes int, filterFirstNodeFn FilterNodeFn, pickFn PickNodesFn) (firstNode Node, restNodes []Node, err error) {
+func (n *NodeImpl) PickNodes(numberOfNodes int, filterNodeFn FilterNodeFn, pickFn PickNodesFn) (nodes []Node, err error) {
 	candidates := make([]Node, 0, len(n.children))
 	var errs []string
 	for _, node := range n.children {
-		if err := filterFirstNodeFn(node); err == nil {
+		if err := filterNodeFn(node); err == nil {
 			candidates = append(candidates, node)
+		} else if err == ErrFilterContinue {
+			continue
 		} else {
 			errs = append(errs, string(node.Id())+":"+err.Error())
 		}
 	}
-	ns := pickFn(candidates, 1)
-	if ns == nil {
-		return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
+	if len(candidates) < numberOfNodes {
+		return nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
 	}
-	firstNode = ns[0]
+	return pickFn(candidates, numberOfNodes), nil
 	glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
@@ -163,7 +174,7 @@ func (n *NodeImpl) GetValue() interface{} {
 func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
 	for _, node := range n.children {
 		freeSpace := node.FreeSpace()
-		// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
+		fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
 		if freeSpace <= 0 {
 			continue
 		}
@@ -171,7 +182,7 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
 			r -= freeSpace
 		} else {
 			if node.IsDataNode() && node.FreeSpace() > 0 {
-				// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
+				fmt.Println("assigned to node =", node, ", freeSpace =", node.FreeSpace())
 				return node.(*DataNode), nil
 			}
 			assignedNode, err = node.ReserveOneVolume(r)
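
Not part of the commit: a minimal, self-contained sketch of the filter/pick flow the new PickNodes introduces, where a filter either rejects a child with a reported error or skips it quietly via ErrFilterContinue, and a pick function chooses among the survivors. The node struct, pickNodes helper and the sort-by-free-space picker below are hypothetical stand-ins, not the repository's real types.

package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// Hypothetical, trimmed-down stand-ins for the repository's Node types.
type node struct {
	id   string
	free int
}

var errFilterContinue = errors.New("continue") // analogous to ErrFilterContinue

type filterNodeFn func(n node) error
type pickNodesFn func(nodes []node, count int) []node

// pickNodes mirrors the new PickNodes flow: filter the children, collect the
// reported errors, then delegate the final choice to pickFn.
func pickNodes(children []node, count int, filter filterNodeFn, pick pickNodesFn) ([]node, error) {
	candidates := make([]node, 0, len(children))
	var errs []string
	for _, c := range children {
		if err := filter(c); err == nil {
			candidates = append(candidates, c)
		} else if err == errFilterContinue {
			continue // silently skipped, not reported
		} else {
			errs = append(errs, c.id+":"+err.Error())
		}
	}
	if len(candidates) < count {
		return nil, errors.New("no matching node found!\n" + strings.Join(errs, "\n"))
	}
	return pick(candidates, count), nil
}

func main() {
	children := []node{{"dn1", 0}, {"dn2", 3}, {"dn3", 1}}
	picked, err := pickNodes(children, 2,
		func(n node) error {
			if n.free <= 0 {
				return errFilterContinue // full nodes are skipped quietly
			}
			return nil
		},
		func(nodes []node, count int) []node {
			// prefer the emptiest nodes (a stand-in for PickLowUsageNodeFn)
			sort.Slice(nodes, func(i, j int) bool { return nodes[i].free > nodes[j].free })
			return nodes[:count]
		})
	fmt.Println(picked, err) // [{dn2 3} {dn3 1}] <nil>
}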

go/topology/topology_replicate.go (17 changed lines)

@@ -1,8 +1,11 @@
 package topology
 
-import "github.com/chrislusf/seaweedfs/go/glog"
+import (
+	"github.com/chrislusf/seaweedfs/go/glog"
+	"github.com/chrislusf/seaweedfs/go/storage"
+)
 
-func (t *Topology) Replicate() int {
+func (t *Topology) CheckReplicate() int {
 	glog.V(0).Infoln("Start replicate checker on demand")
 	for _, col := range t.collectionMap.Items {
 		c := col.(*Collection)
@@ -15,6 +18,7 @@ func (t *Topology) Replicate() int {
 			if locationList.Length() < copyCount {
 				//set volume readonly
 				glog.V(0).Infoln("replicate volume :", vid)
+				SetVolumeReadonly(locationList, vid.String(), true)
 			}
 		}
@@ -23,3 +27,12 @@ func (t *Topology) Replicate() int {
 	}
 	return 0
 }
+
+func (t *Topology) doReplicate(vl *VolumeLayout, vid storage.VolumeId) {
+	locationList := vl.vid2location[vid]
+	if !SetVolumeReadonly(locationList, vid.String(), true) {
+		return
+	}
+	defer SetVolumeReadonly(locationList, vid.String(), false)
+}
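
Not part of the commit: the doReplicate stub above follows a guard pattern, flip the volume to readonly, do the copy work, and let defer restore writability on any exit path. SetVolumeReadonly itself is not shown in this diff, so the sketch below uses a hypothetical setReadonly helper to illustrate the same idea.

package main

import "fmt"

// setReadonly is a hypothetical stand-in for SetVolumeReadonly: it reports
// whether the state change was actually applied.
func setReadonly(state map[string]bool, vid string, readonly bool) bool {
	if state[vid] == readonly {
		return false // already in the requested state, nothing to do
	}
	state[vid] = readonly
	return true
}

// replicateVolume mirrors the doReplicate skeleton: take the volume offline
// for writes, do the copy work, and restore writability on the way out.
func replicateVolume(state map[string]bool, vid string) {
	if !setReadonly(state, vid, true) {
		return
	}
	defer setReadonly(state, vid, false)
	fmt.Println("copying volume", vid, "while readonly:", state[vid])
}

func main() {
	state := map[string]bool{"7": false}
	replicateVolume(state, "7")
	fmt.Println("after replication, readonly:", state["7"]) // false again
}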

go/topology/volume_growth.go (183 changed lines)

@@ -76,7 +76,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOp
 }
 
 func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) {
-	servers, e := vg.findEmptySlotsForOneVolume(topo, option)
+	servers, e := vg.findEmptySlotsForOneVolume(topo, option, nil)
 	if e != nil {
 		return 0, e
 	}
@@ -85,105 +85,150 @@ func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (i
 	return len(servers), err
 }
 
+func filterMainDataCenter(option *VolumeGrowOption, node Node) error {
+	if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
+		return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
+	}
+	rp := option.ReplicaPlacement
+	if len(node.Children()) < rp.DiffRackCount+1 {
+		return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
+	}
+	if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
+		return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
+	}
+	possibleRacksCount := 0
+	for _, rack := range node.Children() {
+		possibleDataNodesCount := 0
+		for _, n := range rack.Children() {
+			if n.FreeSpace() >= 1 {
+				possibleDataNodesCount++
+			}
+		}
+		if possibleDataNodesCount >= rp.SameRackCount+1 {
+			possibleRacksCount++
+		}
+	}
+	if possibleRacksCount < rp.DiffRackCount+1 {
+		return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1)
+	}
+	return nil
+}
+
+func filterMainRack(option *VolumeGrowOption, node Node) error {
+	if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
+		return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
+	}
+	rp := option.ReplicaPlacement
+	if node.FreeSpace() < rp.SameRackCount+1 {
+		return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
+	}
+	if len(node.Children()) < rp.SameRackCount+1 {
+		// a bit faster way to test free racks
+		return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1)
+	}
+	possibleDataNodesCount := 0
+	for _, n := range node.Children() {
+		if n.FreeSpace() >= 1 {
+			possibleDataNodesCount++
+		}
+	}
+	if possibleDataNodesCount < rp.SameRackCount+1 {
+		return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1)
+	}
+	return nil
+}
+
+func makeExceptNodeFilter(nodes []Node) FilterNodeFn {
+	m := make(map[NodeId]bool)
+	for _, n := range nodes {
+		m[n.Id()] = true
+	}
+	return func(dn Node) error {
+		if dn.FreeSpace() <= 0 {
+			return ErrFilterContinue
+		}
+		if _, ok := m[dn.Id()]; ok {
+			return ErrFilterContinue
+		}
+		return nil
+	}
+}
+
 // 1. find the main data node
 // 1.1 collect all data nodes that have 1 slots
 // 2.2 collect all racks that have rp.SameRackCount+1
 // 2.2 collect all data centers that have DiffRackCount+rp.SameRackCount+1
 // 2. find rest data nodes
-func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
+func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption, existsServer *VolumeLocationList) (additionServers []*DataNode, err error) {
 	//find main datacenter and other data centers
 	pickNodesFn := PickLowUsageNodeFn
 	rp := option.ReplicaPlacement
-	mainDataCenter, otherDataCenters, dc_err := topo.PickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
-		if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
-			return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
-		}
-		if len(node.Children()) < rp.DiffRackCount+1 {
-			return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
-		}
-		if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
-			return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
-		}
-		possibleRacksCount := 0
-		for _, rack := range node.Children() {
-			possibleDataNodesCount := 0
-			for _, n := range rack.Children() {
-				if n.FreeSpace() >= 1 {
-					possibleDataNodesCount++
-				}
-			}
-			if possibleDataNodesCount >= rp.SameRackCount+1 {
-				possibleRacksCount++
-			}
+	pickMainAndRestNodes := func(np NodePicker, restNodeCount int, filterNodeFn FilterNodeFn) (mainNode Node, restNodes []Node, e error) {
+		mainNodes, err := np.PickNodes(1, filterNodeFn, pickNodesFn)
+		if err != nil {
+			return nil, nil, err
 		}
-		if possibleRacksCount < rp.DiffRackCount+1 {
-			return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1)
+		restNodes, err := np.PickNodes(restNodeCount,
+			makeExceptNodeFilter(mainNodes), pickNodesFn)
+		if err != nil {
+			return nil, nil, err
 		}
-		return nil
-	}, pickNodesFn)
+		return mainNodes[0], restNodes, nil
+	}
+	mainDataCenter, otherDataCenters, dc_err := pickMainAndRestNodes(topo, rp.DiffDataCenterCount,
+		func(node Node) error {
+			return filterMainDataCenter(option, node)
+		})
 	if dc_err != nil {
 		return nil, dc_err
 	}
 	//find main rack and other racks
-	mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).PickNodes(rp.DiffRackCount+1, func(node Node) error {
-		if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
-			return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
-		}
-		if node.FreeSpace() < rp.SameRackCount+1 {
-			return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
-		}
-		if len(node.Children()) < rp.SameRackCount+1 {
-			// a bit faster way to test free racks
-			return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1)
-		}
-		possibleDataNodesCount := 0
-		for _, n := range node.Children() {
-			if n.FreeSpace() >= 1 {
-				possibleDataNodesCount++
-			}
-		}
-		if possibleDataNodesCount < rp.SameRackCount+1 {
-			return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1)
-		}
-		return nil
-	}, pickNodesFn)
+	mainRack, otherRacks, rack_err := pickMainAndRestNodes(mainDataCenter.(*DataCenter), rp.DiffRackCount,
+		func(node Node) error {
+			return filterMainRack(option, node)
+		},
+	)
 	if rack_err != nil {
 		return nil, rack_err
 	}
-	//find main rack and other racks
-	mainServer, otherServers, server_err := mainRack.(*Rack).PickNodes(rp.SameRackCount+1, func(node Node) error {
-		if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
-			return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
-		}
-		if node.FreeSpace() < 1 {
-			return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
-		}
-		return nil
-	}, pickNodesFn)
+	//find main server and other servers
+	mainServer, otherServers, server_err := pickMainAndRestNodes(mainRack.(*Rack), rp.SameRackCount,
+		func(node Node) error {
+			if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
+				return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
+			}
+			if node.FreeSpace() < 1 {
+				return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
+			}
+			return nil
+		},
+	)
 	if server_err != nil {
 		return nil, server_err
 	}
-	servers = append(servers, mainServer.(*DataNode))
+	additionServers = append(additionServers, mainServer.(*DataNode))
 	for _, server := range otherServers {
-		servers = append(servers, server.(*DataNode))
+		additionServers = append(additionServers, server.(*DataNode))
 	}
 	for _, rack := range otherRacks {
 		r := rand.Intn(rack.FreeSpace())
 		if server, e := rack.ReserveOneVolume(r); e == nil {
-			servers = append(servers, server)
+			additionServers = append(additionServers, server)
 		} else {
-			return servers, e
+			return additionServers, e
 		}
 	}
-	for _, datacenter := range otherDataCenters {
-		r := rand.Intn(datacenter.FreeSpace())
-		if server, e := datacenter.ReserveOneVolume(r); e == nil {
-			servers = append(servers, server)
+	for _, dc := range otherDataCenters {
+		r := rand.Intn(dc.FreeSpace())
+		if server, e := dc.ReserveOneVolume(r); e == nil {
+			additionServers = append(additionServers, server)
 		} else {
-			return servers, e
+			return additionServers, e
 		}
 	}
 	return
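
Not part of the commit: the thresholds checked by filterMainDataCenter and filterMainRack follow from the replica placement digits (copies in other data centers, copies in other racks of the same data center, copies on other servers of the same rack). A small sketch of that arithmetic, with parsePlacement as a hypothetical helper:

package main

import "fmt"

// parsePlacement is a hypothetical helper mirroring how a placement string
// such as "011" breaks down into the three counts used by the filters above.
func parsePlacement(s string) (diffDC, diffRack, sameRack int) {
	if len(s) != 3 {
		panic("placement must have exactly three digits")
	}
	return int(s[0] - '0'), int(s[1] - '0'), int(s[2] - '0')
}

func main() {
	diffDC, diffRack, sameRack := parsePlacement("011")
	fmt.Println("total copies:           ", diffDC+diffRack+sameRack+1)
	// The main data center must host every copy that is not placed in another
	// data center, which is the bound filterMainDataCenter checks.
	fmt.Println("slots in main DC:       ", diffRack+sameRack+1)
	// The main rack must host every copy that stays in the same rack,
	// which is the bound filterMainRack checks.
	fmt.Println("slots in main rack:     ", sameRack+1)
	fmt.Println("other data centers used:", diffDC)
	fmt.Println("other racks used:       ", diffRack)
}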

go/topology/volume_growth_test.go (30 changed lines)

@@ -19,7 +19,7 @@ var topologyLayout = `
         {"id":2, "size":12312},
         {"id":3, "size":12312}
       ],
-      "limit":3
+      "limit":15
     },
     "server112":{
       "volumes":[
@@ -28,6 +28,18 @@ var topologyLayout = `
         {"id":6, "size":12312}
       ],
       "limit":10
+    },
+    "server113":{
+      "volumes":[
+        {"id":7, "size":12312},
+        {"id":8, "size":12312},
+        {"id":9, "size":12312}
+      ],
+      "limit":8
+    },
+    "server114":{
+      "volumes":[],
+      "limit":8
     }
   },
   "rack2":{
@@ -37,11 +49,15 @@ var topologyLayout = `
         {"id":5, "size":12312},
         {"id":6, "size":12312}
       ],
-      "limit":4
+      "limit":8
     },
     "server122":{
       "volumes":[],
-      "limit":4
+      "limit":8
+    },
+    "server124":{
+      "volumes":[],
+      "limit":8
     },
     "server123":{
       "volumes":[
@@ -63,7 +79,11 @@ var topologyLayout = `
         {"id":3, "size":12312},
         {"id":5, "size":12312}
       ],
-      "limit":4
+      "limit":8
+    },
+    "server322":{
+      "volumes":[],
+      "limit":7
     }
   }
 }
@@ -117,7 +137,7 @@ func setup(topologyLayout string) *Topology {
 func TestFindEmptySlotsForOneVolume(t *testing.T) {
 	topo := setup(topologyLayout)
 	vg := NewDefaultVolumeGrowth()
-	rp, _ := storage.NewReplicaPlacementFromString("002")
+	rp, _ := storage.NewReplicaPlacementFromString("011")
 	volumeGrowOption := &VolumeGrowOption{
 		Collection:       "",
 		ReplicaPlacement: rp,
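
Not part of the commit: a hypothetical continuation of this test, sketching the assertion the new "011" placement implies, the main server plus one more in the same rack plus one in another rack, three servers in total. It assumes the surrounding helpers from this file (setup, topologyLayout, VolumeGrowOption).

package topology

import (
	"testing"

	"github.com/chrislusf/seaweedfs/go/storage"
)

// Hypothetical sketch, not in the commit: exercise findEmptySlotsForOneVolume
// with the "011" placement against the test layout above.
func TestFindEmptySlotsForReplication011(t *testing.T) {
	topo := setup(topologyLayout)
	vg := NewDefaultVolumeGrowth()
	rp, _ := storage.NewReplicaPlacementFromString("011")
	option := &VolumeGrowOption{Collection: "", ReplicaPlacement: rp}
	servers, err := vg.findEmptySlotsForOneVolume(topo, option, nil)
	if err != nil {
		t.Fatalf("findEmptySlotsForOneVolume: %v", err)
	}
	if len(servers) != 3 {
		t.Fatalf("expected 3 servers for placement 011, got %d", len(servers))
	}
}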

go/topology/volume_location_list.go (77 changed lines)

@@ -70,3 +70,80 @@ func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
 		dnll.list = l
 	}
 }
+
+// return all data centers, first is main data center
+func (dnll *VolumeLocationList) DataCenters() []*DataCenter {
+	m := make(map[*DataCenter]int)
+	maxCount := 0
+	var mainDC *DataCenter
+	for _, dn := range dnll.list {
+		var dc *DataCenter
+		if dc = dn.GetDataCenter(); dc == nil {
+			continue
+		}
+		m[dc] = m[dc] + 1
+		if m[dc] > maxCount {
+			mainDC = dc
+			maxCount = m[dc]
+		}
+	}
+	dataCenters := make([]*DataCenter, 0, len(m))
+	if mainDC != nil {
+		dataCenters = append(dataCenters, mainDC)
+	}
+	for dc := range m {
+		if dc != mainDC {
+			dataCenters = append(dataCenters, dc)
+		}
+	}
+	return dataCenters
+}
+
+// return all racks, first is the main rack; if dc is nil, racks from all data centers are considered
+func (dnll *VolumeLocationList) Racks(dc *DataCenter) []*Rack {
+	m := make(map[*Rack]int)
+	maxCount := 0
+	var mainRack *Rack
+	for _, dn := range dnll.list {
+		if dc != nil && dn.GetDataCenter() != dc {
+			continue
+		}
+		var rack *Rack
+		if rack = dn.GetRack(); rack == nil {
+			continue
+		}
+		m[rack] = m[rack] + 1
+		if m[rack] > maxCount {
+			mainRack = rack
+			maxCount = m[rack]
+		}
+	}
+	racks := make([]*Rack, 0, len(m))
+	if mainRack != nil {
+		racks = append(racks, mainRack)
+	}
+	for rack := range m {
+		if rack != mainRack {
+			racks = append(racks, rack)
+		}
+	}
+	return racks
+}
+
+// return the data nodes holding this volume; if rack is not nil, only nodes in that rack are returned
+func (dnll *VolumeLocationList) Servers(rack *Rack) []*DataNode {
+	servers := make([]*DataNode, 0, len(dnll.list))
+	for _, dn := range dnll.list {
+		if rack != nil && dn.GetRack() != rack {
+			continue
+		}
+		if dn.GetRack() == nil {
+			continue
+		}
+		servers = append(servers, dn)
+	}
+	return servers
+}
+
+//func (dnll *VolumeLocationList)ContainDataNode(nodeType, id string)bool {
+//
+//}
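
Not part of the commit: DataCenters() and Racks() share a "majority group first" idea, count how often each group appears among the replica locations and put the most common one in front. A self-contained sketch with string keys standing in for the *DataCenter / *Rack pointers:

package main

import "fmt"

// mainFirst mirrors the grouping idea behind DataCenters() and Racks():
// count how often each group appears, return the most common group first,
// then the remaining groups.
func mainFirst(locations []string) []string {
	counts := make(map[string]int)
	maxCount := 0
	main := ""
	for _, loc := range locations {
		counts[loc]++
		if counts[loc] > maxCount {
			main = loc
			maxCount = counts[loc]
		}
	}
	out := make([]string, 0, len(counts))
	if main != "" {
		out = append(out, main)
	}
	for loc := range counts {
		if loc != main {
			out = append(out, loc)
		}
	}
	return out
}

func main() {
	// Three replicas: two in dc1, one in dc2, so dc1 is the "main" data center.
	fmt.Println(mainFirst([]string{"dc1", "dc2", "dc1"})) // [dc1 dc2]
}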