Browse Source

remove dead code

pull/2716/head
chrislu 3 years ago
parent
commit
c3792c8352
  1. 0
      weed/command/mount.go
  2. 8
      weed/command/mount_darwin.go
  3. 6
      weed/command/mount_linux.go
  4. 2
      weed/command/mount_notsupported.go
  5. 0
      weed/command/mount_std.go
  6. 655
      weed/filesys/dir.go
  7. 169
      weed/filesys/dir_link.go
  8. 149
      weed/filesys/dir_rename.go
  9. 100
      weed/filesys/dirty_pages_chunked.go
  10. 406
      weed/filesys/file.go
  11. 346
      weed/filesys/filehandle.go
  12. 213
      weed/filesys/fscache.go
  13. 115
      weed/filesys/fscache_test.go
  14. 32
      weed/filesys/meta_cache/cache_config.go
  15. 101
      weed/filesys/meta_cache/id_mapper.go
  16. 154
      weed/filesys/meta_cache/meta_cache.go
  17. 47
      weed/filesys/meta_cache/meta_cache_init.go
  18. 68
      weed/filesys/meta_cache/meta_cache_subscribe.go
  19. 98
      weed/filesys/page_writer.go
  20. 115
      weed/filesys/page_writer/chunk_interval_list.go
  21. 49
      weed/filesys/page_writer/chunk_interval_list_test.go
  22. 30
      weed/filesys/page_writer/dirty_pages.go
  23. 16
      weed/filesys/page_writer/page_chunk.go
  24. 69
      weed/filesys/page_writer/page_chunk_mem.go
  25. 121
      weed/filesys/page_writer/page_chunk_swapfile.go
  26. 182
      weed/filesys/page_writer/upload_pipeline.go
  27. 63
      weed/filesys/page_writer/upload_pipeline_lock.go
  28. 47
      weed/filesys/page_writer/upload_pipeline_test.go
  29. 44
      weed/filesys/page_writer_pattern.go
  30. 63
      weed/filesys/permission.go
  31. 22
      weed/filesys/unimplemented.go
  32. 319
      weed/filesys/wfs.go
  33. 51
      weed/filesys/wfs_filer_client.go
  34. 84
      weed/filesys/wfs_write.go
  35. 153
      weed/filesys/xattr.go
  36. 2
      weed/mount/dirty_pages_chunked.go
  37. 2
      weed/mount/page_writer.go

0
weed/command/mount2.go → weed/command/mount.go

8
weed/command/mount_darwin.go

@ -1,13 +1,5 @@
package command
import (
"github.com/seaweedfs/fuse"
)
func osSpecificMountOptions() []fuse.MountOption {
return []fuse.MountOption{}
}
func checkMountPointAvailable(dir string) bool {
return true
}

6
weed/command/mount_linux.go

@ -6,8 +6,6 @@ import (
"io"
"os"
"strings"
"github.com/seaweedfs/fuse"
)
const (
@ -137,10 +135,6 @@ func parseInfoFile(r io.Reader) ([]*Info, error) {
return out, nil
}
func osSpecificMountOptions() []fuse.MountOption {
return []fuse.MountOption{}
}
func checkMountPointAvailable(dir string) bool {
mountPoint := dir
if mountPoint != "/" && strings.HasSuffix(mountPoint, "/") {

2
weed/command/mount2_notsupported.go → weed/command/mount_notsupported.go

@ -8,7 +8,7 @@ import (
"runtime"
)
func runMount2(cmd *Command, args []string) bool {
func runMount(cmd *Command, args []string) bool {
fmt.Printf("Mount is not supported on %s %s\n", runtime.GOOS, runtime.GOARCH)
return true

0
weed/command/mount2_std.go → weed/command/mount_std.go

655
weed/filesys/dir.go

@ -1,655 +0,0 @@
package filesys
import (
"bytes"
"context"
"math"
"os"
"strings"
"syscall"
"time"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
type Dir struct {
name string
wfs *WFS
entry *filer_pb.Entry
parent *Dir
id uint64
}
var _ = fs.Node(&Dir{})
var _ = fs.NodeIdentifier(&Dir{})
var _ = fs.NodeCreater(&Dir{})
var _ = fs.NodeMknoder(&Dir{})
var _ = fs.NodeMkdirer(&Dir{})
var _ = fs.NodeFsyncer(&Dir{})
var _ = fs.NodeRequestLookuper(&Dir{})
var _ = fs.HandleReadDirAller(&Dir{})
var _ = fs.NodeRemover(&Dir{})
var _ = fs.NodeRenamer(&Dir{})
var _ = fs.NodeSetattrer(&Dir{})
var _ = fs.NodeGetxattrer(&Dir{})
var _ = fs.NodeSetxattrer(&Dir{})
var _ = fs.NodeRemovexattrer(&Dir{})
var _ = fs.NodeListxattrer(&Dir{})
var _ = fs.NodeForgetter(&Dir{})
func (dir *Dir) Id() uint64 {
if dir.parent == nil {
return 1
}
return dir.id
}
func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
entry, err := dir.maybeLoadEntry()
if err != nil {
glog.V(3).Infof("dir Attr %s, err: %+v", dir.FullPath(), err)
return err
}
// https://github.com/bazil/fuse/issues/196
attr.Valid = time.Second
attr.Inode = dir.Id()
attr.Mode = os.FileMode(entry.Attributes.FileMode) | os.ModeDir
attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
attr.Ctime = time.Unix(entry.Attributes.Mtime, 0)
attr.Atime = time.Unix(entry.Attributes.Mtime, 0)
attr.Gid = entry.Attributes.Gid
attr.Uid = entry.Attributes.Uid
if dir.FullPath() == dir.wfs.option.FilerMountRootPath {
attr.BlockSize = blockSize
}
glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)
return nil
}
func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
glog.V(4).Infof("dir Getxattr %s", dir.FullPath())
entry, err := dir.maybeLoadEntry()
if err != nil {
return err
}
return getxattr(entry, req, resp)
}
func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
// fsync works at OS level
// write the file chunks to the filerGrpcAddress
glog.V(3).Infof("dir %s fsync %+v", dir.FullPath(), req)
return nil
}
func (dir *Dir) newFile(name string, fileMode os.FileMode) fs.Node {
fileFullPath := util.NewFullPath(dir.FullPath(), name)
fileId := fileFullPath.AsInode(fileMode)
dir.wfs.handlesLock.Lock()
existingHandle, found := dir.wfs.handles[fileId]
dir.wfs.handlesLock.Unlock()
if found {
glog.V(4).Infof("newFile found opened file handle: %+v", fileFullPath)
return existingHandle.f
}
return &File{
Name: name,
dir: dir,
wfs: dir.wfs,
id: fileId,
}
}
func (dir *Dir) newDirectory(fullpath util.FullPath) fs.Node {
return &Dir{name: fullpath.Name(), wfs: dir.wfs, parent: dir, id: fullpath.AsInode(os.ModeDir)}
}
func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
if err := checkName(req.Name); err != nil {
return nil, nil, err
}
exclusive := req.Flags&fuse.OpenExclusive != 0
isDirectory := req.Mode&os.ModeDir > 0
if exclusive || isDirectory {
_, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, exclusive)
if err != nil {
return nil, nil, err
}
}
var node fs.Node
if isDirectory {
node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name))
return node, node, nil
}
node = dir.newFile(req.Name, req.Mode)
file := node.(*File)
file.entry = &filer_pb.Entry{
Name: req.Name,
IsDirectory: req.Mode&os.ModeDir > 0,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(req.Mode &^ dir.wfs.option.Umask),
Uid: req.Uid,
Gid: req.Gid,
Collection: dir.wfs.option.Collection,
Replication: dir.wfs.option.Replication,
TtlSec: dir.wfs.option.TtlSec,
},
}
file.dirtyMetadata = true
fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
return file, fh, nil
}
func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) {
if err := checkName(req.Name); err != nil {
return nil, err
}
glog.V(3).Infof("dir %s Mknod %+v", dir.FullPath(), req)
_, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, false)
if err != nil {
return nil, err
}
var node fs.Node
node = dir.newFile(req.Name, req.Mode)
return node, nil
}
func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, exclusive bool) (*filer_pb.CreateEntryRequest, error) {
dirFullPath := dir.FullPath()
request := &filer_pb.CreateEntryRequest{
Directory: dirFullPath,
Entry: &filer_pb.Entry{
Name: name,
IsDirectory: mode&os.ModeDir > 0,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(mode &^ dir.wfs.option.Umask),
Uid: uid,
Gid: gid,
Collection: dir.wfs.option.Collection,
Replication: dir.wfs.option.Replication,
TtlSec: dir.wfs.option.TtlSec,
},
},
OExcl: exclusive,
Signatures: []int32{dir.wfs.signature},
}
glog.V(1).Infof("create %s/%s", dirFullPath, name)
err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
if err := filer_pb.CreateEntry(client, request); err != nil {
if strings.Contains(err.Error(), "EEXIST") {
return fuse.EEXIST
}
glog.V(0).Infof("create %s/%s: %v", dirFullPath, name, err)
return fuse.EIO
}
if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
glog.Errorf("local InsertEntry dir %s/%s: %v", dirFullPath, name, err)
return fuse.EIO
}
return nil
})
return request, err
}
func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
if err := checkName(req.Name); err != nil {
return nil, err
}
glog.V(4).Infof("mkdir %s: %s", dir.FullPath(), req.Name)
newEntry := &filer_pb.Entry{
Name: req.Name,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(req.Mode &^ dir.wfs.option.Umask),
Uid: req.Uid,
Gid: req.Gid,
},
}
dirFullPath := dir.FullPath()
err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(newEntry)
defer dir.wfs.mapPbIdFromFilerToLocal(newEntry)
request := &filer_pb.CreateEntryRequest{
Directory: dirFullPath,
Entry: newEntry,
Signatures: []int32{dir.wfs.signature},
}
glog.V(1).Infof("mkdir: %v", request)
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err)
return err
}
if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
glog.Errorf("local mkdir dir %s/%s: %v", dirFullPath, req.Name, err)
return fuse.EIO
}
return nil
})
if err == nil {
node := dir.newDirectory(util.NewFullPath(dirFullPath, req.Name))
return node, nil
}
glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err)
return nil, fuse.EIO
}
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
if err := checkName(req.Name); err != nil {
return nil, err
}
dirPath := util.FullPath(dir.FullPath())
// glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String())
fullFilePath := dirPath.Child(req.Name)
visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
if visitErr != nil {
glog.Errorf("dir Lookup %s: %v", dirPath, visitErr)
return nil, fuse.EIO
}
localEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT
}
if localEntry == nil {
// glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
entry, err := filer_pb.GetEntry(dir.wfs, fullFilePath)
if err != nil {
glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err)
return nil, fuse.ENOENT
}
localEntry = filer.FromPbEntry(string(dirPath), entry)
} else {
glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
}
if localEntry != nil {
if localEntry.IsDirectory() {
node = dir.newDirectory(fullFilePath)
} else {
node = dir.newFile(req.Name, localEntry.Attr.Mode)
}
// resp.EntryValid = time.Second
resp.Attr.Valid = time.Second
resp.Attr.Size = localEntry.FileSize
resp.Attr.Mtime = localEntry.Attr.Mtime
resp.Attr.Crtime = localEntry.Attr.Crtime
resp.Attr.Mode = localEntry.Attr.Mode
resp.Attr.Gid = localEntry.Attr.Gid
resp.Attr.Uid = localEntry.Attr.Uid
if localEntry.HardLinkCounter > 0 {
resp.Attr.Nlink = uint32(localEntry.HardLinkCounter)
}
return node, nil
}
glog.V(4).Infof("not found dir GetEntry %s: %v", fullFilePath, err)
return nil, fuse.ENOENT
}
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
dirPath := util.FullPath(dir.FullPath())
glog.V(4).Infof("dir ReadDirAll %s", dirPath)
processEachEntryFn := func(entry *filer.Entry, isLast bool) {
if entry.IsDirectory() {
dirent := fuse.Dirent{Name: entry.Name(), Type: fuse.DT_Dir, Inode: dirPath.Child(entry.Name()).AsInode(os.ModeDir)}
ret = append(ret, dirent)
} else {
dirent := fuse.Dirent{Name: entry.Name(), Type: findFileType(uint16(entry.Attr.Mode)), Inode: dirPath.Child(entry.Name()).AsInode(entry.Attr.Mode)}
ret = append(ret, dirent)
}
}
if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return nil, fuse.EIO
}
listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
processEachEntryFn(entry, false)
return true
})
if listErr != nil {
glog.Errorf("list meta cache: %v", listErr)
return nil, fuse.EIO
}
// create proper . and .. directories
ret = append(ret, fuse.Dirent{
Inode: dirPath.AsInode(os.ModeDir),
Name: ".",
Type: fuse.DT_Dir,
})
// return the correct parent inode for the mount root
var inode uint64
if string(dirPath) == dir.wfs.option.FilerMountRootPath {
inode = dir.wfs.option.MountParentInode
} else {
inode = util.FullPath(dir.parent.FullPath()).AsInode(os.ModeDir)
}
ret = append(ret, fuse.Dirent{
Inode: inode,
Name: "..",
Type: fuse.DT_Dir,
})
return
}
func findFileType(mode uint16) fuse.DirentType {
switch mode & (syscall.S_IFMT & 0xffff) {
case syscall.S_IFSOCK:
return fuse.DT_Socket
case syscall.S_IFLNK:
return fuse.DT_Link
case syscall.S_IFREG:
return fuse.DT_File
case syscall.S_IFBLK:
return fuse.DT_Block
case syscall.S_IFDIR:
return fuse.DT_Dir
case syscall.S_IFCHR:
return fuse.DT_Char
case syscall.S_IFIFO:
return fuse.DT_FIFO
}
return fuse.DT_File
}
func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
parentHasPermission := false
parentEntry, err := dir.maybeLoadEntry()
if err != nil {
return err
}
if err := checkPermission(parentEntry, req.Uid, req.Gid, true); err == nil {
parentHasPermission = true
}
dirFullPath := dir.FullPath()
filePath := util.NewFullPath(dirFullPath, req.Name)
entry, err := filer_pb.GetEntry(dir.wfs, filePath)
if err != nil {
return err
}
if !parentHasPermission {
if err := checkPermission(entry, req.Uid, req.Gid, true); err != nil {
return err
}
}
if !req.Dir {
return dir.removeOneFile(entry, req)
}
return dir.removeFolder(entry, req)
}
func (dir *Dir) removeOneFile(entry *filer_pb.Entry, req *fuse.RemoveRequest) error {
dirFullPath := dir.FullPath()
filePath := util.NewFullPath(dirFullPath, req.Name)
// first, ensure the filer store can correctly delete
glog.V(3).Infof("remove file: %v", req)
isDeleteData := entry != nil && entry.HardLinkCounter <= 1
err := filer_pb.Remove(dir.wfs, dirFullPath, req.Name, isDeleteData, false, false, false, []int32{dir.wfs.signature})
if err != nil {
glog.V(3).Infof("not found remove file %s: %v", filePath, err)
return fuse.ENOENT
}
// then, delete meta cache and fsNode cache
if err = dir.wfs.metaCache.DeleteEntry(context.Background(), filePath); err != nil {
glog.V(3).Infof("local DeleteEntry %s: %v", filePath, err)
return fuse.ESTALE
}
// remove current file handle if any
dir.wfs.handlesLock.Lock()
defer dir.wfs.handlesLock.Unlock()
inodeId := filePath.AsInode(0)
if fh, ok := dir.wfs.handles[inodeId]; ok {
delete(dir.wfs.handles, inodeId)
fh.isDeleted = true
}
return nil
}
func (dir *Dir) removeFolder(entry *filer_pb.Entry, req *fuse.RemoveRequest) error {
dirFullPath := dir.FullPath()
glog.V(3).Infof("remove directory entry: %v", req)
ignoreRecursiveErr := true // ignore recursion error since the OS should manage it
err := filer_pb.Remove(dir.wfs, dirFullPath, req.Name, true, true, ignoreRecursiveErr, false, []int32{dir.wfs.signature})
if err != nil {
glog.V(0).Infof("remove %s/%s: %v", dirFullPath, req.Name, err)
if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
return fuse.EEXIST
}
return fuse.ENOENT
}
t := util.NewFullPath(dirFullPath, req.Name)
dir.wfs.metaCache.DeleteEntry(context.Background(), t)
return nil
}
func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
glog.V(4).Infof("%v dir setattr %+v mode=%d", dir.FullPath(), req, req.Mode)
entry, err := dir.maybeLoadEntry()
if err != nil {
return err
}
if req.Valid.Mode() {
entry.Attributes.FileMode = uint32(req.Mode)
}
if req.Valid.Uid() {
entry.Attributes.Uid = req.Uid
}
if req.Valid.Gid() {
entry.Attributes.Gid = req.Gid
}
if req.Valid.Mtime() {
entry.Attributes.Mtime = req.Mtime.Unix()
}
entry.Attributes.Mtime = time.Now().Unix()
return dir.saveEntry(entry)
}
func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
glog.V(4).Infof("dir Setxattr %s: %s", dir.FullPath(), req.Name)
entry, err := dir.maybeLoadEntry()
if err != nil {
return err
}
if err := setxattr(entry, req); err != nil {
return err
}
return dir.saveEntry(entry)
}
func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error {
glog.V(4).Infof("dir Removexattr %s: %s", dir.FullPath(), req.Name)
entry, err := dir.maybeLoadEntry()
if err != nil {
return err
}
if err := removexattr(entry, req); err != nil {
return err
}
return dir.saveEntry(entry)
}
func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
glog.V(4).Infof("dir Listxattr %s", dir.FullPath())
entry, err := dir.maybeLoadEntry()
if err != nil {
return err
}
if err := listxattr(entry, req, resp); err != nil {
return err
}
return nil
}
func (dir *Dir) Forget() {
glog.V(4).Infof("Forget dir %s", dir.FullPath())
}
func (dir *Dir) maybeLoadEntry() (*filer_pb.Entry, error) {
parentDirPath, name := util.FullPath(dir.FullPath()).DirAndName()
return dir.wfs.maybeLoadEntry(parentDirPath, name)
}
func (dir *Dir) saveEntry(entry *filer_pb.Entry) error {
parentDir, name := util.FullPath(dir.FullPath()).DirAndName()
return dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(entry)
defer dir.wfs.mapPbIdFromFilerToLocal(entry)
request := &filer_pb.UpdateEntryRequest{
Directory: parentDir,
Entry: entry,
Signatures: []int32{dir.wfs.signature},
}
glog.V(1).Infof("save dir entry: %v", request)
_, err := client.UpdateEntry(context.Background(), request)
if err != nil {
glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err)
return fuse.EIO
}
if err := dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err)
return fuse.ESTALE
}
return nil
})
}
func (dir *Dir) FullPath() string {
var parts []string
for p := dir; p != nil; p = p.parent {
if strings.HasPrefix(p.name, "/") {
if len(p.name) > 1 {
parts = append(parts, p.name[1:])
}
} else {
parts = append(parts, p.name)
}
}
if len(parts) == 0 {
return "/"
}
var buf bytes.Buffer
for i := len(parts) - 1; i >= 0; i-- {
buf.WriteString("/")
buf.WriteString(parts[i])
}
return buf.String()
}

