From 1c4b0fd9a0e4442d88b62ef726870ac0d892d374 Mon Sep 17 00:00:00 2001 From: tnextday Date: Tue, 15 Dec 2015 22:13:06 +0800 Subject: [PATCH] lookup support query `Head` and `readonly` location --- go/operation/chunked_file.go | 2 +- go/operation/delete_content.go | 2 +- go/operation/lookup.go | 28 +++++++++++++++---- go/operation/lookup_vid_cache.go | 6 ++-- go/operation/lookup_vid_cache_test.go | 2 +- go/weed/backup.go | 2 +- go/weed/benchmark.go | 2 +- go/weed/download.go | 4 +-- go/weed/weed_server/filer_server_handlers.go | 3 +- go/weed/weed_server/master_server_handlers.go | 2 +- .../volume_server_handlers_read.go | 2 +- 11 files changed, 36 insertions(+), 19 deletions(-) diff --git a/go/operation/chunked_file.go b/go/operation/chunked_file.go index 70564cbd2..c42da903a 100644 --- a/go/operation/chunked_file.go +++ b/go/operation/chunked_file.go @@ -150,7 +150,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ { ci := cm.Chunks[chunkIndex] // if we need read date from local volume server first? - fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid) + fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid, true) if lookupError != nil { return n, lookupError } diff --git a/go/operation/delete_content.go b/go/operation/delete_content.go index 32ad69b17..3a9ea28e3 100644 --- a/go/operation/delete_content.go +++ b/go/operation/delete_content.go @@ -21,7 +21,7 @@ type DeleteResult struct { } func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error { - fileUrl, err := LookupFileId(master, fileId) + fileUrl, err := LookupFileId(master, fileId, false) if err != nil { return err } diff --git a/go/operation/lookup.go b/go/operation/lookup.go index f77d1ec9b..7719690ec 100644 --- a/go/operation/lookup.go +++ b/go/operation/lookup.go @@ -16,21 +16,33 @@ type Location struct { Url string `json:"url,omitempty"` PublicUrl string `json:"publicUrl,omitempty"` } + +type Locations []Location + type LookupResult struct { - VolumeId string `json:"volumeId,omitempty"` - Locations []Location `json:"locations,omitempty"` - Error string `json:"error,omitempty"` + VolumeId string `json:"volumeId,omitempty"` + Locations Locations `json:"locations,omitempty"` + Error string `json:"error,omitempty"` } func (lr *LookupResult) String() string { return fmt.Sprintf("VolumeId:%s, Locations:%v, Error:%s", lr.VolumeId, lr.Locations, lr.Error) } +func (ls Locations) Head() *Location { + return &ls[0] +} + +func (ls Locations) PickForRead() *Location { + return &ls[rand.Intn(len(ls))] +} + var ( vc VidCache // caching of volume locations, re-check if after 10 minutes ) func Lookup(server string, vid string) (ret *LookupResult, err error) { + //Maybe we should fetch from master when lookup location for write locations, cache_err := vc.Get(vid) if cache_err != nil { if ret, err = do_lookup(server, vid); err == nil { @@ -60,7 +72,7 @@ func do_lookup(server string, vid string) (*LookupResult, error) { return &ret, nil } -func LookupFileId(server string, fileId string) (fullUrl string, err error) { +func LookupFileId(server string, fileId string, readonly bool) (fullUrl string, err error) { parts := strings.Split(fileId, ",") if len(parts) != 2 { return "", errors.New("Invalid fileId " + fileId) @@ -72,7 +84,13 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) { if len(lookup.Locations) == 0 { return "", errors.New("File Not Found") } - return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].Url + "/" + fileId, nil + var u string + if readonly{ + u = lookup.Locations.PickForRead().Url + }else{ + u = lookup.Locations.Head().Url + } + return "http://" + u + "/" + fileId, nil } // LookupVolumeIds find volume locations by cache and actual lookup diff --git a/go/operation/lookup_vid_cache.go b/go/operation/lookup_vid_cache.go index ac4240102..ecbfbfade 100644 --- a/go/operation/lookup_vid_cache.go +++ b/go/operation/lookup_vid_cache.go @@ -9,14 +9,14 @@ import ( ) type VidInfo struct { - Locations []Location + Locations Locations NextRefreshTime time.Time } type VidCache struct { cache []VidInfo } -func (vc *VidCache) Get(vid string) ([]Location, error) { +func (vc *VidCache) Get(vid string) (Locations, error) { id, err := strconv.Atoi(vid) if err != nil { glog.V(1).Infof("Unknown volume id %s", vid) @@ -33,7 +33,7 @@ func (vc *VidCache) Get(vid string) ([]Location, error) { } return nil, errors.New("Not Found") } -func (vc *VidCache) Set(vid string, locations []Location, duration time.Duration) { +func (vc *VidCache) Set(vid string, locations Locations, duration time.Duration) { id, err := strconv.Atoi(vid) if err != nil { glog.V(1).Infof("Unknown volume id %s", vid) diff --git a/go/operation/lookup_vid_cache_test.go b/go/operation/lookup_vid_cache_test.go index 9c9e2affb..e3e24e37e 100644 --- a/go/operation/lookup_vid_cache_test.go +++ b/go/operation/lookup_vid_cache_test.go @@ -10,7 +10,7 @@ func TestCaching(t *testing.T) { var ( vc VidCache ) - var locations []Location + var locations Locations locations = append(locations, Location{Url: "a.com:8080"}) vc.Set("123", locations, time.Second) ret, _ := vc.Get("123") diff --git a/go/weed/backup.go b/go/weed/backup.go index 5e51a8b03..0e78f2e2b 100644 --- a/go/weed/backup.go +++ b/go/weed/backup.go @@ -57,7 +57,7 @@ func runBackup(cmd *Command, args []string) bool { fmt.Printf("Error looking up volume %d: %v\n", vid, err) return true } - volumeServer := lookup.Locations[0].Url + volumeServer := lookup.Locations.Head().Url stats, err := operation.GetVolumeSyncStatus(volumeServer, vid.String()) if err != nil { diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go index b63f0008e..51652b1ae 100644 --- a/go/weed/benchmark.go +++ b/go/weed/benchmark.go @@ -254,7 +254,7 @@ func readFiles(fileIdLineChan chan string, s *stat) { println("!!!! volume id ", vid, " location not found!!!!!") continue } - server := ret.Locations[rand.Intn(len(ret.Locations))].Url + server := ret.Locations.PickForRead().Url url := "http://" + server + "/" + fid if bytesRead, err := util.Get(url); err == nil { s.completed++ diff --git a/go/weed/download.go b/go/weed/download.go index dfe4f88b4..df7d1a470 100644 --- a/go/weed/download.go +++ b/go/weed/download.go @@ -53,7 +53,7 @@ func runDownload(cmd *Command, args []string) bool { } func downloadToFile(server, fileId, saveDir string) error { - fileUrl, lookupError := operation.LookupFileId(server, fileId) + fileUrl, lookupError := operation.LookupFileId(server, fileId, true) if lookupError != nil { return lookupError } @@ -105,7 +105,7 @@ func downloadToFile(server, fileId, saveDir string) error { } func fetchContent(server string, fileId string) (filename string, content []byte, e error) { - fileUrl, lookupError := operation.LookupFileId(server, fileId) + fileUrl, lookupError := operation.LookupFileId(server, fileId, true) if lookupError != nil { return "", nil, lookupError } diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index 1695296d4..24bbbcf26 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -5,7 +5,6 @@ import ( "errors" "io" "io/ioutil" - "math/rand" "net/http" "net/url" "strconv" @@ -91,7 +90,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, w.WriteHeader(http.StatusNotFound) return } - urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].Url + urlLocation := lookup.Locations.PickForRead().Url urlString := "http://" + urlLocation + "/" + fileId if fs.redirectOnRead { http.Redirect(w, r, urlString, http.StatusFound) diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go index 2be5d9524..6a5b06c3c 100644 --- a/go/weed/weed_server/master_server_handlers.go +++ b/go/weed/weed_server/master_server_handlers.go @@ -25,7 +25,7 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume if err == nil { machines := ms.Topo.Lookup(collection, volumeId) if machines != nil { - var ret []operation.Location + var ret operation.Locations for _, dn := range machines { ret = append(ret, operation.Location{Url: dn.Url(), PublicUrl: dn.PublicUrl}) } diff --git a/go/weed/weed_server/volume_server_handlers_read.go b/go/weed/weed_server/volume_server_handlers_read.go index 2aa0fc656..eac26d151 100644 --- a/go/weed/weed_server/volume_server_handlers_read.go +++ b/go/weed/weed_server/volume_server_handlers_read.go @@ -48,7 +48,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String()) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) if err == nil && len(lookupResult.Locations) > 0 { - http.Redirect(w, r, util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)+r.URL.Path, http.StatusMovedPermanently) + http.Redirect(w, r, util.NormalizeUrl(lookupResult.Locations.Head().PublicUrl)+r.URL.Path, http.StatusMovedPermanently) } else { glog.V(2).Infoln("lookup error:", err, r.URL.Path) w.WriteHeader(http.StatusNotFound)