Browse Source

Merge pull request #2668 from chrislusf/mount2

Mount2
pull/2670/head
Chris Lu 3 years ago
committed by GitHub
parent
commit
aa13168b4d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      go.mod
  2. 6
      go.sum
  3. 4
      weed/Makefile
  4. 1
      weed/command/command.go
  5. 83
      weed/command/mount2.go
  6. 210
      weed/command/mount2_std.go
  7. 42
      weed/mount/directory.go
  8. 84
      weed/mount/directory_read.go
  9. 99
      weed/mount/dirty_pages_chunked.go
  10. 95
      weed/mount/filehandle.go
  11. 84
      weed/mount/filehandle_map.go
  12. 114
      weed/mount/filehandle_read.go
  13. 161
      weed/mount/inode_to_path.go
  14. 32
      weed/mount/meta_cache/cache_config.go
  15. 101
      weed/mount/meta_cache/id_mapper.go
  16. 160
      weed/mount/meta_cache/meta_cache.go
  17. 67
      weed/mount/meta_cache/meta_cache_init.go
  18. 68
      weed/mount/meta_cache/meta_cache_subscribe.go
  19. 95
      weed/mount/page_writer.go
  20. 115
      weed/mount/page_writer/chunk_interval_list.go
  21. 49
      weed/mount/page_writer/chunk_interval_list_test.go
  22. 30
      weed/mount/page_writer/dirty_pages.go
  23. 16
      weed/mount/page_writer/page_chunk.go
  24. 69
      weed/mount/page_writer/page_chunk_mem.go
  25. 121
      weed/mount/page_writer/page_chunk_swapfile.go
  26. 182
      weed/mount/page_writer/upload_pipeline.go
  27. 63
      weed/mount/page_writer/upload_pipeline_lock.go
  28. 47
      weed/mount/page_writer/upload_pipeline_test.go
  29. 6
      weed/mount/unmount/unmount.go
  30. 21
      weed/mount/unmount/unmount_linux.go
  31. 18
      weed/mount/unmount/unmount_std.go
  32. 187
      weed/mount/weedfs.go
  33. 208
      weed/mount/weedfs_attr.go
  34. 8
      weed/mount/weedfs_attr_darwin.go
  35. 9
      weed/mount/weedfs_attr_linux.go
  36. 59
      weed/mount/weedfs_dir_lookup.go
  37. 111
      weed/mount/weedfs_dir_mkrm.go
  38. 132
      weed/mount/weedfs_dir_read.go
  39. 98
      weed/mount/weedfs_file_io.go
  40. 130
      weed/mount/weedfs_file_mkrm.go
  41. 59
      weed/mount/weedfs_file_read.go
  42. 180
      weed/mount/weedfs_file_sync.go
  43. 66
      weed/mount/weedfs_file_write.go
  44. 20
      weed/mount/weedfs_filehandle.go
  45. 68
      weed/mount/weedfs_forget.go
  46. 93
      weed/mount/weedfs_link.go
  47. 235
      weed/mount/weedfs_rename.go
  48. 80
      weed/mount/weedfs_stats.go
  49. 78
      weed/mount/weedfs_symlink.go
  50. 65
      weed/mount/weedfs_unsupported.go
  51. 84
      weed/mount/weedfs_write.go
  52. 168
      weed/mount/weedfs_xattr.go
  53. 51
      weed/mount/wfs_filer_client.go
  54. 67
      weed/mount/wfs_save.go

5
go.mod

@ -162,7 +162,10 @@ require (
modernc.org/token v1.0.0 // indirect
)
require github.com/fluent/fluent-logger-golang v1.8.0
require (
github.com/fluent/fluent-logger-golang v1.8.0
github.com/hanwen/go-fuse/v2 v2.1.0
)
require (
cloud.google.com/go/kms v1.0.0 // indirect

6
go.sum

@ -415,6 +415,10 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hanwen/go-fuse v1.0.0 h1:GxS9Zrn6c35/BnfiVsZVWmsG803xwE7eVRDvcf/BEVc=
github.com/hanwen/go-fuse v1.0.0/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok=
github.com/hanwen/go-fuse/v2 v2.1.0 h1:+32ffteETaLYClUj0a3aHjZ1hOPxxaNEHiZiujuDaek=
github.com/hanwen/go-fuse/v2 v2.1.0/go.mod h1:oRyA5eK+pvJyv5otpO/DgccS8y/RvYMaO00GgRLGryc=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
@ -512,6 +516,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kurin/blazer v0.5.3 h1:SAgYv0TKU0kN/ETfO5ExjNAPyMt2FocO2s/UlCHfjAk=
github.com/kurin/blazer v0.5.3/go.mod h1:4FCXMUWo9DllR2Do4TtBd377ezyAJ51vB5uTBjt0pGU=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348 h1:MtvEpTB6LX3vkb4ax0b5D2DHbNAUsen0Gx5wZoq3lV4=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.0 h1:Zx5DJFEYQXio93kgXnQ09fXNiUKsqv4OUEu2UtGcB1E=
github.com/lib/pq v1.10.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=

4
weed/Makefile

@ -21,6 +21,10 @@ debug_mount:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 mount -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/ -umask=000
debug_mount2:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 mount2 -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/ -umask=000
debug_server:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- server -dir=~/tmp/99 -filer -volume.port=8343 -s3 -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1

1
weed/command/command.go

@ -30,6 +30,7 @@ var Commands = []*Command{
cmdMaster,
cmdMasterFollower,
cmdMount,
cmdMount2,
cmdS3,
cmdIam,
cmdMsgBroker,

83
weed/command/mount2.go

@ -0,0 +1,83 @@
package command
import (
"os"
"time"
)
type Mount2Options struct {
filer *string
filerMountRootPath *string
dir *string
dirAutoCreate *bool
collection *string
replication *string
diskType *string
ttlSec *int
chunkSizeLimitMB *int
concurrentWriters *int
cacheDir *string
cacheSizeMB *int64
dataCenter *string
allowOthers *bool
umaskString *string
nonempty *bool
volumeServerAccess *string
uidMap *string
gidMap *string
readOnly *bool
debug *bool
debugPort *int
}
var (
mount2Options Mount2Options
)
func init() {
cmdMount2.Run = runMount2 // break init cycle
mount2Options.filer = cmdMount2.Flag.String("filer", "localhost:8888", "comma-separated weed filer location")
mount2Options.filerMountRootPath = cmdMount2.Flag.String("filer.path", "/", "mount this remote path from filer server")
mount2Options.dir = cmdMount2.Flag.String("dir", ".", "mount weed filer to this directory")
mount2Options.dirAutoCreate = cmdMount2.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to")
mount2Options.collection = cmdMount2.Flag.String("collection", "", "collection to create the files")
mount2Options.replication = cmdMount2.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mount2Options.diskType = cmdMount2.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
mount2Options.ttlSec = cmdMount2.Flag.Int("ttl", 0, "file ttl in seconds")
mount2Options.chunkSizeLimitMB = cmdMount2.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
mount2Options.concurrentWriters = cmdMount2.Flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers if not 0")
mount2Options.cacheDir = cmdMount2.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
mount2Options.cacheSizeMB = cmdMount2.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)")
mount2Options.dataCenter = cmdMount2.Flag.String("dataCenter", "", "prefer to write to the data center")
mount2Options.allowOthers = cmdMount2.Flag.Bool("allowOthers", true, "allows other users to access the file system")
mount2Options.umaskString = cmdMount2.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")
mount2Options.nonempty = cmdMount2.Flag.Bool("nonempty", false, "allows the mounting over a non-empty directory")
mount2Options.volumeServerAccess = cmdMount2.Flag.String("volumeServerAccess", "direct", "access volume servers by [direct|publicUrl|filerProxy]")
mount2Options.uidMap = cmdMount2.Flag.String("map.uid", "", "map local uid to uid on filer, comma-separated <local_uid>:<filer_uid>")
mount2Options.gidMap = cmdMount2.Flag.String("map.gid", "", "map local gid to gid on filer, comma-separated <local_gid>:<filer_gid>")
mount2Options.readOnly = cmdMount2.Flag.Bool("readOnly", false, "read only")
mount2Options.debug = cmdMount2.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
mount2Options.debugPort = cmdMount2.Flag.Int("debug.port", 6061, "http port for debugging")
mountCpuProfile = cmdMount2.Flag.String("cpuprofile", "", "cpu profile output file")
mountMemProfile = cmdMount2.Flag.String("memprofile", "", "memory profile output file")
mountReadRetryTime = cmdMount2.Flag.Duration("readRetryTime", 6*time.Second, "maximum read retry wait time")
}
var cmdMount2 = &Command{
UsageLine: "mount2 -filer=localhost:8888 -dir=/some/dir",
Short: "<WIP> mount weed filer to a directory as file system in userspace(FUSE)",
Long: `mount weed filer to userspace.
Pre-requisites:
1) have SeaweedFS master and volume servers running
2) have a "weed filer" running
These 2 requirements can be achieved with one command "weed server -filer=true"
This uses github.com/seaweedfs/fuse, which enables writing FUSE file systems on
Linux, and OS X.
On OS X, it requires OSXFUSE (http://osxfuse.github.com/).
`,
}

210
weed/command/mount2_std.go

@ -0,0 +1,210 @@
package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/mount"
"github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
"github.com/chrislusf/seaweedfs/weed/mount/unmount"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/hanwen/go-fuse/v2/fuse"
"net/http"
"os"
"os/user"
"runtime"
"strconv"
"strings"
"time"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
)
func runMount2(cmd *Command, args []string) bool {
if *mountOptions.debug {
go http.ListenAndServe(fmt.Sprintf(":%d", *mountOptions.debugPort), nil)
}
grace.SetupProfiling(*mountCpuProfile, *mountMemProfile)
if *mountReadRetryTime < time.Second {
*mountReadRetryTime = time.Second
}
util.RetryWaitTime = *mountReadRetryTime
umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64)
if umaskErr != nil {
fmt.Printf("can not parse umask %s", *mountOptions.umaskString)
return false
}
if len(args) > 0 {
return false
}
return RunMount2(&mount2Options, os.FileMode(umask))
}
func RunMount2(option *Mount2Options, umask os.FileMode) bool {
// basic checks
chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
if chunkSizeLimitMB <= 0 {
fmt.Printf("Please specify a reasonable buffer size.")
return false
}
// try to connect to filer
filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses()
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
var err error
for i := 0; i < 10; i++ {
err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer grpc address %v configuration: %v", filerAddresses, err)
}
cipher = resp.Cipher
return nil
})
if err != nil {
glog.V(0).Infof("failed to talk to filer %v: %v", filerAddresses, err)
glog.V(0).Infof("wait for %d seconds ...", i+1)
time.Sleep(time.Duration(i+1) * time.Second)
}
}
if err != nil {
glog.Errorf("failed to talk to filer %v: %v", filerAddresses, err)
return true
}
filerMountRootPath := *option.filerMountRootPath
// clean up mount point
dir := util.ResolvePath(*option.dir)
if dir == "" {
fmt.Printf("Please specify the mount directory via \"-dir\"")
return false
}
unmount.Unmount(dir)
// detect mount folder mode
if *option.dirAutoCreate {
os.MkdirAll(dir, os.FileMode(0777)&^umask)
}
fileInfo, err := os.Stat(dir)
// collect uid, gid
uid, gid := uint32(0), uint32(0)
mountMode := os.ModeDir | 0777
if err == nil {
mountMode = os.ModeDir | os.FileMode(0777)&^umask
uid, gid = util.GetFileUidGid(fileInfo)
fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, mountMode)
} else {
fmt.Printf("can not stat %s\n", dir)
return false
}
// detect uid, gid
if uid == 0 {
if u, err := user.Current(); err == nil {
if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
uid = uint32(parsedId)
}
if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil {
gid = uint32(parsedId)
}
fmt.Printf("current uid=%d gid=%d\n", uid, gid)
}
}
// mapping uid, gid
uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap)
if err != nil {
fmt.Printf("failed to parse %s %s: %v\n", *option.uidMap, *option.gidMap, err)
return false
}
// Ensure target mount point availability
if isValid := checkMountPointAvailable(dir); !isValid {
glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir)
return true
}
// mount fuse
fuseMountOptions := &fuse.MountOptions{
AllowOther: *option.allowOthers,
Options: nil,
MaxBackground: 128,
MaxWrite: 1024 * 1024 * 2,
MaxReadAhead: 1024 * 1024 * 2,
IgnoreSecurityLabels: false,
RememberInodes: false,
FsName: *option.filer + ":" + filerMountRootPath,
Name: "seaweedfs",
SingleThreaded: false,
DisableXAttrs: false,
Debug: true,
EnableLocks: false,
ExplicitDataCacheControl: false,
// SyncRead: false, // set to false to enable the FUSE_CAP_ASYNC_READ capability
DirectMount: true,
DirectMountFlags: 0,
// EnableAcl: false,
}
// find mount point
mountRoot := filerMountRootPath
if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") {
mountRoot = mountRoot[0 : len(mountRoot)-1]
}
seaweedFileSystem := mount.NewSeaweedFileSystem(&mount.Option{
MountDirectory: dir,
FilerAddresses: filerAddresses,
GrpcDialOption: grpcDialOption,
FilerMountRootPath: mountRoot,
Collection: *option.collection,
Replication: *option.replication,
TtlSec: int32(*option.ttlSec),
DiskType: types.ToDiskType(*option.diskType),
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
ConcurrentWriters: *option.concurrentWriters,
CacheDir: *option.cacheDir,
CacheSizeMB: *option.cacheSizeMB,
DataCenter: *option.dataCenter,
MountUid: uid,
MountGid: gid,
MountMode: mountMode,
MountCtime: fileInfo.ModTime(),
MountMtime: time.Now(),
Umask: umask,
VolumeServerAccess: *mountOptions.volumeServerAccess,
Cipher: cipher,
UidGidMapper: uidGidMapper,
})
server, err := fuse.NewServer(seaweedFileSystem, dir, fuseMountOptions)
if err != nil {
glog.Fatalf("Mount fail: %v", err)
}
grace.OnInterrupt(func() {
unmount.Unmount(dir)
})
seaweedFileSystem.StartBackgroundTasks()
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
server.Serve()
return true
}

42
weed/mount/directory.go