169
weed/filesys/dir_link.go

@ -1,169 +0,0 @@
package filesys
import (
"context"
"github.com/chrislusf/seaweedfs/weed/util"
"os"
"syscall"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
var _ = fs.NodeLinker(&Dir{})
var _ = fs.NodeSymlinker(&Dir{})
var _ = fs.NodeReadlinker(&File{})
const (
HARD_LINK_MARKER = '\x01'
)
func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (fs.Node, error) {
if err := checkName(req.NewName); err != nil {
return nil, err
}
oldFile, ok := old.(*File)
if !ok {
glog.Errorf("old node is not a file: %+v", old)
}
glog.V(4).Infof("Link: %v/%v -> %v/%v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName)
oldEntry, err := oldFile.maybeLoadEntry(ctx)
if err != nil {
return nil, err
}
if oldEntry == nil {
return nil, fuse.EIO
}
// update old file to hardlink mode
if len(oldEntry.HardLinkId) == 0 {
oldEntry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
oldEntry.HardLinkCounter = 1
}
oldEntry.HardLinkCounter++
updateOldEntryRequest := &filer_pb.UpdateEntryRequest{
Directory: oldFile.dir.FullPath(),
Entry: oldEntry,
Signatures: []int32{dir.wfs.signature},
}
// CreateLink 1.2 : update new file to hardlink mode
oldEntry.Attributes.Mtime = time.Now().Unix()
request := &filer_pb.CreateEntryRequest{
Directory: dir.FullPath(),
Entry: &filer_pb.Entry{
Name: req.NewName,
IsDirectory: false,
Attributes: oldEntry.Attributes,
Chunks: oldEntry.Chunks,
Extended: oldEntry.Extended,
HardLinkId: oldEntry.HardLinkId,
HardLinkCounter: oldEntry.HardLinkCounter,
},
Signatures: []int32{dir.wfs.signature},
}
// apply changes to the filer, and also apply to local metaCache
err = dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
if err := filer_pb.UpdateEntry(client, updateOldEntryRequest); err != nil {
glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err)
return fuse.EIO
}
dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(updateOldEntryRequest.Directory, updateOldEntryRequest.Entry))
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err)
return fuse.EIO
}
dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
})
if err != nil {
return nil, fuse.EIO
}
// create new file node
newNode := dir.newFile(req.NewName, 0)
newFile := newNode.(*File)
return newFile, err
}
func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, error) {
if err := checkName(req.NewName); err != nil {
return nil, err
}
glog.V(4).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target)
request := &filer_pb.CreateEntryRequest{
Directory: dir.FullPath(),
Entry: &filer_pb.Entry{
Name: req.NewName,
IsDirectory: false,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32((os.FileMode(0777) | os.ModeSymlink) &^ dir.wfs.option.Umask),
Uid: req.Uid,
Gid: req.Gid,
SymlinkTarget: req.Target,
},
},
Signatures: []int32{dir.wfs.signature},
}
err := dir.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.V(0).Infof("symlink %s/%s: %v", dir.FullPath(), req.NewName, err)
return fuse.EIO
}
dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
})
symlink := dir.newFile(req.NewName, os.ModeSymlink)
return symlink, err
}
func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (string, error) {
entry, err := file.maybeLoadEntry(ctx)
if err != nil {
return "", err
}
if os.FileMode(entry.Attributes.FileMode)&os.ModeSymlink == 0 {
return "", fuse.Errno(syscall.EINVAL)
}
glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, entry.Attributes.SymlinkTarget)
return entry.Attributes.SymlinkTarget, nil
}

149
weed/filesys/dir_rename.go

