diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 9f338782e..5ffc3a024 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "math/rand" "sync" "github.com/chrislusf/seaweedfs/weed/glog" @@ -17,9 +18,12 @@ type ChunkReadAt struct { chunkViews []*ChunkView lookupFileId func(fileId string) (targetUrl string, err error) readerLock sync.Mutex + fetcherLock sync.Mutex fileSize int64 - chunkCache chunk_cache.ChunkCache + lastChunkFileId string + lastChunkData []byte + chunkCache chunk_cache.ChunkCache } // var _ = io.ReaderAt(&ChunkReadAt{}) @@ -27,28 +31,36 @@ type ChunkReadAt struct { type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error) func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType { + + vidCache := make(map[string]*filer_pb.Locations) return func(fileId string) (targetUrl string, err error) { - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - vid := VolumeId(fileId) - resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ - VolumeIds: []string{vid}, + vid := VolumeId(fileId) + locations, found := vidCache[vid] + + if !found { + // println("looking up volume", vid) + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + VolumeIds: []string{vid}, + }) + if err != nil { + return err + } + + locations = resp.LocationsMap[vid] + if locations == nil || len(locations.Locations) == 0 { + glog.V(0).Infof("failed to locate %s", fileId) + return fmt.Errorf("failed to locate %s", fileId) + } + vidCache[vid] = locations + + return nil }) - if err != nil { - return err - } - - locations := resp.LocationsMap[vid] - if locations == nil || len(locations.Locations) == 0 { - glog.V(0).Infof("failed to locate %s", fileId) - return fmt.Errorf("failed to locate %s", fileId) - } - - volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url) + } - targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) + volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[rand.Intn(len(locations.Locations))].Url) + targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) - return nil - }) return } } @@ -125,25 +137,34 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) { + c.fetcherLock.Lock() + defer c.fetcherLock.Unlock() + + if c.lastChunkFileId == chunkView.FileId { + return c.lastChunkData, nil + } + glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize) chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) if chunkData != nil { glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData))) } else { - glog.V(4).Infof("doFetchFullChunkData %s", chunkView.FileId) - chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped) + chunkData, err = c.doFetchFullChunkData(chunkView) if err != nil { return } c.chunkCache.SetChunk(chunkView.FileId, chunkData) + c.lastChunkData = chunkData + c.lastChunkFileId = chunkView.FileId } return } -func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { +func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) { + data, err := fetchChunk(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped) - return fetchChunk(c.lookupFileId, fileId, cipherKey, isGzipped) + return data, err }