From 2861275fb67112cb2fe51521b5052c3f1a98414d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 30 Mar 2014 11:28:04 -0700 Subject: [PATCH] working filer server! --- go/operation/lookup_volume_id.go | 9 +- go/topology/cluster_commands.go | 2 +- go/topology/volume_layout.go | 2 +- go/util/http_util.go | 24 +++ go/weed/download.go | 28 +--- go/weed/server.go | 19 +++ go/weed/weed_server/common.go | 2 +- go/weed/weed_server/filer_server.go | 157 +++++++++++++++++++ go/weed/weed_server/filer_server_handlers.go | 154 ++++++++++++++++++ 9 files changed, 366 insertions(+), 31 deletions(-) create mode 100644 go/weed/weed_server/filer_server.go create mode 100644 go/weed/weed_server/filer_server_handlers.go diff --git a/go/operation/lookup_volume_id.go b/go/operation/lookup_volume_id.go index 6e6035fae..2e488353b 100644 --- a/go/operation/lookup_volume_id.go +++ b/go/operation/lookup_volume_id.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" _ "fmt" + "math/rand" "net/url" "strings" ) @@ -37,16 +38,16 @@ func Lookup(server string, vid string) (*LookupResult, error) { } func LookupFileId(server string, fileId string) (fullUrl string, err error) { - a := strings.Split(fileId, ",") - if len(a) != 2 { + parts := strings.Split(fileId, ",") + if len(parts) != 2 { return "", errors.New("Invalid fileId " + fileId) } - lookup, lookupError := Lookup(server, a[0]) + lookup, lookupError := Lookup(server, parts[0]) if lookupError != nil { return "", lookupError } if len(lookup.Locations) == 0 { return "", errors.New("File Not Found") } - return "http://" + lookup.Locations[0].PublicUrl + "/" + fileId, nil + return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].PublicUrl + "/" + fileId, nil } diff --git a/go/topology/cluster_commands.go b/go/topology/cluster_commands.go index d859cf656..703435173 100644 --- a/go/topology/cluster_commands.go +++ b/go/topology/cluster_commands.go @@ -25,7 +25,7 @@ func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) { before := topo.GetMaxVolumeId() topo.UpAdjustMaxVolumeId(c.MaxVolumeId) - glog.V(2).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId()) + glog.V(4).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId()) return nil, nil } diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 61121a498..a53e2ae82 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -34,7 +34,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.vid2location[v.Id] = NewVolumeLocationList() } vl.vid2location[v.Id].Set(dn) - glog.V(3).Infoln("volume", v.Id, "added to dn", dn, "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount()) + glog.V(4).Infoln("volume", v.Id, "added to dn", dn, "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount()) if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { vl.AddToWritable(v.Id) } else { diff --git a/go/util/http_util.go b/go/util/http_util.go index e6f9f0184..a33db9199 100644 --- a/go/util/http_util.go +++ b/go/util/http_util.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "net/http" "net/url" + "strings" ) var ( @@ -71,3 +72,26 @@ func Delete(url string) error { } return nil } + +func DownloadUrl(fileUrl string) (filename string, content []byte, e error) { + response, err := client.Get(fileUrl) + if err != nil { + return "", nil, err + } + defer response.Body.Close() + contentDisposition := response.Header["Content-Disposition"] + if len(contentDisposition) > 0 { + glog.V(4).Info("Content-Disposition: ", contentDisposition[0]) + if strings.HasPrefix(contentDisposition[0], "filename=") { + filename = contentDisposition[0][len("filename="):] + } + } else { + glog.V(4).Info("No Content-Disposition!") + } + content, e = ioutil.ReadAll(response.Body) + return +} + +func Do(req *http.Request) (resp *http.Response, err error) { + return client.Do(req) +} diff --git a/go/weed/download.go b/go/weed/download.go index 2e32f3ae9..4309fe5e0 100644 --- a/go/weed/download.go +++ b/go/weed/download.go @@ -1,12 +1,11 @@ package main import ( - "code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/operation" + "code.google.com/p/weed-fs/go/util" "fmt" "io" "io/ioutil" - "net/http" "os" "path" "strings" @@ -78,31 +77,12 @@ func runDownload(cmd *Command, args []string) bool { return true } -func fetchFileId(server string, fildId string) (filename string, content []byte, e error) { - fileUrl, lookupError := operation.LookupFileId(server, fildId) +func fetchFileId(server string, fileId string) (filename string, content []byte, e error) { + fileUrl, lookupError := operation.LookupFileId(server, fileId) if lookupError != nil { return "", nil, lookupError } - filename, content, e = fetchUrl(fileUrl) - return -} - -func fetchUrl(fileUrl string) (filename string, content []byte, e error) { - response, err := http.Get(fileUrl) - if err != nil { - return "", nil, err - } - defer response.Body.Close() - contentDisposition := response.Header["Content-Disposition"] - if len(contentDisposition) > 0 { - glog.V(4).Info("Content-Disposition: ", contentDisposition[0]) - if strings.HasPrefix(contentDisposition[0], "filename=") { - filename = contentDisposition[0][len("filename="):] - } - } else { - glog.V(4).Info("No Content-Disposition!") - } - content, e = ioutil.ReadAll(response.Body) + filename, content, e = util.DownloadUrl(fileUrl) return } diff --git a/go/weed/server.go b/go/weed/server.go index 37ce13849..c3dca3f4b 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -96,6 +96,25 @@ func runServer(cmd *Command, args []string) bool { serverWhiteList = strings.Split(*serverWhiteListOption, ",") } + go func() { + r := http.NewServeMux() + _, nfs_err := weed_server.NewFilerServer(r, *serverIp+":"+strconv.Itoa(*masterPort), *volumeDataFolders) + if nfs_err != nil { + glog.Fatalf(nfs_err.Error()) + } + glog.V(0).Infoln("Start Weed Filer", util.VERSION, "at port", *serverIp+":"+strconv.Itoa(8888)) + filerListener, e := util.NewListener( + *serverIp+":"+strconv.Itoa(8888), + time.Duration(*serverTimeout)*time.Second, + ) + if e != nil { + glog.Fatalf(e.Error()) + } + if e := http.Serve(filerListener, r); e != nil { + glog.Fatalf("Master Fail to serve:%s", e.Error()) + } + }() + var raftWaitForMaster sync.WaitGroup var volumeWait sync.WaitGroup diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index 1b11a5574..fe18cdd19 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -66,7 +66,7 @@ func writeJsonError(w http.ResponseWriter, r *http.Request, err error) { } func debug(params ...interface{}) { - glog.V(3).Infoln(params) + glog.V(4).Infoln(params) } func secure(whiteList []string, f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { diff --git a/go/weed/weed_server/filer_server.go b/go/weed/weed_server/filer_server.go new file mode 100644 index 000000000..80ffb0861 --- /dev/null +++ b/go/weed/weed_server/filer_server.go @@ -0,0 +1,157 @@ +package weed_server + +import ( + "code.google.com/p/weed-fs/go/glog" + "errors" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/util" + "net/http" + "strings" +) + +/* +1. level db is only for local instance +2. db stores two types of pairs + + + So, to list a directory, just get the directory entry, and iterate the current directory files + Care must be taken to maintain the and pairs. +3. +*/ +type FilerServer struct { + port string + master string + collection string + db *leveldb.DB +} + +func NewFilerServer(r *http.ServeMux, master string, dir string) (fs *FilerServer, err error) { + fs = &FilerServer{ + master: master, + collection: "", + port: ":8888", + } + + if fs.db, err = leveldb.OpenFile(dir, nil); err != nil { + return + } + + r.HandleFunc("/", fs.filerHandler) + + glog.V(0).Infoln("file server started on port ", fs.port) + + return fs, nil +} + +func (fs *FilerServer) CreateFile(fullFileName string, fid string) (err error) { + fs.ensureFileFolder(fullFileName) + return fs.db.Put([]byte(fullFileName), []byte(fid), nil) +} + +func (fs *FilerServer) FindFile(fullFileName string) (fid string, err error) { + return fs.findEntry(fullFileName) +} + +func (fs *FilerServer) ListDirectories(fullpath string) (dirs []string, err error) { + data, e := fs.db.Get([]byte(fullpath), nil) + if e != nil { + return nil, e + } + return strings.Split(string(data), ":"), nil +} + +func (fs *FilerServer) ListFiles(fullpath string, start, limit int) (files []string) { + if !strings.HasSuffix(fullpath, "/") { + fullpath += "/" + } + iter := fs.db.NewIterator(&util.Range{Start: []byte(fullpath)}, nil) + startCounter, limitCounter := -1, 0 + for iter.Next() { + startCounter++ + if startCounter < start { + continue + } + limitCounter++ + if limit > 0 { + if limitCounter > limit { + break + } + } + key := string(iter.Key()) + if !strings.HasPrefix(key, fullpath) { + break + } + fileName := key[len(fullpath):] + if strings.Contains(fileName, "/") { + break + } + files = append(files, fileName) + } + iter.Release() + return +} + +func (fs *FilerServer) Delete(fullpath string, isForceDirectoryRemoval bool) (fid string, isFile bool, err error) { + val, e := fs.findEntry(fullpath) + if e != nil { + return "", false, e + } + if strings.Contains(val, ",") { + return val, true, fs.db.Delete([]byte(fullpath), nil) + } + // deal with directory + if !strings.HasSuffix(fullpath, "/") { + fullpath += "/" + } + iter := fs.db.NewIterator(&util.Range{Start: []byte(fullpath)}, nil) + counter := 0 + for iter.Next() { + counter++ + if counter > 0 { + break + } + } + iter.Release() + if counter > 0 { + return "", false, errors.New("Force Deletion Not Supported Yet") + } + return "", false, fs.db.Delete([]byte(fullpath), nil) +} + +func (fs *FilerServer) findEntry(fullpath string) (value string, err error) { + data, e := fs.db.Get([]byte(fullpath), nil) + if e != nil { + return "", e + } + return string(data), nil +} + +func (fs *FilerServer) ensureFileFolder(fullFileName string) (err error) { + parts := strings.Split(fullFileName, "/") + path := "/" + for i := 1; i < len(parts)-1; i++ { + sub := parts[i] + if sub == "" { + continue + } + if err = fs.ensureFolderHasEntry(path, sub); err != nil { + return + } + path = path + sub + "/" + } + return nil +} +func (fs *FilerServer) ensureFolderHasEntry(path string, sub string) (err error) { + val, e := fs.findEntry(path) + if e == leveldb.ErrNotFound { + return fs.db.Put([]byte(path), []byte(sub), nil) + } else if e != nil { + return e + } + for _, v := range strings.Split(val, ":") { + if v == sub { + return nil + } + } + return fs.db.Put([]byte(path), []byte(val+":"+sub), nil) +} diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go new file mode 100644 index 000000000..5bdbdc96d --- /dev/null +++ b/go/weed/weed_server/filer_server_handlers.go @@ -0,0 +1,154 @@ +package weed_server + +import ( + "code.google.com/p/weed-fs/go/glog" + "code.google.com/p/weed-fs/go/operation" + "code.google.com/p/weed-fs/go/util" + "encoding/json" + "errors" + "github.com/syndtr/goleveldb/leveldb" + "io" + "io/ioutil" + "math/rand" + "net/http" + "net/url" + "strings" +) + +func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + fs.GetOrHeadHandler(w, r, true) + case "HEAD": + fs.GetOrHeadHandler(w, r, false) + case "DELETE": + fs.DeleteHandler(w, r) + case "PUT": + fs.PostHandler(w, r) + case "POST": + fs.PostHandler(w, r) + } +} +func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { + fileId, err := fs.FindFile(r.URL.Path) + if err == leveldb.ErrNotFound { + w.WriteHeader(http.StatusNotFound) + return + } + parts := strings.Split(fileId, ",") + if len(parts) != 2 { + glog.V(1).Infoln("Invalid fileId", fileId) + w.WriteHeader(http.StatusNotFound) + return + } + lookup, lookupError := operation.Lookup(fs.master, parts[0]) + if lookupError != nil { + glog.V(1).Infoln("Invalid lookup", lookupError.Error()) + w.WriteHeader(http.StatusNotFound) + return + } + if len(lookup.Locations) == 0 { + glog.V(1).Infoln("Can not find location for volume", parts[0]) + w.WriteHeader(http.StatusNotFound) + return + } + urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].PublicUrl + u, _ := url.Parse("http://" + urlLocation + "/" + fileId) + request := &http.Request{ + Method: r.Method, + URL: u, + Proto: r.Proto, + ProtoMajor: r.ProtoMajor, + ProtoMinor: r.ProtoMinor, + Header: r.Header, + Body: r.Body, + Host: r.Host, + ContentLength: r.ContentLength, + } + glog.V(3).Infoln("retrieving from", u) + resp, do_err := util.Do(request) + if do_err != nil { + glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) + w.WriteHeader(http.StatusInternalServerError) + writeJsonError(w, r, do_err) + return + } + defer resp.Body.Close() + io.Copy(w, resp.Body) +} + +func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection) + if ae != nil { + glog.V(0).Infoln("failing to assign a file id", ae.Error()) + w.WriteHeader(http.StatusInternalServerError) + writeJsonError(w, r, ae) + return + } + + u, _ := url.Parse("http://" + assignResult.PublicUrl + "/" + assignResult.Fid) + request := &http.Request{ + Method: r.Method, + URL: u, + Proto: r.Proto, + ProtoMajor: r.ProtoMajor, + ProtoMinor: r.ProtoMinor, + Header: r.Header, + Body: r.Body, + Host: r.Host, + ContentLength: r.ContentLength, + } + resp, do_err := util.Do(request) + if do_err != nil { + glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) + w.WriteHeader(http.StatusInternalServerError) + writeJsonError(w, r, do_err) + return + } + defer resp.Body.Close() + resp_body, ra_err := ioutil.ReadAll(resp.Body) + if ra_err != nil { + glog.V(0).Infoln("failing to upload to volume server", ra_err.Error()) + w.WriteHeader(http.StatusInternalServerError) + writeJsonError(w, r, ra_err) + return + } + var ret operation.UploadResult + unmarshal_err := json.Unmarshal(resp_body, &ret) + if unmarshal_err != nil { + glog.V(0).Infoln("failing to read upload resonse", string(resp_body)) + w.WriteHeader(http.StatusInternalServerError) + writeJsonError(w, r, unmarshal_err) + return + } + if ret.Error != "" { + glog.V(0).Infoln("failing to post to volume server", ra_err.Error()) + w.WriteHeader(http.StatusInternalServerError) + writeJsonError(w, r, errors.New(ret.Error)) + return + } + if db_err := fs.CreateFile(r.URL.Path, assignResult.Fid); db_err != nil { + operation.DeleteFile(fs.master, assignResult.Fid) //clean up + glog.V(0).Infoln("failing to write to filer server", db_err.Error()) + w.WriteHeader(http.StatusInternalServerError) + writeJsonError(w, r, db_err) + } + w.WriteHeader(http.StatusCreated) +} + +func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { + isForceDirectoryRemoval := r.FormValue("force") == "true" // force remove for directories + fid, isFile, err := fs.Delete(r.URL.Path, isForceDirectoryRemoval) + if err == nil { + if isFile { + err = operation.DeleteFile(fs.master, fid) + } + } + if err != nil { + glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error()) + w.WriteHeader(http.StatusAccepted) + } else { + w.WriteHeader(http.StatusInternalServerError) + } +}