You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

210 lines
6.5 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package mount
  2. import (
  3. "context"
  4. "github.com/hanwen/go-fuse/v2/fuse"
  5. "github.com/seaweedfs/seaweedfs/weed/filer"
  6. "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  13. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  14. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  15. "google.golang.org/grpc"
  16. "math/rand"
  17. "os"
  18. "path"
  19. "path/filepath"
  20. "time"
  21. "github.com/hanwen/go-fuse/v2/fs"
  22. )
  23. type Option struct {
  24. MountDirectory string
  25. FilerAddresses []pb.ServerAddress
  26. filerIndex int
  27. GrpcDialOption grpc.DialOption
  28. FilerMountRootPath string
  29. Collection string
  30. Replication string
  31. TtlSec int32
  32. DiskType types.DiskType
  33. ChunkSizeLimit int64
  34. ConcurrentWriters int
  35. CacheDir string
  36. CacheSizeMB int64
  37. DataCenter string
  38. Umask os.FileMode
  39. Quota int64
  40. DisableXAttr bool
  41. MountUid uint32
  42. MountGid uint32
  43. MountMode os.FileMode
  44. MountCtime time.Time
  45. MountMtime time.Time
  46. MountParentInode uint64
  47. VolumeServerAccess string // how to access volume servers
  48. Cipher bool // whether encrypt data on volume server
  49. UidGidMapper *meta_cache.UidGidMapper
  50. uniqueCacheDir string
  51. uniqueCacheTempPageDir string
  52. }
  53. type WFS struct {
  54. // https://dl.acm.org/doi/fullHtml/10.1145/3310148
  55. // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
  56. fuse.RawFileSystem
  57. mount_pb.UnimplementedSeaweedMountServer
  58. fs.Inode
  59. option *Option
  60. metaCache *meta_cache.MetaCache
  61. stats statsCache
  62. chunkCache *chunk_cache.TieredChunkCache
  63. signature int32
  64. concurrentWriters *util.LimitedConcurrentExecutor
  65. inodeToPath *InodeToPath
  66. fhmap *FileHandleToInode
  67. dhmap *DirectoryHandleToInode
  68. fuseServer *fuse.Server
  69. IsOverQuota bool
  70. }
  71. func NewSeaweedFileSystem(option *Option) *WFS {
  72. wfs := &WFS{
  73. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  74. option: option,
  75. signature: util.RandomInt32(),
  76. inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
  77. fhmap: NewFileHandleToInode(),
  78. dhmap: NewDirectoryHandleToInode(),
  79. }
  80. wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
  81. wfs.option.setupUniqueCacheDirectory()
  82. if option.CacheSizeMB > 0 {
  83. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
  84. }
  85. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), option.UidGidMapper,
  86. util.FullPath(option.FilerMountRootPath),
  87. func(path util.FullPath) {
  88. wfs.inodeToPath.MarkChildrenCached(path)
  89. }, func(path util.FullPath) bool {
  90. return wfs.inodeToPath.IsChildrenCached(path)
  91. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  92. })
  93. grace.OnInterrupt(func() {
  94. wfs.metaCache.Shutdown()
  95. os.RemoveAll(option.getUniqueCacheDir())
  96. })
  97. if wfs.option.ConcurrentWriters > 0 {
  98. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  99. }
  100. return wfs
  101. }
  102. func (wfs *WFS) StartBackgroundTasks() {
  103. startTime := time.Now()
  104. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  105. go wfs.loopCheckQuota()
  106. }
  107. func (wfs *WFS) String() string {
  108. return "seaweedfs"
  109. }
  110. func (wfs *WFS) Init(server *fuse.Server) {
  111. wfs.fuseServer = server
  112. }
  113. func (wfs *WFS) maybeReadEntry(inode uint64, followSymLink bool) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, targetInode uint64, status fuse.Status) {
  114. path, status = wfs.inodeToPath.GetPath(inode)
  115. if status != fuse.OK {
  116. return
  117. }
  118. var found bool
  119. if fh, found = wfs.fhmap.FindFileHandle(inode); found {
  120. entry = fh.GetEntry()
  121. if entry != nil && fh.entry.Attributes == nil {
  122. entry.Attributes = &filer_pb.FuseAttributes{}
  123. }
  124. } else {
  125. entry, status = wfs.maybeLoadEntry(path)
  126. }
  127. targetInode = inode
  128. if status == fuse.OK && followSymLink && entry.FileMode()&os.ModeSymlink != 0 {
  129. if entry != nil && entry.Attributes != nil && entry.Attributes.Inode != 0 {
  130. targetInode = entry.Attributes.Inode
  131. }
  132. target := util.FullPath(filepath.Join(string(path), "../"+entry.Attributes.SymlinkTarget))
  133. targetParent, _ := target.DirAndName()
  134. wfs.inodeToPath.EnsurePath(util.FullPath(targetParent), true)
  135. entry, status = wfs.maybeLoadEntry(target)
  136. }
  137. return
  138. }
  139. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  140. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  141. dir, name := fullpath.DirAndName()
  142. // return a valid entry for the mount root
  143. if string(fullpath) == wfs.option.FilerMountRootPath {
  144. return &filer_pb.Entry{
  145. Name: name,
  146. IsDirectory: true,
  147. Attributes: &filer_pb.FuseAttributes{
  148. Mtime: wfs.option.MountMtime.Unix(),
  149. FileMode: uint32(wfs.option.MountMode),
  150. Uid: wfs.option.MountUid,
  151. Gid: wfs.option.MountGid,
  152. Crtime: wfs.option.MountCtime.Unix(),
  153. },
  154. }, fuse.OK
  155. }
  156. // read from async meta cache
  157. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
  158. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  159. if cacheErr == filer_pb.ErrNotFound {
  160. return nil, fuse.ENOENT
  161. }
  162. return cachedEntry.ToProtoEntry(), fuse.OK
  163. }
  164. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  165. if wfs.option.VolumeServerAccess == "filerProxy" {
  166. return func(fileId string) (targetUrls []string, err error) {
  167. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  168. }
  169. }
  170. return filer.LookupFn(wfs)
  171. }
  172. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  173. return wfs.option.FilerAddresses[wfs.option.filerIndex]
  174. }
  175. func (option *Option) setupUniqueCacheDirectory() {
  176. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  177. option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
  178. option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "swap")
  179. os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
  180. }
  181. func (option *Option) getTempFilePageDir() string {
  182. return option.uniqueCacheTempPageDir
  183. }
  184. func (option *Option) getUniqueCacheDir() string {
  185. return option.uniqueCacheDir
  186. }