|
|
@ -187,6 +187,7 @@ func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer |
|
|
|
|
|
|
|
// ---------------- ChunkStreamReader ----------------------------------
|
|
|
|
type ChunkStreamReader struct { |
|
|
|
head *Interval[*ChunkView] |
|
|
|
chunkView *Interval[*ChunkView] |
|
|
|
totalSize int64 |
|
|
|
logicOffset int64 |
|
|
@ -211,6 +212,7 @@ func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, ch |
|
|
|
} |
|
|
|
|
|
|
|
return &ChunkStreamReader{ |
|
|
|
head: chunkViews.Front(), |
|
|
|
chunkView: chunkViews.Front(), |
|
|
|
lookupFileId: lookupFileIdFn, |
|
|
|
totalSize: totalSize, |
|
|
@ -309,15 +311,20 @@ func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) { |
|
|
|
chunk := c.chunkView.Value |
|
|
|
if insideChunk(offset, chunk) { |
|
|
|
if c.isBufferEmpty() || c.bufferOffset != chunk.ViewOffset { |
|
|
|
if err = c.fetchChunkToBuffer(chunk); err != nil { |
|
|
|
return |
|
|
|
} |
|
|
|
return c.fetchChunkToBuffer(chunk) |
|
|
|
} |
|
|
|
} else { |
|
|
|
// glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize))
|
|
|
|
return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize)) |
|
|
|
for p := c.head; p != nil; p = p.Next { |
|
|
|
chunk = p.Value |
|
|
|
if insideChunk(offset, chunk) { |
|
|
|
if c.isBufferEmpty() || c.bufferOffset != chunk.ViewOffset { |
|
|
|
return c.fetchChunkToBuffer(chunk) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return |
|
|
|
|
|
|
|
return io.EOF |
|
|
|
} |
|
|
|
|
|
|
|
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { |
|
|
|