From 47083630d79e1dceb5e320567a1f0cdce502cb98 Mon Sep 17 00:00:00 2001 From: tnextday Date: Sun, 27 Dec 2015 02:44:39 +0800 Subject: [PATCH] volume growth: default use low usage data node volume growth: update test *: add batch remote api call --- go/storage/store_task_cli.go | 2 +- go/topology/batch_operation.go | 43 +++++++++++++++++++++++++++++++ go/topology/node.go | 6 ++--- go/topology/topology_vacuum.go | 8 +++--- go/topology/volume_growth.go | 13 +++++----- go/topology/volume_growth_test.go | 2 +- go/util/http_util.go | 2 +- 7 files changed, 60 insertions(+), 16 deletions(-) create mode 100644 go/topology/batch_operation.go diff --git a/go/storage/store_task_cli.go b/go/storage/store_task_cli.go index 953c1f791..06a18235c 100644 --- a/go/storage/store_task_cli.go +++ b/go/storage/store_task_cli.go @@ -50,7 +50,7 @@ func (c *TaskCli) WaitAndQueryResult(timeout time.Duration) error { for time.Since(startTime) < timeout { _, e := util.RemoteApiCall(c.DataNode, "/admin/task/query", args) if e == nil { - //task finished and have no error + //task have finished and have no error return nil } if util.IsRemoteApiError(e) { diff --git a/go/topology/batch_operation.go b/go/topology/batch_operation.go new file mode 100644 index 000000000..3cf791d1e --- /dev/null +++ b/go/topology/batch_operation.go @@ -0,0 +1,43 @@ +package topology + +import ( + "net/url" + "strconv" + "time" + + "github.com/chrislusf/seaweedfs/go/glog" + "github.com/chrislusf/seaweedfs/go/util" +) + +func BatchOperation(locationList *VolumeLocationList, path string, values url.Values) (isSuccess bool) { + ch := make(chan bool, locationList.Length()) + for _, dn := range locationList.list { + go func(url string, path string, values url.Values) { + _, e := util.RemoteApiCall(url, path, values) + if e != nil { + glog.V(0).Infoln("RemoteApiCall:", util.MkUrl(url, path, values), "error =", e) + } + ch <- e == nil + + }(dn.Url(), path, values) + } + isSuccess = true + for range locationList.list { + select 
{ + case ok := <-ch: + isSuccess = isSuccess && ok + case <-time.After(30 * time.Minute): + isSuccess = false + return false + } + } + return isSuccess +} + +func SetVolumeReadonly(locationList *VolumeLocationList, volume string, isReadonly bool) (isSuccess bool) { + forms := url.Values{} + forms.Set("key", "readonly") + forms.Set("value", strconv.FormatBool(isReadonly)) + forms.Set("volume", volume) + return BatchOperation(locationList, "/admin/setting", forms) +} diff --git a/go/topology/node.go index a2c78f63d..242f60b6f 100644 --- a/go/topology/node.go +++ b/go/topology/node.go @@ -115,16 +115,16 @@ func (s nodeList) Len() int { return len(s) } func (s nodeList) Less(i, j int) bool { return s[i].FreeSpace() < s[j].FreeSpace() } func (s nodeList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func LowUsagePickNodeFn(nodes []Node, count int) []Node { +func PickLowUsageNodeFn(nodes []Node, count int) []Node { if len(nodes) < count { return nil } - sort.Sort(nodeList(nodes)) + sort.Sort(sort.Reverse(nodeList(nodes))) return nodes[:count] } func (n *NodeImpl) PickLowUsageNodes(numberOfNodes int, filterFirstNodeFn FilterNodeFn) (firstNode Node, restNodes []Node, err error) { - return n.PickNodes(numberOfNodes, filterFirstNodeFn, LowUsagePickNodeFn) + return n.PickNodes(numberOfNodes, filterFirstNodeFn, PickLowUsageNodeFn) } func (n *NodeImpl) IsDataNode() bool { diff --git a/go/topology/topology_vacuum.go index cd85f3b15..446eb0c1c 100644 --- a/go/topology/topology_vacuum.go +++ b/go/topology/topology_vacuum.go @@ -87,11 +87,11 @@ func (t *Topology) Vacuum(garbageThreshold string) int { for _, vl := range c.storageType2VolumeLayout.Items { if vl != nil { volumeLayout := vl.(*VolumeLayout) - for vid, locationlist := range volumeLayout.vid2location { + for vid, locationList := range volumeLayout.vid2location { glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid) - if
batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) { - if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) { - batchVacuumVolumeCommit(volumeLayout, vid, locationlist) + if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) { + if batchVacuumVolumeCompact(volumeLayout, vid, locationList) { + batchVacuumVolumeCommit(volumeLayout, vid, locationList) } } } diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go index 31307ffe0..ed3f8fee9 100644 --- a/go/topology/volume_growth.go +++ b/go/topology/volume_growth.go @@ -92,8 +92,9 @@ func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (i // 2. find rest data nodes func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { //find main datacenter and other data centers + pickNodesFn := PickLowUsageNodeFn rp := option.ReplicaPlacement - mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error { + mainDataCenter, otherDataCenters, dc_err := topo.PickNodes(rp.DiffDataCenterCount+1, func(node Node) error { if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) } @@ -119,13 +120,13 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1) } return nil - }) + }, pickNodesFn) if dc_err != nil { return nil, dc_err } //find main rack and other racks - mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error { + mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).PickNodes(rp.DiffRackCount+1, func(node Node) error { if option.Rack != "" && 
node.IsRack() && node.Id() != NodeId(option.Rack) { return fmt.Errorf("Not matching preferred rack:%s", option.Rack) } @@ -146,13 +147,13 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1) } return nil - }) + }, pickNodesFn) if rack_err != nil { return nil, rack_err } //find main rack and other racks - mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error { + mainServer, otherServers, server_err := mainRack.(*Rack).PickNodes(rp.SameRackCount+1, func(node Node) error { if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) } @@ -160,7 +161,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1) } return nil - }) + }, pickNodesFn) if server_err != nil { return nil, server_err } diff --git a/go/topology/volume_growth_test.go index 08377b4fd..8f50a6f90 100644 --- a/go/topology/volume_growth_test.go +++ b/go/topology/volume_growth_test.go @@ -131,6 +131,6 @@ func TestFindEmptySlotsForOneVolume(t *testing.T) { t.Fail() } for _, server := range servers { - fmt.Println("assigned node :", server.Id()) + fmt.Printf("assigned node: %s, free space: %d\n", server.Id(), server.FreeSpace()) } } diff --git a/go/util/http_util.go index 0f3d92ea1..ceae6faa7 100644 --- a/go/util/http_util.go +++ b/go/util/http_util.go @@ -100,7 +100,7 @@ func RemoteApiCall(host, path string, values url.Values) (result map[string]inte if err, ok := result["error"]; ok && err.(string) != "" { return nil, &RApiError{E: err.(string)} } - if code != http.StatusOK { + if code != http.StatusOK && code != http.StatusAccepted { return nil,
fmt.Errorf("RemoteApiCall %s/%s return %d", host, path, code) } return result, nil