Browse Source

add collection param in lookup operation

pull/279/head
tnextday 10 years ago
parent
commit
45dca3748d
  1. 19
      go/operation/chunked_file.go
  2. 4
      go/operation/delete_content.go
  3. 17
      go/operation/lookup.go
  4. 4
      go/operation/submit.go
  5. 1
      go/storage/compact_map.go
  6. 3
      go/storage/store.go
  7. 7
      go/topology/store_replicate.go
  8. 2
      go/weed/backup.go
  9. 2
      go/weed/benchmark.go
  10. 4
      go/weed/download.go
  11. 16
      go/weed/weed_server/filer_server_handlers.go
  12. 17
      go/weed/weed_server/volume_server_handlers_read.go
  13. 2
      go/weed/weed_server/volume_server_handlers_write.go

19
go/operation/chunked_file.go

@ -38,12 +38,13 @@ type ChunkManifest struct {
// seekable chunked file reader
type ChunkedFileReader struct {
Manifest *ChunkManifest
Master string
pos int64
pr *io.PipeReader
pw *io.PipeWriter
mutex sync.Mutex
Manifest *ChunkManifest
Master string
Collection string
pos int64
pr *io.PipeReader
pw *io.PipeWriter
mutex sync.Mutex
}
func (s ChunkList) Len() int { return len(s) }
@ -69,10 +70,10 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) {
return json.Marshal(cm)
}
func (cm *ChunkManifest) DeleteChunks(master string) error {
func (cm *ChunkManifest) DeleteChunks(master, collection string) error {
deleteError := 0
for _, ci := range cm.Chunks {
if e := DeleteFile(master, ci.Fid, ""); e != nil {
if e := DeleteFile(master, ci.Fid, collection, ""); e != nil {
deleteError++
glog.V(0).Infof("Delete %s error: %v, master: %s", ci.Fid, e, master)
}
@ -150,7 +151,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ {
ci := cm.Chunks[chunkIndex]
// if we need read date from local volume server first?
fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid, true)
fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid, cf.Collection, true)
if lookupError != nil {
return n, lookupError
}

4
go/operation/delete_content.go

@ -21,8 +21,8 @@ type DeleteResult struct {
Error string `json:"error,omitempty"`
}
func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error {
fileUrl, err := LookupFileId(master, fileId, false)
func DeleteFile(master, fileId, collection string, jwt security.EncodedJwt) error {
fileUrl, err := LookupFileId(master, fileId, collection, false)
if err != nil {
return fmt.Errorf("Failed to lookup %s:%v", fileId, err)
}

17
go/operation/lookup.go

@ -41,10 +41,10 @@ var (
vc VidCache // caching of volume locations, re-check if after 10 minutes
)
func Lookup(server string, vid string) (ret *LookupResult, err error) {
func Lookup(server, vid, collection string) (ret *LookupResult, err error) {
locations, cache_err := vc.Get(vid)
if cache_err != nil {
if ret, err = do_lookup(server, vid); err == nil {
if ret, err = do_lookup(server, vid, collection); err == nil {
vc.Set(vid, ret.Locations, 10*time.Minute)
}
} else {
@ -53,16 +53,19 @@ func Lookup(server string, vid string) (ret *LookupResult, err error) {
return
}
func LookupNoCache(server string, vid string) (ret *LookupResult, err error) {
if ret, err = do_lookup(server, vid); err == nil {
func LookupNoCache(server, vid, collection string) (ret *LookupResult, err error) {
if ret, err = do_lookup(server, vid, collection); err == nil {
vc.Set(vid, ret.Locations, 10*time.Minute)
}
return
}
func do_lookup(server string, vid string) (*LookupResult, error) {
func do_lookup(server, vid, collection string) (*LookupResult, error) {
values := make(url.Values)
values.Add("volumeId", vid)
if collection != "" {
values.Set("collection", collection)
}
jsonBlob, err := util.Post(server, "/dir/lookup", values)
if err != nil {
return nil, err
@ -78,12 +81,12 @@ func do_lookup(server string, vid string) (*LookupResult, error) {
return &ret, nil
}
func LookupFileId(server string, fileId string, readonly bool) (fullUrl string, err error) {
func LookupFileId(server, fileId, collection string, readonly bool) (fullUrl string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
return "", errors.New("Invalid fileId " + fileId)
}
lookup, lookupError := Lookup(server, parts[0])
lookup, lookupError := Lookup(server, parts[0], collection)
if lookupError != nil {
return "", lookupError
}

4
go/operation/submit.go

@ -134,7 +134,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
jwt)
if e != nil {
// delete all uploaded chunks
cm.DeleteChunks(master)
cm.DeleteChunks(master, fi.Collection)
return 0, e
}
cm.Chunks = append(cm.Chunks,
@ -149,7 +149,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil {
// delete all uploaded chunks
cm.DeleteChunks(master)
cm.DeleteChunks(master, fi.Collection)
}
} else {
ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt)

1
go/storage/compact_map.go

@ -21,7 +21,6 @@ func (k Key) String() string {
return strconv.FormatUint(uint64(k), 10)
}
//CompactSection is not concurrent safe,you should lock it when access in multi-thread
type CompactSection struct {
values []NeedleValue

3
go/storage/store.go

@ -141,10 +141,7 @@ func (s *Store) DeleteCollection(collection string) (e error) {
}
return
}
func (s *Store) DeleteVolume(dl *DiskLocation, v *Volume) (e error) {
return dl.DeleteVolume(v.Id)
}
func (s *Store) findVolume(vid VolumeId) *Volume {
for _, location := range s.Locations {
if v, found := location.GetVolume(vid); found {

7
go/topology/store_replicate.go

@ -41,7 +41,6 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
}
u := util.MkUrl(location.Url, r.URL.Path, args)
needle.IsChunkedManifest()
_, err := operation.Upload(u,
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
jwt)
@ -80,7 +79,11 @@ func ReplicatedDelete(masterNode string, store *storage.Store,
}
func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
if lookupResult, lookupErr := operation.LookupNoCache(masterNode, volumeId.String()); lookupErr == nil {
collection := ""
if v := store.GetVolume(volumeId); v != nil {
collection = v.Collection
}
if lookupResult, lookupErr := operation.LookupNoCache(masterNode, volumeId.String(), collection); lookupErr == nil {
length := 0
selfUrl := net.JoinHostPort(store.Ip, strconv.Itoa(store.Port))
results := make(chan bool)

2
go/weed/backup.go

@ -52,7 +52,7 @@ func runBackup(cmd *Command, args []string) bool {
vid := storage.VolumeId(*s.volumeId)
// find volume location, replication, ttl info
lookup, err := operation.Lookup(*s.master, vid.String())
lookup, err := operation.Lookup(*s.master, vid.String(), "")
if err != nil {
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
return true

2
go/weed/benchmark.go

@ -248,7 +248,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
parts := strings.SplitN(fid, ",", 2)
vid := parts[0]
start := time.Now()
ret, err := operation.Lookup(*b.server, vid)
ret, err := operation.Lookup(*b.server, vid, "")
if err != nil || len(ret.Locations) == 0 {
s.failed++
println("!!!! volume id ", vid, " location not found!!!!!")

4
go/weed/download.go

@ -51,7 +51,7 @@ func runDownload(cmd *Command, args []string) bool {
}
func downloadToFile(server, fileId, saveDir string) error {
fileUrl, lookupError := operation.LookupFileId(server, fileId, true)
fileUrl, lookupError := operation.LookupFileId(server, fileId, "", true)
if lookupError != nil {
return lookupError
}
@ -103,7 +103,7 @@ func downloadToFile(server, fileId, saveDir string) error {
}
func fetchContent(server string, fileId string) (filename string, content []byte, e error) {
fileUrl, lookupError := operation.LookupFileId(server, fileId, true)
fileUrl, lookupError := operation.LookupFileId(server, fileId, "", true)
if lookupError != nil {
return "", nil, lookupError
}

16
go/weed/weed_server/filer_server_handlers.go

@ -77,8 +77,12 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
w.WriteHeader(http.StatusNotFound)
return
}
urlString, err := operation.LookupFileId(fs.master, fileId, true)
query := r.URL.Query()
collection := query.Get("collection")
if collection == "" {
collection = fs.collection
}
urlString, err := operation.LookupFileId(fs.master, fileId, collection, true)
if err != nil {
glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error())
w.WriteHeader(http.StatusNotFound)
@ -162,7 +166,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
} else if fileId != "" && err == nil {
var le error
urlLocation, le = operation.LookupFileId(fs.master, fileId, false)
urlLocation, le = operation.LookupFileId(fs.master, fileId, collection, false)
if le != nil {
glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, le.Error())
w.WriteHeader(http.StatusNotFound)
@ -224,7 +228,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if ret.Name != "" {
path += ret.Name
} else {
operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up
operation.DeleteFile(fs.master, fileId, collection, fs.jwt(fileId)) //clean up
glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
writeJsonError(w, r, http.StatusInternalServerError,
errors.New("Can not to write to folder "+path+" without a file name"))
@ -233,7 +237,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
glog.V(4).Infoln("saving", path, "=>", fileId)
if db_err := fs.filer.CreateFile(path, fileId); db_err != nil {
operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up
operation.DeleteFile(fs.master, fileId, collection, fs.jwt(fileId)) //clean up
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
writeJsonError(w, r, http.StatusInternalServerError, db_err)
return
@ -253,7 +257,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
} else {
fid, err = fs.filer.DeleteFile(r.URL.Path)
if err == nil && fid != "" {
err = operation.DeleteFile(fs.master, fid, fs.jwt(fid))
err = operation.DeleteFile(fs.master, fid, r.FormValue("collection"), fs.jwt(fid))
}
}
if err == nil {

17
go/weed/weed_server/volume_server_handlers_read.go

@ -11,12 +11,13 @@ import (
"strings"
"time"
"net/url"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/images"
"github.com/chrislusf/seaweedfs/go/operation"
"github.com/chrislusf/seaweedfs/go/storage"
"github.com/chrislusf/seaweedfs/go/util"
"net/url"
)
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
@ -44,16 +45,11 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotFound)
return
}
lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String())
lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String(), r.FormValue("collection"))
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 {
u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations.Head().PublicUrl))
u.Path = r.URL.Path
arg := url.Values{}
if c := r.FormValue("collection"); c != "" {
arg.Set("collection", c)
}
u.RawQuery = arg.Encode()
http.Redirect(w, r, u.String(), http.StatusMovedPermanently)
} else {
glog.V(2).Infoln("lookup error:", err, r.URL.Path)
@ -92,7 +88,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
w.Header().Set("Etag", etag)
if vs.tryHandleChunkedFile(n, filename, w, r) {
if vs.tryHandleChunkedFile(volumeId, n, filename, w, r) {
return
}
@ -137,7 +133,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) {
func (vs *VolumeServer) tryHandleChunkedFile(vid storage.VolumeId, n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) {
if !n.IsChunkedManifest() {
return false
}
@ -164,6 +160,9 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string,
Manifest: chunkManifest,
Master: vs.GetMasterNode(),
}
if v := vs.store.GetVolume(vid); v != nil {
chunkedFileReader.Collection = v.Collection
}
defer chunkedFileReader.Close()
if e := writeResponseContent(fileName, mType, chunkedFileReader, w, r); e != nil {
glog.V(2).Infoln("response write error:", e)

2
go/weed/weed_server/volume_server_handlers_write.go

@ -77,7 +77,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
return
}
// make sure all chunks had deleted before delete manifest
if e := chunkManifest.DeleteChunks(vs.GetMasterNode()); e != nil {
if e := chunkManifest.DeleteChunks(vs.GetMasterNode(), r.FormValue("collection")); e != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e))
return
}

Loading…
Cancel
Save