Browse Source

lookup support query `Head` and `readonly` location

pull/279/head
tnextday 10 years ago
parent
commit
1c4b0fd9a0
  1. 2
      go/operation/chunked_file.go
  2. 2
      go/operation/delete_content.go
  3. 24
      go/operation/lookup.go
  4. 6
      go/operation/lookup_vid_cache.go
  5. 2
      go/operation/lookup_vid_cache_test.go
  6. 2
      go/weed/backup.go
  7. 2
      go/weed/benchmark.go
  8. 4
      go/weed/download.go
  9. 3
      go/weed/weed_server/filer_server_handlers.go
  10. 2
      go/weed/weed_server/master_server_handlers.go
  11. 2
      go/weed/weed_server/volume_server_handlers_read.go

2
go/operation/chunked_file.go

@ -150,7 +150,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ {
ci := cm.Chunks[chunkIndex]
// if we need read date from local volume server first?
fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid)
fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid, true)
if lookupError != nil {
return n, lookupError
}

2
go/operation/delete_content.go

@ -21,7 +21,7 @@ type DeleteResult struct {
}
func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error {
fileUrl, err := LookupFileId(master, fileId)
fileUrl, err := LookupFileId(master, fileId, false)
if err != nil {
return err
}

24
go/operation/lookup.go

@ -16,9 +16,12 @@ type Location struct {
Url string `json:"url,omitempty"`
PublicUrl string `json:"publicUrl,omitempty"`
}
type Locations []Location
type LookupResult struct {
VolumeId string `json:"volumeId,omitempty"`
Locations []Location `json:"locations,omitempty"`
Locations Locations `json:"locations,omitempty"`
Error string `json:"error,omitempty"`
}
@ -26,11 +29,20 @@ func (lr *LookupResult) String() string {
return fmt.Sprintf("VolumeId:%s, Locations:%v, Error:%s", lr.VolumeId, lr.Locations, lr.Error)
}
func (ls Locations) Head() *Location {
return &ls[0]
}
func (ls Locations) PickForRead() *Location {
return &ls[rand.Intn(len(ls))]
}
var (
vc VidCache // caching of volume locations, re-check if after 10 minutes
)
func Lookup(server string, vid string) (ret *LookupResult, err error) {
//Maybe we should fetch from master when lookup location for write
locations, cache_err := vc.Get(vid)
if cache_err != nil {
if ret, err = do_lookup(server, vid); err == nil {
@ -60,7 +72,7 @@ func do_lookup(server string, vid string) (*LookupResult, error) {
return &ret, nil
}
func LookupFileId(server string, fileId string) (fullUrl string, err error) {
func LookupFileId(server string, fileId string, readonly bool) (fullUrl string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
return "", errors.New("Invalid fileId " + fileId)
@ -72,7 +84,13 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) {
if len(lookup.Locations) == 0 {
return "", errors.New("File Not Found")
}
return "http://" + lookup.Locations[rand.Intn(len(lookup.Locations))].Url + "/" + fileId, nil
var u string
if readonly{
u = lookup.Locations.PickForRead().Url
}else{
u = lookup.Locations.Head().Url
}
return "http://" + u + "/" + fileId, nil
}
// LookupVolumeIds find volume locations by cache and actual lookup

6
go/operation/lookup_vid_cache.go

@ -9,14 +9,14 @@ import (
)
type VidInfo struct {
Locations []Location
Locations Locations
NextRefreshTime time.Time
}
type VidCache struct {
cache []VidInfo
}
func (vc *VidCache) Get(vid string) ([]Location, error) {
func (vc *VidCache) Get(vid string) (Locations, error) {
id, err := strconv.Atoi(vid)
if err != nil {
glog.V(1).Infof("Unknown volume id %s", vid)
@ -33,7 +33,7 @@ func (vc *VidCache) Get(vid string) ([]Location, error) {
}
return nil, errors.New("Not Found")
}
func (vc *VidCache) Set(vid string, locations []Location, duration time.Duration) {
func (vc *VidCache) Set(vid string, locations Locations, duration time.Duration) {
id, err := strconv.Atoi(vid)
if err != nil {
glog.V(1).Infof("Unknown volume id %s", vid)

2
go/operation/lookup_vid_cache_test.go

@ -10,7 +10,7 @@ func TestCaching(t *testing.T) {
var (
vc VidCache
)
var locations []Location
var locations Locations
locations = append(locations, Location{Url: "a.com:8080"})
vc.Set("123", locations, time.Second)
ret, _ := vc.Get("123")

2
go/weed/backup.go

@ -57,7 +57,7 @@ func runBackup(cmd *Command, args []string) bool {
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
return true
}
volumeServer := lookup.Locations[0].Url
volumeServer := lookup.Locations.Head().Url
stats, err := operation.GetVolumeSyncStatus(volumeServer, vid.String())
if err != nil {

2
go/weed/benchmark.go

@ -254,7 +254,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
println("!!!! volume id ", vid, " location not found!!!!!")
continue
}
server := ret.Locations[rand.Intn(len(ret.Locations))].Url
server := ret.Locations.PickForRead().Url
url := "http://" + server + "/" + fid
if bytesRead, err := util.Get(url); err == nil {
s.completed++

4
go/weed/download.go

@ -53,7 +53,7 @@ func runDownload(cmd *Command, args []string) bool {
}
func downloadToFile(server, fileId, saveDir string) error {
fileUrl, lookupError := operation.LookupFileId(server, fileId)
fileUrl, lookupError := operation.LookupFileId(server, fileId, true)
if lookupError != nil {
return lookupError
}
@ -105,7 +105,7 @@ func downloadToFile(server, fileId, saveDir string) error {
}
func fetchContent(server string, fileId string) (filename string, content []byte, e error) {
fileUrl, lookupError := operation.LookupFileId(server, fileId)
fileUrl, lookupError := operation.LookupFileId(server, fileId, true)
if lookupError != nil {
return "", nil, lookupError
}

3
go/weed/weed_server/filer_server_handlers.go

@ -5,7 +5,6 @@ import (
"errors"
"io"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"strconv"
@ -91,7 +90,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
w.WriteHeader(http.StatusNotFound)
return
}
urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].Url
urlLocation := lookup.Locations.PickForRead().Url
urlString := "http://" + urlLocation + "/" + fileId
if fs.redirectOnRead {
http.Redirect(w, r, urlString, http.StatusFound)

2
go/weed/weed_server/master_server_handlers.go

@ -25,7 +25,7 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume
if err == nil {
machines := ms.Topo.Lookup(collection, volumeId)
if machines != nil {
var ret []operation.Location
var ret operation.Locations
for _, dn := range machines {
ret = append(ret, operation.Location{Url: dn.Url(), PublicUrl: dn.PublicUrl})
}

2
go/weed/weed_server/volume_server_handlers_read.go

@ -48,7 +48,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 {
http.Redirect(w, r, util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)+r.URL.Path, http.StatusMovedPermanently)
http.Redirect(w, r, util.NormalizeUrl(lookupResult.Locations.Head().PublicUrl)+r.URL.Path, http.StatusMovedPermanently)
} else {
glog.V(2).Infoln("lookup error:", err, r.URL.Path)
w.WriteHeader(http.StatusNotFound)

Loading…
Cancel
Save