Browse Source

compare chunks by timestamp

pull/4089/head
chrislu 2 years ago
parent
commit
d68b59ab4c
  1. 8
      weed/command/filer_copy.go
  2. 4
      weed/filer/filechunk_manifest.go
  3. 2
      weed/filer/filechunks.go
  4. 2
      weed/filer/filer_notify_append.go
  5. 17
      weed/filer/reader_at.go
  6. 15
      weed/mount/dirty_pages_chunked.go
  7. 20
      weed/mount/filehandle_read.go
  8. 12
      weed/mount/page_writer.go
  9. 4
      weed/mount/page_writer/dirty_pages.go
  10. 6
      weed/mount/page_writer/page_chunk.go
  11. 16
      weed/mount/page_writer/page_chunk_mem.go
  12. 16
      weed/mount/page_writer/page_chunk_swapfile.go
  13. 19
      weed/mount/page_writer/upload_pipeline.go
  14. 3
      weed/mount/weedfs_file_copy_range.go
  15. 4
      weed/mount/weedfs_file_read.go
  16. 5
      weed/mount/weedfs_file_write.go
  17. 4
      weed/mount/weedfs_write.go
  18. 4
      weed/operation/upload_content.go
  19. 4
      weed/server/filer_server_handlers_write_autochunk.go
  20. 2
      weed/server/filer_server_handlers_write_cipher.go
  21. 2
      weed/server/filer_server_handlers_write_upload.go
  22. 6
      weed/server/webdav_server.go
  23. 3
      weed/util/mem/slot_pool.go

8
weed/command/filer_copy.go

@ -365,7 +365,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
if flushErr != nil { if flushErr != nil {
return flushErr return flushErr
} }
chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0))
chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0, time.Now().UnixNano()))
} }
if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@ -450,7 +450,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
uploadError = fmt.Errorf("upload %v result: %v\n", fileName, uploadResult.Error) uploadError = fmt.Errorf("upload %v result: %v\n", fileName, uploadResult.Error)
return return
} }
chunksChan <- uploadResult.ToPbFileChunk(fileId, i*chunkSize)
chunksChan <- uploadResult.ToPbFileChunk(fileId, i*chunkSize, time.Now().UnixNano())
fmt.Printf("uploaded %s-%d [%d,%d)\n", fileName, i+1, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) fmt.Printf("uploaded %s-%d [%d,%d)\n", fileName, i+1, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}(i) }(i)
@ -530,7 +530,7 @@ func detectMimeType(f *os.File) string {
return mimeType return mimeType
} }
func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry( finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
worker, worker,
@ -561,7 +561,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off
if uploadResult.Error != "" { if uploadResult.Error != "" {
return nil, fmt.Errorf("upload result: %v", uploadResult.Error) return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
} }
return uploadResult.ToPbFileChunk(finalFileId, offset), nil
return uploadResult.ToPbFileChunk(finalFileId, offset, tsNs), nil
} }
var _ = filer_pb.FilerClient(&FileCopyWorker{}) var _ = filer_pb.FilerClient(&FileCopyWorker{})

4
weed/filer/filechunk_manifest.go

@ -264,7 +264,7 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer
} }
} }
manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0)
manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -275,4 +275,4 @@ func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer
return return
} }
type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error)
type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error)

2
weed/filer/filechunks.go

@ -138,6 +138,7 @@ type ChunkView struct {
ChunkSize uint64 ChunkSize uint64
CipherKey []byte CipherKey []byte
IsGzipped bool IsGzipped bool
ModifiedTsNs int64
} }
func (cv *ChunkView) IsFullChunk() bool { func (cv *ChunkView) IsFullChunk() bool {
@ -175,6 +176,7 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int
ChunkSize: chunk.chunkSize, ChunkSize: chunk.chunkSize,
CipherKey: chunk.cipherKey, CipherKey: chunk.cipherKey,
IsGzipped: chunk.isGzipped, IsGzipped: chunk.isGzipped,
ModifiedTsNs: chunk.modifiedTsNs,
}) })
} }
} }

2
weed/filer/filer_notify_append.go

@ -40,7 +40,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
} }
// append to existing chunks // append to existing chunks
entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(assignResult.Fid, offset))
entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(assignResult.Fid, offset, time.Now().UnixNano()))
// update the entry // update the entry
err = f.CreateEntry(context.Background(), entry, false, false, nil, false) err = f.CreateEntry(context.Background(), entry, false, false, nil, false)

