From cbc5a76e80b1bc45c642e2e2b601d9b7fa3634f7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 15 Apr 2014 09:09:40 -0700 Subject: [PATCH] Added batch file deleting. --- go/operation/delete_content.go | 89 +++++++++++++++++++ go/operation/lookup.go | 6 +- go/weed/weed_server/common.go | 10 ++- go/weed/weed_server/filer_server_handlers.go | 9 -- go/weed/weed_server/master_server.go | 1 + go/weed/weed_server/master_server_handlers.go | 6 +- go/weed/weed_server/volume_server_handlers.go | 26 ++---- 7 files changed, 110 insertions(+), 37 deletions(-) diff --git a/go/operation/delete_content.go b/go/operation/delete_content.go index 87ebece4d..380312053 100644 --- a/go/operation/delete_content.go +++ b/go/operation/delete_content.go @@ -2,8 +2,19 @@ package operation import ( "code.google.com/p/weed-fs/go/util" + "encoding/json" + "errors" + "net/url" + "strings" + "sync" ) +type DeleteResult struct { + Fid string `json:"fid"` + Size int `json:"size"` + Error string `json:"error,omitempty"` +} + func DeleteFile(master string, fileId string) error { fileUrl, err := LookupFileId(master, fileId) if err != nil { @@ -11,3 +22,81 @@ func DeleteFile(master string, fileId string) error { } return util.Delete(fileUrl) } + +func ParseFileId(fid string) (vid string, key_cookie string, err error) { + commaIndex := strings.Index(fid, ",") + if commaIndex <= 0 { + return "", "", errors.New("Wrong fid format.") + } + return fid[:commaIndex], fid[commaIndex+1:], nil +} + +type DeleteFilesResult struct { + Errors []string + Results []DeleteResult +} + +func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) { + vid_to_fileIds := make(map[string][]string) + ret := &DeleteFilesResult{} + var vids []string + for _, fileId := range fileIds { + vid, _, err := ParseFileId(fileId) + if err != nil { + ret.Results = append(ret.Results, DeleteResult{Fid: vid, Error: err.Error()}) + continue + } + if _, ok := vid_to_fileIds[vid]; !ok { + vid_to_fileIds[vid] = make([]string, 0) + vids = append(vids, vid) + } + vid_to_fileIds[vid] = append(vid_to_fileIds[vid], fileId) + } + + lookupResults, err := LookupVolumeIds(master, vids) + if err != nil { + return ret, err + } + + server_to_fileIds := make(map[string][]string) + for vid, result := range lookupResults { + if result.Error != "" { + ret.Errors = append(ret.Errors, result.Error) + continue + } + for _, location := range result.Locations { + if _, ok := server_to_fileIds[location.PublicUrl]; !ok { + server_to_fileIds[location.PublicUrl] = make([]string, 0) + } + server_to_fileIds[location.PublicUrl] = append( + server_to_fileIds[location.PublicUrl], vid_to_fileIds[vid]...) + } + } + + var wg sync.WaitGroup + for server, fidList := range server_to_fileIds { + wg.Add(1) + go func(server string, fidList []string) { + defer wg.Done() + values := make(url.Values) + for _, fid := range fidList { + values.Add("fid", fid) + } + jsonBlob, err := util.Post("http://"+server+"/delete", values) + if err != nil { + ret.Errors = append(ret.Errors, err.Error()+"\n"+string(jsonBlob)) + return + } + var result []DeleteResult + err = json.Unmarshal(jsonBlob, &result) + if err != nil { + ret.Errors = append(ret.Errors, err.Error()+"\n"+string(jsonBlob)) + return + } + ret.Results = append(ret.Results, result...) + }(server, fidList) + } + wg.Wait() + + return ret, nil +} diff --git a/go/operation/lookup.go b/go/operation/lookup.go index 7e4f5dd08..e9d1586a3 100644 --- a/go/operation/lookup.go +++ b/go/operation/lookup.go @@ -53,7 +53,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) { return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].PublicUrl + "/" + fileId, nil } -func LookupVolumeIds(server string, vids []string) ([]LookupResult, error) { +func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) { values := make(url.Values) for _, vid := range vids { values.Add("volumeId", vid) @@ -62,10 +62,10 @@ func LookupVolumeIds(server string, vids []string) ([]LookupResult, error) { if err != nil { return nil, err } - var ret []LookupResult + ret := make(map[string]LookupResult) err = json.Unmarshal(jsonBlob, &ret) if err != nil { - return nil, err + return nil, errors.New(err.Error() + " " + string(jsonBlob)) } return ret, nil } diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index 9703302f3..8d8c91da6 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -60,6 +60,7 @@ func writeJsonQuiet(w http.ResponseWriter, r *http.Request, obj interface{}) { } } func writeJsonError(w http.ResponseWriter, r *http.Request, err error) { + w.WriteHeader(http.StatusInternalServerError) m := make(map[string]interface{}) m["error"] = err.Error() writeJsonQuiet(w, r, m) @@ -133,9 +134,12 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st func deleteForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { r.ParseForm() fids := r.Form["fid"] - fids = fids - m := make(map[string]interface{}) - writeJsonQuiet(w, r, m) + ret, err := operation.DeleteFiles(masterUrl, fids) + if err != nil { + writeJsonError(w, r, err) + return + } + writeJsonQuiet(w, r, ret) } func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly bool) { diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index 31e9d0c85..7df839754 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -96,7 +96,6 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, resp, do_err := util.Do(request) if do_err != nil { glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) - w.WriteHeader(http.StatusInternalServerError) writeJsonError(w, r, do_err) return } @@ -109,7 +108,6 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection) if ae != nil { glog.V(0).Infoln("failing to assign a file id", ae.Error()) - w.WriteHeader(http.StatusInternalServerError) writeJsonError(w, r, ae) return } @@ -130,7 +128,6 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { resp, do_err := util.Do(request) if do_err != nil { glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) - w.WriteHeader(http.StatusInternalServerError) writeJsonError(w, r, do_err) return } @@ -138,7 +135,6 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { resp_body, ra_err := ioutil.ReadAll(resp.Body) if ra_err != nil { glog.V(0).Infoln("failing to upload to volume server", ra_err.Error()) - w.WriteHeader(http.StatusInternalServerError) writeJsonError(w, r, ra_err) return } @@ -147,13 +143,11 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { unmarshal_err := json.Unmarshal(resp_body, &ret) if unmarshal_err != nil { glog.V(0).Infoln("failing to read upload resonse", string(resp_body)) - w.WriteHeader(http.StatusInternalServerError) writeJsonError(w, r, unmarshal_err) return } if ret.Error != "" { glog.V(0).Infoln("failing to post to volume server", ra_err.Error()) - w.WriteHeader(http.StatusInternalServerError) writeJsonError(w, r, errors.New(ret.Error)) return } @@ -164,7 +158,6 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } else { operation.DeleteFile(fs.master, assignResult.Fid) //clean up glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") - w.WriteHeader(http.StatusInternalServerError) writeJsonError(w, r, errors.New("Can not to write to folder "+path+" without a file name")) return } @@ -173,7 +166,6 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil { operation.DeleteFile(fs.master, assignResult.Fid) //clean up glog.V(0).Infoln("failing to write to filer server", db_err.Error()) - w.WriteHeader(http.StatusInternalServerError) writeJsonError(w, r, db_err) return } @@ -195,7 +187,6 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { writeJsonQuiet(w, r, map[string]string{"error": ""}) } else { glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error()) - w.WriteHeader(http.StatusInternalServerError) writeJsonError(w, r, err) } } diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index 036b3d9b4..b932e1b11 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -40,6 +40,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, whiteList []string, ) *MasterServer { ms := &MasterServer{ + port: port, volumeSizeLimitMB: volumeSizeLimitMB, pulseSeconds: pulseSeconds, defaultReplicaPlacement: defaultReplicaPlacement, diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go index 7da78d556..32422d497 100644 --- a/go/weed/weed_server/master_server_handlers.go +++ b/go/weed/weed_server/master_server_handlers.go @@ -61,11 +61,7 @@ func (ms *MasterServer) volumeLookupHandler(w http.ResponseWriter, r *http.Reque vids := r.Form["volumeId"] collection := r.FormValue("collection") //optional, but can be faster if too many collections volumeLocations := ms.lookupVolumeId(vids, collection) - var ret []operation.LookupResult - for _, volumeLocation := range volumeLocations { - ret = append(ret, volumeLocation) - } - writeJsonQuiet(w, r, ret) + writeJsonQuiet(w, r, volumeLocations) } func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) { diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go index d105cf72e..110e00ee1 100644 --- a/go/weed/weed_server/volume_server_handlers.go +++ b/go/weed/weed_server/volume_server_handlers.go @@ -196,42 +196,34 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { writeJsonQuiet(w, r, m) } -type DeleteResult struct { - Fid string `json:"fid"` - Size int `json:"size"` - Error string `json:"error,omitempty"` -} - //Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas. func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Request) { r.ParseForm() - var ret []DeleteResult + var ret []operation.DeleteResult for _, fid := range r.Form["fid"] { - n := new(storage.Needle) - commaIndex := strings.Index(fid, ",") - if commaIndex <= 0 { - ret = append(ret, DeleteResult{Fid: fid, Error: "Wrong fid format."}) + vid, id_cookie, err := operation.ParseFileId(fid) + if err != nil { + ret = append(ret, operation.DeleteResult{Fid: fid, Error: err.Error()}) continue } - vid := fid[:commaIndex] + n := new(storage.Needle) volumeId, _ := storage.NewVolumeId(vid) - id_cookie := fid[commaIndex+1:] n.ParsePath(id_cookie) glog.V(4).Infoln("batch deleting", n) cookie := n.Cookie if _, err := vs.store.Read(volumeId, n); err != nil { - ret = append(ret, DeleteResult{Fid: fid, Error: err.Error()}) + ret = append(ret, operation.DeleteResult{Fid: fid, Error: err.Error()}) continue } if n.Cookie != cookie { - ret = append(ret, DeleteResult{Fid: fid, Error: "File Random Cookie does not match."}) + ret = append(ret, operation.DeleteResult{Fid: fid, Error: "File Random Cookie does not match."}) glog.V(0).Infoln("deleting", fid, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) return } if size, err := vs.store.Delete(volumeId, n); err != nil { - ret = append(ret, DeleteResult{Fid: fid, Error: err.Error()}) + ret = append(ret, operation.DeleteResult{Fid: fid, Error: err.Error()}) } else { - ret = append(ret, DeleteResult{Fid: fid, Size: int(size)}) + ret = append(ret, operation.DeleteResult{Fid: fid, Size: int(size)}) } }