@ -1,149 +0,0 @@
package filesys
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
"io"
"os"
"strings"
)
func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
if err := checkName(req.NewName); err != nil {
return err
}
if err := checkName(req.OldName); err != nil {
return err
}
newDir := newDirectory.(*Dir)
newPath := util.NewFullPath(newDir.FullPath(), req.NewName)
oldPath := util.NewFullPath(dir.FullPath(), req.OldName)
glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)
// update remote filer
err := dir.wfs.WithFilerClient(true, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
request := &filer_pb.StreamRenameEntryRequest{
OldDirectory: dir.FullPath(),
OldName: req.OldName,
NewDirectory: newDir.FullPath(),
NewName: req.NewName,
Signatures: []int32{dir.wfs.signature},
}
stream, err := client.StreamRenameEntry(ctx, request)
if err != nil {
glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err)
return fuse.EIO
}
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
glog.V(0).Infof("dir Rename %s => %s receive: %v", oldPath, newPath, recvErr)
if strings.Contains(recvErr.Error(), "not empty") {
return fuse.EEXIST
}
if strings.Contains(recvErr.Error(), "not directory") {
return fuse.ENOTDIR
}
return recvErr
}
}
if err = dir.handleRenameResponse(ctx, resp); err != nil {
glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
return err
}
}
return nil
})
return err
}
func (dir *Dir) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamRenameEntryResponse) error {
// comes from filer StreamRenameEntry, can only be create or delete entry
if resp.EventNotification.NewEntry != nil {
// with new entry, the old entry name also exists. This is the first step to create new entry
newEntry := filer.FromPbEntry(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry)
if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry); err != nil {
return err
}
oldParent, newParent := util.FullPath(resp.Directory), util.FullPath(resp.EventNotification.NewParentPath)
oldName, newName := resp.EventNotification.OldEntry.Name, resp.EventNotification.NewEntry.Name
entryFileMode := newEntry.Attr.Mode
oldPath := oldParent.Child(oldName)
newPath := newParent.Child(newName)
oldFsNode := NodeWithId(oldPath.AsInode(entryFileMode))
newFsNode := NodeWithId(newPath.AsInode(entryFileMode))
newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode(os.ModeDir)))
var newDir *Dir
if found {
newDir = newDirNode.(*Dir)
}
dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
if file, ok := internalNode.(*File); ok {
glog.V(4).Infof("internal file node %s", oldParent.Child(oldName))
file.Name = newName
file.id = uint64(newFsNode)
if found {
file.dir = newDir
}
}
if dir, ok := internalNode.(*Dir); ok {
glog.V(4).Infof("internal dir node %s", oldParent.Child(oldName))
dir.name = newName
dir.id = uint64(newFsNode)
if found {
dir.parent = newDir
}
}
})
// change file handle
if !newEntry.IsDirectory() {
inodeId := oldPath.AsInode(entryFileMode)
dir.wfs.handlesLock.Lock()
if existingHandle, found := dir.wfs.handles[inodeId]; found && existingHandle != nil {
glog.V(4).Infof("opened file handle %s => %s", oldPath, newPath)
delete(dir.wfs.handles, inodeId)
existingHandle.handle = newPath.AsInode(entryFileMode)
existingHandle.f.entry.Name = newName
existingHandle.f.id = newPath.AsInode(entryFileMode)
dir.wfs.handles[newPath.AsInode(entryFileMode)] = existingHandle
}
dir.wfs.handlesLock.Unlock()
}
} else if resp.EventNotification.OldEntry != nil {
// without new entry, only old entry name exists. This is the second step to delete old entry
if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil); err != nil {
return err
}
}
return nil
}

100
weed/filesys/dirty_pages_chunked.go

@ -1,100 +0,0 @@
package filesys
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
"sync"
"time"
)
type ChunkedDirtyPages struct {
fh *FileHandle
writeWaitGroup sync.WaitGroup
chunkAddLock sync.Mutex
lastErr error
collection string
replication string
uploadPipeline *page_writer.UploadPipeline
hasWrites bool
}
var (
_ = page_writer.DirtyPages(&ChunkedDirtyPages{})
)
func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
dirtyPages := &ChunkedDirtyPages{
fh: fh,
}
dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.f.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, fh.f.wfs.option.ConcurrentWriters)
return dirtyPages
}
func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte) {
pages.hasWrites = true
glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.f.fullpath(), offset, offset+int64(len(data)))
pages.uploadPipeline.SaveDataAt(data, offset)
return
}
func (pages *ChunkedDirtyPages) FlushData() error {
if !pages.hasWrites {
return nil
}
pages.uploadPipeline.FlushAll()
if pages.lastErr != nil {
return fmt.Errorf("flush data: %v", pages.lastErr)
}
return nil
}
func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
if !pages.hasWrites {
return
}
return pages.uploadPipeline.MaybeReadDataAt(data, startOffset)
}
func (pages *ChunkedDirtyPages) GetStorageOptions() (collection, replication string) {
return pages.collection, pages.replication
}
func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) {
mtime := time.Now().UnixNano()
defer cleanupFn()
chunk, collection, replication, err := pages.fh.f.wfs.saveDataAsChunk(pages.fh.f.fullpath())(reader, pages.fh.f.Name, offset)
if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.fh.f.fullpath(), offset, offset+size, err)
pages.lastErr = err
return
}
chunk.Mtime = mtime
pages.collection, pages.replication = collection, replication
pages.chunkAddLock.Lock()
pages.fh.f.addChunks([]*filer_pb.FileChunk{chunk})
pages.fh.entryViewCache = nil
glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.fh.f.fullpath(), chunk.FileId, offset, offset+size)
pages.chunkAddLock.Unlock()
}
func (pages ChunkedDirtyPages) Destroy() {
pages.uploadPipeline.Shutdown()
}
func (pages *ChunkedDirtyPages) LockForRead(startOffset, stopOffset int64) {
pages.uploadPipeline.LockForRead(startOffset, stopOffset)
}
func (pages *ChunkedDirtyPages) UnlockForRead(startOffset, stopOffset int64) {
pages.uploadPipeline.UnlockForRead(startOffset, stopOffset)
}

406
weed/filesys/file.go

@ -1,406 +0,0 @@
package filesys
import (
"context"
"os"
"sort"
"time"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
const blockSize = 512
var _ = fs.Node(&File{})
var _ = fs.NodeIdentifier(&File{})
var _ = fs.NodeOpener(&File{})
var _ = fs.NodeFsyncer(&File{})
var _ = fs.NodeSetattrer(&File{})
var _ = fs.NodeGetxattrer(&File{})
var _ = fs.NodeSetxattrer(&File{})
var _ = fs.NodeRemovexattrer(&File{})
var _ = fs.NodeListxattrer(&File{})
var _ = fs.NodeForgetter(&File{})
type File struct {
Name string
dir *Dir
wfs *WFS
entry *filer_pb.Entry
isOpen int
dirtyMetadata bool
id uint64
}
func (file *File) fullpath() util.FullPath {
return util.NewFullPath(file.dir.FullPath(), file.Name)
}
func (file *File) Id() uint64 {
return file.id
}
func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr)
entry, err := file.maybeLoadEntry(ctx)
if err != nil {
return err
}
if entry == nil {
return fuse.ENOENT
}
attr.Inode = file.Id()
attr.Valid = time.Second
attr.Mode = os.FileMode(entry.Attributes.FileMode)
attr.Size = filer.FileSize(entry)
if file.isOpen > 0 {
attr.Size = entry.Attributes.FileSize
glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size)
}
attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
attr.Ctime = time.Unix(entry.Attributes.Mtime, 0)
attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
attr.Gid = entry.Attributes.Gid
attr.Uid = entry.Attributes.Uid
attr.BlockSize = blockSize
attr.Blocks = (attr.Size + blockSize - 1) / blockSize
if entry.HardLinkCounter > 0 {
attr.Nlink = uint32(entry.HardLinkCounter)
}
return nil
}
func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
// glog.V(4).Infof("file Getxattr %s", file.fullpath())
entry, err := file.maybeLoadEntry(ctx)
if err != nil {
return err
}
return getxattr(entry, req, resp)
}
func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
glog.V(4).Infof("file %v open %+v", file.fullpath(), req)
// resp.Flags |= fuse.OpenDirectIO
handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid)
resp.Handle = fuse.HandleID(handle.handle)
glog.V(4).Infof("%v file open handle id = %d", file.fullpath(), handle.handle)
return handle, nil
}
func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
glog.V(4).Infof("%v file setattr %+v mode=%d", file.fullpath(), req, req.Mode)
entry, err := file.maybeLoadEntry(ctx)
if err != nil {
return err
}
if req.Valid.Size() {
glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(entry.Chunks))
if req.Size < filer.FileSize(entry) {
// fmt.Printf("truncate %v \n", fullPath)
var chunks []*filer_pb.FileChunk
var truncatedChunks []*filer_pb.FileChunk
for _, chunk := range entry.Chunks {
int64Size := int64(chunk.Size)
if chunk.Offset+int64Size > int64(req.Size) {
// this chunk is truncated
int64Size = int64(req.Size) - chunk.Offset
if int64Size > 0 {
chunks = append(chunks, chunk)
glog.V(4).Infof("truncated chunk %+v from %d to %d\n", chunk.GetFileIdString(), chunk.Size, int64Size)
chunk.Size = uint64(int64Size)
} else {
glog.V(4).Infof("truncated whole chunk %+v\n", chunk.GetFileIdString())
truncatedChunks = append(truncatedChunks, chunk)
}
}
}
// set the new chunks and reset entry cache
entry.Chunks = chunks
file.wfs.handlesLock.Lock()
existingHandle, found := file.wfs.handles[file.Id()]
file.wfs.handlesLock.Unlock()
if found {
existingHandle.entryViewCache = nil
}
}
entry.Attributes.Mtime = time.Now().Unix()
entry.Attributes.FileSize = req.Size
file.dirtyMetadata = true
}
if req.Valid.Mode() && entry.Attributes.FileMode != uint32(req.Mode) {
entry.Attributes.FileMode = uint32(req.Mode)
entry.Attributes.Mtime = time.Now().Unix()
file.dirtyMetadata = true
}
if req.Valid.Uid() && entry.Attributes.Uid != req.Uid {
entry.Attributes.Uid = req.Uid
entry.Attributes.Mtime = time.Now().Unix()
file.dirtyMetadata = true
}
if req.Valid.Gid() && entry.Attributes.Gid != req.Gid {
entry.Attributes.Gid = req.Gid
entry.Attributes.Mtime = time.Now().Unix()
file.dirtyMetadata = true
}
if req.Valid.Crtime() {
entry.Attributes.Crtime = req.Crtime.Unix()
entry.Attributes.Mtime = time.Now().Unix()
file.dirtyMetadata = true
}
if req.Valid.Mtime() && entry.Attributes.Mtime != req.Mtime.Unix() {
entry.Attributes.Mtime = req.Mtime.Unix()
file.dirtyMetadata = true
}
if req.Valid.Handle() {
// fmt.Printf("file handle => %d\n", req.Handle)
}
if file.isOpen > 0 {
return nil
}
if !file.dirtyMetadata {
return nil
}
return file.saveEntry(entry)
}
func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
glog.V(4).Infof("file Setxattr %s: %s", file.fullpath(), req.Name)
entry, err := file.maybeLoadEntry(ctx)
if err != nil {
return err
}
if err := setxattr(entry, req); err != nil {
return err
}
file.dirtyMetadata = true
if file.isOpen > 0 {
return nil
}
return file.saveEntry(entry)
}
func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error {
glog.V(4).Infof("file Removexattr %s: %s", file.fullpath(), req.Name)
entry, err := file.maybeLoadEntry(ctx)
if err != nil {
return err
}
if err := removexattr(entry, req); err != nil {
return err
}
file.dirtyMetadata = true
if file.isOpen > 0 {
return nil
}
return file.saveEntry(entry)
}
func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
glog.V(4).Infof("file Listxattr %s", file.fullpath())
entry, err := file.maybeLoadEntry(ctx)
if err != nil {
return err
}
if err := listxattr(entry, req, resp); err != nil {
return err
}
return nil
}
func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
// write the file chunks to the filerGrpcAddress
glog.V(4).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req)
return file.wfs.Fsync(file, req.Header)
}
func (file *File) Forget() {
t := util.NewFullPath(file.dir.FullPath(), file.Name)
glog.V(4).Infof("Forget file %s", t)
file.wfs.ReleaseHandle(t, fuse.HandleID(t.AsInode(file.entry.FileMode())))
}
func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) {
file.wfs.handlesLock.Lock()
handle, found := file.wfs.handles[file.Id()]
file.wfs.handlesLock.Unlock()
entry = file.entry
if found {
// glog.V(4).Infof("maybeLoadEntry found opened file %s/%s", file.dir.FullPath(), file.Name)
entry = handle.f.entry
}
if entry != nil {
if len(entry.HardLinkId) == 0 {
// only always reload hard link
return entry, nil
}
}
entry, err = file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
if err != nil {
glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
return entry, err
}
if entry != nil {
// file.entry = entry
} else {
glog.Warningf("maybeLoadEntry not found entry %s/%s: %v", file.dir.FullPath(), file.Name, err)
}
return entry, nil
}
func lessThan(a, b *filer_pb.FileChunk) bool {
if a.Mtime == b.Mtime {
return a.Fid.FileKey < b.Fid.FileKey
}
return a.Mtime < b.Mtime
}
func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
// find the earliest incoming chunk
newChunks := chunks
earliestChunk := newChunks[0]
for i := 1; i < len(newChunks); i++ {
if lessThan(earliestChunk, newChunks[i]) {
earliestChunk = newChunks[i]
}
}
entry := file.getEntry()
if entry == nil {
return
}
// pick out-of-order chunks from existing chunks
for _, chunk := range entry.Chunks {
if lessThan(earliestChunk, chunk) {
chunks = append(chunks, chunk)
}
}
// sort incoming chunks
sort.Slice(chunks, func(i, j int) bool {
return lessThan(chunks[i], chunks[j])
})
glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(entry.Chunks), len(chunks))
entry.Chunks = append(entry.Chunks, newChunks...)
}
func (file *File) saveEntry(entry *filer_pb.Entry) error {
return file.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
file.wfs.mapPbIdFromLocalToFiler(entry)
defer file.wfs.mapPbIdFromFilerToLocal(entry)
request := &filer_pb.CreateEntryRequest{
Directory: file.dir.FullPath(),
Entry: entry,
Signatures: []int32{file.wfs.signature},
}
glog.V(4).Infof("save file entry: %v", request)
_, err := client.CreateEntry(context.Background(), request)
if err != nil {
glog.Errorf("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
return fuse.EIO
}
file.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
file.dirtyMetadata = false
return nil
})
}
func (file *File) getEntry() *filer_pb.Entry {
return file.entry
}
func (file *File) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) {
err := file.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CacheRemoteObjectToLocalClusterRequest{
Directory: file.dir.FullPath(),
Name: entry.Name,
}
glog.V(4).Infof("download entry: %v", request)
resp, err := client.CacheRemoteObjectToLocalCluster(context.Background(), request)
if err != nil {
glog.Errorf("CacheRemoteObjectToLocalCluster file %s/%s: %v", file.dir.FullPath(), file.Name, err)
return fuse.EIO
}
entry = resp.Entry
file.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, resp.Entry))
file.dirtyMetadata = false
return nil
})
return entry, err
}