@ -0,0 +1,42 @@
package mount
import (
"bytes"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/hanwen/go-fuse/v2/fs"
"strings"
)
type Directory struct {
fs.Inode
name string
wfs *WFS
entry *filer_pb.Entry
parent *Directory
id uint64
}
func (dir *Directory) FullPath() string {
var parts []string
for p := dir; p != nil; p = p.parent {
if strings.HasPrefix(p.name, "/") {
if len(p.name) > 1 {
parts = append(parts, p.name[1:])
}
} else {
parts = append(parts, p.name)
}
}
if len(parts) == 0 {
return "/"
}
var buf bytes.Buffer
for i := len(parts) - 1; i >= 0; i-- {
buf.WriteString("/")
buf.WriteString(parts[i])
}
return buf.String()
}

84
weed/mount/directory_read.go

@ -0,0 +1,84 @@
package mount
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse"
"math"
"os"
"syscall"
)
var _ = fs.NodeReaddirer(&Directory{})
var _ = fs.NodeGetattrer(&Directory{})
func (dir *Directory) Getattr(ctx context.Context, fh fs.FileHandle, out *fuse.AttrOut) syscall.Errno {
out.Mode = 0755
return 0
}
func (dir *Directory) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno) {
dirPath := util.FullPath(dir.FullPath())
glog.V(4).Infof("Readdir %s", dirPath)
sourceChan := make(chan fuse.DirEntry, 64)
stream := newDirectoryListStream(sourceChan)
processEachEntryFn := func(entry *filer.Entry, isLast bool) {
sourceChan <- fuse.DirEntry{
Mode: uint32(entry.Mode),
Name: entry.Name(),
Ino: dirPath.Child(entry.Name()).AsInode(os.ModeDir),
}
}
if err := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return nil, fs.ToErrno(os.ErrInvalid)
}
go func() {
dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
processEachEntryFn(entry, false)
return true
})
close(sourceChan)
}()
return stream, fs.OK
}
var _ = fs.DirStream(&DirectoryListStream{})
type DirectoryListStream struct {
next fuse.DirEntry
sourceChan chan fuse.DirEntry
isStarted bool
hasNext bool
}
func newDirectoryListStream(ch chan fuse.DirEntry) *DirectoryListStream {
return &DirectoryListStream{
sourceChan: ch,
}
}
func (i *DirectoryListStream) HasNext() bool {
if !i.isStarted {
i.next, i.hasNext = <-i.sourceChan
i.isStarted = true
}
return i.hasNext
}
func (i *DirectoryListStream) Next() (fuse.DirEntry, syscall.Errno) {
t := i.next
i.next, i.hasNext = <-i.sourceChan
return t, fs.OK
}
func (i *DirectoryListStream) Close() {
}

99
weed/mount/dirty_pages_chunked.go

@ -0,0 +1,99 @@
package mount
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
"sync"
"time"
)
type ChunkedDirtyPages struct {
fh *FileHandle
writeWaitGroup sync.WaitGroup
lastErr error
collection string
replication string
uploadPipeline *page_writer.UploadPipeline
hasWrites bool
}
var (
_ = page_writer.DirtyPages(&ChunkedDirtyPages{})
)
func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
dirtyPages := &ChunkedDirtyPages{
fh: fh,
}
dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, fh.wfs.option.ConcurrentWriters)
return dirtyPages
}
func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte) {
pages.hasWrites = true
glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh, offset, offset+int64(len(data)))
pages.uploadPipeline.SaveDataAt(data, offset)
return
}
func (pages *ChunkedDirtyPages) FlushData() error {
if !pages.hasWrites {
return nil
}
pages.uploadPipeline.FlushAll()
if pages.lastErr != nil {
return fmt.Errorf("flush data: %v", pages.lastErr)
}
return nil
}
func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
if !pages.hasWrites {
return
}
return pages.uploadPipeline.MaybeReadDataAt(data, startOffset)
}
func (pages *ChunkedDirtyPages) GetStorageOptions() (collection, replication string) {
return pages.collection, pages.replication
}
func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) {
mtime := time.Now().UnixNano()
defer cleanupFn()
fileFullPath := pages.fh.FullPath()
fileName := fileFullPath.Name()
chunk, collection, replication, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset)
if err != nil {
glog.V(0).Infof("%v saveToStorage [%d,%d): %v", fileFullPath, offset, offset+size, err)
pages.lastErr = err
return
}
chunk.Mtime = mtime
pages.collection, pages.replication = collection, replication
pages.fh.addChunks([]*filer_pb.FileChunk{chunk})
pages.fh.entryViewCache = nil
glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size)
}
func (pages ChunkedDirtyPages) Destroy() {
pages.uploadPipeline.Shutdown()
}
func (pages *ChunkedDirtyPages) LockForRead(startOffset, stopOffset int64) {
pages.uploadPipeline.LockForRead(startOffset, stopOffset)
}
func (pages *ChunkedDirtyPages) UnlockForRead(startOffset, stopOffset int64) {
pages.uploadPipeline.UnlockForRead(startOffset, stopOffset)
}

95
weed/mount/filehandle.go

@ -0,0 +1,95 @@
package mount
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
"sort"
"sync"
)
type FileHandleId uint64
type FileHandle struct {
fh FileHandleId
counter int64
entry *filer_pb.Entry
chunkAddLock sync.Mutex
inode uint64
wfs *WFS
// cache file has been written to
dirtyMetadata bool
dirtyPages *PageWriter
entryViewCache []filer.VisibleInterval
reader io.ReaderAt
contentType string
handle uint64
sync.Mutex
isDeleted bool
}
func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
fh := &FileHandle{
fh: handleId,
counter: 1,
inode: inode,
wfs: wfs,
}
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
if entry != nil {
entry.Attributes.FileSize = filer.FileSize(entry)
}
return fh
}
func (fh *FileHandle) FullPath() util.FullPath {
return fh.wfs.inodeToPath.GetPath(fh.inode)
}
func (fh *FileHandle) addChunks(chunks []*filer_pb.FileChunk) {
// find the earliest incoming chunk
newChunks := chunks
earliestChunk := newChunks[0]
for i := 1; i < len(newChunks); i++ {
if lessThan(earliestChunk, newChunks[i]) {
earliestChunk = newChunks[i]
}
}
if fh.entry == nil {
return
}
// pick out-of-order chunks from existing chunks
for _, chunk := range fh.entry.Chunks {
if lessThan(earliestChunk, chunk) {
chunks = append(chunks, chunk)
}
}
// sort incoming chunks
sort.Slice(chunks, func(i, j int) bool {
return lessThan(chunks[i], chunks[j])
})
glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.Chunks), len(chunks))
fh.chunkAddLock.Lock()
fh.entry.Chunks = append(fh.entry.Chunks, newChunks...)
fh.entryViewCache = nil
fh.chunkAddLock.Unlock()
}
func lessThan(a, b *filer_pb.FileChunk) bool {
if a.Mtime == b.Mtime {
return a.Fid.FileKey < b.Fid.FileKey
}
return a.Mtime < b.Mtime
}

84
weed/mount/filehandle_map.go

@ -0,0 +1,84 @@
package mount
import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"sync"
)
type FileHandleToInode struct {
sync.RWMutex
nextFh FileHandleId
inode2fh map[uint64]*FileHandle
fh2inode map[FileHandleId]uint64
}
func NewFileHandleToInode() *FileHandleToInode {
return &FileHandleToInode{
inode2fh: make(map[uint64]*FileHandle),
fh2inode: make(map[FileHandleId]uint64),
nextFh: 0,
}
}
func (i *FileHandleToInode) GetFileHandle(fh FileHandleId) *FileHandle {
i.RLock()
defer i.RUnlock()
inode, found := i.fh2inode[fh]
if found {
return i.inode2fh[inode]
}
return nil
}
func (i *FileHandleToInode) FindFileHandle(inode uint64) (fh *FileHandle, found bool) {
i.RLock()
defer i.RUnlock()
fh, found = i.inode2fh[inode]
return
}
func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *filer_pb.Entry) *FileHandle {
i.Lock()
defer i.Unlock()
fh, found := i.inode2fh[inode]
if !found {
fh = newFileHandle(wfs, i.nextFh, inode, entry)
i.nextFh++
i.inode2fh[inode] = fh
i.fh2inode[fh.fh] = inode
} else {
fh.counter++
}
return fh
}
func (i *FileHandleToInode) ReleaseByInode(inode uint64) {
i.Lock()
defer i.Unlock()
fh, found := i.inode2fh[inode]
if found {
fh.counter--
if fh.counter <= 0 {
delete(i.inode2fh, inode)
delete(i.fh2inode, fh.fh)
}
}
}
func (i *FileHandleToInode) ReleaseByHandle(fh FileHandleId) {
i.Lock()
defer i.Unlock()
inode, found := i.fh2inode[fh]
if found {
fhHandle, fhFound := i.inode2fh[inode]
if !fhFound {
delete(i.fh2inode, fh)
} else {
fhHandle.counter--
if fhHandle.counter <= 0 {
delete(i.inode2fh, inode)
delete(i.fh2inode, fhHandle.fh)
}
}
}
}

114
weed/mount/filehandle_read.go

@ -0,0 +1,114 @@
package mount
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
"math"
)
func (fh *FileHandle) lockForRead(startOffset int64, size int) {
fh.dirtyPages.LockForRead(startOffset, startOffset+int64(size))
}
func (fh *FileHandle) unlockForRead(startOffset int64, size int) {
fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size))
}
func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) {
maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
return
}
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
fileFullPath := fh.FullPath()
entry := fh.entry
if entry == nil {
return 0, io.EOF
}
if entry.IsInRemoteOnly() {
glog.V(4).Infof("download remote entry %s", fileFullPath)
newEntry, err := fh.downloadRemoteEntry(entry)
if err != nil {
glog.V(1).Infof("download remote entry %s: %v", fileFullPath, err)
return 0, err
}
entry = newEntry
}
fileSize := int64(filer.FileSize(entry))
if fileSize == 0 {
glog.V(1).Infof("empty fh %v", fileFullPath)
return 0, io.EOF
}
if offset+int64(len(buff)) <= int64(len(entry.Content)) {
totalRead := copy(buff, entry.Content[offset:])
glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
return int64(totalRead), nil
}
var chunkResolveErr error
if fh.entryViewCache == nil {
fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), entry.Chunks, 0, math.MaxInt64)
if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
fh.reader = nil
}
reader := fh.reader
if reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64)
glog.V(4).Infof("file handle read %s [%d,%d) from %d views", fileFullPath, offset, offset+int64(len(buff)), len(chunkViews))
for _, chunkView := range chunkViews {
glog.V(4).Infof(" read %s [%d,%d) from chunk %+v", fileFullPath, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.FileId)
}
reader = filer.NewChunkReaderAtFromClient(fh.wfs.LookupFn(), chunkViews, fh.wfs.chunkCache, fileSize)
}
fh.reader = reader
totalRead, err := reader.ReadAt(buff, offset)
if err != nil && err != io.EOF {
glog.Errorf("file handle read %s: %v", fileFullPath, err)
}
glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
return int64(totalRead), err
}
func (fh *FileHandle) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) {
fileFullPath := fh.FullPath()
dir, _ := fileFullPath.DirAndName()
err := fh.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CacheRemoteObjectToLocalClusterRequest{
Directory: string(dir),
Name: entry.Name,
}
glog.V(4).Infof("download entry: %v", request)
resp, err := client.CacheRemoteObjectToLocalCluster(context.Background(), request)
if err != nil {
return fmt.Errorf("CacheRemoteObjectToLocalCluster file %s: %v", fileFullPath, err)
}
entry = resp.Entry
fh.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, resp.Entry))
return nil
})
return entry, err
}

161
weed/mount/inode_to_path.go

@ -0,0 +1,161 @@
package mount
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"sync"
)
type InodeToPath struct {
sync.RWMutex
nextInodeId uint64
inode2path map[uint64]*InodeEntry
path2inode map[util.FullPath]uint64
}
type InodeEntry struct {
util.FullPath
nlookup uint64
isDirectory bool
isChildrenCached bool
}
func NewInodeToPath() *InodeToPath {
t := &InodeToPath{
inode2path: make(map[uint64]*InodeEntry),
path2inode: make(map[util.FullPath]uint64),
nextInodeId: 2, // the root inode id is 1
}
t.inode2path[1] = &InodeEntry{"/", 1, true, false}
t.path2inode["/"] = 1
return t
}
func (i *InodeToPath) Lookup(path util.FullPath, isDirectory bool) uint64 {
i.Lock()
defer i.Unlock()
inode, found := i.path2inode[path]
if !found {
inode = i.nextInodeId
i.nextInodeId++
i.path2inode[path] = inode
i.inode2path[inode] = &InodeEntry{path, 1, isDirectory, false}
} else {
i.inode2path[inode].nlookup++
}
return inode
}
func (i *InodeToPath) GetInode(path util.FullPath) uint64 {
if path == "/" {
return 1
}
i.Lock()
defer i.Unlock()
inode, found := i.path2inode[path]
if !found {
// glog.Fatalf("GetInode unknown inode for %s", path)
// this could be the parent for mount point
}
return inode
}
func (i *InodeToPath) GetPath(inode uint64) util.FullPath {
i.RLock()
defer i.RUnlock()
path, found := i.inode2path[inode]
if !found {
glog.Fatalf("not found inode %d", inode)
}
return path.FullPath
}
func (i *InodeToPath) HasPath(path util.FullPath) bool {
i.RLock()
defer i.RUnlock()
_, found := i.path2inode[path]
return found
}
func (i *InodeToPath) MarkChildrenCached(fullpath util.FullPath) {
i.RLock()
defer i.RUnlock()
inode, found := i.path2inode[fullpath]
if !found {
glog.Fatalf("MarkChildrenCached not found inode %v", fullpath)
}
path, found := i.inode2path[inode]
path.isChildrenCached = true
}
func (i *InodeToPath) IsChildrenCached(fullpath util.FullPath) bool {
i.RLock()
defer i.RUnlock()
inode, found := i.path2inode[fullpath]
if !found {
return false
}
path, found := i.inode2path[inode]
if found {
return path.isChildrenCached
}
return false
}
func (i *InodeToPath) HasInode(inode uint64) bool {
if inode == 1 {
return true
}
i.RLock()
defer i.RUnlock()
_, found := i.inode2path[inode]
return found
}
func (i *InodeToPath) RemovePath(path util.FullPath) {
i.Lock()
defer i.Unlock()
inode, found := i.path2inode[path]
if found {
delete(i.path2inode, path)
delete(i.inode2path, inode)
}
}
func (i *InodeToPath) MovePath(sourcePath, targetPath util.FullPath) {
i.Lock()
defer i.Unlock()
sourceInode, sourceFound := i.path2inode[sourcePath]
targetInode, targetFound := i.path2inode[targetPath]
if sourceFound {
delete(i.path2inode, sourcePath)
i.path2inode[targetPath] = sourceInode
} else {
// it is possible some source folder items has not been visited before
// so no need to worry about their source inodes
return
}
i.inode2path[sourceInode].FullPath = targetPath
if targetFound {
delete(i.inode2path, targetInode)
} else {
i.inode2path[sourceInode].nlookup++
}
}
func (i *InodeToPath) Forget(inode, nlookup uint64, onForgetDir func(dir util.FullPath)) {
i.Lock()
path, found := i.inode2path[inode]
if found {
path.nlookup -= nlookup
if path.nlookup <= 0 {
delete(i.path2inode, path.FullPath)
delete(i.inode2path, inode)
}
}
i.Unlock()
if found {
if path.isDirectory && onForgetDir != nil {
onForgetDir(path.FullPath)
}
}
}

