From 888eb2abb58413d52eb0a053f3bd7f94149f8f49 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 28 Jul 2018 14:51:36 -0700 Subject: [PATCH] filer read write all via locations from MasterClient --- weed/filer2/filer.go | 14 +++++++-- weed/operation/delete_content.go | 8 ++++++ weed/server/filer_grpc_server.go | 4 +-- weed/server/filer_server_handlers_read.go | 5 ++-- weed/server/filer_server_handlers_write.go | 6 ++-- weed/wdclient/vid_map.go | 33 ++++++++++++++++++++++ 6 files changed, 59 insertions(+), 11 deletions(-) diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index e1c3fea84..100117e6a 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -206,9 +206,17 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) { func (f *Filer) deleteChunks(chunks []*filer_pb.FileChunk) { for _, chunk := range chunks { - if err := operation.DeleteFile(f.GetMaster(), chunk.FileId, ""); err != nil { - glog.V(0).Infof("deleting file %s: %v", chunk.FileId, err) - } + f.DeleteFileByFileId(chunk.FileId) + } +} + +func (f *Filer) DeleteFileByFileId(fileId string) { + fileUrlOnVolume, err := f.MasterClient.LookupFileId(fileId) + if err != nil { + glog.V(0).Infof("can not find file %s: %v", fileId, err) + } + if err := operation.DeleteFromVolumeServer(fileUrlOnVolume, ""); err != nil { + glog.V(0).Infof("deleting file %s: %v", fileId, err) } } diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index b78221da1..2d7e71c6e 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -21,6 +21,14 @@ type DeleteResult struct { Error string `json:"error,omitempty"` } +func DeleteFromVolumeServer(fileUrlOnVolume string, jwt security.EncodedJwt) error { + err = util.Delete(fileUrlOnVolume, jwt) + if err != nil { + return fmt.Errorf("Failed to delete %s:%v", fileUrlOnVolume, err) + } + return nil +} + func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error { fileUrl, err := LookupFileId(master, fileId) if err != nil { diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index f66a6eca8..0155eeccf 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -176,11 +176,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr if err = fs.filer.UpdateEntry(newEntry); err == nil { for _, garbage := range unusedChunks { glog.V(0).Infof("deleting %s old chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size)) - operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId)) + fs.filer.DeleteFileByFileId(garbage.FileId) } for _, garbage := range garbages { glog.V(0).Infof("deleting %s garbage chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size)) - operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId)) + fs.filer.DeleteFileByFileId(garbage.FileId) } } diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 4e20be5da..e17cd776d 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -8,7 +8,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/util" "mime" "mime/multipart" @@ -63,7 +62,7 @@ func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, fileId := entry.Chunks[0].FileId - urlString, err := operation.LookupFileId(fs.filer.GetMaster(), fileId) + urlString, err := fs.filer.MasterClient.LookupFileId(fileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err) w.WriteHeader(http.StatusNotFound) @@ -223,7 +222,7 @@ func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int for _, chunkView := range chunkViews { - urlString, err := operation.LookupFileId(fs.filer.GetMaster(), chunkView.FileId) + urlString, err := fs.filer.MasterClient.LookupFileId(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index c1b7e3826..8a19f3fdb 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -49,7 +49,7 @@ func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Reques w.WriteHeader(http.StatusNoContent) } else { fileId = entry.Chunks[0].FileId - urlLocation, err = operation.LookupFileId(fs.filer.GetMaster(), fileId) + urlLocation, err = fs.filer.MasterClient.LookupFileId(fileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err is %s", fileId, err.Error()) w.WriteHeader(http.StatusNotFound) @@ -176,7 +176,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if ret.Name != "" { path += ret.Name } else { - operation.DeleteFile(fs.filer.GetMaster(), fileId, fs.jwt(fileId)) //clean up + fs.filer.DeleteFileByFileId(fileId) glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") writeJsonError(w, r, http.StatusInternalServerError, errors.New("Can not to write to folder "+path+" without a file name")) @@ -205,7 +205,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { }}, } if db_err := fs.filer.CreateEntry(entry); db_err != nil { - operation.DeleteFile(fs.filer.GetMaster(), fileId, fs.jwt(fileId)) //clean up + fs.filer.DeleteFileByFileId(fileId) glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err) return diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index 0eb5ee5d0..93884c53d 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -2,6 +2,12 @@ package wdclient import ( "sync" + "strings" + "math/rand" + "errors" + "strconv" + "github.com/chrislusf/seaweedfs/weed/glog" + "fmt" ) type Location struct { @@ -14,6 +20,33 @@ type VidMap struct { vid2Locations map[uint32][]Location } +func (vc *VidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) { + id, err := strconv.Atoi(vid) + if err != nil { + glog.V(1).Infof("Unknown volume id %s", vid) + return "", err + } + + locations := vc.GetLocations(uint32(id)) + if len(locations) == 0 { + return "", fmt.Errorf("volume %d not found", id) + } + + return locations[rand.Intn(len(locations))].Url, nil +} + +func (vc *VidMap) LookupFileId(fileId string) (fullUrl string, err error) { + parts := strings.Split(fileId, ",") + if len(parts) != 2 { + return "", errors.New("Invalid fileId " + fileId) + } + serverUrl, lookupError := LookupVolumeServerUrl(parts[0]) + if lookupError != nil { + return "", lookupError + } + return "http://" + serverUrl + "/" + fileId, nil +} + func (vc *VidMap) GetLocations(vid uint32) (locations []Location) { vc.RLock() defer vc.RUnlock()