|
@ -7,6 +7,7 @@ import ( |
|
|
"math" |
|
|
"math" |
|
|
"sort" |
|
|
"sort" |
|
|
"strings" |
|
|
"strings" |
|
|
|
|
|
"sync" |
|
|
"time" |
|
|
"time" |
|
|
|
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog" |
|
|
"github.com/chrislusf/seaweedfs/weed/glog" |
|
@ -131,8 +132,8 @@ type ChunkStreamReader struct { |
|
|
logicOffset int64 |
|
|
logicOffset int64 |
|
|
buffer []byte |
|
|
buffer []byte |
|
|
bufferOffset int64 |
|
|
bufferOffset int64 |
|
|
bufferPos int |
|
|
|
|
|
nextChunkViewIndex int |
|
|
|
|
|
|
|
|
bufferLock sync.Mutex |
|
|
|
|
|
chunk string |
|
|
lookupFileId wdclient.LookupFileIdFunctionType |
|
|
lookupFileId wdclient.LookupFileIdFunctionType |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -175,27 +176,29 @@ func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.F |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) { |
|
|
func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) { |
|
|
|
|
|
c.bufferLock.Lock() |
|
|
|
|
|
defer c.bufferLock.Unlock() |
|
|
if err = c.prepareBufferFor(off); err != nil { |
|
|
if err = c.prepareBufferFor(off); err != nil { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
c.logicOffset = off |
|
|
c.logicOffset = off |
|
|
return c.Read(p) |
|
|
|
|
|
|
|
|
return c.doRead(p) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { |
|
|
func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { |
|
|
|
|
|
c.bufferLock.Lock() |
|
|
|
|
|
defer c.bufferLock.Unlock() |
|
|
|
|
|
return c.doRead(p) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (c *ChunkStreamReader) doRead(p []byte) (n int, err error) { |
|
|
|
|
|
// fmt.Printf("do read [%d,%d) at %s[%d,%d)\n", c.logicOffset, c.logicOffset+int64(len(p)), c.chunk, c.bufferOffset, c.bufferOffset+int64(len(c.buffer)))
|
|
|
for n < len(p) { |
|
|
for n < len(p) { |
|
|
if c.isBufferEmpty() { |
|
|
|
|
|
if c.nextChunkViewIndex >= len(c.chunkViews) { |
|
|
|
|
|
return n, io.EOF |
|
|
|
|
|
} |
|
|
|
|
|
chunkView := c.chunkViews[c.nextChunkViewIndex] |
|
|
|
|
|
if err = c.fetchChunkToBuffer(chunkView); err != nil { |
|
|
|
|
|
return |
|
|
|
|
|
} |
|
|
|
|
|
c.nextChunkViewIndex++ |
|
|
|
|
|
|
|
|
// println("read", c.logicOffset)
|
|
|
|
|
|
if err = c.prepareBufferFor(c.logicOffset); err != nil { |
|
|
|
|
|
return |
|
|
} |
|
|
} |
|
|
t := copy(p[n:], c.buffer[c.bufferPos:]) |
|
|
|
|
|
c.bufferPos += t |
|
|
|
|
|
|
|
|
t := copy(p[n:], c.buffer[c.logicOffset-c.bufferOffset:]) |
|
|
n += t |
|
|
n += t |
|
|
c.logicOffset += int64(t) |
|
|
c.logicOffset += int64(t) |
|
|
} |
|
|
} |
|
@ -203,10 +206,12 @@ func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (c *ChunkStreamReader) isBufferEmpty() bool { |
|
|
func (c *ChunkStreamReader) isBufferEmpty() bool { |
|
|
return len(c.buffer) <= c.bufferPos |
|
|
|
|
|
|
|
|
return len(c.buffer) <= int(c.logicOffset - c.bufferOffset) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { |
|
|
func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { |
|
|
|
|
|
c.bufferLock.Lock() |
|
|
|
|
|
defer c.bufferLock.Unlock() |
|
|
|
|
|
|
|
|
var err error |
|
|
var err error |
|
|
switch whence { |
|
|
switch whence { |
|
@ -226,48 +231,59 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func insideChunk(offset int64, chunk *ChunkView) bool { |
|
|
|
|
|
return chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) { |
|
|
func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) { |
|
|
// stay in the same chunk
|
|
|
// stay in the same chunk
|
|
|
if !c.isBufferEmpty() { |
|
|
|
|
|
if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) { |
|
|
|
|
|
c.bufferPos = int(offset - c.bufferOffset) |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) { |
|
|
|
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// fmt.Printf("fetch for offset %d\n", offset)
|
|
|
|
|
|
|
|
|
// need to seek to a different chunk
|
|
|
// need to seek to a different chunk
|
|
|
currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool { |
|
|
currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool { |
|
|
return offset < c.chunkViews[i].LogicOffset |
|
|
return offset < c.chunkViews[i].LogicOffset |
|
|
}) |
|
|
}) |
|
|
if currentChunkIndex == len(c.chunkViews) { |
|
|
if currentChunkIndex == len(c.chunkViews) { |
|
|
// not found
|
|
|
// not found
|
|
|
if c.chunkViews[0].LogicOffset <= offset { |
|
|
|
|
|
|
|
|
if insideChunk(offset, c.chunkViews[0]) { |
|
|
|
|
|
// fmt.Printf("select0 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
|
|
|
currentChunkIndex = 0 |
|
|
currentChunkIndex = 0 |
|
|
} else if c.chunkViews[len(c.chunkViews)-1].LogicOffset <= offset { |
|
|
|
|
|
currentChunkIndex = len(c.chunkViews) -1 |
|
|
|
|
|
|
|
|
} else if insideChunk(offset, c.chunkViews[len(c.chunkViews)-1]) { |
|
|
|
|
|
currentChunkIndex = len(c.chunkViews) - 1 |
|
|
|
|
|
// fmt.Printf("select last chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
|
|
|
} else { |
|
|
} else { |
|
|
return io.EOF |
|
|
return io.EOF |
|
|
} |
|
|
} |
|
|
} else if currentChunkIndex > 0 { |
|
|
} else if currentChunkIndex > 0 { |
|
|
if c.chunkViews[currentChunkIndex-1].LogicOffset <= offset { |
|
|
|
|
|
|
|
|
if insideChunk(offset, c.chunkViews[currentChunkIndex]) { |
|
|
|
|
|
// good hit
|
|
|
|
|
|
} else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]){ |
|
|
currentChunkIndex -= 1 |
|
|
currentChunkIndex -= 1 |
|
|
|
|
|
// fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
|
|
|
} else { |
|
|
} else { |
|
|
|
|
|
// glog.Fatalf("unexpected1 offset %d", offset)
|
|
|
return fmt.Errorf("unexpected1 offset %d", offset) |
|
|
return fmt.Errorf("unexpected1 offset %d", offset) |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
|
|
|
// glog.Fatalf("unexpected2 offset %d", offset)
|
|
|
return fmt.Errorf("unexpected2 offset %d", offset) |
|
|
return fmt.Errorf("unexpected2 offset %d", offset) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// positioning within the new chunk
|
|
|
// positioning within the new chunk
|
|
|
chunk := c.chunkViews[currentChunkIndex] |
|
|
chunk := c.chunkViews[currentChunkIndex] |
|
|
if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { |
|
|
|
|
|
|
|
|
if insideChunk(offset, chunk) { |
|
|
if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { |
|
|
if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { |
|
|
if err = c.fetchChunkToBuffer(chunk); err != nil { |
|
|
if err = c.fetchChunkToBuffer(chunk); err != nil { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
c.nextChunkViewIndex = currentChunkIndex + 1 |
|
|
|
|
|
} |
|
|
} |
|
|
c.bufferPos = int(offset - c.bufferOffset) |
|
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
// glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
|
|
|
|
|
|
return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size)) |
|
|
} |
|
|
} |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
@ -298,10 +314,10 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { |
|
|
return err |
|
|
return err |
|
|
} |
|
|
} |
|
|
c.buffer = buffer.Bytes() |
|
|
c.buffer = buffer.Bytes() |
|
|
c.bufferPos = 0 |
|
|
|
|
|
c.bufferOffset = chunkView.LogicOffset |
|
|
c.bufferOffset = chunkView.LogicOffset |
|
|
|
|
|
c.chunk = chunkView.FileId |
|
|
|
|
|
|
|
|
// glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
|
|
|
|
|
|
|
|
|
// glog.V(0).Infof("fetched %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
|
|
|
|
|
|
|
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|