32
weed/mount/meta_cache/cache_config.go

@ -0,0 +1,32 @@
package meta_cache
import "github.com/chrislusf/seaweedfs/weed/util"
var (
_ = util.Configuration(&cacheConfig{})
)
// implementing util.Configuraion
type cacheConfig struct {
dir string
}
func (c cacheConfig) GetString(key string) string {
return c.dir
}
func (c cacheConfig) GetBool(key string) bool {
panic("implement me")
}
func (c cacheConfig) GetInt(key string) int {
panic("implement me")
}
func (c cacheConfig) GetStringSlice(key string) []string {
panic("implement me")
}
func (c cacheConfig) SetDefault(key string, value interface{}) {
panic("implement me")
}

101
weed/mount/meta_cache/id_mapper.go

@ -0,0 +1,101 @@
package meta_cache
import (
"fmt"
"strconv"
"strings"
)
type UidGidMapper struct {
uidMapper *IdMapper
gidMapper *IdMapper
}
type IdMapper struct {
localToFiler map[uint32]uint32
filerToLocal map[uint32]uint32
}
// UidGidMapper translates local uid/gid to filer uid/gid
// The local storage always persists the same as the filer.
// The local->filer translation happens when updating the filer first and later saving to meta_cache.
// And filer->local happens when reading from the meta_cache.
func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) {
uidMapper, err := newIdMapper(uidPairsStr)
if err != nil {
return nil, err
}
gidMapper, err := newIdMapper(gidPairStr)
if err != nil {
return nil, err
}
return &UidGidMapper{
uidMapper: uidMapper,
gidMapper: gidMapper,
}, nil
}
func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) {
return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid)
}
func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) {
return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid)
}
func (m *IdMapper) LocalToFiler(id uint32) uint32 {
value, found := m.localToFiler[id]
if found {
return value
}
return id
}
func (m *IdMapper) FilerToLocal(id uint32) uint32 {
value, found := m.filerToLocal[id]
if found {
return value
}
return id
}
func newIdMapper(pairsStr string) (*IdMapper, error) {
localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr)
if err != nil {
return nil, err
}
return &IdMapper{
localToFiler: localToFiler,
filerToLocal: filerToLocal,
}, nil
}
func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) {
if pairsStr == "" {
return
}
localToFiler = make(map[uint32]uint32)
filerToLocal = make(map[uint32]uint32)
for _, pairStr := range strings.Split(pairsStr, ",") {
pair := strings.Split(pairStr, ":")
localUidStr, filerUidStr := pair[0], pair[1]
localUid, localUidErr := strconv.Atoi(localUidStr)
if localUidErr != nil {
err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr)
return
}
filerUid, filerUidErr := strconv.Atoi(filerUidStr)
if filerUidErr != nil {
err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr)
return
}
localToFiler[uint32(localUid)] = uint32(filerUid)
filerToLocal[uint32(filerUid)] = uint32(localUid)
}
return
}

160
weed/mount/meta_cache/meta_cache.go

@ -0,0 +1,160 @@
package meta_cache
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/leveldb"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"os"
)
// need to have logic similar to FilerStoreWrapper
// e.g. fill fileId field for chunks
type MetaCache struct {
localStore filer.VirtualFilerStore
// sync.RWMutex
uidGidMapper *UidGidMapper
markCachedFn func(fullpath util.FullPath)
isCachedFn func(fullpath util.FullPath) bool
invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
}
func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache {
return &MetaCache{
localStore: openMetaStore(dbFolder),
markCachedFn: markCachedFn,
isCachedFn: isCachedFn,
uidGidMapper: uidGidMapper,
invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) {
invalidateFunc(fullpath, entry)
},
}
}
func openMetaStore(dbFolder string) filer.VirtualFilerStore {
os.RemoveAll(dbFolder)
os.MkdirAll(dbFolder, 0755)
store := &leveldb.LevelDBStore{}
config := &cacheConfig{
dir: dbFolder,
}
if err := store.Initialize(config, ""); err != nil {
glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
}
return filer.NewFilerStoreWrapper(store)
}
func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
//mc.Lock()
//defer mc.Unlock()
return mc.doInsertEntry(ctx, entry)
}
func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
return mc.localStore.InsertEntry(ctx, entry)
}
func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
//mc.Lock()
//defer mc.Unlock()
oldDir, _ := oldPath.DirAndName()
if mc.isCachedFn(util.FullPath(oldDir)) {
if oldPath != "" {
if newEntry != nil && oldPath == newEntry.FullPath {
// skip the unnecessary deletion
// leave the update to the following InsertEntry operation
} else {
glog.V(3).Infof("DeleteEntry %s", oldPath)
if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
return err
}
}
}
} else {
// println("unknown old directory:", oldDir)
}
if newEntry != nil {
newDir, _ := newEntry.DirAndName()
if mc.isCachedFn(util.FullPath(newDir)) {
glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
return err
}
}
}
return nil
}
func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
//mc.Lock()
//defer mc.Unlock()
return mc.localStore.UpdateEntry(ctx, entry)
}
func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
//mc.RLock()
//defer mc.RUnlock()
entry, err = mc.localStore.FindEntry(ctx, fp)
if err != nil {
return nil, err
}
mc.mapIdFromFilerToLocal(entry)
return
}
func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
//mc.Lock()
//defer mc.Unlock()
return mc.localStore.DeleteEntry(ctx, fp)
}
func (mc *MetaCache) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
//mc.Lock()
//defer mc.Unlock()
return mc.localStore.DeleteFolderChildren(ctx, fp)
}
func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
//mc.RLock()
//defer mc.RUnlock()
if !mc.isCachedFn(dirPath) {
// if this request comes after renaming, it should be fine
glog.Warningf("unsynchronized dir: %v", dirPath)
}
_, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
mc.mapIdFromFilerToLocal(entry)
return eachEntryFunc(entry)
})
if err != nil {
return err
}
return err
}
func (mc *MetaCache) Shutdown() {
//mc.Lock()
//defer mc.Unlock()
mc.localStore.Shutdown()
}
func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) {
entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid)
}
func (mc *MetaCache) Debug() {
if debuggable, ok := mc.localStore.(filer.Debuggable); ok {
println("start debugging")
debuggable.Debug(os.Stderr)
}
}

67
weed/mount/meta_cache/meta_cache_init.go

@ -0,0 +1,67 @@
package meta_cache
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
for {
// the directory children are already cached
// so no need for this and upper directories
if mc.isCachedFn(dirPath) {
return nil
}
if err := doEnsureVisited(mc, client, dirPath); err != nil {
return err
}
// continue to parent directory
if dirPath != "/" {
parent, _ := dirPath.DirAndName()
dirPath = util.FullPath(parent)
} else {
break
}
}
return nil
}
func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
glog.V(4).Infof("ReadDirAllEntries %s ...", path)
err := util.Retry("ReadDirAllEntries", func() error {
return filer_pb.ReadDirAllEntries(client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
entry := filer.FromPbEntry(string(path), pbEntry)
if IsHiddenSystemEntry(string(path), entry.Name()) {
return nil
}
if err := mc.doInsertEntry(context.Background(), entry); err != nil {
glog.V(0).Infof("read %s: %v", entry.FullPath, err)
return err
}
return nil
})
})
if err != nil {
err = fmt.Errorf("list %s: %v", path, err)
}
mc.markCachedFn(path)
return err
}
func IsHiddenSystemEntry(dir, name string) bool {
return dir == "/" && (name == "topics" || name == "etc")
}

68
weed/mount/meta_cache/meta_cache_subscribe.go

@ -0,0 +1,68 @@
package meta_cache
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
for _, sig := range message.Signatures {
if sig == selfSignature && selfSignature != 0 {
return nil
}
}
dir := resp.Directory
var oldPath util.FullPath
var newEntry *filer.Entry
if message.OldEntry != nil {
oldPath = util.NewFullPath(dir, message.OldEntry.Name)
glog.V(4).Infof("deleting %v", oldPath)
}
if message.NewEntry != nil {
if message.NewParentPath != "" {
dir = message.NewParentPath
}
key := util.NewFullPath(dir, message.NewEntry.Name)
glog.V(4).Infof("creating %v", key)
newEntry = filer.FromPbEntry(dir, message.NewEntry)
}
err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
if err == nil {
if message.OldEntry != nil && message.NewEntry != nil {
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
mc.invalidateFunc(oldKey, message.OldEntry)
if message.OldEntry.Name != message.NewEntry.Name {
newKey := util.NewFullPath(dir, message.NewEntry.Name)
mc.invalidateFunc(newKey, message.NewEntry)
}
} else if message.OldEntry == nil && message.NewEntry != nil {
// no need to invaalidate
} else if message.OldEntry != nil && message.NewEntry == nil {
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
mc.invalidateFunc(oldKey, message.OldEntry)
}
}
return err
}
util.RetryForever("followMetaUpdates", func() error {
return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, selfSignature, processEventFn, true)
}, func(err error) bool {
glog.Errorf("follow metadata updates: %v", err)
return true
})
return nil
}

95
weed/mount/page_writer.go

@ -0,0 +1,95 @@
package mount
import (
"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
"github.com/chrislusf/seaweedfs/weed/glog"
)
type PageWriter struct {
fh *FileHandle
collection string
replication string
chunkSize int64
randomWriter page_writer.DirtyPages
}
var (
_ = page_writer.DirtyPages(&PageWriter{})
)
func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
pw := &PageWriter{
fh: fh,
chunkSize: chunkSize,
randomWriter: newMemoryChunkPages(fh, chunkSize),
// randomWriter: newTempFileDirtyPages(fh.f, chunkSize),
}
return pw
}
func (pw *PageWriter) AddPage(offset int64, data []byte) {
glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
pw.addToOneChunk(i, offset, data[:writeSize])
offset += writeSize
data = data[writeSize:]
}
}
func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) {
pw.randomWriter.AddPage(offset, data)
}
func (pw *PageWriter) FlushData() error {
return pw.randomWriter.FlushData()
}
func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) {
glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset)
offset += readSize
data = data[readSize:]
}
return
}
func (pw *PageWriter) GetStorageOptions() (collection, replication string) {
return pw.randomWriter.GetStorageOptions()
}
func (pw *PageWriter) LockForRead(startOffset, stopOffset int64) {
pw.randomWriter.LockForRead(startOffset, stopOffset)
}
func (pw *PageWriter) UnlockForRead(startOffset, stopOffset int64) {
pw.randomWriter.UnlockForRead(startOffset, stopOffset)
}
func (pw *PageWriter) Destroy() {
pw.randomWriter.Destroy()
}
func max(x, y int64) int64 {
if x > y {
return x
}
return y
}
func min(x, y int64) int64 {
if x < y {
return x
}
return y
}

115
weed/mount/page_writer/chunk_interval_list.go

@ -0,0 +1,115 @@
package page_writer
import "math"
// ChunkWrittenInterval mark one written interval within one page chunk
type ChunkWrittenInterval struct {
StartOffset int64
stopOffset int64
prev *ChunkWrittenInterval
next *ChunkWrittenInterval
}
func (interval *ChunkWrittenInterval) Size() int64 {
return interval.stopOffset - interval.StartOffset
}
func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool {
return interval.stopOffset-interval.StartOffset == chunkSize
}
// ChunkWrittenIntervalList mark written intervals within one page chunk
type ChunkWrittenIntervalList struct {
head *ChunkWrittenInterval
tail *ChunkWrittenInterval
}
func newChunkWrittenIntervalList() *ChunkWrittenIntervalList {
list := &ChunkWrittenIntervalList{
head: &ChunkWrittenInterval{
StartOffset: -1,
stopOffset: -1,
},
tail: &ChunkWrittenInterval{
StartOffset: math.MaxInt64,
stopOffset: math.MaxInt64,
},
}
list.head.next = list.tail
list.tail.prev = list.head
return list
}
func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) {
interval := &ChunkWrittenInterval{
StartOffset: startOffset,
stopOffset: stopOffset,
}
list.addInterval(interval)
}
func (list *ChunkWrittenIntervalList) IsComplete(chunkSize int64) bool {
return list.size() == 1 && list.head.next.isComplete(chunkSize)
}
func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) {
for t := list.head; t != nil; t = t.next {
writtenByteCount += t.Size()
}
return
}
func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) {
p := list.head
for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next {
}
q := list.tail
for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev {
}
if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset {
// merge p and q together
p.stopOffset = q.stopOffset
unlinkNodesBetween(p, q.next)
return
}
if interval.StartOffset <= p.stopOffset {
// merge new interval into p
p.stopOffset = interval.stopOffset
unlinkNodesBetween(p, q)
return
}
if q.StartOffset <= interval.stopOffset {
// merge new interval into q
q.StartOffset = interval.StartOffset
unlinkNodesBetween(p, q)
return
}
// add the new interval between p and q
unlinkNodesBetween(p, q)
p.next = interval
interval.prev = p
q.prev = interval
interval.next = q
}
// unlinkNodesBetween remove all nodes after start and before stop, exclusive
func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) {
if start.next == stop {
return
}
start.next.prev = nil
start.next = stop
stop.prev.next = nil
stop.prev = start
}
func (list *ChunkWrittenIntervalList) size() int {
var count int
for t := list.head; t != nil; t = t.next {
count++
}
return count - 2
}

49
weed/mount/page_writer/chunk_interval_list_test.go

