Chris Lu
11 years ago
9 changed files with 366 additions and 31 deletions
-
9go/operation/lookup_volume_id.go
-
2go/topology/cluster_commands.go
-
2go/topology/volume_layout.go
-
24go/util/http_util.go
-
28go/weed/download.go
-
19go/weed/server.go
-
2go/weed/weed_server/common.go
-
157go/weed/weed_server/filer_server.go
-
154go/weed/weed_server/filer_server_handlers.go
@ -0,0 +1,157 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"code.google.com/p/weed-fs/go/glog" |
|||
"errors" |
|||
"github.com/syndtr/goleveldb/leveldb" |
|||
"github.com/syndtr/goleveldb/leveldb/util" |
|||
"net/http" |
|||
"strings" |
|||
) |
|||
|
|||
/* |
|||
1. level db is only for local instance |
|||
2. db stores two types of pairs |
|||
<path/to/dir, sub folder names> |
|||
<path/to/file, file id> |
|||
So, to list a directory, just get the directory entry, and iterate the current directory files |
|||
Care must be taken to maintain the <dir, sub dirs> and <file, fileid> pairs. |
|||
3. |
|||
*/ |
|||
type FilerServer struct { |
|||
port string |
|||
master string |
|||
collection string |
|||
db *leveldb.DB |
|||
} |
|||
|
|||
func NewFilerServer(r *http.ServeMux, master string, dir string) (fs *FilerServer, err error) { |
|||
fs = &FilerServer{ |
|||
master: master, |
|||
collection: "", |
|||
port: ":8888", |
|||
} |
|||
|
|||
if fs.db, err = leveldb.OpenFile(dir, nil); err != nil { |
|||
return |
|||
} |
|||
|
|||
r.HandleFunc("/", fs.filerHandler) |
|||
|
|||
glog.V(0).Infoln("file server started on port ", fs.port) |
|||
|
|||
return fs, nil |
|||
} |
|||
|
|||
func (fs *FilerServer) CreateFile(fullFileName string, fid string) (err error) { |
|||
fs.ensureFileFolder(fullFileName) |
|||
return fs.db.Put([]byte(fullFileName), []byte(fid), nil) |
|||
} |
|||
|
|||
func (fs *FilerServer) FindFile(fullFileName string) (fid string, err error) { |
|||
return fs.findEntry(fullFileName) |
|||
} |
|||
|
|||
func (fs *FilerServer) ListDirectories(fullpath string) (dirs []string, err error) { |
|||
data, e := fs.db.Get([]byte(fullpath), nil) |
|||
if e != nil { |
|||
return nil, e |
|||
} |
|||
return strings.Split(string(data), ":"), nil |
|||
} |
|||
|
|||
func (fs *FilerServer) ListFiles(fullpath string, start, limit int) (files []string) { |
|||
if !strings.HasSuffix(fullpath, "/") { |
|||
fullpath += "/" |
|||
} |
|||
iter := fs.db.NewIterator(&util.Range{Start: []byte(fullpath)}, nil) |
|||
startCounter, limitCounter := -1, 0 |
|||
for iter.Next() { |
|||
startCounter++ |
|||
if startCounter < start { |
|||
continue |
|||
} |
|||
limitCounter++ |
|||
if limit > 0 { |
|||
if limitCounter > limit { |
|||
break |
|||
} |
|||
} |
|||
key := string(iter.Key()) |
|||
if !strings.HasPrefix(key, fullpath) { |
|||
break |
|||
} |
|||
fileName := key[len(fullpath):] |
|||
if strings.Contains(fileName, "/") { |
|||
break |
|||
} |
|||
files = append(files, fileName) |
|||
} |
|||
iter.Release() |
|||
return |
|||
} |
|||
|
|||
func (fs *FilerServer) Delete(fullpath string, isForceDirectoryRemoval bool) (fid string, isFile bool, err error) { |
|||
val, e := fs.findEntry(fullpath) |
|||
if e != nil { |
|||
return "", false, e |
|||
} |
|||
if strings.Contains(val, ",") { |
|||
return val, true, fs.db.Delete([]byte(fullpath), nil) |
|||
} |
|||
// deal with directory
|
|||
if !strings.HasSuffix(fullpath, "/") { |
|||
fullpath += "/" |
|||
} |
|||
iter := fs.db.NewIterator(&util.Range{Start: []byte(fullpath)}, nil) |
|||
counter := 0 |
|||
for iter.Next() { |
|||
counter++ |
|||
if counter > 0 { |
|||
break |
|||
} |
|||
} |
|||
iter.Release() |
|||
if counter > 0 { |
|||
return "", false, errors.New("Force Deletion Not Supported Yet") |
|||
} |
|||
return "", false, fs.db.Delete([]byte(fullpath), nil) |
|||
} |
|||
|
|||
func (fs *FilerServer) findEntry(fullpath string) (value string, err error) { |
|||
data, e := fs.db.Get([]byte(fullpath), nil) |
|||
if e != nil { |
|||
return "", e |
|||
} |
|||
return string(data), nil |
|||
} |
|||
|
|||
func (fs *FilerServer) ensureFileFolder(fullFileName string) (err error) { |
|||
parts := strings.Split(fullFileName, "/") |
|||
path := "/" |
|||
for i := 1; i < len(parts)-1; i++ { |
|||
sub := parts[i] |
|||
if sub == "" { |
|||
continue |
|||
} |
|||
if err = fs.ensureFolderHasEntry(path, sub); err != nil { |
|||
return |
|||
} |
|||
path = path + sub + "/" |
|||
} |
|||
return nil |
|||
} |
|||
func (fs *FilerServer) ensureFolderHasEntry(path string, sub string) (err error) { |
|||
val, e := fs.findEntry(path) |
|||
if e == leveldb.ErrNotFound { |
|||
return fs.db.Put([]byte(path), []byte(sub), nil) |
|||
} else if e != nil { |
|||
return e |
|||
} |
|||
for _, v := range strings.Split(val, ":") { |
|||
if v == sub { |
|||
return nil |
|||
} |
|||
} |
|||
return fs.db.Put([]byte(path), []byte(val+":"+sub), nil) |
|||
} |
@ -0,0 +1,154 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"code.google.com/p/weed-fs/go/glog" |
|||
"code.google.com/p/weed-fs/go/operation" |
|||
"code.google.com/p/weed-fs/go/util" |
|||
"encoding/json" |
|||
"errors" |
|||
"github.com/syndtr/goleveldb/leveldb" |
|||
"io" |
|||
"io/ioutil" |
|||
"math/rand" |
|||
"net/http" |
|||
"net/url" |
|||
"strings" |
|||
) |
|||
|
|||
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { |
|||
switch r.Method { |
|||
case "GET": |
|||
fs.GetOrHeadHandler(w, r, true) |
|||
case "HEAD": |
|||
fs.GetOrHeadHandler(w, r, false) |
|||
case "DELETE": |
|||
fs.DeleteHandler(w, r) |
|||
case "PUT": |
|||
fs.PostHandler(w, r) |
|||
case "POST": |
|||
fs.PostHandler(w, r) |
|||
} |
|||
} |
|||
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { |
|||
fileId, err := fs.FindFile(r.URL.Path) |
|||
if err == leveldb.ErrNotFound { |
|||
w.WriteHeader(http.StatusNotFound) |
|||
return |
|||
} |
|||
parts := strings.Split(fileId, ",") |
|||
if len(parts) != 2 { |
|||
glog.V(1).Infoln("Invalid fileId", fileId) |
|||
w.WriteHeader(http.StatusNotFound) |
|||
return |
|||
} |
|||
lookup, lookupError := operation.Lookup(fs.master, parts[0]) |
|||
if lookupError != nil { |
|||
glog.V(1).Infoln("Invalid lookup", lookupError.Error()) |
|||
w.WriteHeader(http.StatusNotFound) |
|||
return |
|||
} |
|||
if len(lookup.Locations) == 0 { |
|||
glog.V(1).Infoln("Can not find location for volume", parts[0]) |
|||
w.WriteHeader(http.StatusNotFound) |
|||
return |
|||
} |
|||
urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].PublicUrl |
|||
u, _ := url.Parse("http://" + urlLocation + "/" + fileId) |
|||
request := &http.Request{ |
|||
Method: r.Method, |
|||
URL: u, |
|||
Proto: r.Proto, |
|||
ProtoMajor: r.ProtoMajor, |
|||
ProtoMinor: r.ProtoMinor, |
|||
Header: r.Header, |
|||
Body: r.Body, |
|||
Host: r.Host, |
|||
ContentLength: r.ContentLength, |
|||
} |
|||
glog.V(3).Infoln("retrieving from", u) |
|||
resp, do_err := util.Do(request) |
|||
if do_err != nil { |
|||
glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
writeJsonError(w, r, do_err) |
|||
return |
|||
} |
|||
defer resp.Body.Close() |
|||
io.Copy(w, resp.Body) |
|||
} |
|||
|
|||
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { |
|||
query := r.URL.Query() |
|||
assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection) |
|||
if ae != nil { |
|||
glog.V(0).Infoln("failing to assign a file id", ae.Error()) |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
writeJsonError(w, r, ae) |
|||
return |
|||
} |
|||
|
|||
u, _ := url.Parse("http://" + assignResult.PublicUrl + "/" + assignResult.Fid) |
|||
request := &http.Request{ |
|||
Method: r.Method, |
|||
URL: u, |
|||
Proto: r.Proto, |
|||
ProtoMajor: r.ProtoMajor, |
|||
ProtoMinor: r.ProtoMinor, |
|||
Header: r.Header, |
|||
Body: r.Body, |
|||
Host: r.Host, |
|||
ContentLength: r.ContentLength, |
|||
} |
|||
resp, do_err := util.Do(request) |
|||
if do_err != nil { |
|||
glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
writeJsonError(w, r, do_err) |
|||
return |
|||
} |
|||
defer resp.Body.Close() |
|||
resp_body, ra_err := ioutil.ReadAll(resp.Body) |
|||
if ra_err != nil { |
|||
glog.V(0).Infoln("failing to upload to volume server", ra_err.Error()) |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
writeJsonError(w, r, ra_err) |
|||
return |
|||
} |
|||
var ret operation.UploadResult |
|||
unmarshal_err := json.Unmarshal(resp_body, &ret) |
|||
if unmarshal_err != nil { |
|||
glog.V(0).Infoln("failing to read upload resonse", string(resp_body)) |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
writeJsonError(w, r, unmarshal_err) |
|||
return |
|||
} |
|||
if ret.Error != "" { |
|||
glog.V(0).Infoln("failing to post to volume server", ra_err.Error()) |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
writeJsonError(w, r, errors.New(ret.Error)) |
|||
return |
|||
} |
|||
if db_err := fs.CreateFile(r.URL.Path, assignResult.Fid); db_err != nil { |
|||
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
|
|||
glog.V(0).Infoln("failing to write to filer server", db_err.Error()) |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
writeJsonError(w, r, db_err) |
|||
} |
|||
w.WriteHeader(http.StatusCreated) |
|||
} |
|||
|
|||
func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { |
|||
isForceDirectoryRemoval := r.FormValue("force") == "true" // force remove for directories
|
|||
fid, isFile, err := fs.Delete(r.URL.Path, isForceDirectoryRemoval) |
|||
if err == nil { |
|||
if isFile { |
|||
err = operation.DeleteFile(fs.master, fid) |
|||
} |
|||
} |
|||
if err != nil { |
|||
glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error()) |
|||
w.WriteHeader(http.StatusAccepted) |
|||
} else { |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
} |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue