You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

241 lines
6.2 KiB

5 years ago
7 years ago
5 years ago
5 years ago
6 years ago
5 years ago
6 years ago
5 years ago
6 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/karlseguin/ccache"
  11. "google.golang.org/grpc"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  17. "github.com/seaweedfs/fuse"
  18. "github.com/seaweedfs/fuse/fs"
  19. )
  20. type Option struct {
  21. FilerGrpcAddress string
  22. GrpcDialOption grpc.DialOption
  23. FilerMountRootPath string
  24. Collection string
  25. Replication string
  26. TtlSec int32
  27. ChunkSizeLimit int64
  28. CacheDir string
  29. CacheSizeMB int64
  30. DataCenter string
  31. DirListCacheLimit int64
  32. EntryCacheTtl time.Duration
  33. Umask os.FileMode
  34. MountUid uint32
  35. MountGid uint32
  36. MountMode os.FileMode
  37. MountCtime time.Time
  38. MountMtime time.Time
  39. OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
  40. Cipher bool // whether encrypt data on volume server
  41. }
  42. var _ = fs.FS(&WFS{})
  43. var _ = fs.FSStatfser(&WFS{})
  44. type WFS struct {
  45. option *Option
  46. listDirectoryEntriesCache *ccache.Cache
  47. // contains all open handles, protected by handlesLock
  48. handlesLock sync.Mutex
  49. handles map[uint64]*FileHandle
  50. bufPool sync.Pool
  51. stats statsCache
  52. root fs.Node
  53. fsNodeCache *FsCache
  54. chunkCache *chunk_cache.ChunkCache
  55. }
  56. type statsCache struct {
  57. filer_pb.StatisticsResponse
  58. lastChecked int64 // unix time in seconds
  59. }
  60. func NewSeaweedFileSystem(option *Option) *WFS {
  61. wfs := &WFS{
  62. option: option,
  63. listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
  64. handles: make(map[uint64]*FileHandle),
  65. bufPool: sync.Pool{
  66. New: func() interface{} {
  67. return make([]byte, option.ChunkSizeLimit)
  68. },
  69. },
  70. }
  71. if option.CacheSizeMB > 0 {
  72. wfs.chunkCache = chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB, 4)
  73. util.OnInterrupt(func() {
  74. wfs.chunkCache.Shutdown()
  75. })
  76. }
  77. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
  78. wfs.fsNodeCache = newFsCache(wfs.root)
  79. return wfs
  80. }
  81. func (wfs *WFS) Root() (fs.Node, error) {
  82. return wfs.root, nil
  83. }
  84. func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  85. err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  86. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  87. return fn(client)
  88. }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
  89. if err == nil {
  90. return nil
  91. }
  92. return err
  93. }
  94. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  95. fullpath := file.fullpath()
  96. glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid)
  97. wfs.handlesLock.Lock()
  98. defer wfs.handlesLock.Unlock()
  99. inodeId := file.fullpath().AsInode()
  100. existingHandle, found := wfs.handles[inodeId]
  101. if found && existingHandle != nil {
  102. return existingHandle
  103. }
  104. fileHandle = newFileHandle(file, uid, gid)
  105. wfs.handles[inodeId] = fileHandle
  106. fileHandle.handle = inodeId
  107. glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
  108. return
  109. }
  110. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  111. wfs.handlesLock.Lock()
  112. defer wfs.handlesLock.Unlock()
  113. glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  114. delete(wfs.handles, fullpath.AsInode())
  115. return
  116. }
  117. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  118. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  119. glog.V(4).Infof("reading fs stats: %+v", req)
  120. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  121. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  122. request := &filer_pb.StatisticsRequest{
  123. Collection: wfs.option.Collection,
  124. Replication: wfs.option.Replication,
  125. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  126. }
  127. glog.V(4).Infof("reading filer stats: %+v", request)
  128. resp, err := client.Statistics(context.Background(), request)
  129. if err != nil {
  130. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  131. return err
  132. }
  133. glog.V(4).Infof("read filer stats: %+v", resp)
  134. wfs.stats.TotalSize = resp.TotalSize
  135. wfs.stats.UsedSize = resp.UsedSize
  136. wfs.stats.FileCount = resp.FileCount
  137. wfs.stats.lastChecked = time.Now().Unix()
  138. return nil
  139. })
  140. if err != nil {
  141. glog.V(0).Infof("filer Statistics: %v", err)
  142. return err
  143. }
  144. }
  145. totalDiskSize := wfs.stats.TotalSize
  146. usedDiskSize := wfs.stats.UsedSize
  147. actualFileCount := wfs.stats.FileCount
  148. // Compute the total number of available blocks
  149. resp.Blocks = totalDiskSize / blockSize
  150. // Compute the number of used blocks
  151. numBlocks := uint64(usedDiskSize / blockSize)
  152. // Report the number of free and available blocks for the block size
  153. resp.Bfree = resp.Blocks - numBlocks
  154. resp.Bavail = resp.Blocks - numBlocks
  155. resp.Bsize = uint32(blockSize)
  156. // Report the total number of possible files in the file system (and those free)
  157. resp.Files = math.MaxInt64
  158. resp.Ffree = math.MaxInt64 - actualFileCount
  159. // Report the maximum length of a name and the minimum fragment size
  160. resp.Namelen = 1024
  161. resp.Frsize = uint32(blockSize)
  162. return nil
  163. }
  164. func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry {
  165. item := wfs.listDirectoryEntriesCache.Get(string(path))
  166. if item != nil && !item.Expired() {
  167. return item.Value().(*filer_pb.Entry)
  168. }
  169. return nil
  170. }
  171. func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
  172. if entry == nil {
  173. wfs.listDirectoryEntriesCache.Delete(string(path))
  174. } else {
  175. wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl)
  176. }
  177. }
  178. func (wfs *WFS) cacheDelete(path util.FullPath) {
  179. wfs.listDirectoryEntriesCache.Delete(string(path))
  180. }
  181. func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
  182. if !wfs.option.OutsideContainerClusterMode {
  183. return hostAndPort
  184. }
  185. commaIndex := strings.Index(hostAndPort, ":")
  186. if commaIndex < 0 {
  187. return hostAndPort
  188. }
  189. filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":")
  190. return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:])
  191. }