@ -4,6 +4,7 @@ import (
"context"
"context"
"fmt"
"fmt"
"io"
"io"
"math/rand"
"sync"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/glog"
@ -17,9 +18,12 @@ type ChunkReadAt struct {
chunkViews [ ] * ChunkView
chunkViews [ ] * ChunkView
lookupFileId func ( fileId string ) ( targetUrl string , err error )
lookupFileId func ( fileId string ) ( targetUrl string , err error )
readerLock sync . Mutex
readerLock sync . Mutex
fetcherLock sync . Mutex
fileSize int64
fileSize int64
chunkCache chunk_cache . ChunkCache
lastChunkFileId string
lastChunkData [ ] byte
chunkCache chunk_cache . ChunkCache
}
}
// var _ = io.ReaderAt(&ChunkReadAt{})
// var _ = io.ReaderAt(&ChunkReadAt{})
// LookupFileIdFunctionType resolves a chunk fileId to the URL of a volume
// server that can serve it.
type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
func LookupFn ( filerClient filer_pb . FilerClient ) LookupFileIdFunctionType {
func LookupFn ( filerClient filer_pb . FilerClient ) LookupFileIdFunctionType {
vidCache := make ( map [ string ] * filer_pb . Locations )
return func ( fileId string ) ( targetUrl string , err error ) {
return func ( fileId string ) ( targetUrl string , err error ) {
err = filerClient . WithFilerClient ( func ( client filer_pb . SeaweedFilerClient ) error {
vid := VolumeId ( fileId )
resp , err := client . LookupVolume ( context . Background ( ) , & filer_pb . LookupVolumeRequest {
VolumeIds : [ ] string { vid } ,
vid := VolumeId ( fileId )
locations , found := vidCache [ vid ]
if ! found {
// println("looking up volume", vid)
err = filerClient . WithFilerClient ( func ( client filer_pb . SeaweedFilerClient ) error {
resp , err := client . LookupVolume ( context . Background ( ) , & filer_pb . LookupVolumeRequest {
VolumeIds : [ ] string { vid } ,
} )
if err != nil {
return err
}
locations = resp . LocationsMap [ vid ]
if locations == nil || len ( locations . Locations ) == 0 {
glog . V ( 0 ) . Infof ( "failed to locate %s" , fileId )
return fmt . Errorf ( "failed to locate %s" , fileId )
}
vidCache [ vid ] = locations
return nil
} )
} )
if err != nil {
return err
}
locations := resp . LocationsMap [ vid ]
if locations == nil || len ( locations . Locations ) == 0 {
glog . V ( 0 ) . Infof ( "failed to locate %s" , fileId )
return fmt . Errorf ( "failed to locate %s" , fileId )
}
volumeServerAddress := filerClient . AdjustedUrl ( locations . Locations [ 0 ] . Url )
}
targetUrl = fmt . Sprintf ( "http://%s/%s" , volumeServerAddress , fileId )
volumeServerAddress := filerClient . AdjustedUrl ( locations . Locations [ rand . Intn ( len ( locations . Locations ) ) ] . Url )
targetUrl = fmt . Sprintf ( "http://%s/%s" , volumeServerAddress , fileId )
return nil
} )
return
return
}
}
}
}
@ -125,25 +137,34 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
func ( c * ChunkReadAt ) readFromWholeChunkData ( chunkView * ChunkView ) ( chunkData [ ] byte , err error ) {
func ( c * ChunkReadAt ) readFromWholeChunkData ( chunkView * ChunkView ) ( chunkData [ ] byte , err error ) {
c . fetcherLock . Lock ( )
defer c . fetcherLock . Unlock ( )
if c . lastChunkFileId == chunkView . FileId {
return c . lastChunkData , nil
}
glog . V ( 4 ) . Infof ( "readFromWholeChunkData %s offset %d [%d,%d) size at least %d" , chunkView . FileId , chunkView . Offset , chunkView . LogicOffset , chunkView . LogicOffset + int64 ( chunkView . Size ) , chunkView . ChunkSize )
glog . V ( 4 ) . Infof ( "readFromWholeChunkData %s offset %d [%d,%d) size at least %d" , chunkView . FileId , chunkView . Offset , chunkView . LogicOffset , chunkView . LogicOffset + int64 ( chunkView . Size ) , chunkView . ChunkSize )
chunkData = c . chunkCache . GetChunk ( chunkView . FileId , chunkView . ChunkSize )
chunkData = c . chunkCache . GetChunk ( chunkView . FileId , chunkView . ChunkSize )
if chunkData != nil {
if chunkData != nil {
glog . V ( 4 ) . Infof ( "cache hit %s [%d,%d)" , chunkView . FileId , chunkView . LogicOffset - chunkView . Offset , chunkView . LogicOffset - chunkView . Offset + int64 ( len ( chunkData ) ) )
glog . V ( 4 ) . Infof ( "cache hit %s [%d,%d)" , chunkView . FileId , chunkView . LogicOffset - chunkView . Offset , chunkView . LogicOffset - chunkView . Offset + int64 ( len ( chunkData ) ) )
} else {
} else {
glog . V ( 4 ) . Infof ( "doFetchFullChunkData %s" , chunkView . FileId )
chunkData , err = c . doFetchFullChunkData ( chunkView . FileId , chunkView . CipherKey , chunkView . IsGzipped )
chunkData , err = c . doFetchFullChunkData ( chunkView )
if err != nil {
if err != nil {
return
return
}
}
c . chunkCache . SetChunk ( chunkView . FileId , chunkData )
c . chunkCache . SetChunk ( chunkView . FileId , chunkData )
c . lastChunkData = chunkData
c . lastChunkFileId = chunkView . FileId
}
}
return
return
}
}
func ( c * ChunkReadAt ) doFetchFullChunkData ( fileId string , cipherKey [ ] byte , isGzipped bool ) ( [ ] byte , error ) {
func ( c * ChunkReadAt ) doFetchFullChunkData ( chunkView * ChunkView ) ( [ ] byte , error ) {
data , err := fetchChunk ( c . lookupFileId , chunkView . FileId , chunkView . CipherKey , chunkView . IsGzipped )
return fetchChunk ( c . lookupFileId , fileId , cipherKey , isGzipped )
return data , err
}
}