Browse Source

add back previous chunk upload selection algo

pull/4109/head
chrislu 2 years ago
parent
commit
5423790b2c
  1. 1
      weed/mount/page_writer/page_chunk.go
  2. 7
      weed/mount/page_writer/page_chunk_mem.go
  3. 6
      weed/mount/page_writer/page_chunk_swapfile.go
  4. 26
      weed/mount/page_writer/upload_pipeline.go

1
weed/mount/page_writer/page_chunk.go

@ -12,5 +12,6 @@ type PageChunk interface {
ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64)
IsComplete() bool IsComplete() bool
ActivityScore() int64 ActivityScore() int64
WrittenSize() int64
SaveContent(saveFn SaveToStorageFunc) SaveContent(saveFn SaveToStorageFunc)
} }

7
weed/mount/page_writer/page_chunk_mem.go

@ -86,6 +86,13 @@ func (mc *MemChunk) ActivityScore() int64 {
return mc.activityScore.ActivityScore() return mc.activityScore.ActivityScore()
} }
func (mc *MemChunk) WrittenSize() int64 {
mc.RLock()
defer mc.RUnlock()
return mc.usage.WrittenSize()
}
func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) { func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
mc.RLock() mc.RLock()
defer mc.RUnlock() defer mc.RUnlock()

6
weed/mount/page_writer/page_chunk_swapfile.go

@ -161,6 +161,12 @@ func (sc *SwapFileChunk) ActivityScore() int64 {
return sc.activityScore.ActivityScore() return sc.activityScore.ActivityScore()
} }
func (sc *SwapFileChunk) WrittenSize() int64 {
sc.RLock()
defer sc.RUnlock()
return sc.usage.WrittenSize()
}
func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
sc.RLock() sc.RLock()
defer sc.RUnlock() defer sc.RUnlock()

26
weed/mount/page_writer/upload_pipeline.go

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"math"
"sync" "sync"
"sync/atomic" "sync/atomic"
) )
@ -67,15 +66,32 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsN
if !found { if !found {
if len(up.writableChunks) > up.writableChunkLimit { if len(up.writableChunks) > up.writableChunkLimit {
// if current file chunks is over the per file buffer count limit // if current file chunks is over the per file buffer count limit
laziestChunkIndex, lowestActivityScore := LogicChunkIndex(-1), int64(math.MaxInt64)
candidateChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
for lci, mc := range up.writableChunks {
chunkFullness := mc.WrittenSize()
if fullness < chunkFullness {
candidateChunkIndex = lci
fullness = chunkFullness
}
}
/* // this algo generates too many chunks
candidateChunkIndex, lowestActivityScore := LogicChunkIndex(-1), int64(math.MaxInt64)
for wci, wc := range up.writableChunks { for wci, wc := range up.writableChunks {
activityScore := wc.ActivityScore() activityScore := wc.ActivityScore()
if lowestActivityScore > activityScore {
laziestChunkIndex = wci
if lowestActivityScore >= activityScore {
if lowestActivityScore == activityScore {
chunkFullness := wc.WrittenSize()
if fullness < chunkFullness {
candidateChunkIndex = lci
fullness = chunkFullness
}
}
candidateChunkIndex = wci
lowestActivityScore = activityScore lowestActivityScore = activityScore
} }
} }
up.moveToSealed(up.writableChunks[laziestChunkIndex], laziestChunkIndex)
*/
up.moveToSealed(up.writableChunks[candidateChunkIndex], candidateChunkIndex)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, oldestTs) // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, oldestTs)
} }
if isSequential && if isSequential &&

Loading…
Cancel
Save