|
@ -147,14 +147,17 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64, tsNs int64) (maxS |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (up *UploadPipeline) FlushAll() { |
|
|
func (up *UploadPipeline) FlushAll() { |
|
|
|
|
|
up.flushChunks() |
|
|
|
|
|
up.waitForCurrentWritersToComplete() |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (up *UploadPipeline) flushChunks() { |
|
|
up.chunksLock.Lock() |
|
|
up.chunksLock.Lock() |
|
|
defer up.chunksLock.Unlock() |
|
|
defer up.chunksLock.Unlock() |
|
|
|
|
|
|
|
|
for logicChunkIndex, memChunk := range up.writableChunks { |
|
|
for logicChunkIndex, memChunk := range up.writableChunks { |
|
|
up.moveToSealed(memChunk, logicChunkIndex) |
|
|
up.moveToSealed(memChunk, logicChunkIndex) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
up.waitForCurrentWritersToComplete() |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { |
|
|
func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { |
|
|