diff --git a/weed/filesys/dirty_pages_chunked.go b/weed/filesys/dirty_pages_chunked.go index 71da91666..002922958 100644 --- a/weed/filesys/dirty_pages_chunked.go +++ b/weed/filesys/dirty_pages_chunked.go @@ -31,10 +31,7 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages { fh: fh, } - swapFileDir := fh.f.wfs.option.getTempFilePageDir() - - dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.f.fullpath(), - fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, swapFileDir) + dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, fh.f.wfs.option.ConcurrentWriters) return dirtyPages } diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go index 65b41e9fa..53641e66d 100644 --- a/weed/filesys/page_writer/upload_pipeline.go +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -24,7 +24,7 @@ type UploadPipeline struct { saveToStorageFn SaveToStorageFunc activeReadChunks map[LogicChunkIndex]int activeReadChunksLock sync.Mutex - swapFile *SwapFile + bufferChunkLimit int } type SealedChunk struct { @@ -40,7 +40,7 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) { } } -func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, swapFileDir string) *UploadPipeline { +func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline { return &UploadPipeline{ ChunkSize: chunkSize, writableChunks: make(map[LogicChunkIndex]PageChunk), @@ -48,9 +48,8 @@ func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentEx uploaders: writers, uploaderCountCond: sync.NewCond(&sync.Mutex{}), saveToStorageFn: saveToStorageFn, - filepath: filepath, activeReadChunks: make(map[LogicChunkIndex]int), - swapFile: NewSwapFile(swapFileDir, chunkSize), + bufferChunkLimit: bufferChunkLimit, } } @@ -62,7 +61,7 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { memChunk, found := up.writableChunks[logicChunkIndex] if !found { - if len(up.writableChunks) < 16 { + if len(up.writableChunks) < up.bufferChunkLimit { memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) } else { fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) @@ -180,5 +179,4 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic } func (up *UploadPipeline) Shutdown() { - up.swapFile.FreeResource() } diff --git a/weed/filesys/page_writer/upload_pipeline_test.go b/weed/filesys/page_writer/upload_pipeline_test.go index 5ecb677e8..816fb228b 100644 --- a/weed/filesys/page_writer/upload_pipeline_test.go +++ b/weed/filesys/page_writer/upload_pipeline_test.go @@ -7,7 +7,7 @@ import ( func TestUploadPipeline(t *testing.T) { - uploadPipeline := NewUploadPipeline("", nil, 2*1024*1024, nil, "") + uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16) writeRange(uploadPipeline, 0, 131072) writeRange(uploadPipeline, 131072, 262144)