17
weed/filer/reader_at.go

@ -111,11 +111,23 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
c.readerLock.Lock() c.readerLock.Lock()
defer c.readerLock.Unlock() defer c.readerLock.Unlock()
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
n, _, err = c.doReadAt(p, offset)
return
}
func (c *ChunkReadAt) ReadAtWithTime(p []byte, offset int64) (n int, ts int64, err error) {
c.readerPattern.MonitorReadAt(offset, len(p))
c.readerLock.Lock()
defer c.readerLock.Unlock()
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews)) // glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
return c.doReadAt(p, offset) return c.doReadAt(p, offset)
} }
func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, ts int64, err error) {
startOffset, remaining := offset, int64(len(p)) startOffset, remaining := offset, int64(len(p))
var nextChunks []*ChunkView var nextChunks []*ChunkView
@ -142,10 +154,11 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
} }
// glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size)) // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
ts = chunk.ModifiedTsNs
copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset)) copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset))
if err != nil { if err != nil {
glog.Errorf("fetching chunk %+v: %v\n", chunk, err) glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
return copied, err
return copied, ts, err
} }
n += copied n += copied

15
weed/mount/dirty_pages_chunked.go

@ -7,7 +7,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"io" "io"
"sync" "sync"
"time"
) )
type ChunkedDirtyPages struct { type ChunkedDirtyPages struct {
@ -38,11 +37,11 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
return dirtyPages return dirtyPages
} }
func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool) {
func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) {
pages.hasWrites = true pages.hasWrites = true
glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.fh, offset, offset+int64(len(data))) glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.fh, offset, offset+int64(len(data)))
pages.uploadPipeline.SaveDataAt(data, offset, isSequential)
pages.uploadPipeline.SaveDataAt(data, offset, isSequential, tsNs)
return return
} }
@ -58,27 +57,25 @@ func (pages *ChunkedDirtyPages) FlushData() error {
return nil return nil
} }
func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64, tsNs int64) (maxStop int64) {
if !pages.hasWrites { if !pages.hasWrites {
return return
} }
return pages.uploadPipeline.MaybeReadDataAt(data, startOffset)
return pages.uploadPipeline.MaybeReadDataAt(data, startOffset, tsNs)
} }
func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) {
func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64, modifiedTsNs int64, cleanupFn func()) {
mtime := time.Now().UnixNano()
defer cleanupFn() defer cleanupFn()
fileFullPath := pages.fh.FullPath() fileFullPath := pages.fh.FullPath()
fileName := fileFullPath.Name() fileName := fileFullPath.Name()
chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset)
chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset, modifiedTsNs)
if err != nil { if err != nil {
glog.V(0).Infof("%v saveToStorage [%d,%d): %v", fileFullPath, offset, offset+size, err) glog.V(0).Infof("%v saveToStorage [%d,%d): %v", fileFullPath, offset, offset+size, err)
pages.lastErr = err pages.lastErr = err
return return
} }
chunk.ModifiedTsNs = mtime
pages.fh.AddChunks([]*filer_pb.FileChunk{chunk}) pages.fh.AddChunks([]*filer_pb.FileChunk{chunk})
glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size) glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size)

20
weed/mount/filehandle_read.go

