diff --git a/weed/filesys/page_writer/page_chunk.go b/weed/filesys/page_writer/page_chunk.go index 2f869ddb8..ffb85d3a3 100644 --- a/weed/filesys/page_writer/page_chunk.go +++ b/weed/filesys/page_writer/page_chunk.go @@ -1,12 +1,64 @@ package page_writer import ( + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/mem" "io" ) type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func()) +type PageChunk interface { + FreeResource() + WriteDataAt(src []byte, offset int64) (n int) + ReadDataAt(p []byte, off int64, logicChunkIndex LogicChunkIndex, chunkSize int64) (maxStop int64) + IsComplete(chunkSize int64) bool + SaveContent(saveFn SaveToStorageFunc, logicChunkIndex LogicChunkIndex, chunkSize int64) +} + +var ( + _ = PageChunk(&MemChunk{}) +) + type MemChunk struct { buf []byte usage *ChunkWrittenIntervalList } + +func (mc *MemChunk) FreeResource() { + mem.Free(mc.buf) +} + +func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) { + n = copy(mc.buf[offset:], src) + mc.usage.MarkWritten(offset, offset+int64(n)) + return +} + +func (mc *MemChunk) ReadDataAt(p []byte, off int64, logicChunkIndex LogicChunkIndex, chunkSize int64) (maxStop int64) { + memChunkBaseOffset := int64(logicChunkIndex) * chunkSize + for t := mc.usage.head.next; t != mc.usage.tail; t = t.next { + logicStart := max(off, int64(logicChunkIndex)*chunkSize+t.StartOffset) + logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) + if logicStart < logicStop { + copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) + maxStop = max(maxStop, logicStop) + } + } + return +} + +func (mc *MemChunk) IsComplete(chunkSize int64) bool { + return mc.usage.IsComplete(chunkSize) +} + +func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc, logicChunkIndex LogicChunkIndex, chunkSize int64) { + if saveFn == nil { + return + } + for t := mc.usage.head.next; t != mc.usage.tail; t = t.next { + reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset]) + saveFn(reader, int64(logicChunkIndex)*chunkSize+t.StartOffset, t.Size(), func() { + }) + } +} diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go index 05300ef1c..02a412993 100644 --- a/weed/filesys/page_writer/upload_pipeline.go +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -15,7 +15,7 @@ type LogicChunkIndex int type UploadPipeline struct { filepath util.FullPath ChunkSize int64 - writableChunks map[LogicChunkIndex]*MemChunk + writableChunks map[LogicChunkIndex]PageChunk writableChunksLock sync.Mutex sealedChunks map[LogicChunkIndex]*SealedChunk sealedChunksLock sync.Mutex @@ -28,7 +28,7 @@ type UploadPipeline struct { } type SealedChunk struct { - chunk *MemChunk + chunk PageChunk referenceCounter int // track uploading or reading processes } @@ -36,14 +36,14 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) { sc.referenceCounter-- if sc.referenceCounter == 0 { glog.V(4).Infof("Free sealed chunk: %s", messageOnFree) - mem.Free(sc.chunk.buf) + sc.chunk.FreeResource() } } func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc) *UploadPipeline { return &UploadPipeline{ ChunkSize: chunkSize, - writableChunks: make(map[LogicChunkIndex]*MemChunk), + writableChunks: make(map[LogicChunkIndex]PageChunk), sealedChunks: make(map[LogicChunkIndex]*SealedChunk), uploaders: writers, uploaderCountCond: sync.NewCond(&sync.Mutex{}), @@ -68,8 +68,7 @@ func (cw *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { } cw.writableChunks[logicChunkIndex] = memChunk } - n = copy(memChunk.buf[offsetRemainder:], p) - memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n)) + n = memChunk.WriteDataAt(p, offsetRemainder) cw.maybeMoveToSealed(memChunk, logicChunkIndex) return @@ -86,7 +85,7 @@ func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { } cw.sealedChunksLock.Unlock() if found { - maxStop = readMemChunk(sealedChunk.chunk, p, off, logicChunkIndex, cw.ChunkSize) + maxStop = sealedChunk.chunk.ReadDataAt(p, off, logicChunkIndex, cw.ChunkSize) glog.V(4).Infof("%s read sealed memchunk [%d,%d)", cw.filepath, off, maxStop) sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", cw.filepath, logicChunkIndex)) } @@ -98,7 +97,7 @@ func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { if !found { return } - writableMaxStop := readMemChunk(writableChunk, p, off, logicChunkIndex, cw.ChunkSize) + writableMaxStop := writableChunk.ReadDataAt(p, off, logicChunkIndex, cw.ChunkSize) glog.V(4).Infof("%s read writable memchunk [%d,%d)", cw.filepath, off, writableMaxStop) maxStop = max(maxStop, writableMaxStop) @@ -174,13 +173,13 @@ func (cw *UploadPipeline) waitForCurrentWritersToComplete() { cw.uploaderCountCond.L.Unlock() } -func (cw *UploadPipeline) maybeMoveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { - if memChunk.usage.IsComplete(cw.ChunkSize) { +func (cw *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { + if memChunk.IsComplete(cw.ChunkSize) { cw.moveToSealed(memChunk, logicChunkIndex) } } -func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { +func (cw *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { atomic.AddInt32(&cw.uploaderCount, 1) glog.V(4).Infof("%s uploaderCount %d ++> %d", cw.filepath, cw.uploaderCount-1, cw.uploaderCount) @@ -200,7 +199,7 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic cw.uploaders.Execute(func() { // first add to the file chunks - cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex) + sealedChunk.chunk.SaveContent(cw.saveToStorageFn, logicChunkIndex, cw.ChunkSize) // notify waiting process atomic.AddInt32(&cw.uploaderCount, -1) @@ -226,30 +225,6 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic }) } -func (cw *UploadPipeline) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { - if cw.saveToStorageFn == nil { - return - } - for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { - reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset]) - cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() { - }) - } -} - -func readMemChunk(memChunk *MemChunk, p []byte, off int64, logicChunkIndex LogicChunkIndex, chunkSize int64) (maxStop int64) { - memChunkBaseOffset := int64(logicChunkIndex) * chunkSize - for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { - logicStart := max(off, int64(logicChunkIndex)*chunkSize+t.StartOffset) - logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) - if logicStart < logicStop { - copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) - maxStop = max(maxStop, logicStop) - } - } - return -} - func (p2 *UploadPipeline) Shutdown() { }