@ -0,0 +1,49 @@
package page_writer
import (
"github.com/stretchr/testify/assert"
"testing"
)
func Test_PageChunkWrittenIntervalList(t *testing.T) {
list := newChunkWrittenIntervalList()
assert.Equal(t, 0, list.size(), "empty list")
list.MarkWritten(0, 5)
assert.Equal(t, 1, list.size(), "one interval")
list.MarkWritten(0, 5)
assert.Equal(t, 1, list.size(), "duplicated interval2")
list.MarkWritten(95, 100)
assert.Equal(t, 2, list.size(), "two intervals")
list.MarkWritten(50, 60)
assert.Equal(t, 3, list.size(), "three intervals")
list.MarkWritten(50, 55)
assert.Equal(t, 3, list.size(), "three intervals merge")
list.MarkWritten(40, 50)
assert.Equal(t, 3, list.size(), "three intervals grow forward")
list.MarkWritten(50, 65)
assert.Equal(t, 3, list.size(), "three intervals grow backward")
list.MarkWritten(70, 80)
assert.Equal(t, 4, list.size(), "four intervals")
list.MarkWritten(60, 70)
assert.Equal(t, 3, list.size(), "three intervals merged")
list.MarkWritten(59, 71)
assert.Equal(t, 3, list.size(), "covered three intervals")
list.MarkWritten(5, 59)
assert.Equal(t, 2, list.size(), "covered two intervals")
list.MarkWritten(70, 99)
assert.Equal(t, 1, list.size(), "covered one intervals")
}

30
weed/mount/page_writer/dirty_pages.go

@ -0,0 +1,30 @@
package page_writer
type DirtyPages interface {
AddPage(offset int64, data []byte)
FlushData() error
ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
GetStorageOptions() (collection, replication string)
Destroy()
LockForRead(startOffset, stopOffset int64)
UnlockForRead(startOffset, stopOffset int64)
}
func max(x, y int64) int64 {
if x > y {
return x
}
return y
}
func min(x, y int64) int64 {
if x < y {
return x
}
return y
}
func minInt(x, y int) int {
if x < y {
return x
}
return y
}

16
weed/mount/page_writer/page_chunk.go

@ -0,0 +1,16 @@
package page_writer
import (
"io"
)
type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
type PageChunk interface {
FreeResource()
WriteDataAt(src []byte, offset int64) (n int)
ReadDataAt(p []byte, off int64) (maxStop int64)
IsComplete() bool
WrittenSize() int64
SaveContent(saveFn SaveToStorageFunc)
}

69
weed/mount/page_writer/page_chunk_mem.go

@ -0,0 +1,69 @@
package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/mem"
)
var (
_ = PageChunk(&MemChunk{})
)
type MemChunk struct {
buf []byte
usage *ChunkWrittenIntervalList
chunkSize int64
logicChunkIndex LogicChunkIndex
}
func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
return &MemChunk{
logicChunkIndex: logicChunkIndex,
chunkSize: chunkSize,
buf: mem.Allocate(int(chunkSize)),
usage: newChunkWrittenIntervalList(),
}
}
func (mc *MemChunk) FreeResource() {
mem.Free(mc.buf)
}
func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) {
innerOffset := offset % mc.chunkSize
n = copy(mc.buf[innerOffset:], src)
mc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
return
}
func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset)
logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
if logicStart < logicStop {
copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
maxStop = max(maxStop, logicStop)
}
}
return
}
func (mc *MemChunk) IsComplete() bool {
return mc.usage.IsComplete(mc.chunkSize)
}
func (mc *MemChunk) WrittenSize() int64 {
return mc.usage.WrittenSize()
}
func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
if saveFn == nil {
return
}
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset])
saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() {
})
}
}

121
weed/mount/page_writer/page_chunk_swapfile.go

@ -0,0 +1,121 @@
package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/mem"
"os"
)
var (
_ = PageChunk(&SwapFileChunk{})
)
type ActualChunkIndex int
type SwapFile struct {
dir string
file *os.File
logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
chunkSize int64
}
type SwapFileChunk struct {
swapfile *SwapFile
usage *ChunkWrittenIntervalList
logicChunkIndex LogicChunkIndex
actualChunkIndex ActualChunkIndex
}
func NewSwapFile(dir string, chunkSize int64) *SwapFile {
return &SwapFile{
dir: dir,
file: nil,
logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
chunkSize: chunkSize,
}
}
func (sf *SwapFile) FreeResource() {
if sf.file != nil {
sf.file.Close()
os.Remove(sf.file.Name())
}
}
func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
if sf.file == nil {
var err error
sf.file, err = os.CreateTemp(sf.dir, "")
if err != nil {
glog.Errorf("create swap file: %v", err)
return nil
}
}
actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
if !found {
actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
}
return &SwapFileChunk{
swapfile: sf,
usage: newChunkWrittenIntervalList(),
logicChunkIndex: logicChunkIndex,
actualChunkIndex: actualChunkIndex,
}
}
func (sc *SwapFileChunk) FreeResource() {
}
func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
innerOffset := offset % sc.swapfile.chunkSize
var err error
n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
if err == nil {
sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
} else {
glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
}
return
}
func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
logicStart := max(off, chunkStartOffset+t.StartOffset)
logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
if logicStart < logicStop {
actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
break
}
maxStop = max(maxStop, logicStop)
}
}
return
}
func (sc *SwapFileChunk) IsComplete() bool {
return sc.usage.IsComplete(sc.swapfile.chunkSize)
}
func (sc *SwapFileChunk) WrittenSize() int64 {
return sc.usage.WrittenSize()
}
func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
if saveFn == nil {
return
}
for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
data := mem.Allocate(int(t.Size()))
sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
reader := util.NewBytesReader(data)
saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
})
mem.Free(data)
}
sc.usage = newChunkWrittenIntervalList()
}

182
weed/mount/page_writer/upload_pipeline.go

@ -0,0 +1,182 @@
package page_writer
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"sync"
"sync/atomic"
"time"
)
type LogicChunkIndex int
type UploadPipeline struct {
filepath util.FullPath
ChunkSize int64
writableChunks map[LogicChunkIndex]PageChunk
writableChunksLock sync.Mutex
sealedChunks map[LogicChunkIndex]*SealedChunk
sealedChunksLock sync.Mutex
uploaders *util.LimitedConcurrentExecutor
uploaderCount int32
uploaderCountCond *sync.Cond
saveToStorageFn SaveToStorageFunc
activeReadChunks map[LogicChunkIndex]int
activeReadChunksLock sync.Mutex
bufferChunkLimit int
}
type SealedChunk struct {
chunk PageChunk
referenceCounter int // track uploading or reading processes
}
func (sc *SealedChunk) FreeReference(messageOnFree string) {
sc.referenceCounter--
if sc.referenceCounter == 0 {
glog.V(4).Infof("Free sealed chunk: %s", messageOnFree)
sc.chunk.FreeResource()
}
}
func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline {
return &UploadPipeline{
ChunkSize: chunkSize,
writableChunks: make(map[LogicChunkIndex]PageChunk),
sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
uploaders: writers,
uploaderCountCond: sync.NewCond(&sync.Mutex{}),
saveToStorageFn: saveToStorageFn,
activeReadChunks: make(map[LogicChunkIndex]int),
bufferChunkLimit: bufferChunkLimit,
}
}
func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
up.writableChunksLock.Lock()
defer up.writableChunksLock.Unlock()
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
memChunk, found := up.writableChunks[logicChunkIndex]
if !found {
if len(up.writableChunks) < up.bufferChunkLimit {
memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
for lci, mc := range up.writableChunks {
chunkFullness := mc.WrittenSize()
if fullness < chunkFullness {
fullestChunkIndex = lci
fullness = chunkFullness
}
}
up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
delete(up.writableChunks, fullestChunkIndex)
fmt.Printf("flush chunk %d with %d bytes written", logicChunkIndex, fullness)
memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
}
up.writableChunks[logicChunkIndex] = memChunk
}
n = memChunk.WriteDataAt(p, off)
up.maybeMoveToSealed(memChunk, logicChunkIndex)
return
}
func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
// read from sealed chunks first
up.sealedChunksLock.Lock()
sealedChunk, found := up.sealedChunks[logicChunkIndex]
if found {
sealedChunk.referenceCounter++
}
up.sealedChunksLock.Unlock()
if found {
maxStop = sealedChunk.chunk.ReadDataAt(p, off)
glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
}
// read from writable chunks last
up.writableChunksLock.Lock()
defer up.writableChunksLock.Unlock()
writableChunk, found := up.writableChunks[logicChunkIndex]
if !found {
return
}
writableMaxStop := writableChunk.ReadDataAt(p, off)
glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
maxStop = max(maxStop, writableMaxStop)
return
}
func (up *UploadPipeline) FlushAll() {
up.writableChunksLock.Lock()
defer up.writableChunksLock.Unlock()
for logicChunkIndex, memChunk := range up.writableChunks {
up.moveToSealed(memChunk, logicChunkIndex)
}
up.waitForCurrentWritersToComplete()
}
func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
if memChunk.IsComplete() {
up.moveToSealed(memChunk, logicChunkIndex)
}
}
func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
atomic.AddInt32(&up.uploaderCount, 1)
glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount)
up.sealedChunksLock.Lock()
if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found {
oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex))
}
sealedChunk := &SealedChunk{
chunk: memChunk,
referenceCounter: 1, // default 1 is for uploading process
}
up.sealedChunks[logicChunkIndex] = sealedChunk
delete(up.writableChunks, logicChunkIndex)
up.sealedChunksLock.Unlock()
up.uploaders.Execute(func() {
// first add to the file chunks
sealedChunk.chunk.SaveContent(up.saveToStorageFn)
// notify waiting process
atomic.AddInt32(&up.uploaderCount, -1)
glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount)
// Lock and Unlock are not required,
// but it may signal multiple times during one wakeup,
// and the waiting goroutine may miss some of them!
up.uploaderCountCond.L.Lock()
up.uploaderCountCond.Broadcast()
up.uploaderCountCond.L.Unlock()
// wait for readers
for up.IsLocked(logicChunkIndex) {
time.Sleep(59 * time.Millisecond)
}
// then remove from sealed chunks
up.sealedChunksLock.Lock()
defer up.sealedChunksLock.Unlock()
delete(up.sealedChunks, logicChunkIndex)
sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex))
})
}
func (up *UploadPipeline) Shutdown() {
}

63
weed/mount/page_writer/upload_pipeline_lock.go

@ -0,0 +1,63 @@
package page_writer
import (
"sync/atomic"
)
func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
if stopOffset%up.ChunkSize > 0 {
stopLogicChunkIndex += 1
}
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
if count, found := up.activeReadChunks[i]; found {
up.activeReadChunks[i] = count + 1
} else {
up.activeReadChunks[i] = 1
}
}
}
func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
if stopOffset%up.ChunkSize > 0 {
stopLogicChunkIndex += 1
}
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
if count, found := up.activeReadChunks[i]; found {
if count == 1 {
delete(up.activeReadChunks, i)
} else {
up.activeReadChunks[i] = count - 1
}
}
}
}
func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
up.activeReadChunksLock.Lock()
defer up.activeReadChunksLock.Unlock()
if count, found := up.activeReadChunks[logicChunkIndex]; found {
return count > 0
}
return false
}
func (up *UploadPipeline) waitForCurrentWritersToComplete() {
up.uploaderCountCond.L.Lock()
t := int32(100)
for {
t = atomic.LoadInt32(&up.uploaderCount)
if t <= 0 {
break
}
up.uploaderCountCond.Wait()
}
up.uploaderCountCond.L.Unlock()
}

47
weed/mount/page_writer/upload_pipeline_test.go

@ -0,0 +1,47 @@
package page_writer
import (
"github.com/chrislusf/seaweedfs/weed/util"
"testing"
)
func TestUploadPipeline(t *testing.T) {
uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16)
writeRange(uploadPipeline, 0, 131072)
writeRange(uploadPipeline, 131072, 262144)
writeRange(uploadPipeline, 262144, 1025536)
confirmRange(t, uploadPipeline, 0, 1025536)
writeRange(uploadPipeline, 1025536, 1296896)
confirmRange(t, uploadPipeline, 1025536, 1296896)
writeRange(uploadPipeline, 1296896, 2162688)
confirmRange(t, uploadPipeline, 1296896, 2162688)
confirmRange(t, uploadPipeline, 1296896, 2162688)
}
// startOff and stopOff must be divided by 4
func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff / 4; i < stopOff/4; i += 4 {
util.Uint32toBytes(p, uint32(i))
uploadPipeline.SaveDataAt(p, i)
}
}
func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff; i < stopOff/4; i += 4 {
uploadPipeline.MaybeReadDataAt(p, i)
x := util.BytesToUint32(p)
if x != uint32(i) {
t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4)
}
}
}

6
weed/mount/unmount/unmount.go

@ -0,0 +1,6 @@
package unmount
// Unmount tries to unmount the filesystem mounted at dir.
func Unmount(dir string) error {
return unmount(dir)
}

21
weed/mount/unmount/unmount_linux.go

@ -0,0 +1,21 @@
package unmount
import (
"bytes"
"errors"
"os/exec"
)
func unmount(dir string) error {
cmd := exec.Command("fusermount", "-u", dir)
output, err := cmd.CombinedOutput()
if err != nil {
if len(output) > 0 {
output = bytes.TrimRight(output, "\n")
msg := err.Error() + ": " + string(output)
err = errors.New(msg)
}
return err
}
return nil
}

18
weed/mount/unmount/unmount_std.go

@ -0,0 +1,18 @@
//go:build !linux
// +build !linux
package unmount
import (
"os"
"syscall"
)
func unmount(dir string) error {
err := syscall.Unmount(dir, 0)
if err != nil {
err = &os.PathError{Op: "unmount", Path: dir, Err: err}
return err
}
return nil
}

187
weed/mount/weedfs.go

