diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 667b089ed..0c4626317 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -365,7 +365,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err if flushErr != nil { return flushErr } - chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0)) + chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0, time.Now().UnixNano())) } if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { @@ -450,7 +450,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, uploadError = fmt.Errorf("upload %v result: %v\n", fileName, uploadResult.Error) return } - chunksChan <- uploadResult.ToPbFileChunk(fileId, i*chunkSize) + chunksChan <- uploadResult.ToPbFileChunk(fileId, i*chunkSize, time.Now().UnixNano()) fmt.Printf("uploaded %s-%d [%d,%d)\n", fileName, i+1, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) }(i) @@ -530,7 +530,7 @@ func detectMimeType(f *os.File) string { return mimeType } -func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) { +func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) { finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry( worker, @@ -561,7 +561,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off if uploadResult.Error != "" { return nil, fmt.Errorf("upload result: %v", uploadResult.Error) } - return uploadResult.ToPbFileChunk(finalFileId, offset), nil + return uploadResult.ToPbFileChunk(finalFileId, offset, tsNs), nil } var _ = filer_pb.FilerClient(&FileCopyWorker{}) diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 221a11ffe..d9d0331be 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -264,7 +264,7 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer } } - manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0) + manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0, 0) if err != nil { return nil, err } @@ -275,4 +275,4 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer return } -type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) +type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go index 061e0757a..2a20a5992 100644 --- a/weed/filer/filechunks.go +++ b/weed/filer/filechunks.go @@ -42,7 +42,7 @@ func ETag(entry *filer_pb.Entry) (etag string) { } func ETagEntry(entry *Entry) (etag string) { - if entry.IsInRemoteOnly() { + if entry.IsInRemoteOnly() { return entry.Remote.RemoteETag } if entry.Attr.Md5 == nil { @@ -131,13 +131,14 @@ func DoMinusChunksBySourceFileId(as, bs []*filer_pb.FileChunk) (delta []*filer_p } type ChunkView struct { - FileId string - Offset int64 - Size uint64 - LogicOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk - ChunkSize uint64 - CipherKey []byte - IsGzipped bool + FileId string + Offset int64 + Size uint64 + LogicOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk + ChunkSize uint64 + CipherKey []byte + IsGzipped bool + ModifiedTsNs int64 } func (cv *ChunkView) IsFullChunk() bool { @@ -168,13 +169,14 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int if chunkStart < chunkStop { views = append(views, &ChunkView{ - FileId: chunk.fileId, - Offset: chunkStart - chunk.start + chunk.chunkOffset, - Size: uint64(chunkStop - chunkStart), - LogicOffset: chunkStart, - ChunkSize: chunk.chunkSize, - CipherKey: chunk.cipherKey, - IsGzipped: chunk.isGzipped, + FileId: chunk.fileId, + Offset: chunkStart - chunk.start + chunk.chunkOffset, + Size: uint64(chunkStop - chunkStart), + LogicOffset: chunkStart, + ChunkSize: chunk.chunkSize, + CipherKey: chunk.cipherKey, + IsGzipped: chunk.isGzipped, + ModifiedTsNs: chunk.modifiedTsNs, }) } } diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go index 5c03d4f16..55278c492 100644 --- a/weed/filer/filer_notify_append.go +++ b/weed/filer/filer_notify_append.go @@ -40,7 +40,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error { } // append to existing chunks - entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(assignResult.Fid, offset)) + entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(assignResult.Fid, offset, time.Now().UnixNano())) // update the entry err = f.CreateEntry(context.Background(), entry, false, false, nil, false) diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 9d1fab20a..76cd52547 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -111,11 +111,23 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) { c.readerLock.Lock() defer c.readerLock.Unlock() + // glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews)) + n, _, err = c.doReadAt(p, offset) + return +} + +func (c *ChunkReadAt) ReadAtWithTime(p []byte, offset int64) (n int, ts int64, err error) { + + c.readerPattern.MonitorReadAt(offset, len(p)) + + c.readerLock.Lock() + defer c.readerLock.Unlock() + // glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews)) return c.doReadAt(p, offset) } -func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { +func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, ts int64, err error) { startOffset, remaining := offset, int64(len(p)) var nextChunks []*ChunkView @@ -142,10 +154,11 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { } // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size)) bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset + ts = chunk.ModifiedTsNs copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset)) if err != nil { glog.Errorf("fetching chunk %+v: %v\n", chunk, err) - return copied, err + return copied, ts, err } n += copied diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go index 78e7b7877..430c40405 100644 --- a/weed/mount/dirty_pages_chunked.go +++ b/weed/mount/dirty_pages_chunked.go @@ -7,7 +7,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "io" "sync" - "time" ) type ChunkedDirtyPages struct { @@ -38,11 +37,11 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages { return dirtyPages } -func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool) { +func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) { pages.hasWrites = true glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.fh, offset, offset+int64(len(data))) - pages.uploadPipeline.SaveDataAt(data, offset, isSequential) + pages.uploadPipeline.SaveDataAt(data, offset, isSequential, tsNs) return } @@ -58,27 +57,25 @@ func (pages *ChunkedDirtyPages) FlushData() error { return nil } -func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { +func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64, tsNs int64) (maxStop int64) { if !pages.hasWrites { return } - return pages.uploadPipeline.MaybeReadDataAt(data, startOffset) + return pages.uploadPipeline.MaybeReadDataAt(data, startOffset, tsNs) } -func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) { +func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64, modifiedTsNs int64, cleanupFn func()) { - mtime := time.Now().UnixNano() defer cleanupFn() fileFullPath := pages.fh.FullPath() fileName := fileFullPath.Name() - chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset) + chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset, modifiedTsNs) if err != nil { glog.V(0).Infof("%v saveToStorage [%d,%d): %v", fileFullPath, offset, offset+size, err) pages.lastErr = err return } - chunk.ModifiedTsNs = mtime pages.fh.AddChunks([]*filer_pb.FileChunk{chunk}) glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size) diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go index a316a16cd..095dffc94 100644 --- a/weed/mount/filehandle_read.go +++ b/weed/mount/filehandle_read.go @@ -17,18 +17,18 @@ func (fh *FileHandle) unlockForRead(startOffset int64, size int) { fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size)) } -func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) { - maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset) +func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64, tsNs int64) (maxStop int64) { + maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset, tsNs) return } -func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { +func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, error) { fileFullPath := fh.FullPath() entry := fh.GetEntry() if entry == nil { - return 0, io.EOF + return 0, 0, io.EOF } if entry.IsInRemoteOnly() { @@ -36,7 +36,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { newEntry, err := fh.downloadRemoteEntry(entry) if err != nil { glog.V(1).Infof("download remote entry %s: %v", fileFullPath, err) - return 0, err + return 0, 0, err } entry = newEntry } @@ -45,20 +45,20 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { if fileSize == 0 { glog.V(1).Infof("empty fh %v", fileFullPath) - return 0, io.EOF + return 0, 0, io.EOF } if offset+int64(len(buff)) <= int64(len(entry.Content)) { totalRead := copy(buff, entry.Content[offset:]) glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) - return int64(totalRead), nil + return int64(totalRead), 0, nil } var chunkResolveErr error if fh.entryViewCache == nil { fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), entry.GetChunks(), 0, fileSize) if chunkResolveErr != nil { - return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) + return 0, 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) } fh.CloseReader() } @@ -72,7 +72,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { fh.reader = filer.NewChunkReaderAtFromClient(fh.wfs.LookupFn(), chunkViews, fh.wfs.chunkCache, fileSize) } - totalRead, err := fh.reader.ReadAt(buff, offset) + totalRead, ts, err := fh.reader.ReadAtWithTime(buff, offset) if err != nil && err != io.EOF { glog.Errorf("file handle read %s: %v", fileFullPath, err) @@ -80,7 +80,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { // glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err) - return int64(totalRead), err + return int64(totalRead), ts, err } func (fh *FileHandle) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) { diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go index 1f31b5300..c9470c440 100644 --- a/weed/mount/page_writer.go +++ b/weed/mount/page_writer.go @@ -29,35 +29,35 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter { return pw } -func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool) { +func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) { glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data))) chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) - pw.addToOneChunk(i, offset, data[:writeSize], isSequential) + pw.addToOneChunk(i, offset, data[:writeSize], isSequential, tsNs) offset += writeSize data = data[writeSize:] } } -func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool) { - pw.randomWriter.AddPage(offset, data, isSequential) +func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool, tsNs int64) { + pw.randomWriter.AddPage(offset, data, isSequential, tsNs) } func (pw *PageWriter) FlushData() error { return pw.randomWriter.FlushData() } -func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) { +func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64, tsNs int64) (maxStop int64) { glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.fh, offset, offset+int64(len(data))) chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) - maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset) + maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset, tsNs) offset += readSize data = data[readSize:] diff --git a/weed/mount/page_writer/dirty_pages.go b/weed/mount/page_writer/dirty_pages.go index 44f879afc..7cddcf69e 100644 --- a/weed/mount/page_writer/dirty_pages.go +++ b/weed/mount/page_writer/dirty_pages.go @@ -1,9 +1,9 @@ package page_writer type DirtyPages interface { - AddPage(offset int64, data []byte, isSequential bool) + AddPage(offset int64, data []byte, isSequential bool, tsNs int64) FlushData() error - ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) + ReadDirtyDataAt(data []byte, startOffset int64, tsNs int64) (maxStop int64) Destroy() LockForRead(startOffset, stopOffset int64) UnlockForRead(startOffset, stopOffset int64) diff --git a/weed/mount/page_writer/page_chunk.go b/weed/mount/page_writer/page_chunk.go index 4e8f31425..14087f8d0 100644 --- a/weed/mount/page_writer/page_chunk.go +++ b/weed/mount/page_writer/page_chunk.go @@ -4,12 +4,12 @@ import ( "io" ) -type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func()) +type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, modifiedTsNs int64, cleanupFn func()) type PageChunk interface { FreeResource() - WriteDataAt(src []byte, offset int64) (n int) - ReadDataAt(p []byte, off int64) (maxStop int64) + WriteDataAt(src []byte, offset int64, tsNs int64) (n int) + ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) IsComplete() bool WrittenSize() int64 SaveContent(saveFn SaveToStorageFunc) diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go index 8cccded67..a2a7bfb15 100644 --- a/weed/mount/page_writer/page_chunk_mem.go +++ b/weed/mount/page_writer/page_chunk_mem.go @@ -15,10 +15,11 @@ var ( type MemChunk struct { sync.RWMutex - buf []byte - usage *ChunkWrittenIntervalList - chunkSize int64 - logicChunkIndex LogicChunkIndex + buf []byte + usage *ChunkWrittenIntervalList + chunkSize int64 + logicChunkIndex LogicChunkIndex + lastModifiedTsNs int64 } func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk { @@ -39,17 +40,22 @@ func (mc *MemChunk) FreeResource() { mem.Free(mc.buf) } -func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) { +func (mc *MemChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) { mc.Lock() defer mc.Unlock() + if mc.lastModifiedTsNs > tsNs { + println("write old data1", tsNs-mc.lastModifiedTsNs, "ns") + } + mc.lastModifiedTsNs = tsNs + innerOffset := offset % mc.chunkSize n = copy(mc.buf[innerOffset:], src) mc.usage.MarkWritten(innerOffset, innerOffset+int64(n)) return } -func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { +func (mc *MemChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) { mc.RLock() defer mc.RUnlock() @@ -58,8 +64,12 @@ func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset) logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) if logicStart < logicStop { - copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) - maxStop = max(maxStop, logicStop) + if mc.lastModifiedTsNs > tsNs { + copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) + maxStop = max(maxStop, logicStop) + } else { + println("read old data1", tsNs-mc.lastModifiedTsNs, "ns") + } } } return @@ -88,7 +98,7 @@ func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) { } for t := mc.usage.head.next; t != mc.usage.tail; t = t.next { reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset]) - saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() { + saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), mc.lastModifiedTsNs, func() { }) } } diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go index bf2cdb256..b11a44871 100644 --- a/weed/mount/page_writer/page_chunk_swapfile.go +++ b/weed/mount/page_writer/page_chunk_swapfile.go @@ -29,6 +29,7 @@ type SwapFileChunk struct { usage *ChunkWrittenIntervalList logicChunkIndex LogicChunkIndex actualChunkIndex ActualChunkIndex + lastModifiedTsNs int64 } func NewSwapFile(dir string, chunkSize int64) *SwapFile { @@ -87,10 +88,15 @@ func (sc *SwapFileChunk) FreeResource() { delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex) } -func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) { +func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) { sc.Lock() defer sc.Unlock() + if sc.lastModifiedTsNs > tsNs { + println("write old data2", tsNs-sc.lastModifiedTsNs, "ns") + } + sc.lastModifiedTsNs = tsNs + innerOffset := offset % sc.swapfile.chunkSize var err error n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset) @@ -102,7 +108,7 @@ func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) { return } -func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { +func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) { sc.RLock() defer sc.RUnlock() @@ -111,12 +117,16 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { logicStart := max(off, chunkStartOffset+t.StartOffset) logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset) if logicStart < logicStop { - actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize - if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { - glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err) - break + if sc.lastModifiedTsNs > tsNs { + actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize + if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { + glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err) + break + } + maxStop = max(maxStop, logicStop) + } else { + println("read old data2", tsNs-sc.lastModifiedTsNs, "ns") } - maxStop = max(maxStop, logicStop) } } return @@ -145,7 +155,7 @@ func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { data := mem.Allocate(int(t.Size())) sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize) reader := util.NewBytesReader(data) - saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() { + saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), sc.lastModifiedTsNs, func() { }) mem.Free(data) } diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index 252dddc06..61273a48c 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -55,7 +55,8 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, return t } -func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) { +func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsNs int64) (n int) { + up.chunksLock.Lock() defer up.chunksLock.Unlock() @@ -76,7 +77,7 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) } - if isSequential && + if false && isSequential && len(up.writableChunks) < up.writableChunkLimit && atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) { pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) @@ -85,13 +86,19 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n } up.writableChunks[logicChunkIndex] = pageChunk } - n = pageChunk.WriteDataAt(p, off) + if _, foundSealed := up.sealedChunks[logicChunkIndex]; foundSealed { + println("found already sealed chunk", logicChunkIndex) + } + if _, foundReading := up.activeReadChunks[logicChunkIndex]; foundReading { + println("found active read chunk", logicChunkIndex) + } + n = pageChunk.WriteDataAt(p, off, tsNs) up.maybeMoveToSealed(pageChunk, logicChunkIndex) return } -func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { +func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) { logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) up.chunksLock.Lock() @@ -106,7 +113,7 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { sealedChunk.referenceCounter++ } if found { - maxStop = sealedChunk.chunk.ReadDataAt(p, off) + maxStop = sealedChunk.chunk.ReadDataAt(p, off, tsNs) glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop) sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex)) } @@ -116,7 +123,7 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { if !found { return } - writableMaxStop := writableChunk.ReadDataAt(p, off) + writableMaxStop := writableChunk.ReadDataAt(p, off, tsNs) glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop) maxStop = max(maxStop, writableMaxStop) diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go index bc092a252..64d623f47 100644 --- a/weed/mount/weedfs_file_copy_range.go +++ b/weed/mount/weedfs_file_copy_range.go @@ -3,6 +3,7 @@ package mount import ( "context" "net/http" + "time" "github.com/hanwen/go-fuse/v2/fuse" @@ -88,7 +89,7 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) // put data at the specified offset in target file fhOut.dirtyPages.writerPattern.MonitorWriteAt(int64(in.OffOut), int(in.Len)) fhOut.entry.Content = nil - fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode()) + fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode(), time.Now().UnixNano()) fhOut.entry.Attributes.FileSize = uint64(max(int64(in.OffOut)+totalRead, int64(fhOut.entry.Attributes.FileSize))) fhOut.dirtyMetadata = true written = uint32(totalRead) diff --git a/weed/mount/weedfs_file_read.go b/weed/mount/weedfs_file_read.go index 8375f9a5d..a5a17a3ba 100644 --- a/weed/mount/weedfs_file_read.go +++ b/weed/mount/weedfs_file_read.go @@ -59,9 +59,9 @@ func readDataByFileHandle(buff []byte, fhIn *FileHandle, offset int64) (int64, e fhIn.lockForRead(offset, size) defer fhIn.unlockForRead(offset, size) - n, err := fhIn.readFromChunks(buff, offset) + n, tsNs, err := fhIn.readFromChunks(buff, offset) if err == nil || err == io.EOF { - maxStop := fhIn.readFromDirtyPages(buff, offset) + maxStop := fhIn.readFromDirtyPages(buff, offset, tsNs) n = max(maxStop-offset, n) } if err == io.EOF { diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go index 7b13d54ff..3eaefef94 100644 --- a/weed/mount/weedfs_file_write.go +++ b/weed/mount/weedfs_file_write.go @@ -5,6 +5,7 @@ import ( "github.com/hanwen/go-fuse/v2/fuse" "net/http" "syscall" + "time" ) /** @@ -46,6 +47,8 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr fh.dirtyPages.writerPattern.MonitorWriteAt(int64(in.Offset), int(in.Size)) + tsNs := time.Now().UnixNano() + fh.orderedMutex.Acquire(context.Background(), 1) defer fh.orderedMutex.Release(1) @@ -59,7 +62,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize))) // glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) - fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode()) + fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode(), tsNs) written = uint32(len(data)) diff --git a/weed/mount/weedfs_write.go b/weed/mount/weedfs_write.go index e18a4a358..4c8470245 100644 --- a/weed/mount/weedfs_write.go +++ b/weed/mount/weedfs_write.go @@ -13,7 +13,7 @@ import ( func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType { - return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, err error) { + return func(reader io.Reader, filename string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) { fileId, uploadResult, err, data := operation.UploadWithRetry( wfs, @@ -56,7 +56,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun wfs.chunkCache.SetChunk(fileId, data) } - chunk = uploadResult.ToPbFileChunk(fileId, offset) + chunk = uploadResult.ToPbFileChunk(fileId, offset, tsNs) return chunk, nil } } diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index ed38dfa6b..0c3e29a43 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -45,13 +45,13 @@ type UploadResult struct { RetryCount int `json:"-"` } -func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk { +func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsNs int64) *filer_pb.FileChunk { fid, _ := filer_pb.ToFileIdObject(fileId) return &filer_pb.FileChunk{ FileId: fileId, Offset: offset, Size: uint64(uploadResult.Size), - ModifiedTsNs: time.Now().UnixNano(), + ModifiedTsNs: tsNs, ETag: uploadResult.ContentMd5, CipherKey: uploadResult.CipherKey, IsCompressed: uploadResult.Gzip > 0, diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index c671deb76..8b3fc45fb 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -256,7 +256,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType { - return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, error) { + return func(reader io.Reader, name string, offset int64, tsNs int64) (*filer_pb.FileChunk, error) { var fileId string var uploadResult *operation.UploadResult @@ -290,7 +290,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs return nil, err } - return uploadResult.ToPbFileChunk(fileId, offset), nil + return uploadResult.ToPbFileChunk(fileId, offset, tsNs), nil } } diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index bd8761077..bb5659437 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -59,7 +59,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht } // Save to chunk manifest structure - fileChunks := []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, 0)} + fileChunks := []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, 0, time.Now().UnixNano())} // fmt.Printf("uploaded: %+v\n", uploadResult) diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index 95920583d..cc43eba64 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -214,5 +214,5 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch if uploadResult.Size == 0 { return nil, nil } - return []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, chunkOffset)}, nil + return []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, chunkOffset, time.Now().UnixNano())}, nil } diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 80b882181..615fa1a18 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -381,7 +381,7 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, return fs.stat(ctx, name) } -func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) { +func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) { fileId, uploadResult, flushErr, _ := operation.UploadWithRetry( f.fs, @@ -413,7 +413,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64 glog.V(0).Infof("upload failure %v: %v", f.name, flushErr) return nil, fmt.Errorf("upload result: %v", uploadResult.Error) } - return uploadResult.ToPbFileChunk(fileId, offset), nil + return uploadResult.ToPbFileChunk(fileId, offset, tsNs), nil } func (f *WebDavFile) Write(buf []byte) (int, error) { @@ -439,7 +439,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) { var chunk *filer_pb.FileChunk - chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset) + chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset, time.Now().UnixNano()) if flushErr != nil { return fmt.Errorf("%s upload result: %v", f.name, flushErr) diff --git a/weed/util/mem/slot_pool.go b/weed/util/mem/slot_pool.go index 70f2525b3..2c729deef 100644 --- a/weed/util/mem/slot_pool.go +++ b/weed/util/mem/slot_pool.go @@ -42,6 +42,9 @@ func getSlotPool(size int) (*sync.Pool, bool) { func Allocate(size int) []byte { if pool, found := getSlotPool(size); found { slab := *pool.Get().(*[]byte) + for i := 0; i < size; i++ { + slab[i] = 0 + } return slab[:size] } return make([]byte, size)