@@ -15,8 +15,6 @@ import (
 type ChunkReadAt struct {
 	masterClient *wdclient.MasterClient
 	chunkViews   []*ChunkView
-	buffer       []byte
-	bufferFileId string
 	lookupFileId func(fileId string) (targetUrl string, err error)
 	readerLock   sync.Mutex
 	fileSize     int64
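For orientation (this is not part of the diff itself), the struct after this hunk looks roughly as follows; only the fields visible above are shown, and later fields such as the chunk cache used by readFromWholeChunkData are elided:

```go
// Sketch of ChunkReadAt once the two buffer fields are removed,
// limited to the fields visible in the hunk above.
type ChunkReadAt struct {
	masterClient *wdclient.MasterClient
	chunkViews   []*ChunkView
	lookupFileId func(fileId string) (targetUrl string, err error)
	readerLock   sync.Mutex
	fileSize     int64
	// ... remaining fields unchanged (e.g. the chunkCache queried in readFromWholeChunkData)
}
```

With the per-reader buffer and bufferFileId gone, doReadAt no longer caches the last fetched chunk on the struct; each call works from the slice returned by readFromWholeChunkData, which is backed by the chunk cache.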
@@ -71,49 +69,50 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
 	defer c.readerLock.Unlock()
 
 	glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
-	for n < len(p) && err == nil {
-		readCount, readErr := c.doReadAt(p[n:], offset+int64(n))
-		n += readCount
-		err = readErr
-	}
-	return
+	return c.doReadAt(p[n:], offset+int64(n))
 }
 
 func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
 
-	var startOffset = offset
+	var buffer []byte
+	startOffset, remaining := offset, int64(len(p))
 	for i, chunk := range c.chunkViews {
-		if startOffset < min(chunk.LogicOffset, int64(len(p))+offset) {
-			gap := int(min(chunk.LogicOffset, int64(len(p))+offset) - startOffset)
+		if remaining <= 0 {
+			break
+		}
+		if startOffset < chunk.LogicOffset {
+			gap := int(chunk.LogicOffset - startOffset)
 			glog.V(4).Infof("zero [%d,%d)", n, n+gap)
 			n += gap
-			startOffset = chunk.LogicOffset
+			startOffset, remaining = chunk.LogicOffset, remaining-int64(gap)
+			if remaining <= 0 {
+				break
+			}
 		}
 		// fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
-		chunkStart, chunkStop := max(chunk.LogicOffset, offset), min(chunk.LogicOffset+int64(chunk.Size), offset+int64(len(p)))
+		chunkStart, chunkStop := max(chunk.LogicOffset, startOffset), min(chunk.LogicOffset+int64(chunk.Size), startOffset+remaining)
 		if chunkStart >= chunkStop {
 			continue
 		}
-		glog.V(4).Infof("read [%d,%d), %d chunk %s [%d,%d)", chunkStart, chunkStop, i, chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
-		c.buffer, err = c.fetchWholeChunkData(chunk)
+		glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
+		buffer, err = c.readFromWholeChunkData(chunk)
 		if err != nil {
 			glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
 			return
 		}
-		c.bufferFileId = chunk.FileId
 		bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
-		copied := copy(p[chunkStart-offset:chunkStop-offset], c.buffer[bufferOffset:bufferOffset+chunkStop-chunkStart])
+		copied := copy(p[chunkStart-startOffset:chunkStop-startOffset], buffer[bufferOffset:bufferOffset+chunkStop-chunkStart])
 		n += copied
-		startOffset += int64(copied)
+		startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
 	}
 
-	// fmt.Printf("> doReadAt [%d,%d), buffer %s:%d, found:%v, err:%v\n", offset, offset+int64(len(p)), c.bufferFileId, int64(len(c.buffer)), found, err)
+	glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
 
-	if startOffset < min(c.fileSize, int64(len(p))+offset) {
-		gap := int(min(c.fileSize, int64(len(p))+offset) - startOffset)
-		glog.V(4).Infof("zero2 [%d,%d)", n, n+gap)
-		n += gap
+	if remaining > 0 {
+		glog.V(4).Infof("zero2 [%d,%d)", n, n+int(remaining))
+		n += int(remaining)
 	}
 
 	if offset+int64(n) >= c.fileSize {
 		err = io.EOF
 	}
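To make the reworked bookkeeping easier to follow, here is a small self-contained sketch (hypothetical types, names, and data, not SeaweedFS code) of the same idea the new doReadAt uses: walk the chunk views in order, zero-fill any hole before a chunk, copy the overlapping byte range, and advance startOffset and remaining by whatever was consumed. The trailing zero-fill and the io.EOF handling from the diff are omitted here.

```go
package main

import "fmt"

// chunkView is a hypothetical, simplified stand-in for the package's ChunkView;
// it only carries what this sketch needs. Chunks are assumed to be sorted by
// logicOffset and non-overlapping.
type chunkView struct {
	logicOffset int64  // where this chunk's data starts in the file
	data        []byte // the chunk's bytes, already fetched
}

// readAt mirrors the startOffset/remaining bookkeeping of the new doReadAt:
// zero-fill gaps between chunk views, copy each overlap, advance both cursors.
func readAt(chunks []chunkView, p []byte, offset int64) (n int) {
	startOffset, remaining := offset, int64(len(p))
	for _, chunk := range chunks {
		if remaining <= 0 {
			break
		}
		if startOffset < chunk.logicOffset { // hole before this chunk
			gap := chunk.logicOffset - startOffset
			if gap > remaining {
				gap = remaining
			}
			n += int(gap) // p starts zeroed, so advancing is enough
			startOffset, remaining = startOffset+gap, remaining-gap
			if remaining <= 0 {
				break
			}
		}
		chunkStop := chunk.logicOffset + int64(len(chunk.data))
		if startOffset >= chunkStop {
			continue // requested range starts past this chunk
		}
		// copy the overlap of [startOffset, startOffset+remaining) with the chunk
		copied := copy(p[n:], chunk.data[startOffset-chunk.logicOffset:])
		n += copied
		startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
	}
	return n
}

func main() {
	chunks := []chunkView{
		{logicOffset: 0, data: []byte("hello")},
		{logicOffset: 8, data: []byte("world")}, // bytes 5-7 are a hole
	}
	p := make([]byte, 13)
	fmt.Printf("%q\n", p[:readAt(chunks, p, 0)]) // "hello\x00\x00\x00world"
}
```

Because remaining is decremented for both gaps and copies, a single doReadAt pass accounts for the whole requested range, which is presumably why the read loop in ReadAt could be dropped in this change.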
@@ -123,9 +122,9 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
 }
 
-func (c *ChunkReadAt) fetchWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) {
+func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) {
 
-	glog.V(4).Infof("fetchWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
+	glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
 
 	chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
 	if chunkData != nil {