From 7cf64ae07a0f2eb0695f6e69360f12f2e830bea1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9C=8D=E6=99=93=E6=A0=8B?= Date: Wed, 13 Jan 2016 16:58:07 +0800 Subject: [PATCH] dereplicate filer post request & support filer get request with raw fileId --- go/weed/weed_server/filer_server_handlers.go | 106 ++++++++++++++++--- 1 file changed, 91 insertions(+), 15 deletions(-) diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index e6f1628da..339f1b5d8 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -1,6 +1,7 @@ package weed_server import ( + "bytes" "encoding/json" "errors" "io" @@ -13,6 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/operation" + "github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/util" "github.com/syndtr/goleveldb/leveldb" ) @@ -32,6 +34,15 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { } } +func (fs *FilerServer) publicReadOnlyfilerHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + fs.GetOrHeadHandler(w, r, true) + case "HEAD": + fs.GetOrHeadHandler(w, r, false) + } +} + // listDirectoryHandler lists directories and folers under a directory // files are sorted by name and paginated via "lastFileName" and "limit". // sub directories are listed on the first page, when "lastFileName" @@ -68,11 +79,20 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, fs.listDirectoryHandler(w, r) return } - fileId, err := fs.filer.FindFile(r.URL.Path) - if err == leveldb.ErrNotFound { - glog.V(3).Infoln("Not found in db", r.URL.Path) - w.WriteHeader(http.StatusNotFound) - return + var fileId string + uri_elements := strings.Split(r.URL.Path, "/") + //glog.V(1).Infof("debug uri_elements %#v", uri_elements) + if len(uri_elements) == 2 { + // /3,01e587647e will be split into []string{"","3,01e587647e"} + fileId = uri_elements[1] + } else { + var err error + fileId, err = fs.filer.FindFile(r.URL.Path) + if err == leveldb.ErrNotFound { + glog.V(3).Infoln("Not found in db", r.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } } parts := strings.Split(fileId, ",") if len(parts) != 2 { @@ -124,6 +144,13 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, io.Copy(w, resp.Body) } +type analogueReader struct { + *bytes.Buffer +} + +// So that it implements the io.ReadCloser interface +func (m analogueReader) Close() error { return nil } + func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() replication := query.Get("replication") @@ -134,14 +161,63 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if collection == "" { collection = fs.collection } - assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl")) - if ae != nil { - glog.V(0).Infoln("failing to assign a file id", ae.Error()) - writeJsonError(w, r, http.StatusInternalServerError, ae) + + buf, _ := ioutil.ReadAll(r.Body) + r.Body = analogueReader{bytes.NewBuffer(buf)} + fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r) + if pe != nil { + glog.V(0).Infoln("failing to parse post body", pe.Error()) + writeJsonError(w, r, http.StatusInternalServerError, pe) + return + } + //reconstruct http request body for following new request to volume server + r.Body = analogueReader{bytes.NewBuffer(buf)} + + path := r.URL.Path + if strings.HasSuffix(path, "/") { + if fileName != "" { + path += fileName + } + } + var fileId string + var err error + var urlLocation string + if fileId, err = fs.filer.FindFile(path); err != nil { + glog.V(0).Infoln("failing to find path in filer store", path, pe.Error()) + writeJsonError(w, r, http.StatusInternalServerError, err) return + } else if fileId != "" { + parts := strings.Split(fileId, ",") + if len(parts) != 2 { + glog.V(1).Infoln("Invalid fileId", fileId) + w.WriteHeader(http.StatusNotFound) + return + } + lookup, lookupError := operation.Lookup(fs.master, parts[0]) + if lookupError != nil { + glog.V(1).Infoln("Invalid lookup", lookupError.Error()) + w.WriteHeader(http.StatusNotFound) + return + } + if len(lookup.Locations) == 0 { + glog.V(1).Infoln("Can not find location for volume", parts[0]) + w.WriteHeader(http.StatusNotFound) + return + } + urlLocation = lookup.Locations[rand.Intn(len(lookup.Locations))].Url + + } else { + assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl")) + if ae != nil { + glog.V(0).Infoln("failing to assign a file id", ae.Error()) + writeJsonError(w, r, http.StatusInternalServerError, ae) + return + } + fileId = assignResult.Fid + urlLocation = assignResult.Url } - u, _ := url.Parse("http://" + assignResult.Url + "/" + assignResult.Fid) + u, _ := url.Parse("http://" + urlLocation + "/" + fileId) glog.V(4).Infoln("post to", u) request := &http.Request{ Method: r.Method, @@ -180,21 +256,21 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error)) return } - path := r.URL.Path + path = r.URL.Path if strings.HasSuffix(path, "/") { if ret.Name != "" { path += ret.Name } else { - operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up + operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") writeJsonError(w, r, http.StatusInternalServerError, errors.New("Can not to write to folder "+path+" without a file name")) return } } - glog.V(4).Infoln("saving", path, "=>", assignResult.Fid) - if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil { - operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up + glog.V(4).Infoln("saving", path, "=>", fileId) + if db_err := fs.filer.CreateFile(path, fileId); db_err != nil { + operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err) return