346
weed/filesys/filehandle.go

@ -1,346 +0,0 @@
package filesys
import (
"context"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
type FileHandle struct {
// cache file has been written to
dirtyPages *PageWriter
entryViewCache []filer.VisibleInterval
reader io.ReaderAt
contentType string
handle uint64
sync.Mutex
f *File
NodeId fuse.NodeID // file or directory the request is about
Uid uint32 // user ID of process making request
Gid uint32 // group ID of process making request
isDeleted bool
}
func newFileHandle(file *File, uid, gid uint32) *FileHandle {
fh := &FileHandle{
f: file,
Uid: uid,
Gid: gid,
}
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
fh.dirtyPages = newPageWriter(fh, file.wfs.option.ChunkSizeLimit)
entry := fh.f.getEntry()
if entry != nil {
entry.Attributes.FileSize = filer.FileSize(entry)
}
return fh
}
var _ = fs.Handle(&FileHandle{})
// var _ = fs.HandleReadAller(&FileHandle{})
var _ = fs.HandleReader(&FileHandle{})
var _ = fs.HandleFlusher(&FileHandle{})
var _ = fs.HandleWriter(&FileHandle{})
var _ = fs.HandleReleaser(&FileHandle{})
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
fh.Lock()
defer fh.Unlock()
glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))
if req.Size <= 0 {
return nil
}
buff := resp.Data[:cap(resp.Data)]
if req.Size > cap(resp.Data) {
// should not happen
buff = make([]byte, req.Size)
}
fh.lockForRead(req.Offset, len(buff))
defer fh.unlockForRead(req.Offset, len(buff))
totalRead, err := fh.readFromChunks(buff, req.Offset)
if err == nil || err == io.EOF {
maxStop := fh.readFromDirtyPages(buff, req.Offset)
totalRead = max(maxStop-req.Offset, totalRead)
}
if err == io.EOF {
err = nil
}
if err != nil {
glog.Warningf("file handle read %s %d: %v", fh.f.fullpath(), totalRead, err)
return fuse.EIO
}
if totalRead > int64(len(buff)) {
glog.Warningf("%s FileHandle Read %d: [%d,%d) size %d totalRead %d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, totalRead)
totalRead = min(int64(len(buff)), totalRead)
}
if err == nil {
resp.Data = buff[:totalRead]
}
return err
}
func (fh *FileHandle) lockForRead(startOffset int64, size int) {
fh.dirtyPages.LockForRead(startOffset, startOffset+int64(size))
}
func (fh *FileHandle) unlockForRead(startOffset int64, size int) {
fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size))
}
func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) {
maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
return
}
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
entry := fh.f.getEntry()
if entry == nil {
return 0, io.EOF
}
if entry.IsInRemoteOnly() {
glog.V(4).Infof("download remote entry %s", fh.f.fullpath())
newEntry, err := fh.f.downloadRemoteEntry(entry)
if err != nil {
glog.V(1).Infof("download remote entry %s: %v", fh.f.fullpath(), err)
return 0, err
}
entry = newEntry
}
fileSize := int64(filer.FileSize(entry))
fileFullPath := fh.f.fullpath()
if fileSize == 0 {
glog.V(1).Infof("empty fh %v", fileFullPath)
return 0, io.EOF
}
if offset+int64(len(buff)) <= int64(len(entry.Content)) {
totalRead := copy(buff, entry.Content[offset:])
glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
return int64(totalRead), nil
}
var chunkResolveErr error
if fh.entryViewCache == nil {
fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks, 0, fileSize)
if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
fh.reader = nil
}
reader := fh.reader
if reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, fileSize)
glog.V(4).Infof("file handle read %s [%d,%d) from %d views", fileFullPath, offset, offset+int64(len(buff)), len(chunkViews))
for _, chunkView := range chunkViews {
glog.V(4).Infof(" read %s [%d,%d) from chunk %+v", fileFullPath, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.FileId)
}
reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize)
}
fh.reader = reader
totalRead, err := reader.ReadAt(buff, offset)
if err != nil && err != io.EOF {
glog.Errorf("file handle read %s: %v", fileFullPath, err)
}
glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
return int64(totalRead), err
}
// Write to the file handle
func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
fh.dirtyPages.writerPattern.MonitorWriteAt(req.Offset, len(req.Data))
fh.Lock()
defer fh.Unlock()
// write the request to volume servers
data := req.Data
if len(data) <= 512 && req.Offset == 0 {
// fuse message cacheable size
data = make([]byte, len(req.Data))
copy(data, req.Data)
}
entry := fh.f.getEntry()
if entry == nil {
return fuse.EIO
}
entry.Content = nil
entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
// glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
fh.dirtyPages.AddPage(req.Offset, data)
resp.Size = len(data)
if req.Offset == 0 {
// detect mime type
fh.contentType = http.DetectContentType(data)
fh.f.dirtyMetadata = true
}
fh.f.dirtyMetadata = true
return nil
}
func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
glog.V(4).Infof("Release %v fh %d open=%d", fh.f.fullpath(), fh.handle, fh.f.isOpen)
fh.f.wfs.handlesLock.Lock()
fh.f.isOpen--
fh.f.wfs.handlesLock.Unlock()
if fh.f.isOpen <= 0 {
fh.f.entry = nil
fh.entryViewCache = nil
fh.reader = nil
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
fh.dirtyPages.Destroy()
}
if fh.f.isOpen < 0 {
glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0)
fh.f.isOpen = 0
return nil
}
return nil
}
func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
glog.V(4).Infof("Flush %v fh %d", fh.f.fullpath(), fh.handle)
if fh.isDeleted {
glog.V(4).Infof("Flush %v fh %d skip deleted", fh.f.fullpath(), fh.handle)
return nil
}
fh.Lock()
defer fh.Unlock()
if err := fh.doFlush(ctx, req.Header); err != nil {
glog.Errorf("Flush doFlush %s: %v", fh.f.Name, err)
return err
}
return nil
}
func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
// flush works at fh level
// send the data to the OS
glog.V(4).Infof("doFlush %s fh %d", fh.f.fullpath(), fh.handle)
if err := fh.dirtyPages.FlushData(); err != nil {
glog.Errorf("%v doFlush: %v", fh.f.fullpath(), err)
return fuse.EIO
}
if !fh.f.dirtyMetadata {
return nil
}
err := fh.f.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
entry := fh.f.getEntry()
if entry == nil {
return nil
}
if entry.Attributes != nil {
entry.Attributes.Mime = fh.contentType
if entry.Attributes.Uid == 0 {
entry.Attributes.Uid = header.Uid
}
if entry.Attributes.Gid == 0 {
entry.Attributes.Gid = header.Gid
}
if entry.Attributes.Crtime == 0 {
entry.Attributes.Crtime = time.Now().Unix()
}
entry.Attributes.Mtime = time.Now().Unix()
entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
entry.Attributes.Collection, entry.Attributes.Replication = fh.dirtyPages.GetStorageOptions()
}
request := &filer_pb.CreateEntryRequest{
Directory: fh.f.dir.FullPath(),
Entry: entry,
Signatures: []int32{fh.f.wfs.signature},
}
glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(entry.Chunks))
for i, chunk := range entry.Chunks {
glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
}
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks)
chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
if manifestErr != nil {
// not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
}
entry.Chunks = append(chunks, manifestChunks...)
fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry)
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
}
fh.f.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
})
if err == nil {
fh.f.dirtyMetadata = false
}
if err != nil {
glog.Errorf("%v fh %d flush: %v", fh.f.fullpath(), fh.handle, err)
return fuse.EIO
}
return nil
}

213
weed/filesys/fscache.go

