From 7cf64ae07a0f2eb0695f6e69360f12f2e830bea1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9C=8D=E6=99=93=E6=A0=8B?= Date: Wed, 13 Jan 2016 16:58:07 +0800 Subject: [PATCH 1/2] dereplicate filer post request & support filer get request with raw fileId --- go/weed/weed_server/filer_server_handlers.go | 106 ++++++++++++++++--- 1 file changed, 91 insertions(+), 15 deletions(-) diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index e6f1628da..339f1b5d8 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -1,6 +1,7 @@ package weed_server import ( + "bytes" "encoding/json" "errors" "io" @@ -13,6 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/operation" + "github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/util" "github.com/syndtr/goleveldb/leveldb" ) @@ -32,6 +34,15 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { } } +func (fs *FilerServer) publicReadOnlyfilerHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + fs.GetOrHeadHandler(w, r, true) + case "HEAD": + fs.GetOrHeadHandler(w, r, false) + } +} + // listDirectoryHandler lists directories and folers under a directory // files are sorted by name and paginated via "lastFileName" and "limit". // sub directories are listed on the first page, when "lastFileName" @@ -68,11 +79,20 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, fs.listDirectoryHandler(w, r) return } - fileId, err := fs.filer.FindFile(r.URL.Path) - if err == leveldb.ErrNotFound { - glog.V(3).Infoln("Not found in db", r.URL.Path) - w.WriteHeader(http.StatusNotFound) - return + var fileId string + uri_elements := strings.Split(r.URL.Path, "/") + //glog.V(1).Infof("debug uri_elements %#v", uri_elements) + if len(uri_elements) == 2 { + // /3,01e587647e will be split into []string{"","3,01e587647e"} + fileId = uri_elements[1] + } else { + var err error + fileId, err = fs.filer.FindFile(r.URL.Path) + if err == leveldb.ErrNotFound { + glog.V(3).Infoln("Not found in db", r.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } } parts := strings.Split(fileId, ",") if len(parts) != 2 { @@ -124,6 +144,13 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, io.Copy(w, resp.Body) } +type analogueReader struct { + *bytes.Buffer +} + +// So that it implements the io.ReadCloser interface +func (m analogueReader) Close() error { return nil } + func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() replication := query.Get("replication") @@ -134,14 +161,63 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if collection == "" { collection = fs.collection } - assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl")) - if ae != nil { - glog.V(0).Infoln("failing to assign a file id", ae.Error()) - writeJsonError(w, r, http.StatusInternalServerError, ae) + + buf, _ := ioutil.ReadAll(r.Body) + r.Body = analogueReader{bytes.NewBuffer(buf)} + fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r) + if pe != nil { + glog.V(0).Infoln("failing to parse post body", pe.Error()) + writeJsonError(w, r, http.StatusInternalServerError, pe) + return + } + //reconstruct http request body for following new request to volume server + r.Body = analogueReader{bytes.NewBuffer(buf)} + + path := r.URL.Path + if strings.HasSuffix(path, "/") { + if fileName != "" { + path += fileName + } + } + var fileId string + var err error + var urlLocation string + if fileId, err = fs.filer.FindFile(path); err != nil { + glog.V(0).Infoln("failing to find path in filer store", path, pe.Error()) + writeJsonError(w, r, http.StatusInternalServerError, err) return + } else if fileId != "" { + parts := strings.Split(fileId, ",") + if len(parts) != 2 { + glog.V(1).Infoln("Invalid fileId", fileId) + w.WriteHeader(http.StatusNotFound) + return + } + lookup, lookupError := operation.Lookup(fs.master, parts[0]) + if lookupError != nil { + glog.V(1).Infoln("Invalid lookup", lookupError.Error()) + w.WriteHeader(http.StatusNotFound) + return + } + if len(lookup.Locations) == 0 { + glog.V(1).Infoln("Can not find location for volume", parts[0]) + w.WriteHeader(http.StatusNotFound) + return + } + urlLocation = lookup.Locations[rand.Intn(len(lookup.Locations))].Url + + } else { + assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl")) + if ae != nil { + glog.V(0).Infoln("failing to assign a file id", ae.Error()) + writeJsonError(w, r, http.StatusInternalServerError, ae) + return + } + fileId = assignResult.Fid + urlLocation = assignResult.Url } - u, _ := url.Parse("http://" + assignResult.Url + "/" + assignResult.Fid) + u, _ := url.Parse("http://" + urlLocation + "/" + fileId) glog.V(4).Infoln("post to", u) request := &http.Request{ Method: r.Method, @@ -180,21 +256,21 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error)) return } - path := r.URL.Path + path = r.URL.Path if strings.HasSuffix(path, "/") { if ret.Name != "" { path += ret.Name } else { - operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up + operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") writeJsonError(w, r, http.StatusInternalServerError, errors.New("Can not to write to folder "+path+" without a file name")) return } } - glog.V(4).Infoln("saving", path, "=>", assignResult.Fid) - if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil { - operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up + glog.V(4).Infoln("saving", path, "=>", fileId) + if db_err := fs.filer.CreateFile(path, fileId); db_err != nil { + operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err) return From 61b1d73f5a482e075d741574eeb3d2d7b08b8840 Mon Sep 17 00:00:00 2001 From: Xiaodong Huo Date: Sun, 17 Jan 2016 12:30:23 +0800 Subject: [PATCH 2/2] Update filer_server_handlers.go filer service PUT method update file while POST method create file --- go/weed/weed_server/filer_server_handlers.go | 117 +++++++------------ 1 file changed, 40 insertions(+), 77 deletions(-) diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index 339f1b5d8..efc4c0381 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -6,7 +6,6 @@ import ( "errors" "io" "io/ioutil" - "math/rand" "net/http" "net/url" "strconv" @@ -34,15 +33,6 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { } } -func (fs *FilerServer) publicReadOnlyfilerHandler(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case "GET": - fs.GetOrHeadHandler(w, r, true) - case "HEAD": - fs.GetOrHeadHandler(w, r, false) - } -} - // listDirectoryHandler lists directories and folers under a directory // files are sorted by name and paginated via "lastFileName" and "limit". // sub directories are listed on the first page, when "lastFileName" @@ -70,6 +60,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque m["Files"], _ = fs.filer.ListFiles(r.URL.Path, lastFileName, limit) writeJsonQuiet(w, r, http.StatusOK, m) } + func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { if strings.HasSuffix(r.URL.Path, "/") { if fs.disableDirListing { @@ -79,40 +70,21 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, fs.listDirectoryHandler(w, r) return } - var fileId string - uri_elements := strings.Split(r.URL.Path, "/") - //glog.V(1).Infof("debug uri_elements %#v", uri_elements) - if len(uri_elements) == 2 { - // /3,01e587647e will be split into []string{"","3,01e587647e"} - fileId = uri_elements[1] - } else { - var err error - fileId, err = fs.filer.FindFile(r.URL.Path) - if err == leveldb.ErrNotFound { - glog.V(3).Infoln("Not found in db", r.URL.Path) - w.WriteHeader(http.StatusNotFound) - return - } - } - parts := strings.Split(fileId, ",") - if len(parts) != 2 { - glog.V(1).Infoln("Invalid fileId", fileId) - w.WriteHeader(http.StatusNotFound) - return - } - lookup, lookupError := operation.Lookup(fs.master, parts[0]) - if lookupError != nil { - glog.V(1).Infoln("Invalid lookup", lookupError.Error()) + + fileId, err := fs.filer.FindFile(r.URL.Path) + if err == leveldb.ErrNotFound { + glog.V(3).Infoln("Not found in db", r.URL.Path) w.WriteHeader(http.StatusNotFound) return } - if len(lookup.Locations) == 0 { - glog.V(1).Infoln("Can not find location for volume", parts[0]) + + urlLocation, err := operation.LookupFileId(fs.master, fileId) + if err != nil { + glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error()) w.WriteHeader(http.StatusNotFound) return } - urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].Url - urlString := "http://" + urlLocation + "/" + fileId + urlString := urlLocation if fs.redirectOnRead { http.Redirect(w, r, urlString, http.StatusFound) return @@ -162,50 +134,41 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { collection = fs.collection } - buf, _ := ioutil.ReadAll(r.Body) - r.Body = analogueReader{bytes.NewBuffer(buf)} - fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r) - if pe != nil { - glog.V(0).Infoln("failing to parse post body", pe.Error()) - writeJsonError(w, r, http.StatusInternalServerError, pe) - return - } - //reconstruct http request body for following new request to volume server - r.Body = analogueReader{bytes.NewBuffer(buf)} - - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if fileName != "" { - path += fileName - } - } var fileId string var err error var urlLocation string - if fileId, err = fs.filer.FindFile(path); err != nil { - glog.V(0).Infoln("failing to find path in filer store", path, pe.Error()) - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } else if fileId != "" { - parts := strings.Split(fileId, ",") - if len(parts) != 2 { - glog.V(1).Infoln("Invalid fileId", fileId) - w.WriteHeader(http.StatusNotFound) + if r.Method == "PUT" { + buf, _ := ioutil.ReadAll(r.Body) + r.Body = analogueReader{bytes.NewBuffer(buf)} + fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r) + if pe != nil { + glog.V(0).Infoln("failing to parse post body", pe.Error()) + writeJsonError(w, r, http.StatusInternalServerError, pe) return } - lookup, lookupError := operation.Lookup(fs.master, parts[0]) - if lookupError != nil { - glog.V(1).Infoln("Invalid lookup", lookupError.Error()) - w.WriteHeader(http.StatusNotFound) - return + //reconstruct http request body for following new request to volume server + r.Body = analogueReader{bytes.NewBuffer(buf)} + + path := r.URL.Path + if strings.HasSuffix(path, "/") { + if fileName != "" { + path += fileName + } } - if len(lookup.Locations) == 0 { - glog.V(1).Infoln("Can not find location for volume", parts[0]) - w.WriteHeader(http.StatusNotFound) + + if fileId, err = fs.filer.FindFile(path); err != nil && err != leveldb.ErrNotFound { + glog.V(0).Infoln("failing to find path in filer store", path, err.Error()) + writeJsonError(w, r, http.StatusInternalServerError, err) return + } else if fileId != "" && err == nil { + var le error + urlLocation, le = operation.LookupFileId(fs.master, fileId) + if le != nil { + glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, le.Error()) + w.WriteHeader(http.StatusNotFound) + return + } } - urlLocation = lookup.Locations[rand.Intn(len(lookup.Locations))].Url - } else { assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl")) if ae != nil { @@ -214,10 +177,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } fileId = assignResult.Fid - urlLocation = assignResult.Url + urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid } - u, _ := url.Parse("http://" + urlLocation + "/" + fileId) + u, _ := url.Parse(urlLocation) glog.V(4).Infoln("post to", u) request := &http.Request{ Method: r.Method, @@ -256,7 +219,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error)) return } - path = r.URL.Path + path := r.URL.Path if strings.HasSuffix(path, "/") { if ret.Name != "" { path += ret.Name