From 28d479e5c059bdb041aeaff1075a738990740bf2 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 19 Dec 2022 15:07:22 -0800 Subject: [PATCH] mount: adjust locking for upload pipeline --- weed/mount/page_writer/upload_pipeline.go | 64 +++++++++---------- .../mount/page_writer/upload_pipeline_lock.go | 6 -- 2 files changed, 31 insertions(+), 39 deletions(-) diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index 586e05b3f..f54a87a02 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -6,26 +6,24 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" "sync" "sync/atomic" - "time" ) type LogicChunkIndex int type UploadPipeline struct { - uploaderCount int32 - uploaderCountCond *sync.Cond - filepath util.FullPath - ChunkSize int64 - writableChunks map[LogicChunkIndex]PageChunk - writableChunksLock sync.Mutex - sealedChunks map[LogicChunkIndex]*SealedChunk - sealedChunksLock sync.Mutex - uploaders *util.LimitedConcurrentExecutor - saveToStorageFn SaveToStorageFunc - activeReadChunks map[LogicChunkIndex]int - activeReadChunksLock sync.Mutex - writableChunkLimit int - swapFile *SwapFile + uploaderCount int32 + uploaderCountCond *sync.Cond + filepath util.FullPath + ChunkSize int64 + uploaders *util.LimitedConcurrentExecutor + saveToStorageFn SaveToStorageFunc + writableChunkLimit int + swapFile *SwapFile + chunksLock sync.Mutex + writableChunks map[LogicChunkIndex]PageChunk + sealedChunks map[LogicChunkIndex]*SealedChunk + activeReadChunks map[LogicChunkIndex]int + readerCountCond *sync.Cond } type SealedChunk struct { @@ -42,7 +40,7 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) { } func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string) *UploadPipeline { - return &UploadPipeline{ + t := &UploadPipeline{ ChunkSize: chunkSize, writableChunks: make(map[LogicChunkIndex]PageChunk), sealedChunks: make(map[LogicChunkIndex]*SealedChunk), @@ -53,11 +51,13 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, writableChunkLimit: bufferChunkLimit, swapFile: NewSwapFile(swapFileDir, chunkSize), } + t.readerCountCond = sync.NewCond(&t.chunksLock) + return t } func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) { - up.writableChunksLock.Lock() - defer up.writableChunksLock.Unlock() + up.chunksLock.Lock() + defer up.chunksLock.Unlock() logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) @@ -95,13 +95,17 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) + up.chunksLock.Lock() + defer func() { + up.readerCountCond.Signal() + up.chunksLock.Unlock() + }() + // read from sealed chunks first - up.sealedChunksLock.Lock() sealedChunk, found := up.sealedChunks[logicChunkIndex] if found { sealedChunk.referenceCounter++ } - up.sealedChunksLock.Unlock() if found { maxStop = sealedChunk.chunk.ReadDataAt(p, off) glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop) @@ -109,8 +113,6 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { } // read from writable chunks last - up.writableChunksLock.Lock() - defer up.writableChunksLock.Unlock() writableChunk, found := up.writableChunks[logicChunkIndex] if !found { return @@ -123,8 +125,8 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { } func (up *UploadPipeline) FlushAll() { - up.writableChunksLock.Lock() - defer up.writableChunksLock.Unlock() + up.chunksLock.Lock() + defer up.chunksLock.Unlock() for logicChunkIndex, memChunk := range up.writableChunks { up.moveToSealed(memChunk, logicChunkIndex) @@ -143,8 +145,6 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic atomic.AddInt32(&up.uploaderCount, 1) glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount) - up.sealedChunksLock.Lock() - if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found { oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex)) } @@ -155,8 +155,6 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic up.sealedChunks[logicChunkIndex] = sealedChunk delete(up.writableChunks, logicChunkIndex) - up.sealedChunksLock.Unlock() - up.uploaders.Execute(func() { // first add to the file chunks sealedChunk.chunk.SaveContent(up.saveToStorageFn) @@ -172,13 +170,13 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic up.uploaderCountCond.L.Unlock() // wait for readers + up.chunksLock.Lock() + defer up.chunksLock.Unlock() for up.IsLocked(logicChunkIndex) { - time.Sleep(59 * time.Millisecond) + up.readerCountCond.Wait() } // then remove from sealed chunks - up.sealedChunksLock.Lock() - defer up.sealedChunksLock.Unlock() delete(up.sealedChunks, logicChunkIndex) sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex)) @@ -188,8 +186,8 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic func (up *UploadPipeline) Shutdown() { up.swapFile.FreeResource() - up.sealedChunksLock.Lock() - defer up.sealedChunksLock.Unlock() + up.chunksLock.Lock() + defer up.chunksLock.Unlock() for logicChunkIndex, sealedChunk := range up.sealedChunks { sealedChunk.FreeReference(fmt.Sprintf("%s uploadpipeline shutdown chunk %d", up.filepath, logicChunkIndex)) } diff --git a/weed/mount/page_writer/upload_pipeline_lock.go b/weed/mount/page_writer/upload_pipeline_lock.go index 47a40ba37..43b04e01d 100644 --- a/weed/mount/page_writer/upload_pipeline_lock.go +++ b/weed/mount/page_writer/upload_pipeline_lock.go @@ -10,8 +10,6 @@ func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) { if stopOffset%up.ChunkSize > 0 { stopLogicChunkIndex += 1 } - up.activeReadChunksLock.Lock() - defer up.activeReadChunksLock.Unlock() for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { if count, found := up.activeReadChunks[i]; found { up.activeReadChunks[i] = count + 1 @@ -27,8 +25,6 @@ func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) { if stopOffset%up.ChunkSize > 0 { stopLogicChunkIndex += 1 } - up.activeReadChunksLock.Lock() - defer up.activeReadChunksLock.Unlock() for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { if count, found := up.activeReadChunks[i]; found { if count == 1 { @@ -41,8 +37,6 @@ func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) { } func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool { - up.activeReadChunksLock.Lock() - defer up.activeReadChunksLock.Unlock() if count, found := up.activeReadChunks[logicChunkIndex]; found { return count > 0 }