Browse Source

util: http Post auto make url

util: add RemoteApiCall
*: add store task client
store task: auto clean task when finish
pull/279/head
tnextday 10 years ago
parent
commit
69831c40fe
  1. 2
      go/filer/client_operations.go
  2. 2
      go/operation/assign_file_id.go
  3. 2
      go/operation/delete_content.go
  4. 2
      go/operation/list_masters.go
  5. 6
      go/operation/lookup.go
  6. 2
      go/operation/sync_volume.go
  7. 2
      go/storage/store.go
  8. 35
      go/storage/store_task.go
  9. 69
      go/storage/store_task_cli.go
  10. 2
      go/storage/store_task_replication.go
  11. 2
      go/topology/allocate_volume.go
  12. 2
      go/topology/topology_replicate.go
  13. 6
      go/topology/topology_vacuum.go
  14. 47
      go/util/http_util.go
  15. 13
      go/util/url_util.go
  16. 5
      go/weed/benchmark.go
  17. 4
      go/weed/weed_server/master_server_handlers_admin.go

2
go/filer/client_operations.go

@ -58,7 +58,7 @@ func call(server string, request ApiRequest, ret interface{}) error {
} }
values := make(url.Values) values := make(url.Values)
values.Add("request", string(b)) values.Add("request", string(b))
jsonBlob, err := util.Post("http://"+server+"/__api__", values)
jsonBlob, err := util.Post(server, "/__api__", values)
if err != nil { if err != nil {
return err return err
} }

2
go/operation/assign_file_id.go

