You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

251 lines
6.3 KiB

5 years ago
7 years ago
6 years ago
6 years ago
6 years ago
5 years ago
5 years ago
5 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/karlseguin/ccache"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/filer2"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. "github.com/seaweedfs/fuse"
  16. "github.com/seaweedfs/fuse/fs"
  17. )
  18. type Option struct {
  19. FilerGrpcAddress string
  20. GrpcDialOption grpc.DialOption
  21. FilerMountRootPath string
  22. Collection string
  23. Replication string
  24. TtlSec int32
  25. ChunkSizeLimit int64
  26. DataCenter string
  27. DirListCacheLimit int64
  28. EntryCacheTtl time.Duration
  29. Umask os.FileMode
  30. MountUid uint32
  31. MountGid uint32
  32. MountMode os.FileMode
  33. MountCtime time.Time
  34. MountMtime time.Time
  35. }
  36. var _ = fs.FS(&WFS{})
  37. var _ = fs.FSStatfser(&WFS{})
  38. type WFS struct {
  39. option *Option
  40. listDirectoryEntriesCache *ccache.Cache
  41. // contains all open handles, protected by handlesLock
  42. handlesLock sync.Mutex
  43. handles []*FileHandle
  44. pathToHandleIndex map[filer2.FullPath]int
  45. bufPool sync.Pool
  46. stats statsCache
  47. // nodes, protected by nodesLock
  48. nodesLock sync.Mutex
  49. nodes map[uint64]fs.Node
  50. root fs.Node
  51. }
  52. type statsCache struct {
  53. filer_pb.StatisticsResponse
  54. lastChecked int64 // unix time in seconds
  55. }
  56. func NewSeaweedFileSystem(option *Option) *WFS {
  57. wfs := &WFS{
  58. option: option,
  59. listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
  60. pathToHandleIndex: make(map[filer2.FullPath]int),
  61. bufPool: sync.Pool{
  62. New: func() interface{} {
  63. return make([]byte, option.ChunkSizeLimit)
  64. },
  65. },
  66. nodes: make(map[uint64]fs.Node),
  67. }
  68. wfs.root = &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}
  69. return wfs
  70. }
  71. func (wfs *WFS) Root() (fs.Node, error) {
  72. return wfs.root, nil
  73. }
  74. func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  75. err := util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  76. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  77. return fn(client)
  78. }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
  79. if err == nil {
  80. return nil
  81. }
  82. return err
  83. }
  84. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  85. fullpath := file.fullpath()
  86. glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid)
  87. wfs.handlesLock.Lock()
  88. defer wfs.handlesLock.Unlock()
  89. index, found := wfs.pathToHandleIndex[fullpath]
  90. if found && wfs.handles[index] != nil {
  91. glog.V(2).Infoln(fullpath, "found fileHandle id", index)
  92. return wfs.handles[index]
  93. }
  94. fileHandle = newFileHandle(file, uid, gid)
  95. for i, h := range wfs.handles {
  96. if h == nil {
  97. wfs.handles[i] = fileHandle
  98. fileHandle.handle = uint64(i)
  99. wfs.pathToHandleIndex[fullpath] = i
  100. glog.V(4).Infof("%s reuse fh %d", fullpath, fileHandle.handle)
  101. return
  102. }
  103. }
  104. wfs.handles = append(wfs.handles, fileHandle)
  105. fileHandle.handle = uint64(len(wfs.handles) - 1)
  106. wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
  107. glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
  108. return
  109. }
  110. func (wfs *WFS) ReleaseHandle(fullpath filer2.FullPath, handleId fuse.HandleID) {
  111. wfs.handlesLock.Lock()
  112. defer wfs.handlesLock.Unlock()
  113. glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  114. delete(wfs.pathToHandleIndex, fullpath)
  115. if int(handleId) < len(wfs.handles) {
  116. wfs.handles[int(handleId)] = nil
  117. }
  118. return
  119. }
  120. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  121. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  122. glog.V(4).Infof("reading fs stats: %+v", req)
  123. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  124. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  125. request := &filer_pb.StatisticsRequest{
  126. Collection: wfs.option.Collection,
  127. Replication: wfs.option.Replication,
  128. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  129. }
  130. glog.V(4).Infof("reading filer stats: %+v", request)
  131. resp, err := client.Statistics(context.Background(), request)
  132. if err != nil {
  133. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  134. return err
  135. }
  136. glog.V(4).Infof("read filer stats: %+v", resp)
  137. wfs.stats.TotalSize = resp.TotalSize
  138. wfs.stats.UsedSize = resp.UsedSize
  139. wfs.stats.FileCount = resp.FileCount
  140. wfs.stats.lastChecked = time.Now().Unix()
  141. return nil
  142. })
  143. if err != nil {
  144. glog.V(0).Infof("filer Statistics: %v", err)
  145. return err
  146. }
  147. }
  148. totalDiskSize := wfs.stats.TotalSize
  149. usedDiskSize := wfs.stats.UsedSize
  150. actualFileCount := wfs.stats.FileCount
  151. // Compute the total number of available blocks
  152. resp.Blocks = totalDiskSize / blockSize
  153. // Compute the number of used blocks
  154. numBlocks := uint64(usedDiskSize / blockSize)
  155. // Report the number of free and available blocks for the block size
  156. resp.Bfree = resp.Blocks - numBlocks
  157. resp.Bavail = resp.Blocks - numBlocks
  158. resp.Bsize = uint32(blockSize)
  159. // Report the total number of possible files in the file system (and those free)
  160. resp.Files = math.MaxInt64
  161. resp.Ffree = math.MaxInt64 - actualFileCount
  162. // Report the maximum length of a name and the minimum fragment size
  163. resp.Namelen = 1024
  164. resp.Frsize = uint32(blockSize)
  165. return nil
  166. }
  167. func (wfs *WFS) cacheGet(path filer2.FullPath) *filer_pb.Entry {
  168. item := wfs.listDirectoryEntriesCache.Get(string(path))
  169. if item != nil && !item.Expired() {
  170. return item.Value().(*filer_pb.Entry)
  171. }
  172. return nil
  173. }
  174. func (wfs *WFS) cacheSet(path filer2.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
  175. if entry == nil {
  176. wfs.listDirectoryEntriesCache.Delete(string(path))
  177. } else {
  178. wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl)
  179. }
  180. }
  181. func (wfs *WFS) cacheDelete(path filer2.FullPath) {
  182. wfs.listDirectoryEntriesCache.Delete(string(path))
  183. }
  184. func (wfs *WFS) getNode(fullpath filer2.FullPath, fn func() fs.Node) fs.Node {
  185. wfs.nodesLock.Lock()
  186. defer wfs.nodesLock.Unlock()
  187. node, found := wfs.nodes[fullpath.AsInode()]
  188. if found {
  189. return node
  190. }
  191. node = fn()
  192. if node != nil {
  193. wfs.nodes[fullpath.AsInode()] = node
  194. }
  195. return node
  196. }
  197. func (wfs *WFS) forgetNode(fullpath filer2.FullPath) {
  198. wfs.nodesLock.Lock()
  199. defer wfs.nodesLock.Unlock()
  200. delete(wfs.nodes, fullpath.AsInode())
  201. }