diff --git a/weed/mount/page_writer/page_chunk.go b/weed/mount/page_writer/page_chunk.go index 32d246deb..ac1d24622 100644 --- a/weed/mount/page_writer/page_chunk.go +++ b/weed/mount/page_writer/page_chunk.go @@ -12,5 +12,6 @@ type PageChunk interface { ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) IsComplete() bool ActivityScore() int64 + WrittenSize() int64 SaveContent(saveFn SaveToStorageFunc) } diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go index 1ec8cecb4..cbd82c953 100644 --- a/weed/mount/page_writer/page_chunk_mem.go +++ b/weed/mount/page_writer/page_chunk_mem.go @@ -86,6 +86,13 @@ func (mc *MemChunk) ActivityScore() int64 { return mc.activityScore.ActivityScore() } +func (mc *MemChunk) WrittenSize() int64 { + mc.RLock() + defer mc.RUnlock() + + return mc.usage.WrittenSize() +} + func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) { mc.RLock() defer mc.RUnlock() diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index 6cedc64df..10060bef9 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -161,6 +161,12 @@ func (sc *SwapFileChunk) ActivityScore() int64 { return sc.activityScore.ActivityScore() } +func (sc *SwapFileChunk) WrittenSize() int64 { + sc.RLock() + defer sc.RUnlock() + return sc.usage.WrittenSize() +} + func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { sc.RLock() defer sc.RUnlock() diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index 6065f2f76..e1aa43fe2 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/util" - "math" "sync" "sync/atomic" ) @@ -67,15 +66,32 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, 
tsN if !found { if len(up.writableChunks) > up.writableChunkLimit { // if current file chunks is over the per file buffer count limit - laziestChunkIndex, lowestActivityScore := LogicChunkIndex(-1), int64(math.MaxInt64) + candidateChunkIndex, fullness := LogicChunkIndex(-1), int64(-1) + for lci, mc := range up.writableChunks { + chunkFullness := mc.WrittenSize() + if fullness < chunkFullness { + candidateChunkIndex = lci + fullness = chunkFullness + } + } + /* // this algo generates too many chunks + candidateChunkIndex, lowestActivityScore, fullness := LogicChunkIndex(-1), int64(math.MaxInt64), int64(0) for wci, wc := range up.writableChunks { activityScore := wc.ActivityScore() - if lowestActivityScore > activityScore { - laziestChunkIndex = wci + if lowestActivityScore >= activityScore { + if lowestActivityScore == activityScore { + chunkFullness := wc.WrittenSize() + if fullness < chunkFullness { + candidateChunkIndex = wci + fullness = chunkFullness + } + } + candidateChunkIndex = wci lowestActivityScore = activityScore } } - up.moveToSealed(up.writableChunks[laziestChunkIndex], laziestChunkIndex) + */ + up.moveToSealed(up.writableChunks[candidateChunkIndex], candidateChunkIndex) // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, oldestTs) } if isSequential &&