From d06ecc2649416128a69a29187d3f3b89f5ad38dd Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 10 May 2021 21:47:07 -0700 Subject: [PATCH] working properly --- weed/filesys/dirty_pages_temp_file.go | 27 +++++++++---- weed/filesys/dirty_pages_temp_interval.go | 48 ++++++++++++++++------- weed/filesys/filehandle.go | 4 +- 3 files changed, 55 insertions(+), 24 deletions(-) diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go index a04efb6aa..58c150acf 100644 --- a/weed/filesys/dirty_pages_temp_file.go +++ b/weed/filesys/dirty_pages_temp_file.go @@ -56,18 +56,23 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) { } pages.tf = tf pages.writtenIntervals.tempFile = tf + pages.writtenIntervals.lastOffset = 0 } - writtenOffset := pages.writtenIntervals.TotalSize() + writtenOffset := pages.writtenIntervals.lastOffset + dataSize := int64(len(data)) - glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+int64(len(data))) + // glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize) if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil { pages.lastErr = err } else { pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset) + pages.writtenIntervals.lastOffset += dataSize } + // pages.writtenIntervals.debug() + return } @@ -81,6 +86,11 @@ func (pages *TempFileDirtyPages) FlushData() error { pages.pageAddLock.Lock() defer pages.pageAddLock.Unlock() if pages.tf != nil { + + pages.writtenIntervals.tempFile = nil + pages.writtenIntervals.lists = nil + + pages.tf.Close() os.Remove(pages.tf.Name()) pages.tf = nil } @@ -91,15 +101,16 @@ func (pages *TempFileDirtyPages) saveExistingPagesToStorage() { pageSize := pages.f.wfs.option.ChunkSizeLimit - uploadedSize := int64(0) + // glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists)) + for _, list := range pages.writtenIntervals.lists { - for { - start, stop := max(list.Offset(), uploadedSize), min(list.Offset()+list.Size(), uploadedSize+pageSize) + listStopOffset := list.Offset() + list.Size() + for uploadedOffset:=int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize { + start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize) if start >= stop { - break + continue } - uploadedSize = stop - glog.V(4).Infof("uploading %v [%d,%d)", pages.f.Name, start, stop) + // glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists)) pages.saveToStorage(list.ToReader(start, stop), start, stop-start) } } diff --git a/weed/filesys/dirty_pages_temp_interval.go b/weed/filesys/dirty_pages_temp_interval.go index f423b0e85..2d22845e2 100644 --- a/weed/filesys/dirty_pages_temp_interval.go +++ b/weed/filesys/dirty_pages_temp_interval.go @@ -1,8 +1,8 @@ package filesys import ( - "github.com/chrislusf/seaweedfs/weed/glog" "io" + "log" "os" ) @@ -20,8 +20,9 @@ type WrittenIntervalLinkedList struct { } type WrittenContinuousIntervals struct { - tempFile *os.File - lists []*WrittenIntervalLinkedList + tempFile *os.File + lastOffset int64 + lists []*WrittenIntervalLinkedList } func (list *WrittenIntervalLinkedList) Offset() int64 { @@ -95,6 +96,21 @@ func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenInterv } } +func (c *WrittenContinuousIntervals) debug() { + log.Printf("++") + for _, l := range c.lists { + log.Printf("++++") + for t := l.Head; ; t = t.Next { + log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size) + if t.Next == nil { + break + } + } + log.Printf("----") + } + log.Printf("--") +} + func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, dataOffset int64) { interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)} @@ -223,6 +239,7 @@ func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader for t := l.Head; ; t = t.Next { startOffset, stopOffset := max(t.DataOffset, start), min(t.DataOffset+t.Size, stop) if startOffset < stopOffset { + // log.Printf("ToReader read [%d,%d) from [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size) readers = append(readers, newFileSectionReader(l.tempFile, startOffset-t.DataOffset+t.TempOffset, startOffset, stopOffset-startOffset)) } if t.Next == nil { @@ -236,29 +253,32 @@ func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader } type FileSectionReader struct { - file *os.File - Offset int64 - dataStart int64 - dataStop int64 + file *os.File + tempStartOffset int64 + Offset int64 + dataStart int64 + dataStop int64 } var _ = io.Reader(&FileSectionReader{}) func newFileSectionReader(tempfile *os.File, offset int64, dataOffset int64, size int64) *FileSectionReader { return &FileSectionReader{ - file: tempfile, - Offset: offset, - dataStart: dataOffset, - dataStop: dataOffset + size, + file: tempfile, + tempStartOffset: offset, + Offset: offset, + dataStart: dataOffset, + dataStop: dataOffset + size, } } func (f *FileSectionReader) Read(p []byte) (n int, err error) { - dataLen := min(f.dataStop-f.Offset, int64(len(p))) - if dataLen < 0 { + remaining := (f.dataStop - f.dataStart) - (f.Offset - f.tempStartOffset) + if remaining <= 0 { return 0, io.EOF } - glog.V(4).Infof("reading %v [%d,%d)", f.file.Name(), f.Offset, f.Offset+dataLen) + dataLen := min(remaining, int64(len(p))) + // glog.V(4).Infof("reading [%d,%d) from %v [%d,%d)/[%d,%d) %d", f.Offset-f.tempStartOffset+f.dataStart, f.Offset-f.tempStartOffset+f.dataStart+dataLen, f.file.Name(), f.Offset, f.Offset+dataLen, f.tempStartOffset, f.tempStartOffset+f.dataStop-f.dataStart, f.dataStop-f.dataStart) n, err = f.file.ReadAt(p[:dataLen], f.Offset) if n > 0 { f.Offset += int64(n) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 1245cce71..abdab2b5e 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -38,8 +38,8 @@ type FileHandle struct { func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle { fh := &FileHandle{ f: file, - dirtyPages: newContinuousDirtyPages(file, writeOnly), - /// dirtyPages: newTempFileDirtyPages(file, writeOnly), + // dirtyPages: newContinuousDirtyPages(file, writeOnly), + dirtyPages: newTempFileDirtyPages(file, writeOnly), Uid: uid, Gid: gid, }