Browse Source

volume growth: default use low usage data node

volume growth: update test
*: add batch remote api call
pull/279/head
tnextday 10 years ago
parent
commit
47083630d7
  1. 2
      go/storage/store_task_cli.go
  2. 43
      go/topology/batch_operation.go
  3. 6
      go/topology/node.go
  4. 8
      go/topology/topology_vacuum.go
  5. 13
      go/topology/volume_growth.go
  6. 2
      go/topology/volume_growth_test.go
  7. 2
      go/util/http_util.go

2
go/storage/store_task_cli.go

@ -50,7 +50,7 @@ func (c *TaskCli) WaitAndQueryResult(timeout time.Duration) error {
for time.Since(startTime) < timeout { for time.Since(startTime) < timeout {
_, e := util.RemoteApiCall(c.DataNode, "/admin/task/query", args) _, e := util.RemoteApiCall(c.DataNode, "/admin/task/query", args)
if e == nil { if e == nil {
//task finished and have no error
//task have finished and have no error
return nil return nil
} }
if util.IsRemoteApiError(e) { if util.IsRemoteApiError(e) {

43
go/topology/batch_operation.go

@ -0,0 +1,43 @@
package topology
import (
"net/url"
"strconv"
"time"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/util"
)
func BatchOperation(locationList *VolumeLocationList, path string, values url.Values) (isSuccess bool) {
ch := make(chan bool, locationList.Length())
for _, dn := range locationList.list {
go func(url string, path string, values url.Values) {
_, e := util.RemoteApiCall(url, path, values)
if e != nil {
glog.V(0).Infoln("RemoteApiCall:", util.MkUrl(url, path, values), "error =", e)
}
ch <- e == nil
}(dn.Url(), path, values)
}
isSuccess = true
for range locationList.list {
select {
case canVacuum := <-ch:
isSuccess = isSuccess && canVacuum
case <-time.After(30 * time.Minute):
isSuccess = false
break
}
}
return isSuccess
}
func SetVolumeReadonly(locationList *VolumeLocationList, volume string, isReadonly bool) (isSuccess bool) {
forms := url.Values{}
forms.Set("key", "readonly")
forms.Set("value", strconv.FormatBool(isReadonly))
forms.Set("volume", volume)
return BatchOperation(locationList, "/admin/setting", forms)
}

6
go/topology/node.go

@ -115,16 +115,16 @@ func (s nodeList) Len() int { return len(s) }
func (s nodeList) Less(i, j int) bool { return s[i].FreeSpace() < s[j].FreeSpace() } func (s nodeList) Less(i, j int) bool { return s[i].FreeSpace() < s[j].FreeSpace() }
func (s nodeList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s nodeList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func LowUsagePickNodeFn(nodes []Node, count int) []Node {
func PickLowUsageNodeFn(nodes []Node, count int) []Node {
if len(nodes) < count { if len(nodes) < count {
return nil return nil
} }
sort.Sort(nodeList(nodes))
sort.Sort(sort.Reverse(nodeList(nodes)))
return nodes[:count] return nodes[:count]
} }
func (n *NodeImpl) PickLowUsageNodes(numberOfNodes int, filterFirstNodeFn FilterNodeFn) (firstNode Node, restNodes []Node, err error) { func (n *NodeImpl) PickLowUsageNodes(numberOfNodes int, filterFirstNodeFn FilterNodeFn) (firstNode Node, restNodes []Node, err error) {
return n.PickNodes(numberOfNodes, filterFirstNodeFn, LowUsagePickNodeFn)
return n.PickNodes(numberOfNodes, filterFirstNodeFn, PickLowUsageNodeFn)
} }
func (n *NodeImpl) IsDataNode() bool { func (n *NodeImpl) IsDataNode() bool {

8
go/topology/topology_vacuum.go

@ -87,11 +87,11 @@ func (t *Topology) Vacuum(garbageThreshold string) int {
for _, vl := range c.storageType2VolumeLayout.Items { for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil { if vl != nil {
volumeLayout := vl.(*VolumeLayout) volumeLayout := vl.(*VolumeLayout)
for vid, locationlist := range volumeLayout.vid2location {
for vid, locationList := range volumeLayout.vid2location {
glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid) glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid)
if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) {
batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) {
if batchVacuumVolumeCompact(volumeLayout, vid, locationList) {
batchVacuumVolumeCommit(volumeLayout, vid, locationList)
} }
} }
} }

13
go/topology/volume_growth.go

@ -92,8 +92,9 @@ func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (i
// 2. find rest data nodes // 2. find rest data nodes
func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
//find main datacenter and other data centers //find main datacenter and other data centers
pickNodesFn := PickLowUsageNodeFn
rp := option.ReplicaPlacement rp := option.ReplicaPlacement
mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
mainDataCenter, otherDataCenters, dc_err := topo.PickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
} }
@ -119,13 +120,13 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1) return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1)
} }
return nil return nil
})
}, pickNodesFn)
if dc_err != nil { if dc_err != nil {
return nil, dc_err return nil, dc_err
} }
//find main rack and other racks //find main rack and other racks
mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).PickNodes(rp.DiffRackCount+1, func(node Node) error {
if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
return fmt.Errorf("Not matching preferred rack:%s", option.Rack) return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
} }
@ -146,13 +147,13 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1) return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1)
} }
return nil return nil
})
}, pickNodesFn)
if rack_err != nil { if rack_err != nil {
return nil, rack_err return nil, rack_err
} }
//find main rack and other racks //find main rack and other racks
mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
mainServer, otherServers, server_err := mainRack.(*Rack).PickNodes(rp.SameRackCount+1, func(node Node) error {
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
} }
@ -160,7 +161,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1) return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
} }
return nil return nil
})
}, pickNodesFn)
if server_err != nil { if server_err != nil {
return nil, server_err return nil, server_err
} }

2
go/topology/volume_growth_test.go

@ -131,6 +131,6 @@ func TestFindEmptySlotsForOneVolume(t *testing.T) {
t.Fail() t.Fail()
} }
for _, server := range servers { for _, server := range servers {
fmt.Println("assigned node :", server.Id())
fmt.Printf("assigned node: %s, free space: %d\n", server.Id(), server.FreeSpace())
} }
} }

2
go/util/http_util.go

@ -100,7 +100,7 @@ func RemoteApiCall(host, path string, values url.Values) (result map[string]inte
if err, ok := result["error"]; ok && err.(string) != "" { if err, ok := result["error"]; ok && err.(string) != "" {
return nil, &RApiError{E: err.(string)} return nil, &RApiError{E: err.(string)}
} }
if code != http.StatusOK {
if code != http.StatusOK || code != http.StatusAccepted {
return nil, fmt.Errorf("RemoteApiCall %s/%s return %d", host, path, code) return nil, fmt.Errorf("RemoteApiCall %s/%s return %d", host, path, code)
} }
return result, nil return result, nil

Loading…
Cancel
Save