You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

233 lines
7.3 KiB

3 years ago
4 months ago
3 years ago
3 years ago
3 years ago
3 years ago
1 year ago
3 years ago
3 years ago
3 years ago
1 year ago
3 years ago
3 years ago
3 years ago
4 months ago
3 years ago
4 months ago
3 years ago
3 years ago
1 year ago
3 years ago
1 year ago
4 months ago
4 months ago
4 months ago
4 months ago
3 years ago
1 year ago
1 year ago
3 years ago
3 years ago
3 years ago
4 months ago
4 months ago
3 years ago
3 years ago
3 years ago
3 years ago
1 year ago
1 year ago
3 years ago
1 year ago
3 years ago
  1. package mount
  2. import (
  3. "context"
  4. "errors"
  5. "math/rand"
  6. "os"
  7. "path"
  8. "path/filepath"
  9. "sync/atomic"
  10. "time"
  11. "github.com/hanwen/go-fuse/v2/fuse"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/filer"
  14. "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
  15. "github.com/seaweedfs/seaweedfs/weed/pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  19. "github.com/seaweedfs/seaweedfs/weed/util"
  20. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  21. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  22. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  23. "github.com/hanwen/go-fuse/v2/fs"
  24. )
  25. type Option struct {
  26. filerIndex int32 // align memory for atomic read/write
  27. FilerAddresses []pb.ServerAddress
  28. MountDirectory string
  29. GrpcDialOption grpc.DialOption
  30. FilerMountRootPath string
  31. Collection string
  32. Replication string
  33. TtlSec int32
  34. DiskType types.DiskType
  35. ChunkSizeLimit int64
  36. ConcurrentWriters int
  37. CacheDirForRead string
  38. CacheSizeMBForRead int64
  39. CacheDirForWrite string
  40. DataCenter string
  41. Umask os.FileMode
  42. Quota int64
  43. DisableXAttr bool
  44. WriteOnceReadMany bool
  45. MountUid uint32
  46. MountGid uint32
  47. MountMode os.FileMode
  48. MountCtime time.Time
  49. MountMtime time.Time
  50. MountParentInode uint64
  51. VolumeServerAccess string // how to access volume servers
  52. Cipher bool // whether encrypt data on volume server
  53. UidGidMapper *meta_cache.UidGidMapper
  54. uniqueCacheDirForRead string
  55. uniqueCacheDirForWrite string
  56. }
  57. type WFS struct {
  58. // https://dl.acm.org/doi/fullHtml/10.1145/3310148
  59. // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
  60. fuse.RawFileSystem
  61. mount_pb.UnimplementedSeaweedMountServer
  62. fs.Inode
  63. option *Option
  64. metaCache *meta_cache.MetaCache
  65. stats statsCache
  66. chunkCache *chunk_cache.TieredChunkCache
  67. signature int32
  68. concurrentWriters *util.LimitedConcurrentExecutor
  69. inodeToPath *InodeToPath
  70. fhMap *FileHandleToInode
  71. dhMap *DirectoryHandleToInode
  72. fuseServer *fuse.Server
  73. IsOverQuota bool
  74. fhLockTable *util.LockTable[FileHandleId]
  75. }
  76. func NewSeaweedFileSystem(option *Option) *WFS {
  77. wfs := &WFS{
  78. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  79. option: option,
  80. signature: util.RandomInt32(),
  81. inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
  82. fhMap: NewFileHandleToInode(),
  83. dhMap: NewDirectoryHandleToInode(),
  84. fhLockTable: util.NewLockTable[FileHandleId](),
  85. }
  86. wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))
  87. wfs.option.setupUniqueCacheDirectory()
  88. if option.CacheSizeMBForRead > 0 {
  89. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDirForRead(), option.CacheSizeMBForRead, 1024*1024)
  90. }
  91. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDirForRead(), "meta"), option.UidGidMapper,
  92. util.FullPath(option.FilerMountRootPath),
  93. func(path util.FullPath) {
  94. wfs.inodeToPath.MarkChildrenCached(path)
  95. }, func(path util.FullPath) bool {
  96. return wfs.inodeToPath.IsChildrenCached(path)
  97. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  98. // Find inode if it is not a deleted path
  99. if inode, inodeFound := wfs.inodeToPath.GetInode(filePath); inodeFound {
  100. // Find open file handle
  101. if fh, fhFound := wfs.fhMap.FindFileHandle(inode); fhFound {
  102. fhActiveLock := fh.wfs.fhLockTable.AcquireLock("invalidateFunc", fh.fh, util.ExclusiveLock)
  103. defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
  104. // Recreate dirty pages
  105. fh.dirtyPages.Destroy()
  106. fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
  107. // Update handle entry
  108. newEntry, status := wfs.maybeLoadEntry(filePath)
  109. if status == fuse.OK {
  110. if fh.GetEntry().GetEntry() != newEntry {
  111. fh.SetEntry(newEntry)
  112. }
  113. }
  114. }
  115. }
  116. })
  117. grace.OnInterrupt(func() {
  118. wfs.metaCache.Shutdown()
  119. os.RemoveAll(option.getUniqueCacheDirForWrite())
  120. os.RemoveAll(option.getUniqueCacheDirForRead())
  121. })
  122. if wfs.option.ConcurrentWriters > 0 {
  123. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  124. }
  125. return wfs
  126. }
  127. func (wfs *WFS) StartBackgroundTasks() {
  128. startTime := time.Now()
  129. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  130. go wfs.loopCheckQuota()
  131. }
  132. func (wfs *WFS) String() string {
  133. return "seaweedfs"
  134. }
  135. func (wfs *WFS) Init(server *fuse.Server) {
  136. wfs.fuseServer = server
  137. }
  138. func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
  139. path, status = wfs.inodeToPath.GetPath(inode)
  140. if status != fuse.OK {
  141. return
  142. }
  143. var found bool
  144. if fh, found = wfs.fhMap.FindFileHandle(inode); found {
  145. entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
  146. if entry != nil && fh.entry.Attributes == nil {
  147. entry.Attributes = &filer_pb.FuseAttributes{}
  148. }
  149. })
  150. } else {
  151. entry, status = wfs.maybeLoadEntry(path)
  152. }
  153. return
  154. }
  155. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  156. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  157. dir, name := fullpath.DirAndName()
  158. // return a valid entry for the mount root
  159. if string(fullpath) == wfs.option.FilerMountRootPath {
  160. return &filer_pb.Entry{
  161. Name: name,
  162. IsDirectory: true,
  163. Attributes: &filer_pb.FuseAttributes{
  164. Mtime: wfs.option.MountMtime.Unix(),
  165. FileMode: uint32(wfs.option.MountMode),
  166. Uid: wfs.option.MountUid,
  167. Gid: wfs.option.MountGid,
  168. Crtime: wfs.option.MountCtime.Unix(),
  169. },
  170. }, fuse.OK
  171. }
  172. // read from async meta cache
  173. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
  174. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  175. if errors.Is(cacheErr, filer_pb.ErrNotFound) {
  176. return nil, fuse.ENOENT
  177. }
  178. return cachedEntry.ToProtoEntry(), fuse.OK
  179. }
  180. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  181. if wfs.option.VolumeServerAccess == "filerProxy" {
  182. return func(fileId string) (targetUrls []string, err error) {
  183. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  184. }
  185. }
  186. return filer.LookupFn(wfs)
  187. }
  188. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  189. i := atomic.LoadInt32(&wfs.option.filerIndex)
  190. return wfs.option.FilerAddresses[i]
  191. }
  192. func (option *Option) setupUniqueCacheDirectory() {
  193. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  194. option.uniqueCacheDirForRead = path.Join(option.CacheDirForRead, cacheUniqueId)
  195. os.MkdirAll(option.uniqueCacheDirForRead, os.FileMode(0777)&^option.Umask)
  196. option.uniqueCacheDirForWrite = filepath.Join(path.Join(option.CacheDirForWrite, cacheUniqueId), "swap")
  197. os.MkdirAll(option.uniqueCacheDirForWrite, os.FileMode(0777)&^option.Umask)
  198. }
  199. func (option *Option) getUniqueCacheDirForWrite() string {
  200. return option.uniqueCacheDirForWrite
  201. }
  202. func (option *Option) getUniqueCacheDirForRead() string {
  203. return option.uniqueCacheDirForRead
  204. }