You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

248 lines
6.4 KiB

5 years ago
7 years ago
5 years ago
5 years ago
6 years ago
5 years ago
6 years ago
5 years ago
5 years ago
6 years ago
5 years ago
5 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "path"
  8. "strings"
  9. "sync"
  10. "time"
  11. "google.golang.org/grpc"
  12. "github.com/chrislusf/seaweedfs/weed/util/grace"
  13. "github.com/seaweedfs/fuse"
  14. "github.com/seaweedfs/fuse/fs"
  15. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  16. "github.com/chrislusf/seaweedfs/weed/glog"
  17. "github.com/chrislusf/seaweedfs/weed/pb"
  18. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  19. "github.com/chrislusf/seaweedfs/weed/util"
  20. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  21. )
  22. type Option struct {
  23. FilerGrpcAddress string
  24. GrpcDialOption grpc.DialOption
  25. FilerMountRootPath string
  26. Collection string
  27. Replication string
  28. TtlSec int32
  29. ChunkSizeLimit int64
  30. CacheDir string
  31. CacheSizeMB int64
  32. DataCenter string
  33. DirListCacheLimit int64
  34. EntryCacheTtl time.Duration
  35. Umask os.FileMode
  36. MountUid uint32
  37. MountGid uint32
  38. MountMode os.FileMode
  39. MountCtime time.Time
  40. MountMtime time.Time
  41. OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
  42. Cipher bool // whether encrypt data on volume server
  43. }
  44. var _ = fs.FS(&WFS{})
  45. var _ = fs.FSStatfser(&WFS{})
  46. type WFS struct {
  47. option *Option
  48. // contains all open handles, protected by handlesLock
  49. handlesLock sync.Mutex
  50. handles map[uint64]*FileHandle
  51. bufPool sync.Pool
  52. stats statsCache
  53. root fs.Node
  54. fsNodeCache *FsCache
  55. chunkCache *chunk_cache.ChunkCache
  56. metaCache *meta_cache.MetaCache
  57. }
  58. type statsCache struct {
  59. filer_pb.StatisticsResponse
  60. lastChecked int64 // unix time in seconds
  61. }
  62. func NewSeaweedFileSystem(option *Option) *WFS {
  63. wfs := &WFS{
  64. option: option,
  65. handles: make(map[uint64]*FileHandle),
  66. bufPool: sync.Pool{
  67. New: func() interface{} {
  68. return make([]byte, option.ChunkSizeLimit)
  69. },
  70. },
  71. }
  72. if option.CacheSizeMB > 0 {
  73. os.MkdirAll(option.CacheDir, 0755)
  74. wfs.chunkCache = chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB)
  75. grace.OnInterrupt(func() {
  76. wfs.chunkCache.Shutdown()
  77. })
  78. }
  79. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta"))
  80. startTime := time.Now()
  81. if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil {
  82. glog.V(0).Infof("failed to init meta cache: %v", err)
  83. } else {
  84. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  85. grace.OnInterrupt(func() {
  86. wfs.metaCache.Shutdown()
  87. })
  88. }
  89. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
  90. wfs.fsNodeCache = newFsCache(wfs.root)
  91. return wfs
  92. }
  93. func (wfs *WFS) Root() (fs.Node, error) {
  94. return wfs.root, nil
  95. }
  96. var _ = filer_pb.FilerClient(&WFS{})
  97. func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  98. err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  99. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  100. return fn(client)
  101. }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
  102. if err == nil {
  103. return nil
  104. }
  105. return err
  106. }
  107. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  108. fullpath := file.fullpath()
  109. glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid)
  110. wfs.handlesLock.Lock()
  111. defer wfs.handlesLock.Unlock()
  112. inodeId := file.fullpath().AsInode()
  113. existingHandle, found := wfs.handles[inodeId]
  114. if found && existingHandle != nil {
  115. return existingHandle
  116. }
  117. fileHandle = newFileHandle(file, uid, gid)
  118. wfs.handles[inodeId] = fileHandle
  119. fileHandle.handle = inodeId
  120. glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
  121. return
  122. }
  123. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  124. wfs.handlesLock.Lock()
  125. defer wfs.handlesLock.Unlock()
  126. glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  127. delete(wfs.handles, fullpath.AsInode())
  128. return
  129. }
  130. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  131. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  132. glog.V(4).Infof("reading fs stats: %+v", req)
  133. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  134. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  135. request := &filer_pb.StatisticsRequest{
  136. Collection: wfs.option.Collection,
  137. Replication: wfs.option.Replication,
  138. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  139. }
  140. glog.V(4).Infof("reading filer stats: %+v", request)
  141. resp, err := client.Statistics(context.Background(), request)
  142. if err != nil {
  143. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  144. return err
  145. }
  146. glog.V(4).Infof("read filer stats: %+v", resp)
  147. wfs.stats.TotalSize = resp.TotalSize
  148. wfs.stats.UsedSize = resp.UsedSize
  149. wfs.stats.FileCount = resp.FileCount
  150. wfs.stats.lastChecked = time.Now().Unix()
  151. return nil
  152. })
  153. if err != nil {
  154. glog.V(0).Infof("filer Statistics: %v", err)
  155. return err
  156. }
  157. }
  158. totalDiskSize := wfs.stats.TotalSize
  159. usedDiskSize := wfs.stats.UsedSize
  160. actualFileCount := wfs.stats.FileCount
  161. // Compute the total number of available blocks
  162. resp.Blocks = totalDiskSize / blockSize
  163. // Compute the number of used blocks
  164. numBlocks := uint64(usedDiskSize / blockSize)
  165. // Report the number of free and available blocks for the block size
  166. resp.Bfree = resp.Blocks - numBlocks
  167. resp.Bavail = resp.Blocks - numBlocks
  168. resp.Bsize = uint32(blockSize)
  169. // Report the total number of possible files in the file system (and those free)
  170. resp.Files = math.MaxInt64
  171. resp.Ffree = math.MaxInt64 - actualFileCount
  172. // Report the maximum length of a name and the minimum fragment size
  173. resp.Namelen = 1024
  174. resp.Frsize = uint32(blockSize)
  175. return nil
  176. }
  177. func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry {
  178. return nil
  179. }
  180. func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
  181. }
  182. func (wfs *WFS) cacheDelete(path util.FullPath) {
  183. }
  184. func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
  185. if !wfs.option.OutsideContainerClusterMode {
  186. return hostAndPort
  187. }
  188. commaIndex := strings.Index(hostAndPort, ":")
  189. if commaIndex < 0 {
  190. return hostAndPort
  191. }
  192. filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":")
  193. return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:])
  194. }