@ -1,213 +0,0 @@
package filesys
import (
"sync"
"github.com/seaweedfs/fuse/fs"
"github.com/chrislusf/seaweedfs/weed/util"
)
type FsCache struct {
root *FsNode
sync.RWMutex
}
type FsNode struct {
parent *FsNode
node fs.Node
name string
childrenLock sync.RWMutex
children map[string]*FsNode
}
func newFsCache(root fs.Node) *FsCache {
return &FsCache{
root: &FsNode{
node: root,
},
}
}
func (c *FsCache) GetFsNode(path util.FullPath) fs.Node {
c.RLock()
defer c.RUnlock()
return c.doGetFsNode(path)
}
func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node {
t := c.root
for _, p := range path.Split() {
t = t.findChild(p)
if t == nil {
return nil
}
}
return t.node
}
func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) {
c.Lock()
defer c.Unlock()
c.doSetFsNode(path, node)
}
func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) {
t := c.root
for _, p := range path.Split() {
t = t.ensureChild(p)
}
t.node = node
}
func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node {
c.Lock()
defer c.Unlock()
t := c.doGetFsNode(path)
if t != nil {
return t
}
t = genNodeFn()
c.doSetFsNode(path, t)
return t
}
func (c *FsCache) DeleteFsNode(path util.FullPath) {
c.Lock()
defer c.Unlock()
t := c.root
for _, p := range path.Split() {
t = t.findChild(p)
if t == nil {
return
}
}
if t.parent != nil {
t.parent.disconnectChild(t)
}
t.deleteSelf()
}
// oldPath and newPath are full path including the new name
func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
c.Lock()
defer c.Unlock()
// find old node
src := c.root
for _, p := range oldPath.Split() {
src = src.findChild(p)
if src == nil {
return src
}
}
if src.parent != nil {
src.parent.disconnectChild(src)
}
// find new node
target := c.root
for _, p := range newPath.Split() {
target = target.ensureChild(p)
}
parent := target.parent
if dir, ok := src.node.(*Dir); ok {
dir.name = target.name // target is not Dir, but a shortcut
}
if f, ok := src.node.(*File); ok {
f.Name = target.name
entry := f.getEntry()
if entry != nil {
entry.Name = f.Name
}
}
parent.disconnectChild(target)
target.deleteSelf()
src.name = target.name
src.connectToParent(parent)
return src
}
func (n *FsNode) connectToParent(parent *FsNode) {
n.parent = parent
oldNode := parent.findChild(n.name)
if oldNode != nil {
oldNode.deleteSelf()
}
if dir, ok := n.node.(*Dir); ok {
if parent.node != nil {
dir.parent = parent.node.(*Dir)
}
}
if f, ok := n.node.(*File); ok {
if parent.node != nil {
f.dir = parent.node.(*Dir)
}
}
n.childrenLock.Lock()
parent.children[n.name] = n
n.childrenLock.Unlock()
}
func (n *FsNode) findChild(name string) *FsNode {
n.childrenLock.RLock()
defer n.childrenLock.RUnlock()
child, found := n.children[name]
if found {
return child
}
return nil
}
func (n *FsNode) ensureChild(name string) *FsNode {
n.childrenLock.Lock()
defer n.childrenLock.Unlock()
if n.children == nil {
n.children = make(map[string]*FsNode)
}
child, found := n.children[name]
if found {
return child
}
t := &FsNode{
parent: n,
node: nil,
name: name,
children: nil,
}
n.children[name] = t
return t
}
func (n *FsNode) disconnectChild(child *FsNode) {
n.childrenLock.Lock()
delete(n.children, child.name)
n.childrenLock.Unlock()
child.parent = nil
}
func (n *FsNode) deleteSelf() {
n.childrenLock.Lock()
for _, child := range n.children {
child.deleteSelf()
}
n.children = nil
n.childrenLock.Unlock()
n.node = nil
n.parent = nil
}

115
weed/filesys/fscache_test.go

@ -1,115 +0,0 @@
package filesys
import (
"testing"
"github.com/chrislusf/seaweedfs/weed/util"
)
func TestPathSplit(t *testing.T) {
parts := util.FullPath("/").Split()
if len(parts) != 0 {
t.Errorf("expecting an empty list, but getting %d", len(parts))
}
parts = util.FullPath("/readme.md").Split()
if len(parts) != 1 {
t.Errorf("expecting an empty list, but getting %d", len(parts))
}
}
func TestFsCache(t *testing.T) {
cache := newFsCache(nil)
x := cache.GetFsNode(util.FullPath("/y/x"))
if x != nil {
t.Errorf("wrong node!")
}
p := util.FullPath("/a/b/c")
cache.SetFsNode(p, &File{Name: "cc"})
tNode := cache.GetFsNode(p)
tFile := tNode.(*File)
if tFile.Name != "cc" {
t.Errorf("expecting a FsNode")
}
cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"})
cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
b := cache.GetFsNode(util.FullPath("/a/b"))
if b != nil {
t.Errorf("unexpected node!")
}
a := cache.GetFsNode(util.FullPath("/a"))
if a == nil {
t.Errorf("missing node!")
}
cache.DeleteFsNode(util.FullPath("/a"))
if b != nil {
t.Errorf("unexpected node!")
}
a = cache.GetFsNode(util.FullPath("/a"))
if a != nil {
t.Errorf("wrong DeleteFsNode!")
}
z := cache.GetFsNode(util.FullPath("/z"))
if z == nil {
t.Errorf("missing node!")
}
y := cache.GetFsNode(util.FullPath("/x/y"))
if y != nil {
t.Errorf("wrong node!")
}
}
func TestFsCacheMove(t *testing.T) {
cache := newFsCache(nil)
cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x"))
d := cache.GetFsNode(util.FullPath("/z/x/d"))
if d == nil {
t.Errorf("unexpected nil node!")
}
if d.(*File).Name != "dd" {
t.Errorf("unexpected non dd node!")
}
}
func TestFsCacheMove2(t *testing.T) {
cache := newFsCache(nil)
cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
cache.Move(util.FullPath("/a/b/d"), util.FullPath("/a/b/e"))
d := cache.GetFsNode(util.FullPath("/a/b/e"))
if d == nil {
t.Errorf("unexpected nil node!")
}
if d.(*File).Name != "e" {
t.Errorf("unexpected node!")
}
}

32
weed/filesys/meta_cache/cache_config.go

@ -1,32 +0,0 @@
package meta_cache
import "github.com/chrislusf/seaweedfs/weed/util"
var (
_ = util.Configuration(&cacheConfig{})
)
// implementing util.Configuraion
type cacheConfig struct {
dir string
}
func (c cacheConfig) GetString(key string) string {
return c.dir
}
func (c cacheConfig) GetBool(key string) bool {
panic("implement me")
}
func (c cacheConfig) GetInt(key string) int {
panic("implement me")
}
func (c cacheConfig) GetStringSlice(key string) []string {
panic("implement me")
}
func (c cacheConfig) SetDefault(key string, value interface{}) {
panic("implement me")
}

101
weed/filesys/meta_cache/id_mapper.go

@ -1,101 +0,0 @@
package meta_cache
import (
"fmt"
"strconv"
"strings"
)
type UidGidMapper struct {
uidMapper *IdMapper
gidMapper *IdMapper
}
type IdMapper struct {
localToFiler map[uint32]uint32
filerToLocal map[uint32]uint32
}
// UidGidMapper translates local uid/gid to filer uid/gid
// The local storage always persists the same as the filer.
// The local->filer translation happens when updating the filer first and later saving to meta_cache.
// And filer->local happens when reading from the meta_cache.
func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) {
uidMapper, err := newIdMapper(uidPairsStr)
if err != nil {
return nil, err
}
gidMapper, err := newIdMapper(gidPairStr)
if err != nil {
return nil, err
}
return &UidGidMapper{
uidMapper: uidMapper,
gidMapper: gidMapper,
}, nil
}
func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) {
return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid)
}
func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) {
return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid)
}
func (m *IdMapper) LocalToFiler(id uint32) uint32 {
value, found := m.localToFiler[id]
if found {
return value
}
return id
}
func (m *IdMapper) FilerToLocal(id uint32) uint32 {
value, found := m.filerToLocal[id]
if found {
return value
}
return id
}
func newIdMapper(pairsStr string) (*IdMapper, error) {
localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr)
if err != nil {
return nil, err
}
return &IdMapper{
localToFiler: localToFiler,
filerToLocal: filerToLocal,
}, nil
}
func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) {
if pairsStr == "" {
return
}
localToFiler = make(map[uint32]uint32)
filerToLocal = make(map[uint32]uint32)
for _, pairStr := range strings.Split(pairsStr, ",") {
pair := strings.Split(pairStr, ":")
localUidStr, filerUidStr := pair[0], pair[1]
localUid, localUidErr := strconv.Atoi(localUidStr)
if localUidErr != nil {
err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr)
return
}
filerUid, filerUidErr := strconv.Atoi(filerUidStr)
if filerUidErr != nil {
err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr)
return
}
localToFiler[uint32(localUid)] = uint32(filerUid)
filerToLocal[uint32(filerUid)] = uint32(localUid)
}
return
}

154
weed/filesys/meta_cache/meta_cache.go

@ -1,154 +0,0 @@
package meta_cache
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/leveldb"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/bounded_tree"
"os"
)
// need to have logic similar to FilerStoreWrapper
// e.g. fill fileId field for chunks
type MetaCache struct {
localStore filer.VirtualFilerStore
// sync.RWMutex
visitedBoundary *bounded_tree.BoundedTree
uidGidMapper *UidGidMapper
invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
}
func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache {
return &MetaCache{
localStore: openMetaStore(dbFolder),
visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
uidGidMapper: uidGidMapper,
invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) {
invalidateFunc(fullpath, entry)
},
}
}
func openMetaStore(dbFolder string) filer.VirtualFilerStore {
os.RemoveAll(dbFolder)
os.MkdirAll(dbFolder, 0755)
store := &leveldb.LevelDBStore{}
config := &cacheConfig{
dir: dbFolder,
}
if err := store.Initialize(config, ""); err != nil {
glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
}
return filer.NewFilerStoreWrapper(store)
}
func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
//mc.Lock()
//defer mc.Unlock()
return mc.doInsertEntry(ctx, entry)
}
func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
return mc.localStore.InsertEntry(ctx, entry)
}
func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
//mc.Lock()
//defer mc.Unlock()
oldDir, _ := oldPath.DirAndName()
if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) {
if oldPath != "" {
if newEntry != nil && oldPath == newEntry.FullPath {
// skip the unnecessary deletion
// leave the update to the following InsertEntry operation
} else {
glog.V(3).Infof("DeleteEntry %s", oldPath)
if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
return err
}
}
}
} else {
// println("unknown old directory:", oldDir)
}
if newEntry != nil {
newDir, _ := newEntry.DirAndName()
if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) {
glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
return err
}
}
}
return nil
}
func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
//mc.Lock()
//defer mc.Unlock()
return mc.localStore.UpdateEntry(ctx, entry)
}
func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
//mc.RLock()
//defer mc.RUnlock()
entry, err = mc.localStore.FindEntry(ctx, fp)
if err != nil {
return nil, err
}
mc.mapIdFromFilerToLocal(entry)
return
}
func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
//mc.Lock()
//defer mc.Unlock()
return mc.localStore.DeleteEntry(ctx, fp)
}
func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
//mc.RLock()
//defer mc.RUnlock()
if !mc.visitedBoundary.HasVisited(dirPath) {
// if this request comes after renaming, it should be fine
glog.Warningf("unsynchronized dir: %v", dirPath)
}
_, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
mc.mapIdFromFilerToLocal(entry)
return eachEntryFunc(entry)
})
if err != nil {
return err
}
return err
}
func (mc *MetaCache) Shutdown() {
//mc.Lock()
//defer mc.Unlock()
mc.localStore.Shutdown()
}
func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) {
entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid)
}
func (mc *MetaCache) Debug() {
if debuggable, ok := mc.localStore.(filer.Debuggable); ok {
println("start debugging")
debuggable.Debug(os.Stderr)
}
}

