@ -3,6 +3,7 @@ package filer
import (
"context"
"fmt"
"github.com/golang/groupcache/singleflight"
"io"
"math/rand"
"sync"
@ -18,12 +19,12 @@ type ChunkReadAt struct {
chunkViews [ ] * ChunkView
lookupFileId func ( fileId string ) ( targetUrl string , err error )
readerLock sync . Mutex
fetcherLock sync . Mutex
fileSize int64
lastChunkFileId string
lastChunkData [ ] byte
chunkCache chunk_cache . ChunkCache
fetchGroup singleflight . Group
lastChunkFileId string
lastChunkData [ ] byte
chunkCache chunk_cache . ChunkCache
}
// var _ = io.ReaderAt(&ChunkReadAt{})
@ -88,10 +89,16 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
var buffer [ ] byte
startOffset , remaining := offset , int64 ( len ( p ) )
var nextChunk * ChunkView
for i , chunk := range c . chunkViews {
if remaining <= 0 {
break
}
if i + 1 < len ( c . chunkViews ) {
nextChunk = c . chunkViews [ i + 1 ]
} else {
nextChunk = nil
}
if startOffset < chunk . LogicOffset {
gap := int ( chunk . LogicOffset - startOffset )
glog . V ( 4 ) . Infof ( "zero [%d,%d)" , startOffset , startOffset + int64 ( gap ) )
@ -107,7 +114,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
continue
}
glog . V ( 4 ) . Infof ( "read [%d,%d), %d/%d chunk %s [%d,%d)" , chunkStart , chunkStop , i , len ( c . chunkViews ) , chunk . FileId , chunk . LogicOffset - chunk . Offset , chunk . LogicOffset - chunk . Offset + int64 ( chunk . Size ) )
buffer , err = c . readFromWholeChunkData ( chunk )
buffer , err = c . readFromWholeChunkData ( chunk , nextChunk )
if err != nil {
glog . Errorf ( "fetching chunk %+v: %v\n" , chunk , err )
return
@ -135,36 +142,63 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
}
func ( c * ChunkReadAt ) readFromWholeChunkData ( chunkView * ChunkView ) ( chunkData [ ] byte , err error ) {
c . fetcherLock . Lock ( )
defer c . fetcherLock . Unlock ( )
func ( c * ChunkReadAt ) readFromWholeChunkData ( chunkView , nextChunkView * ChunkView ) ( chunkData [ ] byte , err error ) {
if c . lastChunkFileId == chunkView . FileId {
return c . lastChunkData , nil
}
glog . V ( 4 ) . Infof ( "readFromWholeChunkData %s offset %d [%d,%d) size at least %d" , chunkView . FileId , chunkView . Offset , chunkView . LogicOffset , chunkView . LogicOffset + int64 ( chunkView . Size ) , chunkView . ChunkSize )
v , doErr := c . readOneWholeChunk ( chunkView )
chunkData = c . chunkCache . GetChunk ( chunkView . FileId , chunkView . ChunkSize )
if chunkData != nil {
glog . V ( 4 ) . Infof ( "cache hit %s [%d,%d)" , chunkView . FileId , chunkView . LogicOffset - chunkView . Offset , chunkView . LogicOffset - chunkView . Offset + int64 ( len ( chunkData ) ) )
} else {
chunkData , err = c . doFetchFullChunkData ( chunkView )
if err != nil {
return
}
c . chunkCache . SetChunk ( chunkView . FileId , chunkData )
c . lastChunkData = chunkData
c . lastChunkFileId = chunkView . FileId
if doErr != nil {
return
}
chunkData = v . ( [ ] byte )
c . lastChunkData = chunkData
c . lastChunkFileId = chunkView . FileId
go func ( ) {
if c . chunkCache != nil && nextChunkView != nil {
c . readOneWholeChunk ( nextChunkView )
}
} ( )
return
}
func ( c * ChunkReadAt ) readOneWholeChunk ( chunkView * ChunkView ) ( interface { } , error ) {
var err error
return c . fetchGroup . Do ( chunkView . FileId , func ( ) ( interface { } , error ) {
glog . V ( 4 ) . Infof ( "readFromWholeChunkData %s offset %d [%d,%d) size at least %d" , chunkView . FileId , chunkView . Offset , chunkView . LogicOffset , chunkView . LogicOffset + int64 ( chunkView . Size ) , chunkView . ChunkSize )
data := c . chunkCache . GetChunk ( chunkView . FileId , chunkView . ChunkSize )
if data != nil {
glog . V ( 4 ) . Infof ( "cache hit %s [%d,%d)" , chunkView . FileId , chunkView . LogicOffset - chunkView . Offset , chunkView . LogicOffset - chunkView . Offset + int64 ( len ( data ) ) )
} else {
var err error
data , err = c . doFetchFullChunkData ( chunkView )
if err != nil {
return data , err
}
c . chunkCache . SetChunk ( chunkView . FileId , data )
}
return data , err
} )
}
func ( c * ChunkReadAt ) doFetchFullChunkData ( chunkView * ChunkView ) ( [ ] byte , error ) {
glog . V ( 2 ) . Infof ( "+ doFetchFullChunkData %s" , chunkView . FileId )
data , err := fetchChunk ( c . lookupFileId , chunkView . FileId , chunkView . CipherKey , chunkView . IsGzipped )
glog . V ( 2 ) . Infof ( "- doFetchFullChunkData %s" , chunkView . FileId )
return data , err
}