You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

204 lines
5.2 KiB

7 years ago
5 years ago
5 years ago
6 years ago
5 years ago
5 years ago
5 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "path"
  8. "sync"
  9. "time"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/seaweedfs/fuse"
  13. "github.com/seaweedfs/fuse/fs"
  14. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  15. "github.com/chrislusf/seaweedfs/weed/glog"
  16. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  17. "github.com/chrislusf/seaweedfs/weed/util"
  18. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  19. )
  20. type Option struct {
  21. FilerGrpcAddress string
  22. GrpcDialOption grpc.DialOption
  23. FilerMountRootPath string
  24. Collection string
  25. Replication string
  26. TtlSec int32
  27. ChunkSizeLimit int64
  28. CacheDir string
  29. CacheSizeMB int64
  30. DataCenter string
  31. EntryCacheTtl time.Duration
  32. Umask os.FileMode
  33. MountUid uint32
  34. MountGid uint32
  35. MountMode os.FileMode
  36. MountCtime time.Time
  37. MountMtime time.Time
  38. OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
  39. Cipher bool // whether encrypt data on volume server
  40. }
  41. var _ = fs.FS(&WFS{})
  42. var _ = fs.FSStatfser(&WFS{})
  43. type WFS struct {
  44. option *Option
  45. // contains all open handles, protected by handlesLock
  46. handlesLock sync.Mutex
  47. handles map[uint64]*FileHandle
  48. bufPool sync.Pool
  49. stats statsCache
  50. root fs.Node
  51. chunkCache *chunk_cache.ChunkCache
  52. metaCache *meta_cache.MetaCache
  53. }
  54. type statsCache struct {
  55. filer_pb.StatisticsResponse
  56. lastChecked int64 // unix time in seconds
  57. }
  58. func NewSeaweedFileSystem(option *Option) *WFS {
  59. wfs := &WFS{
  60. option: option,
  61. handles: make(map[uint64]*FileHandle),
  62. bufPool: sync.Pool{
  63. New: func() interface{} {
  64. return make([]byte, option.ChunkSizeLimit)
  65. },
  66. },
  67. }
  68. cacheUniqueId := util.Base64Md5([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
  69. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  70. if option.CacheSizeMB > 0 {
  71. os.MkdirAll(cacheDir, 0755)
  72. wfs.chunkCache = chunk_cache.NewChunkCache(256, cacheDir, option.CacheSizeMB)
  73. grace.OnInterrupt(func() {
  74. wfs.chunkCache.Shutdown()
  75. })
  76. }
  77. wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"))
  78. startTime := time.Now()
  79. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  80. grace.OnInterrupt(func() {
  81. wfs.metaCache.Shutdown()
  82. })
  83. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
  84. return wfs
  85. }
  86. func (wfs *WFS) Root() (fs.Node, error) {
  87. return wfs.root, nil
  88. }
  89. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  90. fullpath := file.fullpath()
  91. glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid)
  92. wfs.handlesLock.Lock()
  93. defer wfs.handlesLock.Unlock()
  94. inodeId := file.fullpath().AsInode()
  95. existingHandle, found := wfs.handles[inodeId]
  96. if found && existingHandle != nil {
  97. return existingHandle
  98. }
  99. fileHandle = newFileHandle(file, uid, gid)
  100. wfs.handles[inodeId] = fileHandle
  101. fileHandle.handle = inodeId
  102. glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
  103. return
  104. }
  105. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  106. wfs.handlesLock.Lock()
  107. defer wfs.handlesLock.Unlock()
  108. glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  109. delete(wfs.handles, fullpath.AsInode())
  110. return
  111. }
  112. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  113. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  114. glog.V(4).Infof("reading fs stats: %+v", req)
  115. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  116. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  117. request := &filer_pb.StatisticsRequest{
  118. Collection: wfs.option.Collection,
  119. Replication: wfs.option.Replication,
  120. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  121. }
  122. glog.V(4).Infof("reading filer stats: %+v", request)
  123. resp, err := client.Statistics(context.Background(), request)
  124. if err != nil {
  125. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  126. return err
  127. }
  128. glog.V(4).Infof("read filer stats: %+v", resp)
  129. wfs.stats.TotalSize = resp.TotalSize
  130. wfs.stats.UsedSize = resp.UsedSize
  131. wfs.stats.FileCount = resp.FileCount
  132. wfs.stats.lastChecked = time.Now().Unix()
  133. return nil
  134. })
  135. if err != nil {
  136. glog.V(0).Infof("filer Statistics: %v", err)
  137. return err
  138. }
  139. }
  140. totalDiskSize := wfs.stats.TotalSize
  141. usedDiskSize := wfs.stats.UsedSize
  142. actualFileCount := wfs.stats.FileCount
  143. // Compute the total number of available blocks
  144. resp.Blocks = totalDiskSize / blockSize
  145. // Compute the number of used blocks
  146. numBlocks := uint64(usedDiskSize / blockSize)
  147. // Report the number of free and available blocks for the block size
  148. resp.Bfree = resp.Blocks - numBlocks
  149. resp.Bavail = resp.Blocks - numBlocks
  150. resp.Bsize = uint32(blockSize)
  151. // Report the total number of possible files in the file system (and those free)
  152. resp.Files = math.MaxInt64
  153. resp.Ffree = math.MaxInt64 - actualFileCount
  154. // Report the maximum length of a name and the minimum fragment size
  155. resp.Namelen = 1024
  156. resp.Frsize = uint32(blockSize)
  157. return nil
  158. }