diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go index faed646f1..89e0c34fc 100644 --- a/weed-fs/src/cmd/weed/upload.go +++ b/weed-fs/src/cmd/weed/upload.go @@ -44,7 +44,7 @@ func assign(count int) (*AssignResult, error) { values.Add("replication", *uploadReplication) jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values) if *IsDebug { - fmt.Println("debug", *IsDebug, "assign result :", string(jsonBlob)) + fmt.Println("assign result :", string(jsonBlob)) } if err != nil { return nil, err @@ -83,7 +83,8 @@ type SubmitResult struct { func submit(files []string) []SubmitResult { ret, err := assign(len(files)) if err != nil { - panic(err) + fmt.Println(err) + return nil } results := make([]SubmitResult, len(files)) for index, file := range files { diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index fb3e66c09..ebe67b0b9 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -135,13 +135,20 @@ func PostHandler(w http.ResponseWriter, r *http.Request) { ret := store.Write(volumeId, needle) if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations if r.FormValue("type") != "standard" { - waitTime, err := strconv.Atoi(r.FormValue("wait")) - distributedOperation(volumeId, func(location operation.Location) bool { - operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data)) - return true - }, err == nil && waitTime > 0) + if distributedOperation(volumeId, func(location operation.Location) bool { + _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data)) + return err == nil + }) { + w.WriteHeader(http.StatusCreated) + } else { + ret = 0 + w.WriteHeader(http.StatusInternalServerError) + } + } else { + w.WriteHeader(http.StatusCreated) } - w.WriteHeader(http.StatusCreated) + } else { + w.WriteHeader(http.StatusInternalServerError) } m := make(map[string]uint32) m["size"] = ret @@ -179,13 +186,19 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations if r.FormValue("type") != "standard" { - waitTime, err := strconv.Atoi(r.FormValue("wait")) - distributedOperation(volumeId, func(location operation.Location) bool { - operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") - return true - }, err == nil && waitTime > 0) + if distributedOperation(volumeId, func(location operation.Location) bool { + return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") + }) { + w.WriteHeader(http.StatusCreated) + } else { + ret = 0 + w.WriteHeader(http.StatusInternalServerError) + } + } else { + w.WriteHeader(http.StatusCreated) } - w.WriteHeader(http.StatusCreated) + } else { + w.WriteHeader(http.StatusInternalServerError) } m := make(map[string]uint32) @@ -215,23 +228,32 @@ func parseURLPath(path string) (vid, fid, ext string) { type distributedFunction func(location operation.Location) bool -func distributedOperation(volumeId storage.VolumeId, op distributedFunction, wait bool) { +func distributedOperation(volumeId storage.VolumeId, op distributedFunction) bool { if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil { - sendFunc := func(background bool) { - for _, location := range lookupResult.Locations { - if location.Url != (*ip + ":" + strconv.Itoa(*vport)) { - if background { - go op(location) - } else { - op(location) - } - } + length := 0 + sem := make(chan int, len(lookupResult.Locations)) + selfUrl := (*ip + ":" + strconv.Itoa(*vport)) + results := make(chan bool) + for _, location := range lookupResult.Locations { + if location.Url != selfUrl { + sem <- 1 + length++ + go func(op distributedFunction, location operation.Location, sem chan int, results chan bool) { + ret := op(location) + <-sem + results <- ret + }(op, location, sem, results) } } - sendFunc(wait) + ret := true + for i := 0; i < length; i++ { + ret = ret && <-results + } + return ret } else { log.Println("Failed to lookup for", volumeId, lookupErr.Error()) } + return false } func runVolume(cmd *Command, args []string) bool { diff --git a/weed-fs/src/pkg/operation/delete_content.go b/weed-fs/src/pkg/operation/delete_content.go index 66a61f55c..aeab9c3ac 100644 --- a/weed-fs/src/pkg/operation/delete_content.go +++ b/weed-fs/src/pkg/operation/delete_content.go @@ -2,11 +2,13 @@ package operation import ( "net/http" + "log" ) func Delete(url string) error { req, err := http.NewRequest("DELETE", url, nil) if err != nil { + log.Println("failing to delete", url) return err } _, err = http.DefaultClient.Do(req) diff --git a/weed-fs/src/pkg/operation/upload_content.go b/weed-fs/src/pkg/operation/upload_content.go index ce2e5af68..652cbe71b 100644 --- a/weed-fs/src/pkg/operation/upload_content.go +++ b/weed-fs/src/pkg/operation/upload_content.go @@ -6,6 +6,7 @@ import ( _ "fmt" "io" "io/ioutil" + "log" "mime/multipart" "net/http" ) @@ -23,7 +24,7 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, body_writer.Close() resp, err := http.Post(uploadUrl, content_type, body_buf) if err != nil { - println("uploading to", uploadUrl) + log.Println("failing to upload to", uploadUrl) return nil, err } defer resp.Body.Close() @@ -34,9 +35,8 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, var ret UploadResult err = json.Unmarshal(resp_body, &ret) if err != nil { - println("upload response to", uploadUrl, resp_body) - panic(err.Error()) + log.Println("failing to read upload resonse", uploadUrl, resp_body) + return nil, err } - //fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl) return &ret, nil }