diff --git a/go/filer/client_operations.go b/go/filer/client_operations.go index 80ac51693..b3ccc633a 100644 --- a/go/filer/client_operations.go +++ b/go/filer/client_operations.go @@ -58,7 +58,7 @@ func call(server string, request ApiRequest, ret interface{}) error { } values := make(url.Values) values.Add("request", string(b)) - jsonBlob, err := util.Post("http://"+server+"/__api__", values) + jsonBlob, err := util.Post(server, "/__api__", values) if err != nil { return err } diff --git a/go/operation/assign_file_id.go b/go/operation/assign_file_id.go index fa436b651..45ac5c362 100644 --- a/go/operation/assign_file_id.go +++ b/go/operation/assign_file_id.go @@ -31,7 +31,7 @@ func Assign(server string, count uint64, replication string, collection string, if ttl != "" { values.Add("ttl", ttl) } - jsonBlob, err := util.Post("http://"+server+"/dir/assign", values) + jsonBlob, err := util.Post(server, "/dir/assign", values) glog.V(2).Info("assign result :", string(jsonBlob)) if err != nil { return nil, err diff --git a/go/operation/delete_content.go b/go/operation/delete_content.go index 9bd6654d7..a8cd46f71 100644 --- a/go/operation/delete_content.go +++ b/go/operation/delete_content.go @@ -97,7 +97,7 @@ func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) { for _, fid := range fidList { values.Add("fid", fid) } - jsonBlob, err := util.Post("http://"+server+"/delete", values) + jsonBlob, err := util.Post(server, "/delete", values) if err != nil { ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob)) return diff --git a/go/operation/list_masters.go b/go/operation/list_masters.go index bda6f3c65..8aa1eae58 100644 --- a/go/operation/list_masters.go +++ b/go/operation/list_masters.go @@ -14,7 +14,7 @@ type ClusterStatusResult struct { } func ListMasters(server string) ([]string, error) { - jsonBlob, err := util.Get("http://" + server + "/cluster/status") + jsonBlob, err := util.Get(server, "/cluster/status", nil) glog.V(2).Info("list masters result :", string(jsonBlob)) if err != nil { return nil, err diff --git a/go/operation/lookup.go b/go/operation/lookup.go index 83ef55752..86a2ff760 100644 --- a/go/operation/lookup.go +++ b/go/operation/lookup.go @@ -63,7 +63,7 @@ func LookupNoCache(server string, vid string) (ret *LookupResult, err error) { func do_lookup(server string, vid string) (*LookupResult, error) { values := make(url.Values) values.Add("volumeId", vid) - jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) + jsonBlob, err := util.Post(server, "/dir/lookup", values) if err != nil { return nil, err } @@ -96,7 +96,7 @@ func LookupFileId(server string, fileId string, readonly bool) (fullUrl string, } else { u = lookup.Locations.Head().Url } - return "http://" + u + "/" + fileId, nil + return util.MkUrl(u, "/"+fileId, nil), nil } // LookupVolumeIds find volume locations by cache and actual lookup @@ -123,7 +123,7 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err for _, vid := range unknown_vids { values.Add("volumeId", vid) } - jsonBlob, err := util.Post("http://"+server+"/vol/lookup", values) + jsonBlob, err := util.Post(server, "/vol/lookup", values) if err != nil { return nil, err } diff --git a/go/operation/sync_volume.go b/go/operation/sync_volume.go index 713cf33c1..f63d6f96a 100644 --- a/go/operation/sync_volume.go +++ b/go/operation/sync_volume.go @@ -20,7 +20,7 @@ type SyncVolumeResponse struct { func GetVolumeSyncStatus(server string, vid string) (*SyncVolumeResponse, error) { values := make(url.Values) values.Add("volume", vid) - jsonBlob, err := util.Post("http://"+server+"/admin/sync/status", values) + jsonBlob, err := util.Post(server, "/admin/sync/status", values) glog.V(2).Info("sync volume result :", string(jsonBlob)) if err != nil { return nil, err diff --git a/go/storage/store.go b/go/storage/store.go index 434c81b0a..702db99fa 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -316,7 +316,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S return "", "", err } - joinUrl := "http://" + masterNode + "/dir/join" + joinUrl := util.MkUrl(masterNode, "/dir/join", nil) glog.V(4).Infof("Connecting to %s ...", joinUrl) jsonBlob, err := util.PostBytes(joinUrl, data) diff --git a/go/storage/store_task.go b/go/storage/store_task.go index 2f8c0515f..123b12f5f 100644 --- a/go/storage/store_task.go +++ b/go/storage/store_task.go @@ -4,12 +4,16 @@ import ( "errors" "net/url" "time" + + "github.com/chrislusf/seaweedfs/go/glog" ) +type TaskType string + const ( - TaskVacuum = "VACUUM" - TaskReplica = "REPLICA" - TaskBalance = "BALANCE" + TaskVacuum TaskType = "VACUUM" + TaskReplica TaskType = "REPLICA" + TaskBalance TaskType = "BALANCE" ) var ( @@ -27,18 +31,21 @@ type TaskWorker interface { } type Task struct { - startTime time.Time - worker TaskWorker - ch chan bool - result error + Id string + startTime time.Time + worker TaskWorker + ch chan bool + result error + cleanWhenFinish bool } type TaskManager struct { TaskList map[string]*Task } -func NewTask(worker TaskWorker) *Task { +func NewTask(worker TaskWorker, id string) *Task { t := &Task{ + Id: id, worker: worker, startTime: time.Now(), result: ErrTaskNotFinish, @@ -46,7 +53,12 @@ func NewTask(worker TaskWorker) *Task { } go func(t *Task) { t.result = t.worker.Run() + if t.cleanWhenFinish { + glog.V(0).Infof("clean task (%s) when finish.", t.Id) + t.worker.Clean() + } t.ch <- true + }(t) return t } @@ -75,7 +87,7 @@ func (tm *TaskManager) NewTask(s *Store, args url.Values) (tid string, e error) return tid, ErrTaskExists } var tw TaskWorker - switch tt { + switch TaskType(tt) { case TaskVacuum: tw, e = NewVacuumTask(s, args) case TaskReplica: @@ -88,7 +100,7 @@ func (tm *TaskManager) NewTask(s *Store, args url.Values) (tid string, e error) if tw == nil { return "", ErrTaskInvalid } - tm.TaskList[tid] = NewTask(tw) + tm.TaskList[tid] = NewTask(tw, tid) return tid, nil } @@ -112,16 +124,19 @@ func (tm *TaskManager) Commit(tid string) (e error) { return t.worker.Commit() } -func (tm *TaskManager) Clean(tid string) (e error) { +func (tm *TaskManager) Clean(tid string) error { t, ok := tm.TaskList[tid] if !ok { return ErrTaskNotFound } + delete(tm.TaskList, tid) if t.QueryResult(time.Second*30) == ErrTaskNotFinish { - return ErrTaskNotFinish + t.cleanWhenFinish = true + glog.V(0).Infof("task (%s) is not finish, clean it later.", tid) + } else { + t.worker.Clean() } - delete(tm.TaskList, tid) - return t.worker.Clean() + return nil } func (tm *TaskManager) ElapsedDuration(tid string) (time.Duration, error) { diff --git a/go/storage/store_task_cli.go b/go/storage/store_task_cli.go new file mode 100644 index 000000000..fb69fed39 --- /dev/null +++ b/go/storage/store_task_cli.go @@ -0,0 +1,69 @@ +package storage + +import ( + "errors" + "fmt" + "net/url" + "time" + + "github.com/chrislusf/seaweedfs/go/util" +) + +type TaskParams map[string]string + +var ( + ErrTaskTimeout = errors.New("TaskTimeout") +) + +type TaskCli struct { + TID string + DataNode string +} + +func NewTaskCli(dataNode string, task TaskType, params TaskParams) (*TaskCli, error) { + args := url.Values{} + args.Set("task", string(task)) + for k, v := range params { + args.Set(k, v) + } + m, e := util.RemoteApiCall(dataNode, "/admin/task/new", args) + if e != nil { + return nil, e + } + tid := m["tid"].(string) + if tid == "" { + return nil, fmt.Errorf("Empty %s task", task) + } + return &TaskCli{ + TID: tid, + DataNode: dataNode, + }, nil +} + +func (c *TaskCli) WaitAndQueryResult(timeout time.Duration) error { + startTime := time.Now() + args := url.Values{} + args.Set("task", c.TID) + for time.Since(startTime) < timeout { + _, e := util.RemoteApiCall(c.DataNode, "/admin/task/query", args) + if e.Error() == ErrTaskNotFinish.Error() { + continue + } + return e + } + return ErrTaskTimeout +} + +func (c *TaskCli) Commit() error { + args := url.Values{} + args.Set("task", c.TID) + _, e := util.RemoteApiCall(c.DataNode, "/admin/task/commit", args) + return e +} + +func (c *TaskCli) Clean() error { + args := url.Values{} + args.Set("task", c.TID) + _, e := util.RemoteApiCall(c.DataNode, "/admin/task/clean", args) + return e +} diff --git a/go/storage/store_task_replication.go b/go/storage/store_task_replication.go index b6b8e230a..0931c831e 100644 --- a/go/storage/store_task_replication.go +++ b/go/storage/store_task_replication.go @@ -61,7 +61,7 @@ func (t *ReplicaTask) Run() error { } ch <- e }() - errs := make([]error, 0, 2) + errs := make([]error, 0) for i := 0; i < 2; i++ { if e := <-ch; e != nil { errs = append(errs, e) diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go index 6de3130b1..e48f01495 100644 --- a/go/topology/allocate_volume.go +++ b/go/topology/allocate_volume.go @@ -19,7 +19,7 @@ func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption values.Add("volume", vid.String()) values.Add("collection", option.Collection) values.Add("ttl", option.Ttl.String()) - jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) + jsonBlob, err := util.Post(dn.Url(), "/admin/assign_volume", values) if err != nil { return err } diff --git a/go/topology/topology_replicate.go b/go/topology/topology_replicate.go index dbb7d490a..9346ca743 100644 --- a/go/topology/topology_replicate.go +++ b/go/topology/topology_replicate.go @@ -6,7 +6,7 @@ func (t *Topology) Replicate() int { glog.V(0).Infoln("Start replicate checker on demand") for _, col := range t.collectionMap.Items { c := col.(*Collection) - glog.V(0).Infoln("replicate on collection:", c.Name) + glog.V(0).Infoln("checking replicate on collection:", c.Name) for _, vl := range c.storageType2VolumeLayout.Items { if vl != nil { volumeLayout := vl.(*VolumeLayout) diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go index cba3e8a16..cd85f3b15 100644 --- a/go/topology/topology_vacuum.go +++ b/go/topology/topology_vacuum.go @@ -110,7 +110,7 @@ func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThresho values := make(url.Values) values.Add("volume", vid.String()) values.Add("garbageThreshold", garbageThreshold) - jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/check", values) + jsonBlob, err := util.Post(urlLocation, "/admin/vacuum/check", values) if err != nil { glog.V(0).Infoln("parameters:", values) return err, false @@ -127,7 +127,7 @@ func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThresho func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error { values := make(url.Values) values.Add("volume", vid.String()) - jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/compact", values) + jsonBlob, err := util.Post(urlLocation, "/admin/vacuum/compact", values) if err != nil { return err } @@ -143,7 +143,7 @@ func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error { func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error { values := make(url.Values) values.Add("volume", vid.String()) - jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/commit", values) + jsonBlob, err := util.Post(urlLocation, "/admin/vacuum/commit", values) if err != nil { return err } diff --git a/go/util/http_util.go b/go/util/http_util.go index 5060c77ad..65a4d21df 100644 --- a/go/util/http_util.go +++ b/go/util/http_util.go @@ -28,6 +28,18 @@ func init() { client = &http.Client{Transport: Transport} } +func MkUrl(host, path string, args url.Values) string { + u := url.URL{ + Scheme: "http", + Host: host, + Path: path, + } + if args != nil { + u.RawQuery = args.Encode() + } + return u.String() +} + func PostBytes(url string, body []byte) ([]byte, error) { r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body)) if err != nil { @@ -41,20 +53,45 @@ func PostBytes(url string, body []byte) ([]byte, error) { return b, nil } -func Post(url string, values url.Values) ([]byte, error) { +func PostEx(host, path string, values url.Values) (content []byte, statusCode int, e error) { + url := MkUrl(host, path, nil) r, err := client.PostForm(url, values) if err != nil { - return nil, err + return nil, 0, err } defer r.Body.Close() b, err := ioutil.ReadAll(r.Body) if err != nil { - return nil, err + return nil, r.StatusCode, err } - return b, nil + return b, r.StatusCode, nil +} + +func Post(host, path string, values url.Values) (content []byte, e error) { + content, _, e = PostEx(host, path, values) + return +} + +func RemoteApiCall(host, path string, values url.Values) (result map[string]interface{}, e error) { + jsonBlob, code, e := PostEx(host, path, values) + if e != nil { + return nil, e + } + result = make(map[string]interface{}) + if e := json.Unmarshal(jsonBlob, result); e != nil { + return nil, e + } + if err, ok := result["error"]; ok && err.(string) != "" { + return nil, errors.New(err.(string)) + } + if code != http.StatusOK { + return nil, fmt.Errorf("RemoteApiCall %s/%s return %d", host, path, code) + } + return result, nil } -func Get(url string) ([]byte, error) { +func Get(host, path string, values url.Values) ([]byte, error) { + url := MkUrl(host, path, values) r, err := client.Get(url) if err != nil { return nil, err diff --git a/go/util/url_util.go b/go/util/url_util.go deleted file mode 100644 index 7204d08b0..000000000 --- a/go/util/url_util.go +++ /dev/null @@ -1,13 +0,0 @@ -package util - -import "net/url" - -func MkUrl(host, path string, args url.Values) string { - u := url.URL{ - Scheme: "http", - Host: host, - Path: path, - } - u.RawQuery = args.Encode() - return u.String() -} diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go index 51652b1ae..daa970788 100644 --- a/go/weed/benchmark.go +++ b/go/weed/benchmark.go @@ -255,14 +255,13 @@ func readFiles(fileIdLineChan chan string, s *stat) { continue } server := ret.Locations.PickForRead().Url - url := "http://" + server + "/" + fid - if bytesRead, err := util.Get(url); err == nil { + if bytesRead, err := util.Get(server, "/"+fid, nil); err == nil { s.completed++ s.transferred += int64(len(bytesRead)) readStats.addSample(time.Now().Sub(start)) } else { s.failed++ - fmt.Printf("Failed to read %s error:%v\n", url, err) + fmt.Printf("Failed to read %s/%s error:%v\n", server, fid, err) } } } diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index 89c373ec7..4b7f809ec 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -28,7 +28,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } for _, server := range collection.ListVolumeServers() { - _, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection")) + _, err := util.Get(server.Ip+":"+strconv.Itoa(server.Port), "/admin/delete_collection", url.Values{"collection": r.Form["collection"]}) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return @@ -204,7 +204,7 @@ func (ms *MasterServer) batchSetVolumeOption(settingKey, settingValue string, vo wg.Add(1) go func(server string, values url.Values) { defer wg.Done() - jsonBlob, e := util.Post("http://"+server+"/admin/setting", values) + jsonBlob, e := util.Post(server, "/admin/setting", values) if e != nil { result[server] = map[string]interface{}{ "error": e.Error() + " " + string(jsonBlob),