Browse Source

use limited in memory buffer instead of swap file

pull/2631/head
chrislu 3 years ago
parent
commit
62d815d1ca
  1. 5
      weed/filesys/dirty_pages_chunked.go
  2. 10
      weed/filesys/page_writer/upload_pipeline.go
  3. 2
      weed/filesys/page_writer/upload_pipeline_test.go

5
weed/filesys/dirty_pages_chunked.go

@@ -31,10 +31,7 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
fh: fh,
}
swapFileDir := fh.f.wfs.option.getTempFilePageDir()
dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.f.fullpath(),
fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, swapFileDir)
dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, fh.f.wfs.option.ConcurrentWriters)
return dirtyPages
}

10
weed/filesys/page_writer/upload_pipeline.go

@@ -24,7 +24,7 @@ type UploadPipeline struct {
saveToStorageFn SaveToStorageFunc
activeReadChunks map[LogicChunkIndex]int
activeReadChunksLock sync.Mutex
swapFile *SwapFile
bufferChunkLimit int
}
type SealedChunk struct {
@@ -40,7 +40,7 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) {
}
}
func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, swapFileDir string) *UploadPipeline {
func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline {
return &UploadPipeline{
ChunkSize: chunkSize,
writableChunks: make(map[LogicChunkIndex]PageChunk),
@@ -48,9 +48,8 @@ func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentEx
uploaders: writers,
uploaderCountCond: sync.NewCond(&sync.Mutex{}),
saveToStorageFn: saveToStorageFn,
filepath: filepath,
activeReadChunks: make(map[LogicChunkIndex]int),
swapFile: NewSwapFile(swapFileDir, chunkSize),
bufferChunkLimit: bufferChunkLimit,
}
}
@@ -62,7 +61,7 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
memChunk, found := up.writableChunks[logicChunkIndex]
if !found {
if len(up.writableChunks) < 16 {
if len(up.writableChunks) < up.bufferChunkLimit {
memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
@@ -180,5 +179,4 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic
}
func (up *UploadPipeline) Shutdown() {
up.swapFile.FreeResource()
}

2
weed/filesys/page_writer/upload_pipeline_test.go

@@ -7,7 +7,7 @@ import (
func TestUploadPipeline(t *testing.T) {
uploadPipeline := NewUploadPipeline("", nil, 2*1024*1024, nil, "")
uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16)
writeRange(uploadPipeline, 0, 131072)
writeRange(uploadPipeline, 131072, 262144)

Loading…
Cancel
Save