Browse Source

unlock before submitting the uploading jobs

pull/4077/head
chrislu 2 years ago
parent
commit
f7beba8515
  1. 6
      weed/mount/page_writer/upload_pipeline.go

6
weed/mount/page_writer/upload_pipeline.go

@ -73,8 +73,9 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n
fullness = chunkFullness fullness = chunkFullness
} }
} }
up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
fullWritableChunk := up.writableChunks[fullestChunkIndex]
delete(up.writableChunks, fullestChunkIndex) delete(up.writableChunks, fullestChunkIndex)
up.moveToSealed(fullWritableChunk, fullestChunkIndex)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
} }
if isSequential && if isSequential &&
@ -155,6 +156,8 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic
up.sealedChunks[logicChunkIndex] = sealedChunk up.sealedChunks[logicChunkIndex] = sealedChunk
delete(up.writableChunks, logicChunkIndex) delete(up.writableChunks, logicChunkIndex)
// unlock before submitting the uploading jobs
up.chunksLock.Unlock()
up.uploaders.Execute(func() { up.uploaders.Execute(func() {
// first add to the file chunks // first add to the file chunks
sealedChunk.chunk.SaveContent(up.saveToStorageFn) sealedChunk.chunk.SaveContent(up.saveToStorageFn)
@ -181,6 +184,7 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic
sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex)) sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex))
}) })
up.chunksLock.Lock()
} }
func (up *UploadPipeline) Shutdown() { func (up *UploadPipeline) Shutdown() {

Loading…
Cancel
Save