Browse Source

move upload pipeline locking to a different file

pull/2613/head
chrislu 3 years ago
parent
commit
8e80f3cd65
  1. 58
      weed/filesys/page_writer/upload_pipeline.go
  2. 63
      weed/filesys/page_writer/upload_pipeline_lock.go

58
weed/filesys/page_writer/upload_pipeline.go

@ -119,64 +119,6 @@ func (up *UploadPipeline) FlushAll() {
up.waitForCurrentWritersToComplete()
}
func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
if stopOffset%up.ChunkSize > 0 {
stopLogicChunkIndex += 1
}
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
if count, found := up.activeReadChunks[i]; found {
up.activeReadChunks[i] = count + 1
} else {
up.activeReadChunks[i] = 1
}
}
}
func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
if stopOffset%up.ChunkSize > 0 {
stopLogicChunkIndex += 1
}
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
if count, found := up.activeReadChunks[i]; found {
if count == 1 {
delete(up.activeReadChunks, i)
} else {
up.activeReadChunks[i] = count - 1
}
}
}
}
func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
if count, found := up.activeReadChunks[logicChunkIndex]; found {
return count > 0
}
return false
}
func (up *UploadPipeline) waitForCurrentWritersToComplete() {
up.uploaderCountCond.L.Lock()
t := int32(100)
for {
t = atomic.LoadInt32(&up.uploaderCount)
if t <= 0 {
break
}
up.uploaderCountCond.Wait()
}
up.uploaderCountCond.L.Unlock()
}
func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
if memChunk.IsComplete() {
up.moveToSealed(memChunk, logicChunkIndex)

63
weed/filesys/page_writer/upload_pipeline_lock.go

@ -0,0 +1,63 @@
package page_writer
import (
"sync/atomic"
)
func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
if stopOffset%up.ChunkSize > 0 {
stopLogicChunkIndex += 1
}
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
if count, found := up.activeReadChunks[i]; found {
up.activeReadChunks[i] = count + 1
} else {
up.activeReadChunks[i] = 1
}
}
}
func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
if stopOffset%up.ChunkSize > 0 {
stopLogicChunkIndex += 1
}
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
if count, found := up.activeReadChunks[i]; found {
if count == 1 {
delete(up.activeReadChunks, i)
} else {
up.activeReadChunks[i] = count - 1
}
}
}
}
func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
if count, found := up.activeReadChunks[logicChunkIndex]; found {
return count > 0
}
return false
}
func (up *UploadPipeline) waitForCurrentWritersToComplete() {
up.uploaderCountCond.L.Lock()
t := int32(100)
for {
t = atomic.LoadInt32(&up.uploaderCount)
if t <= 0 {
break
}
up.uploaderCountCond.Wait()
}
up.uploaderCountCond.L.Unlock()
}
Loading…
Cancel
Save