From f2f68f675efc4e394b5f610e977d47d692819968 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 13 Mar 2022 18:17:35 -0700 Subject: [PATCH] write to disk during random writes, limiting total disk spaces used --- weed/mount/dirty_pages_chunked.go | 4 +- weed/mount/page_writer.go | 24 +++++----- weed/mount/page_writer/dirty_pages.go | 2 +- weed/mount/page_writer/page_chunk_swapfile.go | 10 ++++- weed/mount/page_writer/upload_pipeline.go | 17 +++---- .../mount/page_writer/upload_pipeline_test.go | 2 +- weed/mount/page_writer_pattern.go | 44 +++++++++++++++++++ weed/mount/weedfs_file_write.go | 4 +- 8 files changed, 82 insertions(+), 25 deletions(-) create mode 100644 weed/mount/page_writer_pattern.go diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go index a76c6dd61..e0d764070 100644 --- a/weed/mount/dirty_pages_chunked.go +++ b/weed/mount/dirty_pages_chunked.go @@ -38,11 +38,11 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages { return dirtyPages } -func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte) { +func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool) { pages.hasWrites = true glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.fh, offset, offset+int64(len(data))) - pages.uploadPipeline.SaveDataAt(data, offset) + pages.uploadPipeline.SaveDataAt(data, offset, isSequential) return } diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go index ffcaa6398..016c4841a 100644 --- a/weed/mount/page_writer.go +++ b/weed/mount/page_writer.go @@ -6,10 +6,11 @@ import ( ) type PageWriter struct { - fh *FileHandle - collection string - replication string - chunkSize int64 + fh *FileHandle + collection string + replication string + chunkSize int64 + writerPattern *WriterPattern randomWriter page_writer.DirtyPages } @@ -20,28 +21,29 @@ var ( func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter { pw := &PageWriter{ - fh: fh, - chunkSize: 
chunkSize, - randomWriter: newMemoryChunkPages(fh, chunkSize), + fh: fh, + chunkSize: chunkSize, + writerPattern: NewWriterPattern(chunkSize), + randomWriter: newMemoryChunkPages(fh, chunkSize), } return pw } -func (pw *PageWriter) AddPage(offset int64, data []byte) { +func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool) { glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data))) chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) - pw.addToOneChunk(i, offset, data[:writeSize]) + pw.addToOneChunk(i, offset, data[:writeSize], isSequential) offset += writeSize data = data[writeSize:] } } -func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) { - pw.randomWriter.AddPage(offset, data) +func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool) { + pw.randomWriter.AddPage(offset, data, isSequential) } func (pw *PageWriter) FlushData() error { diff --git a/weed/mount/page_writer/dirty_pages.go b/weed/mount/page_writer/dirty_pages.go index 25b747fad..c16cee47a 100644 --- a/weed/mount/page_writer/dirty_pages.go +++ b/weed/mount/page_writer/dirty_pages.go @@ -1,7 +1,7 @@ package page_writer type DirtyPages interface { - AddPage(offset int64, data []byte) + AddPage(offset int64, data []byte, isSequential bool) FlushData() error ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) GetStorageOptions() (collection, replication string) diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index 486557629..c70f8c5a1 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -18,6 +18,7 @@ type SwapFile struct { file *os.File logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex chunkSize int64 + freeActualChunkList []ActualChunkIndex } type SwapFileChunk struct { @@
-53,7 +54,12 @@ func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapF } actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex] if !found { - actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex)) + if len(sf.freeActualChunkList) > 0 { + actualChunkIndex = sf.freeActualChunkList[0] + sf.freeActualChunkList = sf.freeActualChunkList[1:] + } else { + actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex)) + } sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex } @@ -66,6 +72,8 @@ func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapF } func (sc *SwapFileChunk) FreeResource() { + sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex) + delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex) } func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) { diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index 190076a2b..2286cdf00 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -55,7 +55,7 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, } } -func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { +func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) { up.writableChunksLock.Lock() defer up.writableChunksLock.Unlock() @@ -63,13 +63,8 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { pageChunk, found := up.writableChunks[logicChunkIndex] if !found { - if atomic.LoadInt64(&memChunkCounter) > 4*int64(up.bufferChunkLimit) { - // if total number of chunks is over 4 times of per file buffer count limit - pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex) - } else if len(up.writableChunks) < up.bufferChunkLimit { - // if current file chunks is still under the per file buffer count limit - pageChunk = 
NewMemChunk(logicChunkIndex, up.ChunkSize) - } else { + if len(up.writableChunks) > up.bufferChunkLimit { + // if current file chunks is over the per file buffer count limit fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) for lci, mc := range up.writableChunks { chunkFullness := mc.WrittenSize() @@ -81,7 +76,13 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) delete(up.writableChunks, fullestChunkIndex) // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) + } + if isSequential && + len(up.writableChunks) < up.bufferChunkLimit && + atomic.LoadInt64(&memChunkCounter) < 4*int64(up.bufferChunkLimit) { pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) + } else { + pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex) } up.writableChunks[logicChunkIndex] = pageChunk } diff --git a/weed/mount/page_writer/upload_pipeline_test.go b/weed/mount/page_writer/upload_pipeline_test.go index 63b60faaf..f130c97c1 100644 --- a/weed/mount/page_writer/upload_pipeline_test.go +++ b/weed/mount/page_writer/upload_pipeline_test.go @@ -31,7 +31,7 @@ func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) { p := make([]byte, 4) for i := startOff / 4; i < stopOff/4; i += 4 { util.Uint32toBytes(p, uint32(i)) - uploadPipeline.SaveDataAt(p, i) + uploadPipeline.SaveDataAt(p, i, false) } } diff --git a/weed/mount/page_writer_pattern.go b/weed/mount/page_writer_pattern.go new file mode 100644 index 000000000..665056b36 --- /dev/null +++ b/weed/mount/page_writer_pattern.go @@ -0,0 +1,44 @@ +package mount + +type WriterPattern struct { + isStreaming bool + lastWriteOffset int64 + chunkSize int64 +} + +// For streaming write: only cache the first chunk +// For random write: fall back to temp file approach +// writes can only change from streaming mode to non-streaming mode + +func NewWriterPattern(chunkSize int64) *WriterPattern { + return 
&WriterPattern{ + isStreaming: true, + lastWriteOffset: -1, + chunkSize: chunkSize, + } +} + +func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) { + if rp.lastWriteOffset > offset { + rp.isStreaming = false + } + if rp.lastWriteOffset == -1 { + if offset != 0 { + rp.isStreaming = false + } + } + rp.lastWriteOffset = offset +} + +func (rp *WriterPattern) IsStreamingMode() bool { + return rp.isStreaming +} + +func (rp *WriterPattern) IsRandomMode() bool { + return !rp.isStreaming +} + +func (rp *WriterPattern) Reset() { + rp.isStreaming = true + rp.lastWriteOffset = -1 +} diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go index f71e27335..d14680752 100644 --- a/weed/mount/weedfs_file_write.go +++ b/weed/mount/weedfs_file_write.go @@ -43,6 +43,8 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr return 0, fuse.ENOENT } + fh.dirtyPages.writerPattern.MonitorWriteAt(int64(in.Offset), int(in.Size)) + fh.Lock() defer fh.Unlock() @@ -56,7 +58,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize))) // glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) - fh.dirtyPages.AddPage(offset, data) + fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsStreamingMode()) written = uint32(len(data))