diff --git a/weed/command/mount.go b/weed/command/mount.go index aec5fcc3c..e54f1f07f 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -26,6 +26,8 @@ type MountOptions struct { uidMap *string gidMap *string readOnly *bool + debug *bool + debugPort *int } var ( @@ -57,6 +59,8 @@ func init() { mountOptions.uidMap = cmdMount.Flag.String("map.uid", "", "map local uid to uid on filer, comma-separated :") mountOptions.gidMap = cmdMount.Flag.String("map.gid", "", "map local gid to gid on filer, comma-separated :") mountOptions.readOnly = cmdMount.Flag.Bool("readOnly", false, "read only") + mountOptions.debug = cmdMount.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:/debug/pprof/goroutine?debug=2") + mountOptions.debugPort = cmdMount.Flag.Int("debug.port", 6061, "http port for debugging") mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index aebd04170..8f62b4ec9 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -6,6 +6,7 @@ package command import ( "context" "fmt" + "net/http" "os" "os/user" "path" @@ -34,6 +35,10 @@ import ( func runMount(cmd *Command, args []string) bool { + if *mountOptions.debug { + go http.ListenAndServe(fmt.Sprintf(":%d", *mountOptions.debugPort), nil) + } + grace.SetupProfiling(*mountCpuProfile, *mountMemProfile) if *mountReadRetryTime < time.Second { *mountReadRetryTime = time.Second diff --git a/weed/filesys/dirty_pages_continuous.go b/weed/filesys/dirty_pages_continuous.go deleted file mode 100644 index 2692c2950..000000000 --- a/weed/filesys/dirty_pages_continuous.go +++ /dev/null @@ -1,138 +0,0 @@ -package filesys - -import ( - "bytes" - "fmt" - "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" - "io" - "sync" - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" -) - -type ContinuousDirtyPages struct { - intervals *page_writer.ContinuousIntervals - f *File - writeWaitGroup sync.WaitGroup - chunkAddLock sync.Mutex - lastErr error - collection string - replication string -} - -func newContinuousDirtyPages(file *File) *ContinuousDirtyPages { - dirtyPages := &ContinuousDirtyPages{ - intervals: &page_writer.ContinuousIntervals{}, - f: file, - } - return dirtyPages -} - -func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) { - - glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data))) - - if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) { - // this is more than what buffer can hold. - pages.flushAndSave(offset, data) - } - - pages.intervals.AddInterval(data, offset) - - if pages.intervals.TotalSize() >= pages.f.wfs.option.ChunkSizeLimit { - pages.saveExistingLargestPageToStorage() - } - - return -} - -func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) { - - // flush existing - pages.saveExistingPagesToStorage() - - // flush the new page - pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))) - - return -} - -func (pages *ContinuousDirtyPages) FlushData() error { - - pages.saveExistingPagesToStorage() - pages.writeWaitGroup.Wait() - if pages.lastErr != nil { - return fmt.Errorf("flush data: %v", pages.lastErr) - } - return nil -} - -func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() { - for pages.saveExistingLargestPageToStorage() { - } -} - -func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedData bool) { - - maxList := pages.intervals.RemoveLargestIntervalLinkedList() - if maxList == nil { - return false - } - - entry := pages.f.getEntry() - if entry == nil { - return false - } - - fileSize := int64(entry.Attributes.FileSize) - - chunkSize := min(maxList.Size(), fileSize-maxList.Offset()) - if chunkSize == 0 { - return false - } - - pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize) - - return true -} - -func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { - - mtime := time.Now().UnixNano() - pages.writeWaitGroup.Add(1) - writer := func() { - defer pages.writeWaitGroup.Done() - - reader = io.LimitReader(reader, size) - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) - if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) - pages.lastErr = err - return - } - chunk.Mtime = mtime - pages.collection, pages.replication = collection, replication - pages.chunkAddLock.Lock() - defer pages.chunkAddLock.Unlock() - pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size) - } - - if pages.f.wfs.concurrentWriters != nil { - pages.f.wfs.concurrentWriters.Execute(writer) - } else { - go writer() - } -} - -func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - return pages.intervals.ReadDataAt(data, startOffset) -} - -func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) { - return pages.collection, pages.replication -} -func (pages ContinuousDirtyPages) Destroy() { -} diff --git a/weed/filesys/dirty_pages_mem_chunk.go b/weed/filesys/dirty_pages_mem_chunk.go new file mode 100644 index 000000000..e6548d7be --- /dev/null +++ b/weed/filesys/dirty_pages_mem_chunk.go @@ -0,0 +1,101 @@ +package filesys + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "io" + "sync" + "time" +) + +type MemoryChunkPages struct { + fh *FileHandle + writeWaitGroup sync.WaitGroup + chunkAddLock sync.Mutex + lastErr error + collection string + replication string + uploadPipeline *page_writer.UploadPipeline + hasWrites bool +} + +var ( + _ = page_writer.DirtyPages(&MemoryChunkPages{}) +) + +func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *MemoryChunkPages { + + dirtyPages := &MemoryChunkPages{ + fh: fh, + } + + dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.f.fullpath(), + fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage) + + return dirtyPages +} + +func (pages *MemoryChunkPages) AddPage(offset int64, data []byte) { + pages.hasWrites = true + + glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.f.fullpath(), offset, offset+int64(len(data))) + pages.uploadPipeline.SaveDataAt(data, offset) + + return +} + +func (pages *MemoryChunkPages) FlushData() error { + if !pages.hasWrites { + return nil + } + pages.uploadPipeline.FlushAll() + if pages.lastErr != nil { + return fmt.Errorf("flush data: %v", pages.lastErr) + } + return nil +} + +func (pages *MemoryChunkPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { + if !pages.hasWrites { + return + } + return pages.uploadPipeline.MaybeReadDataAt(data, startOffset) +} + +func (pages *MemoryChunkPages) GetStorageOptions() (collection, replication string) { + return pages.collection, pages.replication +} + +func (pages *MemoryChunkPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { + + mtime := time.Now().UnixNano() + defer cleanupFn() + + chunk, collection, replication, err := pages.fh.f.wfs.saveDataAsChunk(pages.fh.f.fullpath())(reader, pages.fh.f.Name, offset) + if err != nil { + glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.fh.f.fullpath(), offset, offset+size, err) + pages.lastErr = err + return + } + chunk.Mtime = mtime + pages.collection, pages.replication = collection, replication + pages.chunkAddLock.Lock() + pages.fh.f.addChunks([]*filer_pb.FileChunk{chunk}) + pages.fh.entryViewCache = nil + glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.fh.f.fullpath(), chunk.FileId, offset, offset+size) + pages.chunkAddLock.Unlock() + +} + +func (pages MemoryChunkPages) Destroy() { + pages.uploadPipeline.Shutdown() +} + +func (pages *MemoryChunkPages) LockForRead(startOffset, stopOffset int64) { + pages.uploadPipeline.LockForRead(startOffset, stopOffset) +} +func (pages *MemoryChunkPages) UnlockForRead(startOffset, stopOffset int64) { + pages.uploadPipeline.UnlockForRead(startOffset, stopOffset) +} diff --git a/weed/filesys/dirty_pages_stream.go b/weed/filesys/dirty_pages_stream.go deleted file mode 100644 index 586b73698..000000000 --- a/weed/filesys/dirty_pages_stream.go +++ /dev/null @@ -1,106 +0,0 @@ -package filesys - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "io" - "sync" - "time" -) - -type StreamDirtyPages struct { - f *File - writeWaitGroup sync.WaitGroup - pageAddLock sync.Mutex - chunkAddLock sync.Mutex - lastErr error - collection string - replication string - chunkedStream *page_writer.ChunkedStreamWriter -} - -func newStreamDirtyPages(file *File, chunkSize int64) *StreamDirtyPages { - - dirtyPages := &StreamDirtyPages{ - f: file, - chunkedStream: page_writer.NewChunkedStreamWriter(chunkSize), - } - - dirtyPages.chunkedStream.SetSaveToStorageFunction(dirtyPages.saveChunkedFileIntevalToStorage) - - return dirtyPages -} - -func (pages *StreamDirtyPages) AddPage(offset int64, data []byte) { - - pages.pageAddLock.Lock() - defer pages.pageAddLock.Unlock() - - glog.V(4).Infof("%v stream AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data))) - if _, err := pages.chunkedStream.WriteAt(data, offset); err != nil { - pages.lastErr = err - } - - return -} - -func (pages *StreamDirtyPages) FlushData() error { - pages.saveChunkedFileToStorage() - pages.writeWaitGroup.Wait() - if pages.lastErr != nil { - return fmt.Errorf("flush data: %v", pages.lastErr) - } - pages.chunkedStream.Reset() - return nil -} - -func (pages *StreamDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - return pages.chunkedStream.ReadDataAt(data, startOffset) -} - -func (pages *StreamDirtyPages) GetStorageOptions() (collection, replication string) { - return pages.collection, pages.replication -} - -func (pages *StreamDirtyPages) saveChunkedFileToStorage() { - - pages.chunkedStream.FlushAll() - -} - -func (pages *StreamDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { - - mtime := time.Now().UnixNano() - pages.writeWaitGroup.Add(1) - writer := func() { - defer pages.writeWaitGroup.Done() - defer cleanupFn() - - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) - if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) - pages.lastErr = err - return - } - chunk.Mtime = mtime - pages.collection, pages.replication = collection, replication - pages.chunkAddLock.Lock() - pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size) - pages.chunkAddLock.Unlock() - - } - - if pages.f.wfs.concurrentWriters != nil { - pages.f.wfs.concurrentWriters.Execute(writer) - } else { - go writer() - } - -} - -func (pages StreamDirtyPages) Destroy() { - pages.chunkedStream.Reset() -} diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go deleted file mode 100644 index e0c3a91de..000000000 --- a/weed/filesys/dirty_pages_temp_file.go +++ /dev/null @@ -1,106 +0,0 @@ -package filesys - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "io" - "os" - "sync" - "time" -) - -type TempFileDirtyPages struct { - f *File - writeWaitGroup sync.WaitGroup - pageAddLock sync.Mutex - chunkAddLock sync.Mutex - lastErr error - collection string - replication string - chunkedFile *page_writer.ChunkedFileWriter -} - -func newTempFileDirtyPages(file *File, chunkSize int64) *TempFileDirtyPages { - - tempFile := &TempFileDirtyPages{ - f: file, - chunkedFile: page_writer.NewChunkedFileWriter(file.wfs.option.getTempFilePageDir(), chunkSize), - } - - return tempFile -} - -func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) { - - pages.pageAddLock.Lock() - defer pages.pageAddLock.Unlock() - - glog.V(4).Infof("%v tempfile AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data))) - if _, err := pages.chunkedFile.WriteAt(data, offset); err != nil { - pages.lastErr = err - } - - return -} - -func (pages *TempFileDirtyPages) FlushData() error { - pages.saveChunkedFileToStorage() - pages.writeWaitGroup.Wait() - if pages.lastErr != nil { - return fmt.Errorf("flush data: %v", pages.lastErr) - } - pages.chunkedFile.Reset() - return nil -} - -func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - return pages.chunkedFile.ReadDataAt(data, startOffset) -} - -func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) { - return pages.collection, pages.replication -} - -func (pages *TempFileDirtyPages) saveChunkedFileToStorage() { - - pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex page_writer.LogicChunkIndex, interval *page_writer.ChunkWrittenInterval) { - reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval) - pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize+interval.StartOffset, interval.Size()) - }) - -} - -func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64) { - - mtime := time.Now().UnixNano() - pages.writeWaitGroup.Add(1) - writer := func() { - defer pages.writeWaitGroup.Done() - - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) - if err != nil { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) - pages.lastErr = err - return - } - chunk.Mtime = mtime - pages.collection, pages.replication = collection, replication - pages.chunkAddLock.Lock() - defer pages.chunkAddLock.Unlock() - pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size) - } - - if pages.f.wfs.concurrentWriters != nil { - pages.f.wfs.concurrentWriters.Execute(writer) - } else { - go writer() - } - -} - -func (pages TempFileDirtyPages) Destroy() { - pages.chunkedFile.Reset() -} diff --git a/weed/filesys/file.go b/weed/filesys/file.go index e971aa2e0..48a024f20 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -140,7 +140,15 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f } } } + // set the new chunks and reset entry cache entry.Chunks = chunks + file.wfs.handlesLock.Lock() + existingHandle, found := file.wfs.handles[file.Id()] + file.wfs.handlesLock.Unlock() + if found { + existingHandle.entryViewCache = nil + } + } entry.Attributes.Mtime = time.Now().Unix() entry.Attributes.FileSize = req.Size diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 738423b6a..86ea20806 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -28,22 +28,20 @@ type FileHandle struct { sync.Mutex f *File - RequestId fuse.RequestID // unique ID for request - NodeId fuse.NodeID // file or directory the request is about - Uid uint32 // user ID of process making request - Gid uint32 // group ID of process making request - writeOnly bool + NodeId fuse.NodeID // file or directory the request is about + Uid uint32 // user ID of process making request + Gid uint32 // group ID of process making request isDeleted bool } func newFileHandle(file *File, uid, gid uint32) *FileHandle { fh := &FileHandle{ - f: file, - // dirtyPages: newContinuousDirtyPages(file, writeOnly), - dirtyPages: newPageWriter(file, file.wfs.option.ChunkSizeLimit), - Uid: uid, - Gid: gid, + f: file, + Uid: uid, + Gid: gid, } + // dirtyPages: newContinuousDirtyPages(file, writeOnly), + fh.dirtyPages = newPageWriter(fh, file.wfs.option.ChunkSizeLimit) entry := fh.f.getEntry() if entry != nil { entry.Attributes.FileSize = filer.FileSize(entry) @@ -77,6 +75,8 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus buff = make([]byte, req.Size) } + fh.lockForRead(req.Offset, len(buff)) + defer fh.unlockForRead(req.Offset, len(buff)) totalRead, err := fh.readFromChunks(buff, req.Offset) if err == nil || err == io.EOF { maxStop := fh.readFromDirtyPages(buff, req.Offset) @@ -103,6 +103,13 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus return err } +func (fh *FileHandle) lockForRead(startOffset int64, size int) { + fh.dirtyPages.LockForRead(startOffset, startOffset+int64(size)) +} +func (fh *FileHandle) unlockForRead(startOffset int64, size int) { + fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size)) +} + func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) { maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset) return @@ -151,6 +158,10 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { reader := fh.reader if reader == nil { chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64) + glog.V(4).Infof("file handle read %s [%d,%d) from %+v", fileFullPath, offset, offset+int64(len(buff)), chunkViews) + for _, chunkView := range chunkViews { + glog.V(4).Infof(" read %s [%d,%d) from chunk %+v", fileFullPath, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.FileId) + } reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize) } fh.reader = reader @@ -161,7 +172,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { glog.Errorf("file handle read %s: %v", fileFullPath, err) } - // glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err) + glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err) return int64(totalRead), err } diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go index 5c06bc44d..c6d08348d 100644 --- a/weed/filesys/page_writer.go +++ b/weed/filesys/page_writer.go @@ -6,36 +6,32 @@ import ( ) type PageWriter struct { - f *File + fh *FileHandle collection string replication string chunkSize int64 writerPattern *WriterPattern randomWriter page_writer.DirtyPages - streamWriter page_writer.DirtyPages } var ( _ = page_writer.DirtyPages(&PageWriter{}) ) -func newPageWriter(file *File, chunkSize int64) *PageWriter { +func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter { pw := &PageWriter{ - f: file, + fh: fh, chunkSize: chunkSize, writerPattern: NewWriterPattern(chunkSize), - randomWriter: newTempFileDirtyPages(file, chunkSize), - streamWriter: newStreamDirtyPages(file, chunkSize), - //streamWriter: newContinuousDirtyPages(file), - //streamWriter: nil, + randomWriter: newMemoryChunkPages(fh, chunkSize), } return pw } func (pw *PageWriter) AddPage(offset int64, data []byte) { - glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) + glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.fh.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { @@ -47,38 +43,22 @@ func (pw *PageWriter) AddPage(offset int64, data []byte) { } func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) { - if chunkIndex > 0 { - if pw.writerPattern.IsStreamingMode() && pw.streamWriter != nil { - pw.streamWriter.AddPage(offset, data) - return - } - } pw.randomWriter.AddPage(offset, data) } func (pw *PageWriter) FlushData() error { pw.writerPattern.Reset() - if pw.streamWriter != nil { - if err := pw.streamWriter.FlushData(); err != nil { - return err - } - } return pw.randomWriter.FlushData() } func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) { - glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), offset, offset+int64(len(data))) + glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.f.fullpath(), offset, offset+int64(len(data))) chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) - if pw.streamWriter != nil { - m1 := pw.streamWriter.ReadDirtyDataAt(data[:readSize], offset) - maxStop = max(maxStop, m1) - } - m2 := pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset) - maxStop = max(maxStop, m2) + maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset) offset += readSize data = data[readSize:] @@ -88,16 +68,18 @@ func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) } func (pw *PageWriter) GetStorageOptions() (collection, replication string) { - if pw.writerPattern.IsStreamingMode() && pw.streamWriter != nil { - return pw.streamWriter.GetStorageOptions() - } return pw.randomWriter.GetStorageOptions() } +func (pw *PageWriter) LockForRead(startOffset, stopOffset int64) { + pw.randomWriter.LockForRead(startOffset, stopOffset) +} + +func (pw *PageWriter) UnlockForRead(startOffset, stopOffset int64) { + pw.randomWriter.UnlockForRead(startOffset, stopOffset) +} + func (pw *PageWriter) Destroy() { - if pw.streamWriter != nil { - pw.streamWriter.Destroy() - } pw.randomWriter.Destroy() } diff --git a/weed/filesys/page_writer/chunked_file_writer.go b/weed/filesys/page_writer/chunked_file_writer.go deleted file mode 100644 index b0e1c2844..000000000 --- a/weed/filesys/page_writer/chunked_file_writer.go +++ /dev/null @@ -1,160 +0,0 @@ -package page_writer - -import ( - "github.com/chrislusf/seaweedfs/weed/glog" - "io" - "os" - "sync" -) - -type LogicChunkIndex int -type ActualChunkIndex int - -// ChunkedFileWriter assumes the write requests will come in within chunks -type ChunkedFileWriter struct { - dir string - file *os.File - logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex - chunkUsages []*ChunkWrittenIntervalList - ChunkSize int64 - sync.Mutex -} - -var _ = io.WriterAt(&ChunkedFileWriter{}) - -func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter { - return &ChunkedFileWriter{ - dir: dir, - file: nil, - logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex), - ChunkSize: chunkSize, - } -} - -func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) { - cw.Lock() - defer cw.Unlock() - - if cw.file == nil { - cw.file, err = os.CreateTemp(cw.dir, "") - if err != nil { - glog.Errorf("create temp file: %v", err) - return - } - } - - actualOffset, chunkUsage := cw.toActualWriteOffset(off) - n, err = cw.file.WriteAt(p, actualOffset) - if err == nil { - startOffset := off % cw.ChunkSize - chunkUsage.MarkWritten(startOffset, startOffset+int64(n)) - } - return -} - -func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) { - cw.Lock() - defer cw.Unlock() - - if cw.file == nil { - return - } - - logicChunkIndex := off / cw.ChunkSize - actualChunkIndex, chunkUsage := cw.toActualReadOffset(off) - if chunkUsage != nil { - for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next { - logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.StartOffset) - logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset) - if logicStart < logicStop { - actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize - _, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart) - if err != nil { - glog.Errorf("reading temp file: %v", err) - break - } - maxStop = max(maxStop, logicStop) - } - } - } - return -} - -func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *ChunkWrittenIntervalList) { - logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize) - offsetRemainder := logicOffset % cw.ChunkSize - existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] - if found { - return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex] - } - cw.logicToActualChunkIndex[logicChunkIndex] = ActualChunkIndex(len(cw.chunkUsages)) - chunkUsage = newChunkWrittenIntervalList() - cw.chunkUsages = append(cw.chunkUsages, chunkUsage) - return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage -} - -func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex ActualChunkIndex, chunkUsage *ChunkWrittenIntervalList) { - logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize) - existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] - if found { - return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex] - } - return 0, nil -} - -func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval)) { - for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex { - chunkUsage := cw.chunkUsages[actualChunkIndex] - for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next { - process(cw.file, logicChunkIndex, t) - } - } -} - -// Reset releases used resources -func (cw *ChunkedFileWriter) Reset() { - if cw.file != nil { - cw.file.Close() - os.Remove(cw.file.Name()) - cw.file = nil - } - cw.logicToActualChunkIndex = make(map[LogicChunkIndex]ActualChunkIndex) - cw.chunkUsages = cw.chunkUsages[:0] -} - -type FileIntervalReader struct { - f *os.File - startOffset int64 - stopOffset int64 - position int64 -} - -var _ = io.Reader(&FileIntervalReader{}) - -func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval) *FileIntervalReader { - actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex] - if !found { - // this should never happen - return nil - } - return &FileIntervalReader{ - f: cw.file, - startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.StartOffset, - stopOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset, - position: 0, - } -} - -func (fr *FileIntervalReader) Read(p []byte) (n int, err error) { - readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position)) - n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position) - if err == nil || err == io.EOF { - fr.position += int64(n) - if fr.stopOffset-fr.startOffset-fr.position == 0 { - // return a tiny bit faster - err = io.EOF - return - } - } - return -} diff --git a/weed/filesys/page_writer/chunked_file_writer_test.go b/weed/filesys/page_writer/chunked_file_writer_test.go deleted file mode 100644 index 244ed62c3..000000000 --- a/weed/filesys/page_writer/chunked_file_writer_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package page_writer - -import ( - "github.com/stretchr/testify/assert" - "os" - "testing" -) - -func TestChunkedFileWriter_toActualOffset(t *testing.T) { - cw := NewChunkedFileWriter("", 16) - - writeToFile(cw, 50, 60) - writeToFile(cw, 60, 64) - - writeToFile(cw, 32, 40) - writeToFile(cw, 42, 48) - - writeToFile(cw, 48, 50) - - assert.Equal(t, 1, cw.chunkUsages[0].size(), "fully covered") - assert.Equal(t, 2, cw.chunkUsages[1].size(), "2 intervals") - -} - -func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) { - - _, chunkUsage := cw.toActualWriteOffset(startOffset) - - // skip doing actual writing - - innerOffset := startOffset % cw.ChunkSize - chunkUsage.MarkWritten(innerOffset, innerOffset+stopOffset-startOffset) - -} - -func TestWriteChunkedFile(t *testing.T) { - x := NewChunkedFileWriter(os.TempDir(), 20) - defer x.Reset() - y := NewChunkedFileWriter(os.TempDir(), 12) - defer y.Reset() - - batchSize := 4 - buf := make([]byte, batchSize) - for i := 0; i < 256; i++ { - for x := 0; x < batchSize; x++ { - buf[x] = byte(i) - } - x.WriteAt(buf, int64(i*batchSize)) - y.WriteAt(buf, int64((255-i)*batchSize)) - } - - a := make([]byte, 1) - b := make([]byte, 1) - for i := 0; i < 256*batchSize; i++ { - x.ReadDataAt(a, int64(i)) - y.ReadDataAt(b, int64(256*batchSize-1-i)) - assert.Equal(t, a[0], b[0], "same read") - } - -} diff --git a/weed/filesys/page_writer/chunked_stream_writer.go b/weed/filesys/page_writer/chunked_stream_writer.go index b4314e78f..2f869ddb8 100644 --- a/weed/filesys/page_writer/chunked_stream_writer.go +++ b/weed/filesys/page_writer/chunked_stream_writer.go @@ -1,119 +1,12 @@ package page_writer import ( - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/mem" "io" - "sync" - "sync/atomic" ) type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func()) -// ChunkedStreamWriter assumes the write requests will come in within chunks and in streaming mode -type ChunkedStreamWriter struct { - activeChunks map[LogicChunkIndex]*MemChunk - activeChunksLock sync.Mutex - ChunkSize int64 - saveToStorageFn SaveToStorageFunc - sync.Mutex -} - type MemChunk struct { buf []byte usage *ChunkWrittenIntervalList } - -var _ = io.WriterAt(&ChunkedStreamWriter{}) - -func NewChunkedStreamWriter(chunkSize int64) *ChunkedStreamWriter { - return &ChunkedStreamWriter{ - ChunkSize: chunkSize, - activeChunks: make(map[LogicChunkIndex]*MemChunk), - } -} - -func (cw *ChunkedStreamWriter) SetSaveToStorageFunction(saveToStorageFn SaveToStorageFunc) { - cw.saveToStorageFn = saveToStorageFn -} - -func (cw *ChunkedStreamWriter) WriteAt(p []byte, off int64) (n int, err error) { - cw.Lock() - defer cw.Unlock() - - logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize) - offsetRemainder := off % cw.ChunkSize - - memChunk, found := cw.activeChunks[logicChunkIndex] - if !found { - memChunk = &MemChunk{ - buf: mem.Allocate(int(cw.ChunkSize)), - usage: newChunkWrittenIntervalList(), - } - cw.activeChunks[logicChunkIndex] = memChunk - } - n = copy(memChunk.buf[offsetRemainder:], p) - memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n)) - if memChunk.usage.IsComplete(cw.ChunkSize) { - if cw.saveToStorageFn != nil { - cw.saveOneChunk(memChunk, logicChunkIndex) - delete(cw.activeChunks, logicChunkIndex) - } - } - - return -} - -func (cw *ChunkedStreamWriter) ReadDataAt(p []byte, off int64) (maxStop int64) { - cw.Lock() - defer cw.Unlock() - - logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize) - memChunkBaseOffset := int64(logicChunkIndex) * cw.ChunkSize - memChunk, found := cw.activeChunks[logicChunkIndex] - if !found { - return - } - - for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { - logicStart := max(off, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset) - logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) - if logicStart < logicStop { - copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) - maxStop = max(maxStop, logicStop) - } - } - return -} - -func (cw *ChunkedStreamWriter) FlushAll() { - cw.Lock() - defer cw.Unlock() - for logicChunkIndex, memChunk := range cw.activeChunks { - if cw.saveToStorageFn != nil { - cw.saveOneChunk(memChunk, logicChunkIndex) - delete(cw.activeChunks, logicChunkIndex) - } - } -} - -func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { - var referenceCounter = int32(memChunk.usage.size()) - for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { - reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset]) - cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() { - atomic.AddInt32(&referenceCounter, -1) - if atomic.LoadInt32(&referenceCounter) == 0 { - mem.Free(memChunk.buf) - } - }) - } -} - -// Reset releases used resources -func (cw *ChunkedStreamWriter) Reset() { - for t, memChunk := range cw.activeChunks { - mem.Free(memChunk.buf) - delete(cw.activeChunks, t) - } -} diff --git a/weed/filesys/page_writer/chunked_stream_writer_test.go b/weed/filesys/page_writer/chunked_stream_writer_test.go deleted file mode 100644 index 3c55a91ad..000000000 --- a/weed/filesys/page_writer/chunked_stream_writer_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package page_writer - -import ( - "github.com/stretchr/testify/assert" - "os" - "testing" -) - -func TestWriteChunkedStream(t *testing.T) { - x := NewChunkedStreamWriter(20) - defer x.Reset() - y := NewChunkedFileWriter(os.TempDir(), 12) - defer y.Reset() - - batchSize := 4 - buf := make([]byte, batchSize) - for i := 0; i < 256; i++ { - for x := 0; x < batchSize; x++ { - buf[x] = byte(i) - } - x.WriteAt(buf, int64(i*batchSize)) - y.WriteAt(buf, int64((255-i)*batchSize)) - } - - a := make([]byte, 1) - b := make([]byte, 1) - for i := 0; i < 256*batchSize; i++ { - x.ReadDataAt(a, int64(i)) - y.ReadDataAt(b, int64(256*batchSize-1-i)) - assert.Equal(t, a[0], b[0], "same read") - } - -} diff --git a/weed/filesys/page_writer/dirty_page_interval.go b/weed/filesys/page_writer/dirty_page_interval.go deleted file mode 100644 index 6d73b8cd7..000000000 --- a/weed/filesys/page_writer/dirty_page_interval.go +++ /dev/null @@ -1,222 +0,0 @@ -package page_writer - -import ( - "io" - - "github.com/chrislusf/seaweedfs/weed/util" -) - -type IntervalNode struct { - Data []byte - Offset int64 - Size int64 - Next *IntervalNode -} - -type IntervalLinkedList struct { - Head *IntervalNode - Tail *IntervalNode -} - -type ContinuousIntervals struct { - lists []*IntervalLinkedList -} - -func (list *IntervalLinkedList) Offset() int64 { - return list.Head.Offset -} -func (list *IntervalLinkedList) Size() int64 { - return list.Tail.Offset + list.Tail.Size - list.Head.Offset -} -func (list *IntervalLinkedList) addNodeToTail(node *IntervalNode) { - // glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size) - list.Tail.Next = node - list.Tail = node -} -func (list *IntervalLinkedList) addNodeToHead(node *IntervalNode) { - // glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size) - node.Next = list.Head - list.Head = node -} - -func (list *IntervalLinkedList) ReadData(buf []byte, start, stop int64) { - t := list.Head - for { - - nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size) - if nodeStart < nodeStop { - // glog.V(0).Infof("copying start=%d stop=%d t=[%d,%d) t.data=%d => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.Offset, t.Offset+t.Size, len(t.Data), len(buf), nodeStart, nodeStop) - copy(buf[nodeStart-start:], t.Data[nodeStart-t.Offset:nodeStop-t.Offset]) - } - - if t.Next == nil { - break - } - t = t.Next - } -} - -func (c *ContinuousIntervals) TotalSize() (total int64) { - for _, list := range c.lists { - total += list.Size() - } - return -} - -func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList { - var nodes []*IntervalNode - for t := list.Head; t != nil; t = t.Next { - nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size) - if nodeStart >= nodeStop { - // skip non overlapping IntervalNode - continue - } - nodes = append(nodes, &IntervalNode{ - Data: t.Data[nodeStart-t.Offset : nodeStop-t.Offset], - Offset: nodeStart, - Size: nodeStop - nodeStart, - Next: nil, - }) - } - for i := 1; i < len(nodes); i++ { - nodes[i-1].Next = nodes[i] - } - return &IntervalLinkedList{ - Head: nodes[0], - Tail: nodes[len(nodes)-1], - } -} - -func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) { - - interval := &IntervalNode{Data: data, Offset: offset, Size: int64(len(data))} - - // append to the tail and return - if len(c.lists) == 1 { - lastSpan := c.lists[0] - if lastSpan.Tail.Offset+lastSpan.Tail.Size == offset { - lastSpan.addNodeToTail(interval) - return - } - } - - var newLists []*IntervalLinkedList - for _, list := range c.lists { - // if list is to the left of new interval, add to the new list - if list.Tail.Offset+list.Tail.Size <= interval.Offset { - newLists = append(newLists, list) - } - // if list is to the right of new interval, add to the new list - if interval.Offset+interval.Size <= list.Head.Offset { - newLists = append(newLists, list) - } - // if new interval overwrite the right part of the list - if list.Head.Offset < interval.Offset && interval.Offset < list.Tail.Offset+list.Tail.Size { - // create a new list of the left part of existing list - newLists = append(newLists, subList(list, list.Offset(), interval.Offset)) - } - // if new interval overwrite the left part of the list - if list.Head.Offset < interval.Offset+interval.Size && interval.Offset+interval.Size < list.Tail.Offset+list.Tail.Size { - // create a new list of the right part of existing list - newLists = append(newLists, subList(list, interval.Offset+interval.Size, list.Tail.Offset+list.Tail.Size)) - } - // skip anything that is fully overwritten by the new interval - } - - c.lists = newLists - // add the new interval to the lists, connecting neighbor lists - var prevList, nextList *IntervalLinkedList - - for _, list := range c.lists { - if list.Head.Offset == interval.Offset+interval.Size { - nextList = list - break - } - } - - for _, list := range c.lists { - if list.Head.Offset+list.Size() == offset { - list.addNodeToTail(interval) - prevList = list - break - } - } - - if prevList != nil && nextList != nil { - // glog.V(4).Infof("connecting [%d,%d) + [%d,%d) => [%d,%d)", prevList.Head.Offset, prevList.Tail.Offset+prevList.Tail.Size, nextList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size, prevList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size) - prevList.Tail.Next = nextList.Head - prevList.Tail = nextList.Tail - c.removeList(nextList) - } else if nextList != nil { - // add to head was not done when checking - nextList.addNodeToHead(interval) - } - if prevList == nil && nextList == nil { - c.lists = append(c.lists, &IntervalLinkedList{ - Head: interval, - Tail: interval, - }) - } - - return -} - -func (c *ContinuousIntervals) RemoveLargestIntervalLinkedList() *IntervalLinkedList { - var maxSize int64 - maxIndex := -1 - for k, list := range c.lists { - if maxSize <= list.Size() { - maxSize = list.Size() - maxIndex = k - } - } - if maxSize <= 0 { - return nil - } - - t := c.lists[maxIndex] - c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...) - return t - -} - -func (c *ContinuousIntervals) removeList(target *IntervalLinkedList) { - index := -1 - for k, list := range c.lists { - if list.Offset() == target.Offset() { - index = k - } - } - if index < 0 { - return - } - - c.lists = append(c.lists[0:index], c.lists[index+1:]...) - -} - -func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) { - for _, list := range c.lists { - start := max(startOffset, list.Offset()) - stop := min(startOffset+int64(len(data)), list.Offset()+list.Size()) - if start < stop { - list.ReadData(data[start-startOffset:], start, stop) - maxStop = max(maxStop, stop) - } - } - return -} - -func (l *IntervalLinkedList) ToReader() io.Reader { - var readers []io.Reader - t := l.Head - readers = append(readers, util.NewBytesReader(t.Data)) - for t.Next != nil { - t = t.Next - readers = append(readers, util.NewBytesReader(t.Data)) - } - if len(readers) == 1 { - return readers[0] - } - return io.MultiReader(readers...) -} diff --git a/weed/filesys/page_writer/dirty_page_interval_test.go b/weed/filesys/page_writer/dirty_page_interval_test.go deleted file mode 100644 index 2a2a1df4d..000000000 --- a/weed/filesys/page_writer/dirty_page_interval_test.go +++ /dev/null @@ -1,113 +0,0 @@ -package page_writer - -import ( - "bytes" - "math/rand" - "testing" -) - -func TestContinuousIntervals_AddIntervalAppend(t *testing.T) { - - c := &ContinuousIntervals{} - - // 25, 25, 25 - c.AddInterval(getBytes(25, 3), 0) - // _, _, 23, 23, 23, 23 - c.AddInterval(getBytes(23, 4), 2) - - expectedData(t, c, 0, 25, 25, 23, 23, 23, 23) - -} - -func TestContinuousIntervals_AddIntervalInnerOverwrite(t *testing.T) { - - c := &ContinuousIntervals{} - - // 25, 25, 25, 25, 25 - c.AddInterval(getBytes(25, 5), 0) - // _, _, 23, 23 - c.AddInterval(getBytes(23, 2), 2) - - expectedData(t, c, 0, 25, 25, 23, 23, 25) - -} - -func TestContinuousIntervals_AddIntervalFullOverwrite(t *testing.T) { - - c := &ContinuousIntervals{} - - // 1, - c.AddInterval(getBytes(1, 1), 0) - // _, 2, - c.AddInterval(getBytes(2, 1), 1) - // _, _, 3, 3, 3 - c.AddInterval(getBytes(3, 3), 2) - // _, _, _, 4, 4, 4 - c.AddInterval(getBytes(4, 3), 3) - - expectedData(t, c, 0, 1, 2, 3, 4, 4, 4) - -} - -func TestContinuousIntervals_RealCase1(t *testing.T) { - - c := &ContinuousIntervals{} - - // 25, - c.AddInterval(getBytes(25, 1), 0) - // _, _, _, _, 23, 23 - c.AddInterval(getBytes(23, 2), 4) - // _, _, _, 24, 24, 24, 24 - c.AddInterval(getBytes(24, 4), 3) - - // _, 22, 22 - c.AddInterval(getBytes(22, 2), 1) - - expectedData(t, c, 0, 25, 22, 22, 24, 24, 24, 24) - -} - -func TestRandomWrites(t *testing.T) { - - c := &ContinuousIntervals{} - - data := make([]byte, 1024) - - for i := 0; i < 1024; i++ { - - start, stop := rand.Intn(len(data)), rand.Intn(len(data)) - if start > stop { - start, stop = stop, start - } - - rand.Read(data[start : stop+1]) - - c.AddInterval(data[start:stop+1], int64(start)) - - expectedData(t, c, 0, data...) - - } - -} - -func expectedData(t *testing.T, c *ContinuousIntervals, offset int, data ...byte) { - start, stop := int64(offset), int64(offset+len(data)) - for _, list := range c.lists { - nodeStart, nodeStop := max(start, list.Head.Offset), min(stop, list.Head.Offset+list.Size()) - if nodeStart < nodeStop { - buf := make([]byte, nodeStop-nodeStart) - list.ReadData(buf, nodeStart, nodeStop) - if bytes.Compare(buf, data[nodeStart-start:nodeStop-start]) != 0 { - t.Errorf("expected %v actual %v", data[nodeStart-start:nodeStop-start], buf) - } - } - } -} - -func getBytes(content byte, length int) []byte { - data := make([]byte, length) - for i := 0; i < length; i++ { - data[i] = content - } - return data -} diff --git a/weed/filesys/page_writer/dirty_pages.go b/weed/filesys/page_writer/dirty_pages.go index 955627d67..25b747fad 100644 --- a/weed/filesys/page_writer/dirty_pages.go +++ b/weed/filesys/page_writer/dirty_pages.go @@ -6,6 +6,8 @@ type DirtyPages interface { ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) GetStorageOptions() (collection, replication string) Destroy() + LockForRead(startOffset, stopOffset int64) + UnlockForRead(startOffset, stopOffset int64) } func max(x, y int64) int64 { diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go new file mode 100644 index 000000000..13ee3caec --- /dev/null +++ b/weed/filesys/page_writer/upload_pipeline.go @@ -0,0 +1,256 @@ +package page_writer + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/mem" + "sync" + "sync/atomic" + "time" +) + +type LogicChunkIndex int + +type UploadPipeline struct { + filepath util.FullPath + ChunkSize int64 + writers *util.LimitedConcurrentExecutor + writableChunks map[LogicChunkIndex]*MemChunk + writableChunksLock sync.Mutex + sealedChunks map[LogicChunkIndex]*SealedChunk + sealedChunksLock sync.Mutex + activeWriterCond *sync.Cond + activeWriterCount int32 + activeReadChunks map[LogicChunkIndex]int + activeReadChunksLock sync.Mutex + saveToStorageFn SaveToStorageFunc +} + +type SealedChunk struct { + chunk *MemChunk + referenceCounter int // track uploading or reading processes +} + +func (sc *SealedChunk) FreeReference(messageOnFree string) { + sc.referenceCounter-- + if sc.referenceCounter == 0 { + glog.V(4).Infof("Free sealed chunk: %s", messageOnFree) + mem.Free(sc.chunk.buf) + } +} + +func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc) *UploadPipeline { + return &UploadPipeline{ + ChunkSize: chunkSize, + writableChunks: make(map[LogicChunkIndex]*MemChunk), + sealedChunks: make(map[LogicChunkIndex]*SealedChunk), + writers: writers, + activeWriterCond: sync.NewCond(&sync.Mutex{}), + saveToStorageFn: saveToStorageFn, + filepath: filepath, + activeReadChunks: make(map[LogicChunkIndex]int), + } +} + +func (cw *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { + cw.writableChunksLock.Lock() + defer cw.writableChunksLock.Unlock() + + logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize) + offsetRemainder := off % cw.ChunkSize + + memChunk, found := cw.writableChunks[logicChunkIndex] + if !found { + memChunk = &MemChunk{ + buf: mem.Allocate(int(cw.ChunkSize)), + usage: newChunkWrittenIntervalList(), + } + cw.writableChunks[logicChunkIndex] = memChunk + } + n = copy(memChunk.buf[offsetRemainder:], p) + memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n)) + cw.maybeMoveToSealed(memChunk, logicChunkIndex) + + return +} + +func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { + logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize) + + // read from sealed chunks first + cw.sealedChunksLock.Lock() + sealedChunk, found := cw.sealedChunks[logicChunkIndex] + if found { + sealedChunk.referenceCounter++ + } + cw.sealedChunksLock.Unlock() + if found { + maxStop = readMemChunk(sealedChunk.chunk, p, off, logicChunkIndex, cw.ChunkSize) + glog.V(4).Infof("%s read sealed memchunk [%d,%d)", cw.filepath, off, maxStop) + sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", cw.filepath, logicChunkIndex)) + } + + // read from writable chunks last + cw.writableChunksLock.Lock() + defer cw.writableChunksLock.Unlock() + writableChunk, found := cw.writableChunks[logicChunkIndex] + if !found { + return + } + writableMaxStop := readMemChunk(writableChunk, p, off, logicChunkIndex, cw.ChunkSize) + glog.V(4).Infof("%s read writable memchunk [%d,%d)", cw.filepath, off, writableMaxStop) + maxStop = max(maxStop, writableMaxStop) + + return +} + +func (cw *UploadPipeline) FlushAll() { + cw.writableChunksLock.Lock() + defer cw.writableChunksLock.Unlock() + + for logicChunkIndex, memChunk := range cw.writableChunks { + cw.moveToSealed(memChunk, logicChunkIndex) + } + + cw.waitForCurrentWritersToComplete() +} + +func (cw *UploadPipeline) LockForRead(startOffset, stopOffset int64) { + startLogicChunkIndex := LogicChunkIndex(startOffset / cw.ChunkSize) + stopLogicChunkIndex := LogicChunkIndex(stopOffset / cw.ChunkSize) + if stopOffset%cw.ChunkSize > 0 { + stopLogicChunkIndex += 1 + } + cw.activeReadChunksLock.Lock() + defer cw.activeReadChunksLock.Unlock() + for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { + if count, found := cw.activeReadChunks[i]; found { + cw.activeReadChunks[i] = count + 1 + } else { + cw.activeReadChunks[i] = 1 + } + } +} + +func (cw *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) { + startLogicChunkIndex := LogicChunkIndex(startOffset / cw.ChunkSize) + stopLogicChunkIndex := LogicChunkIndex(stopOffset / cw.ChunkSize) + if stopOffset%cw.ChunkSize > 0 { + stopLogicChunkIndex += 1 + } + cw.activeReadChunksLock.Lock() + defer cw.activeReadChunksLock.Unlock() + for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { + if count, found := cw.activeReadChunks[i]; found { + if count == 1 { + delete(cw.activeReadChunks, i) + } else { + cw.activeReadChunks[i] = count - 1 + } + } + } +} + +func (cw *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool { + cw.activeReadChunksLock.Lock() + defer cw.activeReadChunksLock.Unlock() + if count, found := cw.activeReadChunks[logicChunkIndex]; found { + return count > 0 + } + return false +} + +func (cw *UploadPipeline) waitForCurrentWritersToComplete() { + cw.activeWriterCond.L.Lock() + t := int32(100) + for { + t = atomic.LoadInt32(&cw.activeWriterCount) + if t <= 0 { + break + } + glog.V(4).Infof("activeWriterCond is %d", t) + cw.activeWriterCond.Wait() + } + cw.activeWriterCond.L.Unlock() +} + +func (cw *UploadPipeline) maybeMoveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { + if memChunk.usage.IsComplete(cw.ChunkSize) { + cw.moveToSealed(memChunk, logicChunkIndex) + } +} + +func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { + atomic.AddInt32(&cw.activeWriterCount, 1) + glog.V(4).Infof("%s activeWriterCount %d ++> %d", cw.filepath, cw.activeWriterCount-1, cw.activeWriterCount) + + cw.sealedChunksLock.Lock() + + if oldMemChunk, found := cw.sealedChunks[logicChunkIndex]; found { + oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", cw.filepath, logicChunkIndex)) + } + sealedChunk := &SealedChunk{ + chunk: memChunk, + referenceCounter: 1, // default 1 is for uploading process + } + cw.sealedChunks[logicChunkIndex] = sealedChunk + delete(cw.writableChunks, logicChunkIndex) + + cw.sealedChunksLock.Unlock() + + cw.writers.Execute(func() { + // first add to the file chunks + cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex) + + // notify waiting process + atomic.AddInt32(&cw.activeWriterCount, -1) + glog.V(4).Infof("%s activeWriterCount %d --> %d", cw.filepath, cw.activeWriterCount+1, cw.activeWriterCount) + // Lock and Unlock are not required, + // but it may signal multiple times during one wakeup, + // and the waiting goroutine may miss some of them! + cw.activeWriterCond.L.Lock() + cw.activeWriterCond.Broadcast() + cw.activeWriterCond.L.Unlock() + + // wait for readers + for cw.IsLocked(logicChunkIndex) { + time.Sleep(59 * time.Millisecond) + } + + // then remove from sealed chunks + cw.sealedChunksLock.Lock() + defer cw.sealedChunksLock.Unlock() + delete(cw.sealedChunks, logicChunkIndex) + sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", cw.filepath, logicChunkIndex)) + + }) +} + +func (cw *UploadPipeline) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { + if cw.saveToStorageFn == nil { + return + } + for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { + reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset]) + cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() { + }) + } +} + +func readMemChunk(memChunk *MemChunk, p []byte, off int64, logicChunkIndex LogicChunkIndex, chunkSize int64) (maxStop int64) { + memChunkBaseOffset := int64(logicChunkIndex) * chunkSize + for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { + logicStart := max(off, int64(logicChunkIndex)*chunkSize+t.StartOffset) + logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) + if logicStart < logicStop { + copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) + maxStop = max(maxStop, logicStop) + } + } + return +} + +func (p2 *UploadPipeline) Shutdown() { + +} diff --git a/weed/filesys/page_writer/upload_pipeline_test.go b/weed/filesys/page_writer/upload_pipeline_test.go new file mode 100644 index 000000000..d17948251 --- /dev/null +++ b/weed/filesys/page_writer/upload_pipeline_test.go @@ -0,0 +1,47 @@ +package page_writer + +import ( + "github.com/chrislusf/seaweedfs/weed/util" + "testing" +) + +func TestUploadPipeline(t *testing.T) { + + uploadPipeline := NewUploadPipeline("", nil, 2*1024*1024, nil) + + writeRange(uploadPipeline, 0, 131072) + writeRange(uploadPipeline, 131072, 262144) + writeRange(uploadPipeline, 262144, 1025536) + + confirmRange(t, uploadPipeline, 0, 1025536) + + writeRange(uploadPipeline, 1025536, 1296896) + + confirmRange(t, uploadPipeline, 1025536, 1296896) + + writeRange(uploadPipeline, 1296896, 2162688) + + confirmRange(t, uploadPipeline, 1296896, 2162688) + + confirmRange(t, uploadPipeline, 1296896, 2162688) +} + +// startOff and stopOff must be divided by 4 +func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) { + p := make([]byte, 4) + for i := startOff / 4; i < stopOff/4; i += 4 { + util.Uint32toBytes(p, uint32(i)) + uploadPipeline.SaveDataAt(p, i) + } +} + +func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) { + p := make([]byte, 4) + for i := startOff; i < stopOff/4; i += 4 { + uploadPipeline.MaybeReadDataAt(p, i) + x := util.BytesToUint32(p) + if x != uint32(i) { + t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4) + } + } +}