@ -0,0 +1,187 @@
package mount
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"github.com/hanwen/go-fuse/v2/fuse"
"google.golang.org/grpc"
"math/rand"
"os"
"path"
"path/filepath"
"time"
"github.com/hanwen/go-fuse/v2/fs"
)
type Option struct {
MountDirectory string
FilerAddresses []pb.ServerAddress
filerIndex int
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
DiskType types.DiskType
ChunkSizeLimit int64
ConcurrentWriters int
CacheDir string
CacheSizeMB int64
DataCenter string
Umask os.FileMode
MountUid uint32
MountGid uint32
MountMode os.FileMode
MountCtime time.Time
MountMtime time.Time
MountParentInode uint64
VolumeServerAccess string // how to access volume servers
Cipher bool // whether encrypt data on volume server
UidGidMapper *meta_cache.UidGidMapper
uniqueCacheDir string
uniqueCacheTempPageDir string
}
type WFS struct {
// follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
fuse.RawFileSystem
fs.Inode
option *Option
metaCache *meta_cache.MetaCache
stats statsCache
root Directory
chunkCache *chunk_cache.TieredChunkCache
signature int32
concurrentWriters *util.LimitedConcurrentExecutor
inodeToPath *InodeToPath
fhmap *FileHandleToInode
}
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
RawFileSystem: fuse.NewDefaultRawFileSystem(),
option: option,
signature: util.RandomInt32(),
inodeToPath: NewInodeToPath(),
fhmap: NewFileHandleToInode(),
}
wfs.root = Directory{
name: "/",
wfs: wfs,
entry: nil,
parent: nil,
}
wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
wfs.option.setupUniqueCacheDirectory()
if option.CacheSizeMB > 0 {
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
}
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), option.UidGidMapper, func(path util.FullPath) {
wfs.inodeToPath.MarkChildrenCached(path)
}, func(path util.FullPath) bool {
return wfs.inodeToPath.IsChildrenCached(path)
}, func(filePath util.FullPath, entry *filer_pb.Entry) {
})
grace.OnInterrupt(func() {
wfs.metaCache.Shutdown()
})
if wfs.option.ConcurrentWriters > 0 {
wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
}
return wfs
}
func (wfs *WFS) StartBackgroundTasks() {
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
}
func (wfs *WFS) Root() *Directory {
return &wfs.root
}
func (wfs *WFS) String() string {
return "seaweedfs"
}
func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
path = wfs.inodeToPath.GetPath(inode)
var found bool
if fh, found = wfs.fhmap.FindFileHandle(inode); found {
return path, fh, fh.entry, fuse.OK
}
entry, status = wfs.maybeLoadEntry(path)
return
}
func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
// glog.V(3).Infof("read entry cache miss %s", fullpath)
dir, name := fullpath.DirAndName()
// return a valid entry for the mount root
if string(fullpath) == wfs.option.FilerMountRootPath {
return &filer_pb.Entry{
Name: name,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: wfs.option.MountMtime.Unix(),
FileMode: uint32(wfs.option.MountMode),
Uid: wfs.option.MountUid,
Gid: wfs.option.MountGid,
Crtime: wfs.option.MountCtime.Unix(),
},
}, fuse.OK
}
// read from async meta cache
meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT
}
return cachedEntry.ToProtoEntry(), fuse.OK
}
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
if wfs.option.VolumeServerAccess == "filerProxy" {
return func(fileId string) (targetUrls []string, err error) {
return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
}
}
return filer.LookupFn(wfs)
}
func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
return wfs.option.FilerAddresses[wfs.option.filerIndex]
}
func (option *Option) setupUniqueCacheDirectory() {
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "sw")
os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
}
func (option *Option) getTempFilePageDir() string {
return option.uniqueCacheTempPageDir
}
func (option *Option) getUniqueCacheDir() string {
return option.uniqueCacheDir
}

208
weed/mount/weedfs_attr.go

@ -0,0 +1,208 @@
package mount
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/hanwen/go-fuse/v2/fuse"
"os"
"syscall"
"time"
)
func (wfs *WFS) GetAttr(cancel <-chan struct{}, input *fuse.GetAttrIn, out *fuse.AttrOut) (code fuse.Status) {
if input.NodeId == 1 {
wfs.setRootAttr(out)
return fuse.OK
}
_, _, entry, status := wfs.maybeReadEntry(input.NodeId)
if status != fuse.OK {
return status
}
out.AttrValid = 1
wfs.setAttrByPbEntry(&out.Attr, input.NodeId, entry)
return fuse.OK
}
func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse.AttrOut) (code fuse.Status) {
path, fh, entry, status := wfs.maybeReadEntry(input.NodeId)
if status != fuse.OK {
return status
}
if size, ok := input.GetSize(); ok {
glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.Chunks))
if size < filer.FileSize(entry) {
// fmt.Printf("truncate %v \n", fullPath)
var chunks []*filer_pb.FileChunk
var truncatedChunks []*filer_pb.FileChunk
for _, chunk := range entry.Chunks {
int64Size := int64(chunk.Size)
if chunk.Offset+int64Size > int64(size) {
// this chunk is truncated
int64Size = int64(size) - chunk.Offset
if int64Size > 0 {
chunks = append(chunks, chunk)
glog.V(4).Infof("truncated chunk %+v from %d to %d\n", chunk.GetFileIdString(), chunk.Size, int64Size)
chunk.Size = uint64(int64Size)
} else {
glog.V(4).Infof("truncated whole chunk %+v\n", chunk.GetFileIdString())
truncatedChunks = append(truncatedChunks, chunk)
}
}
}
// set the new chunks and reset entry cache
entry.Chunks = chunks
if fh != nil {
fh.entryViewCache = nil
}
}
entry.Attributes.Mtime = time.Now().Unix()
entry.Attributes.FileSize = size
}
if mode, ok := input.GetMode(); ok {
entry.Attributes.FileMode = uint32(mode)
}
if uid, ok := input.GetUID(); ok {
entry.Attributes.Uid = uid
}
if gid, ok := input.GetGID(); ok {
entry.Attributes.Gid = gid
}
if mtime, ok := input.GetMTime(); ok {
entry.Attributes.Mtime = mtime.Unix()
}
entry.Attributes.Mtime = time.Now().Unix()
out.AttrValid = 1
wfs.setAttrByPbEntry(&out.Attr, input.NodeId, entry)
if fh != nil {
fh.dirtyMetadata = true
return fuse.OK
}
return wfs.saveEntry(path, entry)
}
func (wfs *WFS) setRootAttr(out *fuse.AttrOut) {
now := uint64(time.Now().Unix())
out.AttrValid = 119
out.Ino = 1
setBlksize(&out.Attr, blockSize)
out.Uid = wfs.option.MountUid
out.Gid = wfs.option.MountGid
out.Mtime = now
out.Ctime = now
out.Atime = now
out.Mode = toSystemType(os.ModeDir) | uint32(wfs.option.MountMode)
out.Nlink = 1
}
func (wfs *WFS) setAttrByPbEntry(out *fuse.Attr, inode uint64, entry *filer_pb.Entry) {
out.Ino = inode
out.Size = filer.FileSize(entry)
out.Blocks = (out.Size + blockSize - 1) / blockSize
setBlksize(out, blockSize)
out.Mtime = uint64(entry.Attributes.Mtime)
out.Ctime = uint64(entry.Attributes.Mtime)
out.Atime = uint64(entry.Attributes.Mtime)
out.Mode = toSystemMode(os.FileMode(entry.Attributes.FileMode))
if entry.HardLinkCounter > 0 {
out.Nlink = uint32(entry.HardLinkCounter)
} else {
out.Nlink = 1
}
out.Uid = entry.Attributes.Uid
out.Gid = entry.Attributes.Gid
}
func (wfs *WFS) setAttrByFilerEntry(out *fuse.Attr, inode uint64, entry *filer.Entry) {
out.Ino = inode
out.Size = entry.FileSize
out.Blocks = (out.Size + blockSize - 1) / blockSize
setBlksize(out, blockSize)
out.Atime = uint64(entry.Attr.Mtime.Unix())
out.Mtime = uint64(entry.Attr.Mtime.Unix())
out.Ctime = uint64(entry.Attr.Mtime.Unix())
out.Crtime_ = uint64(entry.Attr.Crtime.Unix())
out.Mode = toSystemMode(entry.Attr.Mode)
if entry.HardLinkCounter > 0 {
out.Nlink = uint32(entry.HardLinkCounter)
} else {
out.Nlink = 1
}
out.Uid = entry.Attr.Uid
out.Gid = entry.Attr.Gid
}
func (wfs *WFS) outputPbEntry(out *fuse.EntryOut, inode uint64, entry *filer_pb.Entry) {
out.NodeId = inode
out.Generation = 1
out.EntryValid = 1
out.AttrValid = 1
wfs.setAttrByPbEntry(&out.Attr, inode, entry)
}
func (wfs *WFS) outputFilerEntry(out *fuse.EntryOut, inode uint64, entry *filer.Entry) {
out.NodeId = inode
out.Generation = 1
out.EntryValid = 1
out.AttrValid = 1
wfs.setAttrByFilerEntry(&out.Attr, inode, entry)
}
func toSystemMode(mode os.FileMode) uint32 {
return toSystemType(mode) | uint32(mode)
}
func toSystemType(mode os.FileMode) uint32 {
switch mode & os.ModeType {
case os.ModeDir:
return syscall.S_IFDIR
case os.ModeSymlink:
return syscall.S_IFLNK
case os.ModeNamedPipe:
return syscall.S_IFIFO
case os.ModeSocket:
return syscall.S_IFSOCK
case os.ModeDevice:
return syscall.S_IFBLK
case os.ModeCharDevice:
return syscall.S_IFCHR
default:
return syscall.S_IFREG
}
}
func toFileType(mode uint32) os.FileMode {
switch mode & (syscall.S_IFMT & 0xffff) {
case syscall.S_IFDIR:
return os.ModeDir
case syscall.S_IFLNK:
return os.ModeSymlink
case syscall.S_IFIFO:
return os.ModeNamedPipe
case syscall.S_IFSOCK:
return os.ModeSocket
case syscall.S_IFBLK:
return os.ModeDevice
case syscall.S_IFCHR:
return os.ModeCharDevice
default:
return 0
}
}
func toFileMode(mode uint32) os.FileMode {
return toFileType(mode) | os.FileMode(mode&07777)
}

8
weed/mount/weedfs_attr_darwin.go

@ -0,0 +1,8 @@
package mount
import (
"github.com/hanwen/go-fuse/v2/fuse"
)
func setBlksize(out *fuse.Attr, size uint32) {
}

9
weed/mount/weedfs_attr_linux.go

@ -0,0 +1,9 @@
package mount
import (
"github.com/hanwen/go-fuse/v2/fuse"
)
func setBlksize(out *fuse.Attr, size uint32) {
out.Blksize = size
}

59
weed/mount/weedfs_dir_lookup.go

@ -0,0 +1,59 @@
package mount
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/hanwen/go-fuse/v2/fuse"
)
// Lookup is called by the kernel when the VFS wants to know
// about a file inside a directory. Many lookup calls can
// occur in parallel, but only one call happens for each (dir,
// name) pair.
func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name string, out *fuse.EntryOut) (code fuse.Status) {
if s := checkName(name); s != fuse.OK {
return s
}
dirPath := wfs.inodeToPath.GetPath(header.NodeId)
fullFilePath := dirPath.Child(name)
visitErr := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath)
if visitErr != nil {
glog.Errorf("dir Lookup %s: %v", dirPath, visitErr)
return fuse.EIO
}
localEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullFilePath)
if cacheErr == filer_pb.ErrNotFound {
return fuse.ENOENT
}
if localEntry == nil {
// glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
entry, err := filer_pb.GetEntry(wfs, fullFilePath)
if err != nil {
glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err)
return fuse.ENOENT
}
localEntry = filer.FromPbEntry(string(dirPath), entry)
} else {
glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
}
if localEntry == nil {
return fuse.ENOENT
}
inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.IsDirectory())
wfs.outputFilerEntry(out, inode, localEntry)
return fuse.OK
}

111
weed/mount/weedfs_dir_mkrm.go

@ -0,0 +1,111 @@
package mount
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/hanwen/go-fuse/v2/fuse"
"os"
"strings"
"syscall"
"time"
)
/** Create a directory
*
* Note that the mode argument may not have the type specification
* bits set, i.e. S_ISDIR(mode) can be false. To obtain the
* correct directory type bits use mode|S_IFDIR
* */
func (wfs *WFS) Mkdir(cancel <-chan struct{}, in *fuse.MkdirIn, name string, out *fuse.EntryOut) (code fuse.Status) {
if s := checkName(name); s != fuse.OK {
return s
}
newEntry := &filer_pb.Entry{
Name: name,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(os.ModeDir) | in.Mode&^uint32(wfs.option.Umask),
Uid: in.Uid,
Gid: in.Gid,
},
}
dirFullPath := wfs.inodeToPath.GetPath(in.NodeId)
entryFullPath := dirFullPath.Child(name)
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
wfs.mapPbIdFromLocalToFiler(newEntry)
defer wfs.mapPbIdFromFilerToLocal(newEntry)
request := &filer_pb.CreateEntryRequest{
Directory: string(dirFullPath),
Entry: newEntry,
Signatures: []int32{wfs.signature},
}
glog.V(1).Infof("mkdir: %v", request)
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.V(0).Infof("mkdir %s: %v", entryFullPath, err)
return err
}
if err := wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
return fmt.Errorf("local mkdir dir %s: %v", entryFullPath, err)
}
return nil
})
glog.V(0).Infof("mkdir %s: %v", entryFullPath, err)
if err != nil {
return fuse.EIO
}
inode := wfs.inodeToPath.Lookup(entryFullPath, true)
wfs.outputPbEntry(out, inode, newEntry)
return fuse.OK
}
/** Remove a directory */
func (wfs *WFS) Rmdir(cancel <-chan struct{}, header *fuse.InHeader, name string) (code fuse.Status) {
if name == "." {
return fuse.Status(syscall.EINVAL)
}
if name == ".." {
return fuse.Status(syscall.ENOTEMPTY)
}
dirFullPath := wfs.inodeToPath.GetPath(header.NodeId)
entryFullPath := dirFullPath.Child(name)
glog.V(3).Infof("remove directory: %v", entryFullPath)
ignoreRecursiveErr := true // ignore recursion error since the OS should manage it
err := filer_pb.Remove(wfs, string(dirFullPath), name, true, true, ignoreRecursiveErr, false, []int32{wfs.signature})
if err != nil {
glog.V(0).Infof("remove %s: %v", entryFullPath, err)
if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
return fuse.Status(syscall.ENOTEMPTY)
}
return fuse.ENOENT
}
wfs.metaCache.DeleteEntry(context.Background(), entryFullPath)
wfs.inodeToPath.RemovePath(entryFullPath)
return fuse.OK
}

132
weed/mount/weedfs_dir_read.go

