diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index e6f1628da..efc4c0381 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -1,11 +1,11 @@ package weed_server import ( + "bytes" "encoding/json" "errors" "io" "io/ioutil" - "math/rand" "net/http" "net/url" "strconv" @@ -13,6 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/operation" + "github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/util" "github.com/syndtr/goleveldb/leveldb" ) @@ -59,6 +60,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque m["Files"], _ = fs.filer.ListFiles(r.URL.Path, lastFileName, limit) writeJsonQuiet(w, r, http.StatusOK, m) } + func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { if strings.HasSuffix(r.URL.Path, "/") { if fs.disableDirListing { @@ -68,31 +70,21 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, fs.listDirectoryHandler(w, r) return } + fileId, err := fs.filer.FindFile(r.URL.Path) if err == leveldb.ErrNotFound { glog.V(3).Infoln("Not found in db", r.URL.Path) w.WriteHeader(http.StatusNotFound) return } - parts := strings.Split(fileId, ",") - if len(parts) != 2 { - glog.V(1).Infoln("Invalid fileId", fileId) - w.WriteHeader(http.StatusNotFound) - return - } - lookup, lookupError := operation.Lookup(fs.master, parts[0]) - if lookupError != nil { - glog.V(1).Infoln("Invalid lookup", lookupError.Error()) - w.WriteHeader(http.StatusNotFound) - return - } - if len(lookup.Locations) == 0 { - glog.V(1).Infoln("Can not find location for volume", parts[0]) + + urlLocation, err := operation.LookupFileId(fs.master, fileId) + if err != nil { + glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error()) w.WriteHeader(http.StatusNotFound) return } - urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].Url - urlString := "http://" + urlLocation + "/" + fileId + urlString := urlLocation if fs.redirectOnRead { http.Redirect(w, r, urlString, http.StatusFound) return @@ -124,6 +116,13 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, io.Copy(w, resp.Body) } +type analogueReader struct { + *bytes.Buffer +} + +// So that it implements the io.ReadCloser interface +func (m analogueReader) Close() error { return nil } + func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() replication := query.Get("replication") @@ -134,14 +133,54 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if collection == "" { collection = fs.collection } - assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl")) - if ae != nil { - glog.V(0).Infoln("failing to assign a file id", ae.Error()) - writeJsonError(w, r, http.StatusInternalServerError, ae) - return + + var fileId string + var err error + var urlLocation string + if r.Method == "PUT" { + buf, _ := ioutil.ReadAll(r.Body) + r.Body = analogueReader{bytes.NewBuffer(buf)} + fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r) + if pe != nil { + glog.V(0).Infoln("failing to parse post body", pe.Error()) + writeJsonError(w, r, http.StatusInternalServerError, pe) + return + } + //reconstruct http request body for following new request to volume server + r.Body = analogueReader{bytes.NewBuffer(buf)} + + path := r.URL.Path + if strings.HasSuffix(path, "/") { + if fileName != "" { + path += fileName + } + } + + if fileId, err = fs.filer.FindFile(path); err != nil && err != leveldb.ErrNotFound { + glog.V(0).Infoln("failing to find path in filer store", path, err.Error()) + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } else if fileId != "" && err == nil { + var le error + urlLocation, le = operation.LookupFileId(fs.master, fileId) + if le != nil { + glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, le.Error()) + w.WriteHeader(http.StatusNotFound) + return + } + } + } else { + assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl")) + if ae != nil { + glog.V(0).Infoln("failing to assign a file id", ae.Error()) + writeJsonError(w, r, http.StatusInternalServerError, ae) + return + } + fileId = assignResult.Fid + urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid } - u, _ := url.Parse("http://" + assignResult.Url + "/" + assignResult.Fid) + u, _ := url.Parse(urlLocation) glog.V(4).Infoln("post to", u) request := &http.Request{ Method: r.Method, @@ -185,16 +224,16 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if ret.Name != "" { path += ret.Name } else { - operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up + operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") writeJsonError(w, r, http.StatusInternalServerError, errors.New("Can not to write to folder "+path+" without a file name")) return } } - glog.V(4).Infoln("saving", path, "=>", assignResult.Fid) - if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil { - operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up + glog.V(4).Infoln("saving", path, "=>", fileId) + if db_err := fs.filer.CreateFile(path, fileId); db_err != nil { + operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err) return