You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

235 lines
5.9 KiB

5 years ago
7 years ago
6 years ago
6 years ago
6 years ago
5 years ago
5 years ago
5 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/karlseguin/ccache"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/filer2"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. "github.com/seaweedfs/fuse"
  16. "github.com/seaweedfs/fuse/fs"
  17. )
  18. type Option struct {
  19. FilerGrpcAddress string
  20. GrpcDialOption grpc.DialOption
  21. FilerMountRootPath string
  22. Collection string
  23. Replication string
  24. TtlSec int32
  25. ChunkSizeLimit int64
  26. DataCenter string
  27. DirListCacheLimit int64
  28. EntryCacheTtl time.Duration
  29. Umask os.FileMode
  30. MountUid uint32
  31. MountGid uint32
  32. MountMode os.FileMode
  33. MountCtime time.Time
  34. MountMtime time.Time
  35. }
  36. var _ = fs.FS(&WFS{})
  37. var _ = fs.FSStatfser(&WFS{})
  38. type WFS struct {
  39. option *Option
  40. listDirectoryEntriesCache *ccache.Cache
  41. // contains all open handles, protected by handlesLock
  42. handlesLock sync.Mutex
  43. handles []*FileHandle
  44. bufPool sync.Pool
  45. stats statsCache
  46. // nodes, protected by nodesLock
  47. nodesLock sync.Mutex
  48. nodes map[uint64]fs.Node
  49. root fs.Node
  50. }
  51. type statsCache struct {
  52. filer_pb.StatisticsResponse
  53. lastChecked int64 // unix time in seconds
  54. }
  55. func NewSeaweedFileSystem(option *Option) *WFS {
  56. wfs := &WFS{
  57. option: option,
  58. listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
  59. bufPool: sync.Pool{
  60. New: func() interface{} {
  61. return make([]byte, option.ChunkSizeLimit)
  62. },
  63. },
  64. nodes: make(map[uint64]fs.Node),
  65. }
  66. wfs.root = &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}
  67. return wfs
  68. }
  69. func (wfs *WFS) Root() (fs.Node, error) {
  70. return wfs.root, nil
  71. }
  72. func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
  73. return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
  74. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  75. return fn(client)
  76. }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
  77. }
  78. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  79. fullpath := file.fullpath()
  80. glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid)
  81. wfs.handlesLock.Lock()
  82. defer wfs.handlesLock.Unlock()
  83. fileHandle = newFileHandle(file, uid, gid)
  84. for i, h := range wfs.handles {
  85. if h == nil {
  86. wfs.handles[i] = fileHandle
  87. fileHandle.handle = uint64(i)
  88. glog.V(4).Infof( "%s reuse fh %d", fullpath,fileHandle.handle)
  89. return
  90. }
  91. }
  92. wfs.handles = append(wfs.handles, fileHandle)
  93. fileHandle.handle = uint64(len(wfs.handles) - 1)
  94. glog.V(4).Infof( "%s new fh %d", fullpath,fileHandle.handle)
  95. return
  96. }
  97. func (wfs *WFS) ReleaseHandle(fullpath filer2.FullPath, handleId fuse.HandleID) {
  98. wfs.handlesLock.Lock()
  99. defer wfs.handlesLock.Unlock()
  100. glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  101. if int(handleId) < len(wfs.handles) {
  102. wfs.handles[int(handleId)] = nil
  103. }
  104. return
  105. }
  106. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  107. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  108. glog.V(4).Infof("reading fs stats: %+v", req)
  109. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  110. err := wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
  111. request := &filer_pb.StatisticsRequest{
  112. Collection: wfs.option.Collection,
  113. Replication: wfs.option.Replication,
  114. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  115. }
  116. glog.V(4).Infof("reading filer stats: %+v", request)
  117. resp, err := client.Statistics(ctx, request)
  118. if err != nil {
  119. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  120. return err
  121. }
  122. glog.V(4).Infof("read filer stats: %+v", resp)
  123. wfs.stats.TotalSize = resp.TotalSize
  124. wfs.stats.UsedSize = resp.UsedSize
  125. wfs.stats.FileCount = resp.FileCount
  126. wfs.stats.lastChecked = time.Now().Unix()
  127. return nil
  128. })
  129. if err != nil {
  130. glog.V(0).Infof("filer Statistics: %v", err)
  131. return err
  132. }
  133. }
  134. totalDiskSize := wfs.stats.TotalSize
  135. usedDiskSize := wfs.stats.UsedSize
  136. actualFileCount := wfs.stats.FileCount
  137. // Compute the total number of available blocks
  138. resp.Blocks = totalDiskSize / blockSize
  139. // Compute the number of used blocks
  140. numBlocks := uint64(usedDiskSize / blockSize)
  141. // Report the number of free and available blocks for the block size
  142. resp.Bfree = resp.Blocks - numBlocks
  143. resp.Bavail = resp.Blocks - numBlocks
  144. resp.Bsize = uint32(blockSize)
  145. // Report the total number of possible files in the file system (and those free)
  146. resp.Files = math.MaxInt64
  147. resp.Ffree = math.MaxInt64 - actualFileCount
  148. // Report the maximum length of a name and the minimum fragment size
  149. resp.Namelen = 1024
  150. resp.Frsize = uint32(blockSize)
  151. return nil
  152. }
  153. func (wfs *WFS) cacheGet(path filer2.FullPath) *filer_pb.Entry {
  154. item := wfs.listDirectoryEntriesCache.Get(string(path))
  155. if item != nil && !item.Expired() {
  156. return item.Value().(*filer_pb.Entry)
  157. }
  158. return nil
  159. }
  160. func (wfs *WFS) cacheSet(path filer2.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
  161. if entry == nil {
  162. wfs.listDirectoryEntriesCache.Delete(string(path))
  163. } else {
  164. wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl)
  165. }
  166. }
  167. func (wfs *WFS) cacheDelete(path filer2.FullPath) {
  168. wfs.listDirectoryEntriesCache.Delete(string(path))
  169. }
  170. func (wfs *WFS) getNode(fullpath filer2.FullPath, fn func() fs.Node) fs.Node {
  171. wfs.nodesLock.Lock()
  172. defer wfs.nodesLock.Unlock()
  173. node, found := wfs.nodes[fullpath.AsInode()]
  174. if found {
  175. return node
  176. }
  177. node = fn()
  178. if node != nil {
  179. wfs.nodes[fullpath.AsInode()] = node
  180. }
  181. return node
  182. }
  183. func (wfs *WFS) forgetNode(fullpath filer2.FullPath) {
  184. wfs.nodesLock.Lock()
  185. defer wfs.nodesLock.Unlock()
  186. delete(wfs.nodes, fullpath.AsInode())
  187. }