@ -0,0 +1,132 @@
package mount
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/hanwen/go-fuse/v2/fuse"
"math"
"os"
)
// Directory handling
/** Open directory
*
* Unless the 'default_permissions' mount option is given,
* this method should check if opendir is permitted for this
* directory. Optionally opendir may also return an arbitrary
* filehandle in the fuse_file_info structure, which will be
* passed to readdir, releasedir and fsyncdir.
*/
func (wfs *WFS) OpenDir(cancel <-chan struct{}, input *fuse.OpenIn, out *fuse.OpenOut) (code fuse.Status) {
if !wfs.inodeToPath.HasInode(input.NodeId) {
return fuse.ENOENT
}
return fuse.OK
}
/** Release directory
*
* If the directory has been removed after the call to opendir, the
* path parameter will be NULL.
*/
func (wfs *WFS) ReleaseDir(input *fuse.ReleaseIn) {
}
/** Synchronize directory contents
*
* If the directory has been removed after the call to opendir, the
* path parameter will be NULL.
*
* If the datasync parameter is non-zero, then only the user data
* should be flushed, not the meta data
*/
func (wfs *WFS) FsyncDir(cancel <-chan struct{}, input *fuse.FsyncIn) (code fuse.Status) {
return fuse.OK
}
/** Read directory
*
* The filesystem may choose between two modes of operation:
*
* 1) The readdir implementation ignores the offset parameter, and
* passes zero to the filler function's offset. The filler
* function will not return '1' (unless an error happens), so the
* whole directory is read in a single readdir operation.
*
* 2) The readdir implementation keeps track of the offsets of the
* directory entries. It uses the offset parameter and always
* passes non-zero offset to the filler function. When the buffer
* is full (or an error happens) the filler function will return
* '1'.
*/
func (wfs *WFS) ReadDir(cancel <-chan struct{}, input *fuse.ReadIn, out *fuse.DirEntryList) (code fuse.Status) {
return wfs.doReadDirectory(input, out, false)
}
func (wfs *WFS) ReadDirPlus(cancel <-chan struct{}, input *fuse.ReadIn, out *fuse.DirEntryList) (code fuse.Status) {
return wfs.doReadDirectory(input, out, true)
}
func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPlusMode bool) fuse.Status {
dirPath := wfs.inodeToPath.GetPath(input.NodeId)
var counter uint64
var dirEntry fuse.DirEntry
if input.Offset == 0 {
counter++
dirEntry.Ino = input.NodeId
dirEntry.Name = "."
dirEntry.Mode = toSystemMode(os.ModeDir)
out.AddDirEntry(dirEntry)
counter++
parentDir, _ := dirPath.DirAndName()
parentInode := wfs.inodeToPath.GetInode(util.FullPath(parentDir))
dirEntry.Ino = parentInode
dirEntry.Name = ".."
dirEntry.Mode = toSystemMode(os.ModeDir)
out.AddDirEntry(dirEntry)
}
processEachEntryFn := func(entry *filer.Entry, isLast bool) bool {
counter++
if counter <= input.Offset {
return true
}
dirEntry.Name = entry.Name()
inode := wfs.inodeToPath.GetInode(dirPath.Child(dirEntry.Name))
dirEntry.Ino = inode
dirEntry.Mode = toSystemMode(entry.Mode)
if !isPlusMode {
if !out.AddDirEntry(dirEntry) {
return false
}
} else {
entryOut := out.AddDirLookupEntry(dirEntry)
if entryOut == nil {
return false
}
wfs.outputFilerEntry(entryOut, inode, entry)
}
return true
}
if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return fuse.EIO
}
listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
return processEachEntryFn(entry, false)
})
if listErr != nil {
glog.Errorf("list meta cache: %v", listErr)
return fuse.EIO
}
return fuse.OK
}

98
weed/mount/weedfs_file_io.go

@ -0,0 +1,98 @@
package mount
import (
"github.com/hanwen/go-fuse/v2/fuse"
)
/**
* Open a file
*
* Open flags are available in fi->flags. The following rules
* apply.
*
* - Creation (O_CREAT, O_EXCL, O_NOCTTY) flags will be
* filtered out / handled by the kernel.
*
* - Access modes (O_RDONLY, O_WRONLY, O_RDWR) should be used
* by the filesystem to check if the operation is
* permitted. If the ``-o default_permissions`` mount
* option is given, this check is already done by the
* kernel before calling open() and may thus be omitted by
* the filesystem.
*
* - When writeback caching is enabled, the kernel may send
* read requests even for files opened with O_WRONLY. The
* filesystem should be prepared to handle this.
*
* - When writeback caching is disabled, the filesystem is
* expected to properly handle the O_APPEND flag and ensure
* that each write is appending to the end of the file.
*
* - When writeback caching is enabled, the kernel will
* handle O_APPEND. However, unless all changes to the file
* come through the kernel this will not work reliably. The
* filesystem should thus either ignore the O_APPEND flag
* (and let the kernel handle it), or return an error
* (indicating that reliably O_APPEND is not available).
*
* Filesystem may store an arbitrary file handle (pointer,
* index, etc) in fi->fh, and use this in other all other file
* operations (read, write, flush, release, fsync).
*
* Filesystem may also implement stateless file I/O and not store
* anything in fi->fh.
*
* There are also some flags (direct_io, keep_cache) which the
* filesystem may set in fi, to change the way the file is opened.
* See fuse_file_info structure in <fuse_common.h> for more details.
*
* If this request is answered with an error code of ENOSYS
* and FUSE_CAP_NO_OPEN_SUPPORT is set in
* `fuse_conn_info.capable`, this is treated as success and
* future calls to open and release will also succeed without being
* sent to the filesystem process.
*
* Valid replies:
* fuse_reply_open
* fuse_reply_err
*
* @param req request handle
* @param ino the inode number
* @param fi file information
*/
func (wfs *WFS) Open(cancel <-chan struct{}, in *fuse.OpenIn, out *fuse.OpenOut) (status fuse.Status) {
fileHandle, code := wfs.AcquireHandle(in.NodeId, in.Uid, in.Gid)
if code == fuse.OK {
out.Fh = uint64(fileHandle.fh)
}
return code
}
/**
* Release an open file
*
* Release is called when there are no more references to an open
* file: all file descriptors are closed and all memory mappings
* are unmapped.
*
* For every open call there will be exactly one release call (unless
* the filesystem is force-unmounted).
*
* The filesystem may reply with an error, but error values are
* not returned to close() or munmap() which triggered the
* release.
*
* fi->fh will contain the value set by the open method, or will
* be undefined if the open method didn't set any value.
* fi->flags will contain the same flags as for open.
*
* Valid replies:
* fuse_reply_err
*
* @param req request handle
* @param ino the inode number
* @param fi file information
*/
func (wfs *WFS) Release(cancel <-chan struct{}, in *fuse.ReleaseIn) {
wfs.ReleaseHandle(FileHandleId(in.Fh))
}

130
weed/mount/weedfs_file_mkrm.go

@ -0,0 +1,130 @@
package mount
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/hanwen/go-fuse/v2/fuse"
"time"
)
/**
* Create and open a file
*
* If the file does not exist, first create it with the specified
* mode, and then open it.
*
* If this method is not implemented or under Linux kernel
* versions earlier than 2.6.15, the mknod() and open() methods
* will be called instead.
*/
func (wfs *WFS) Create(cancel <-chan struct{}, in *fuse.CreateIn, name string, out *fuse.CreateOut) (code fuse.Status) {
// if implemented, need to use
// inode := wfs.inodeToPath.Lookup(entryFullPath)
// to ensure nlookup counter
return fuse.ENOSYS
}
/** Create a file node
*
* This is called for creation of all non-directory, non-symlink
* nodes. If the filesystem defines a create() method, then for
* regular files that will be called instead.
*/
func (wfs *WFS) Mknod(cancel <-chan struct{}, in *fuse.MknodIn, name string, out *fuse.EntryOut) (code fuse.Status) {
if s := checkName(name); s != fuse.OK {
return s
}
newEntry := &filer_pb.Entry{
Name: name,
IsDirectory: false,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(toFileMode(in.Mode) &^ wfs.option.Umask),
Uid: in.Uid,
Gid: in.Gid,
Collection: wfs.option.Collection,
Replication: wfs.option.Replication,
TtlSec: wfs.option.TtlSec,
},
}
dirFullPath := wfs.inodeToPath.GetPath(in.NodeId)
entryFullPath := dirFullPath.Child(name)
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
wfs.mapPbIdFromLocalToFiler(newEntry)
defer wfs.mapPbIdFromFilerToLocal(newEntry)
request := &filer_pb.CreateEntryRequest{
Directory: string(dirFullPath),
Entry: newEntry,
Signatures: []int32{wfs.signature},
}
glog.V(1).Infof("mknod: %v", request)
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.V(0).Infof("mknod %s: %v", entryFullPath, err)
return err
}
if err := wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
return fmt.Errorf("local mknod %s: %v", entryFullPath, err)
}
return nil
})
glog.V(0).Infof("mknod %s: %v", entryFullPath, err)
if err != nil {
return fuse.EIO
}
inode := wfs.inodeToPath.Lookup(entryFullPath, false)
wfs.outputPbEntry(out, inode, newEntry)
return fuse.OK
}
/** Remove a file */
func (wfs *WFS) Unlink(cancel <-chan struct{}, header *fuse.InHeader, name string) (code fuse.Status) {
dirFullPath := wfs.inodeToPath.GetPath(header.NodeId)
entryFullPath := dirFullPath.Child(name)
entry, status := wfs.maybeLoadEntry(entryFullPath)
if status != fuse.OK {
return status
}
// first, ensure the filer store can correctly delete
glog.V(3).Infof("remove file: %v", entryFullPath)
isDeleteData := entry != nil && entry.HardLinkCounter <= 1
err := filer_pb.Remove(wfs, string(dirFullPath), name, isDeleteData, false, false, false, []int32{wfs.signature})
if err != nil {
glog.V(0).Infof("remove %s: %v", entryFullPath, err)
return fuse.ENOENT
}
// then, delete meta cache
if err = wfs.metaCache.DeleteEntry(context.Background(), entryFullPath); err != nil {
glog.V(3).Infof("local DeleteEntry %s: %v", entryFullPath, err)
return fuse.EIO
}
wfs.metaCache.DeleteEntry(context.Background(), entryFullPath)
wfs.inodeToPath.RemovePath(entryFullPath)
return fuse.OK
}

59
weed/mount/weedfs_file_read.go

@ -0,0 +1,59 @@
package mount
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/hanwen/go-fuse/v2/fuse"
"io"
)
/**
* Read data
*
* Read should send exactly the number of bytes requested except
* on EOF or error, otherwise the rest of the data will be
* substituted with zeroes. An exception to this is when the file
* has been opened in 'direct_io' mode, in which case the return
* value of the read system call will reflect the return value of
* this operation.
*
* fi->fh will contain the value set by the open method, or will
* be undefined if the open method didn't set any value.
*
* Valid replies:
* fuse_reply_buf
* fuse_reply_iov
* fuse_reply_data
* fuse_reply_err
*
* @param req request handle
* @param ino the inode number
* @param size number of bytes to read
* @param off offset to read from
* @param fi file information
*/
func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse.ReadResult, fuse.Status) {
fh := wfs.GetHandle(FileHandleId(in.Fh))
if fh == nil {
return nil, fuse.ENOENT
}
offset := int64(in.Offset)
fh.lockForRead(offset, len(buff))
defer fh.unlockForRead(offset, len(buff))
totalRead, err := fh.readFromChunks(buff, offset)
if err == nil || err == io.EOF {
maxStop := fh.readFromDirtyPages(buff, offset)
totalRead = max(maxStop-offset, totalRead)
}
if err == io.EOF {
err = nil
}
if err != nil {
glog.Warningf("file handle read %s %d: %v", fh.FullPath(), totalRead, err)
return nil, fuse.EIO
}
return fuse.ReadResultData(buff[:totalRead]), fuse.OK
}

180
weed/mount/weedfs_file_sync.go

@ -0,0 +1,180 @@
package mount
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/hanwen/go-fuse/v2/fuse"
"os"
"time"
)
/**
* Flush method
*
* This is called on each close() of the opened file.
*
* Since file descriptors can be duplicated (dup, dup2, fork), for
* one open call there may be many flush calls.
*
* Filesystems shouldn't assume that flush will always be called
* after some writes, or that if will be called at all.
*
* fi->fh will contain the value set by the open method, or will
* be undefined if the open method didn't set any value.
*
* NOTE: the name of the method is misleading, since (unlike
* fsync) the filesystem is not forced to flush pending writes.
* One reason to flush data is if the filesystem wants to return
* write errors during close. However, such use is non-portable
* because POSIX does not require [close] to wait for delayed I/O to
* complete.
*
* If the filesystem supports file locking operations (setlk,
* getlk) it should remove all locks belonging to 'fi->owner'.
*
* If this request is answered with an error code of ENOSYS,
* this is treated as success and future calls to flush() will
* succeed automatically without being send to the filesystem
* process.
*
* Valid replies:
* fuse_reply_err
*
* @param req request handle
* @param ino the inode number
* @param fi file information
*
* [close]: http://pubs.opengroup.org/onlinepubs/9699919799/functions/close.html
*/
func (wfs *WFS) Flush(cancel <-chan struct{}, in *fuse.FlushIn) fuse.Status {
fh := wfs.GetHandle(FileHandleId(in.Fh))
if fh == nil {
return fuse.ENOENT
}
fh.Lock()
defer fh.Unlock()
return wfs.doFlush(fh, in.Uid, in.Gid)
}
/**
* Synchronize file contents
*
* If the datasync parameter is non-zero, then only the user data
* should be flushed, not the meta data.
*
* If this request is answered with an error code of ENOSYS,
* this is treated as success and future calls to fsync() will
* succeed automatically without being send to the filesystem
* process.
*
* Valid replies:
* fuse_reply_err
*
* @param req request handle
* @param ino the inode number
* @param datasync flag indicating if only data should be flushed
* @param fi file information
*/
func (wfs *WFS) Fsync(cancel <-chan struct{}, in *fuse.FsyncIn) (code fuse.Status) {
fh := wfs.GetHandle(FileHandleId(in.Fh))
if fh == nil {
return fuse.ENOENT
}
fh.Lock()
defer fh.Unlock()
return wfs.doFlush(fh, in.Uid, in.Gid)
}
func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
// flush works at fh level
fileFullPath := fh.FullPath()
dir, _ := fileFullPath.DirAndName()
// send the data to the OS
glog.V(4).Infof("doFlush %s fh %d", fileFullPath, fh.handle)
if err := fh.dirtyPages.FlushData(); err != nil {
glog.Errorf("%v doFlush: %v", fileFullPath, err)
return fuse.EIO
}
if !fh.dirtyMetadata {
return fuse.OK
}
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
entry := fh.entry
if entry == nil {
return nil
}
if entry.Attributes != nil {
entry.Attributes.Mime = fh.contentType
if entry.Attributes.Uid == 0 {
entry.Attributes.Uid = uid
}
if entry.Attributes.Gid == 0 {
entry.Attributes.Gid = gid
}
if entry.Attributes.Crtime == 0 {
entry.Attributes.Crtime = time.Now().Unix()
}
entry.Attributes.Mtime = time.Now().Unix()
entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ wfs.option.Umask)
entry.Attributes.Collection, entry.Attributes.Replication = fh.dirtyPages.GetStorageOptions()
}
request := &filer_pb.CreateEntryRequest{
Directory: string(dir),
Entry: entry,
Signatures: []int32{wfs.signature},
}
glog.V(4).Infof("%s set chunks: %v", fileFullPath, len(entry.Chunks))
for i, chunk := range entry.Chunks {
glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fileFullPath, i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
}
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks)
chunks, _ := filer.CompactFileChunks(wfs.LookupFn(), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(wfs.saveDataAsChunk(fileFullPath), chunks)
if manifestErr != nil {
// not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
}
entry.Chunks = append(chunks, manifestChunks...)
wfs.mapPbIdFromLocalToFiler(request.Entry)
defer wfs.mapPbIdFromFilerToLocal(request.Entry)
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.Errorf("fh flush create %s: %v", fileFullPath, err)
return fmt.Errorf("fh flush create %s: %v", fileFullPath, err)
}
wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
})
if err == nil {
fh.dirtyMetadata = false
}
if err != nil {
glog.Errorf("%v fh %d flush: %v", fileFullPath, fh.handle, err)
return fuse.EIO
}
return fuse.OK
}

