diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go index bf985f8bd..6c5c84905 100644 --- a/weed/filer2/stream.go +++ b/weed/filer2/stream.go @@ -2,7 +2,6 @@ package filer2 import ( "bytes" - "fmt" "io" "math" @@ -48,9 +47,11 @@ type ChunkStreamReader struct { masterClient *wdclient.MasterClient chunkViews []*ChunkView logicOffset int64 - buffer bytes.Buffer + buffer []byte bufferOffset int64 + bufferPos int chunkIndex int + totalSize int64 } var _ = io.ReadSeeker(&ChunkStreamReader{}) @@ -58,16 +59,21 @@ var _ = io.ReadSeeker(&ChunkStreamReader{}) func NewChunkStreamReader(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) + var totalSize uint64 + for _, chunk := range chunkViews { + totalSize += chunk.Size + } return &ChunkStreamReader{ masterClient: masterClient, chunkViews: chunkViews, bufferOffset: -1, + totalSize: int64(totalSize), } } func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { - if c.buffer.Len() == 0 { + if c.isBufferEmpty() { if c.chunkIndex >= len(c.chunkViews) { return 0, io.EOF } @@ -75,11 +81,42 @@ func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { c.fetchChunkToBuffer(chunkView) c.chunkIndex++ } - return c.buffer.Read(p) + n = copy(p, c.buffer[c.bufferPos:]) + c.bufferPos += n + return +} + +func (c *ChunkStreamReader) isBufferEmpty() bool { + return len(c.buffer) <= c.bufferPos } func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { - return 0, fmt.Errorf("ChunkStreamReader: seek not supported") + + var err error + switch whence { + case io.SeekStart: + case io.SeekCurrent: + offset += c.bufferOffset + int64(c.bufferPos) + case io.SeekEnd: + offset = c.totalSize + offset + } + if offset > c.totalSize { + err = io.ErrUnexpectedEOF + } + + for i, chunk := range c.chunkViews { + if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { + if c.isBufferEmpty() || c.bufferOffset != offset { + c.fetchChunkToBuffer(chunk) + c.chunkIndex = i + 1 + break + } + } + } + c.bufferPos = int(offset - c.bufferOffset) + + return offset, err + } func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { @@ -88,13 +125,17 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err } - c.buffer.Reset() + var buffer bytes.Buffer err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.isGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) { - c.buffer.Write(data) + buffer.Write(data) }) if err != nil { glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) return err } + c.buffer = buffer.Bytes() + c.bufferPos = 0 + c.bufferOffset = chunkView.LogicOffset + return nil } diff --git a/weed/images/resizing.go b/weed/images/resizing.go index ff0eff5e1..b048daa1c 100644 --- a/weed/images/resizing.go +++ b/weed/images/resizing.go @@ -6,10 +6,11 @@ import ( "image/gif" "image/jpeg" "image/png" + "io" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/disintegration/imaging" - "io" + + "github.com/chrislusf/seaweedfs/weed/glog" ) func Resized(ext string, read io.ReadSeeker, width, height int, mode string) (resized io.ReadSeeker, w int, h int) { @@ -35,6 +36,7 @@ func Resized(ext string, read io.ReadSeeker, width, height int, mode string) (re } } } else { + read.Seek(0, 0) return read, bounds.Dx(), bounds.Dy() } var buf bytes.Buffer diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index 1ab3c59ed..baa0038c4 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -142,11 +142,11 @@ func NewChunkedFileReader(chunkList []*ChunkInfo, master string) *ChunkedFileRea func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) { var err error switch whence { - case 0: - case 1: + case io.SeekStart: + case io.SeekCurrent: offset += cf.pos - case 2: - offset = cf.totalSize - offset + case io.SeekEnd: + offset = cf.totalSize + offset } if offset > cf.totalSize { err = ErrInvalidRange