From fc0628c0381fcf571cf46ff699e2b79dc8c99bb0 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 01:53:56 -0800 Subject: [PATCH 01/15] working --- weed/command/mount.go | 4 + weed/command/mount_std.go | 5 + weed/filesys/dirty_pages_mem_chunk.go | 99 +++++++++++ weed/filesys/page_writer.go | 5 +- weed/filesys/page_writer/upload_pipeline.go | 187 ++++++++++++++++++++ 5 files changed, 296 insertions(+), 4 deletions(-) create mode 100644 weed/filesys/dirty_pages_mem_chunk.go create mode 100644 weed/filesys/page_writer/upload_pipeline.go diff --git a/weed/command/mount.go b/weed/command/mount.go index aec5fcc3c..e54f1f07f 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -26,6 +26,8 @@ type MountOptions struct { uidMap *string gidMap *string readOnly *bool + debug *bool + debugPort *int } var ( @@ -57,6 +59,8 @@ func init() { mountOptions.uidMap = cmdMount.Flag.String("map.uid", "", "map local uid to uid on filer, comma-separated :") mountOptions.gidMap = cmdMount.Flag.String("map.gid", "", "map local gid to gid on filer, comma-separated :") mountOptions.readOnly = cmdMount.Flag.Bool("readOnly", false, "read only") + mountOptions.debug = cmdMount.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:/debug/pprof/goroutine?debug=2") + mountOptions.debugPort = cmdMount.Flag.Int("debug.port", 6061, "http port for debugging") mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index aebd04170..8f62b4ec9 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -6,6 +6,7 @@ package command import ( "context" "fmt" + "net/http" "os" "os/user" "path" @@ -34,6 +35,10 @@ import ( func runMount(cmd *Command, args []string) bool { + if *mountOptions.debug { + go http.ListenAndServe(fmt.Sprintf(":%d", *mountOptions.debugPort), nil) + } + grace.SetupProfiling(*mountCpuProfile, *mountMemProfile) if *mountReadRetryTime < time.Second { *mountReadRetryTime = time.Second diff --git a/weed/filesys/dirty_pages_mem_chunk.go b/weed/filesys/dirty_pages_mem_chunk.go new file mode 100644 index 000000000..9740842cf --- /dev/null +++ b/weed/filesys/dirty_pages_mem_chunk.go @@ -0,0 +1,99 @@ +package filesys + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "io" + "sync" + "time" +) + +type MemoryChunkPages struct { + f *File + writeWaitGroup sync.WaitGroup + chunkAddLock sync.Mutex + lastErr error + collection string + replication string + uploadPipeline *page_writer.UploadPipeline +} + +func newMemoryChunkPages(file *File, chunkSize int64) *MemoryChunkPages { + + dirtyPages := &MemoryChunkPages{ + f: file, + } + + dirtyPages.uploadPipeline = page_writer.NewUploadPipeline( + file.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage) + + return dirtyPages +} + +func (pages *MemoryChunkPages) AddPage(offset int64, data []byte) { + + glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data))) + pages.uploadPipeline.SaveDataAt(data, offset) + + return +} + +func (pages *MemoryChunkPages) FlushData() error { + pages.saveChunkedFileToStorage() + pages.writeWaitGroup.Wait() + if pages.lastErr != nil { + return fmt.Errorf("flush data: %v", pages.lastErr) + } + return nil +} + +func (pages *MemoryChunkPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { + return pages.uploadPipeline.MaybeReadDataAt(data, startOffset) +} + +func (pages *MemoryChunkPages) GetStorageOptions() (collection, replication string) { + return pages.collection, pages.replication +} + +func (pages *MemoryChunkPages) saveChunkedFileToStorage() { + + pages.uploadPipeline.FlushAll() + +} + +func (pages *MemoryChunkPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { + + mtime := time.Now().UnixNano() + pages.writeWaitGroup.Add(1) + writer := func() { + defer pages.writeWaitGroup.Done() + defer cleanupFn() + + chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) + if err != nil { + glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) + pages.lastErr = err + return + } + chunk.Mtime = mtime + pages.collection, pages.replication = collection, replication + pages.chunkAddLock.Lock() + pages.f.addChunks([]*filer_pb.FileChunk{chunk}) + glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size) + pages.chunkAddLock.Unlock() + + } + + if pages.f.wfs.concurrentWriters != nil { + pages.f.wfs.concurrentWriters.Execute(writer) + } else { + go writer() + } + +} + +func (pages MemoryChunkPages) Destroy() { + pages.uploadPipeline.Shutdown() +} diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go index 5c06bc44d..d618a1dda 100644 --- a/weed/filesys/page_writer.go +++ b/weed/filesys/page_writer.go @@ -25,10 +25,7 @@ func newPageWriter(file *File, chunkSize int64) *PageWriter { f: file, chunkSize: chunkSize, writerPattern: NewWriterPattern(chunkSize), - randomWriter: newTempFileDirtyPages(file, chunkSize), - streamWriter: newStreamDirtyPages(file, chunkSize), - //streamWriter: newContinuousDirtyPages(file), - //streamWriter: nil, + randomWriter: newMemoryChunkPages(file, chunkSize), } return pw } diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go new file mode 100644 index 000000000..0c9e13649 --- /dev/null +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -0,0 +1,187 @@ +package page_writer + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/mem" + "sync" + "sync/atomic" +) + +type UploadPipeline struct { + writableChunks map[LogicChunkIndex]*MemChunk + writableChunksLock sync.Mutex + sealedChunks map[LogicChunkIndex]*SealedChunk + sealedChunksLock sync.Mutex + ChunkSize int64 + writers *util.LimitedConcurrentExecutor + activeWriterCond *sync.Cond + activeWriterCount int32 + saveToStorageFn SaveToStorageFunc +} + +type SealedChunk struct { + chunk *MemChunk + referenceCounter int // track uploading or reading processes +} + +func (sc *SealedChunk) FreeReference() { + sc.referenceCounter-- + if sc.referenceCounter == 0 { + mem.Free(sc.chunk.buf) + } +} + +func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc) *UploadPipeline { + return &UploadPipeline{ + ChunkSize: chunkSize, + writableChunks: make(map[LogicChunkIndex]*MemChunk), + sealedChunks: make(map[LogicChunkIndex]*SealedChunk), + writers: writers, + activeWriterCond: sync.NewCond(&sync.Mutex{}), + saveToStorageFn: saveToStorageFn, + } +} + +func (cw *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { + cw.writableChunksLock.Lock() + defer cw.writableChunksLock.Unlock() + + logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize) + offsetRemainder := off % cw.ChunkSize + + memChunk, found := cw.writableChunks[logicChunkIndex] + if !found { + memChunk = &MemChunk{ + buf: mem.Allocate(int(cw.ChunkSize)), + usage: newChunkWrittenIntervalList(), + } + cw.writableChunks[logicChunkIndex] = memChunk + } + n = copy(memChunk.buf[offsetRemainder:], p) + memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n)) + cw.maybeMoveToSealed(memChunk, logicChunkIndex) + + return +} + +func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { + logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize) + + // read from sealed chunks first + cw.sealedChunksLock.Lock() + sealedChunk, found := cw.sealedChunks[logicChunkIndex] + if found { + sealedChunk.referenceCounter++ + } + cw.sealedChunksLock.Unlock() + if found { + maxStop = readMemChunk(sealedChunk.chunk, p, off, logicChunkIndex, cw.ChunkSize) + sealedChunk.FreeReference() + } + + // read from writable chunks last + cw.writableChunksLock.Lock() + defer cw.writableChunksLock.Unlock() + writableChunk, found := cw.writableChunks[logicChunkIndex] + if !found { + return + } + maxStop = max(maxStop, readMemChunk(writableChunk, p, off, logicChunkIndex, cw.ChunkSize)) + + return +} + +func (cw *UploadPipeline) FlushAll() { + cw.writableChunksLock.Lock() + defer cw.writableChunksLock.Unlock() + + for logicChunkIndex, memChunk := range cw.writableChunks { + cw.moveToSealed(memChunk, logicChunkIndex) + } + + cw.waitForCurrentWritersToComplete() +} + +func (cw *UploadPipeline) waitForCurrentWritersToComplete() { + cw.activeWriterCond.L.Lock() + t := int32(100) + for { + t = atomic.LoadInt32(&cw.activeWriterCount) + if t <= 0 { + break + } + glog.V(4).Infof("activeWriterCond is %d", t) + cw.activeWriterCond.Wait() + } + cw.activeWriterCond.L.Unlock() +} + +func (cw *UploadPipeline) maybeMoveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { + if memChunk.usage.IsComplete(cw.ChunkSize) { + cw.moveToSealed(memChunk, logicChunkIndex) + } +} + +func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { + atomic.AddInt32(&cw.activeWriterCount, 1) + glog.V(4).Infof("activeWriterCount %d ++> %d", cw.activeWriterCount-1, cw.activeWriterCount) + + cw.sealedChunksLock.Lock() + + if oldMemChunk, found := cw.sealedChunks[logicChunkIndex]; found { + oldMemChunk.FreeReference() + } + sealedChunk := &SealedChunk{ + chunk: memChunk, + referenceCounter: 1, // default 1 is for uploading process + } + cw.sealedChunks[logicChunkIndex] = sealedChunk + delete(cw.writableChunks, logicChunkIndex) + + cw.sealedChunksLock.Unlock() + + cw.writers.Execute(func() { + cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex) + + // remove from sealed chunks + sealedChunk.FreeReference() + cw.sealedChunksLock.Lock() + defer cw.sealedChunksLock.Unlock() + delete(cw.sealedChunks, logicChunkIndex) + + atomic.AddInt32(&cw.activeWriterCount, -1) + glog.V(4).Infof("activeWriterCount %d --> %d", cw.activeWriterCount+1, cw.activeWriterCount) + // Lock and Unlock are not required, + // but it may signal multiple times during one wakeup, + // and the waiting goroutine may miss some of them! + cw.activeWriterCond.L.Lock() + cw.activeWriterCond.Broadcast() + cw.activeWriterCond.L.Unlock() + }) +} + +func (cw *UploadPipeline) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { + for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { + reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset]) + cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() { + }) + } +} + +func readMemChunk(memChunk *MemChunk, p []byte, off int64, logicChunkIndex LogicChunkIndex, chunkSize int64) (maxStop int64) { + memChunkBaseOffset := int64(logicChunkIndex) * chunkSize + for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { + logicStart := max(off, int64(logicChunkIndex)*chunkSize+t.StartOffset) + logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) + if logicStart < logicStop { + copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) + maxStop = max(maxStop, logicStop) + } + } + return +} + +func (p2 *UploadPipeline) Shutdown() { + +} From f710d5ffcaced286062dcbe99bf5fcebdb73efc0 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 03:19:00 -0800 Subject: [PATCH 02/15] a little speed up --- weed/filesys/dirty_pages_mem_chunk.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/weed/filesys/dirty_pages_mem_chunk.go b/weed/filesys/dirty_pages_mem_chunk.go index 9740842cf..98b3929ac 100644 --- a/weed/filesys/dirty_pages_mem_chunk.go +++ b/weed/filesys/dirty_pages_mem_chunk.go @@ -18,6 +18,7 @@ type MemoryChunkPages struct { collection string replication string uploadPipeline *page_writer.UploadPipeline + hasWrites bool } func newMemoryChunkPages(file *File, chunkSize int64) *MemoryChunkPages { @@ -33,6 +34,7 @@ func newMemoryChunkPages(file *File, chunkSize int64) *MemoryChunkPages { } func (pages *MemoryChunkPages) AddPage(offset int64, data []byte) { + pages.hasWrites = true glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data))) pages.uploadPipeline.SaveDataAt(data, offset) @@ -41,6 +43,9 @@ func (pages *MemoryChunkPages) AddPage(offset int64, data []byte) { } func (pages *MemoryChunkPages) FlushData() error { + if !pages.hasWrites { + return nil + } pages.saveChunkedFileToStorage() pages.writeWaitGroup.Wait() if pages.lastErr != nil { @@ -50,6 +55,9 @@ func (pages *MemoryChunkPages) FlushData() error { } func (pages *MemoryChunkPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { + if !pages.hasWrites { + return + } return pages.uploadPipeline.MaybeReadDataAt(data, startOffset) } From 77d9993f38845ba3aa2af78a5f3d6ddb93093a48 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 03:19:11 -0800 Subject: [PATCH 03/15] remove unused variables --- weed/filesys/filehandle.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 738423b6a..ef8f62938 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -28,11 +28,9 @@ type FileHandle struct { sync.Mutex f *File - RequestId fuse.RequestID // unique ID for request - NodeId fuse.NodeID // file or directory the request is about - Uid uint32 // user ID of process making request - Gid uint32 // group ID of process making request - writeOnly bool + NodeId fuse.NodeID // file or directory the request is about + Uid uint32 // user ID of process making request + Gid uint32 // group ID of process making request isDeleted bool } From 6c908352cbc0791299b16c57815610204ea39f26 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 03:19:24 -0800 Subject: [PATCH 04/15] testing skip memory management --- weed/util/mem/slot_pool.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/weed/util/mem/slot_pool.go b/weed/util/mem/slot_pool.go index e6680d3cb..a871188b5 100644 --- a/weed/util/mem/slot_pool.go +++ b/weed/util/mem/slot_pool.go @@ -35,10 +35,12 @@ func getSlotPool(size int) *sync.Pool { } func Allocate(size int) []byte { + return make([]byte, size) slab := *getSlotPool(size).Get().(*[]byte) return slab[:size] } func Free(buf []byte) { + return getSlotPool(cap(buf)).Put(&buf) } From da7f13e73e110ad7b33124cae119ccd51ef3f194 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 03:21:31 -0800 Subject: [PATCH 05/15] Revert "testing skip memory management" This reverts commit 6c908352cbc0791299b16c57815610204ea39f26. --- weed/util/mem/slot_pool.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/weed/util/mem/slot_pool.go b/weed/util/mem/slot_pool.go index a871188b5..e6680d3cb 100644 --- a/weed/util/mem/slot_pool.go +++ b/weed/util/mem/slot_pool.go @@ -35,12 +35,10 @@ func getSlotPool(size int) *sync.Pool { } func Allocate(size int) []byte { - return make([]byte, size) slab := *getSlotPool(size).Get().(*[]byte) return slab[:size] } func Free(buf []byte) { - return getSlotPool(cap(buf)).Put(&buf) } From 1734017ba13cb03f04f3028712e521f4a6b7f00c Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 13:40:41 -0800 Subject: [PATCH 06/15] add test --- weed/filesys/page_writer/upload_pipeline.go | 6 ++- .../page_writer/upload_pipeline_test.go | 47 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 weed/filesys/page_writer/upload_pipeline_test.go diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go index 0c9e13649..b87061adb 100644 --- a/weed/filesys/page_writer/upload_pipeline.go +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -142,9 +142,10 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic cw.sealedChunksLock.Unlock() cw.writers.Execute(func() { + // first add to the file chunks cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex) - // remove from sealed chunks + // then remove from sealed chunks sealedChunk.FreeReference() cw.sealedChunksLock.Lock() defer cw.sealedChunksLock.Unlock() @@ -162,6 +163,9 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic } func (cw *UploadPipeline) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { + if cw.saveToStorageFn == nil { + return + } for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset]) cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() { diff --git a/weed/filesys/page_writer/upload_pipeline_test.go b/weed/filesys/page_writer/upload_pipeline_test.go new file mode 100644 index 000000000..81191868f --- /dev/null +++ b/weed/filesys/page_writer/upload_pipeline_test.go @@ -0,0 +1,47 @@ +package page_writer + +import ( + "github.com/chrislusf/seaweedfs/weed/util" + "testing" +) + +func TestUploadPipeline(t *testing.T) { + + uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil) + + writeRange(uploadPipeline, 0, 131072) + writeRange(uploadPipeline, 131072, 262144) + writeRange(uploadPipeline, 262144, 1025536) + + confirmRange(t, uploadPipeline, 0, 1025536) + + writeRange(uploadPipeline, 1025536, 1296896) + + confirmRange(t, uploadPipeline, 1025536, 1296896) + + writeRange(uploadPipeline, 1296896, 2162688) + + confirmRange(t, uploadPipeline, 1296896, 2162688) + + confirmRange(t, uploadPipeline, 1296896, 2162688) +} + +// startOff and stopOff must be divided by 4 +func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) { + p := make([]byte, 4) + for i := startOff / 4; i < stopOff/4; i += 4 { + util.Uint32toBytes(p, uint32(i)) + uploadPipeline.SaveDataAt(p, i) + } +} + +func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) { + p := make([]byte, 4) + for i := startOff; i < stopOff/4; i += 4 { + uploadPipeline.MaybeReadDataAt(p, i) + x := util.BytesToUint32(p) + if x != uint32(i) { + t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4) + } + } +} From 0ba88596e8dd5da1425c2ac53c54a9c9428ecc63 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 13:53:30 -0800 Subject: [PATCH 07/15] invalidate filehandle entry view cache --- weed/filesys/dirty_pages_mem_chunk.go | 23 ++++++++++++----------- weed/filesys/filehandle.go | 11 ++++++----- weed/filesys/page_writer.go | 12 ++++++------ 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/weed/filesys/dirty_pages_mem_chunk.go b/weed/filesys/dirty_pages_mem_chunk.go index 98b3929ac..7e8dc0b05 100644 --- a/weed/filesys/dirty_pages_mem_chunk.go +++ b/weed/filesys/dirty_pages_mem_chunk.go @@ -11,7 +11,7 @@ import ( ) type MemoryChunkPages struct { - f *File + fh *FileHandle writeWaitGroup sync.WaitGroup chunkAddLock sync.Mutex lastErr error @@ -21,14 +21,14 @@ type MemoryChunkPages struct { hasWrites bool } -func newMemoryChunkPages(file *File, chunkSize int64) *MemoryChunkPages { +func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *MemoryChunkPages { dirtyPages := &MemoryChunkPages{ - f: file, + fh: fh, } dirtyPages.uploadPipeline = page_writer.NewUploadPipeline( - file.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage) + fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage) return dirtyPages } @@ -36,7 +36,7 @@ func newMemoryChunkPages(file *File, chunkSize int64) *MemoryChunkPages { func (pages *MemoryChunkPages) AddPage(offset int64, data []byte) { pages.hasWrites = true - glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data))) + glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.f.fullpath(), offset, offset+int64(len(data))) pages.uploadPipeline.SaveDataAt(data, offset) return @@ -79,23 +79,24 @@ func (pages *MemoryChunkPages) saveChunkedFileIntevalToStorage(reader io.Reader, defer pages.writeWaitGroup.Done() defer cleanupFn() - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) + chunk, collection, replication, err := pages.fh.f.wfs.saveDataAsChunk(pages.fh.f.fullpath())(reader, pages.fh.f.Name, offset) if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) + glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.fh.f.fullpath(), offset, offset+size, err) pages.lastErr = err return } chunk.Mtime = mtime pages.collection, pages.replication = collection, replication pages.chunkAddLock.Lock() - pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size) + pages.fh.f.addChunks([]*filer_pb.FileChunk{chunk}) + pages.fh.entryViewCache = nil + glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.fh.f.fullpath(), chunk.FileId, offset, offset+size) pages.chunkAddLock.Unlock() } - if pages.f.wfs.concurrentWriters != nil { - pages.f.wfs.concurrentWriters.Execute(writer) + if pages.fh.f.wfs.concurrentWriters != nil { + pages.fh.f.wfs.concurrentWriters.Execute(writer) } else { go writer() } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index ef8f62938..8545be9b6 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -36,12 +36,12 @@ type FileHandle struct { func newFileHandle(file *File, uid, gid uint32) *FileHandle { fh := &FileHandle{ - f: file, - // dirtyPages: newContinuousDirtyPages(file, writeOnly), - dirtyPages: newPageWriter(file, file.wfs.option.ChunkSizeLimit), - Uid: uid, - Gid: gid, + f: file, + Uid: uid, + Gid: gid, } + // dirtyPages: newContinuousDirtyPages(file, writeOnly), + fh.dirtyPages = newPageWriter(fh, file.wfs.option.ChunkSizeLimit) entry := fh.f.getEntry() if entry != nil { entry.Attributes.FileSize = filer.FileSize(entry) @@ -149,6 +149,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { reader := fh.reader if reader == nil { chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64) + glog.V(4).Infof("file handle read %s [%d,%d) from %+v", fileFullPath, offset, offset+int64(len(buff)), chunkViews) reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize) } fh.reader = reader diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go index d618a1dda..90ef7d7c4 100644 --- a/weed/filesys/page_writer.go +++ b/weed/filesys/page_writer.go @@ -6,7 +6,7 @@ import ( ) type PageWriter struct { - f *File + fh *FileHandle collection string replication string chunkSize int64 @@ -20,19 +20,19 @@ var ( _ = page_writer.DirtyPages(&PageWriter{}) ) -func newPageWriter(file *File, chunkSize int64) *PageWriter { +func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter { pw := &PageWriter{ - f: file, + fh: fh, chunkSize: chunkSize, writerPattern: NewWriterPattern(chunkSize), - randomWriter: newMemoryChunkPages(file, chunkSize), + randomWriter: newMemoryChunkPages(fh, chunkSize), } return pw } func (pw *PageWriter) AddPage(offset int64, data []byte) { - glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) + glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.fh.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { @@ -64,7 +64,7 @@ func (pw *PageWriter) FlushData() error { } func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) { - glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), offset, offset+int64(len(data))) + glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.f.fullpath(), offset, offset+int64(len(data))) chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { From 381f4e73a0c74172336b15b6ba410ff92725d1a7 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 13:56:47 -0800 Subject: [PATCH 08/15] delete actual reference first --- weed/filesys/page_writer/upload_pipeline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go index b87061adb..2fb1bbf52 100644 --- a/weed/filesys/page_writer/upload_pipeline.go +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -146,10 +146,10 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex) // then remove from sealed chunks - sealedChunk.FreeReference() cw.sealedChunksLock.Lock() defer cw.sealedChunksLock.Unlock() delete(cw.sealedChunks, logicChunkIndex) + sealedChunk.FreeReference() atomic.AddInt32(&cw.activeWriterCount, -1) glog.V(4).Infof("activeWriterCount %d --> %d", cw.activeWriterCount+1, cw.activeWriterCount) From fc22071a2fc4706548ae74c7ca5cef60ed7c3055 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 14:02:37 -0800 Subject: [PATCH 09/15] more logs --- weed/filesys/filehandle.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 8545be9b6..517d8b3d4 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -150,6 +150,9 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { if reader == nil { chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64) glog.V(4).Infof("file handle read %s [%d,%d) from %+v", fileFullPath, offset, offset+int64(len(buff)), chunkViews) + for _, chunkView := range chunkViews { + glog.V(4).Infof(" read %s [%d,%d) from chunk %+v", fileFullPath, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.FileId) + } reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize) } fh.reader = reader From 7bf7af971b3f5e41bc50e755b9eb3855a7ba103f Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 14:15:10 -0800 Subject: [PATCH 10/15] more logs --- weed/filesys/dirty_pages_mem_chunk.go | 2 +- weed/filesys/page_writer/upload_pipeline.go | 18 +++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/weed/filesys/dirty_pages_mem_chunk.go b/weed/filesys/dirty_pages_mem_chunk.go index 7e8dc0b05..fc5669ec0 100644 --- a/weed/filesys/dirty_pages_mem_chunk.go +++ b/weed/filesys/dirty_pages_mem_chunk.go @@ -27,7 +27,7 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *MemoryChunkPages { fh: fh, } - dirtyPages.uploadPipeline = page_writer.NewUploadPipeline( + dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.f.fullpath(), fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage) return dirtyPages diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go index 2fb1bbf52..83e5c8b60 100644 --- a/weed/filesys/page_writer/upload_pipeline.go +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -1,6 +1,7 @@ package page_writer import ( + "fmt" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/mem" @@ -18,6 +19,7 @@ type UploadPipeline struct { activeWriterCond *sync.Cond activeWriterCount int32 saveToStorageFn SaveToStorageFunc + filepath util.FullPath } type SealedChunk struct { @@ -25,14 +27,15 @@ type SealedChunk struct { referenceCounter int // track uploading or reading processes } -func (sc *SealedChunk) FreeReference() { +func (sc *SealedChunk) FreeReference(messageOnFree string) { sc.referenceCounter-- if sc.referenceCounter == 0 { + glog.V(4).Infof("Free sealed chunk: %s", messageOnFree) mem.Free(sc.chunk.buf) } } -func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc) *UploadPipeline { +func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc) *UploadPipeline { return &UploadPipeline{ ChunkSize: chunkSize, writableChunks: make(map[LogicChunkIndex]*MemChunk), @@ -40,6 +43,7 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, writers: writers, activeWriterCond: sync.NewCond(&sync.Mutex{}), saveToStorageFn: saveToStorageFn, + filepath: filepath, } } @@ -77,7 +81,7 @@ func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { cw.sealedChunksLock.Unlock() if found { maxStop = readMemChunk(sealedChunk.chunk, p, off, logicChunkIndex, cw.ChunkSize) - sealedChunk.FreeReference() + sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", cw.filepath, logicChunkIndex)) } // read from writable chunks last @@ -125,12 +129,12 @@ func (cw *UploadPipeline) maybeMoveToSealed(memChunk *MemChunk, logicChunkIndex func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { atomic.AddInt32(&cw.activeWriterCount, 1) - glog.V(4).Infof("activeWriterCount %d ++> %d", cw.activeWriterCount-1, cw.activeWriterCount) + glog.V(4).Infof("%s activeWriterCount %d ++> %d", cw.filepath, cw.activeWriterCount-1, cw.activeWriterCount) cw.sealedChunksLock.Lock() if oldMemChunk, found := cw.sealedChunks[logicChunkIndex]; found { - oldMemChunk.FreeReference() + oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", cw.filepath, logicChunkIndex)) } sealedChunk := &SealedChunk{ chunk: memChunk, @@ -149,10 +153,10 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic cw.sealedChunksLock.Lock() defer cw.sealedChunksLock.Unlock() delete(cw.sealedChunks, logicChunkIndex) - sealedChunk.FreeReference() + sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", cw.filepath, logicChunkIndex)) atomic.AddInt32(&cw.activeWriterCount, -1) - glog.V(4).Infof("activeWriterCount %d --> %d", cw.activeWriterCount+1, cw.activeWriterCount) + glog.V(4).Infof("%s activeWriterCount %d --> %d", cw.filepath, cw.activeWriterCount+1, cw.activeWriterCount) // Lock and Unlock are not required, // but it may signal multiple times during one wakeup, // and the waiting goroutine may miss some of them! From 047446d5ca3f23370d085ef6e9a89924760cf658 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 15:50:11 -0800 Subject: [PATCH 11/15] remove extra async execution --- weed/filesys/dirty_pages_mem_chunk.go | 46 ++++++++------------------- 1 file changed, 14 insertions(+), 32 deletions(-) diff --git a/weed/filesys/dirty_pages_mem_chunk.go b/weed/filesys/dirty_pages_mem_chunk.go index fc5669ec0..9313c4562 100644 --- a/weed/filesys/dirty_pages_mem_chunk.go +++ b/weed/filesys/dirty_pages_mem_chunk.go @@ -46,8 +46,7 @@ func (pages *MemoryChunkPages) FlushData() error { if !pages.hasWrites { return nil } - pages.saveChunkedFileToStorage() - pages.writeWaitGroup.Wait() + pages.uploadPipeline.FlushAll() if pages.lastErr != nil { return fmt.Errorf("flush data: %v", pages.lastErr) } @@ -65,41 +64,24 @@ func (pages *MemoryChunkPages) GetStorageOptions() (collection, replication stri return pages.collection, pages.replication } -func (pages *MemoryChunkPages) saveChunkedFileToStorage() { - - pages.uploadPipeline.FlushAll() - -} - func (pages *MemoryChunkPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { mtime := time.Now().UnixNano() - pages.writeWaitGroup.Add(1) - writer := func() { - defer pages.writeWaitGroup.Done() - defer cleanupFn() - - chunk, collection, replication, err := pages.fh.f.wfs.saveDataAsChunk(pages.fh.f.fullpath())(reader, pages.fh.f.Name, offset) - if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.fh.f.fullpath(), offset, offset+size, err) - pages.lastErr = err - return - } - chunk.Mtime = mtime - pages.collection, pages.replication = collection, replication - pages.chunkAddLock.Lock() - pages.fh.f.addChunks([]*filer_pb.FileChunk{chunk}) - pages.fh.entryViewCache = nil - glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.fh.f.fullpath(), chunk.FileId, offset, offset+size) - pages.chunkAddLock.Unlock() - - } + defer cleanupFn() - if pages.fh.f.wfs.concurrentWriters != nil { - pages.fh.f.wfs.concurrentWriters.Execute(writer) - } else { - go writer() + chunk, collection, replication, err := pages.fh.f.wfs.saveDataAsChunk(pages.fh.f.fullpath())(reader, pages.fh.f.Name, offset) + if err != nil { + glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.fh.f.fullpath(), offset, offset+size, err) + pages.lastErr = err + return } + chunk.Mtime = mtime + pages.collection, pages.replication = collection, replication + pages.chunkAddLock.Lock() + pages.fh.f.addChunks([]*filer_pb.FileChunk{chunk}) + pages.fh.entryViewCache = nil + glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.fh.f.fullpath(), chunk.FileId, offset, offset+size) + pages.chunkAddLock.Unlock() } From b068bc291dcf30ad73682eaf1884a7c4d9c7f560 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 20:07:01 -0800 Subject: [PATCH 12/15] testing with always resetting entry view cache --- weed/filesys/filehandle.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 517d8b3d4..619d9a226 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -138,7 +138,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { } var chunkResolveErr error - if fh.entryViewCache == nil { + if true || fh.entryViewCache == nil { fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks, 0, math.MaxInt64) if chunkResolveErr != nil { return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) From 0a3f95ca01a3667116b0a7420325bb523886ff75 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 20:41:00 -0800 Subject: [PATCH 13/15] more logs --- weed/filesys/filehandle.go | 2 +- weed/filesys/page_writer/upload_pipeline.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 619d9a226..8606ac0d2 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -163,7 +163,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { glog.Errorf("file handle read %s: %v", fileFullPath, err) } - // glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err) + glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err) return int64(totalRead), err } diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go index 83e5c8b60..9f459c11e 100644 --- a/weed/filesys/page_writer/upload_pipeline.go +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -81,6 +81,7 @@ func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { cw.sealedChunksLock.Unlock() if found { maxStop = readMemChunk(sealedChunk.chunk, p, off, logicChunkIndex, cw.ChunkSize) + glog.V(4).Infof("%s read sealed memchunk [%d,%d)", cw.filepath, off, maxStop) sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", cw.filepath, logicChunkIndex)) } @@ -91,7 +92,9 @@ func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { if !found { return } - maxStop = max(maxStop, readMemChunk(writableChunk, p, off, logicChunkIndex, cw.ChunkSize)) + writableMaxStop := readMemChunk(writableChunk, p, off, logicChunkIndex, cw.ChunkSize) + glog.V(4).Infof("%s read writable memchunk [%d,%d)", cw.filepath, off, writableMaxStop) + maxStop = max(maxStop, writableMaxStop) return } From f4ad63528a35b8602060eb4d741532c840e4b2c2 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 22:24:44 -0800 Subject: [PATCH 14/15] wait for reading threads to complete before dropping sealed chunks --- weed/filesys/dirty_pages_continuous.go | 138 ----------- weed/filesys/dirty_pages_mem_chunk.go | 11 + weed/filesys/dirty_pages_stream.go | 106 --------- weed/filesys/dirty_pages_temp_file.go | 106 --------- weed/filesys/filehandle.go | 9 + weed/filesys/page_writer.go | 33 +-- .../page_writer/chunked_file_writer.go | 160 ------------- .../page_writer/chunked_file_writer_test.go | 60 ----- .../page_writer/chunked_stream_writer.go | 107 --------- .../page_writer/chunked_stream_writer_test.go | 33 --- .../page_writer/dirty_page_interval.go | 222 ------------------ .../page_writer/dirty_page_interval_test.go | 113 --------- weed/filesys/page_writer/dirty_pages.go | 2 + weed/filesys/page_writer/upload_pipeline.go | 90 +++++-- .../page_writer/upload_pipeline_test.go | 2 +- 15 files changed, 106 insertions(+), 1086 deletions(-) delete mode 100644 weed/filesys/dirty_pages_continuous.go delete mode 100644 weed/filesys/dirty_pages_stream.go delete mode 100644 weed/filesys/dirty_pages_temp_file.go delete mode 100644 weed/filesys/page_writer/chunked_file_writer.go delete mode 100644 weed/filesys/page_writer/chunked_file_writer_test.go delete mode 100644 weed/filesys/page_writer/chunked_stream_writer_test.go delete mode 100644 weed/filesys/page_writer/dirty_page_interval.go delete mode 100644 weed/filesys/page_writer/dirty_page_interval_test.go diff --git a/weed/filesys/dirty_pages_continuous.go b/weed/filesys/dirty_pages_continuous.go deleted file mode 100644 index 2692c2950..000000000 --- a/weed/filesys/dirty_pages_continuous.go +++ /dev/null @@ -1,138 +0,0 @@ -package filesys - -import ( - "bytes" - "fmt" - "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" - "io" - "sync" - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -type ContinuousDirtyPages struct { - intervals *page_writer.ContinuousIntervals - f *File - writeWaitGroup sync.WaitGroup - chunkAddLock sync.Mutex - lastErr error - collection string - replication string -} - -func newContinuousDirtyPages(file *File) *ContinuousDirtyPages { - dirtyPages := &ContinuousDirtyPages{ - intervals: &page_writer.ContinuousIntervals{}, - f: file, - } - return dirtyPages -} - -func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) { - - glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data))) - - if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) { - // this is more than what buffer can hold. - pages.flushAndSave(offset, data) - } - - pages.intervals.AddInterval(data, offset) - - if pages.intervals.TotalSize() >= pages.f.wfs.option.ChunkSizeLimit { - pages.saveExistingLargestPageToStorage() - } - - return -} - -func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) { - - // flush existing - pages.saveExistingPagesToStorage() - - // flush the new page - pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))) - - return -} - -func (pages *ContinuousDirtyPages) FlushData() error { - - pages.saveExistingPagesToStorage() - pages.writeWaitGroup.Wait() - if pages.lastErr != nil { - return fmt.Errorf("flush data: %v", pages.lastErr) - } - return nil -} - -func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() { - for pages.saveExistingLargestPageToStorage() { - } -} - -func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedData bool) { - - maxList := pages.intervals.RemoveLargestIntervalLinkedList() - if maxList == nil { - return false - } - - entry := pages.f.getEntry() - if entry == nil { - return false - } - - fileSize := int64(entry.Attributes.FileSize) - - chunkSize := min(maxList.Size(), fileSize-maxList.Offset()) - if chunkSize == 0 { - return false - } - - pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize) - - return true -} - -func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { - - mtime := time.Now().UnixNano() - pages.writeWaitGroup.Add(1) - writer := func() { - defer pages.writeWaitGroup.Done() - - reader = io.LimitReader(reader, size) - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) - if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) - pages.lastErr = err - return - } - chunk.Mtime = mtime - pages.collection, pages.replication = collection, replication - pages.chunkAddLock.Lock() - defer pages.chunkAddLock.Unlock() - pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size) - } - - if pages.f.wfs.concurrentWriters != nil { - pages.f.wfs.concurrentWriters.Execute(writer) - } else { - go writer() - } -} - -func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - return pages.intervals.ReadDataAt(data, startOffset) -} - -func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) { - return pages.collection, pages.replication -} -func (pages ContinuousDirtyPages) Destroy() { -} diff --git a/weed/filesys/dirty_pages_mem_chunk.go b/weed/filesys/dirty_pages_mem_chunk.go index 9313c4562..e6548d7be 100644 --- a/weed/filesys/dirty_pages_mem_chunk.go +++ b/weed/filesys/dirty_pages_mem_chunk.go @@ -21,6 +21,10 @@ type MemoryChunkPages struct { hasWrites bool } +var ( + _ = page_writer.DirtyPages(&MemoryChunkPages{}) +) + func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *MemoryChunkPages { dirtyPages := &MemoryChunkPages{ @@ -88,3 +92,10 @@ func (pages *MemoryChunkPages) saveChunkedFileIntevalToStorage(reader io.Reader, func (pages MemoryChunkPages) Destroy() { pages.uploadPipeline.Shutdown() } + +func (pages *MemoryChunkPages) LockForRead(startOffset, stopOffset int64) { + pages.uploadPipeline.LockForRead(startOffset, stopOffset) +} +func (pages *MemoryChunkPages) UnlockForRead(startOffset, stopOffset int64) { + pages.uploadPipeline.UnlockForRead(startOffset, stopOffset) +} diff --git a/weed/filesys/dirty_pages_stream.go b/weed/filesys/dirty_pages_stream.go deleted file mode 100644 index 586b73698..000000000 --- a/weed/filesys/dirty_pages_stream.go +++ /dev/null @@ -1,106 +0,0 @@ -package filesys - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "io" - "sync" - "time" -) - -type StreamDirtyPages struct { - f *File - writeWaitGroup sync.WaitGroup - pageAddLock sync.Mutex - chunkAddLock sync.Mutex - lastErr error - collection string - replication string - chunkedStream *page_writer.ChunkedStreamWriter -} - -func newStreamDirtyPages(file *File, chunkSize int64) *StreamDirtyPages { - - dirtyPages := &StreamDirtyPages{ - f: file, - chunkedStream: page_writer.NewChunkedStreamWriter(chunkSize), - } - - dirtyPages.chunkedStream.SetSaveToStorageFunction(dirtyPages.saveChunkedFileIntevalToStorage) - - return dirtyPages -} - -func (pages *StreamDirtyPages) AddPage(offset int64, data []byte) { - - pages.pageAddLock.Lock() - defer pages.pageAddLock.Unlock() - - glog.V(4).Infof("%v stream AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data))) - if _, err := pages.chunkedStream.WriteAt(data, offset); err != nil { - pages.lastErr = err - } - - return -} - -func (pages *StreamDirtyPages) FlushData() error { - pages.saveChunkedFileToStorage() - pages.writeWaitGroup.Wait() - if pages.lastErr != nil { - return fmt.Errorf("flush data: %v", pages.lastErr) - } - pages.chunkedStream.Reset() - return nil -} - -func (pages *StreamDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - return pages.chunkedStream.ReadDataAt(data, startOffset) -} - -func (pages *StreamDirtyPages) GetStorageOptions() (collection, replication string) { - return pages.collection, pages.replication -} - -func (pages *StreamDirtyPages) saveChunkedFileToStorage() { - - pages.chunkedStream.FlushAll() - -} - -func (pages *StreamDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { - - mtime := time.Now().UnixNano() - pages.writeWaitGroup.Add(1) - writer := func() { - defer pages.writeWaitGroup.Done() - defer cleanupFn() - - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) - if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) - pages.lastErr = err - return - } - chunk.Mtime = mtime - pages.collection, pages.replication = collection, replication - pages.chunkAddLock.Lock() - pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size) - pages.chunkAddLock.Unlock() - - } - - if pages.f.wfs.concurrentWriters != nil { - pages.f.wfs.concurrentWriters.Execute(writer) - } else { - go writer() - } - -} - -func (pages StreamDirtyPages) Destroy() { - pages.chunkedStream.Reset() -} diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go deleted file mode 100644 index e0c3a91de..000000000 --- a/weed/filesys/dirty_pages_temp_file.go +++ /dev/null @@ -1,106 +0,0 @@ -package filesys - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "io" - "os" - "sync" - "time" -) - -type TempFileDirtyPages struct { - f *File - writeWaitGroup sync.WaitGroup - pageAddLock sync.Mutex - chunkAddLock sync.Mutex - lastErr error - collection string - replication string - chunkedFile *page_writer.ChunkedFileWriter -} - -func newTempFileDirtyPages(file *File, chunkSize int64) *TempFileDirtyPages { - - tempFile := &TempFileDirtyPages{ - f: file, - chunkedFile: page_writer.NewChunkedFileWriter(file.wfs.option.getTempFilePageDir(), chunkSize), - } - - return tempFile -} - -func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) { - - pages.pageAddLock.Lock() - defer pages.pageAddLock.Unlock() - - glog.V(4).Infof("%v tempfile AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data))) - if _, err := pages.chunkedFile.WriteAt(data, offset); err != nil { - pages.lastErr = err - } - - return -} - -func (pages *TempFileDirtyPages) FlushData() error { - pages.saveChunkedFileToStorage() - pages.writeWaitGroup.Wait() - if pages.lastErr != nil { - return fmt.Errorf("flush data: %v", pages.lastErr) - } - pages.chunkedFile.Reset() - return nil -} - -func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - return pages.chunkedFile.ReadDataAt(data, startOffset) -} - -func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) { - return pages.collection, pages.replication -} - -func (pages *TempFileDirtyPages) saveChunkedFileToStorage() { - - pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex page_writer.LogicChunkIndex, interval *page_writer.ChunkWrittenInterval) { - reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval) - pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize+interval.StartOffset, interval.Size()) - }) - -} - -func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64) { - - mtime := time.Now().UnixNano() - pages.writeWaitGroup.Add(1) - writer := func() { - defer pages.writeWaitGroup.Done() - - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) - if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) - pages.lastErr = err - return - } - chunk.Mtime = mtime - pages.collection, pages.replication = collection, replication - pages.chunkAddLock.Lock() - defer pages.chunkAddLock.Unlock() - pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size) - } - - if pages.f.wfs.concurrentWriters != nil { - pages.f.wfs.concurrentWriters.Execute(writer) - } else { - go writer() - } - -} - -func (pages TempFileDirtyPages) Destroy() { - pages.chunkedFile.Reset() -} diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 8606ac0d2..d3b37a5b9 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -75,6 +75,8 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus buff = make([]byte, req.Size) } + fh.lockForRead(req.Offset, len(buff)) + defer fh.unlockForRead(req.Offset, len(buff)) totalRead, err := fh.readFromChunks(buff, req.Offset) if err == nil || err == io.EOF { maxStop := fh.readFromDirtyPages(buff, req.Offset) @@ -101,6 +103,13 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus return err } +func (fh *FileHandle) lockForRead(startOffset int64, size int) { + fh.dirtyPages.LockForRead(startOffset, startOffset+int64(size)) +} +func (fh *FileHandle) unlockForRead(startOffset int64, size int) { + fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size)) +} + func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) { maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset) return diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go index 90ef7d7c4..c6d08348d 100644 --- a/weed/filesys/page_writer.go +++ b/weed/filesys/page_writer.go @@ -13,7 +13,6 @@ type PageWriter struct { writerPattern *WriterPattern randomWriter page_writer.DirtyPages - streamWriter page_writer.DirtyPages } var ( @@ -44,22 +43,11 @@ func (pw *PageWriter) AddPage(offset int64, data []byte) { } func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) { - if chunkIndex > 0 { - if pw.writerPattern.IsStreamingMode() && pw.streamWriter != nil { - pw.streamWriter.AddPage(offset, data) - return - } - } pw.randomWriter.AddPage(offset, data) } func (pw *PageWriter) FlushData() error { pw.writerPattern.Reset() - if pw.streamWriter != nil { - if err := pw.streamWriter.FlushData(); err != nil { - return err - } - } return pw.randomWriter.FlushData() } @@ -70,12 +58,7 @@ func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) for i := chunkIndex; len(data) > 0; i++ { readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) - if pw.streamWriter != nil { - m1 := pw.streamWriter.ReadDirtyDataAt(data[:readSize], offset) - maxStop = max(maxStop, m1) - } - m2 := pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset) - maxStop = max(maxStop, m2) + maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset) offset += readSize data = data[readSize:] @@ -85,16 +68,18 @@ func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) } func (pw *PageWriter) GetStorageOptions() (collection, replication string) { - if pw.writerPattern.IsStreamingMode() && pw.streamWriter != nil { - return pw.streamWriter.GetStorageOptions() - } return pw.randomWriter.GetStorageOptions() } +func (pw *PageWriter) LockForRead(startOffset, stopOffset int64) { + pw.randomWriter.LockForRead(startOffset, stopOffset) +} + +func (pw *PageWriter) UnlockForRead(startOffset, stopOffset int64) { + pw.randomWriter.UnlockForRead(startOffset, stopOffset) +} + func (pw *PageWriter) Destroy() { - if pw.streamWriter != nil { - pw.streamWriter.Destroy() - } pw.randomWriter.Destroy() } diff --git a/weed/filesys/page_writer/chunked_file_writer.go b/weed/filesys/page_writer/chunked_file_writer.go deleted file mode 100644 index b0e1c2844..000000000 --- a/weed/filesys/page_writer/chunked_file_writer.go +++ /dev/null @@ -1,160 +0,0 @@ -package page_writer - -import ( - "github.com/chrislusf/seaweedfs/weed/glog" - "io" - "os" - "sync" -) - -type LogicChunkIndex int -type ActualChunkIndex int - -// ChunkedFileWriter assumes the write requests will come in within chunks -type ChunkedFileWriter struct { - dir string - file *os.File - logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex - chunkUsages []*ChunkWrittenIntervalList - ChunkSize int64 - sync.Mutex -} - -var _ = io.WriterAt(&ChunkedFileWriter{}) - -func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter { - return &ChunkedFileWriter{ - dir: dir, - file: nil, - logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex), - ChunkSize: chunkSize, - } -} - -func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) { - cw.Lock() - defer cw.Unlock() - - if cw.file == nil { - cw.file, err = os.CreateTemp(cw.dir, "") - if err != nil { - glog.Errorf("create temp file: %v", err) - return - } - } - - actualOffset, chunkUsage := cw.toActualWriteOffset(off) - n, err = cw.file.WriteAt(p, actualOffset) - if err == nil { - startOffset := off % cw.ChunkSize - chunkUsage.MarkWritten(startOffset, startOffset+int64(n)) - } - return -} - -func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) { - cw.Lock() - defer cw.Unlock() - - if cw.file == nil { - return - } - - logicChunkIndex := off / cw.ChunkSize - actualChunkIndex, chunkUsage := cw.toActualReadOffset(off) - if chunkUsage != nil { - for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next { - logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.StartOffset) - logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset) - if logicStart < logicStop { - actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize - _, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart) - if err != nil { - glog.Errorf("reading temp file: %v", err) - break - } - maxStop = max(maxStop, logicStop) - } - } - } - return -} - -func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *ChunkWrittenIntervalList) { - logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize) - offsetRemainder := logicOffset % cw.ChunkSize - existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] - if found { - return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex] - } - cw.logicToActualChunkIndex[logicChunkIndex] = ActualChunkIndex(len(cw.chunkUsages)) - chunkUsage = newChunkWrittenIntervalList() - cw.chunkUsages = append(cw.chunkUsages, chunkUsage) - return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage -} - -func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex ActualChunkIndex, chunkUsage *ChunkWrittenIntervalList) { - logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize) - existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] - if found { - return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex] - } - return 0, nil -} - -func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval)) { - for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex { - chunkUsage := cw.chunkUsages[actualChunkIndex] - for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next { - process(cw.file, logicChunkIndex, t) - } - } -} - -// Reset releases used resources -func (cw *ChunkedFileWriter) Reset() { - if cw.file != nil { - cw.file.Close() - os.Remove(cw.file.Name()) - cw.file = nil - } - cw.logicToActualChunkIndex = make(map[LogicChunkIndex]ActualChunkIndex) - cw.chunkUsages = cw.chunkUsages[:0] -} - -type FileIntervalReader struct { - f *os.File - startOffset int64 - stopOffset int64 - position int64 -} - -var _ = io.Reader(&FileIntervalReader{}) - -func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval) *FileIntervalReader { - actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] - if !found { - // this should never happen - return nil - } - return &FileIntervalReader{ - f: cw.file, - startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.StartOffset, - stopOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset, - position: 0, - } -} - -func (fr *FileIntervalReader) Read(p []byte) (n int, err error) { - readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position)) - n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position) - if err == nil || err == io.EOF { - fr.position += int64(n) - if fr.stopOffset-fr.startOffset-fr.position == 0 { - // return a tiny bit faster - err = io.EOF - return - } - } - return -} diff --git a/weed/filesys/page_writer/chunked_file_writer_test.go b/weed/filesys/page_writer/chunked_file_writer_test.go deleted file mode 100644 index 244ed62c3..000000000 --- a/weed/filesys/page_writer/chunked_file_writer_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package page_writer - -import ( - "github.com/stretchr/testify/assert" - "os" - "testing" -) - -func TestChunkedFileWriter_toActualOffset(t *testing.T) { - cw := NewChunkedFileWriter("", 16) - - writeToFile(cw, 50, 60) - writeToFile(cw, 60, 64) - - writeToFile(cw, 32, 40) - writeToFile(cw, 42, 48) - - writeToFile(cw, 48, 50) - - assert.Equal(t, 1, cw.chunkUsages[0].size(), "fully covered") - assert.Equal(t, 2, cw.chunkUsages[1].size(), "2 intervals") - -} - -func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) { - - _, chunkUsage := cw.toActualWriteOffset(startOffset) - - // skip doing actual writing - - innerOffset := startOffset % cw.ChunkSize - chunkUsage.MarkWritten(innerOffset, innerOffset+stopOffset-startOffset) - -} - -func TestWriteChunkedFile(t *testing.T) { - x := NewChunkedFileWriter(os.TempDir(), 20) - defer x.Reset() - y := NewChunkedFileWriter(os.TempDir(), 12) - defer y.Reset() - - batchSize := 4 - buf := make([]byte, batchSize) - for i := 0; i < 256; i++ { - for x := 0; x < batchSize; x++ { - buf[x] = byte(i) - } - x.WriteAt(buf, int64(i*batchSize)) - y.WriteAt(buf, int64((255-i)*batchSize)) - } - - a := make([]byte, 1) - b := make([]byte, 1) - for i := 0; i < 256*batchSize; i++ { - x.ReadDataAt(a, int64(i)) - y.ReadDataAt(b, int64(256*batchSize-1-i)) - assert.Equal(t, a[0], b[0], "same read") - } - -} diff --git a/weed/filesys/page_writer/chunked_stream_writer.go b/weed/filesys/page_writer/chunked_stream_writer.go index b4314e78f..2f869ddb8 100644 --- a/weed/filesys/page_writer/chunked_stream_writer.go +++ b/weed/filesys/page_writer/chunked_stream_writer.go @@ -1,119 +1,12 @@ package page_writer import ( - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/mem" "io" - "sync" - "sync/atomic" ) type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func()) -// ChunkedStreamWriter assumes the write requests will come in within chunks and in streaming mode -type ChunkedStreamWriter struct { - activeChunks map[LogicChunkIndex]*MemChunk - activeChunksLock sync.Mutex - ChunkSize int64 - saveToStorageFn SaveToStorageFunc - sync.Mutex -} - type MemChunk struct { buf []byte usage *ChunkWrittenIntervalList } - -var _ = io.WriterAt(&ChunkedStreamWriter{}) - -func NewChunkedStreamWriter(chunkSize int64) *ChunkedStreamWriter { - return &ChunkedStreamWriter{ - ChunkSize: chunkSize, - activeChunks: make(map[LogicChunkIndex]*MemChunk), - } -} - -func (cw *ChunkedStreamWriter) SetSaveToStorageFunction(saveToStorageFn SaveToStorageFunc) { - cw.saveToStorageFn = saveToStorageFn -} - -func (cw *ChunkedStreamWriter) WriteAt(p []byte, off int64) (n int, err error) { - cw.Lock() - defer cw.Unlock() - - logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize) - offsetRemainder := off % cw.ChunkSize - - memChunk, found := cw.activeChunks[logicChunkIndex] - if !found { - memChunk = &MemChunk{ - buf: mem.Allocate(int(cw.ChunkSize)), - usage: newChunkWrittenIntervalList(), - } - cw.activeChunks[logicChunkIndex] = memChunk - } - n = copy(memChunk.buf[offsetRemainder:], p) - memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n)) - if memChunk.usage.IsComplete(cw.ChunkSize) { - if cw.saveToStorageFn != nil { - cw.saveOneChunk(memChunk, logicChunkIndex) - delete(cw.activeChunks, logicChunkIndex) - } - } - - return -} - -func (cw *ChunkedStreamWriter) ReadDataAt(p []byte, off int64) (maxStop int64) { - cw.Lock() - defer cw.Unlock() - - logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize) - memChunkBaseOffset := int64(logicChunkIndex) * cw.ChunkSize - memChunk, found := cw.activeChunks[logicChunkIndex] - if !found { - return - } - - for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { - logicStart := max(off, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset) - logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) - if logicStart < logicStop { - copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) - maxStop = max(maxStop, logicStop) - } - } - return -} - -func (cw *ChunkedStreamWriter) FlushAll() { - cw.Lock() - defer cw.Unlock() - for logicChunkIndex, memChunk := range cw.activeChunks { - if cw.saveToStorageFn != nil { - cw.saveOneChunk(memChunk, logicChunkIndex) - delete(cw.activeChunks, logicChunkIndex) - } - } -} - -func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { - var referenceCounter = int32(memChunk.usage.size()) - for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { - reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset]) - cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() { - atomic.AddInt32(&referenceCounter, -1) - if atomic.LoadInt32(&referenceCounter) == 0 { - mem.Free(memChunk.buf) - } - }) - } -} - -// Reset releases used resources -func (cw *ChunkedStreamWriter) Reset() { - for t, memChunk := range cw.activeChunks { - mem.Free(memChunk.buf) - delete(cw.activeChunks, t) - } -} diff --git a/weed/filesys/page_writer/chunked_stream_writer_test.go b/weed/filesys/page_writer/chunked_stream_writer_test.go deleted file mode 100644 index 3c55a91ad..000000000 --- a/weed/filesys/page_writer/chunked_stream_writer_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package page_writer - -import ( - "github.com/stretchr/testify/assert" - "os" - "testing" -) - -func TestWriteChunkedStream(t *testing.T) { - x := NewChunkedStreamWriter(20) - defer x.Reset() - y := NewChunkedFileWriter(os.TempDir(), 12) - defer y.Reset() - - batchSize := 4 - buf := make([]byte, batchSize) - for i := 0; i < 256; i++ { - for x := 0; x < batchSize; x++ { - buf[x] = byte(i) - } - x.WriteAt(buf, int64(i*batchSize)) - y.WriteAt(buf, int64((255-i)*batchSize)) - } - - a := make([]byte, 1) - b := make([]byte, 1) - for i := 0; i < 256*batchSize; i++ { - x.ReadDataAt(a, int64(i)) - y.ReadDataAt(b, int64(256*batchSize-1-i)) - assert.Equal(t, a[0], b[0], "same read") - } - -} diff --git a/weed/filesys/page_writer/dirty_page_interval.go b/weed/filesys/page_writer/dirty_page_interval.go deleted file mode 100644 index 6d73b8cd7..000000000 --- a/weed/filesys/page_writer/dirty_page_interval.go +++ /dev/null @@ -1,222 +0,0 @@ -package page_writer - -import ( - "io" - - "github.com/chrislusf/seaweedfs/weed/util" -) - -type IntervalNode struct { - Data []byte - Offset int64 - Size int64 - Next *IntervalNode -} - -type IntervalLinkedList struct { - Head *IntervalNode - Tail *IntervalNode -} - -type ContinuousIntervals struct { - lists []*IntervalLinkedList -} - -func (list *IntervalLinkedList) Offset() int64 { - return list.Head.Offset -} -func (list *IntervalLinkedList) Size() int64 { - return list.Tail.Offset + list.Tail.Size - list.Head.Offset -} -func (list *IntervalLinkedList) addNodeToTail(node *IntervalNode) { - // glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size) - list.Tail.Next = node - list.Tail = node -} -func (list *IntervalLinkedList) addNodeToHead(node *IntervalNode) { - // glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size) - node.Next = list.Head - list.Head = node -} - -func (list *IntervalLinkedList) ReadData(buf []byte, start, stop int64) { - t := list.Head - for { - - nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size) - if nodeStart < nodeStop { - // glog.V(0).Infof("copying start=%d stop=%d t=[%d,%d) t.data=%d => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.Offset, t.Offset+t.Size, len(t.Data), len(buf), nodeStart, nodeStop) - copy(buf[nodeStart-start:], t.Data[nodeStart-t.Offset:nodeStop-t.Offset]) - } - - if t.Next == nil { - break - } - t = t.Next - } -} - -func (c *ContinuousIntervals) TotalSize() (total int64) { - for _, list := range c.lists { - total += list.Size() - } - return -} - -func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList { - var nodes []*IntervalNode - for t := list.Head; t != nil; t = t.Next { - nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size) - if nodeStart >= nodeStop { - // skip non overlapping IntervalNode - continue - } - nodes = append(nodes, &IntervalNode{ - Data: t.Data[nodeStart-t.Offset : nodeStop-t.Offset], - Offset: nodeStart, - Size: nodeStop - nodeStart, - Next: nil, - }) - } - for i := 1; i < len(nodes); i++ { - nodes[i-1].Next = nodes[i] - } - return &IntervalLinkedList{ - Head: nodes[0], - Tail: nodes[len(nodes)-1], - } -} - -func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) { - - interval := &IntervalNode{Data: data, Offset: offset, Size: int64(len(data))} - - // append to the tail and return - if len(c.lists) == 1 { - lastSpan := c.lists[0] - if lastSpan.Tail.Offset+lastSpan.Tail.Size == offset { - lastSpan.addNodeToTail(interval) - return - } - } - - var newLists []*IntervalLinkedList - for _, list := range c.lists { - // if list is to the left of new interval, add to the new list - if list.Tail.Offset+list.Tail.Size <= interval.Offset { - newLists = append(newLists, list) - } - // if list is to the right of new interval, add to the new list - if interval.Offset+interval.Size <= list.Head.Offset { - newLists = append(newLists, list) - } - // if new interval overwrite the right part of the list - if list.Head.Offset < interval.Offset && interval.Offset < list.Tail.Offset+list.Tail.Size { - // create a new list of the left part of existing list - newLists = append(newLists, subList(list, list.Offset(), interval.Offset)) - } - // if new interval overwrite the left part of the list - if list.Head.Offset < interval.Offset+interval.Size && interval.Offset+interval.Size < list.Tail.Offset+list.Tail.Size { - // create a new list of the right part of existing list - newLists = append(newLists, subList(list, interval.Offset+interval.Size, list.Tail.Offset+list.Tail.Size)) - } - // skip anything that is fully overwritten by the new interval - } - - c.lists = newLists - // add the new interval to the lists, connecting neighbor lists - var prevList, nextList *IntervalLinkedList - - for _, list := range c.lists { - if list.Head.Offset == interval.Offset+interval.Size { - nextList = list - break - } - } - - for _, list := range c.lists { - if list.Head.Offset+list.Size() == offset { - list.addNodeToTail(interval) - prevList = list - break - } - } - - if prevList != nil && nextList != nil { - // glog.V(4).Infof("connecting [%d,%d) + [%d,%d) => [%d,%d)", prevList.Head.Offset, prevList.Tail.Offset+prevList.Tail.Size, nextList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size, prevList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size) - prevList.Tail.Next = nextList.Head - prevList.Tail = nextList.Tail - c.removeList(nextList) - } else if nextList != nil { - // add to head was not done when checking - nextList.addNodeToHead(interval) - } - if prevList == nil && nextList == nil { - c.lists = append(c.lists, &IntervalLinkedList{ - Head: interval, - Tail: interval, - }) - } - - return -} - -func (c *ContinuousIntervals) RemoveLargestIntervalLinkedList() *IntervalLinkedList { - var maxSize int64 - maxIndex := -1 - for k, list := range c.lists { - if maxSize <= list.Size() { - maxSize = list.Size() - maxIndex = k - } - } - if maxSize <= 0 { - return nil - } - - t := c.lists[maxIndex] - c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...) - return t - -} - -func (c *ContinuousIntervals) removeList(target *IntervalLinkedList) { - index := -1 - for k, list := range c.lists { - if list.Offset() == target.Offset() { - index = k - } - } - if index < 0 { - return - } - - c.lists = append(c.lists[0:index], c.lists[index+1:]...) - -} - -func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) { - for _, list := range c.lists { - start := max(startOffset, list.Offset()) - stop := min(startOffset+int64(len(data)), list.Offset()+list.Size()) - if start < stop { - list.ReadData(data[start-startOffset:], start, stop) - maxStop = max(maxStop, stop) - } - } - return -} - -func (l *IntervalLinkedList) ToReader() io.Reader { - var readers []io.Reader - t := l.Head - readers = append(readers, util.NewBytesReader(t.Data)) - for t.Next != nil { - t = t.Next - readers = append(readers, util.NewBytesReader(t.Data)) - } - if len(readers) == 1 { - return readers[0] - } - return io.MultiReader(readers...) -} diff --git a/weed/filesys/page_writer/dirty_page_interval_test.go b/weed/filesys/page_writer/dirty_page_interval_test.go deleted file mode 100644 index 2a2a1df4d..000000000 --- a/weed/filesys/page_writer/dirty_page_interval_test.go +++ /dev/null @@ -1,113 +0,0 @@ -package page_writer - -import ( - "bytes" - "math/rand" - "testing" -) - -func TestContinuousIntervals_AddIntervalAppend(t *testing.T) { - - c := &ContinuousIntervals{} - - // 25, 25, 25 - c.AddInterval(getBytes(25, 3), 0) - // _, _, 23, 23, 23, 23 - c.AddInterval(getBytes(23, 4), 2) - - expectedData(t, c, 0, 25, 25, 23, 23, 23, 23) - -} - -func TestContinuousIntervals_AddIntervalInnerOverwrite(t *testing.T) { - - c := &ContinuousIntervals{} - - // 25, 25, 25, 25, 25 - c.AddInterval(getBytes(25, 5), 0) - // _, _, 23, 23 - c.AddInterval(getBytes(23, 2), 2) - - expectedData(t, c, 0, 25, 25, 23, 23, 25) - -} - -func TestContinuousIntervals_AddIntervalFullOverwrite(t *testing.T) { - - c := &ContinuousIntervals{} - - // 1, - c.AddInterval(getBytes(1, 1), 0) - // _, 2, - c.AddInterval(getBytes(2, 1), 1) - // _, _, 3, 3, 3 - c.AddInterval(getBytes(3, 3), 2) - // _, _, _, 4, 4, 4 - c.AddInterval(getBytes(4, 3), 3) - - expectedData(t, c, 0, 1, 2, 3, 4, 4, 4) - -} - -func TestContinuousIntervals_RealCase1(t *testing.T) { - - c := &ContinuousIntervals{} - - // 25, - c.AddInterval(getBytes(25, 1), 0) - // _, _, _, _, 23, 23 - c.AddInterval(getBytes(23, 2), 4) - // _, _, _, 24, 24, 24, 24 - c.AddInterval(getBytes(24, 4), 3) - - // _, 22, 22 - c.AddInterval(getBytes(22, 2), 1) - - expectedData(t, c, 0, 25, 22, 22, 24, 24, 24, 24) - -} - -func TestRandomWrites(t *testing.T) { - - c := &ContinuousIntervals{} - - data := make([]byte, 1024) - - for i := 0; i < 1024; i++ { - - start, stop := rand.Intn(len(data)), rand.Intn(len(data)) - if start > stop { - start, stop = stop, start - } - - rand.Read(data[start : stop+1]) - - c.AddInterval(data[start:stop+1], int64(start)) - - expectedData(t, c, 0, data...) - - } - -} - -func expectedData(t *testing.T, c *ContinuousIntervals, offset int, data ...byte) { - start, stop := int64(offset), int64(offset+len(data)) - for _, list := range c.lists { - nodeStart, nodeStop := max(start, list.Head.Offset), min(stop, list.Head.Offset+list.Size()) - if nodeStart < nodeStop { - buf := make([]byte, nodeStop-nodeStart) - list.ReadData(buf, nodeStart, nodeStop) - if bytes.Compare(buf, data[nodeStart-start:nodeStop-start]) != 0 { - t.Errorf("expected %v actual %v", data[nodeStart-start:nodeStop-start], buf) - } - } - } -} - -func getBytes(content byte, length int) []byte { - data := make([]byte, length) - for i := 0; i < length; i++ { - data[i] = content - } - return data -} diff --git a/weed/filesys/page_writer/dirty_pages.go b/weed/filesys/page_writer/dirty_pages.go index 955627d67..25b747fad 100644 --- a/weed/filesys/page_writer/dirty_pages.go +++ b/weed/filesys/page_writer/dirty_pages.go @@ -6,6 +6,8 @@ type DirtyPages interface { ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) GetStorageOptions() (collection, replication string) Destroy() + LockForRead(startOffset, stopOffset int64) + UnlockForRead(startOffset, stopOffset int64) } func max(x, y int64) int64 { diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go index 9f459c11e..13ee3caec 100644 --- a/weed/filesys/page_writer/upload_pipeline.go +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -7,19 +7,24 @@ import ( "github.com/chrislusf/seaweedfs/weed/util/mem" "sync" "sync/atomic" + "time" ) +type LogicChunkIndex int + type UploadPipeline struct { - writableChunks map[LogicChunkIndex]*MemChunk - writableChunksLock sync.Mutex - sealedChunks map[LogicChunkIndex]*SealedChunk - sealedChunksLock sync.Mutex - ChunkSize int64 - writers *util.LimitedConcurrentExecutor - activeWriterCond *sync.Cond - activeWriterCount int32 - saveToStorageFn SaveToStorageFunc - filepath util.FullPath + filepath util.FullPath + ChunkSize int64 + writers *util.LimitedConcurrentExecutor + writableChunks map[LogicChunkIndex]*MemChunk + writableChunksLock sync.Mutex + sealedChunks map[LogicChunkIndex]*SealedChunk + sealedChunksLock sync.Mutex + activeWriterCond *sync.Cond + activeWriterCount int32 + activeReadChunks map[LogicChunkIndex]int + activeReadChunksLock sync.Mutex + saveToStorageFn SaveToStorageFunc } type SealedChunk struct { @@ -44,6 +49,7 @@ func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentEx activeWriterCond: sync.NewCond(&sync.Mutex{}), saveToStorageFn: saveToStorageFn, filepath: filepath, + activeReadChunks: make(map[LogicChunkIndex]int), } } @@ -110,6 +116,51 @@ func (cw *UploadPipeline) FlushAll() { cw.waitForCurrentWritersToComplete() } +func (cw *UploadPipeline) LockForRead(startOffset, stopOffset int64) { + startLogicChunkIndex := LogicChunkIndex(startOffset / cw.ChunkSize) + stopLogicChunkIndex := LogicChunkIndex(stopOffset / cw.ChunkSize) + if stopOffset%cw.ChunkSize > 0 { + stopLogicChunkIndex += 1 + } + cw.activeReadChunksLock.Lock() + defer cw.activeReadChunksLock.Unlock() + for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { + if count, found := cw.activeReadChunks[i]; found { + cw.activeReadChunks[i] = count + 1 + } else { + cw.activeReadChunks[i] = 1 + } + } +} + +func (cw *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) { + startLogicChunkIndex := LogicChunkIndex(startOffset / cw.ChunkSize) + stopLogicChunkIndex := LogicChunkIndex(stopOffset / cw.ChunkSize) + if stopOffset%cw.ChunkSize > 0 { + stopLogicChunkIndex += 1 + } + cw.activeReadChunksLock.Lock() + defer cw.activeReadChunksLock.Unlock() + for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { + if count, found := cw.activeReadChunks[i]; found { + if count == 1 { + delete(cw.activeReadChunks, i) + } else { + cw.activeReadChunks[i] = count - 1 + } + } + } +} + +func (cw *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool { + cw.activeReadChunksLock.Lock() + defer cw.activeReadChunksLock.Unlock() + if count, found := cw.activeReadChunks[logicChunkIndex]; found { + return count > 0 + } + return false +} + func (cw *UploadPipeline) waitForCurrentWritersToComplete() { cw.activeWriterCond.L.Lock() t := int32(100) @@ -152,12 +203,7 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic // first add to the file chunks cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex) - // then remove from sealed chunks - cw.sealedChunksLock.Lock() - defer cw.sealedChunksLock.Unlock() - delete(cw.sealedChunks, logicChunkIndex) - sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", cw.filepath, logicChunkIndex)) - + // notify waiting process atomic.AddInt32(&cw.activeWriterCount, -1) glog.V(4).Infof("%s activeWriterCount %d --> %d", cw.filepath, cw.activeWriterCount+1, cw.activeWriterCount) // Lock and Unlock are not required, @@ -166,6 +212,18 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic cw.activeWriterCond.L.Lock() cw.activeWriterCond.Broadcast() cw.activeWriterCond.L.Unlock() + + // wait for readers + for cw.IsLocked(logicChunkIndex) { + time.Sleep(59 * time.Millisecond) + } + + // then remove from sealed chunks + cw.sealedChunksLock.Lock() + defer cw.sealedChunksLock.Unlock() + delete(cw.sealedChunks, logicChunkIndex) + sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", cw.filepath, logicChunkIndex)) + }) } diff --git a/weed/filesys/page_writer/upload_pipeline_test.go b/weed/filesys/page_writer/upload_pipeline_test.go index 81191868f..d17948251 100644 --- a/weed/filesys/page_writer/upload_pipeline_test.go +++ b/weed/filesys/page_writer/upload_pipeline_test.go @@ -7,7 +7,7 @@ import ( func TestUploadPipeline(t *testing.T) { - uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil) + uploadPipeline := NewUploadPipeline("", nil, 2*1024*1024, nil) writeRange(uploadPipeline, 0, 131072) writeRange(uploadPipeline, 131072, 262144) From b2acfd75e9155d49f32eea8d6247d0c8661ec998 Mon Sep 17 00:00:00 2001 From: chrislu Date: Mon, 17 Jan 2022 23:02:30 -0800 Subject: [PATCH 15/15] ensure entry view cache is invalidated --- weed/filesys/file.go | 8 ++++++++ weed/filesys/filehandle.go | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/weed/filesys/file.go b/weed/filesys/file.go index e971aa2e0..48a024f20 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -140,7 +140,15 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f } } } + // set the new chunks and reset entry cache entry.Chunks = chunks + file.wfs.handlesLock.Lock() + existingHandle, found := file.wfs.handles[file.Id()] + file.wfs.handlesLock.Unlock() + if found { + existingHandle.entryViewCache = nil + } + } entry.Attributes.Mtime = time.Now().Unix() entry.Attributes.FileSize = req.Size diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index d3b37a5b9..86ea20806 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -147,7 +147,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { } var chunkResolveErr error - if true || fh.entryViewCache == nil { + if fh.entryViewCache == nil { fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks, 0, math.MaxInt64) if chunkResolveErr != nil { return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)