@ -19,10 +19,7 @@ type ChunkReadAt struct {
readerLock sync . Mutex
readerLock sync . Mutex
fileSize int64
fileSize int64
lastChunkFileId string
lastChunkData [ ] byte
isPrefetching bool
chunkCache chunk_cache . ChunkCache
chunkCache chunk_cache . ChunkCache
}
}
// var _ = io.ReaderAt(&ChunkReadAt{})
// var _ = io.ReaderAt(&ChunkReadAt{})
@ -79,16 +76,10 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
var buffer [ ] byte
var buffer [ ] byte
startOffset , remaining := offset , int64 ( len ( p ) )
startOffset , remaining := offset , int64 ( len ( p ) )
var nextChunkView * ChunkView
for i , chunk := range c . chunkViews {
for i , chunk := range c . chunkViews {
if remaining <= 0 {
if remaining <= 0 {
break
break
}
}
if i + 1 < len ( c . chunkViews ) {
nextChunkView = c . chunkViews [ i + 1 ]
} else {
nextChunkView = nil
}
if startOffset < chunk . LogicOffset {
if startOffset < chunk . LogicOffset {
gap := int ( chunk . LogicOffset - startOffset )
gap := int ( chunk . LogicOffset - startOffset )
glog . V ( 4 ) . Infof ( "zero [%d,%d)" , startOffset , startOffset + int64 ( gap ) )
glog . V ( 4 ) . Infof ( "zero [%d,%d)" , startOffset , startOffset + int64 ( gap ) )
@ -104,7 +95,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
continue
continue
}
}
glog . V ( 4 ) . Infof ( "read [%d,%d), %d/%d chunk %s [%d,%d)" , chunkStart , chunkStop , i , len ( c . chunkViews ) , chunk . FileId , chunk . LogicOffset - chunk . Offset , chunk . LogicOffset - chunk . Offset + int64 ( chunk . Size ) )
glog . V ( 4 ) . Infof ( "read [%d,%d), %d/%d chunk %s [%d,%d)" , chunkStart , chunkStop , i , len ( c . chunkViews ) , chunk . FileId , chunk . LogicOffset - chunk . Offset , chunk . LogicOffset - chunk . Offset + int64 ( chunk . Size ) )
buffer , err = c . readFromWholeChunkData ( chunk , nextChunkView )
buffer , err = c . readFromWholeChunkData ( chunk )
if err != nil {
if err != nil {
glog . Errorf ( "fetching chunk %+v: %v\n" , chunk , err )
glog . Errorf ( "fetching chunk %+v: %v\n" , chunk , err )
return
return
@ -132,45 +123,27 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
}
}
func ( c * ChunkReadAt ) readFromWholeChunkData ( chunkView * ChunkView , nextChunkView * ChunkView ) ( chunkData [ ] byte , err error ) {
func ( c * ChunkReadAt ) readFromWholeChunkData ( chunkView * ChunkView ) ( chunkData [ ] byte , err error ) {
glog . V ( 4 ) . Infof ( "readFromWholeChunkData %s offset %d [%d,%d) size at least %d" , chunkView . FileId , chunkView . Offset , chunkView . LogicOffset , chunkView . LogicOffset + int64 ( chunkView . Size ) , chunkView . ChunkSize )
glog . V ( 4 ) . Infof ( "readFromWholeChunkData %s offset %d [%d,%d) size at least %d" , chunkView . FileId , chunkView . Offset , chunkView . LogicOffset , chunkView . LogicOffset + int64 ( chunkView . Size ) , chunkView . ChunkSize )
if c . lastChunkFileId == chunkView . FileId {
return c . lastChunkData , nil
}
chunkData = c . chunkCache . GetChunk ( chunkView . FileId , chunkView . ChunkSize )
chunkData = c . chunkCache . GetChunk ( chunkView . FileId , chunkView . ChunkSize )
if chunkData != nil {
if chunkData != nil {
glog . V ( 4 ) . Infof ( "cache hit %s [%d,%d)" , chunkView . FileId , chunkView . LogicOffset - chunkView . Offset , chunkView . LogicOffset - chunkView . Offset + int64 ( len ( chunkData ) ) )
glog . V ( 4 ) . Infof ( "cache hit %s [%d,%d)" , chunkView . FileId , chunkView . LogicOffset - chunkView . Offset , chunkView . LogicOffset - chunkView . Offset + int64 ( len ( chunkData ) ) )
} else {
} else {
glog . V ( 4 ) . Infof ( "doFetchFullChunkData %s" , chunkView . FileId )
glog . V ( 4 ) . Infof ( "doFetchFullChunkData %s" , chunkView . FileId )
chunkData , err = c . doFetchFullChunkData ( chunkView )
chunkData , err = c . doFetchFullChunkData ( chunkView . FileId , chunkView . CipherKey , chunkView . IsGzipped )
if err != nil {
if err != nil {
return
return
}
}
c . chunkCache . SetChunk ( chunkView . FileId , chunkData )
c . chunkCache . SetChunk ( chunkView . FileId , chunkData )
}
}
c . lastChunkData = chunkData
c . lastChunkFileId = chunkView . FileId
if nextChunkView != nil && ! c . isPrefetching {
c . isPrefetching = true
go func ( ) {
if chunkData , err := c . doFetchFullChunkData ( nextChunkView ) ; err == nil {
c . chunkCache . SetChunk ( nextChunkView . FileId , chunkData )
}
c . isPrefetching = false
} ( )
}
return
return
}
}
func ( c * ChunkReadAt ) doFetchFullChunkData ( chunkView * ChunkView ) ( [ ] byte , error ) {
func ( c * ChunkReadAt ) doFetchFullChunkData ( fileId string , cipherKey [ ] byte , isGzipped bool ) ( [ ] byte , error ) {
data , err := fetchChunk ( c . lookupFileId , chunkView . FileId , chunkView . CipherKey , chunkView . IsGzipped )
return data , err
return fetchChunk ( c . lookupFileId , fileId , cipherKey , isGzipped )
}
}