From 45dca3748d0f8e25ef123d0351398d403917e2e6 Mon Sep 17 00:00:00 2001 From: tnextday Date: Sun, 28 Feb 2016 12:01:30 +0800 Subject: [PATCH] add collection param in lookup operation --- go/operation/chunked_file.go | 19 ++++++++++--------- go/operation/delete_content.go | 4 ++-- go/operation/lookup.go | 17 ++++++++++------- go/operation/submit.go | 4 ++-- go/storage/compact_map.go | 1 - go/storage/store.go | 3 --- go/topology/store_replicate.go | 7 +++++-- go/weed/backup.go | 2 +- go/weed/benchmark.go | 2 +- go/weed/download.go | 4 ++-- go/weed/weed_server/filer_server_handlers.go | 16 ++++++++++------ .../volume_server_handlers_read.go | 17 ++++++++--------- .../volume_server_handlers_write.go | 2 +- 13 files changed, 52 insertions(+), 46 deletions(-) diff --git a/go/operation/chunked_file.go b/go/operation/chunked_file.go index 3c9d2ec85..bcdae8617 100644 --- a/go/operation/chunked_file.go +++ b/go/operation/chunked_file.go @@ -38,12 +38,13 @@ type ChunkManifest struct { // seekable chunked file reader type ChunkedFileReader struct { - Manifest *ChunkManifest - Master string - pos int64 - pr *io.PipeReader - pw *io.PipeWriter - mutex sync.Mutex + Manifest *ChunkManifest + Master string + Collection string + pos int64 + pr *io.PipeReader + pw *io.PipeWriter + mutex sync.Mutex } func (s ChunkList) Len() int { return len(s) } @@ -69,10 +70,10 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) { return json.Marshal(cm) } -func (cm *ChunkManifest) DeleteChunks(master string) error { +func (cm *ChunkManifest) DeleteChunks(master, collection string) error { deleteError := 0 for _, ci := range cm.Chunks { - if e := DeleteFile(master, ci.Fid, ""); e != nil { + if e := DeleteFile(master, ci.Fid, collection, ""); e != nil { deleteError++ glog.V(0).Infof("Delete %s error: %v, master: %s", ci.Fid, e, master) } @@ -150,7 +151,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ { ci := cm.Chunks[chunkIndex] // if we need read date from local volume server first? - fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid, true) + fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid, cf.Collection, true) if lookupError != nil { return n, lookupError } diff --git a/go/operation/delete_content.go b/go/operation/delete_content.go index a8cd46f71..28ab4f498 100644 --- a/go/operation/delete_content.go +++ b/go/operation/delete_content.go @@ -21,8 +21,8 @@ type DeleteResult struct { Error string `json:"error,omitempty"` } -func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error { - fileUrl, err := LookupFileId(master, fileId, false) +func DeleteFile(master, fileId, collection string, jwt security.EncodedJwt) error { + fileUrl, err := LookupFileId(master, fileId, collection, false) if err != nil { return fmt.Errorf("Failed to lookup %s:%v", fileId, err) } diff --git a/go/operation/lookup.go b/go/operation/lookup.go index 86a2ff760..3b91fa5f1 100644 --- a/go/operation/lookup.go +++ b/go/operation/lookup.go @@ -41,10 +41,10 @@ var ( vc VidCache // caching of volume locations, re-check if after 10 minutes ) -func Lookup(server string, vid string) (ret *LookupResult, err error) { +func Lookup(server, vid, collection string) (ret *LookupResult, err error) { locations, cache_err := vc.Get(vid) if cache_err != nil { - if ret, err = do_lookup(server, vid); err == nil { + if ret, err = do_lookup(server, vid, collection); err == nil { vc.Set(vid, ret.Locations, 10*time.Minute) } } else { @@ -53,16 +53,19 @@ func Lookup(server string, vid string) (ret *LookupResult, err error) { return } -func LookupNoCache(server string, vid string) (ret *LookupResult, err error) { - if ret, err = do_lookup(server, vid); err == nil { +func LookupNoCache(server, vid, collection string) (ret *LookupResult, err error) { + if ret, err = do_lookup(server, vid, collection); err == nil { vc.Set(vid, ret.Locations, 10*time.Minute) } return } -func do_lookup(server string, vid string) (*LookupResult, error) { +func do_lookup(server, vid, collection string) (*LookupResult, error) { values := make(url.Values) values.Add("volumeId", vid) + if collection != "" { + values.Set("collection", collection) + } jsonBlob, err := util.Post(server, "/dir/lookup", values) if err != nil { return nil, err @@ -78,12 +81,12 @@ func do_lookup(server string, vid string) (*LookupResult, error) { return &ret, nil } -func LookupFileId(server string, fileId string, readonly bool) (fullUrl string, err error) { +func LookupFileId(server, fileId, collection string, readonly bool) (fullUrl string, err error) { parts := strings.Split(fileId, ",") if len(parts) != 2 { return "", errors.New("Invalid fileId " + fileId) } - lookup, lookupError := Lookup(server, parts[0]) + lookup, lookupError := Lookup(server, parts[0], collection) if lookupError != nil { return "", lookupError } diff --git a/go/operation/submit.go b/go/operation/submit.go index 8f5239f16..ce4cda535 100644 --- a/go/operation/submit.go +++ b/go/operation/submit.go @@ -134,7 +134,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret jwt) if e != nil { // delete all uploaded chunks - cm.DeleteChunks(master) + cm.DeleteChunks(master, fi.Collection) return 0, e } cm.Chunks = append(cm.Chunks, @@ -149,7 +149,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret err = upload_chunked_file_manifest(fileUrl, &cm, jwt) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master) + cm.DeleteChunks(master, fi.Collection) } } else { ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt) diff --git a/go/storage/compact_map.go b/go/storage/compact_map.go index 93312b14d..5271b5e9c 100644 --- a/go/storage/compact_map.go +++ b/go/storage/compact_map.go @@ -21,7 +21,6 @@ func (k Key) String() string { return strconv.FormatUint(uint64(k), 10) } - //CompactSection is not concurrent safe,you should lock it when access in multi-thread type CompactSection struct { values []NeedleValue diff --git a/go/storage/store.go b/go/storage/store.go index 872d5e44b..83f20700d 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -141,10 +141,7 @@ func (s *Store) DeleteCollection(collection string) (e error) { } return } -func (s *Store) DeleteVolume(dl *DiskLocation, v *Volume) (e error) { - return dl.DeleteVolume(v.Id) -} func (s *Store) findVolume(vid VolumeId) *Volume { for _, location := range s.Locations { if v, found := location.GetVolume(vid); found { diff --git a/go/topology/store_replicate.go b/go/topology/store_replicate.go index 93f0506c5..01d9b88ea 100644 --- a/go/topology/store_replicate.go +++ b/go/topology/store_replicate.go @@ -41,7 +41,6 @@ func ReplicatedWrite(masterNode string, s *storage.Store, } u := util.MkUrl(location.Url, r.URL.Path, args) - needle.IsChunkedManifest() _, err := operation.Upload(u, string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), jwt) @@ -80,7 +79,11 @@ func ReplicatedDelete(masterNode string, store *storage.Store, } func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool { - if lookupResult, lookupErr := operation.LookupNoCache(masterNode, volumeId.String()); lookupErr == nil { + collection := "" + if v := store.GetVolume(volumeId); v != nil { + collection = v.Collection + } + if lookupResult, lookupErr := operation.LookupNoCache(masterNode, volumeId.String(), collection); lookupErr == nil { length := 0 selfUrl := net.JoinHostPort(store.Ip, strconv.Itoa(store.Port)) results := make(chan bool) diff --git a/go/weed/backup.go b/go/weed/backup.go index 2f97751f8..3d852de60 100644 --- a/go/weed/backup.go +++ b/go/weed/backup.go @@ -52,7 +52,7 @@ func runBackup(cmd *Command, args []string) bool { vid := storage.VolumeId(*s.volumeId) // find volume location, replication, ttl info - lookup, err := operation.Lookup(*s.master, vid.String()) + lookup, err := operation.Lookup(*s.master, vid.String(), "") if err != nil { fmt.Printf("Error looking up volume %d: %v\n", vid, err) return true diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go index daa970788..e9f882008 100644 --- a/go/weed/benchmark.go +++ b/go/weed/benchmark.go @@ -248,7 +248,7 @@ func readFiles(fileIdLineChan chan string, s *stat) { parts := strings.SplitN(fid, ",", 2) vid := parts[0] start := time.Now() - ret, err := operation.Lookup(*b.server, vid) + ret, err := operation.Lookup(*b.server, vid, "") if err != nil || len(ret.Locations) == 0 { s.failed++ println("!!!! volume id ", vid, " location not found!!!!!") diff --git a/go/weed/download.go b/go/weed/download.go index 5e1fd30f8..a2f3118fd 100644 --- a/go/weed/download.go +++ b/go/weed/download.go @@ -51,7 +51,7 @@ func runDownload(cmd *Command, args []string) bool { } func downloadToFile(server, fileId, saveDir string) error { - fileUrl, lookupError := operation.LookupFileId(server, fileId, true) + fileUrl, lookupError := operation.LookupFileId(server, fileId, "", true) if lookupError != nil { return lookupError } @@ -103,7 +103,7 @@ func downloadToFile(server, fileId, saveDir string) error { } func fetchContent(server string, fileId string) (filename string, content []byte, e error) { - fileUrl, lookupError := operation.LookupFileId(server, fileId, true) + fileUrl, lookupError := operation.LookupFileId(server, fileId, "", true) if lookupError != nil { return "", nil, lookupError } diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index ea1d63c38..d7867284e 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -77,8 +77,12 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, w.WriteHeader(http.StatusNotFound) return } - - urlString, err := operation.LookupFileId(fs.master, fileId, true) + query := r.URL.Query() + collection := query.Get("collection") + if collection == "" { + collection = fs.collection + } + urlString, err := operation.LookupFileId(fs.master, fileId, collection, true) if err != nil { glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error()) w.WriteHeader(http.StatusNotFound) @@ -162,7 +166,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } else if fileId != "" && err == nil { var le error - urlLocation, le = operation.LookupFileId(fs.master, fileId, false) + urlLocation, le = operation.LookupFileId(fs.master, fileId, collection, false) if le != nil { glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, le.Error()) w.WriteHeader(http.StatusNotFound) @@ -224,7 +228,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if ret.Name != "" { path += ret.Name } else { - operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up + operation.DeleteFile(fs.master, fileId, collection, fs.jwt(fileId)) //clean up glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") writeJsonError(w, r, http.StatusInternalServerError, errors.New("Can not to write to folder "+path+" without a file name")) @@ -233,7 +237,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } glog.V(4).Infoln("saving", path, "=>", fileId) if db_err := fs.filer.CreateFile(path, fileId); db_err != nil { - operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up + operation.DeleteFile(fs.master, fileId, collection, fs.jwt(fileId)) //clean up glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err) return @@ -253,7 +257,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { } else { fid, err = fs.filer.DeleteFile(r.URL.Path) if err == nil && fid != "" { - err = operation.DeleteFile(fs.master, fid, fs.jwt(fid)) + err = operation.DeleteFile(fs.master, fid, r.FormValue("collection"), fs.jwt(fid)) } } if err == nil { diff --git a/go/weed/weed_server/volume_server_handlers_read.go b/go/weed/weed_server/volume_server_handlers_read.go index 9c41b7f7c..9b757961f 100644 --- a/go/weed/weed_server/volume_server_handlers_read.go +++ b/go/weed/weed_server/volume_server_handlers_read.go @@ -11,12 +11,13 @@ import ( "strings" "time" + "net/url" + "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/images" "github.com/chrislusf/seaweedfs/go/operation" "github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/util" - "net/url" ) var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") @@ -44,16 +45,11 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) w.WriteHeader(http.StatusNotFound) return } - lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String()) + lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String(), r.FormValue("collection")) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) if err == nil && len(lookupResult.Locations) > 0 { u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations.Head().PublicUrl)) u.Path = r.URL.Path - arg := url.Values{} - if c := r.FormValue("collection"); c != "" { - arg.Set("collection", c) - } - u.RawQuery = arg.Encode() http.Redirect(w, r, u.String(), http.StatusMovedPermanently) } else { glog.V(2).Infoln("lookup error:", err, r.URL.Path) @@ -92,7 +88,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } w.Header().Set("Etag", etag) - if vs.tryHandleChunkedFile(n, filename, w, r) { + if vs.tryHandleChunkedFile(volumeId, n, filename, w, r) { return } @@ -137,7 +133,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } -func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) { +func (vs *VolumeServer) tryHandleChunkedFile(vid storage.VolumeId, n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) { if !n.IsChunkedManifest() { return false } @@ -164,6 +160,9 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, Manifest: chunkManifest, Master: vs.GetMasterNode(), } + if v := vs.store.GetVolume(vid); v != nil { + chunkedFileReader.Collection = v.Collection + } defer chunkedFileReader.Close() if e := writeResponseContent(fileName, mType, chunkedFileReader, w, r); e != nil { glog.V(2).Infoln("response write error:", e) diff --git a/go/weed/weed_server/volume_server_handlers_write.go b/go/weed/weed_server/volume_server_handlers_write.go index 3d2afaf77..10f14e5ad 100644 --- a/go/weed/weed_server/volume_server_handlers_write.go +++ b/go/weed/weed_server/volume_server_handlers_write.go @@ -77,7 +77,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { return } // make sure all chunks had deleted before delete manifest - if e := chunkManifest.DeleteChunks(vs.GetMasterNode()); e != nil { + if e := chunkManifest.DeleteChunks(vs.GetMasterNode(), r.FormValue("collection")); e != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e)) return }