From 7286e525ad85dec877d506908a0ff35590b0f357 Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 13 Feb 2022 23:27:11 -0800 Subject: [PATCH] support write --- weed/mount/filehandle.go | 1 + weed/mount/weedfs_file_sync.go | 114 +++++++++++++++++++++++++++++++- weed/mount/weedfs_file_write.go | 33 ++++++++- 3 files changed, 145 insertions(+), 3 deletions(-) diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index 0d5481b30..f2a2ec69c 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -21,6 +21,7 @@ type FileHandle struct { wfs *WFS // cache file has been written to + dirtyMetadata bool dirtyPages *PageWriter entryViewCache []filer.VisibleInterval reader io.ReaderAt diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go index 1b89c1ecb..29a13690b 100644 --- a/weed/mount/weedfs_file_sync.go +++ b/weed/mount/weedfs_file_sync.go @@ -1,7 +1,14 @@ package mount import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/hanwen/go-fuse/v2/fuse" + "os" + "time" ) /** @@ -43,7 +50,15 @@ import ( * [close]: http://pubs.opengroup.org/onlinepubs/9699919799/functions/close.html */ func (wfs *WFS) Flush(cancel <-chan struct{}, in *fuse.FlushIn) fuse.Status { - return fuse.ENOSYS + fh := wfs.GetHandle(FileHandleId(in.Fh)) + if fh == nil { + return fuse.ENOENT + } + + fh.Lock() + defer fh.Unlock() + + return wfs.doFlush(fh, in.Uid, in.Gid) } /** @@ -66,5 +81,100 @@ func (wfs *WFS) Flush(cancel <-chan struct{}, in *fuse.FlushIn) fuse.Status { * @param fi file information */ func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Status) { - return fuse.ENOSYS + + fh := wfs.GetHandle(FileHandleId(in.Fh)) + if fh == nil { + return fuse.ENOENT + } + + fh.Lock() + defer fh.Unlock() + + return wfs.doFlush(fh, in.Uid, in.Gid) + +} + +func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { + // flush works at fh level + fileFullPath := fh.FullPath() + dir, _ := fileFullPath.DirAndName() + // send the data to the OS + glog.V(4).Infof("doFlush %s fh %d", fileFullPath, fh.handle) + + if err := fh.dirtyPages.FlushData(); err != nil { + glog.Errorf("%v doFlush: %v", fileFullPath, err) + return fuse.EIO + } + + if !fh.dirtyMetadata { + return fuse.OK + } + + err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + + entry := fh.entry + if entry == nil { + return nil + } + + if entry.Attributes != nil { + entry.Attributes.Mime = fh.contentType + if entry.Attributes.Uid == 0 { + entry.Attributes.Uid = uid + } + if entry.Attributes.Gid == 0 { + entry.Attributes.Gid = gid + } + if entry.Attributes.Crtime == 0 { + entry.Attributes.Crtime = time.Now().Unix() + } + entry.Attributes.Mtime = time.Now().Unix() + entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ wfs.option.Umask) + entry.Attributes.Collection, entry.Attributes.Replication = fh.dirtyPages.GetStorageOptions() + } + + request := &filer_pb.CreateEntryRequest{ + Directory: string(dir), + Entry: entry, + Signatures: []int32{wfs.signature}, + } + + glog.V(4).Infof("%s set chunks: %v", fileFullPath, len(entry.Chunks)) + for i, chunk := range entry.Chunks { + glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fileFullPath, i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) + } + + manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks) + + chunks, _ := filer.CompactFileChunks(wfs.LookupFn(), nonManifestChunks) + chunks, manifestErr := filer.MaybeManifestize(wfs.saveDataAsChunk(fileFullPath), chunks) + if manifestErr != nil { + // not good, but should be ok + glog.V(0).Infof("MaybeManifestize: %v", manifestErr) + } + entry.Chunks = append(chunks, manifestChunks...) + + wfs.mapPbIdFromLocalToFiler(request.Entry) + defer wfs.mapPbIdFromFilerToLocal(request.Entry) + + if err := filer_pb.CreateEntry(client, request); err != nil { + glog.Errorf("fh flush create %s: %v", fileFullPath, err) + return fmt.Errorf("fh flush create %s: %v", fileFullPath, err) + } + + wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) + + return nil + }) + + if err == nil { + fh.dirtyMetadata = false + } + + if err != nil { + glog.Errorf("%v fh %d flush: %v", fileFullPath, fh.handle, err) + return fuse.EIO + } + + return fuse.OK } diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go index 72152d72e..efdf39386 100644 --- a/weed/mount/weedfs_file_write.go +++ b/weed/mount/weedfs_file_write.go @@ -2,6 +2,7 @@ package mount import ( "github.com/hanwen/go-fuse/v2/fuse" + "net/http" ) /** @@ -31,5 +32,35 @@ import ( * @param fi file information */ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (written uint32, code fuse.Status) { - return 0, fuse.ENOSYS + + fh := wfs.GetHandle(FileHandleId(in.Fh)) + if fh == nil { + return 0, fuse.ENOENT + } + + fh.Lock() + defer fh.Unlock() + + entry := fh.entry + if entry == nil { + return 0, fuse.OK + } + + entry.Content = nil + offset := int64(in.Offset) + entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize))) + // glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) + + fh.dirtyPages.AddPage(offset, data) + + written = uint32(len(data)) + + if offset == 0 { + // detect mime type + fh.contentType = http.DetectContentType(data) + } + + fh.dirtyMetadata = true + + return written, fuse.OK }