From 4ceeba9e707330430c3e7638f6eca8222032dc4f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 16 Aug 2020 21:07:46 -0700 Subject: [PATCH] streaming reads --- weed/filer2/reader_at.go | 61 +++++++++++++++++----------------------- 1 file changed, 26 insertions(+), 35 deletions(-) diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go index b0702e03e..fc74ef2af 100644 --- a/weed/filer2/reader_at.go +++ b/weed/filer2/reader_at.go @@ -80,48 +80,39 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) { func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { - var found bool - var chunkStart, chunkStop int64 - var chunkView *ChunkView + var startOffset = offset for _, chunk := range c.chunkViews { + if startOffset < min(chunk.LogicOffset, int64(len(p))+offset) { + gap := int(min(chunk.LogicOffset, int64(len(p))+offset) - startOffset) + glog.V(4).Infof("zero [%d,%d)", n, n+gap) + n += gap + startOffset = chunk.LogicOffset + } // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size)) - chunkStart, chunkStop = max(chunk.LogicOffset, offset), min(chunk.LogicOffset+int64(chunk.Size), offset+int64(len(p))) - chunkView = chunk - if chunkStart < chunkStop { - // fmt.Printf(">>> found [%d,%d), chunk %s [%d,%d)\n", chunkStart, chunkStop, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size)) - found = true - c.buffer, err = c.fetchWholeChunkData(chunk) - if err != nil { - glog.Errorf("fetching chunk %+v: %v\n", chunk, err) - } - c.bufferFileId = chunk.FileId - break + chunkStart, chunkStop := max(chunk.LogicOffset, offset), min(chunk.LogicOffset+int64(chunk.Size), offset+int64(len(p))) + if chunkStart >= chunkStop { + continue + } + glog.V(4).Infof("read [%d,%d), chunk %s [%d,%d)\n", chunkStart, chunkStop, chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size)) + c.buffer, err = c.fetchWholeChunkData(chunk) + if err != nil { + glog.Errorf("fetching chunk %+v: %v\n", chunk, err) + return } + c.bufferFileId = chunk.FileId + bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset + copied := copy(p[chunkStart-offset:chunkStop-offset], c.buffer[bufferOffset:bufferOffset+chunkStop-chunkStart]) + n += copied + startOffset += int64(copied) } // fmt.Printf("> doReadAt [%d,%d), buffer %s:%d, found:%v, err:%v\n", offset, offset+int64(len(p)), c.bufferFileId, int64(len(c.buffer)), found, err) - if err != nil { - return + if startOffset < min(c.fileSize, int64(len(p))+offset) { + gap := int(min(c.fileSize, int64(len(p))+offset) - startOffset) + glog.V(4).Infof("zero2 [%d,%d)", n, n+gap) + n += gap } - - if found { - bufferOffset := chunkStart - chunkView.LogicOffset + chunkView.Offset - /* - skipped, copied := chunkStart-offset, chunkStop-chunkStart - fmt.Printf("+++ copy %d+%d=%d fill:[%d, %d) p[%d,%d) <- buffer:[%d,%d) buffer %s:%d\nchunkView:%+v\n\n", - skipped, copied, skipped+copied, - chunkStart, chunkStop, - chunkStart-offset, chunkStop-offset, bufferOffset, bufferOffset+chunkStop-chunkStart, - c.bufferFileId, len(c.buffer), - chunkView) - */ - n = int(chunkStart-offset) + - copy(p[chunkStart-offset:chunkStop-offset], c.buffer[bufferOffset:bufferOffset+chunkStop-chunkStart]) - return - } - - n = len(p) if offset+int64(n) >= c.fileSize { err = io.EOF } @@ -137,7 +128,7 @@ func (c *ChunkReadAt) fetchWholeChunkData(chunkView *ChunkView) (chunkData []byt chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) if chunkData != nil { - glog.V(5).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset + int64(len(chunkData))) + glog.V(5).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData))) } else { chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped) if err != nil {