@ -31,7 +31,7 @@ func Assign(server string, count uint64, replication string, collection string,
if ttl != "" { if ttl != "" {
values.Add("ttl", ttl) values.Add("ttl", ttl)
} }
jsonBlob, err := util.Post("http://"+server+"/dir/assign", values)
jsonBlob, err := util.Post(server, "/dir/assign", values)
glog.V(2).Info("assign result :", string(jsonBlob)) glog.V(2).Info("assign result :", string(jsonBlob))
if err != nil { if err != nil {
return nil, err return nil, err

2
go/operation/delete_content.go

@ -97,7 +97,7 @@ func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) {
for _, fid := range fidList { for _, fid := range fidList {
values.Add("fid", fid) values.Add("fid", fid)
} }
jsonBlob, err := util.Post("http://"+server+"/delete", values)
jsonBlob, err := util.Post(server, "/delete", values)
if err != nil { if err != nil {
ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob)) ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob))
return return

2
go/operation/list_masters.go

@ -14,7 +14,7 @@ type ClusterStatusResult struct {
} }
func ListMasters(server string) ([]string, error) { func ListMasters(server string) ([]string, error) {
jsonBlob, err := util.Get("http://" + server + "/cluster/status")
jsonBlob, err := util.Get(server, "/cluster/status", nil)
glog.V(2).Info("list masters result :", string(jsonBlob)) glog.V(2).Info("list masters result :", string(jsonBlob))
if err != nil { if err != nil {
return nil, err return nil, err

6
go/operation/lookup.go

@ -63,7 +63,7 @@ func LookupNoCache(server string, vid string) (ret *LookupResult, err error) {
func do_lookup(server string, vid string) (*LookupResult, error) { func do_lookup(server string, vid string) (*LookupResult, error) {
values := make(url.Values) values := make(url.Values)
values.Add("volumeId", vid) values.Add("volumeId", vid)
jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
jsonBlob, err := util.Post(server, "/dir/lookup", values)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -96,7 +96,7 @@ func LookupFileId(server string, fileId string, readonly bool) (fullUrl string,
} else { } else {
u = lookup.Locations.Head().Url u = lookup.Locations.Head().Url
} }
return "http://" + u + "/" + fileId, nil
return util.MkUrl(u, "/"+fileId, nil), nil
} }
// LookupVolumeIds find volume locations by cache and actual lookup // LookupVolumeIds find volume locations by cache and actual lookup
@ -123,7 +123,7 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err
for _, vid := range unknown_vids { for _, vid := range unknown_vids {
values.Add("volumeId", vid) values.Add("volumeId", vid)
} }
jsonBlob, err := util.Post("http://"+server+"/vol/lookup", values)
jsonBlob, err := util.Post(server, "/vol/lookup", values)
if err != nil { if err != nil {
return nil, err return nil, err
} }

2
go/operation/sync_volume.go

@ -20,7 +20,7 @@ type SyncVolumeResponse struct {
func GetVolumeSyncStatus(server string, vid string) (*SyncVolumeResponse, error) { func GetVolumeSyncStatus(server string, vid string) (*SyncVolumeResponse, error) {
values := make(url.Values) values := make(url.Values)
values.Add("volume", vid) values.Add("volume", vid)
jsonBlob, err := util.Post("http://"+server+"/admin/sync/status", values)
jsonBlob, err := util.Post(server, "/admin/sync/status", values)
glog.V(2).Info("sync volume result :", string(jsonBlob)) glog.V(2).Info("sync volume result :", string(jsonBlob))
if err != nil { if err != nil {
return nil, err return nil, err

2
go/storage/store.go

@ -316,7 +316,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
return "", "", err return "", "", err
} }
joinUrl := "http://" + masterNode + "/dir/join"
joinUrl := util.MkUrl(masterNode, "/dir/join", nil)
glog.V(4).Infof("Connecting to %s ...", joinUrl) glog.V(4).Infof("Connecting to %s ...", joinUrl)
jsonBlob, err := util.PostBytes(joinUrl, data) jsonBlob, err := util.PostBytes(joinUrl, data)

35
go/storage/store_task.go

@ -4,12 +4,16 @@ import (
"errors" "errors"
"net/url" "net/url"
"time" "time"
"github.com/chrislusf/seaweedfs/go/glog"
) )
type TaskType string
const ( const (
TaskVacuum = "VACUUM"
TaskReplica = "REPLICA"
TaskBalance = "BALANCE"
TaskVacuum TaskType = "VACUUM"
TaskReplica TaskType = "REPLICA"
TaskBalance TaskType = "BALANCE"
) )
var ( var (
@ -27,18 +31,21 @@ type TaskWorker interface {
} }
type Task struct { type Task struct {
Id string
startTime time.Time startTime time.Time
worker TaskWorker worker TaskWorker
ch chan bool ch chan bool
result error result error
cleanWhenFinish bool
} }
type TaskManager struct { type TaskManager struct {
TaskList map[string]*Task TaskList map[string]*Task
} }
func NewTask(worker TaskWorker) *Task {
func NewTask(worker TaskWorker, id string) *Task {
t := &Task{ t := &Task{
Id: id,
worker: worker, worker: worker,
startTime: time.Now(), startTime: time.Now(),
result: ErrTaskNotFinish, result: ErrTaskNotFinish,
@ -46,7 +53,12 @@ func NewTask(worker TaskWorker) *Task {
} }
go func(t *Task) { go func(t *Task) {
t.result = t.worker.Run() t.result = t.worker.Run()
if t.cleanWhenFinish {
glog.V(0).Infof("clean task (%s) when finish.", t.Id)
t.worker.Clean()
}
t.ch <- true t.ch <- true
}(t) }(t)
return t return t
} }
@ -75,7 +87,7 @@ func (tm *TaskManager) NewTask(s *Store, args url.Values) (tid string, e error)
return tid, ErrTaskExists return tid, ErrTaskExists
} }
var tw TaskWorker var tw TaskWorker
switch tt {
switch TaskType(tt) {
case TaskVacuum: case TaskVacuum:
tw, e = NewVacuumTask(s, args) tw, e = NewVacuumTask(s, args)
case TaskReplica: case TaskReplica:
@ -88,7 +100,7 @@ func (tm *TaskManager) NewTask(s *Store, args url.Values) (tid string, e error)
if tw == nil { if tw == nil {
return "", ErrTaskInvalid return "", ErrTaskInvalid
} }
tm.TaskList[tid] = NewTask(tw)
tm.TaskList[tid] = NewTask(tw, tid)
return tid, nil return tid, nil
} }
@ -112,16 +124,19 @@ func (tm *TaskManager) Commit(tid string) (e error) {
return t.worker.Commit() return t.worker.Commit()
} }
func (tm *TaskManager) Clean(tid string) (e error) {
func (tm *TaskManager) Clean(tid string) error {
t, ok := tm.TaskList[tid] t, ok := tm.TaskList[tid]
if !ok { if !ok {
return ErrTaskNotFound return ErrTaskNotFound
} }
delete(tm.TaskList, tid)
if t.QueryResult(time.Second*30) == ErrTaskNotFinish { if t.QueryResult(time.Second*30) == ErrTaskNotFinish {
return ErrTaskNotFinish
t.cleanWhenFinish = true
glog.V(0).Infof("task (%s) is not finish, clean it later.", tid)
} else {
t.worker.Clean()
} }
delete(tm.TaskList, tid)
return t.worker.Clean()
return nil
} }
func (tm *TaskManager) ElapsedDuration(tid string) (time.Duration, error) { func (tm *TaskManager) ElapsedDuration(tid string) (time.Duration, error) {

69
go/storage/store_task_cli.go

@ -0,0 +1,69 @@
package storage
import (
"errors"
"fmt"
"net/url"
"time"
"github.com/chrislusf/seaweedfs/go/util"
)
type TaskParams map[string]string
var (
ErrTaskTimeout = errors.New("TaskTimeout")
)
type TaskCli struct {
TID string
DataNode string
}
func NewTaskCli(dataNode string, task TaskType, params TaskParams) (*TaskCli, error) {
args := url.Values{}
args.Set("task", string(task))
for k, v := range params {
args.Set(k, v)
}
m, e := util.RemoteApiCall(dataNode, "/admin/task/new", args)
if e != nil {
return nil, e
}
tid := m["tid"].(string)
if tid == "" {
return nil, fmt.Errorf("Empty %s task", task)
}
return &TaskCli{
TID: tid,
DataNode: dataNode,
}, nil
}
func (c *TaskCli) WaitAndQueryResult(timeout time.Duration) error {
startTime := time.Now()
args := url.Values{}
args.Set("task", c.TID)
for time.Since(startTime) < timeout {
_, e := util.RemoteApiCall(c.DataNode, "/admin/task/query", args)
if e.Error() == ErrTaskNotFinish.Error() {
continue
}
return e
}
return ErrTaskTimeout
}
func (c *TaskCli) Commit() error {
args := url.Values{}
args.Set("task", c.TID)
_, e := util.RemoteApiCall(c.DataNode, "/admin/task/commit", args)
return e
}
func (c *TaskCli) Clean() error {
args := url.Values{}
args.Set("task", c.TID)
_, e := util.RemoteApiCall(c.DataNode, "/admin/task/clean", args)
return e
}

2
go/storage/store_task_replication.go

@ -61,7 +61,7 @@ func (t *ReplicaTask) Run() error {
} }
ch <- e ch <- e
}() }()
errs := make([]error, 0, 2)
errs := make([]error, 0)
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
if e := <-ch; e != nil { if e := <-ch; e != nil {
errs = append(errs, e) errs = append(errs, e)

2
go/topology/allocate_volume.go

@ -19,7 +19,7 @@ func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption
values.Add("volume", vid.String()) values.Add("volume", vid.String())
values.Add("collection", option.Collection) values.Add("collection", option.Collection)
values.Add("ttl", option.Ttl.String()) values.Add("ttl", option.Ttl.String())
jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
jsonBlob, err := util.Post(dn.Url(), "/admin/assign_volume", values)
if err != nil { if err != nil {
return err return err
} }

2
go/topology/topology_replicate.go

@ -6,7 +6,7 @@ func (t *Topology) Replicate() int {
glog.V(0).Infoln("Start replicate checker on demand") glog.V(0).Infoln("Start replicate checker on demand")
for _, col := range t.collectionMap.Items { for _, col := range t.collectionMap.Items {
c := col.(*Collection) c := col.(*Collection)
glog.V(0).Infoln("replicate on collection:", c.Name)
glog.V(0).Infoln("checking replicate on collection:", c.Name)
for _, vl := range c.storageType2VolumeLayout.Items { for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil { if vl != nil {
volumeLayout := vl.(*VolumeLayout) volumeLayout := vl.(*VolumeLayout)

6
go/topology/topology_vacuum.go

@ -110,7 +110,7 @@ func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThresho
values := make(url.Values) values := make(url.Values)
values.Add("volume", vid.String()) values.Add("volume", vid.String())
values.Add("garbageThreshold", garbageThreshold) values.Add("garbageThreshold", garbageThreshold)
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/check", values)
jsonBlob, err := util.Post(urlLocation, "/admin/vacuum/check", values)
if err != nil { if err != nil {
glog.V(0).Infoln("parameters:", values) glog.V(0).Infoln("parameters:", values)
return err, false return err, false
@ -127,7 +127,7 @@ func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThresho
func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error { func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
values := make(url.Values) values := make(url.Values)
values.Add("volume", vid.String()) values.Add("volume", vid.String())
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/compact", values)
jsonBlob, err := util.Post(urlLocation, "/admin/vacuum/compact", values)
if err != nil { if err != nil {
return err return err
} }
@ -143,7 +143,7 @@ func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error { func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
values := make(url.Values) values := make(url.Values)
values.Add("volume", vid.String()) values.Add("volume", vid.String())
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/commit", values)
jsonBlob, err := util.Post(urlLocation, "/admin/vacuum/commit", values)
if err != nil { if err != nil {
return err return err
} }

47
go/util/http_util.go

@ -28,6 +28,18 @@ func init() {
client = &http.Client{Transport: Transport} client = &http.Client{Transport: Transport}
} }
func MkUrl(host, path string, args url.Values) string {
u := url.URL{
Scheme: "http",
Host: host,
Path: path,
}
if args != nil {
u.RawQuery = args.Encode()
}
return u.String()
}
func PostBytes(url string, body []byte) ([]byte, error) { func PostBytes(url string, body []byte) ([]byte, error) {
r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body)) r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body))
if err != nil { if err != nil {
@ -41,20 +53,45 @@ func PostBytes(url string, body []byte) ([]byte, error) {
return b, nil return b, nil
} }
func Post(url string, values url.Values) ([]byte, error) {
func PostEx(host, path string, values url.Values) (content []byte, statusCode int, e error) {
url := MkUrl(host, path, nil)
r, err := client.PostForm(url, values) r, err := client.PostForm(url, values)
if err != nil { if err != nil {
return nil, err
return nil, 0, err
} }
defer r.Body.Close() defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body) b, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
return nil, err
return nil, r.StatusCode, err
} }
return b, nil
return b, r.StatusCode, nil
}
func Post(host, path string, values url.Values) (content []byte, e error) {
content, _, e = PostEx(host, path, values)
return
}
func RemoteApiCall(host, path string, values url.Values) (result map[string]interface{}, e error) {
jsonBlob, code, e := PostEx(host, path, values)
if e != nil {
return nil, e
}
result = make(map[string]interface{})
if e := json.Unmarshal(jsonBlob, result); e != nil {
return nil, e
}
if err, ok := result["error"]; ok && err.(string) != "" {
return nil, errors.New(err.(string))
}
if code != http.StatusOK {
return nil, fmt.Errorf("RemoteApiCall %s/%s return %d", host, path, code)
}
return result, nil
} }
func Get(url string) ([]byte, error) {
func Get(host, path string, values url.Values) ([]byte, error) {
url := MkUrl(host, path, values)
r, err := client.Get(url) r, err := client.Get(url)
if err != nil { if err != nil {
return nil, err return nil, err

13
go/util/url_util.go

@ -1,13 +0,0 @@
package util
import "net/url"
func MkUrl(host, path string, args url.Values) string {
u := url.URL{
Scheme: "http",
Host: host,
Path: path,
}
u.RawQuery = args.Encode()
return u.String()
}

5
go/weed/benchmark.go

@ -255,14 +255,13 @@ func readFiles(fileIdLineChan chan string, s *stat) {
continue continue
} }
server := ret.Locations.PickForRead().Url server := ret.Locations.PickForRead().Url
url := "http://" + server + "/" + fid
if bytesRead, err := util.Get(url); err == nil {
if bytesRead, err := util.Get(server, "/"+fid, nil); err == nil {
s.completed++ s.completed++
s.transferred += int64(len(bytesRead)) s.transferred += int64(len(bytesRead))
readStats.addSample(time.Now().Sub(start)) readStats.addSample(time.Now().Sub(start))
} else { } else {
s.failed++ s.failed++
fmt.Printf("Failed to read %s error:%v\n", url, err)
fmt.Printf("Failed to read %s/%s error:%v\n", server, fid, err)
} }
} }
} }

4
go/weed/weed_server/master_server_handlers_admin.go

@ -28,7 +28,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return return
} }
for _, server := range collection.ListVolumeServers() { for _, server := range collection.ListVolumeServers() {
_, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection"))
_, err := util.Get(server.Ip+":"+strconv.Itoa(server.Port), "/admin/delete_collection", url.Values{"collection": r.Form["collection"]})
if err != nil { if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err) writeJsonError(w, r, http.StatusInternalServerError, err)
return return
@ -204,7 +204,7 @@ func (ms *MasterServer) batchSetVolumeOption(settingKey, settingValue string, vo
wg.Add(1) wg.Add(1)
go func(server string, values url.Values) { go func(server string, values url.Values) {
defer wg.Done() defer wg.Done()
jsonBlob, e := util.Post("http://"+server+"/admin/setting", values)
jsonBlob, e := util.Post(server, "/admin/setting", values)
if e != nil { if e != nil {
result[server] = map[string]interface{}{ result[server] = map[string]interface{}{
"error": e.Error() + " " + string(jsonBlob), "error": e.Error() + " " + string(jsonBlob),

Loading…
Cancel
Save