Browse Source

better error reporting and handling

pull/2/head
Chris Lu 12 years ago
parent
commit
a847e2beee
  1. 5
      weed-fs/src/cmd/weed/upload.go
  2. 60
      weed-fs/src/cmd/weed/volume.go
  3. 2
      weed-fs/src/pkg/operation/delete_content.go
  4. 8
      weed-fs/src/pkg/operation/upload_content.go

5
weed-fs/src/cmd/weed/upload.go

@ -44,7 +44,7 @@ func assign(count int) (*AssignResult, error) {
values.Add("replication", *uploadReplication) values.Add("replication", *uploadReplication)
jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values) jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values)
if *IsDebug { if *IsDebug {
fmt.Println("debug", *IsDebug, "assign result :", string(jsonBlob))
fmt.Println("assign result :", string(jsonBlob))
} }
if err != nil { if err != nil {
return nil, err return nil, err
@ -83,7 +83,8 @@ type SubmitResult struct {
func submit(files []string) []SubmitResult { func submit(files []string) []SubmitResult {
ret, err := assign(len(files)) ret, err := assign(len(files))
if err != nil { if err != nil {
panic(err)
fmt.Println(err)
return nil
} }
results := make([]SubmitResult, len(files)) results := make([]SubmitResult, len(files))
for index, file := range files { for index, file := range files {

60
weed-fs/src/cmd/weed/volume.go

@ -135,14 +135,21 @@ func PostHandler(w http.ResponseWriter, r *http.Request) {
ret := store.Write(volumeId, needle) ret := store.Write(volumeId, needle)
if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations
if r.FormValue("type") != "standard" { if r.FormValue("type") != "standard" {
waitTime, err := strconv.Atoi(r.FormValue("wait"))
distributedOperation(volumeId, func(location operation.Location) bool {
operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
return true
}, err == nil && waitTime > 0)
if distributedOperation(volumeId, func(location operation.Location) bool {
_, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
return err == nil
}) {
w.WriteHeader(http.StatusCreated)
} else {
ret = 0
w.WriteHeader(http.StatusInternalServerError)
} }
} else {
w.WriteHeader(http.StatusCreated) w.WriteHeader(http.StatusCreated)
} }
} else {
w.WriteHeader(http.StatusInternalServerError)
}
m := make(map[string]uint32) m := make(map[string]uint32)
m["size"] = ret m["size"] = ret
writeJson(w, r, m) writeJson(w, r, m)
@ -179,14 +186,20 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) {
if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations
if r.FormValue("type") != "standard" { if r.FormValue("type") != "standard" {
waitTime, err := strconv.Atoi(r.FormValue("wait"))
distributedOperation(volumeId, func(location operation.Location) bool {
operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
return true
}, err == nil && waitTime > 0)
if distributedOperation(volumeId, func(location operation.Location) bool {
return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
}) {
w.WriteHeader(http.StatusCreated)
} else {
ret = 0
w.WriteHeader(http.StatusInternalServerError)
} }
} else {
w.WriteHeader(http.StatusCreated) w.WriteHeader(http.StatusCreated)
} }
} else {
w.WriteHeader(http.StatusInternalServerError)
}
m := make(map[string]uint32) m := make(map[string]uint32)
m["size"] = uint32(count) m["size"] = uint32(count)
@ -215,23 +228,32 @@ func parseURLPath(path string) (vid, fid, ext string) {
type distributedFunction func(location operation.Location) bool type distributedFunction func(location operation.Location) bool
func distributedOperation(volumeId storage.VolumeId, op distributedFunction, wait bool) {
func distributedOperation(volumeId storage.VolumeId, op distributedFunction) bool {
if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil { if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil {
sendFunc := func(background bool) {
length := 0
sem := make(chan int, len(lookupResult.Locations))
selfUrl := (*ip + ":" + strconv.Itoa(*vport))
results := make(chan bool)
for _, location := range lookupResult.Locations { for _, location := range lookupResult.Locations {
if location.Url != (*ip + ":" + strconv.Itoa(*vport)) {
if background {
go op(location)
} else {
op(location)
}
if location.Url != selfUrl {
sem <- 1
length++
go func(op distributedFunction, location operation.Location, sem chan int, results chan bool) {
ret := op(location)
<-sem
results <- ret
}(op, location, sem, results)
} }
} }
ret := true
for i := 0; i < length; i++ {
ret = ret && <-results
} }
sendFunc(wait)
return ret
} else { } else {
log.Println("Failed to lookup for", volumeId, lookupErr.Error()) log.Println("Failed to lookup for", volumeId, lookupErr.Error())
} }
return false
} }
func runVolume(cmd *Command, args []string) bool { func runVolume(cmd *Command, args []string) bool {

2
weed-fs/src/pkg/operation/delete_content.go

@ -2,11 +2,13 @@ package operation
import ( import (
"net/http" "net/http"
"log"
) )
func Delete(url string) error { func Delete(url string) error {
req, err := http.NewRequest("DELETE", url, nil) req, err := http.NewRequest("DELETE", url, nil)
if err != nil { if err != nil {
log.Println("failing to delete", url)
return err return err
} }
_, err = http.DefaultClient.Do(req) _, err = http.DefaultClient.Do(req)

8
weed-fs/src/pkg/operation/upload_content.go

@ -6,6 +6,7 @@ import (
_ "fmt" _ "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"log"
"mime/multipart" "mime/multipart"
"net/http" "net/http"
) )
@ -23,7 +24,7 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult,
body_writer.Close() body_writer.Close()
resp, err := http.Post(uploadUrl, content_type, body_buf) resp, err := http.Post(uploadUrl, content_type, body_buf)
if err != nil { if err != nil {
println("uploading to", uploadUrl)
log.Println("failing to upload to", uploadUrl)
return nil, err return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
@ -34,9 +35,8 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult,
var ret UploadResult var ret UploadResult
err = json.Unmarshal(resp_body, &ret) err = json.Unmarshal(resp_body, &ret)
if err != nil { if err != nil {
println("upload response to", uploadUrl, resp_body)
panic(err.Error())
log.Println("failing to read upload resonse", uploadUrl, resp_body)
return nil, err
} }
//fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl)
return &ret, nil return &ret, nil
} }
Loading…
Cancel
Save