Browse Source

to be re-written following fuse virtual file system

pull/2668/head
chrislu 3 years ago
parent
commit
f87da798a4
  1. 1
      go.mod
  2. 4
      weed/Makefile
  3. 2
      weed/command/mount2_std.go
  4. 42
      weed/mount/directory.go
  5. 84
      weed/mount/directory_read.go
  6. 34
      weed/mount/weedfs.go
  7. 6
      weed/mount/weedfs_stats.go

1
go.mod

@ -164,7 +164,6 @@ require (
require ( require (
github.com/fluent/fluent-logger-golang v1.8.0 github.com/fluent/fluent-logger-golang v1.8.0
github.com/hanwen/go-fuse v1.0.0
github.com/hanwen/go-fuse/v2 v2.1.0 github.com/hanwen/go-fuse/v2 v2.1.0
) )

4
weed/Makefile

@ -21,6 +21,10 @@ debug_mount:
go build -gcflags="all=-N -l" go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 mount -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/ -umask=000 dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 mount -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/ -umask=000
debug_mount2:
go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 mount2 -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/ -umask=000
debug_server: debug_server:
go build -gcflags="all=-N -l" go build -gcflags="all=-N -l"
dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- server -dir=~/tmp/99 -filer -volume.port=8343 -s3 -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1 dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- server -dir=~/tmp/99 -filer -volume.port=8343 -s3 -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1

2
weed/command/mount2_std.go

@ -207,7 +207,7 @@ func RunMount2(option *Mount2Options, umask os.FileMode) bool {
UidGidMapper: uidGidMapper, UidGidMapper: uidGidMapper,
}) })
server, err := fs.Mount(dir, seaweedFileSystem, opts)
server, err := fs.Mount(dir, seaweedFileSystem.Root(), opts)
if err != nil { if err != nil {
glog.Fatalf("Mount fail: %v", err) glog.Fatalf("Mount fail: %v", err)
} }

42
weed/mount/directory.go

@ -0,0 +1,42 @@
package mount
import (
"bytes"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/hanwen/go-fuse/v2/fs"
"strings"
)
type Directory struct {
fs.Inode
name string
wfs *WFS
entry *filer_pb.Entry
parent *Directory
id uint64
}
func (dir *Directory) FullPath() string {
var parts []string
for p := dir; p != nil; p = p.parent {
if strings.HasPrefix(p.name, "/") {
if len(p.name) > 1 {
parts = append(parts, p.name[1:])
}
} else {
parts = append(parts, p.name)
}
}
if len(parts) == 0 {
return "/"
}
var buf bytes.Buffer
for i := len(parts) - 1; i >= 0; i-- {
buf.WriteString("/")
buf.WriteString(parts[i])
}
return buf.String()
}

84
weed/mount/directory_read.go

@ -0,0 +1,84 @@
package mount
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse"
"math"
"os"
"syscall"
)
var _ = fs.NodeReaddirer(&Directory{})
var _ = fs.NodeGetattrer(&Directory{})
func (dir *Directory) Getattr(ctx context.Context, fh fs.FileHandle, out *fuse.AttrOut) syscall.Errno {
out.Mode = 0755
return 0
}
func (dir *Directory) Readdir(ctx context.Context) (fs.DirStream, syscall.Errno) {
dirPath := util.FullPath(dir.FullPath())
glog.V(4).Infof("Readdir %s", dirPath)
sourceChan := make(chan fuse.DirEntry, 64)
stream := newDirectoryListStream(sourceChan)
processEachEntryFn := func(entry *filer.Entry, isLast bool) {
sourceChan <- fuse.DirEntry{
Mode: uint32(entry.Mode),
Name: entry.Name(),
Ino: dirPath.Child(entry.Name()).AsInode(os.ModeDir),
}
}
if err := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return nil, fs.ToErrno(os.ErrInvalid)
}
go func() {
dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
processEachEntryFn(entry, false)
return true
})
close(sourceChan)
}()
return stream, fs.OK
}
var _ = fs.DirStream(&DirectoryListStream{})
type DirectoryListStream struct {
next fuse.DirEntry
sourceChan chan fuse.DirEntry
isStarted bool
hasNext bool
}
func newDirectoryListStream(ch chan fuse.DirEntry) *DirectoryListStream {
return &DirectoryListStream{
sourceChan: ch,
}
}
func (i *DirectoryListStream) HasNext() bool {
if !i.isStarted {
i.next, i.hasNext = <-i.sourceChan
i.isStarted = true
}
return i.hasNext
}
func (i *DirectoryListStream) Next() (fuse.DirEntry, syscall.Errno) {
t := i.next
i.next, i.hasNext = <-i.sourceChan
return t, fs.OK
}
func (i *DirectoryListStream) Close() {
}

