You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

260 lines
7.0 KiB

5 years ago
7 years ago
5 years ago
5 years ago
6 years ago
5 years ago
6 years ago
5 years ago
5 years ago
6 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "path"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/karlseguin/ccache"
  13. "google.golang.org/grpc"
  14. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  15. "github.com/chrislusf/seaweedfs/weed/glog"
  16. "github.com/chrislusf/seaweedfs/weed/pb"
  17. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  20. "github.com/seaweedfs/fuse"
  21. "github.com/seaweedfs/fuse/fs"
  22. )
  23. type Option struct {
  24. FilerGrpcAddress string
  25. GrpcDialOption grpc.DialOption
  26. FilerMountRootPath string
  27. Collection string
  28. Replication string
  29. TtlSec int32
  30. ChunkSizeLimit int64
  31. CacheDir string
  32. CacheSizeMB int64
  33. DataCenter string
  34. DirListCacheLimit int64
  35. EntryCacheTtl time.Duration
  36. Umask os.FileMode
  37. MountUid uint32
  38. MountGid uint32
  39. MountMode os.FileMode
  40. MountCtime time.Time
  41. MountMtime time.Time
  42. OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
  43. Cipher bool // whether encrypt data on volume server
  44. AsyncMetaDataCaching bool // whether asynchronously cache meta data
  45. }
  46. var _ = fs.FS(&WFS{})
  47. var _ = fs.FSStatfser(&WFS{})
  48. type WFS struct {
  49. option *Option
  50. listDirectoryEntriesCache *ccache.Cache
  51. // contains all open handles, protected by handlesLock
  52. handlesLock sync.Mutex
  53. handles map[uint64]*FileHandle
  54. bufPool sync.Pool
  55. stats statsCache
  56. root fs.Node
  57. fsNodeCache *FsCache
  58. chunkCache *chunk_cache.ChunkCache
  59. metaCache *meta_cache.MetaCache
  60. }
  61. type statsCache struct {
  62. filer_pb.StatisticsResponse
  63. lastChecked int64 // unix time in seconds
  64. }
  65. func NewSeaweedFileSystem(option *Option) *WFS {
  66. wfs := &WFS{
  67. option: option,
  68. listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
  69. handles: make(map[uint64]*FileHandle),
  70. bufPool: sync.Pool{
  71. New: func() interface{} {
  72. return make([]byte, option.ChunkSizeLimit)
  73. },
  74. },
  75. }
  76. if option.CacheSizeMB > 0 {
  77. wfs.chunkCache = chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB)
  78. grace.OnInterrupt(func() {
  79. wfs.chunkCache.Shutdown()
  80. })
  81. }
  82. if wfs.option.AsyncMetaDataCaching {
  83. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta"))
  84. startTime := time.Now()
  85. if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil {
  86. glog.V(0).Infof("failed to init meta cache: %v", err)
  87. } else {
  88. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  89. grace.OnInterrupt(func() {
  90. wfs.metaCache.Shutdown()
  91. })
  92. }
  93. }
  94. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
  95. wfs.fsNodeCache = newFsCache(wfs.root)
  96. return wfs
  97. }
  98. func (wfs *WFS) Root() (fs.Node, error) {
  99. return wfs.root, nil
  100. }
  101. var _ = filer_pb.FilerClient(&WFS{})
  102. func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  103. err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  104. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  105. return fn(client)
  106. }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
  107. if err == nil {
  108. return nil
  109. }
  110. return err
  111. }
  112. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  113. fullpath := file.fullpath()
  114. glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid)
  115. wfs.handlesLock.Lock()
  116. defer wfs.handlesLock.Unlock()
  117. inodeId := file.fullpath().AsInode()
  118. existingHandle, found := wfs.handles[inodeId]
  119. if found && existingHandle != nil {
  120. return existingHandle
  121. }
  122. fileHandle = newFileHandle(file, uid, gid)
  123. wfs.handles[inodeId] = fileHandle
  124. fileHandle.handle = inodeId
  125. glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
  126. return
  127. }
  128. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  129. wfs.handlesLock.Lock()
  130. defer wfs.handlesLock.Unlock()
  131. glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  132. delete(wfs.handles, fullpath.AsInode())
  133. return
  134. }
  135. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  136. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  137. glog.V(4).Infof("reading fs stats: %+v", req)
  138. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  139. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  140. request := &filer_pb.StatisticsRequest{
  141. Collection: wfs.option.Collection,
  142. Replication: wfs.option.Replication,
  143. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  144. }
  145. glog.V(4).Infof("reading filer stats: %+v", request)
  146. resp, err := client.Statistics(context.Background(), request)
  147. if err != nil {
  148. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  149. return err
  150. }
  151. glog.V(4).Infof("read filer stats: %+v", resp)
  152. wfs.stats.TotalSize = resp.TotalSize
  153. wfs.stats.UsedSize = resp.UsedSize
  154. wfs.stats.FileCount = resp.FileCount
  155. wfs.stats.lastChecked = time.Now().Unix()
  156. return nil
  157. })
  158. if err != nil {
  159. glog.V(0).Infof("filer Statistics: %v", err)
  160. return err
  161. }
  162. }
  163. totalDiskSize := wfs.stats.TotalSize
  164. usedDiskSize := wfs.stats.UsedSize
  165. actualFileCount := wfs.stats.FileCount
  166. // Compute the total number of available blocks
  167. resp.Blocks = totalDiskSize / blockSize
  168. // Compute the number of used blocks
  169. numBlocks := uint64(usedDiskSize / blockSize)
  170. // Report the number of free and available blocks for the block size
  171. resp.Bfree = resp.Blocks - numBlocks
  172. resp.Bavail = resp.Blocks - numBlocks
  173. resp.Bsize = uint32(blockSize)
  174. // Report the total number of possible files in the file system (and those free)
  175. resp.Files = math.MaxInt64
  176. resp.Ffree = math.MaxInt64 - actualFileCount
  177. // Report the maximum length of a name and the minimum fragment size
  178. resp.Namelen = 1024
  179. resp.Frsize = uint32(blockSize)
  180. return nil
  181. }
  182. func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry {
  183. item := wfs.listDirectoryEntriesCache.Get(string(path))
  184. if item != nil && !item.Expired() {
  185. return item.Value().(*filer_pb.Entry)
  186. }
  187. return nil
  188. }
  189. func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
  190. if entry == nil {
  191. wfs.listDirectoryEntriesCache.Delete(string(path))
  192. } else {
  193. wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl)
  194. }
  195. }
  196. func (wfs *WFS) cacheDelete(path util.FullPath) {
  197. wfs.listDirectoryEntriesCache.Delete(string(path))
  198. }
  199. func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
  200. if !wfs.option.OutsideContainerClusterMode {
  201. return hostAndPort
  202. }
  203. commaIndex := strings.Index(hostAndPort, ":")
  204. if commaIndex < 0 {
  205. return hostAndPort
  206. }
  207. filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":")
  208. return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:])
  209. }