47
weed/filesys/meta_cache/meta_cache_init.go

@ -1,47 +0,0 @@
package meta_cache
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
return mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
glog.V(4).Infof("ReadDirAllEntries %s ...", path)
util.Retry("ReadDirAllEntries", func() error {
err = filer_pb.ReadDirAllEntries(client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
entry := filer.FromPbEntry(string(path), pbEntry)
if IsHiddenSystemEntry(string(path), entry.Name()) {
return nil
}
if err := mc.doInsertEntry(context.Background(), entry); err != nil {
glog.V(0).Infof("read %s: %v", entry.FullPath, err)
return err
}
if entry.IsDirectory() {
childDirectories = append(childDirectories, entry.Name())
}
return nil
})
return err
})
if err != nil {
err = fmt.Errorf("list %s: %v", path, err)
}
return
})
}
func IsHiddenSystemEntry(dir, name string) bool {
return dir == "/" && (name == "topics" || name == "etc")
}

68
weed/filesys/meta_cache/meta_cache_subscribe.go

@ -1,68 +0,0 @@
package meta_cache
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
for _, sig := range message.Signatures {
if sig == selfSignature && selfSignature != 0 {
return nil
}
}
dir := resp.Directory
var oldPath util.FullPath
var newEntry *filer.Entry
if message.OldEntry != nil {
oldPath = util.NewFullPath(dir, message.OldEntry.Name)
glog.V(4).Infof("deleting %v", oldPath)
}
if message.NewEntry != nil {
if message.NewParentPath != "" {
dir = message.NewParentPath
}
key := util.NewFullPath(dir, message.NewEntry.Name)
glog.V(4).Infof("creating %v", key)
newEntry = filer.FromPbEntry(dir, message.NewEntry)
}
err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
if err == nil {
if message.OldEntry != nil && message.NewEntry != nil {
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
mc.invalidateFunc(oldKey, message.OldEntry)
if message.OldEntry.Name != message.NewEntry.Name {
newKey := util.NewFullPath(dir, message.NewEntry.Name)
mc.invalidateFunc(newKey, message.NewEntry)
}
} else if message.OldEntry == nil && message.NewEntry != nil {
// no need to invaalidate
} else if message.OldEntry != nil && message.NewEntry == nil {
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
mc.invalidateFunc(oldKey, message.OldEntry)
}
}
return err
}
util.RetryForever("followMetaUpdates", func() error {
return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, selfSignature, processEventFn, true)
}, func(err error) bool {
glog.Errorf("follow metadata updates: %v", err)
return true
})
return nil
}

98
weed/filesys/page_writer.go

@ -1,98 +0,0 @@
package filesys
import (
"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
"github.com/chrislusf/seaweedfs/weed/glog"
)
type PageWriter struct {
fh *FileHandle
collection string
replication string
chunkSize int64
writerPattern *WriterPattern
randomWriter page_writer.DirtyPages
}
var (
_ = page_writer.DirtyPages(&PageWriter{})
)
func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
pw := &PageWriter{
fh: fh,
chunkSize: chunkSize,
writerPattern: NewWriterPattern(chunkSize),
randomWriter: newMemoryChunkPages(fh, chunkSize),
// randomWriter: newTempFileDirtyPages(fh.f, chunkSize),
}
return pw
}
func (pw *PageWriter) AddPage(offset int64, data []byte) {
glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.fh.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode())
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
pw.addToOneChunk(i, offset, data[:writeSize])
offset += writeSize
data = data[writeSize:]
}
}
func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) {
pw.randomWriter.AddPage(offset, data)
}
func (pw *PageWriter) FlushData() error {
pw.writerPattern.Reset()
return pw.randomWriter.FlushData()
}
func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) {
glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.f.fullpath(), offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset)
offset += readSize
data = data[readSize:]
}
return
}
func (pw *PageWriter) GetStorageOptions() (collection, replication string) {
return pw.randomWriter.GetStorageOptions()
}
func (pw *PageWriter) LockForRead(startOffset, stopOffset int64) {
pw.randomWriter.LockForRead(startOffset, stopOffset)
}
func (pw *PageWriter) UnlockForRead(startOffset, stopOffset int64) {
pw.randomWriter.UnlockForRead(startOffset, stopOffset)
}
func (pw *PageWriter) Destroy() {
pw.randomWriter.Destroy()
}
func max(x, y int64) int64 {
if x > y {
return x
}
return y
}
func min(x, y int64) int64 {
if x < y {
return x
}
return y
}

115
weed/filesys/page_writer/chunk_interval_list.go

@ -1,115 +0,0 @@
package page_writer
import "math"
// ChunkWrittenInterval mark one written interval within one page chunk
type ChunkWrittenInterval struct {
StartOffset int64
stopOffset int64
prev *ChunkWrittenInterval
next *ChunkWrittenInterval
}
func (interval *ChunkWrittenInterval) Size() int64 {
return interval.stopOffset - interval.StartOffset
}
func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool {
return interval.stopOffset-interval.StartOffset == chunkSize
}
// ChunkWrittenIntervalList mark written intervals within one page chunk
type ChunkWrittenIntervalList struct {
head *ChunkWrittenInterval
tail *ChunkWrittenInterval
}
func newChunkWrittenIntervalList() *ChunkWrittenIntervalList {
list := &ChunkWrittenIntervalList{
head: &ChunkWrittenInterval{
StartOffset: -1,
stopOffset: -1,
},
tail: &ChunkWrittenInterval{
StartOffset: math.MaxInt64,
stopOffset: math.MaxInt64,
},
}
list.head.next = list.tail
list.tail.prev = list.head
return list
}
func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) {
interval := &ChunkWrittenInterval{
StartOffset: startOffset,
stopOffset: stopOffset,
}
list.addInterval(interval)
}
func (list *ChunkWrittenIntervalList) IsComplete(chunkSize int64) bool {
return list.size() == 1 && list.head.next.isComplete(chunkSize)
}
func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) {
for t := list.head; t != nil; t = t.next {
writtenByteCount += t.Size()
}
return
}
func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) {
p := list.head
for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next {
}
q := list.tail
for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev {
}
if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset {
// merge p and q together
p.stopOffset = q.stopOffset
unlinkNodesBetween(p, q.next)
return
}
if interval.StartOffset <= p.stopOffset {
// merge new interval into p
p.stopOffset = interval.stopOffset
unlinkNodesBetween(p, q)
return
}
if q.StartOffset <= interval.stopOffset {
// merge new interval into q
q.StartOffset = interval.StartOffset
unlinkNodesBetween(p, q)
return
}
// add the new interval between p and q
unlinkNodesBetween(p, q)
p.next = interval
interval.prev = p
q.prev = interval
interval.next = q
}
// unlinkNodesBetween remove all nodes after start and before stop, exclusive
func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) {
if start.next == stop {
return
}
start.next.prev = nil
start.next = stop
stop.prev.next = nil
stop.prev = start
}
func (list *ChunkWrittenIntervalList) size() int {
var count int
for t := list.head; t != nil; t = t.next {
count++
}
return count - 2
}

49
weed/filesys/page_writer/chunk_interval_list_test.go

@ -1,49 +0,0 @@
package page_writer
import (
"github.com/stretchr/testify/assert"
"testing"
)
func Test_PageChunkWrittenIntervalList(t *testing.T) {
list := newChunkWrittenIntervalList()
assert.Equal(t, 0, list.size(), "empty list")
list.MarkWritten(0, 5)
assert.Equal(t, 1, list.size(), "one interval")
list.MarkWritten(0, 5)
assert.Equal(t, 1, list.size(), "duplicated interval2")
list.MarkWritten(95, 100)
assert.Equal(t, 2, list.size(), "two intervals")
list.MarkWritten(50, 60)
assert.Equal(t, 3, list.size(), "three intervals")
list.MarkWritten(50, 55)
assert.Equal(t, 3, list.size(), "three intervals merge")
list.MarkWritten(40, 50)
assert.Equal(t, 3, list.size(), "three intervals grow forward")
list.MarkWritten(50, 65)
assert.Equal(t, 3, list.size(), "three intervals grow backward")
list.MarkWritten(70, 80)
assert.Equal(t, 4, list.size(), "four intervals")
list.MarkWritten(60, 70)
assert.Equal(t, 3, list.size(), "three intervals merged")
list.MarkWritten(59, 71)
assert.Equal(t, 3, list.size(), "covered three intervals")
list.MarkWritten(5, 59)
assert.Equal(t, 2, list.size(), "covered two intervals")
list.MarkWritten(70, 99)
assert.Equal(t, 1, list.size(), "covered one intervals")
}

30
weed/filesys/page_writer/dirty_pages.go

@ -1,30 +0,0 @@
package page_writer
type DirtyPages interface {
AddPage(offset int64, data []byte)
FlushData() error
ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
GetStorageOptions() (collection, replication string)
Destroy()
LockForRead(startOffset, stopOffset int64)
UnlockForRead(startOffset, stopOffset int64)
}
func max(x, y int64) int64 {
if x > y {
return x
}
return y
}
func min(x, y int64) int64 {
if x < y {
return x
}
return y
}
func minInt(x, y int) int {
if x < y {
return x
}
return y
}

16
weed/filesys/page_writer/page_chunk.go

@ -1,16 +0,0 @@
package page_writer
import (
"io"
)
type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
type PageChunk interface {
FreeResource()
WriteDataAt(src []byte, offset int64) (n int)
ReadDataAt(p []byte, off int64) (maxStop int64)
IsComplete() bool
WrittenSize() int64
SaveContent(saveFn SaveToStorageFunc)
}

69
weed/filesys/page_writer/page_chunk_mem.go

@ -1,69 +0,0 @@
package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/mem"
)
var (
_ = PageChunk(&MemChunk{})
)
type MemChunk struct {
buf []byte
usage *ChunkWrittenIntervalList
chunkSize int64
logicChunkIndex LogicChunkIndex
}
func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
return &MemChunk{
logicChunkIndex: logicChunkIndex,
chunkSize: chunkSize,
buf: mem.Allocate(int(chunkSize)),
usage: newChunkWrittenIntervalList(),
}
}
func (mc *MemChunk) FreeResource() {
mem.Free(mc.buf)
}
func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) {
innerOffset := offset % mc.chunkSize
n = copy(mc.buf[innerOffset:], src)
mc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
return
}
func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset)
logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
if logicStart < logicStop {
copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
maxStop = max(maxStop, logicStop)
}
}
return
}
func (mc *MemChunk) IsComplete() bool {
return mc.usage.IsComplete(mc.chunkSize)
}
func (mc *MemChunk) WrittenSize() int64 {
return mc.usage.WrittenSize()
}
func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
if saveFn == nil {
return
}
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset])
saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() {
})
}
}

121
weed/filesys/page_writer/page_chunk_swapfile.go

