Browse Source

file set attribute

pull/2668/head
chrislu 3 years ago
parent
commit
fe57a2e770
  1. 7
      weed/mount/filehandle_map.go
  2. 4
      weed/mount/page_writer.go
  3. 7
      weed/mount/weedfs.go
  4. 44
      weed/mount/weedfs_attr.go
  5. 1
      weed/mount/weedfs_dir_read.go
  6. 2
      weed/mount/weedfs_filehandle.go
  7. 8
      weed/mount/weedfs_xattr.go

7
weed/mount/filehandle_map.go

@ -30,6 +30,13 @@ func (i *FileHandleToInode) GetFileHandle(fh FileHandleId) *FileHandle {
return nil return nil
} }
func (i *FileHandleToInode) FindFileHandle(inode uint64) (fh *FileHandle, found bool) {
i.RLock()
defer i.RUnlock()
fh, found = i.inode2fh[inode]
return
}
func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *filer_pb.Entry) *FileHandle { func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *filer_pb.Entry) *FileHandle {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()

4
weed/mount/page_writer.go

@ -30,7 +30,7 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
func (pw *PageWriter) AddPage(offset int64, data []byte) { func (pw *PageWriter) AddPage(offset int64, data []byte) {
glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh, offset, offset+int64(len(data)))
glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ { for i := chunkIndex; len(data) > 0; i++ {
@ -50,7 +50,7 @@ func (pw *PageWriter) FlushData() error {
} }
func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) { func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) {
glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh, offset, offset+int64(len(data)))
glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ { for i := chunkIndex; len(data) > 0; i++ {

7
weed/mount/weedfs.go

@ -120,8 +120,12 @@ func (wfs *WFS) String() string {
return "seaweedfs" return "seaweedfs"
} }
func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, entry *filer_pb.Entry, status fuse.Status) {
func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
path = wfs.inodeToPath.GetPath(inode) path = wfs.inodeToPath.GetPath(inode)
var found bool
if fh, found = wfs.fhmap.FindFileHandle(inode); found {
return path, fh, fh.entry, fuse.OK
}
entry, status = wfs.maybeLoadEntry(path) entry, status = wfs.maybeLoadEntry(path)
return return
} }
@ -146,7 +150,6 @@ func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.St
}, fuse.OK }, fuse.OK
} }
// TODO Use inode to selectively filetering metadata updates
// read from async meta cache // read from async meta cache
meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir)) meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)

44
weed/mount/weedfs_attr.go

@ -2,6 +2,7 @@ package mount
import ( import (
"github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
"os" "os"
@ -15,7 +16,7 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse
return fuse.OK return fuse.OK
} }
_, entry, status := wfs.maybeReadEntry(input.NodeId)
_, _, entry, status := wfs.maybeReadEntry(input.NodeId)
if status != fuse.OK { if status != fuse.OK {
return status return status
} }
@ -27,13 +28,43 @@ func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse
func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse.AttrOut) (code fuse.Status) { func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse.AttrOut) (code fuse.Status) {
// TODO this is only for directory. Filet setAttr involves open files and truncate to a size
path, entry, status := wfs.maybeReadEntry(input.NodeId)
path, fh, entry, status := wfs.maybeReadEntry(input.NodeId)
if status != fuse.OK { if status != fuse.OK {
return status return status
} }
if size, ok := input.GetSize(); ok {
glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.Chunks))
if size < filer.FileSize(entry) {
// fmt.Printf("truncate %v \n", fullPath)
var chunks []*filer_pb.FileChunk
var truncatedChunks []*filer_pb.FileChunk
for _, chunk := range entry.Chunks {
int64Size := int64(chunk.Size)
if chunk.Offset+int64Size > int64(size) {
// this chunk is truncated
int64Size = int64(size) - chunk.Offset
if int64Size > 0 {
chunks = append(chunks, chunk)
glog.V(4).Infof("truncated chunk %+v from %d to %d\n", chunk.GetFileIdString(), chunk.Size, int64Size)
chunk.Size = uint64(int64Size)
} else {
glog.V(4).Infof("truncated whole chunk %+v\n", chunk.GetFileIdString())
truncatedChunks = append(truncatedChunks, chunk)
}
}
}
// set the new chunks and reset entry cache
entry.Chunks = chunks
if fh != nil {
fh.entryViewCache = nil
}
}
entry.Attributes.Mtime = time.Now().Unix()
entry.Attributes.FileSize = size
}
if mode, ok := input.GetMode(); ok { if mode, ok := input.GetMode(); ok {
entry.Attributes.FileMode = uint32(mode) entry.Attributes.FileMode = uint32(mode)
} }
@ -54,6 +85,11 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
out.AttrValid = 1 out.AttrValid = 1
wfs.setAttrByPbEntry(&out.Attr, input.NodeId, entry) wfs.setAttrByPbEntry(&out.Attr, input.NodeId, entry)
if fh != nil {
fh.dirtyMetadata = true
return fuse.OK
}
return wfs.saveEntry(path, entry) return wfs.saveEntry(path, entry)
} }