@ -17,18 +17,18 @@ func (fh *FileHandle) unlockForRead(startOffset int64, size int) {
fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size)) fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size))
} }
func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) {
maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64, tsNs int64) (maxStop int64) {
maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset, tsNs)
return return
} }
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, int64, error) {
fileFullPath := fh.FullPath() fileFullPath := fh.FullPath()
entry := fh.GetEntry() entry := fh.GetEntry()
if entry == nil { if entry == nil {
return 0, io.EOF
return 0, 0, io.EOF
} }
if entry.IsInRemoteOnly() { if entry.IsInRemoteOnly() {
@ -36,7 +36,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
newEntry, err := fh.downloadRemoteEntry(entry) newEntry, err := fh.downloadRemoteEntry(entry)
if err != nil { if err != nil {
glog.V(1).Infof("download remote entry %s: %v", fileFullPath, err) glog.V(1).Infof("download remote entry %s: %v", fileFullPath, err)
return 0, err
return 0, 0, err
} }
entry = newEntry entry = newEntry
} }
@ -45,20 +45,20 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
if fileSize == 0 { if fileSize == 0 {
glog.V(1).Infof("empty fh %v", fileFullPath) glog.V(1).Infof("empty fh %v", fileFullPath)
return 0, io.EOF
return 0, 0, io.EOF
} }
if offset+int64(len(buff)) <= int64(len(entry.Content)) { if offset+int64(len(buff)) <= int64(len(entry.Content)) {
totalRead := copy(buff, entry.Content[offset:]) totalRead := copy(buff, entry.Content[offset:])
glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
return int64(totalRead), nil
return int64(totalRead), 0, nil
} }
var chunkResolveErr error var chunkResolveErr error
if fh.entryViewCache == nil { if fh.entryViewCache == nil {
fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), entry.GetChunks(), 0, fileSize) fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), entry.GetChunks(), 0, fileSize)
if chunkResolveErr != nil { if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
return 0, 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
} }
fh.CloseReader() fh.CloseReader()
} }
@ -72,7 +72,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
fh.reader = filer.NewChunkReaderAtFromClient(fh.wfs.LookupFn(), chunkViews, fh.wfs.chunkCache, fileSize) fh.reader = filer.NewChunkReaderAtFromClient(fh.wfs.LookupFn(), chunkViews, fh.wfs.chunkCache, fileSize)
} }
totalRead, err := fh.reader.ReadAt(buff, offset)
totalRead, ts, err := fh.reader.ReadAtWithTime(buff, offset)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
glog.Errorf("file handle read %s: %v", fileFullPath, err) glog.Errorf("file handle read %s: %v", fileFullPath, err)
@ -80,7 +80,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
// glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err) // glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
return int64(totalRead), err
return int64(totalRead), ts, err
} }
func (fh *FileHandle) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) { func (fh *FileHandle) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) {

12
weed/mount/page_writer.go

@ -29,35 +29,35 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
return pw return pw
} }
func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool) {
func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) {
glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data))) glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ { for i := chunkIndex; len(data) > 0; i++ {
writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
pw.addToOneChunk(i, offset, data[:writeSize], isSequential)
pw.addToOneChunk(i, offset, data[:writeSize], isSequential, tsNs)
offset += writeSize offset += writeSize
data = data[writeSize:] data = data[writeSize:]
} }
} }
func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool) {
pw.randomWriter.AddPage(offset, data, isSequential)
func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool, tsNs int64) {
pw.randomWriter.AddPage(offset, data, isSequential, tsNs)
} }
func (pw *PageWriter) FlushData() error { func (pw *PageWriter) FlushData() error {
return pw.randomWriter.FlushData() return pw.randomWriter.FlushData()
} }
func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) {
func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64, tsNs int64) (maxStop int64) {
glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.fh, offset, offset+int64(len(data))) glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ { for i := chunkIndex; len(data) > 0; i++ {
readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset)
maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset, tsNs)
offset += readSize offset += readSize
data = data[readSize:] data = data[readSize:]

4
weed/mount/page_writer/dirty_pages.go

