|
@ -10,7 +10,7 @@ import ( |
|
|
"time" |
|
|
"time" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
type MemoryChunkPages struct { |
|
|
|
|
|
|
|
|
type ChunkedDirtyPages struct { |
|
|
fh *FileHandle |
|
|
fh *FileHandle |
|
|
writeWaitGroup sync.WaitGroup |
|
|
writeWaitGroup sync.WaitGroup |
|
|
chunkAddLock sync.Mutex |
|
|
chunkAddLock sync.Mutex |
|
@ -22,12 +22,12 @@ type MemoryChunkPages struct { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
var ( |
|
|
var ( |
|
|
_ = page_writer.DirtyPages(&MemoryChunkPages{}) |
|
|
|
|
|
|
|
|
_ = page_writer.DirtyPages(&ChunkedDirtyPages{}) |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *MemoryChunkPages { |
|
|
|
|
|
|
|
|
func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages { |
|
|
|
|
|
|
|
|
dirtyPages := &MemoryChunkPages{ |
|
|
|
|
|
|
|
|
dirtyPages := &ChunkedDirtyPages{ |
|
|
fh: fh, |
|
|
fh: fh, |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -39,7 +39,7 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *MemoryChunkPages { |
|
|
return dirtyPages |
|
|
return dirtyPages |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (pages *MemoryChunkPages) AddPage(offset int64, data []byte) { |
|
|
|
|
|
|
|
|
func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte) { |
|
|
pages.hasWrites = true |
|
|
pages.hasWrites = true |
|
|
|
|
|
|
|
|
glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.f.fullpath(), offset, offset+int64(len(data))) |
|
|
glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.f.fullpath(), offset, offset+int64(len(data))) |
|
@ -48,7 +48,7 @@ func (pages *MemoryChunkPages) AddPage(offset int64, data []byte) { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (pages *MemoryChunkPages) FlushData() error { |
|
|
|
|
|
|
|
|
func (pages *ChunkedDirtyPages) FlushData() error { |
|
|
if !pages.hasWrites { |
|
|
if !pages.hasWrites { |
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
@ -59,18 +59,18 @@ func (pages *MemoryChunkPages) FlushData() error { |
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (pages *MemoryChunkPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { |
|
|
|
|
|
|
|
|
func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { |
|
|
if !pages.hasWrites { |
|
|
if !pages.hasWrites { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
return pages.uploadPipeline.MaybeReadDataAt(data, startOffset) |
|
|
return pages.uploadPipeline.MaybeReadDataAt(data, startOffset) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (pages *MemoryChunkPages) GetStorageOptions() (collection, replication string) { |
|
|
|
|
|
|
|
|
func (pages *ChunkedDirtyPages) GetStorageOptions() (collection, replication string) { |
|
|
return pages.collection, pages.replication |
|
|
return pages.collection, pages.replication |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (pages *MemoryChunkPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { |
|
|
|
|
|
|
|
|
func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { |
|
|
|
|
|
|
|
|
mtime := time.Now().UnixNano() |
|
|
mtime := time.Now().UnixNano() |
|
|
defer cleanupFn() |
|
|
defer cleanupFn() |
|
@ -91,13 +91,13 @@ func (pages *MemoryChunkPages) saveChunkedFileIntevalToStorage(reader io.Reader, |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (pages MemoryChunkPages) Destroy() { |
|
|
|
|
|
|
|
|
func (pages ChunkedDirtyPages) Destroy() { |
|
|
pages.uploadPipeline.Shutdown() |
|
|
pages.uploadPipeline.Shutdown() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (pages *MemoryChunkPages) LockForRead(startOffset, stopOffset int64) { |
|
|
|
|
|
|
|
|
func (pages *ChunkedDirtyPages) LockForRead(startOffset, stopOffset int64) { |
|
|
pages.uploadPipeline.LockForRead(startOffset, stopOffset) |
|
|
pages.uploadPipeline.LockForRead(startOffset, stopOffset) |
|
|
} |
|
|
} |
|
|
func (pages *MemoryChunkPages) UnlockForRead(startOffset, stopOffset int64) { |
|
|
|
|
|
|
|
|
func (pages *ChunkedDirtyPages) UnlockForRead(startOffset, stopOffset int64) { |
|
|
pages.uploadPipeline.UnlockForRead(startOffset, stopOffset) |
|
|
pages.uploadPipeline.UnlockForRead(startOffset, stopOffset) |
|
|
} |
|
|
} |