You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

193 lines
5.8 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package mount
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/filer"
  5. "github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/storage/types"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/chrislusf/seaweedfs/weed/wdclient"
  13. "github.com/hanwen/go-fuse/v2/fuse"
  14. "google.golang.org/grpc"
  15. "math/rand"
  16. "os"
  17. "path"
  18. "path/filepath"
  19. "time"
  20. "github.com/hanwen/go-fuse/v2/fs"
  21. )
  22. type Option struct {
  23. MountDirectory string
  24. FilerAddresses []pb.ServerAddress
  25. filerIndex int
  26. GrpcDialOption grpc.DialOption
  27. FilerMountRootPath string
  28. Collection string
  29. Replication string
  30. TtlSec int32
  31. DiskType types.DiskType
  32. ChunkSizeLimit int64
  33. ConcurrentWriters int
  34. CacheDir string
  35. CacheSizeMB int64
  36. DataCenter string
  37. Umask os.FileMode
  38. Quota int64
  39. MountUid uint32
  40. MountGid uint32
  41. MountMode os.FileMode
  42. MountCtime time.Time
  43. MountMtime time.Time
  44. MountParentInode uint64
  45. VolumeServerAccess string // how to access volume servers
  46. Cipher bool // whether encrypt data on volume server
  47. UidGidMapper *meta_cache.UidGidMapper
  48. uniqueCacheDir string
  49. uniqueCacheTempPageDir string
  50. }
  51. type WFS struct {
  52. // https://dl.acm.org/doi/fullHtml/10.1145/3310148
  53. // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
  54. fuse.RawFileSystem
  55. fs.Inode
  56. option *Option
  57. metaCache *meta_cache.MetaCache
  58. stats statsCache
  59. chunkCache *chunk_cache.TieredChunkCache
  60. signature int32
  61. concurrentWriters *util.LimitedConcurrentExecutor
  62. inodeToPath *InodeToPath
  63. fhmap *FileHandleToInode
  64. dhmap *DirectoryHandleToInode
  65. fuseServer *fuse.Server
  66. IsOverQuota bool
  67. }
  68. func NewSeaweedFileSystem(option *Option) *WFS {
  69. wfs := &WFS{
  70. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  71. option: option,
  72. signature: util.RandomInt32(),
  73. inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
  74. fhmap: NewFileHandleToInode(),
  75. dhmap: NewDirectoryHandleToInode(),
  76. }
  77. wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
  78. wfs.option.setupUniqueCacheDirectory()
  79. if option.CacheSizeMB > 0 {
  80. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
  81. }
  82. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), option.UidGidMapper,
  83. util.FullPath(option.FilerMountRootPath),
  84. func(path util.FullPath) {
  85. wfs.inodeToPath.MarkChildrenCached(path)
  86. }, func(path util.FullPath) bool {
  87. return wfs.inodeToPath.IsChildrenCached(path)
  88. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  89. })
  90. grace.OnInterrupt(func() {
  91. wfs.metaCache.Shutdown()
  92. os.RemoveAll(option.getUniqueCacheDir())
  93. })
  94. if wfs.option.ConcurrentWriters > 0 {
  95. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  96. }
  97. return wfs
  98. }
  99. func (wfs *WFS) StartBackgroundTasks() {
  100. startTime := time.Now()
  101. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  102. go wfs.loopCheckQuota()
  103. }
  104. func (wfs *WFS) String() string {
  105. return "seaweedfs"
  106. }
  107. func (wfs *WFS) Init(server *fuse.Server) {
  108. wfs.fuseServer = server
  109. }
  110. func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
  111. path, status = wfs.inodeToPath.GetPath(inode)
  112. if status != fuse.OK {
  113. return
  114. }
  115. var found bool
  116. if fh, found = wfs.fhmap.FindFileHandle(inode); found {
  117. return path, fh, fh.entry, fuse.OK
  118. }
  119. entry, status = wfs.maybeLoadEntry(path)
  120. return
  121. }
  122. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  123. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  124. dir, name := fullpath.DirAndName()
  125. // return a valid entry for the mount root
  126. if string(fullpath) == wfs.option.FilerMountRootPath {
  127. return &filer_pb.Entry{
  128. Name: name,
  129. IsDirectory: true,
  130. Attributes: &filer_pb.FuseAttributes{
  131. Mtime: wfs.option.MountMtime.Unix(),
  132. FileMode: uint32(wfs.option.MountMode),
  133. Uid: wfs.option.MountUid,
  134. Gid: wfs.option.MountGid,
  135. Crtime: wfs.option.MountCtime.Unix(),
  136. },
  137. }, fuse.OK
  138. }
  139. // read from async meta cache
  140. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir), nil)
  141. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  142. if cacheErr == filer_pb.ErrNotFound {
  143. return nil, fuse.ENOENT
  144. }
  145. return cachedEntry.ToProtoEntry(), fuse.OK
  146. }
  147. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  148. if wfs.option.VolumeServerAccess == "filerProxy" {
  149. return func(fileId string) (targetUrls []string, err error) {
  150. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  151. }
  152. }
  153. return filer.LookupFn(wfs)
  154. }
  155. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  156. return wfs.option.FilerAddresses[wfs.option.filerIndex]
  157. }
  158. func (option *Option) setupUniqueCacheDirectory() {
  159. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  160. option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
  161. option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "swap")
  162. os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
  163. }
  164. func (option *Option) getTempFilePageDir() string {
  165. return option.uniqueCacheTempPageDir
  166. }
  167. func (option *Option) getUniqueCacheDir() string {
  168. return option.uniqueCacheDir
  169. }