@ -1,9 +1,9 @@
package page_writer package page_writer
type DirtyPages interface { type DirtyPages interface {
AddPage(offset int64, data []byte, isSequential bool)
AddPage(offset int64, data []byte, isSequential bool, tsNs int64)
FlushData() error FlushData() error
ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
ReadDirtyDataAt(data []byte, startOffset int64, tsNs int64) (maxStop int64)
Destroy() Destroy()
LockForRead(startOffset, stopOffset int64) LockForRead(startOffset, stopOffset int64)
UnlockForRead(startOffset, stopOffset int64) UnlockForRead(startOffset, stopOffset int64)

6
weed/mount/page_writer/page_chunk.go

@ -4,12 +4,12 @@ import (
"io" "io"
) )
type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, modifiedTsNs int64, cleanupFn func())
type PageChunk interface { type PageChunk interface {
FreeResource() FreeResource()
WriteDataAt(src []byte, offset int64) (n int)
ReadDataAt(p []byte, off int64) (maxStop int64)
WriteDataAt(src []byte, offset int64, tsNs int64) (n int)
ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64)
IsComplete() bool IsComplete() bool
WrittenSize() int64 WrittenSize() int64
SaveContent(saveFn SaveToStorageFunc) SaveContent(saveFn SaveToStorageFunc)

16
weed/mount/page_writer/page_chunk_mem.go

@ -19,6 +19,7 @@ type MemChunk struct {
usage *ChunkWrittenIntervalList usage *ChunkWrittenIntervalList
chunkSize int64 chunkSize int64
logicChunkIndex LogicChunkIndex logicChunkIndex LogicChunkIndex
lastModifiedTsNs int64
} }
func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk { func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
@ -39,17 +40,22 @@ func (mc *MemChunk) FreeResource() {
mem.Free(mc.buf) mem.Free(mc.buf)
} }
func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) {
func (mc *MemChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
mc.Lock() mc.Lock()
defer mc.Unlock() defer mc.Unlock()
if mc.lastModifiedTsNs > tsNs {
println("write old data1", tsNs-mc.lastModifiedTsNs, "ns")
}
mc.lastModifiedTsNs = tsNs
innerOffset := offset % mc.chunkSize innerOffset := offset % mc.chunkSize
n = copy(mc.buf[innerOffset:], src) n = copy(mc.buf[innerOffset:], src)
mc.usage.MarkWritten(innerOffset, innerOffset+int64(n)) mc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
return return
} }
func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
func (mc *MemChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
mc.RLock() mc.RLock()
defer mc.RUnlock() defer mc.RUnlock()
@ -58,8 +64,12 @@ func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset) logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset)
logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
if logicStart < logicStop { if logicStart < logicStop {
if mc.lastModifiedTsNs > tsNs {
copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
maxStop = max(maxStop, logicStop) maxStop = max(maxStop, logicStop)
} else {
println("read old data1", tsNs-mc.lastModifiedTsNs, "ns")
}
} }
} }
return return
@ -88,7 +98,7 @@ func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
} }
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next { for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset]) reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset])
saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() {
saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), mc.lastModifiedTsNs, func() {
}) })
} }
} }

16
weed/mount/page_writer/page_chunk_swapfile.go

@ -29,6 +29,7 @@ type SwapFileChunk struct {
usage *ChunkWrittenIntervalList usage *ChunkWrittenIntervalList
logicChunkIndex LogicChunkIndex logicChunkIndex LogicChunkIndex
actualChunkIndex ActualChunkIndex actualChunkIndex ActualChunkIndex
lastModifiedTsNs int64
} }
func NewSwapFile(dir string, chunkSize int64) *SwapFile { func NewSwapFile(dir string, chunkSize int64) *SwapFile {
@ -87,10 +88,15 @@ func (sc *SwapFileChunk) FreeResource() {
delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex) delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex)
} }
func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
sc.Lock() sc.Lock()
defer sc.Unlock() defer sc.Unlock()
if sc.lastModifiedTsNs > tsNs {
println("write old data2", tsNs-sc.lastModifiedTsNs, "ns")
}
sc.lastModifiedTsNs = tsNs
innerOffset := offset % sc.swapfile.chunkSize innerOffset := offset % sc.swapfile.chunkSize
var err error var err error
n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset) n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
@ -102,7 +108,7 @@ func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
return return
} }
func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
sc.RLock() sc.RLock()
defer sc.RUnlock() defer sc.RUnlock()
@ -111,12 +117,16 @@ func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
logicStart := max(off, chunkStartOffset+t.StartOffset) logicStart := max(off, chunkStartOffset+t.StartOffset)
logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset) logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
if logicStart < logicStop { if logicStart < logicStop {
if sc.lastModifiedTsNs > tsNs {
actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err) glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
break break
} }
maxStop = max(maxStop, logicStop) maxStop = max(maxStop, logicStop)
} else {
println("read old data2", tsNs-sc.lastModifiedTsNs, "ns")
}
} }
} }
return return
@ -145,7 +155,7 @@ func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
data := mem.Allocate(int(t.Size())) data := mem.Allocate(int(t.Size()))
sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize) sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
reader := util.NewBytesReader(data) reader := util.NewBytesReader(data)
saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), sc.lastModifiedTsNs, func() {
}) })
mem.Free(data) mem.Free(data)
} }

19
weed/mount/page_writer/upload_pipeline.go

