diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index 798fe6995..ec442716a 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -135,27 +135,11 @@ func PostHandler(w http.ResponseWriter, r *http.Request) { ret := store.Write(volumeId, needle) if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations if r.FormValue("type") != "standard" { - if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil { - sendFunc := func(background bool) { - postContentFunc := func(location operation.Location) bool { - operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data)) - return true - } - for _, location := range lookupResult.Locations { - if location.Url != (*ip+":"+strconv.Itoa(*vport)) { - if background { - go postContentFunc(location) - } else { - postContentFunc(location) - } - } - } - } - waitTime, err := strconv.Atoi(r.FormValue("wait")) - sendFunc(err == nil && waitTime > 0) - } else { - log.Println("Failed to lookup for", volumeId, lookupErr.Error()) - } + waitTime, err := strconv.Atoi(r.FormValue("wait")) + distributedOperation(volumeId, func(location operation.Location) bool { + operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data)) + return true + }, err == nil && waitTime > 0) } w.WriteHeader(http.StatusCreated) } @@ -191,7 +175,19 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { } n.Size = 0 - store.Delete(volumeId, n) + ret := store.Delete(volumeId, n) + + if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations + if r.FormValue("type") != "standard" { + waitTime, err := strconv.Atoi(r.FormValue("wait")) + distributedOperation(volumeId, func(location operation.Location) bool { + operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") + return true + }, err == nil && waitTime > 0) + } + w.WriteHeader(http.StatusCreated) + } + m := make(map[string]uint32) m["size"] = uint32(count) writeJson(w, r, m) @@ -217,6 +213,27 @@ func parseURLPath(path string) (vid, fid, ext string) { return } +type distributedFunction func(location operation.Location) bool + +func distributedOperation(volumeId storage.VolumeId, op distributedFunction, wait bool) { + if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil { + sendFunc := func(background bool) { + for _, location := range lookupResult.Locations { + if location.Url != (*ip + ":" + strconv.Itoa(*vport)) { + if background { + go op(location) + } else { + op(location) + } + } + } + } + sendFunc(wait) + } else { + log.Println("Failed to lookup for", volumeId, lookupErr.Error()) + } +} + func runVolume(cmd *Command, args []string) bool { fileInfo, err := os.Stat(*volumeFolder) //TODO: now default to 1G, this value should come from server? @@ -228,9 +245,9 @@ func runVolume(cmd *Command, args []string) bool { } perm := fileInfo.Mode().Perm() log.Println("Volume Folder permission:", perm) - + if *publicUrl == "" { - *publicUrl = *ip + ":" + strconv.Itoa(*vport) + *publicUrl = *ip + ":" + strconv.Itoa(*vport) } store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount) @@ -247,7 +264,7 @@ func runVolume(cmd *Command, args []string) bool { }() log.Println("store joined at", *masterNode) - log.Println("Start storage service at http://"+*ip+":"+strconv.Itoa(*vport)) + log.Println("Start storage service at http://" + *ip + ":" + strconv.Itoa(*vport)) e := http.ListenAndServe(":"+strconv.Itoa(*vport), nil) if e != nil { log.Fatalf("Fail to start:%s", e.Error()) diff --git a/weed-fs/src/pkg/operation/delete_content.go b/weed-fs/src/pkg/operation/delete_content.go new file mode 100644 index 000000000..66a61f55c --- /dev/null +++ b/weed-fs/src/pkg/operation/delete_content.go @@ -0,0 +1,14 @@ +package operation + +import ( + "net/http" +) + +func Delete(url string) error { + req, err := http.NewRequest("DELETE", url, nil) + if err != nil { + return err + } + _, err = http.DefaultClient.Do(req) + return err +} diff --git a/weed-fs/src/pkg/operation/upload_content.go b/weed-fs/src/pkg/operation/upload_content.go index e7822d387..ce2e5af68 100644 --- a/weed-fs/src/pkg/operation/upload_content.go +++ b/weed-fs/src/pkg/operation/upload_content.go @@ -1,42 +1,42 @@ package operation import ( - "bytes" - "encoding/json" - "mime/multipart" - "net/http" - _ "fmt" - "io" - "io/ioutil" + "bytes" + "encoding/json" + _ "fmt" + "io" + "io/ioutil" + "mime/multipart" + "net/http" ) type UploadResult struct { - Size int + Size int } func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) { - println("uploading to", uploadUrl) - body_buf := bytes.NewBufferString("") - body_writer := multipart.NewWriter(body_buf) - file_writer, err := body_writer.CreateFormFile("file", filename) - io.Copy(file_writer, reader) - content_type := body_writer.FormDataContentType() - body_writer.Close() - resp, err := http.Post(uploadUrl, content_type, body_buf) - if err != nil { - return nil, err - } - defer resp.Body.Close() - resp_body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - var ret UploadResult - println("upload response to", uploadUrl, resp_body) - err = json.Unmarshal(resp_body, &ret) - if err != nil { - panic(err.Error()) - } - //fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl) - return &ret, nil + body_buf := bytes.NewBufferString("") + body_writer := multipart.NewWriter(body_buf) + file_writer, err := body_writer.CreateFormFile("file", filename) + io.Copy(file_writer, reader) + content_type := body_writer.FormDataContentType() + body_writer.Close() + resp, err := http.Post(uploadUrl, content_type, body_buf) + if err != nil { + println("uploading to", uploadUrl) + return nil, err + } + defer resp.Body.Close() + resp_body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var ret UploadResult + err = json.Unmarshal(resp_body, &ret) + if err != nil { + println("upload response to", uploadUrl, resp_body) + panic(err.Error()) + } + //fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl) + return &ret, nil }