You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

198 lines
6.0 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package mount
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/filer"
  5. "github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/mount_pb"
  9. "github.com/chrislusf/seaweedfs/weed/storage/types"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  12. "github.com/chrislusf/seaweedfs/weed/util/grace"
  13. "github.com/chrislusf/seaweedfs/weed/wdclient"
  14. "github.com/hanwen/go-fuse/v2/fuse"
  15. "google.golang.org/grpc"
  16. "math/rand"
  17. "os"
  18. "path"
  19. "path/filepath"
  20. "time"
  21. "github.com/hanwen/go-fuse/v2/fs"
  22. )
  // Option carries the settings for a single mount. NewSeaweedFileSystem keeps
  // the pointer it is given and fills in the unexported fields.
  type Option struct {
  	// MountDirectory is one of the inputs hashed into the unique cache
  	// directory name.
  	MountDirectory string
  	// FilerAddresses lists the filers available to this mount; one is picked
  	// at random when the file system is constructed. Must be non-empty:
  	// both the random pick and the cache directory hash index into it.
  	FilerAddresses []pb.ServerAddress
  	// filerIndex is the position in FilerAddresses returned by getCurrentFiler.
  	filerIndex int
  	GrpcDialOption grpc.DialOption
  	// FilerMountRootPath is the filer path treated as the mount root;
  	// maybeLoadEntry answers lookups for it with a synthetic directory entry.
  	FilerMountRootPath string
  	// Collection, Replication, TtlSec, DiskType and ChunkSizeLimit are not read
  	// in this file — presumably consumed by the write path; verify there.
  	Collection string
  	Replication string
  	TtlSec int32
  	DiskType types.DiskType
  	ChunkSizeLimit int64
  	// ConcurrentWriters, when > 0, is the size of the limited concurrent
  	// executor created for the file system; otherwise none is created.
  	ConcurrentWriters int
  	// CacheDir is the parent directory under which the per-mount unique cache
  	// directory is created.
  	CacheDir string
  	// CacheSizeMB, when > 0, enables the tiered chunk cache.
  	CacheSizeMB int64
  	DataCenter string
  	// Umask is cleared from 0777 when creating the cache swap directory.
  	Umask os.FileMode
  	// Quota is not read in this file — presumably enforced by loopCheckQuota; verify.
  	Quota int64
  	// MountUid, MountGid, MountMode, MountCtime and MountMtime are the
  	// attributes reported for the mount root entry.
  	MountUid uint32
  	MountGid uint32
  	MountMode os.FileMode
  	MountCtime time.Time
  	MountMtime time.Time
  	MountParentInode uint64
  	// VolumeServerAccess selects how volume servers are reached. The value
  	// "filerProxy" makes LookupFn route chunk reads through the current filer.
  	VolumeServerAccess string // how to access volume servers
  	Cipher bool // whether encrypt data on volume server
  	// UidGidMapper is handed to the meta cache at construction.
  	UidGidMapper *meta_cache.UidGidMapper
  	// uniqueCacheDir and uniqueCacheTempPageDir are set by
  	// setupUniqueCacheDirectory.
  	uniqueCacheDir string
  	uniqueCacheTempPageDir string
  }
  // WFS is the mounted SeaweedFS file system. It embeds the go-fuse raw file
  // system (initialised with the default implementation by
  // NewSeaweedFileSystem), the unimplemented SeaweedMount gRPC server, and an
  // fs.Inode.
  type WFS struct {
  	// https://dl.acm.org/doi/fullHtml/10.1145/3310148
  	// follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
  	fuse.RawFileSystem
  	mount_pb.UnimplementedSeaweedMountServer
  	fs.Inode
  	// option is the caller-supplied configuration (shared, not copied).
  	option *Option
  	// metaCache is the local metadata cache consulted by maybeLoadEntry.
  	metaCache *meta_cache.MetaCache
  	stats statsCache
  	// chunkCache stays nil unless Option.CacheSizeMB > 0.
  	chunkCache *chunk_cache.TieredChunkCache
  	// signature is a random value generated at construction and passed to the
  	// metadata event subscription.
  	signature int32
  	// concurrentWriters stays nil unless Option.ConcurrentWriters > 0.
  	concurrentWriters *util.LimitedConcurrentExecutor
  	// inodeToPath resolves inodes to paths and tracks which directories have
  	// their children cached.
  	inodeToPath *InodeToPath
  	// fhmap is used to find an open file handle by inode.
  	fhmap *FileHandleToInode
  	dhmap *DirectoryHandleToInode
  	// fuseServer is set by Init.
  	fuseServer *fuse.Server
  	// IsOverQuota is not written in this file — presumably maintained by
  	// loopCheckQuota; verify.
  	IsOverQuota bool
  }
  70. func NewSeaweedFileSystem(option *Option) *WFS {
  71. wfs := &WFS{
  72. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  73. option: option,
  74. signature: util.RandomInt32(),
  75. inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
  76. fhmap: NewFileHandleToInode(),
  77. dhmap: NewDirectoryHandleToInode(),
  78. }
  79. wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
  80. wfs.option.setupUniqueCacheDirectory()
  81. if option.CacheSizeMB > 0 {
  82. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
  83. }
  84. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), option.UidGidMapper,
  85. util.FullPath(option.FilerMountRootPath),
  86. func(path util.FullPath) {
  87. wfs.inodeToPath.MarkChildrenCached(path)
  88. }, func(path util.FullPath) bool {
  89. return wfs.inodeToPath.IsChildrenCached(path)
  90. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  91. })
  92. grace.OnInterrupt(func() {
  93. wfs.metaCache.Shutdown()
  94. os.RemoveAll(option.getUniqueCacheDir())
  95. })
  96. if wfs.option.ConcurrentWriters > 0 {
  97. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  98. }
  99. return wfs
  100. }
  101. func (wfs *WFS) StartBackgroundTasks() {
  102. startTime := time.Now()
  103. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  104. go wfs.loopCheckQuota()
  105. }
  // String returns the fixed name "seaweedfs" for this file system.
  func (wfs *WFS) String() string {
  	return "seaweedfs"
  }
  // Init records the FUSE server on the file system for later use.
  func (wfs *WFS) Init(server *fuse.Server) {
  	wfs.fuseServer = server
  }
  112. func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
  113. path, status = wfs.inodeToPath.GetPath(inode)
  114. if status != fuse.OK {
  115. return
  116. }
  117. var found bool
  118. if fh, found = wfs.fhmap.FindFileHandle(inode); found {
  119. if fh.entry.Attributes == nil {
  120. fh.entry.Attributes = &filer_pb.FuseAttributes{}
  121. }
  122. return path, fh, fh.entry, fuse.OK
  123. }
  124. entry, status = wfs.maybeLoadEntry(path)
  125. return
  126. }
  127. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  128. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  129. dir, name := fullpath.DirAndName()
  130. // return a valid entry for the mount root
  131. if string(fullpath) == wfs.option.FilerMountRootPath {
  132. return &filer_pb.Entry{
  133. Name: name,
  134. IsDirectory: true,
  135. Attributes: &filer_pb.FuseAttributes{
  136. Mtime: wfs.option.MountMtime.Unix(),
  137. FileMode: uint32(wfs.option.MountMode),
  138. Uid: wfs.option.MountUid,
  139. Gid: wfs.option.MountGid,
  140. Crtime: wfs.option.MountCtime.Unix(),
  141. },
  142. }, fuse.OK
  143. }
  144. // read from async meta cache
  145. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
  146. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  147. if cacheErr == filer_pb.ErrNotFound {
  148. return nil, fuse.ENOENT
  149. }
  150. return cachedEntry.ToProtoEntry(), fuse.OK
  151. }
  152. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  153. if wfs.option.VolumeServerAccess == "filerProxy" {
  154. return func(fileId string) (targetUrls []string, err error) {
  155. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  156. }
  157. }
  158. return filer.LookupFn(wfs)
  159. }
  160. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  161. return wfs.option.FilerAddresses[wfs.option.filerIndex]
  162. }
  163. func (option *Option) setupUniqueCacheDirectory() {
  164. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  165. option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
  166. option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "swap")
  167. os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
  168. }
  // getTempFilePageDir returns the "swap" directory inside the per-mount cache
  // directory, as computed by setupUniqueCacheDirectory. It is empty until
  // that method has run.
  func (option *Option) getTempFilePageDir() string {
  	return option.uniqueCacheTempPageDir
  }
  // getUniqueCacheDir returns the per-mount cache directory computed by
  // setupUniqueCacheDirectory. It is empty until that method has run.
  func (option *Option) getUniqueCacheDir() string {
  	return option.uniqueCacheDir
  }