Browse Source

move oldest chunk to sealed, instead of by fullness

pull/4089/head
chrislu 2 years ago
parent
commit
7f1d49a123
  1. 2
      weed/mount/page_writer/page_chunk.go
  2. 7
      weed/mount/page_writer/page_chunk_mem.go
  3. 6
      weed/mount/page_writer/page_chunk_swapfile.go
  4. 14
      weed/mount/page_writer/upload_pipeline.go

2
weed/mount/page_writer/page_chunk.go

@ -11,6 +11,6 @@ type PageChunk interface {
WriteDataAt(src []byte, offset int64, tsNs int64) (n int) WriteDataAt(src []byte, offset int64, tsNs int64) (n int)
ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64)
IsComplete() bool IsComplete() bool
WrittenSize() int64
LastModifiedTsNs() int64
SaveContent(saveFn SaveToStorageFunc) SaveContent(saveFn SaveToStorageFunc)
} }

7
weed/mount/page_writer/page_chunk_mem.go

@ -82,11 +82,8 @@ func (mc *MemChunk) IsComplete() bool {
return mc.usage.IsComplete(mc.chunkSize) return mc.usage.IsComplete(mc.chunkSize)
} }
func (mc *MemChunk) WrittenSize() int64 {
mc.RLock()
defer mc.RUnlock()
return mc.usage.WrittenSize()
func (mc *MemChunk) LastModifiedTsNs() int64 {
return mc.lastModifiedTsNs
} }
func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) { func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {

6
weed/mount/page_writer/page_chunk_swapfile.go

@ -138,10 +138,8 @@ func (sc *SwapFileChunk) IsComplete() bool {
return sc.usage.IsComplete(sc.swapfile.chunkSize) return sc.usage.IsComplete(sc.swapfile.chunkSize)
} }
func (sc *SwapFileChunk) WrittenSize() int64 {
sc.RLock()
defer sc.RUnlock()
return sc.usage.WrittenSize()
func (sc *SwapFileChunk) LastModifiedTsNs() int64 {
return sc.lastModifiedTsNs
} }
func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {

14
weed/mount/page_writer/upload_pipeline.go

@ -66,16 +66,16 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsN
if !found { if !found {
if len(up.writableChunks) > up.writableChunkLimit { if len(up.writableChunks) > up.writableChunkLimit {
// if current file chunks is over the per file buffer count limit // if current file chunks is over the per file buffer count limit
fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
oldestChunkIndex, oldestTs := LogicChunkIndex(-1), int64(0)
for lci, mc := range up.writableChunks { for lci, mc := range up.writableChunks {
chunkFullness := mc.WrittenSize()
if fullness < chunkFullness {
fullestChunkIndex = lci
fullness = chunkFullness
chunkModifiedTsNs := mc.LastModifiedTsNs()
if oldestTs < chunkModifiedTsNs {
oldestChunkIndex = lci
oldestTs = chunkModifiedTsNs
} }
} }
up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
up.moveToSealed(up.writableChunks[oldestChunkIndex], oldestChunkIndex)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, oldestTs)
} }
if false && isSequential && if false && isSequential &&
len(up.writableChunks) < up.writableChunkLimit && len(up.writableChunks) < up.writableChunkLimit &&

Loading…
Cancel
Save