@ -55,7 +55,8 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64,
return t return t
} }
func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) {
func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool, tsNs int64) (n int) {
up.chunksLock.Lock() up.chunksLock.Lock()
defer up.chunksLock.Unlock() defer up.chunksLock.Unlock()
@ -76,7 +77,7 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n
up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
} }
if isSequential &&
if false && isSequential &&
len(up.writableChunks) < up.writableChunkLimit && len(up.writableChunks) < up.writableChunkLimit &&
atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) { atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) {
pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
@ -85,13 +86,19 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n
} }
up.writableChunks[logicChunkIndex] = pageChunk up.writableChunks[logicChunkIndex] = pageChunk
} }
n = pageChunk.WriteDataAt(p, off)
if _, foundSealed := up.sealedChunks[logicChunkIndex]; foundSealed {
println("found already sealed chunk", logicChunkIndex)
}
if _, foundReading := up.activeReadChunks[logicChunkIndex]; foundReading {
println("found active read chunk", logicChunkIndex)
}
n = pageChunk.WriteDataAt(p, off, tsNs)
up.maybeMoveToSealed(pageChunk, logicChunkIndex) up.maybeMoveToSealed(pageChunk, logicChunkIndex)
return return
} }
func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
up.chunksLock.Lock() up.chunksLock.Lock()
@ -106,7 +113,7 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
sealedChunk.referenceCounter++ sealedChunk.referenceCounter++
} }
if found { if found {
maxStop = sealedChunk.chunk.ReadDataAt(p, off)
maxStop = sealedChunk.chunk.ReadDataAt(p, off, tsNs)
glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop) glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex)) sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
} }
@ -116,7 +123,7 @@ func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
if !found { if !found {
return return
} }
writableMaxStop := writableChunk.ReadDataAt(p, off)
writableMaxStop := writableChunk.ReadDataAt(p, off, tsNs)
glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop) glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
maxStop = max(maxStop, writableMaxStop) maxStop = max(maxStop, writableMaxStop)

3
weed/mount/weedfs_file_copy_range.go

@ -3,6 +3,7 @@ package mount
import ( import (
"context" "context"
"net/http" "net/http"
"time"
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
@ -88,7 +89,7 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
// put data at the specified offset in target file // put data at the specified offset in target file
fhOut.dirtyPages.writerPattern.MonitorWriteAt(int64(in.OffOut), int(in.Len)) fhOut.dirtyPages.writerPattern.MonitorWriteAt(int64(in.OffOut), int(in.Len))
fhOut.entry.Content = nil fhOut.entry.Content = nil
fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode())
fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode(), time.Now().UnixNano())
fhOut.entry.Attributes.FileSize = uint64(max(int64(in.OffOut)+totalRead, int64(fhOut.entry.Attributes.FileSize))) fhOut.entry.Attributes.FileSize = uint64(max(int64(in.OffOut)+totalRead, int64(fhOut.entry.Attributes.FileSize)))
fhOut.dirtyMetadata = true fhOut.dirtyMetadata = true
written = uint32(totalRead) written = uint32(totalRead)

4
weed/mount/weedfs_file_read.go

