diff --git a/weed/filesys/dirty_pages_continuous.go b/weed/filesys/dirty_pages_continuous.go
index 88b50ce41..19401b94e 100644
--- a/weed/filesys/dirty_pages_continuous.go
+++ b/weed/filesys/dirty_pages_continuous.go
@@ -147,3 +147,5 @@ func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int6
 func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) {
 	return pages.collection, pages.replication
 }
+func (pages *ContinuousDirtyPages) Destroy() {
+}
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
index 6a22889dc..25fce5d48 100644
--- a/weed/filesys/dirty_pages_temp_file.go
+++ b/weed/filesys/dirty_pages_temp_file.go
@@ -12,22 +12,21 @@ import (
 )
 
 type TempFileDirtyPages struct {
-	f                *File
-	tf               *os.File
-	writtenIntervals *page_writer.WrittenContinuousIntervals
-	writeWaitGroup   sync.WaitGroup
-	pageAddLock      sync.Mutex
-	chunkAddLock     sync.Mutex
-	lastErr          error
-	collection       string
-	replication      string
+	f              *File
+	writeWaitGroup sync.WaitGroup
+	pageAddLock    sync.Mutex
+	chunkAddLock   sync.Mutex
+	lastErr        error
+	collection     string
+	replication    string
+	chunkedFile    *page_writer.ChunkedFileWriter
 }
 
-func newTempFileDirtyPages(file *File) *TempFileDirtyPages {
+func newTempFileDirtyPages(file *File, chunkSize int64) *TempFileDirtyPages {
 
 	tempFile := &TempFileDirtyPages{
-		f:                file,
-		writtenIntervals: &page_writer.WrittenContinuousIntervals{},
+		f:           file,
+		chunkedFile: page_writer.NewChunkedFileWriter(file.wfs.option.getTempFilePageDir(), chunkSize),
 	}
 
 	return tempFile
@@ -38,28 +37,8 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
 	pages.pageAddLock.Lock()
 	defer pages.pageAddLock.Unlock()
 
-	if pages.tf == nil {
-		tf, err := os.CreateTemp(pages.f.wfs.option.getTempFilePageDir(), "")
-		if err != nil {
-			glog.Errorf("create temp file: %v", err)
-			pages.lastErr = err
-			return
-		}
-		pages.tf = tf
-		pages.writtenIntervals.TempFile = tf
-		pages.writtenIntervals.LastOffset = 0
-	}
-
-	writtenOffset := pages.writtenIntervals.LastOffset
-	dataSize := int64(len(data))
-
-	// glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize)
-
-	if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil {
+	if _, err := pages.chunkedFile.WriteAt(data, offset); err != nil {
 		pages.lastErr = err
-	} else {
-		pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
-		pages.writtenIntervals.LastOffset += dataSize
 	}
 
 	// pages.writtenIntervals.debug()
@@ -68,54 +47,38 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
 }
 
 func (pages *TempFileDirtyPages) FlushData() error {
-
-	pages.saveExistingPagesToStorage()
+	pages.saveChunkedFileToStorage()
 	pages.writeWaitGroup.Wait()
 	if pages.lastErr != nil {
 		return fmt.Errorf("flush data: %v", pages.lastErr)
 	}
-	pages.pageAddLock.Lock()
-	defer pages.pageAddLock.Unlock()
-	if pages.tf != nil {
-
-		pages.writtenIntervals.TempFile = nil
-		pages.writtenIntervals.Lists = nil
-
-		pages.tf.Close()
-		os.Remove(pages.tf.Name())
-		pages.tf = nil
-	}
 	return nil
 }
 
-func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
+func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
+	return pages.chunkedFile.ReadDataAt(data, startOffset)
+}
 
-	pageSize := pages.f.wfs.option.ChunkSizeLimit
+func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
+	return pages.collection, pages.replication
+}
 
glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists)) +func (pages *TempFileDirtyPages) saveChunkedFileToStorage() { - for _, list := range pages.writtenIntervals.Lists { - listStopOffset := list.Offset() + list.Size() - for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize { - start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize) - if start >= stop { - continue - } - // glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists)) - pages.saveToStorage(list.ToReader(start, stop), start, stop-start) - } - } + pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex int, interval *page_writer.PageChunkWrittenInterval) { + reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval) + pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize, interval.Size()) + }) } -func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { +func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64) { mtime := time.Now().UnixNano() pages.writeWaitGroup.Add(1) writer := func() { defer pages.writeWaitGroup.Done() - reader = io.LimitReader(reader, size) chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) if err != nil { glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) @@ -135,12 +98,9 @@ func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, s } else { go writer() } -} -func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - return pages.writtenIntervals.ReadDataAt(data, startOffset) } -func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) { - return pages.collection, pages.replication +func (pages TempFileDirtyPages) Destroy() { + pages.chunkedFile.Destroy() } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index d92b17b5b..a551e6e10 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -222,6 +222,7 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err fh.reader = nil fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) + fh.dirtyPages.Destroy() } if fh.f.isOpen < 0 { diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go index 9c9e38968..8e52f9f67 100644 --- a/weed/filesys/page_writer.go +++ b/weed/filesys/page_writer.go @@ -24,7 +24,7 @@ func newPageWriter(file *File, chunkSize int64) *PageWriter { pw := &PageWriter{ f: file, chunkSize: chunkSize, - randomWriter: newTempFileDirtyPages(file), + randomWriter: newTempFileDirtyPages(file, chunkSize), streamWriter: newContinuousDirtyPages(file), writerPattern: NewWriterPattern(file.Name, chunkSize), } @@ -63,11 +63,23 @@ func (pw *PageWriter) FlushData() error { return pw.randomWriter.FlushData() } -func (pw *PageWriter) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), startOffset, startOffset+int64(len(data))) - m1 := pw.streamWriter.ReadDirtyDataAt(data, startOffset) - m2 := pw.randomWriter.ReadDirtyDataAt(data, startOffset) - return max(m1, m2) +func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) 
+func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) {
+	glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), offset, offset+int64(len(data)))
+
+	chunkIndex := offset / pw.chunkSize
+	for i := chunkIndex; len(data) > 0; i++ {
+		readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
+
+		m1 := pw.streamWriter.ReadDirtyDataAt(data[:readSize], offset)
+		m2 := pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset)
+
+		maxStop = max(maxStop, max(m1, m2))
+
+		offset += readSize
+		data = data[readSize:]
+	}
+
+	return
 }
 
 func (pw *PageWriter) GetStorageOptions() (collection, replication string) {
@@ -76,3 +88,7 @@ func (pw *PageWriter) GetStorageOptions() (collection, replication string) {
 	}
 	return pw.randomWriter.GetStorageOptions()
 }
+
+func (pw *PageWriter) Destroy() {
+	pw.randomWriter.Destroy()
+}
diff --git a/weed/filesys/page_writer/chunked_file_writer.go b/weed/filesys/page_writer/chunked_file_writer.go
new file mode 100644
index 000000000..180a2039d
--- /dev/null
+++ b/weed/filesys/page_writer/chunked_file_writer.go
@@ -0,0 +1,152 @@
+package page_writer
+
+import (
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"io"
+	"os"
+	"sync"
+)
+
+// ChunkedFileWriter assumes each write request stays within one chunk
+type ChunkedFileWriter struct {
+	dir                     string
+	file                    *os.File
+	logicToActualChunkIndex map[int]int
+	chunkUsages             []*PageChunkWrittenIntervalList
+	ChunkSize               int64
+	sync.Mutex
+}
+
+var _ = io.WriterAt(&ChunkedFileWriter{})
+
+func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter {
+	return &ChunkedFileWriter{
+		dir:                     dir,
+		file:                    nil,
+		logicToActualChunkIndex: make(map[int]int),
+		ChunkSize:               chunkSize,
+	}
+}
+
+func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) {
+	cw.Lock()
+	defer cw.Unlock()
+
+	if cw.file == nil {
+		cw.file, err = os.CreateTemp(cw.dir, "")
+		if err != nil {
+			glog.Errorf("create temp file: %v", err)
+			return
+		}
+	}
+
+	actualOffset, chunkUsage := cw.toActualWriteOffset(off)
+	n, err = cw.file.WriteAt(p, actualOffset)
+	if err == nil {
+		startOffset := off % cw.ChunkSize
+		chunkUsage.MarkWritten(startOffset, startOffset+int64(n))
+	}
+	return
+}
+
+func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
+	cw.Lock()
+	defer cw.Unlock()
+
+	if cw.file == nil {
+		return
+	}
+
+	logicChunkIndex := off / cw.ChunkSize
+	actualChunkIndex, chunkUsage := cw.toActualReadOffset(off)
+	if chunkUsage != nil {
+		for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
+			logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.startOffset)
+			logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset)
+			if logicStart < logicStop {
+				actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize
+				_, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart)
+				if err != nil {
+					glog.Errorf("reading temp file: %v", err)
+					break
+				}
+				maxStop = max(maxStop, logicStop)
+			}
+		}
+	}
+	return
+}
+
+func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *PageChunkWrittenIntervalList) {
+	logicChunkIndex := int(logicOffset / cw.ChunkSize)
+	offsetRemainder := logicOffset % cw.ChunkSize
+	existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
+	if found {
+		return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex]
+	}
+	cw.logicToActualChunkIndex[logicChunkIndex] = len(cw.chunkUsages)
+	chunkUsage = newPageChunkWrittenIntervalList()
+	cw.chunkUsages = append(cw.chunkUsages, chunkUsage)
+	return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage
+}
+
+func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex int, chunkUsage *PageChunkWrittenIntervalList) {
+	logicChunkIndex := int(logicOffset / cw.ChunkSize)
+	existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
+	if found {
+		return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex]
+	}
+	return 0, nil
+}
+
+func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex int, interval *PageChunkWrittenInterval)) {
+	for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex {
+		chunkUsage := cw.chunkUsages[actualChunkIndex]
+		for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
+			process(cw.file, logicChunkIndex, t)
+		}
+	}
+}
+func (cw *ChunkedFileWriter) Destroy() {
+	if cw.file != nil {
+		cw.file.Close()
+		os.Remove(cw.file.Name())
+	}
+}
+
+type FileIntervalReader struct {
+	f           *os.File
+	startOffset int64
+	stopOffset  int64
+	position    int64
+}
+
+var _ = io.Reader(&FileIntervalReader{})
+
+func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex int, interval *PageChunkWrittenInterval) *FileIntervalReader {
+	actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
+	if !found {
+		// this should never happen
+		return nil
+	}
+	return &FileIntervalReader{
+		f:           cw.file,
+		startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.startOffset,
+		stopOffset:  int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset,
+		position:    0,
+	}
+}
+
+func (fr *FileIntervalReader) Read(p []byte) (n int, err error) {
+	readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position))
+	n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position)
+	if err == nil || err == io.EOF {
+		fr.position += int64(n)
+		if fr.stopOffset-fr.startOffset-fr.position == 0 {
+			// return a tiny bit faster
+			err = io.EOF
+			return
+		}
+	}
+	return
+}
diff --git a/weed/filesys/page_writer/chunked_file_writer_test.go b/weed/filesys/page_writer/chunked_file_writer_test.go
new file mode 100644
index 000000000..1c72c77d4
--- /dev/null
+++ b/weed/filesys/page_writer/chunked_file_writer_test.go
@@ -0,0 +1,60 @@
+package page_writer
+
+import (
+	"github.com/stretchr/testify/assert"
+	"os"
+	"testing"
+)
+
+func TestChunkedFileWriter_toActualOffset(t *testing.T) {
+	cw := NewChunkedFileWriter("", 16)
+
+	writeToFile(cw, 50, 60)
+	writeToFile(cw, 60, 64)
+
+	writeToFile(cw, 32, 40)
+	writeToFile(cw, 42, 48)
+
+	writeToFile(cw, 48, 50)
+
+	assert.Equal(t, 1, cw.chunkUsages[0].size(), "fully covered")
+	assert.Equal(t, 2, cw.chunkUsages[1].size(), "2 intervals")
+
+}
+
+func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) {
+
+	_, chunkUsage := cw.toActualWriteOffset(startOffset)
+
+	// skip doing actual writing
+
+	innerOffset := startOffset % cw.ChunkSize
+	chunkUsage.MarkWritten(innerOffset, innerOffset+stopOffset-startOffset)
+
+}
+
+func TestWriteChunkedFile(t *testing.T) {
+	x := NewChunkedFileWriter(os.TempDir(), 20)
+	defer x.Destroy()
+	y := NewChunkedFileWriter(os.TempDir(), 12)
+	defer y.Destroy()
+
+	batchSize := 4
+	buf := make([]byte, batchSize)
+	for i := 0; i < 256; i++ {
+		for j := 0; j < batchSize; j++ {
+			buf[j] = byte(i)
+		}
+		x.WriteAt(buf, int64(i*batchSize))
+		y.WriteAt(buf, int64((255-i)*batchSize))
+	}
+
+	a := make([]byte, 1)
+	b := make([]byte, 1)
+	for i := 0; i < 256*batchSize; i++ {
+		x.ReadDataAt(a, int64(i))
+		y.ReadDataAt(b, int64(256*batchSize-1-i))
+		assert.Equal(t, a[0], b[0], "same read")
+	}
+
+}
diff --git a/weed/filesys/page_writer/dirty_pages.go b/weed/filesys/page_writer/dirty_pages.go
index c18f847b7..955627d67 100644
--- a/weed/filesys/page_writer/dirty_pages.go
+++ b/weed/filesys/page_writer/dirty_pages.go
@@ -5,4 +5,24 @@ type DirtyPages interface {
 	FlushData() error
 	ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
 	GetStorageOptions() (collection, replication string)
+	Destroy()
+}
+
+func max(x, y int64) int64 {
+	if x > y {
+		return x
+	}
+	return y
+}
+func min(x, y int64) int64 {
+	if x < y {
+		return x
+	}
+	return y
+}
+func minInt(x, y int) int {
+	if x < y {
+		return x
+	}
+	return y
 }
diff --git a/weed/filesys/page_writer/dirty_pages_temp_interval.go b/weed/filesys/page_writer/dirty_pages_temp_interval.go
index aeaf0ec6f..623346fa8 100644
--- a/weed/filesys/page_writer/dirty_pages_temp_interval.go
+++ b/weed/filesys/page_writer/dirty_pages_temp_interval.go
@@ -287,16 +287,3 @@ func (f *FileSectionReader) Read(p []byte) (n int, err error) {
 	}
 	return
 }
-
-func max(x, y int64) int64 {
-	if x > y {
-		return x
-	}
-	return y
-}
-func min(x, y int64) int64 {
-	if x < y {
-		return x
-	}
-	return y
-}
diff --git a/weed/filesys/page_writer/page_chunk_interval_list.go b/weed/filesys/page_writer/page_chunk_interval_list.go
index e626b2a7f..09fc45bfb 100644
--- a/weed/filesys/page_writer/page_chunk_interval_list.go
+++ b/weed/filesys/page_writer/page_chunk_interval_list.go
@@ -10,6 +10,10 @@ type PageChunkWrittenInterval struct {
 	next *PageChunkWrittenInterval
 }
 
+func (interval *PageChunkWrittenInterval) Size() int64 {
+	return interval.stopOffset - interval.startOffset
+}
+
 // PageChunkWrittenIntervalList mark written intervals within one page chunk
 type PageChunkWrittenIntervalList struct {
 	head *PageChunkWrittenInterval
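
Note for reviewers: the core idea in chunked_file_writer.go above is that sparsely written logical chunks are packed into the temp file in first-write order via the logicToActualChunkIndex map, so temp-file space is bounded by the bytes actually written rather than by the logical file size. Below is a minimal standalone sketch of just that offset mapping; the names and the map[int64]int64 shape are illustrative only and are not part of this change.

package main

import "fmt"

// Sketch of the logical-to-actual offset mapping: each logical chunk is
// assigned the next free slot in the temp file the first time it is
// written, so a sparse logical file stays compact on disk.
func main() {
	const chunkSize = int64(16)
	logicToActual := make(map[int64]int64) // logical chunk index -> temp file slot

	toActualOffset := func(logicOffset int64) int64 {
		logicChunk := logicOffset / chunkSize
		slot, found := logicToActual[logicChunk]
		if !found {
			slot = int64(len(logicToActual)) // next unused slot
			logicToActual[logicChunk] = slot
		}
		return slot*chunkSize + logicOffset%chunkSize
	}

	fmt.Println(toActualOffset(1000)) // logical chunk 62 -> slot 0, prints 8
	fmt.Println(toActualOffset(0))    // logical chunk 0  -> slot 1, prints 16
	fmt.Println(toActualOffset(1008)) // logical chunk 63 -> slot 2, prints 32
}

ReadDataAt and FileIntervalReader reverse the same arithmetic, which is why reads can be served straight from the temp file without copying intervals around.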