@ -1,121 +0,0 @@
package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/mem"
"os"
)
var (
_ = PageChunk(&SwapFileChunk{})
)
type ActualChunkIndex int
type SwapFile struct {
dir string
file *os.File
logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
chunkSize int64
}
type SwapFileChunk struct {
swapfile *SwapFile
usage *ChunkWrittenIntervalList
logicChunkIndex LogicChunkIndex
actualChunkIndex ActualChunkIndex
}
func NewSwapFile(dir string, chunkSize int64) *SwapFile {
return &SwapFile{
dir: dir,
file: nil,
logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
chunkSize: chunkSize,
}
}
func (sf *SwapFile) FreeResource() {
if sf.file != nil {
sf.file.Close()
os.Remove(sf.file.Name())
}
}
func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
if sf.file == nil {
var err error
sf.file, err = os.CreateTemp(sf.dir, "")
if err != nil {
glog.Errorf("create swap file: %v", err)
return nil
}
}
actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
if !found {
actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
}
return &SwapFileChunk{
swapfile: sf,
usage: newChunkWrittenIntervalList(),
logicChunkIndex: logicChunkIndex,
actualChunkIndex: actualChunkIndex,
}
}
func (sc *SwapFileChunk) FreeResource() {
}
func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
innerOffset := offset % sc.swapfile.chunkSize
var err error
n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
if err == nil {
sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
} else {
glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
}
return
}
func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
logicStart := max(off, chunkStartOffset+t.StartOffset)
logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
if logicStart < logicStop {
actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
break
}
maxStop = max(maxStop, logicStop)
}
}
return
}
func (sc *SwapFileChunk) IsComplete() bool {
return sc.usage.IsComplete(sc.swapfile.chunkSize)
}
func (sc *SwapFileChunk) WrittenSize() int64 {
return sc.usage.WrittenSize()
}
func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
if saveFn == nil {
return
}
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
data := mem.Allocate(int(t.Size()))
sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
reader := util.NewBytesReader(data)
saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
})
mem.Free(data)
}
sc.usage = newChunkWrittenIntervalList()
}

182
weed/filesys/page_writer/upload_pipeline.go

@ -1,182 +0,0 @@
package page_writer
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"sync"
"sync/atomic"
"time"
)
type LogicChunkIndex int
type UploadPipeline struct {
filepath util.FullPath
ChunkSize int64
writableChunks map[LogicChunkIndex]PageChunk
writableChunksLock sync.Mutex
sealedChunks map[LogicChunkIndex]*SealedChunk
sealedChunksLock sync.Mutex
uploaders *util.LimitedConcurrentExecutor
uploaderCount int32
uploaderCountCond *sync.Cond
saveToStorageFn SaveToStorageFunc
activeReadChunks map[LogicChunkIndex]int
activeReadChunksLock sync.Mutex
bufferChunkLimit int
}
type SealedChunk struct {
chunk PageChunk
referenceCounter int // track uploading or reading processes
}
func (sc *SealedChunk) FreeReference(messageOnFree string) {
sc.referenceCounter--
if sc.referenceCounter == 0 {
glog.V(4).Infof("Free sealed chunk: %s", messageOnFree)
sc.chunk.FreeResource()
}
}
func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline {
return &UploadPipeline{
ChunkSize: chunkSize,
writableChunks: make(map[LogicChunkIndex]PageChunk),
sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
uploaders: writers,
uploaderCountCond: sync.NewCond(&sync.Mutex{}),
saveToStorageFn: saveToStorageFn,
activeReadChunks: make(map[LogicChunkIndex]int),
bufferChunkLimit: bufferChunkLimit,
}
}
func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
up.writableChunksLock.Lock()
defer up.writableChunksLock.Unlock()
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
memChunk, found := up.writableChunks[logicChunkIndex]
if !found {
if len(up.writableChunks) < up.bufferChunkLimit {
memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
for lci, mc := range up.writableChunks {
chunkFullness := mc.WrittenSize()
if fullness < chunkFullness {
fullestChunkIndex = lci
fullness = chunkFullness
}
}
up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
delete(up.writableChunks, fullestChunkIndex)
fmt.Printf("flush chunk %d with %d bytes written", logicChunkIndex, fullness)
memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
}
up.writableChunks[logicChunkIndex] = memChunk
}
n = memChunk.WriteDataAt(p, off)
up.maybeMoveToSealed(memChunk, logicChunkIndex)
return
}
func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
// read from sealed chunks first
up.sealedChunksLock.Lock()
sealedChunk, found := up.sealedChunks[logicChunkIndex]
if found {
sealedChunk.referenceCounter++
}
up.sealedChunksLock.Unlock()
if found {
maxStop = sealedChunk.chunk.ReadDataAt(p, off)
glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
}
// read from writable chunks last
up.writableChunksLock.Lock()
defer up.writableChunksLock.Unlock()
writableChunk, found := up.writableChunks[logicChunkIndex]
if !found {
return
}
writableMaxStop := writableChunk.ReadDataAt(p, off)
glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
maxStop = max(maxStop, writableMaxStop)
return
}
func (up *UploadPipeline) FlushAll() {
up.writableChunksLock.Lock()
defer up.writableChunksLock.Unlock()
for logicChunkIndex, memChunk := range up.writableChunks {
up.moveToSealed(memChunk, logicChunkIndex)
}
up.waitForCurrentWritersToComplete()
}
func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
if memChunk.IsComplete() {
up.moveToSealed(memChunk, logicChunkIndex)
}
}
func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
atomic.AddInt32(&up.uploaderCount, 1)
glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount)
up.sealedChunksLock.Lock()
if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found {
oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex))
}
sealedChunk := &SealedChunk{
chunk: memChunk,
referenceCounter: 1, // default 1 is for uploading process
}
up.sealedChunks[logicChunkIndex] = sealedChunk
delete(up.writableChunks, logicChunkIndex)
up.sealedChunksLock.Unlock()
up.uploaders.Execute(func() {
// first add to the file chunks
sealedChunk.chunk.SaveContent(up.saveToStorageFn)
// notify waiting process
atomic.AddInt32(&up.uploaderCount, -1)
glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount)
// Lock and Unlock are not required,
// but it may signal multiple times during one wakeup,
// and the waiting goroutine may miss some of them!
up.uploaderCountCond.L.Lock()
up.uploaderCountCond.Broadcast()
up.uploaderCountCond.L.Unlock()
// wait for readers
for up.IsLocked(logicChunkIndex) {
time.Sleep(59 * time.Millisecond)
}
// then remove from sealed chunks
up.sealedChunksLock.Lock()
defer up.sealedChunksLock.Unlock()
delete(up.sealedChunks, logicChunkIndex)
sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex))
})
}
func (up *UploadPipeline) Shutdown() {
}

63
weed/filesys/page_writer/upload_pipeline_lock.go

@ -1,63 +0,0 @@
package page_writer
import (
"sync/atomic"
)
func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
if stopOffset%up.ChunkSize > 0 {
stopLogicChunkIndex += 1
}
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
if count, found := up.activeReadChunks[i]; found {
up.activeReadChunks[i] = count + 1
} else {
up.activeReadChunks[i] = 1
}
}
}
func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
if stopOffset%up.ChunkSize > 0 {
stopLogicChunkIndex += 1
}
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
if count, found := up.activeReadChunks[i]; found {
if count == 1 {
delete(up.activeReadChunks, i)
} else {
up.activeReadChunks[i] = count - 1
}
}
}
}
func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
if count, found := up.activeReadChunks[logicChunkIndex]; found {
return count > 0
}
return false
}
func (up *UploadPipeline) waitForCurrentWritersToComplete() {
up.uploaderCountCond.L.Lock()
t := int32(100)
for {
t = atomic.LoadInt32(&up.uploaderCount)
if t <= 0 {
break
}
up.uploaderCountCond.Wait()
}
up.uploaderCountCond.L.Unlock()
}

47
weed/filesys/page_writer/upload_pipeline_test.go

@ -1,47 +0,0 @@
package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/util"
"testing"
)
func TestUploadPipeline(t *testing.T) {
uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16)
writeRange(uploadPipeline, 0, 131072)
writeRange(uploadPipeline, 131072, 262144)
writeRange(uploadPipeline, 262144, 1025536)
confirmRange(t, uploadPipeline, 0, 1025536)
writeRange(uploadPipeline, 1025536, 1296896)
confirmRange(t, uploadPipeline, 1025536, 1296896)
writeRange(uploadPipeline, 1296896, 2162688)
confirmRange(t, uploadPipeline, 1296896, 2162688)
confirmRange(t, uploadPipeline, 1296896, 2162688)
}
// startOff and stopOff must be divided by 4
func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff / 4; i < stopOff/4; i += 4 {
util.Uint32toBytes(p, uint32(i))
uploadPipeline.SaveDataAt(p, i)
}
}
func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff; i < stopOff/4; i += 4 {
uploadPipeline.MaybeReadDataAt(p, i)
x := util.BytesToUint32(p)
if x != uint32(i) {
t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4)
}
}
}

44
weed/filesys/page_writer_pattern.go

@ -1,44 +0,0 @@
package filesys
type WriterPattern struct {
isStreaming bool
lastWriteOffset int64
chunkSize int64
}
// For streaming write: only cache the first chunk
// For random write: fall back to temp file approach
// writes can only change from streaming mode to non-streaming mode
func NewWriterPattern(chunkSize int64) *WriterPattern {
return &WriterPattern{
isStreaming: true,
lastWriteOffset: -1,
chunkSize: chunkSize,
}
}
func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) {
if rp.lastWriteOffset > offset {
rp.isStreaming = false
}
if rp.lastWriteOffset == -1 {
if offset != 0 {
rp.isStreaming = false
}
}
rp.lastWriteOffset = offset
}
func (rp *WriterPattern) IsStreamingMode() bool {
return rp.isStreaming
}
func (rp *WriterPattern) IsRandomMode() bool {
return !rp.isStreaming
}
func (rp *WriterPattern) Reset() {
rp.isStreaming = true
rp.lastWriteOffset = -1
}

63
weed/filesys/permission.go

@ -1,63 +0,0 @@
package filesys
import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/fuse"
)
func checkPermission(entry *filer_pb.Entry, uid, gid uint32, isWrite bool) error {
if uid == 0 || gid == 0 {
return nil
}
if entry == nil {
return nil
}
if entry.Attributes == nil {
return nil
}
attr := entry.Attributes
if attr.Uid == uid {
if isWrite {
if attr.FileMode&0200 > 0 {
return nil
} else {
return fuse.EPERM
}
} else {
if attr.FileMode&0400 > 0 {
return nil
} else {
return fuse.EPERM
}
}
} else if attr.Gid == gid {
if isWrite {
if attr.FileMode&0020 > 0 {
return nil
} else {
return fuse.EPERM
}
} else {
if attr.FileMode&0040 > 0 {
return nil
} else {
return fuse.EPERM
}
}
} else {
if isWrite {
if attr.FileMode&0002 > 0 {
return nil
} else {
return fuse.EPERM
}
} else {
if attr.FileMode&0004 > 0 {
return nil
} else {
return fuse.EPERM
}
}
}
}

22
weed/filesys/unimplemented.go

