diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index 9ab7ea355..764ba6060 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -158,9 +158,9 @@ func logPrintf(name string, visibles []VisibleInterval) {
 	/*
 	glog.V(0).Infof("%s len %d", name, len(visibles))
 	for _, v := range visibles {
-		glog.V(0).Infof("%s: [%d,%d)", name, v.start, v.stop)
+		glog.V(0).Infof("%s: [%d,%d) %s %d", name, v.start, v.stop, v.fileId, v.chunkOffset)
 	}
-	*/
+	*/
 }
 
 var bufPool = sync.Pool{
@@ -169,7 +169,7 @@ var bufPool = sync.Pool{
 	},
 }
 
-func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval {
+func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (newVisibles []VisibleInterval) {
 
 	newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, 0, chunk.Size, chunk.CipherKey, chunk.IsCompressed)
 
@@ -183,16 +183,22 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval {
 	}
 
 	logPrintf("  before", visibles)
+	// glog.V(0).Infof("newVisibles %d adding chunk [%d,%d) %s size:%d", len(newVisibles), chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Size)
 	chunkStop := chunk.Offset + int64(chunk.Size)
 	for _, v := range visibles {
 		if v.start < chunk.Offset && chunk.Offset < v.stop {
-			newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped))
+			t := newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped)
+			newVisibles = append(newVisibles, t)
+			// glog.V(0).Infof("visible %d [%d,%d) =1> [%d,%d)", i, v.start, v.stop, t.start, t.stop)
 		}
 		if v.start < chunkStop && chunkStop < v.stop {
-			newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped))
+			t := newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped)
+			newVisibles = append(newVisibles, t)
+			// glog.V(0).Infof("visible %d [%d,%d) =2> [%d,%d)", i, v.start, v.stop, t.start, t.stop)
 		}
 		if chunkStop <= v.start || v.stop <= chunk.Offset {
 			newVisibles = append(newVisibles, v)
+			// glog.V(0).Infof("visible %d [%d,%d) =3> [%d,%d)", i, v.start, v.stop, v.start, v.stop)
 		}
 	}
 	newVisibles = append(newVisibles, newV)
@@ -219,17 +225,16 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
 
 	chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)
 
 	sort.Slice(chunks, func(i, j int) bool {
-		return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey
+		if chunks[i].Mtime == chunks[j].Mtime {
+			return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey
+		}
+		return chunks[i].Mtime < chunks[j].Mtime // keep this to make tests run
 	})
 
-	var newVisibles []VisibleInterval
 	for _, chunk := range chunks {
 
 		// glog.V(0).Infof("merge [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
-		newVisibles = MergeIntoVisibles(visibles, newVisibles, chunk)
-		t := visibles[:0]
-		visibles = newVisibles
-		newVisibles = t
+		visibles = MergeIntoVisibles(visibles, chunk)
 
 		logPrintf("add", visibles)
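A note on the MergeIntoVisibles rewrite above: the function now takes the current visible intervals plus one chunk and returns the next view, so callers drop the second slice parameter and the buffer-swap dance. Below is a minimal, self-contained sketch of the before/after call pattern; the types are stand-ins for filer2.VisibleInterval and filer_pb.FileChunk, not the real ones, and the merge body is simplified.

package main

import "fmt"

// Stand-ins for filer2.VisibleInterval and filer_pb.FileChunk, only to show
// the shape of the call pattern; the real merge also splits partial overlaps.
type interval struct{ start, stop int64 }
type fileChunk struct{ offset, size int64 }

// mergeIntoVisibles mimics the new signature: current view in, next view out.
func mergeIntoVisibles(visibles []interval, chunk fileChunk) (newVisibles []interval) {
	for _, v := range visibles {
		if v.stop <= chunk.offset || chunk.offset+chunk.size <= v.start {
			newVisibles = append(newVisibles, v) // non-overlapping intervals survive
		}
	}
	newVisibles = append(newVisibles, interval{chunk.offset, chunk.offset + chunk.size})
	return
}

func main() {
	chunks := []fileChunk{{0, 8}, {8, 4}} // assumed already sorted by Mtime
	var visibles []interval
	for _, c := range chunks {
		// Old: MergeIntoVisibles(visibles, newVisibles, chunk) plus a manual
		// visibles/newVisibles swap after every call. New: one assignment.
		visibles = mergeIntoVisibles(visibles, c)
	}
	fmt.Println(visibles) // [{0 8} {8 12}]
}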
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 46f5f22ed..f20e67df1 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -168,7 +168,6 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
 
 	node = dir.newFile(req.Name, request.Entry)
 	file := node.(*File)
-	file.isOpen++
 	fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
 	return file, fh, nil
 
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 88a1a4f55..562aaff9d 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -29,9 +29,6 @@ var counter = int32(0)
 
 func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
 
-	pages.lock.Lock()
-	defer pages.lock.Unlock()
-
 	glog.V(5).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize)
 
 	if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
@@ -82,14 +79,6 @@ func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
 	return
 }
 
-func (pages *ContinuousDirtyPages) FlushToStorage() (chunks []*filer_pb.FileChunk, err error) {
-
-	pages.lock.Lock()
-	defer pages.lock.Unlock()
-
-	return pages.saveExistingPagesToStorage()
-}
-
 func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer_pb.FileChunk, err error) {
 
 	var hasSavedData bool
@@ -103,7 +92,9 @@ func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer_pb.FileChunk, err error) {
 		}
 
 		if err == nil {
-			chunks = append(chunks, chunk)
+			if chunk != nil {
+				chunks = append(chunks, chunk)
+			}
 		} else {
 			return
 		}
@@ -121,9 +112,14 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *filer_pb.FileChunk, hasSavedData bool, err error) {
 	fileSize := int64(pages.f.entry.Attributes.FileSize)
 	for {
 		chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
+		if chunkSize == 0 {
+			return
+		}
 
 		chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize)
 		if err == nil {
-			hasSavedData = true
+			if chunk != nil {
+				hasSavedData = true
+			}
 			glog.V(4).Infof("saveToStorage %s %s [%d,%d) of %d bytes", pages.f.fullpath(), chunk.GetFileIdString(), maxList.Offset(), maxList.Offset()+chunkSize, fileSize)
 			return
 		} else {
@@ -170,10 +166,5 @@ func min(x, y int64) int64 {
 }
 
 func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
-
-	pages.lock.Lock()
-	defer pages.lock.Unlock()
-
 	return pages.intervals.ReadDataAt(data, startOffset)
-
 }
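The dirty_page.go hunks above strip the per-ContinuousDirtyPages mutex; serialization moves up into FileHandle (see the filehandle.go hunks further down). Below is a minimal sketch of that lock-hoisting pattern, with hypothetical types standing in for FileHandle and ContinuousDirtyPages: one lock now covers a whole operation, so an add followed by a flush can no longer interleave with a concurrent write.

package main

import (
	"fmt"
	"sync"
)

// dirtyPages stands in for ContinuousDirtyPages: after the change it carries
// no mutex of its own and assumes the caller serializes access to it.
type dirtyPages struct {
	buf []byte
}

func (p *dirtyPages) addPage(data []byte) { p.buf = append(p.buf, data...) }
func (p *dirtyPages) flush() []byte       { out := p.buf; p.buf = nil; return out }

// handle stands in for FileHandle, which now owns the one mutex.
type handle struct {
	sync.Mutex
	pages dirtyPages
}

func (h *handle) Write(data []byte) {
	h.Lock()
	defer h.Unlock()
	h.pages.addPage(data) // runs entirely inside one critical section
}

func (h *handle) Flush() []byte {
	h.Lock()
	defer h.Unlock()
	return h.pages.flush()
}

func main() {
	h := &handle{}
	h.Write([]byte("abc"))
	fmt.Printf("%s\n", h.Flush()) // abc
}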
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 43f3ddf07..f15eff780 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -87,8 +87,6 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
 
 	glog.V(4).Infof("file %v open %+v", file.fullpath(), req)
 
-	file.isOpen++
-
 	handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid)
 
 	resp.Handle = fuse.HandleID(handle.handle)
@@ -120,7 +118,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
 	if req.Valid.Size() {
 
 		glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(file.entry.Chunks))
-		if req.Size < filer2.TotalSize(file.entry.Chunks) {
+		if req.Size < filer2.FileSize(file.entry) {
 			// fmt.Printf("truncate %v \n", fullPath)
 			var chunks []*filer_pb.FileChunk
 			var truncatedChunks []*filer_pb.FileChunk
@@ -252,7 +250,7 @@ func (file *File) Forget() {
 }
 
 func (file *File) maybeLoadEntry(ctx context.Context) error {
-	if file.entry == nil || file.isOpen <= 0 {
+	if file.entry == nil && file.isOpen <= 0 {
 		entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
 		if err != nil {
 			glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
@@ -268,15 +266,14 @@ func (file *File) maybeLoadEntry(ctx context.Context) error {
 
 func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
 
 	sort.Slice(chunks, func(i, j int) bool {
-		return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey
+		if chunks[i].Mtime == chunks[j].Mtime {
+			return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey
+		}
+		return chunks[i].Mtime < chunks[j].Mtime
 	})
 
-	var newVisibles []filer2.VisibleInterval
 	for _, chunk := range chunks {
-		newVisibles = filer2.MergeIntoVisibles(file.entryViewCache, newVisibles, chunk)
-		t := file.entryViewCache[:0]
-		file.entryViewCache = newVisibles
-		newVisibles = t
+		file.entryViewCache = filer2.MergeIntoVisibles(file.entryViewCache, chunk)
 	}
 
 	file.reader = nil
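Why both comparators in this patch switch to Mtime-first ordering: visibility is resolved in merge order, so later writes must be merged last, and file keys reflect allocation order rather than write order. A toy, self-contained illustration with made-up values:

package main

import (
	"fmt"
	"sort"
)

// chunk is a toy stand-in holding only the two fields the comparator uses.
type chunk struct {
	fileKey uint64
	mtime   int64
}

func main() {
	// B is the newer write but happens to have the smaller file key.
	// Sorting by fileKey alone would merge B first and let the older A
	// overwrite it, so readers would see stale data.
	chunks := []chunk{{fileKey: 9, mtime: 100}, {fileKey: 3, mtime: 200}}

	sort.Slice(chunks, func(i, j int) bool {
		if chunks[i].mtime == chunks[j].mtime {
			return chunks[i].fileKey < chunks[j].fileKey // tie-breaker only
		}
		return chunks[i].mtime < chunks[j].mtime
	})

	// Merge order is oldest-first, so the last chunk merged (B) wins,
	// which matches "latest write is visible".
	fmt.Println(chunks) // [{9 100} {3 200}]
}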
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 94590f842..0b6aeddd2 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -7,6 +7,7 @@ import (
 	"math"
 	"net/http"
 	"os"
+	"sync"
 	"time"
 
 	"github.com/seaweedfs/fuse"
@@ -22,6 +23,7 @@ type FileHandle struct {
 	dirtyPages  *ContinuousDirtyPages
 	contentType string
 	handle      uint64
+	sync.RWMutex
 
 	f         *File
 	RequestId fuse.RequestID // unique ID for request
@@ -41,6 +43,7 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
 	if fh.f.entry != nil {
 		fh.f.entry.Attributes.FileSize = filer2.FileSize(fh.f.entry)
 	}
+
 	return fh
 }
 
@@ -55,6 +58,12 @@ var _ = fs.HandleReleaser(&FileHandle{})
 func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
 
 	glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))
+	fh.RLock()
+	defer fh.RUnlock()
+
+	if req.Size <= 0 {
+		return nil
+	}
 
 	buff := resp.Data[:cap(resp.Data)]
 	if req.Size > cap(resp.Data) {
@@ -65,7 +74,7 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
 	totalRead, err := fh.readFromChunks(buff, req.Offset)
 	if err == nil {
 		maxStop := fh.readFromDirtyPages(buff, req.Offset)
-		totalRead = max(maxStop - req.Offset, totalRead)
+		totalRead = max(maxStop-req.Offset, totalRead)
 	}
 
 	if err != nil {
@@ -77,13 +86,15 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
 		glog.Warningf("%s FileHandle Read %d: [%d,%d) size %d totalRead %d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, totalRead)
 		totalRead = min(int64(len(buff)), totalRead)
 	}
-	resp.Data = buff[:totalRead]
+	// resp.Data = buff[:totalRead]
+	resp.Data = buff
 
 	return err
 }
 
 func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) {
-	return fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
+	maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
+	return
 }
 
 func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
@@ -127,6 +138,9 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
 
 // Write to the file handle
 func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
+	fh.Lock()
+	defer fh.Unlock()
+
 	// write the request to volume servers
 	data := make([]byte, len(req.Data))
 	copy(data, req.Data)
@@ -162,19 +176,30 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
 
 	glog.V(4).Infof("Release %v fh %d", fh.f.fullpath(), fh.handle)
 
+	fh.Lock()
+	defer fh.Unlock()
+
 	fh.f.isOpen--
 
-	if fh.f.isOpen <= 0 {
+	if fh.f.isOpen < 0 {
+		glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0)
+		fh.f.isOpen = 0
+		return nil
+	}
+
+	if fh.f.isOpen == 0 {
 		fh.doFlush(ctx, req.Header)
 		fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
-		fh.f.entryViewCache = nil
-		fh.f.reader = nil
 	}
 
 	return nil
 }
 
 func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
+
+	fh.Lock()
+	defer fh.Unlock()
+
 	return fh.doFlush(ctx, req.Header)
 }
 
@@ -183,13 +208,14 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
 	// send the data to the OS
 	glog.V(4).Infof("doFlush %s fh %d %v", fh.f.fullpath(), fh.handle, header)
 
-	chunks, err := fh.dirtyPages.FlushToStorage()
+	chunks, err := fh.dirtyPages.saveExistingPagesToStorage()
 	if err != nil {
 		glog.Errorf("flush %s: %v", fh.f.fullpath(), err)
 		return fuse.EIO
 	}
 
 	if len(chunks) > 0 {
+		fh.f.addChunks(chunks)
 		fh.f.dirtyMetadata = true
 	}
@@ -227,18 +253,14 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
 			glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
 		}
 
-		chunks, garbages := filer2.CompactFileChunks(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+		chunks, _ := filer2.CompactFileChunks(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
 		chunks, manifestErr := filer2.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.dir.FullPath()), chunks)
 		if manifestErr != nil {
 			// not good, but should be ok
 			glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
 		}
 		fh.f.entry.Chunks = chunks
-		// fh.f.entryViewCache = nil
-
-		// special handling of one chunk md5
-		if len(chunks) == 1 {
-		}
+		fh.f.entryViewCache = nil
 
 		if err := filer_pb.CreateEntry(client, request); err != nil {
 			glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
@@ -247,11 +269,6 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
 
 		fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
 
-		fh.f.wfs.deleteFileChunks(garbages)
-		for i, chunk := range garbages {
-			glog.V(4).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
-		}
-
 		return nil
 	})
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index 15ec0903d..c0bb75f4a 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -50,6 +50,10 @@ func openMetaStore(dbFolder string) filer2.FilerStore {
 func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error {
 	mc.Lock()
 	defer mc.Unlock()
+	return mc.doInsertEntry(ctx, entry)
+}
+
+func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer2.Entry) error {
 	filer_pb.BeforeEntrySerialization(entry.Chunks)
 	return mc.actualStore.InsertEntry(ctx, entry)
 }
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
index cd98f4a7c..f5d633120 100644
--- a/weed/filesys/meta_cache/meta_cache_init.go
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -18,7 +18,7 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) {
 
 		err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
 			entry := filer2.FromPbEntry(string(dirPath), pbEntry)
-			if err := mc.InsertEntry(context.Background(), entry); err != nil {
+			if err := mc.doInsertEntry(context.Background(), entry); err != nil {
 				glog.V(0).Infof("read %s: %v", entry.FullPath, err)
 				return err
 			}
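The InsertEntry/doInsertEntry split above is the usual Go workaround for non-reentrant mutexes: the EnsureVisited callback runs inside code that already holds the MetaCache lock, so calling the exported InsertEntry from there would self-deadlock. A generic sketch of the pattern, with a hypothetical cache type standing in for MetaCache:

package main

import (
	"fmt"
	"sync"
)

// cache is a hypothetical stand-in for MetaCache.
type cache struct {
	sync.Mutex
	m map[string]string
}

// InsertEntry is the public entry point: take the lock, then delegate.
func (c *cache) InsertEntry(k, v string) {
	c.Lock()
	defer c.Unlock()
	c.doInsertEntry(k, v)
}

// doInsertEntry does the real work and assumes the caller holds the lock.
// Code already inside the critical section must call this variant, because
// sync.Mutex does not support recursive locking.
func (c *cache) doInsertEntry(k, v string) {
	c.m[k] = v
}

func main() {
	c := &cache{m: map[string]string{}}
	c.InsertEntry("a", "1")
	fmt.Println(c.m["a"]) // 1
}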
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index f147d7548..e9ee0864b 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -118,10 +118,14 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
 
 	inodeId := file.fullpath().AsInode()
 	existingHandle, found := wfs.handles[inodeId]
 	if found && existingHandle != nil {
+		file.isOpen++
 		return existingHandle
 	}
 
 	fileHandle = newFileHandle(file, uid, gid)
+	file.maybeLoadEntry(context.Background())
+	file.isOpen++
+
 	wfs.handles[inodeId] = fileHandle
 	fileHandle.handle = inodeId
diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go
index 091a70fa3..92e43b675 100644
--- a/weed/filesys/xattr.go
+++ b/weed/filesys/xattr.go
@@ -119,5 +119,5 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) {
 	if cacheErr == filer_pb.ErrNotFound {
 		return nil, fuse.ENOENT
 	}
-	return cachedEntry.ToProtoEntry(), nil
+	return cachedEntry.ToProtoEntry(), cacheErr
 }
diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go
index 40b9c4e47..0e023c0d1 100644
--- a/weed/util/bounded_tree/bounded_tree.go
+++ b/weed/util/bounded_tree/bounded_tree.go
@@ -15,7 +15,7 @@ type Node struct {
 
 type BoundedTree struct {
 	root *Node
-	sync.Mutex
+	sync.RWMutex
 }
 
 func NewBoundedTree() *BoundedTree {
@@ -131,6 +131,9 @@ func (n *Node) getChild(childName string) *Node {
 
 func (t *BoundedTree) HasVisited(p util.FullPath) bool {
 
+	t.RLock()
+	defer t.RUnlock()
+
 	if t.root == nil {
 		return true
 	}
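Finally, BoundedTree's move from sync.Mutex to sync.RWMutex lets the read-heavy HasVisited path run concurrently while writers still exclude everyone. A minimal sketch of the same pattern on a hypothetical visited-set type:

package main

import (
	"fmt"
	"sync"
)

// visitedSet is a hypothetical stand-in for BoundedTree.
type visitedSet struct {
	sync.RWMutex
	visited map[string]bool
}

// HasVisited takes the shared lock: many readers may hold it at once.
func (s *visitedSet) HasVisited(p string) bool {
	s.RLock()
	defer s.RUnlock()
	return s.visited[p]
}

// MarkVisited takes the exclusive lock: it waits out all readers and writers.
func (s *visitedSet) MarkVisited(p string) {
	s.Lock()
	defer s.Unlock()
	s.visited[p] = true
}

func main() {
	s := &visitedSet{visited: map[string]bool{}}
	s.MarkVisited("/docs")
	fmt.Println(s.HasVisited("/docs")) // true
}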