|
|
@ -25,6 +25,7 @@ type UploadPipeline struct { |
|
|
|
activeReadChunks map[LogicChunkIndex]int |
|
|
|
activeReadChunksLock sync.Mutex |
|
|
|
bufferChunkLimit int |
|
|
|
swapFile *SwapFile |
|
|
|
} |
|
|
|
|
|
|
|
type SealedChunk struct { |
|
|
@ -40,7 +41,7 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline { |
|
|
|
func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string) *UploadPipeline { |
|
|
|
return &UploadPipeline{ |
|
|
|
ChunkSize: chunkSize, |
|
|
|
writableChunks: make(map[LogicChunkIndex]PageChunk), |
|
|
@ -50,6 +51,7 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, |
|
|
|
saveToStorageFn: saveToStorageFn, |
|
|
|
activeReadChunks: make(map[LogicChunkIndex]int), |
|
|
|
bufferChunkLimit: bufferChunkLimit, |
|
|
|
swapFile: NewSwapFile(swapFileDir, chunkSize), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -59,10 +61,14 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { |
|
|
|
|
|
|
|
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) |
|
|
|
|
|
|
|
memChunk, found := up.writableChunks[logicChunkIndex] |
|
|
|
pageChunk, found := up.writableChunks[logicChunkIndex] |
|
|
|
if !found { |
|
|
|
if len(up.writableChunks) < up.bufferChunkLimit { |
|
|
|
memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) |
|
|
|
if atomic.LoadInt64(&memChunkCounter) > 4*int64(up.bufferChunkLimit) { |
|
|
|
// if total number of chunks is over 4 times of per file buffer count limit
|
|
|
|
pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex) |
|
|
|
} else if len(up.writableChunks) < up.bufferChunkLimit { |
|
|
|
// if current file chunks is still under the per file buffer count limit
|
|
|
|
pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) |
|
|
|
} else { |
|
|
|
fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) |
|
|
|
for lci, mc := range up.writableChunks { |
|
|
@ -75,12 +81,12 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { |
|
|
|
up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) |
|
|
|
delete(up.writableChunks, fullestChunkIndex) |
|
|
|
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
|
|
|
|
memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) |
|
|
|
pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) |
|
|
|
} |
|
|
|
up.writableChunks[logicChunkIndex] = memChunk |
|
|
|
up.writableChunks[logicChunkIndex] = pageChunk |
|
|
|
} |
|
|
|
n = memChunk.WriteDataAt(p, off) |
|
|
|
up.maybeMoveToSealed(memChunk, logicChunkIndex) |
|
|
|
n = pageChunk.WriteDataAt(p, off) |
|
|
|
up.maybeMoveToSealed(pageChunk, logicChunkIndex) |
|
|
|
|
|
|
|
return |
|
|
|
} |
|
|
@ -179,6 +185,7 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic |
|
|
|
} |
|
|
|
|
|
|
|
func (up *UploadPipeline) Shutdown() { |
|
|
|
up.swapFile.FreeResource() |
|
|
|
for logicChunkIndex, sealedChunk := range up.sealedChunks { |
|
|
|
sealedChunk.FreeReference(fmt.Sprintf("%s uploadpipeline shutdown chunk %d", up.filepath, logicChunkIndex)) |
|
|
|
} |
|
|
|