66
weed/mount/weedfs_file_write.go

@ -0,0 +1,66 @@
package mount
import (
"github.com/hanwen/go-fuse/v2/fuse"
"net/http"
)
/**
* Write data
*
* Write should return exactly the number of bytes requested
* except on error. An exception to this is when the file has
* been opened in 'direct_io' mode, in which case the return value
* of the write system call will reflect the return value of this
* operation.
*
* Unless FUSE_CAP_HANDLE_KILLPRIV is disabled, this method is
* expected to reset the setuid and setgid bits.
*
* fi->fh will contain the value set by the open method, or will
* be undefined if the open method didn't set any value.
*
* Valid replies:
* fuse_reply_write
* fuse_reply_err
*
* @param req request handle
* @param ino the inode number
* @param buf data to write
* @param size number of bytes to write
* @param off offset to write to
* @param fi file information
*/
func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (written uint32, code fuse.Status) {
fh := wfs.GetHandle(FileHandleId(in.Fh))
if fh == nil {
return 0, fuse.ENOENT
}
fh.Lock()
defer fh.Unlock()
entry := fh.entry
if entry == nil {
return 0, fuse.OK
}
entry.Content = nil
offset := int64(in.Offset)
entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize)))
// glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
fh.dirtyPages.AddPage(offset, data)
written = uint32(len(data))
if offset == 0 {
// detect mime type
fh.contentType = http.DetectContentType(data)
}
fh.dirtyMetadata = true
return written, fuse.OK
}

20
weed/mount/weedfs_filehandle.go

@ -0,0 +1,20 @@
package mount
import "github.com/hanwen/go-fuse/v2/fuse"
func (wfs *WFS) AcquireHandle(inode uint64, uid, gid uint32) (fileHandle *FileHandle, code fuse.Status) {
_, _, entry, status := wfs.maybeReadEntry(inode)
if status == fuse.OK {
fileHandle = wfs.fhmap.AcquireFileHandle(wfs, inode, entry)
fileHandle.entry = entry
}
return
}
func (wfs *WFS) ReleaseHandle(handleId FileHandleId) {
wfs.fhmap.ReleaseByHandle(handleId)
}
func (wfs *WFS) GetHandle(handleId FileHandleId) *FileHandle {
return wfs.fhmap.GetFileHandle(handleId)
}

68
weed/mount/weedfs_forget.go

@ -0,0 +1,68 @@
package mount
import (
"context"
"github.com/chrislusf/seaweedfs/weed/util"
)
// Forget is called when the kernel discards entries from its
// dentry cache. This happens on unmount, and when the kernel
// is short on memory. Since it is not guaranteed to occur at
// any moment, and since there is no return value, Forget
// should not do I/O, as there is no channel to report back
// I/O errors.
// from https://github.com/libfuse/libfuse/blob/master/include/fuse_lowlevel.h
/**
* Forget about an inode
*
* This function is called when the kernel removes an inode
* from its internal caches.
*
* The inode's lookup count increases by one for every call to
* fuse_reply_entry and fuse_reply_create. The nlookup parameter
* indicates by how much the lookup count should be decreased.
*
* Inodes with a non-zero lookup count may receive request from
* the kernel even after calls to unlink, rmdir or (when
* overwriting an existing file) rename. Filesystems must handle
* such requests properly and it is recommended to defer removal
* of the inode until the lookup count reaches zero. Calls to
* unlink, rmdir or rename will be followed closely by forget
* unless the file or directory is open, in which case the
* kernel issues forget only after the release or releasedir
* calls.
*
* Note that if a file system will be exported over NFS the
* inodes lifetime must extend even beyond forget. See the
* generation field in struct fuse_entry_param above.
*
* On unmount the lookup count for all inodes implicitly drops
* to zero. It is not guaranteed that the file system will
* receive corresponding forget messages for the affected
* inodes.
*
* Valid replies:
* fuse_reply_none
*
* @param req request handle
* @param ino the inode number
* @param nlookup the number of lookups to forget
*/
/*
https://libfuse.github.io/doxygen/include_2fuse__lowlevel_8h.html
int fuse_reply_entry ( fuse_req_t req,
const struct fuse_entry_param * e
)
Reply with a directory entry
Possible requests: lookup, mknod, mkdir, symlink, link
Side effects: increments the lookup count on success
*/
func (wfs *WFS) Forget(nodeid, nlookup uint64) {
wfs.inodeToPath.Forget(nodeid, nlookup, func(dir util.FullPath) {
wfs.metaCache.DeleteFolderChildren(context.Background(), dir)
})
}

93
weed/mount/weedfs_link.go

@ -0,0 +1,93 @@
package mount
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/hanwen/go-fuse/v2/fuse"
"time"
)
const (
HARD_LINK_MARKER = '\x01'
)
/** Create a hard link to a file */
func (wfs *WFS) Link(cancel <-chan struct{}, in *fuse.LinkIn, name string, out *fuse.EntryOut) (code fuse.Status) {
if s := checkName(name); s != fuse.OK {
return s
}
newParentPath := wfs.inodeToPath.GetPath(in.NodeId)
oldEntryPath := wfs.inodeToPath.GetPath(in.Oldnodeid)
oldParentPath, _ := oldEntryPath.DirAndName()
oldEntry, status := wfs.maybeLoadEntry(oldEntryPath)
if status != fuse.OK {
return status
}
// update old file to hardlink mode
if len(oldEntry.HardLinkId) == 0 {
oldEntry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
oldEntry.HardLinkCounter = 1
}
oldEntry.HardLinkCounter++
updateOldEntryRequest := &filer_pb.UpdateEntryRequest{
Directory: oldParentPath,
Entry: oldEntry,
Signatures: []int32{wfs.signature},
}
// CreateLink 1.2 : update new file to hardlink mode
oldEntry.Attributes.Mtime = time.Now().Unix()
request := &filer_pb.CreateEntryRequest{
Directory: string(newParentPath),
Entry: &filer_pb.Entry{
Name: name,
IsDirectory: false,
Attributes: oldEntry.Attributes,
Chunks: oldEntry.Chunks,
Extended: oldEntry.Extended,
HardLinkId: oldEntry.HardLinkId,
HardLinkCounter: oldEntry.HardLinkCounter,
},
Signatures: []int32{wfs.signature},
}
// apply changes to the filer, and also apply to local metaCache
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
wfs.mapPbIdFromLocalToFiler(request.Entry)
defer wfs.mapPbIdFromFilerToLocal(request.Entry)
if err := filer_pb.UpdateEntry(client, updateOldEntryRequest); err != nil {
return err
}
wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(updateOldEntryRequest.Directory, updateOldEntryRequest.Entry))
if err := filer_pb.CreateEntry(client, request); err != nil {
return err
}
wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
})
newEntryPath := newParentPath.Child(name)
if err != nil {
glog.V(0).Infof("Link %v -> %s: %v", oldEntryPath, newEntryPath, err)
return fuse.EIO
}
inode := wfs.inodeToPath.Lookup(newEntryPath, false)
wfs.outputPbEntry(out, inode, request.Entry)
return fuse.OK
}

235
weed/mount/weedfs_rename.go

@ -0,0 +1,235 @@
package mount
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse"
"io"
"strings"
"syscall"
)
/** Rename a file
*
* If the target exists it should be atomically replaced. If
* the target's inode's lookup count is non-zero, the file
* system is expected to postpone any removal of the inode
* until the lookup count reaches zero (see description of the
* forget function).
*
* If this request is answered with an error code of ENOSYS, this is
* treated as a permanent failure with error code EINVAL, i.e. all
* future bmap requests will fail with EINVAL without being
* send to the filesystem process.
*
* *flags* may be `RENAME_EXCHANGE` or `RENAME_NOREPLACE`. If
* RENAME_NOREPLACE is specified, the filesystem must not
* overwrite *newname* if it exists and return an error
* instead. If `RENAME_EXCHANGE` is specified, the filesystem
* must atomically exchange the two files, i.e. both must
* exist and neither may be deleted.
*
* Valid replies:
* fuse_reply_err
*
* @param req request handle
* @param parent inode number of the old parent directory
* @param name old name
* @param newparent inode number of the new parent directory
* @param newname new name
*/
/*
renameat2()
renameat2() has an additional flags argument. A renameat2() call
with a zero flags argument is equivalent to renameat().
The flags argument is a bit mask consisting of zero or more of
the following flags:
RENAME_EXCHANGE
Atomically exchange oldpath and newpath. Both pathnames
must exist but may be of different types (e.g., one could
be a non-empty directory and the other a symbolic link).
RENAME_NOREPLACE
Don't overwrite newpath of the rename. Return an error if
newpath already exists.
RENAME_NOREPLACE can't be employed together with
RENAME_EXCHANGE.
RENAME_NOREPLACE requires support from the underlying
filesystem. Support for various filesystems was added as
follows:
* ext4 (Linux 3.15);
* btrfs, tmpfs, and cifs (Linux 3.17);
* xfs (Linux 4.0);
* Support for many other filesystems was added in Linux
4.9, including ext2, minix, reiserfs, jfs, vfat, and
bpf.
RENAME_WHITEOUT (since Linux 3.18)
This operation makes sense only for overlay/union
filesystem implementations.
Specifying RENAME_WHITEOUT creates a "whiteout" object at
the source of the rename at the same time as performing
the rename. The whole operation is atomic, so that if the
rename succeeds then the whiteout will also have been
created.
A "whiteout" is an object that has special meaning in
union/overlay filesystem constructs. In these constructs,
multiple layers exist and only the top one is ever
modified. A whiteout on an upper layer will effectively
hide a matching file in the lower layer, making it appear
as if the file didn't exist.
When a file that exists on the lower layer is renamed, the
file is first copied up (if not already on the upper
layer) and then renamed on the upper, read-write layer.
At the same time, the source file needs to be "whiteouted"
(so that the version of the source file in the lower layer
is rendered invisible). The whole operation needs to be
done atomically.
When not part of a union/overlay, the whiteout appears as
a character device with a {0,0} device number. (Note that
other union/overlay implementations may employ different
methods for storing whiteout entries; specifically, BSD
union mount employs a separate inode type, DT_WHT, which,
while supported by some filesystems available in Linux,
such as CODA and XFS, is ignored by the kernel's whiteout
support code, as of Linux 4.19, at least.)
RENAME_WHITEOUT requires the same privileges as creating a
device node (i.e., the CAP_MKNOD capability).
RENAME_WHITEOUT can't be employed together with
RENAME_EXCHANGE.
RENAME_WHITEOUT requires support from the underlying
filesystem. Among the filesystems that support it are
tmpfs (since Linux 3.18), ext4 (since Linux 3.18), XFS
(since Linux 4.1), f2fs (since Linux 4.2), btrfs (since
Linux 4.7), and ubifs (since Linux 4.9).
*/
const (
RenameEmptyFlag = 0
RenameNoReplace = 1
RenameExchange = fs.RENAME_EXCHANGE
RenameWhiteout = 3
)
func (wfs *WFS) Rename(cancel <-chan struct{}, in *fuse.RenameIn, oldName string, newName string) (code fuse.Status) {
if s := checkName(newName); s != fuse.OK {
return s
}
switch in.Flags {
case RenameEmptyFlag:
case RenameNoReplace:
case RenameExchange:
case RenameWhiteout:
return fuse.ENOTSUP
default:
return fuse.EINVAL
}
oldDir := wfs.inodeToPath.GetPath(in.NodeId)
oldPath := oldDir.Child(oldName)
newDir := wfs.inodeToPath.GetPath(in.Newdir)
newPath := newDir.Child(newName)
glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)
// update remote filer
err := wfs.WithFilerClient(true, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
request := &filer_pb.StreamRenameEntryRequest{
OldDirectory: string(oldDir),
OldName: oldName,
NewDirectory: string(newDir),
NewName: newName,
Signatures: []int32{wfs.signature},
}
stream, err := client.StreamRenameEntry(ctx, request)
if err != nil {
code = fuse.EIO
return fmt.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err)
}
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
if recvErr == io.EOF {
break
} else {
if strings.Contains(recvErr.Error(), "not empty") {
code = fuse.Status(syscall.ENOTEMPTY)
} else if strings.Contains(recvErr.Error(), "not directory") {
code = fuse.ENOTDIR
}
return fmt.Errorf("dir Rename %s => %s receive: %v", oldPath, newPath, recvErr)
}
}
if err = wfs.handleRenameResponse(ctx, resp); err != nil {
glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
return err
}
}
return nil
})
if err != nil {
glog.V(0).Infof("Link: %v", err)
return
}
return fuse.OK
}
func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamRenameEntryResponse) error {
// comes from filer StreamRenameEntry, can only be create or delete entry
if resp.EventNotification.NewEntry != nil {
// with new entry, the old entry name also exists. This is the first step to create new entry
newEntry := filer.FromPbEntry(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry)
if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry); err != nil {
return err
}
oldParent, newParent := util.FullPath(resp.Directory), util.FullPath(resp.EventNotification.NewParentPath)
oldName, newName := resp.EventNotification.OldEntry.Name, resp.EventNotification.NewEntry.Name
oldPath := oldParent.Child(oldName)
newPath := newParent.Child(newName)
wfs.inodeToPath.MovePath(oldPath, newPath)
} else if resp.EventNotification.OldEntry != nil {
// without new entry, only old entry name exists. This is the second step to delete old entry
if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil); err != nil {
return err
}
}
return nil
}

