diff --git a/weed/filesys/page_writer/chunk_interval_list.go b/weed/filesys/page_writer/chunk_interval_list.go index dca9a1740..e6dc5d1f5 100644 --- a/weed/filesys/page_writer/chunk_interval_list.go +++ b/weed/filesys/page_writer/chunk_interval_list.go @@ -51,6 +51,12 @@ func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) func (list *ChunkWrittenIntervalList) IsComplete(chunkSize int64) bool { return list.size() == 1 && list.head.next.isComplete(chunkSize) } +func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) { + for t := list.head; t != nil; t = t.next { + writtenByteCount += t.Size() + } + return +} func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) { diff --git a/weed/filesys/page_writer/page_chunk.go b/weed/filesys/page_writer/page_chunk.go index d1f3a5745..4e8f31425 100644 --- a/weed/filesys/page_writer/page_chunk.go +++ b/weed/filesys/page_writer/page_chunk.go @@ -11,5 +11,6 @@ type PageChunk interface { WriteDataAt(src []byte, offset int64) (n int) ReadDataAt(p []byte, off int64) (maxStop int64) IsComplete() bool + WrittenSize() int64 SaveContent(saveFn SaveToStorageFunc) } diff --git a/weed/filesys/page_writer/page_chunk_mem.go b/weed/filesys/page_writer/page_chunk_mem.go index 887993eea..dfd54c19e 100644 --- a/weed/filesys/page_writer/page_chunk_mem.go +++ b/weed/filesys/page_writer/page_chunk_mem.go @@ -53,6 +53,10 @@ func (mc *MemChunk) IsComplete() bool { return mc.usage.IsComplete(mc.chunkSize) } +func (mc *MemChunk) WrittenSize() int64 { + return mc.usage.WrittenSize() +} + func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) { if saveFn == nil { return diff --git a/weed/filesys/page_writer/page_chunk_swapfile.go b/weed/filesys/page_writer/page_chunk_swapfile.go index 44ad64297..486557629 100644 --- a/weed/filesys/page_writer/page_chunk_swapfile.go +++ b/weed/filesys/page_writer/page_chunk_swapfile.go @@ -101,6 +101,10 @@ func (sc *SwapFileChunk) IsComplete() bool { return sc.usage.IsComplete(sc.swapfile.chunkSize) } +func (sc *SwapFileChunk) WrittenSize() int64 { + return sc.usage.WrittenSize() +} + func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { if saveFn == nil { return diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go index 47693c235..65b41e9fa 100644 --- a/weed/filesys/page_writer/upload_pipeline.go +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -65,10 +65,18 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { if len(up.writableChunks) < 16 { memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) } else { - memChunk = up.swapFile.NewTempFileChunk(logicChunkIndex) - if memChunk == nil { - memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) + fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) + for lci, mc := range up.writableChunks { + chunkFullness := mc.WrittenSize() + if fullness < chunkFullness { + fullestChunkIndex = lci + fullness = chunkFullness + } } + up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) + delete(up.writableChunks, fullestChunkIndex) + fmt.Printf("flush chunk %d with %d bytes written", logicChunkIndex, fullness) + memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) } up.writableChunks[logicChunkIndex] = memChunk }