You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

219 lines
5.9 KiB

7 years ago
5 years ago
5 years ago
6 years ago
4 years ago
5 years ago
4 years ago
5 years ago
4 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "path"
  8. "sync"
  9. "time"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/seaweedfs/fuse"
  13. "github.com/seaweedfs/fuse/fs"
  14. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  15. "github.com/chrislusf/seaweedfs/weed/glog"
  16. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  17. "github.com/chrislusf/seaweedfs/weed/util"
  18. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  19. )
  20. type Option struct {
  21. FilerGrpcAddress string
  22. GrpcDialOption grpc.DialOption
  23. FilerMountRootPath string
  24. Collection string
  25. Replication string
  26. TtlSec int32
  27. ChunkSizeLimit int64
  28. CacheDir string
  29. CacheSizeMB int64
  30. DataCenter string
  31. EntryCacheTtl time.Duration
  32. Umask os.FileMode
  33. MountCtime time.Time
  34. MountMtime time.Time
  35. OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
  36. Cipher bool // whether encrypt data on volume server
  37. UidGidMapper *meta_cache.UidGidMapper
  38. }
  39. var _ = fs.FS(&WFS{})
  40. var _ = fs.FSStatfser(&WFS{})
  41. type WFS struct {
  42. option *Option
  43. // contains all open handles, protected by handlesLock
  44. handlesLock sync.Mutex
  45. handles map[uint64]*FileHandle
  46. bufPool sync.Pool
  47. stats statsCache
  48. root fs.Node
  49. fsNodeCache *FsCache
  50. chunkCache *chunk_cache.TieredChunkCache
  51. metaCache *meta_cache.MetaCache
  52. signature int32
  53. }
  54. type statsCache struct {
  55. filer_pb.StatisticsResponse
  56. lastChecked int64 // unix time in seconds
  57. }
  58. func NewSeaweedFileSystem(option *Option) *WFS {
  59. wfs := &WFS{
  60. option: option,
  61. handles: make(map[uint64]*FileHandle),
  62. bufPool: sync.Pool{
  63. New: func() interface{} {
  64. return make([]byte, option.ChunkSizeLimit)
  65. },
  66. },
  67. signature: util.RandomInt32(),
  68. }
  69. cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
  70. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  71. if option.CacheSizeMB > 0 {
  72. os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
  73. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  74. }
  75. wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), option.UidGidMapper)
  76. startTime := time.Now()
  77. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  78. grace.OnInterrupt(func() {
  79. wfs.metaCache.Shutdown()
  80. })
  81. entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath))
  82. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
  83. wfs.fsNodeCache = newFsCache(wfs.root)
  84. return wfs
  85. }
  86. func (wfs *WFS) Root() (fs.Node, error) {
  87. return wfs.root, nil
  88. }
  89. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  90. fullpath := file.fullpath()
  91. glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
  92. wfs.handlesLock.Lock()
  93. defer wfs.handlesLock.Unlock()
  94. inodeId := file.fullpath().AsInode()
  95. existingHandle, found := wfs.handles[inodeId]
  96. if found && existingHandle != nil {
  97. file.isOpen++
  98. return existingHandle
  99. }
  100. fileHandle = newFileHandle(file, uid, gid)
  101. file.maybeLoadEntry(context.Background())
  102. file.isOpen++
  103. wfs.handles[inodeId] = fileHandle
  104. fileHandle.handle = inodeId
  105. return
  106. }
  107. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  108. wfs.handlesLock.Lock()
  109. defer wfs.handlesLock.Unlock()
  110. glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  111. delete(wfs.handles, fullpath.AsInode())
  112. return
  113. }
  114. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  115. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  116. glog.V(4).Infof("reading fs stats: %+v", req)
  117. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  118. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  119. request := &filer_pb.StatisticsRequest{
  120. Collection: wfs.option.Collection,
  121. Replication: wfs.option.Replication,
  122. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  123. }
  124. glog.V(4).Infof("reading filer stats: %+v", request)
  125. resp, err := client.Statistics(context.Background(), request)
  126. if err != nil {
  127. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  128. return err
  129. }
  130. glog.V(4).Infof("read filer stats: %+v", resp)
  131. wfs.stats.TotalSize = resp.TotalSize
  132. wfs.stats.UsedSize = resp.UsedSize
  133. wfs.stats.FileCount = resp.FileCount
  134. wfs.stats.lastChecked = time.Now().Unix()
  135. return nil
  136. })
  137. if err != nil {
  138. glog.V(0).Infof("filer Statistics: %v", err)
  139. return err
  140. }
  141. }
  142. totalDiskSize := wfs.stats.TotalSize
  143. usedDiskSize := wfs.stats.UsedSize
  144. actualFileCount := wfs.stats.FileCount
  145. // Compute the total number of available blocks
  146. resp.Blocks = totalDiskSize / blockSize
  147. // Compute the number of used blocks
  148. numBlocks := uint64(usedDiskSize / blockSize)
  149. // Report the number of free and available blocks for the block size
  150. resp.Bfree = resp.Blocks - numBlocks
  151. resp.Bavail = resp.Blocks - numBlocks
  152. resp.Bsize = uint32(blockSize)
  153. // Report the total number of possible files in the file system (and those free)
  154. resp.Files = math.MaxInt64
  155. resp.Ffree = math.MaxInt64 - actualFileCount
  156. // Report the maximum length of a name and the minimum fragment size
  157. resp.Namelen = 1024
  158. resp.Frsize = uint32(blockSize)
  159. return nil
  160. }
  161. func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
  162. if entry.Attributes == nil {
  163. return
  164. }
  165. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
  166. }
  167. func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
  168. if entry.Attributes == nil {
  169. return
  170. }
  171. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
  172. }