You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

185 lines
5.5 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package mount
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/filer"
  5. "github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/storage/types"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/chrislusf/seaweedfs/weed/wdclient"
  13. "github.com/hanwen/go-fuse/v2/fuse"
  14. "google.golang.org/grpc"
  15. "math/rand"
  16. "os"
  17. "path"
  18. "time"
  19. "github.com/hanwen/go-fuse/v2/fs"
  20. )
  21. type Option struct {
  22. MountDirectory string
  23. FilerAddresses []pb.ServerAddress
  24. filerIndex int
  25. GrpcDialOption grpc.DialOption
  26. FilerMountRootPath string
  27. Collection string
  28. Replication string
  29. TtlSec int32
  30. DiskType types.DiskType
  31. ChunkSizeLimit int64
  32. ConcurrentWriters int
  33. CacheDir string
  34. CacheSizeMB int64
  35. DataCenter string
  36. Umask os.FileMode
  37. Quota int64
  38. MountUid uint32
  39. MountGid uint32
  40. MountMode os.FileMode
  41. MountCtime time.Time
  42. MountMtime time.Time
  43. MountParentInode uint64
  44. VolumeServerAccess string // how to access volume servers
  45. Cipher bool // whether encrypt data on volume server
  46. UidGidMapper *meta_cache.UidGidMapper
  47. uniqueCacheDir string
  48. }
  49. type WFS struct {
  50. // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
  51. fuse.RawFileSystem
  52. fs.Inode
  53. option *Option
  54. metaCache *meta_cache.MetaCache
  55. stats statsCache
  56. chunkCache *chunk_cache.TieredChunkCache
  57. signature int32
  58. concurrentWriters *util.LimitedConcurrentExecutor
  59. inodeToPath *InodeToPath
  60. fhmap *FileHandleToInode
  61. dhmap *DirectoryHandleToInode
  62. fuseServer *fuse.Server
  63. IsOverQuota bool
  64. }
  65. func NewSeaweedFileSystem(option *Option) *WFS {
  66. wfs := &WFS{
  67. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  68. option: option,
  69. signature: util.RandomInt32(),
  70. inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
  71. fhmap: NewFileHandleToInode(),
  72. dhmap: NewDirectoryHandleToInode(),
  73. }
  74. wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
  75. wfs.option.setupUniqueCacheDirectory()
  76. if option.CacheSizeMB > 0 {
  77. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
  78. }
  79. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), option.UidGidMapper,
  80. util.FullPath(option.FilerMountRootPath),
  81. func(path util.FullPath) {
  82. wfs.inodeToPath.MarkChildrenCached(path)
  83. }, func(path util.FullPath) bool {
  84. return wfs.inodeToPath.IsChildrenCached(path)
  85. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  86. })
  87. grace.OnInterrupt(func() {
  88. wfs.metaCache.Shutdown()
  89. os.RemoveAll(option.getUniqueCacheDir())
  90. })
  91. if wfs.option.ConcurrentWriters > 0 {
  92. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  93. }
  94. return wfs
  95. }
  96. func (wfs *WFS) StartBackgroundTasks() {
  97. startTime := time.Now()
  98. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  99. go wfs.loopCheckQuota()
  100. }
  101. func (wfs *WFS) String() string {
  102. return "seaweedfs"
  103. }
  104. func (wfs *WFS) Init(server *fuse.Server) {
  105. wfs.fuseServer = server
  106. }
  107. func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
  108. path, status = wfs.inodeToPath.GetPath(inode)
  109. if status != fuse.OK {
  110. return
  111. }
  112. var found bool
  113. if fh, found = wfs.fhmap.FindFileHandle(inode); found {
  114. return path, fh, fh.entry, fuse.OK
  115. }
  116. entry, status = wfs.maybeLoadEntry(path)
  117. return
  118. }
  119. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  120. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  121. dir, name := fullpath.DirAndName()
  122. // return a valid entry for the mount root
  123. if string(fullpath) == wfs.option.FilerMountRootPath {
  124. return &filer_pb.Entry{
  125. Name: name,
  126. IsDirectory: true,
  127. Attributes: &filer_pb.FuseAttributes{
  128. Mtime: wfs.option.MountMtime.Unix(),
  129. FileMode: uint32(wfs.option.MountMode),
  130. Uid: wfs.option.MountUid,
  131. Gid: wfs.option.MountGid,
  132. Crtime: wfs.option.MountCtime.Unix(),
  133. },
  134. }, fuse.OK
  135. }
  136. // read from async meta cache
  137. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir), nil)
  138. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  139. if cacheErr == filer_pb.ErrNotFound {
  140. return nil, fuse.ENOENT
  141. }
  142. return cachedEntry.ToProtoEntry(), fuse.OK
  143. }
  144. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  145. if wfs.option.VolumeServerAccess == "filerProxy" {
  146. return func(fileId string) (targetUrls []string, err error) {
  147. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  148. }
  149. }
  150. return filer.LookupFn(wfs)
  151. }
  152. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  153. return wfs.option.FilerAddresses[wfs.option.filerIndex]
  154. }
  155. func (option *Option) setupUniqueCacheDirectory() {
  156. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  157. option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
  158. os.MkdirAll(option.uniqueCacheDir, os.FileMode(0777)&^option.Umask)
  159. }
  160. func (option *Option) getUniqueCacheDir() string {
  161. return option.uniqueCacheDir
  162. }