80
weed/mount/weedfs_stats.go

@ -0,0 +1,80 @@
package mount
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/hanwen/go-fuse/v2/fuse"
"math"
"time"
)
const blockSize = 512
type statsCache struct {
filer_pb.StatisticsResponse
lastChecked int64 // unix time in seconds
}
func (wfs *WFS) StatFs(cancel <-chan struct{}, in *fuse.InHeader, out *fuse.StatfsOut) (code fuse.Status) {
glog.V(4).Infof("reading fs stats")
if wfs.stats.lastChecked < time.Now().Unix()-20 {
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.StatisticsRequest{
Collection: wfs.option.Collection,
Replication: wfs.option.Replication,
Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
DiskType: string(wfs.option.DiskType),
}
glog.V(4).Infof("reading filer stats: %+v", request)
resp, err := client.Statistics(context.Background(), request)
if err != nil {
glog.V(0).Infof("reading filer stats %v: %v", request, err)
return err
}
glog.V(4).Infof("read filer stats: %+v", resp)
wfs.stats.TotalSize = resp.TotalSize
wfs.stats.UsedSize = resp.UsedSize
wfs.stats.FileCount = resp.FileCount
wfs.stats.lastChecked = time.Now().Unix()
return nil
})
if err != nil {
glog.V(0).Infof("filer Statistics: %v", err)
return fuse.OK
}
}
totalDiskSize := wfs.stats.TotalSize
usedDiskSize := wfs.stats.UsedSize
actualFileCount := wfs.stats.FileCount
// Compute the total number of available blocks
out.Blocks = totalDiskSize / blockSize
// Compute the number of used blocks
numBlocks := uint64(usedDiskSize / blockSize)
// Report the number of free and available blocks for the block size
out.Bfree = out.Blocks - numBlocks
out.Bavail = out.Blocks - numBlocks
out.Bsize = uint32(blockSize)
// Report the total number of possible files in the file system (and those free)
out.Files = math.MaxInt64
out.Ffree = math.MaxInt64 - actualFileCount
// Report the maximum length of a name and the minimum fragment size
out.NameLen = 1024
out.Frsize = uint32(blockSize)
return fuse.OK
}

78
weed/mount/weedfs_symlink.go

@ -0,0 +1,78 @@
package mount
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/hanwen/go-fuse/v2/fuse"
"os"
"time"
)
/** Create a symbolic link */
func (wfs *WFS) Symlink(cancel <-chan struct{}, header *fuse.InHeader, target string, name string, out *fuse.EntryOut) (code fuse.Status) {
if s := checkName(name); s != fuse.OK {
return s
}
dirPath := wfs.inodeToPath.GetPath(header.NodeId)
entryFullPath := dirPath.Child(name)
request := &filer_pb.CreateEntryRequest{
Directory: string(dirPath),
Entry: &filer_pb.Entry{
Name: name,
IsDirectory: false,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32((os.FileMode(0777) | os.ModeSymlink) &^ wfs.option.Umask),
Uid: header.Uid,
Gid: header.Gid,
SymlinkTarget: target,
},
},
Signatures: []int32{wfs.signature},
}
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
wfs.mapPbIdFromLocalToFiler(request.Entry)
defer wfs.mapPbIdFromFilerToLocal(request.Entry)
if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("symlink %s: %v", entryFullPath, err)
}
wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
})
if err != nil {
glog.V(0).Infof("Symlink %s => %s: %v", entryFullPath, target, err)
return fuse.EIO
}
inode := wfs.inodeToPath.Lookup(entryFullPath, false)
wfs.outputPbEntry(out, inode, request.Entry)
return fuse.OK
}
func (wfs *WFS) Readlink(cancel <-chan struct{}, header *fuse.InHeader) (out []byte, code fuse.Status) {
entryFullPath := wfs.inodeToPath.GetPath(header.NodeId)
entry, status := wfs.maybeLoadEntry(entryFullPath)
if status != fuse.OK {
return nil, status
}
if os.FileMode(entry.Attributes.FileMode)&os.ModeSymlink == 0 {
return nil, fuse.EINVAL
}
return []byte(entry.Attributes.SymlinkTarget), fuse.OK
}

65
weed/mount/weedfs_unsupported.go

@ -0,0 +1,65 @@
package mount
import "github.com/hanwen/go-fuse/v2/fuse"
// https://github.com/libfuse/libfuse/blob/48ae2e72b39b6a31cb2194f6f11786b7ca06aac6/include/fuse.h#L778
/**
* Copy a range of data from one file to anotherNiels de Vos, 4 years ago: libfuse: add copy_file_range() support
*
* Performs an optimized copy between two file descriptors without the
* additional cost of transferring data through the FUSE kernel module
* to user space (glibc) and then back into the FUSE filesystem again.
*
* In case this method is not implemented, applications are expected to
* fall back to a regular file copy. (Some glibc versions did this
* emulation automatically, but the emulation has been removed from all
* glibc release branches.)
*/
func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) (written uint32, code fuse.Status) {
return 0, fuse.ENOSYS
}
/**
* Allocates space for an open file
*
* This function ensures that required space is allocated for specified
* file. If this function returns success then any subsequent write
* request to specified range is guaranteed not to fail because of lack
* of space on the file system media.
*/
func (wfs *WFS) Fallocate(cancel <-chan struct{}, in *fuse.FallocateIn) (code fuse.Status) {
return fuse.ENOSYS
}
/**
* Find next data or hole after the specified offset
*/
func (wfs *WFS) Lseek(cancel <-chan struct{}, in *fuse.LseekIn, out *fuse.LseekOut) fuse.Status {
return fuse.ENOSYS
}
func (wfs *WFS) GetLk(cancel <-chan struct{}, in *fuse.LkIn, out *fuse.LkOut) (code fuse.Status) {
return fuse.ENOSYS
}
func (wfs *WFS) SetLk(cancel <-chan struct{}, in *fuse.LkIn) (code fuse.Status) {
return fuse.ENOSYS
}
func (wfs *WFS) SetLkw(cancel <-chan struct{}, in *fuse.LkIn) (code fuse.Status) {
return fuse.ENOSYS
}
/**
* Check file access permissions
*
* This will be called for the access() system call. If the
* 'default_permissions' mount option is given, this method is not
* called.
*
* This method is not called under Linux kernel versions 2.4.x
*/
func (wfs *WFS) Access(cancel <-chan struct{}, input *fuse.AccessIn) (code fuse.Status) {
return fuse.ENOSYS
}

84
weed/mount/weedfs_write.go

@ -0,0 +1,84 @@
package mount
import (
"context"
"fmt"
"io"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
var fileId, host string
var auth security.EncodedJwt
if err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return util.Retry("assignVolume", func() error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: wfs.option.Replication,
Collection: wfs.option.Collection,
TtlSec: wfs.option.TtlSec,
DiskType: string(wfs.option.DiskType),
DataCenter: wfs.option.DataCenter,
Path: string(fullPath),
}
resp, err := client.AssignVolume(context.Background(), request)
if err != nil {
glog.V(0).Infof("assign volume failure %v: %v", request, err)
return err
}
if resp.Error != "" {
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
}
fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
loc := resp.Location
host = wfs.AdjustedUrl(loc)
collection, replication = resp.Collection, resp.Replication
return nil
})
}); err != nil {
return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
if wfs.option.VolumeServerAccess == "filerProxy" {
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId)
}
uploadOption := &operation.UploadOption{
UploadUrl: fileUrl,
Filename: filename,
Cipher: wfs.option.Cipher,
IsInputCompressed: false,
MimeType: "",
PairMap: nil,
Jwt: auth,
}
uploadResult, err, data := operation.Upload(reader, uploadOption)
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
return nil, "", "", fmt.Errorf("upload data: %v", err)
}
if uploadResult.Error != "" {
glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
}
if offset == 0 {
wfs.chunkCache.SetChunk(fileId, data)
}
chunk = uploadResult.ToPbFileChunk(fileId, offset)
return chunk, collection, replication, nil
}
}

168
weed/mount/weedfs_xattr.go

@ -0,0 +1,168 @@
package mount
import (
"github.com/hanwen/go-fuse/v2/fuse"
sys "golang.org/x/sys/unix"
"runtime"
"strings"
"syscall"
)
const (
// https://man7.org/linux/man-pages/man7/xattr.7.html#:~:text=The%20VFS%20imposes%20limitations%20that,in%20listxattr(2)).
MAX_XATTR_NAME_SIZE = 255
MAX_XATTR_VALUE_SIZE = 65536
XATTR_PREFIX = "xattr-" // same as filer
)
// GetXAttr reads an extended attribute, and should return the
// number of bytes. If the buffer is too small, return ERANGE,
// with the required buffer size.
func (wfs *WFS) GetXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr string, dest []byte) (size uint32, code fuse.Status) {
//validate attr name
if len(attr) > MAX_XATTR_NAME_SIZE {
if runtime.GOOS == "darwin" {
return 0, fuse.EPERM
} else {
return 0, fuse.ERANGE
}
}
if len(attr) == 0 {
return 0, fuse.EINVAL
}
_, _, entry, status := wfs.maybeReadEntry(header.NodeId)
if status != fuse.OK {
return 0, status
}
if entry == nil {
return 0, fuse.ENOENT
}
if entry.Extended == nil {
return 0, fuse.ENOATTR
}
data, found := entry.Extended[XATTR_PREFIX+attr]
if !found {
return 0, fuse.ENOATTR
}
if len(dest) < len(data) {
return uint32(len(data)), fuse.ERANGE
}
copy(dest, data)
return uint32(len(data)), fuse.OK
}
// SetXAttr writes an extended attribute.
// https://man7.org/linux/man-pages/man2/setxattr.2.html
// By default (i.e., flags is zero), the extended attribute will be
// created if it does not exist, or the value will be replaced if
// the attribute already exists. To modify these semantics, one of
// the following values can be specified in flags:
//
// XATTR_CREATE
// Perform a pure create, which fails if the named attribute
// exists already.
//
// XATTR_REPLACE
// Perform a pure replace operation, which fails if the named
// attribute does not already exist.
func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr string, data []byte) fuse.Status {
//validate attr name
if len(attr) > MAX_XATTR_NAME_SIZE {
if runtime.GOOS == "darwin" {
return fuse.EPERM
} else {
return fuse.ERANGE
}
}
if len(attr) == 0 {
return fuse.EINVAL
}
//validate attr value
if len(data) > MAX_XATTR_VALUE_SIZE {
if runtime.GOOS == "darwin" {
return fuse.Status(syscall.E2BIG)
} else {
return fuse.ERANGE
}
}
path, _, entry, status := wfs.maybeReadEntry(input.NodeId)
if status != fuse.OK {
return status
}
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
oldData, _ := entry.Extended[XATTR_PREFIX+attr]
switch input.Flags {
case sys.XATTR_CREATE:
if len(oldData) > 0 {
break
}
fallthrough
case sys.XATTR_REPLACE:
fallthrough
default:
entry.Extended[XATTR_PREFIX+attr] = data
}
return wfs.saveEntry(path, entry)
}
// ListXAttr lists extended attributes as '\0' delimited byte
// slice, and return the number of bytes. If the buffer is too
// small, return ERANGE, with the required buffer size.
func (wfs *WFS) ListXAttr(cancel <-chan struct{}, header *fuse.InHeader, dest []byte) (n uint32, code fuse.Status) {
_, _, entry, status := wfs.maybeReadEntry(header.NodeId)
if status != fuse.OK {
return 0, status
}
if entry == nil {
return 0, fuse.ENOENT
}
if entry.Extended == nil {
return 0, fuse.ENOATTR
}
var data []byte
for k := range entry.Extended {
if strings.HasPrefix(k, XATTR_PREFIX) {
data = append(data, k[len(XATTR_PREFIX):]...)
data = append(data, 0)
}
}
if len(dest) < len(data) {
return uint32(len(data)), fuse.ERANGE
}
copy(dest, data)
return uint32(len(data)), fuse.OK
}
// RemoveXAttr removes an extended attribute.
func (wfs *WFS) RemoveXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr string) fuse.Status {
if len(attr) == 0 {
return fuse.EINVAL
}
path, _, entry, status := wfs.maybeReadEntry(header.NodeId)
if status != fuse.OK {
return status
}
if entry.Extended == nil {
return fuse.ENOATTR
}
_, found := entry.Extended[XATTR_PREFIX+attr]
if !found {
return fuse.ENOATTR
}
delete(entry.Extended, XATTR_PREFIX+attr)
return wfs.saveEntry(path, entry)
}

51
weed/mount/wfs_filer_client.go

@ -0,0 +1,51 @@
package mount
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
var _ = filer_pb.FilerClient(&WFS{})
func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
return util.Retry("filer grpc", func() error {
i := wfs.option.filerIndex
n := len(wfs.option.FilerAddresses)
for x := 0; x < n; x++ {
filerGrpcAddress := wfs.option.FilerAddresses[i].ToGrpcAddress()
err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress, wfs.option.GrpcDialOption)
if err != nil {
glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err)
} else {
wfs.option.filerIndex = i
return nil
}
i++
if i >= n {
i = 0
}
}
return err
})
}
func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
if wfs.option.VolumeServerAccess == "publicUrl" {
return location.PublicUrl
}
return location.Url
}

67
weed/mount/wfs_save.go

@ -0,0 +1,67 @@
package mount
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/hanwen/go-fuse/v2/fuse"
"syscall"
)
func (wfs *WFS) saveEntry(path util.FullPath, entry *filer_pb.Entry) (code fuse.Status) {
parentDir, _ := path.DirAndName()
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
wfs.mapPbIdFromLocalToFiler(entry)
defer wfs.mapPbIdFromFilerToLocal(entry)
request := &filer_pb.UpdateEntryRequest{
Directory: parentDir,
Entry: entry,
Signatures: []int32{wfs.signature},
}
glog.V(1).Infof("save entry: %v", request)
_, err := client.UpdateEntry(context.Background(), request)
if err != nil {
return fmt.Errorf("UpdateEntry dir %s: %v", path, err)
}
if err := wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
return fmt.Errorf("UpdateEntry dir %s: %v", path, err)
}
return nil
})
if err != nil {
glog.Errorf("saveEntry %s: %v", path, err)
return fuse.EIO
}
return fuse.OK
}
func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
if entry.Attributes == nil {
return
}
entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
}
func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
if entry.Attributes == nil {
return
}
entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
}
func checkName(name string) fuse.Status {
if len(name) >= 256 {
return fuse.Status(syscall.ENAMETOOLONG)
}
return fuse.OK
}
Loading…
Cancel
Save