diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 197b87ab8..2ea8ce493 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "math" + "sort" "strings" "time" @@ -88,25 +89,23 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) // ---------------- ChunkStreamReader ---------------------------------- type ChunkStreamReader struct { - chunkViews []*ChunkView - totalSize int64 - logicOffset int64 - buffer []byte - bufferOffset int64 - bufferPos int - chunkIndex int - lookupFileId wdclient.LookupFileIdFunctionType + chunkViews []*ChunkView + totalSize int64 + buffer []byte + bufferOffset int64 + bufferPos int + nextChunkViewIndex int + lookupFileId wdclient.LookupFileIdFunctionType } var _ = io.ReadSeeker(&ChunkStreamReader{}) -func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { - - lookupFileIdFn := func(fileId string) (targetUrl []string, err error) { - return masterClient.LookupFileId(fileId) - } +func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader { chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) + sort.Slice(chunkViews, func(i, j int) bool { + return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset + }) var totalSize int64 for _, chunk := range chunkViews { @@ -120,33 +119,31 @@ func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks [ } } -func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { +func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { - lookupFileIdFn := LookupFn(filerClient) + lookupFileIdFn := func(fileId string) (targetUrl []string, err error) { + return masterClient.LookupFileId(fileId) + } - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) + return doNewChunkStreamReader(lookupFileIdFn, chunks) +} - var totalSize int64 - for _, chunk := range chunkViews { - totalSize += int64(chunk.Size) - } +func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { - return &ChunkStreamReader{ - chunkViews: chunkViews, - lookupFileId: lookupFileIdFn, - totalSize: totalSize, - } + lookupFileIdFn := LookupFn(filerClient) + + return doNewChunkStreamReader(lookupFileIdFn, chunks) } func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { for n < len(p) { if c.isBufferEmpty() { - if c.chunkIndex >= len(c.chunkViews) { + if c.nextChunkViewIndex >= len(c.chunkViews) { return n, io.EOF } - chunkView := c.chunkViews[c.chunkIndex] + chunkView := c.chunkViews[c.nextChunkViewIndex] c.fetchChunkToBuffer(chunkView) - c.chunkIndex++ + c.nextChunkViewIndex++ } t := copy(p[n:], c.buffer[c.bufferPos:]) c.bufferPos += t @@ -173,16 +170,33 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { err = io.ErrUnexpectedEOF } - for i, chunk := range c.chunkViews { - if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { - if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { - c.fetchChunkToBuffer(chunk) - c.chunkIndex = i + 1 - break - } + // stay in the same chunk + if !c.isBufferEmpty() { + if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) { + c.bufferPos = int(offset - c.bufferOffset) + return offset, nil + } + } + + // need to seek to a different chunk + currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool { + return c.chunkViews[i].LogicOffset <= offset + }) + if currentChunkIndex == len(c.chunkViews) { + return 0, io.EOF + } + + // positioning within the new chunk + chunk := c.chunkViews[currentChunkIndex] + if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { + if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { + c.fetchChunkToBuffer(chunk) + c.nextChunkViewIndex = currentChunkIndex + 1 } + c.bufferPos = int(offset - c.bufferOffset) + } else { + return 0, io.ErrUnexpectedEOF } - c.bufferPos = int(offset - c.bufferOffset) return offset, err