From f457eef144e148396d476d23b762cf4dc99f7c85 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 6 Aug 2013 13:23:10 -0700 Subject: [PATCH] add /submit handler for both master and volume server --- go/weed/master.go | 49 +++++++++++++++++++++++++++++++++++++++++++++++ go/weed/volume.go | 45 +++---------------------------------------- 2 files changed, 52 insertions(+), 42 deletions(-) diff --git a/go/weed/master.go b/go/weed/master.go index 5ed3c43fd..ee8f7c6a7 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -1,6 +1,8 @@ package main import ( + "bytes" + "code.google.com/p/weed-fs/go/operation" "code.google.com/p/weed-fs/go/replication" "code.google.com/p/weed-fs/go/storage" "code.google.com/p/weed-fs/go/topology" @@ -191,6 +193,10 @@ func redirectHandler(w http.ResponseWriter, r *http.Request) { } } +func submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { + submitForClientHandler(w, r, "localhost:"+strconv.Itoa(*mport)) +} + func runMaster(cmd *Command, args []string) bool { if *mMaxCpu < 1 { *mMaxCpu = runtime.NumCPU() @@ -211,6 +217,7 @@ func runMaster(cmd *Command, args []string) bool { http.HandleFunc("/vol/status", volumeStatusHandler) http.HandleFunc("/vol/vacuum", volumeVacuumHandler) + http.HandleFunc("/submit", submitFromMasterServerHandler) http.HandleFunc("/", redirectHandler) topo.StartRefreshWritableVolumes(*garbageThreshold) @@ -227,3 +234,45 @@ func runMaster(cmd *Command, args []string) bool { } return true } + +func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { + m := make(map[string]interface{}) + if r.Method != "POST" { + m["error"] = "Only submit via POST!" + writeJsonQuiet(w, r, m) + return + } + + debug("parsing upload file...") + fname, data, mimeType, isGzipped, lastModified, pe := storage.ParseUpload(r) + if pe != nil { + writeJsonError(w, r, pe) + return + } + + debug("assigning file id for", fname) + assignResult, ae := Assign(masterUrl, 1) + if ae != nil { + writeJsonError(w, r, ae) + return + } + + url := "http://" + assignResult.PublicUrl + "/" + assignResult.Fid + if lastModified != 0 { + url = url + "?ts=" + strconv.FormatUint(lastModified, 10) + } + + debug("upload file to store", url) + uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType) + if err != nil { + writeJsonError(w, r, err) + return + } + + m["fileName"] = fname + m["fid"] = assignResult.Fid + m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid + m["size"] = uploadResult.Size + writeJsonQuiet(w, r, m) + return +} diff --git a/go/weed/volume.go b/go/weed/volume.go index d699fdbed..a2da22684 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "code.google.com/p/weed-fs/go/operation" "code.google.com/p/weed-fs/go/replication" "code.google.com/p/weed-fs/go/storage" @@ -100,46 +99,8 @@ func freezeVolumeHandler(w http.ResponseWriter, r *http.Request) { } debug("freeze volume =", r.FormValue("volume"), ", error =", err) } -func submitForClientHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - if r.Method != "POST" { - m["error"] = "Only submit via POST!" - writeJsonQuiet(w, r, m) - return - } - - debug("parsing upload file...") - fname, data, mimeType, isGzipped, lastModified, pe := storage.ParseUpload(r) - if pe != nil { - writeJsonError(w, r, pe) - return - } - - debug("assigning file id for", fname) - assignResult, ae := Assign(*masterNode, 1) - if ae != nil { - writeJsonError(w, r, ae) - return - } - - url := "http://" + assignResult.PublicUrl + "/" + assignResult.Fid - if lastModified != 0 { - url = url + "?ts=" + strconv.FormatUint(lastModified, 10) - } - - debug("upload file to store", url) - uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType) - if err != nil { - writeJsonError(w, r, err) - return - } - - m["fileName"] = fname - m["fid"] = assignResult.Fid - m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid - m["size"] = uploadResult.Size - writeJsonQuiet(w, r, m) - return +func submitFromVolumeServerHandler(w http.ResponseWriter, r *http.Request) { + submitForClientHandler(w, r, *masterNode) } func storeHandler(w http.ResponseWriter, r *http.Request) { switch r.Method { @@ -376,7 +337,7 @@ func runVolume(cmd *Command, args []string) bool { store = storage.NewStore(*vport, *ip, *publicUrl, folders, maxCounts) defer store.Close() http.HandleFunc("/", storeHandler) - http.HandleFunc("/submit", submitForClientHandler) + http.HandleFunc("/submit", submitFromVolumeServerHandler) http.HandleFunc("/status", statusHandler) http.HandleFunc("/admin/assign_volume", assignVolumeHandler) http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler)