You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

231 lines
6.2 KiB

7 years ago
5 years ago
5 years ago
6 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "path"
  8. "sync"
  9. "time"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/seaweedfs/fuse"
  13. "github.com/seaweedfs/fuse/fs"
  14. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  15. "github.com/chrislusf/seaweedfs/weed/glog"
  16. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  17. "github.com/chrislusf/seaweedfs/weed/util"
  18. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  19. )
  20. type Option struct {
  21. FilerGrpcAddress string
  22. GrpcDialOption grpc.DialOption
  23. FilerMountRootPath string
  24. Collection string
  25. Replication string
  26. TtlSec int32
  27. ChunkSizeLimit int64
  28. CacheDir string
  29. CacheSizeMB int64
  30. DataCenter string
  31. EntryCacheTtl time.Duration
  32. Umask os.FileMode
  33. MountUid uint32
  34. MountGid uint32
  35. MountMode os.FileMode
  36. MountCtime time.Time
  37. MountMtime time.Time
  38. OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
  39. Cipher bool // whether encrypt data on volume server
  40. UidGidMapper *meta_cache.UidGidMapper
  41. }
  42. var _ = fs.FS(&WFS{})
  43. var _ = fs.FSStatfser(&WFS{})
  44. type WFS struct {
  45. option *Option
  46. // contains all open handles, protected by handlesLock
  47. handlesLock sync.Mutex
  48. handles map[uint64]*FileHandle
  49. bufPool sync.Pool
  50. stats statsCache
  51. root fs.Node
  52. fsNodeCache *FsCache
  53. chunkCache *chunk_cache.TieredChunkCache
  54. metaCache *meta_cache.MetaCache
  55. signature int32
  56. }
  57. type statsCache struct {
  58. filer_pb.StatisticsResponse
  59. lastChecked int64 // unix time in seconds
  60. }
  61. func NewSeaweedFileSystem(option *Option) *WFS {
  62. wfs := &WFS{
  63. option: option,
  64. handles: make(map[uint64]*FileHandle),
  65. bufPool: sync.Pool{
  66. New: func() interface{} {
  67. return make([]byte, option.ChunkSizeLimit)
  68. },
  69. },
  70. signature: util.RandomInt32(),
  71. }
  72. cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
  73. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  74. if option.CacheSizeMB > 0 {
  75. os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
  76. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  77. }
  78. wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
  79. fsNode := wfs.fsNodeCache.GetFsNode(filePath)
  80. if fsNode != nil {
  81. if file, ok := fsNode.(*File); ok {
  82. file.entry = nil
  83. }
  84. }
  85. })
  86. startTime := time.Now()
  87. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  88. grace.OnInterrupt(func() {
  89. wfs.metaCache.Shutdown()
  90. })
  91. entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath))
  92. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
  93. wfs.fsNodeCache = newFsCache(wfs.root)
  94. return wfs
  95. }
  96. func (wfs *WFS) Root() (fs.Node, error) {
  97. return wfs.root, nil
  98. }
  99. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  100. fullpath := file.fullpath()
  101. glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
  102. wfs.handlesLock.Lock()
  103. defer wfs.handlesLock.Unlock()
  104. inodeId := file.fullpath().AsInode()
  105. if file.isOpen > 0 {
  106. existingHandle, found := wfs.handles[inodeId]
  107. if found && existingHandle != nil {
  108. file.isOpen++
  109. return existingHandle
  110. }
  111. }
  112. fileHandle = newFileHandle(file, uid, gid)
  113. file.maybeLoadEntry(context.Background())
  114. file.isOpen++
  115. wfs.handles[inodeId] = fileHandle
  116. fileHandle.handle = inodeId
  117. return
  118. }
  119. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  120. wfs.handlesLock.Lock()
  121. defer wfs.handlesLock.Unlock()
  122. glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  123. delete(wfs.handles, fullpath.AsInode())
  124. return
  125. }
  126. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  127. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  128. glog.V(4).Infof("reading fs stats: %+v", req)
  129. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  130. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  131. request := &filer_pb.StatisticsRequest{
  132. Collection: wfs.option.Collection,
  133. Replication: wfs.option.Replication,
  134. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  135. }
  136. glog.V(4).Infof("reading filer stats: %+v", request)
  137. resp, err := client.Statistics(context.Background(), request)
  138. if err != nil {
  139. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  140. return err
  141. }
  142. glog.V(4).Infof("read filer stats: %+v", resp)
  143. wfs.stats.TotalSize = resp.TotalSize
  144. wfs.stats.UsedSize = resp.UsedSize
  145. wfs.stats.FileCount = resp.FileCount
  146. wfs.stats.lastChecked = time.Now().Unix()
  147. return nil
  148. })
  149. if err != nil {
  150. glog.V(0).Infof("filer Statistics: %v", err)
  151. return err
  152. }
  153. }
  154. totalDiskSize := wfs.stats.TotalSize
  155. usedDiskSize := wfs.stats.UsedSize
  156. actualFileCount := wfs.stats.FileCount
  157. // Compute the total number of available blocks
  158. resp.Blocks = totalDiskSize / blockSize
  159. // Compute the number of used blocks
  160. numBlocks := uint64(usedDiskSize / blockSize)
  161. // Report the number of free and available blocks for the block size
  162. resp.Bfree = resp.Blocks - numBlocks
  163. resp.Bavail = resp.Blocks - numBlocks
  164. resp.Bsize = uint32(blockSize)
  165. // Report the total number of possible files in the file system (and those free)
  166. resp.Files = math.MaxInt64
  167. resp.Ffree = math.MaxInt64 - actualFileCount
  168. // Report the maximum length of a name and the minimum fragment size
  169. resp.Namelen = 1024
  170. resp.Frsize = uint32(blockSize)
  171. return nil
  172. }
  173. func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
  174. if entry.Attributes == nil {
  175. return
  176. }
  177. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
  178. }
  179. func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
  180. if entry.Attributes == nil {
  181. return
  182. }
  183. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
  184. }