From 7f1d49a123404802454d581d962bb1b592c1f5d1 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 23 Dec 2022 10:41:07 -0800 Subject: [PATCH] move oldest chunk to sealed, instead of by fullness --- weed/mount/page_writer/page_chunk.go | 2 +- weed/mount/page_writer/page_chunk_mem.go | 7 ++----- weed/mount/page_writer/page_chunk_swapfile.go | 6 ++---- weed/mount/page_writer/upload_pipeline.go | 14 +++++++------- 4 files changed, 12 insertions(+), 17 deletions(-) diff --git a/weed/mount/page_writer/page_chunk.go b/weed/mount/page_writer/page_chunk.go index 14087f8d0..a221de459 100644 --- a/weed/mount/page_writer/page_chunk.go +++ b/weed/mount/page_writer/page_chunk.go @@ -11,6 +11,6 @@ type PageChunk interface { WriteDataAt(src []byte, offset int64, tsNs int64) (n int) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) IsComplete() bool - WrittenSize() int64 + LastModifiedTsNs() int64 SaveContent(saveFn SaveToStorageFunc) } diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go index a2a7bfb15..ae0cfbcd8 100644 --- a/weed/mount/page_writer/page_chunk_mem.go +++ b/weed/mount/page_writer/page_chunk_mem.go @@ -82,11 +82,8 @@ func (mc *MemChunk) IsComplete() bool { return mc.usage.IsComplete(mc.chunkSize) } -func (mc *MemChunk) WrittenSize() int64 { - mc.RLock() - defer mc.RUnlock() - - return mc.usage.WrittenSize() +func (mc *MemChunk) LastModifiedTsNs() int64 { + return mc.lastModifiedTsNs } func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) { diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index b11a44871..0a96fc95c 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -138,10 +138,8 @@ func (sc *SwapFileChunk) IsComplete() bool { return sc.usage.IsComplete(sc.swapfile.chunkSize) } -func (sc *SwapFileChunk) WrittenSize() int64 { - sc.RLock() - defer sc.RUnlock() - return sc.usage.WrittenSize() +func (sc *SwapFileChunk) LastModifiedTsNs() int64 { + return sc.lastModifiedTsNs } func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index 61273a48c..95fc6ca0f 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -66,16 +66,16 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsN if !found { if len(up.writableChunks) > up.writableChunkLimit { // if current file chunks is over the per file buffer count limit - fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) + oldestChunkIndex, oldestTs := LogicChunkIndex(-1), int64(0) for lci, mc := range up.writableChunks { - chunkFullness := mc.WrittenSize() - if fullness < chunkFullness { - fullestChunkIndex = lci - fullness = chunkFullness + chunkModifiedTsNs := mc.LastModifiedTsNs() + if oldestTs < chunkModifiedTsNs { + oldestChunkIndex = lci + oldestTs = chunkModifiedTsNs } } - up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) - // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) + up.moveToSealed(up.writableChunks[oldestChunkIndex], oldestChunkIndex) + // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, oldestTs) } if false && isSequential && len(up.writableChunks) < up.writableChunkLimit &&