From 5e97179d061fd885ab5df0d91c1713a5139ca112 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 20 Sep 2012 17:58:29 -0700 Subject: [PATCH] refactoring content uploading --- weed-fs/note/replication.txt | 3 +- weed-fs/src/cmd/weed/upload.go | 51 ++----------------- weed-fs/src/cmd/weed/volume.go | 24 +++++++++ .../{lookup.go => lookup_volume_id.go} | 0 weed-fs/src/pkg/operation/upload_content.go | 40 +++++++++++++++ weed-fs/src/pkg/storage/store.go | 3 ++ 6 files changed, 74 insertions(+), 47 deletions(-) rename weed-fs/src/pkg/operation/{lookup.go => lookup_volume_id.go} (100%) create mode 100644 weed-fs/src/pkg/operation/upload_content.go diff --git a/weed-fs/note/replication.txt b/weed-fs/note/replication.txt index 0dd73fd90..c4bf46044 100644 --- a/weed-fs/note/replication.txt +++ b/weed-fs/note/replication.txt @@ -82,4 +82,5 @@ For the above operations, here are the todo list: 5. accept lookup for volume locations ALREADY EXISTS /dir/lookup 6. read topology/datacenter/rack layout - +TODO: + 1. replicate content to the other server if the replication type needs replicas diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go index 515816921..c025c5029 100644 --- a/weed-fs/src/cmd/weed/upload.go +++ b/weed-fs/src/cmd/weed/upload.go @@ -1,16 +1,12 @@ package main import ( - "bytes" "encoding/json" "errors" "fmt" - "io" - "io/ioutil" - "mime/multipart" - "net/http" "net/url" "os" + "pkg/operation" "pkg/util" "strconv" ) @@ -64,23 +60,10 @@ func assign(count int) (*AssignResult, error) { return &ret, nil } -type UploadResult struct { - Size int -} - -func upload(filename string, uploadUrl string) (int, string) { +func upload(filename string, server string, fid string) (int) { if *IsDebug { fmt.Println("Start uploading file:", filename) } - body_buf := bytes.NewBufferString("") - body_writer := multipart.NewWriter(body_buf) - file_writer, err := body_writer.CreateFormFile("file", filename) - if err != nil { - if *IsDebug { - fmt.Println("Failed to create form file:", filename) - } - panic(err.Error()) - } fh, err := os.Open(filename) if err != nil { if *IsDebug { @@ -88,31 +71,8 @@ func upload(filename string, uploadUrl string) (int, string) { } panic(err.Error()) } - io.Copy(file_writer, fh) - content_type := body_writer.FormDataContentType() - body_writer.Close() - resp, err := http.Post(uploadUrl, content_type, body_buf) - if err != nil { - if *IsDebug { - fmt.Println("Failed to upload file to", uploadUrl) - } - panic(err.Error()) - } - defer resp.Body.Close() - resp_body, err := ioutil.ReadAll(resp.Body) - if *IsDebug { - fmt.Println("Upload response:", string(resp_body)) - } - if err != nil { - panic(err.Error()) - } - var ret UploadResult - err = json.Unmarshal(resp_body, &ret) - if err != nil { - panic(err.Error()) - } - //fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl) - return ret.Size, uploadUrl + ret, _ := operation.Upload(server, fid, filename, fh) + return ret.Size } type SubmitResult struct { @@ -131,8 +91,7 @@ func submit(files []string) []SubmitResult { if index > 0 { fid = fid + "_" + strconv.Itoa(index) } - uploadUrl := "http://" + ret.PublicUrl + "/" + fid - results[index].Size, _ = upload(file, uploadUrl) + results[index].Size = upload(file, ret.PublicUrl, fid) results[index].Fid = fid } return results diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go index 3b41a60d1..af4ec8d5e 100644 --- a/weed-fs/src/cmd/weed/volume.go +++ b/weed-fs/src/cmd/weed/volume.go @@ -137,6 +137,30 @@ func PostHandler(w http.ResponseWriter, r *http.Request) { writeJson(w, r, ne) } else { ret := store.Write(volumeId, needle) + if ret > 0 { //send to other replica locations + if r.FormValue("type") != "standard" { + waitTime, err := strconv.Atoi(r.FormValue("wait")) + lookupResult, lookupErr := operation.Lookup(*server, volumeId) + if lookupErr == nil { + sendFunc := func(background bool) { + postContentFunc := func(location operation.Location) bool{ + + return true + } + for _, location := range lookupResult.Locations { + if background { + go postContentFunc(location) + }else{ + postContentFunc(location) + } + } + } + sendFunc(err == nil && waitTime > 0) + } else { + log.Println("Failed to lookup for", volumeId, lookupErr.Error()) + } + } + } m := make(map[string]uint32) m["size"] = ret writeJson(w, r, m) diff --git a/weed-fs/src/pkg/operation/lookup.go b/weed-fs/src/pkg/operation/lookup_volume_id.go similarity index 100% rename from weed-fs/src/pkg/operation/lookup.go rename to weed-fs/src/pkg/operation/lookup_volume_id.go diff --git a/weed-fs/src/pkg/operation/upload_content.go b/weed-fs/src/pkg/operation/upload_content.go new file mode 100644 index 000000000..83e20bf3b --- /dev/null +++ b/weed-fs/src/pkg/operation/upload_content.go @@ -0,0 +1,40 @@ +package operation + +import ( + "bytes" + "encoding/json" + "mime/multipart" + "net/http" + _ "fmt" + "io" + "io/ioutil" +) + +type UploadResult struct { + Size int +} + +func Upload(server string, fid string, filename string, reader io.Reader) (*UploadResult, error) { + body_buf := bytes.NewBufferString("") + body_writer := multipart.NewWriter(body_buf) + file_writer, err := body_writer.CreateFormFile("file", filename) + io.Copy(file_writer, reader) + content_type := body_writer.FormDataContentType() + body_writer.Close() + resp, err := http.Post("http://"+server+"/"+fid, content_type, body_buf) + if err != nil { + return nil, err + } + defer resp.Body.Close() + resp_body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var ret UploadResult + err = json.Unmarshal(resp_body, &ret) + if err != nil { + panic(err.Error()) + } + //fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl) + return &ret, nil +} diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index d1155bb78..4f7e66ef0 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -135,6 +135,9 @@ func (s *Store) Read(i VolumeId, n *Needle) (int, error) { } return 0, errors.New("Not Found") } +func (s *Store) GetVolume(i VolumeId) *Volume { + return s.volumes[i] +} func (s *Store) HasVolume(i VolumeId) bool { _, ok := s.volumes[i]