@ -59,9 +59,9 @@ func readDataByFileHandle(buff []byte, fhIn *FileHandle, offset int64) (int64, e
fhIn.lockForRead(offset, size) fhIn.lockForRead(offset, size)
defer fhIn.unlockForRead(offset, size) defer fhIn.unlockForRead(offset, size)
n, err := fhIn.readFromChunks(buff, offset)
n, tsNs, err := fhIn.readFromChunks(buff, offset)
if err == nil || err == io.EOF { if err == nil || err == io.EOF {
maxStop := fhIn.readFromDirtyPages(buff, offset)
maxStop := fhIn.readFromDirtyPages(buff, offset, tsNs)
n = max(maxStop-offset, n) n = max(maxStop-offset, n)
} }
if err == io.EOF { if err == io.EOF {

5
weed/mount/weedfs_file_write.go

@ -5,6 +5,7 @@ import (
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
"net/http" "net/http"
"syscall" "syscall"
"time"
) )
/** /**
@ -46,6 +47,8 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
fh.dirtyPages.writerPattern.MonitorWriteAt(int64(in.Offset), int(in.Size)) fh.dirtyPages.writerPattern.MonitorWriteAt(int64(in.Offset), int(in.Size))
tsNs := time.Now().UnixNano()
fh.orderedMutex.Acquire(context.Background(), 1) fh.orderedMutex.Acquire(context.Background(), 1)
defer fh.orderedMutex.Release(1) defer fh.orderedMutex.Release(1)
@ -59,7 +62,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize))) entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize)))
// glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) // glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode())
fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode(), tsNs)
written = uint32(len(data)) written = uint32(len(data))

4
weed/mount/weedfs_write.go

@ -13,7 +13,7 @@ import (
func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType { func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, err error) {
return func(reader io.Reader, filename string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
fileId, uploadResult, err, data := operation.UploadWithRetry( fileId, uploadResult, err, data := operation.UploadWithRetry(
wfs, wfs,
@ -56,7 +56,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
wfs.chunkCache.SetChunk(fileId, data) wfs.chunkCache.SetChunk(fileId, data)
} }
chunk = uploadResult.ToPbFileChunk(fileId, offset)
chunk = uploadResult.ToPbFileChunk(fileId, offset, tsNs)
return chunk, nil return chunk, nil
} }
} }

4
weed/operation/upload_content.go

@ -45,13 +45,13 @@ type UploadResult struct {
RetryCount int `json:"-"` RetryCount int `json:"-"`
} }
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsNs int64) *filer_pb.FileChunk {
fid, _ := filer_pb.ToFileIdObject(fileId) fid, _ := filer_pb.ToFileIdObject(fileId)
return &filer_pb.FileChunk{ return &filer_pb.FileChunk{
FileId: fileId, FileId: fileId,
Offset: offset, Offset: offset,
Size: uint64(uploadResult.Size), Size: uint64(uploadResult.Size),
ModifiedTsNs: time.Now().UnixNano(),
ModifiedTsNs: tsNs,
ETag: uploadResult.ContentMd5, ETag: uploadResult.ContentMd5,
CipherKey: uploadResult.CipherKey, CipherKey: uploadResult.CipherKey,
IsCompressed: uploadResult.Gzip > 0, IsCompressed: uploadResult.Gzip > 0,

4
weed/server/filer_server_handlers_write_autochunk.go

@ -256,7 +256,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType { func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, error) {
return func(reader io.Reader, name string, offset int64, tsNs int64) (*filer_pb.FileChunk, error) {
var fileId string var fileId string
var uploadResult *operation.UploadResult var uploadResult *operation.UploadResult
@ -290,7 +290,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
return nil, err return nil, err
} }
return uploadResult.ToPbFileChunk(fileId, offset), nil
return uploadResult.ToPbFileChunk(fileId, offset, tsNs), nil
} }
} }

2
weed/server/filer_server_handlers_write_cipher.go

@ -59,7 +59,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
} }
// Save to chunk manifest structure // Save to chunk manifest structure
fileChunks := []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, 0)}
fileChunks := []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, 0, time.Now().UnixNano())}
// fmt.Printf("uploaded: %+v\n", uploadResult) // fmt.Printf("uploaded: %+v\n", uploadResult)

2
weed/server/filer_server_handlers_write_upload.go

@ -214,5 +214,5 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
if uploadResult.Size == 0 { if uploadResult.Size == 0 {
return nil, nil return nil, nil
} }
return []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, chunkOffset)}, nil
return []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, chunkOffset, time.Now().UnixNano())}, nil
} }

6
weed/server/webdav_server.go

@ -381,7 +381,7 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
return fs.stat(ctx, name) return fs.stat(ctx, name)
} }
func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
fileId, uploadResult, flushErr, _ := operation.UploadWithRetry( fileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
f.fs, f.fs,
@ -413,7 +413,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
glog.V(0).Infof("upload failure %v: %v", f.name, flushErr) glog.V(0).Infof("upload failure %v: %v", f.name, flushErr)
return nil, fmt.Errorf("upload result: %v", uploadResult.Error) return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
} }
return uploadResult.ToPbFileChunk(fileId, offset), nil
return uploadResult.ToPbFileChunk(fileId, offset, tsNs), nil
} }
func (f *WebDavFile) Write(buf []byte) (int, error) { func (f *WebDavFile) Write(buf []byte) (int, error) {
@ -439,7 +439,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) { f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
var chunk *filer_pb.FileChunk var chunk *filer_pb.FileChunk
chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset)
chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset, time.Now().UnixNano())
if flushErr != nil { if flushErr != nil {
return fmt.Errorf("%s upload result: %v", f.name, flushErr) return fmt.Errorf("%s upload result: %v", f.name, flushErr)

3
weed/util/mem/slot_pool.go

@ -42,6 +42,9 @@ func getSlotPool(size int) (*sync.Pool, bool) {
func Allocate(size int) []byte { func Allocate(size int) []byte {
if pool, found := getSlotPool(size); found { if pool, found := getSlotPool(size); found {
slab := *pool.Get().(*[]byte) slab := *pool.Get().(*[]byte)
for i := 0; i < size; i++ {
slab[i] = 0
}
return slab[:size] return slab[:size]
} }
return make([]byte, size) return make([]byte, size)

Loading…
Cancel
Save