1
weed/mount/weedfs_dir_read.go

@ -116,7 +116,6 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
return true return true
} }
// TODO remove this with checking whether directory is not forgotten
if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil { if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return fuse.EIO return fuse.EIO

2
weed/mount/weedfs_filehandle.go

@ -3,7 +3,7 @@ package mount
import "github.com/hanwen/go-fuse/v2/fuse" import "github.com/hanwen/go-fuse/v2/fuse"
func (wfs *WFS) AcquireHandle(inode uint64, uid, gid uint32) (fileHandle *FileHandle, code fuse.Status) { func (wfs *WFS) AcquireHandle(inode uint64, uid, gid uint32) (fileHandle *FileHandle, code fuse.Status) {
_, entry, status := wfs.maybeReadEntry(inode)
_, _, entry, status := wfs.maybeReadEntry(inode)
if status == fuse.OK { if status == fuse.OK {
fileHandle = wfs.fhmap.AcquireFileHandle(wfs, inode, entry) fileHandle = wfs.fhmap.AcquireFileHandle(wfs, inode, entry)
fileHandle.entry = entry fileHandle.entry = entry

8
weed/mount/weedfs_xattr.go

@ -32,7 +32,7 @@ func (wfs *WFS) GetXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr str
return 0, fuse.EINVAL return 0, fuse.EINVAL
} }
_, entry, status := wfs.maybeReadEntry(header.NodeId)
_, _, entry, status := wfs.maybeReadEntry(header.NodeId)
if status != fuse.OK { if status != fuse.OK {
return 0, status return 0, status
} }
@ -89,7 +89,7 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st
} }
} }
path, entry, status := wfs.maybeReadEntry(input.NodeId)
path, _, entry, status := wfs.maybeReadEntry(input.NodeId)
if status != fuse.OK { if status != fuse.OK {
return status return status
} }
@ -117,7 +117,7 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st
// slice, and return the number of bytes. If the buffer is too // slice, and return the number of bytes. If the buffer is too
// small, return ERANGE, with the required buffer size. // small, return ERANGE, with the required buffer size.
func (wfs *WFS) ListXAttr(cancel <-chan struct{}, header *fuse.InHeader, dest []byte) (n uint32, code fuse.Status) { func (wfs *WFS) ListXAttr(cancel <-chan struct{}, header *fuse.InHeader, dest []byte) (n uint32, code fuse.Status) {
_, entry, status := wfs.maybeReadEntry(header.NodeId)
_, _, entry, status := wfs.maybeReadEntry(header.NodeId)
if status != fuse.OK { if status != fuse.OK {
return 0, status return 0, status
} }
@ -149,7 +149,7 @@ func (wfs *WFS) RemoveXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr
if len(attr) == 0 { if len(attr) == 0 {
return fuse.EINVAL return fuse.EINVAL
} }
path, entry, status := wfs.maybeReadEntry(header.NodeId)
path, _, entry, status := wfs.maybeReadEntry(header.NodeId)
if status != fuse.OK { if status != fuse.OK {
return status return status
} }

Loading…
Cancel
Save