@ -1,22 +0,0 @@
package filesys
import (
"context"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
// https://github.com/bazil/fuse/issues/130
var _ = fs.NodeAccesser(&Dir{})
func (dir *Dir) Access(ctx context.Context, req *fuse.AccessRequest) error {
return fuse.ENOSYS
}
var _ = fs.NodeAccesser(&File{})
func (file *File) Access(ctx context.Context, req *fuse.AccessRequest) error {
return fuse.ENOSYS
}

319
weed/filesys/wfs.go

@ -1,319 +0,0 @@
package filesys
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"math"
"math/rand"
"os"
"path"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
)
type Option struct {
MountDirectory string
FilerAddresses []pb.ServerAddress
filerIndex int
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
DiskType types.DiskType
ChunkSizeLimit int64
ConcurrentWriters int
CacheDir string
CacheSizeMB int64
DataCenter string
Umask os.FileMode
MountUid uint32
MountGid uint32
MountMode os.FileMode
MountCtime time.Time
MountMtime time.Time
MountParentInode uint64
VolumeServerAccess string // how to access volume servers
Cipher bool // whether encrypt data on volume server
UidGidMapper *meta_cache.UidGidMapper
uniqueCacheDir string
}
var _ = fs.FS(&WFS{})
var _ = fs.FSStatfser(&WFS{})
type WFS struct {
option *Option
// contains all open handles, protected by handlesLock
handlesLock sync.Mutex
handles map[uint64]*FileHandle
bufPool sync.Pool
stats statsCache
root fs.Node
fsNodeCache *FsCache
chunkCache *chunk_cache.TieredChunkCache
metaCache *meta_cache.MetaCache
signature int32
// throttle writers
concurrentWriters *util.LimitedConcurrentExecutor
Server *fs.Server
}
type statsCache struct {
filer_pb.StatisticsResponse
lastChecked int64 // unix time in seconds
}
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
option: option,
handles: make(map[uint64]*FileHandle),
bufPool: sync.Pool{
New: func() interface{} {
return make([]byte, option.ChunkSizeLimit)
},
},
signature: util.RandomInt32(),
}
wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
wfs.option.setupUniqueCacheDirectory()
if option.CacheSizeMB > 0 {
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
}
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath, entry *filer_pb.Entry) {
fsNode := NodeWithId(filePath.AsInode(entry.FileMode()))
if err := wfs.Server.InvalidateNodeData(fsNode); err != nil {
glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
}
dir, name := filePath.DirAndName()
parent := NodeWithId(util.FullPath(dir).AsInode(os.ModeDir))
if dir == option.FilerMountRootPath {
parent = NodeWithId(1)
}
if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
}
})
grace.OnInterrupt(func() {
wfs.metaCache.Shutdown()
os.RemoveAll(option.getUniqueCacheDir())
})
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, id: 1}
wfs.fsNodeCache = newFsCache(wfs.root)
if wfs.option.ConcurrentWriters > 0 {
wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
}
return wfs
}
func (wfs *WFS) StartBackgroundTasks() {
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
}
func (wfs *WFS) Root() (fs.Node, error) {
return wfs.root, nil
}
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
fullpath := file.fullpath()
glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
inodeId := file.Id()
wfs.handlesLock.Lock()
existingHandle, found := wfs.handles[inodeId]
if found && existingHandle != nil && existingHandle.f.isOpen > 0 {
existingHandle.f.isOpen++
wfs.handlesLock.Unlock()
glog.V(4).Infof("Reuse AcquiredHandle %s open %d", fullpath, existingHandle.f.isOpen)
return existingHandle
}
wfs.handlesLock.Unlock()
entry, _ := file.maybeLoadEntry(context.Background())
file.entry = entry
fileHandle = newFileHandle(file, uid, gid)
wfs.handlesLock.Lock()
file.isOpen++
wfs.handles[inodeId] = fileHandle
wfs.handlesLock.Unlock()
fileHandle.handle = inodeId
glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen)
return
}
func (wfs *WFS) Fsync(file *File, header fuse.Header) error {
inodeId := file.Id()
wfs.handlesLock.Lock()
existingHandle, found := wfs.handles[inodeId]
wfs.handlesLock.Unlock()
if found && existingHandle != nil {
existingHandle.Lock()
defer existingHandle.Unlock()
if existingHandle.f.isOpen > 0 {
return existingHandle.doFlush(context.Background(), header)
}
}
return nil
}
func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
wfs.handlesLock.Lock()
defer wfs.handlesLock.Unlock()
glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles))
delete(wfs.handles, uint64(handleId))
return
}
// Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
glog.V(4).Infof("reading fs stats: %+v", req)
if wfs.stats.lastChecked < time.Now().Unix()-20 {
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.StatisticsRequest{
Collection: wfs.option.Collection,
Replication: wfs.option.Replication,
Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
DiskType: string(wfs.option.DiskType),
}
glog.V(4).Infof("reading filer stats: %+v", request)
resp, err := client.Statistics(context.Background(), request)
if err != nil {
glog.V(0).Infof("reading filer stats %v: %v", request, err)
return err
}
glog.V(4).Infof("read filer stats: %+v", resp)
wfs.stats.TotalSize = resp.TotalSize
wfs.stats.UsedSize = resp.UsedSize
wfs.stats.FileCount = resp.FileCount
wfs.stats.lastChecked = time.Now().Unix()
return nil
})
if err != nil {
glog.V(0).Infof("filer Statistics: %v", err)
return err
}
}
totalDiskSize := wfs.stats.TotalSize
usedDiskSize := wfs.stats.UsedSize
actualFileCount := wfs.stats.FileCount
// Compute the total number of available blocks
resp.Blocks = totalDiskSize / blockSize
// Compute the number of used blocks
numBlocks := uint64(usedDiskSize / blockSize)
// Report the number of free and available blocks for the block size
resp.Bfree = resp.Blocks - numBlocks
resp.Bavail = resp.Blocks - numBlocks
resp.Bsize = uint32(blockSize)
// Report the total number of possible files in the file system (and those free)
resp.Files = math.MaxInt64
resp.Ffree = math.MaxInt64 - actualFileCount
// Report the maximum length of a name and the minimum fragment size
resp.Namelen = 1024
resp.Frsize = uint32(blockSize)
return nil
}
func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
if entry.Attributes == nil {
return
}
entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
}
func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
if entry.Attributes == nil {
return
}
entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
}
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
if wfs.option.VolumeServerAccess == "filerProxy" {
return func(fileId string) (targetUrls []string, err error) {
return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
}
}
return filer.LookupFn(wfs)
}
func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
return wfs.option.FilerAddresses[wfs.option.filerIndex]
}
func (option *Option) setupUniqueCacheDirectory() {
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
os.MkdirAll(option.uniqueCacheDir, os.FileMode(0777)&^option.Umask)
}
func (option *Option) getUniqueCacheDir() string {
return option.uniqueCacheDir
}
type NodeWithId uint64
func (n NodeWithId) Id() uint64 {
return uint64(n)
}
func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error {
return nil
}

51
weed/filesys/wfs_filer_client.go

@ -1,51 +0,0 @@
package filesys
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
var _ = filer_pb.FilerClient(&WFS{})
func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
return util.Retry("filer grpc", func() error {
i := wfs.option.filerIndex
n := len(wfs.option.FilerAddresses)
for x := 0; x < n; x++ {
filerGrpcAddress := wfs.option.FilerAddresses[i].ToGrpcAddress()
err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress, wfs.option.GrpcDialOption)
if err != nil {
glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err)
} else {
wfs.option.filerIndex = i
return nil
}
i++
if i >= n {
i = 0
}
}
return err
})
}
func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
if wfs.option.VolumeServerAccess == "publicUrl" {
return location.PublicUrl
}
return location.Url
}

84
weed/filesys/wfs_write.go

@ -1,84 +0,0 @@
package filesys
import (
"context"
"fmt"
"io"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
var fileId, host string
var auth security.EncodedJwt
if err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return util.Retry("assignVolume", func() error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: wfs.option.Replication,
Collection: wfs.option.Collection,
TtlSec: wfs.option.TtlSec,
DiskType: string(wfs.option.DiskType),
DataCenter: wfs.option.DataCenter,
Path: string(fullPath),
}
resp, err := client.AssignVolume(context.Background(), request)
if err != nil {
glog.V(0).Infof("assign volume failure %v: %v", request, err)
return err
}
if resp.Error != "" {
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
}
fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
loc := resp.Location
host = wfs.AdjustedUrl(loc)
collection, replication = resp.Collection, resp.Replication
return nil
})
}); err != nil {
return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
if wfs.option.VolumeServerAccess == "filerProxy" {
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId)
}
uploadOption := &operation.UploadOption{
UploadUrl: fileUrl,
Filename: filename,
Cipher: wfs.option.Cipher,
IsInputCompressed: false,
MimeType: "",
PairMap: nil,
Jwt: auth,
}
uploadResult, err, data := operation.Upload(reader, uploadOption)
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
return nil, "", "", fmt.Errorf("upload data: %v", err)
}
if uploadResult.Error != "" {
glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
}
if offset == 0 {
wfs.chunkCache.SetChunk(fileId, data)
}
chunk = uploadResult.ToPbFileChunk(fileId, offset)
return chunk, collection, replication, nil
}
}

153
weed/filesys/xattr.go

@ -1,153 +0,0 @@
package filesys
import (
"context"
"strings"
"syscall"
"github.com/seaweedfs/fuse"
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
const (
XATTR_PREFIX = "xattr-" // same as filer
)
func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
if entry == nil {
return fuse.ErrNoXattr
}
if entry.Extended == nil {
return fuse.ErrNoXattr
}
data, found := entry.Extended[XATTR_PREFIX+req.Name]
if !found {
return fuse.ErrNoXattr
}
if req.Position < uint32(len(data)) {
size := req.Size
if req.Position+size >= uint32(len(data)) {
size = uint32(len(data)) - req.Position
}
if size == 0 {
resp.Xattr = data[req.Position:]
} else {
resp.Xattr = data[req.Position : req.Position+size]
}
}
return nil
}
func setxattr(entry *filer_pb.Entry, req *fuse.SetxattrRequest) error {
if entry == nil {
return fuse.EIO
}
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
data, _ := entry.Extended[XATTR_PREFIX+req.Name]
newData := make([]byte, int(req.Position)+len(req.Xattr))
copy(newData, data)
copy(newData[int(req.Position):], req.Xattr)
entry.Extended[XATTR_PREFIX+req.Name] = newData
return nil
}
func removexattr(entry *filer_pb.Entry, req *fuse.RemovexattrRequest) error {
if entry == nil {
return fuse.ErrNoXattr
}
if entry.Extended == nil {
return fuse.ErrNoXattr
}
_, found := entry.Extended[XATTR_PREFIX+req.Name]
if !found {
return fuse.ErrNoXattr
}
delete(entry.Extended, XATTR_PREFIX+req.Name)
return nil
}
func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
if entry == nil {
return fuse.EIO
}
for k := range entry.Extended {
if strings.HasPrefix(k, XATTR_PREFIX) {
resp.Append(k[len(XATTR_PREFIX):])
}
}
size := req.Size
if req.Position+size >= uint32(len(resp.Xattr)) {
size = uint32(len(resp.Xattr)) - req.Position
}
if size == 0 {
resp.Xattr = resp.Xattr[req.Position:]
} else {
resp.Xattr = resp.Xattr[req.Position : req.Position+size]
}
return nil
}
func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) {
fullpath := util.NewFullPath(dir, name)
// glog.V(3).Infof("read entry cache miss %s", fullpath)
// return a valid entry for the mount root
if string(fullpath) == wfs.option.FilerMountRootPath {
return &filer_pb.Entry{
Name: name,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: wfs.option.MountMtime.Unix(),
FileMode: uint32(wfs.option.MountMode),
Uid: wfs.option.MountUid,
Gid: wfs.option.MountGid,
Crtime: wfs.option.MountCtime.Unix(),
},
}, nil
}
// read from async meta cache
meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT
}
return cachedEntry.ToProtoEntry(), cacheErr
}
func checkName(name string) error {
if len(name) >= 256 {
return syscall.ENAMETOOLONG
}
return nil
}

2
weed/mount/dirty_pages_chunked.go

@ -2,8 +2,8 @@ package mount
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/mount/page_writer"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
"sync"

2
weed/mount/page_writer.go

@ -1,8 +1,8 @@
package mount
import (
"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/mount/page_writer"
)
type PageWriter struct {

Loading…
Cancel
Save