34
weed/mount/weedfs.go

@ -1,7 +1,6 @@
package mount package mount
import ( import (
"context"
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@ -12,11 +11,9 @@ import (
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"syscall"
"time" "time"
"github.com/hanwen/go-fuse/v2/fs" "github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse"
) )
type Option struct { type Option struct {
@ -56,6 +53,7 @@ type WFS struct {
option *Option option *Option
metaCache *meta_cache.MetaCache metaCache *meta_cache.MetaCache
stats statsCache stats statsCache
root Directory
signature int32 signature int32
} }
@ -65,6 +63,13 @@ func NewSeaweedFileSystem(option *Option) *WFS {
signature: util.RandomInt32(), signature: util.RandomInt32(),
} }
wfs.root = Directory{
name: "/",
wfs: wfs,
entry: nil,
parent: nil,
}
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath, entry *filer_pb.Entry) { wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath, entry *filer_pb.Entry) {
}) })
grace.OnInterrupt(func() { grace.OnInterrupt(func() {
@ -74,6 +79,10 @@ func NewSeaweedFileSystem(option *Option) *WFS {
return wfs return wfs
} }
func (wfs *WFS) Root() *Directory {
return &wfs.root
}
func (option *Option) setupUniqueCacheDirectory() { func (option *Option) setupUniqueCacheDirectory() {
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8] cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId) option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
@ -87,22 +96,3 @@ func (option *Option) getTempFilePageDir() string {
func (option *Option) getUniqueCacheDir() string { func (option *Option) getUniqueCacheDir() string {
return option.uniqueCacheDir return option.uniqueCacheDir
} }
func (r *WFS) OnAdd(ctx context.Context) {
ch := r.NewPersistentInode(
ctx, &fs.MemRegularFile{
Data: []byte("file.txt"),
Attr: fuse.Attr{
Mode: 0644,
},
}, fs.StableAttr{Ino: 2})
r.AddChild("file.txt", ch, false)
}
func (r *WFS) Getattr(ctx context.Context, fh fs.FileHandle, out *fuse.AttrOut) syscall.Errno {
out.Mode = 0755
return 0
}
var _ = (fs.NodeGetattrer)((*WFS)(nil))
var _ = (fs.NodeOnAdder)((*WFS)(nil))

6
weed/mount/weedfs_stats.go

@ -15,14 +15,16 @@ import (
const blockSize = 512 const blockSize = 512
var _ = fs.NodeStatfser(&WFS{})
var _ = fs.NodeStatfser(&Directory{})
type statsCache struct { type statsCache struct {
filer_pb.StatisticsResponse filer_pb.StatisticsResponse
lastChecked int64 // unix time in seconds lastChecked int64 // unix time in seconds
} }
func (wfs *WFS) Statfs(ctx context.Context, out *fuse.StatfsOut) syscall.Errno {
func (dir *Directory) Statfs(ctx context.Context, out *fuse.StatfsOut) syscall.Errno {
wfs := dir.wfs
glog.V(4).Infof("reading fs stats") glog.V(4).Infof("reading fs stats")

Loading…
Cancel
Save