You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

246 lines
6.3 KiB

5 years ago
7 years ago
6 years ago
6 years ago
6 years ago
5 years ago
5 years ago
5 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/karlseguin/ccache"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/filer2"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. "github.com/seaweedfs/fuse"
  16. "github.com/seaweedfs/fuse/fs"
  17. )
  18. type Option struct {
  19. FilerGrpcAddress string
  20. GrpcDialOption grpc.DialOption
  21. FilerMountRootPath string
  22. Collection string
  23. Replication string
  24. TtlSec int32
  25. ChunkSizeLimit int64
  26. DataCenter string
  27. DirListCacheLimit int64
  28. EntryCacheTtl time.Duration
  29. Umask os.FileMode
  30. MountUid uint32
  31. MountGid uint32
  32. MountMode os.FileMode
  33. MountCtime time.Time
  34. MountMtime time.Time
  35. }
  36. var _ = fs.FS(&WFS{})
  37. var _ = fs.FSStatfser(&WFS{})
  38. type WFS struct {
  39. option *Option
  40. listDirectoryEntriesCache *ccache.Cache
  41. // contains all open handles, protected by handlesLock
  42. handlesLock sync.Mutex
  43. handles []*FileHandle
  44. pathToHandleIndex map[filer2.FullPath]int
  45. bufPool sync.Pool
  46. stats statsCache
  47. // nodes, protected by nodesLock
  48. nodesLock sync.Mutex
  49. nodes map[uint64]fs.Node
  50. root fs.Node
  51. }
  52. type statsCache struct {
  53. filer_pb.StatisticsResponse
  54. lastChecked int64 // unix time in seconds
  55. }
  56. func NewSeaweedFileSystem(option *Option) *WFS {
  57. wfs := &WFS{
  58. option: option,
  59. listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
  60. pathToHandleIndex: make(map[filer2.FullPath]int),
  61. bufPool: sync.Pool{
  62. New: func() interface{} {
  63. return make([]byte, option.ChunkSizeLimit)
  64. },
  65. },
  66. nodes: make(map[uint64]fs.Node),
  67. }
  68. wfs.root = &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}
  69. return wfs
  70. }
  71. func (wfs *WFS) Root() (fs.Node, error) {
  72. return wfs.root, nil
  73. }
  74. func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
  75. return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
  76. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  77. return fn(client)
  78. }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
  79. }
  80. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  81. fullpath := file.fullpath()
  82. glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid)
  83. wfs.handlesLock.Lock()
  84. defer wfs.handlesLock.Unlock()
  85. index, found := wfs.pathToHandleIndex[fullpath]
  86. if found && wfs.handles[index] != nil {
  87. glog.V(2).Infoln(fullpath, "found fileHandle id", index)
  88. return wfs.handles[index]
  89. }
  90. fileHandle = newFileHandle(file, uid, gid)
  91. for i, h := range wfs.handles {
  92. if h == nil {
  93. wfs.handles[i] = fileHandle
  94. fileHandle.handle = uint64(i)
  95. wfs.pathToHandleIndex[fullpath] = i
  96. glog.V(4).Infof( "%s reuse fh %d", fullpath,fileHandle.handle)
  97. return
  98. }
  99. }
  100. wfs.handles = append(wfs.handles, fileHandle)
  101. fileHandle.handle = uint64(len(wfs.handles) - 1)
  102. wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
  103. glog.V(4).Infof( "%s new fh %d", fullpath,fileHandle.handle)
  104. return
  105. }
  106. func (wfs *WFS) ReleaseHandle(fullpath filer2.FullPath, handleId fuse.HandleID) {
  107. wfs.handlesLock.Lock()
  108. defer wfs.handlesLock.Unlock()
  109. glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  110. delete(wfs.pathToHandleIndex, fullpath)
  111. if int(handleId) < len(wfs.handles) {
  112. wfs.handles[int(handleId)] = nil
  113. }
  114. return
  115. }
  116. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  117. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  118. glog.V(4).Infof("reading fs stats: %+v", req)
  119. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  120. err := wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
  121. request := &filer_pb.StatisticsRequest{
  122. Collection: wfs.option.Collection,
  123. Replication: wfs.option.Replication,
  124. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  125. }
  126. glog.V(4).Infof("reading filer stats: %+v", request)
  127. resp, err := client.Statistics(ctx, request)
  128. if err != nil {
  129. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  130. return err
  131. }
  132. glog.V(4).Infof("read filer stats: %+v", resp)
  133. wfs.stats.TotalSize = resp.TotalSize
  134. wfs.stats.UsedSize = resp.UsedSize
  135. wfs.stats.FileCount = resp.FileCount
  136. wfs.stats.lastChecked = time.Now().Unix()
  137. return nil
  138. })
  139. if err != nil {
  140. glog.V(0).Infof("filer Statistics: %v", err)
  141. return err
  142. }
  143. }
  144. totalDiskSize := wfs.stats.TotalSize
  145. usedDiskSize := wfs.stats.UsedSize
  146. actualFileCount := wfs.stats.FileCount
  147. // Compute the total number of available blocks
  148. resp.Blocks = totalDiskSize / blockSize
  149. // Compute the number of used blocks
  150. numBlocks := uint64(usedDiskSize / blockSize)
  151. // Report the number of free and available blocks for the block size
  152. resp.Bfree = resp.Blocks - numBlocks
  153. resp.Bavail = resp.Blocks - numBlocks
  154. resp.Bsize = uint32(blockSize)
  155. // Report the total number of possible files in the file system (and those free)
  156. resp.Files = math.MaxInt64
  157. resp.Ffree = math.MaxInt64 - actualFileCount
  158. // Report the maximum length of a name and the minimum fragment size
  159. resp.Namelen = 1024
  160. resp.Frsize = uint32(blockSize)
  161. return nil
  162. }
  163. func (wfs *WFS) cacheGet(path filer2.FullPath) *filer_pb.Entry {
  164. item := wfs.listDirectoryEntriesCache.Get(string(path))
  165. if item != nil && !item.Expired() {
  166. return item.Value().(*filer_pb.Entry)
  167. }
  168. return nil
  169. }
  170. func (wfs *WFS) cacheSet(path filer2.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
  171. if entry == nil {
  172. wfs.listDirectoryEntriesCache.Delete(string(path))
  173. } else {
  174. wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl)
  175. }
  176. }
  177. func (wfs *WFS) cacheDelete(path filer2.FullPath) {
  178. wfs.listDirectoryEntriesCache.Delete(string(path))
  179. }
  180. func (wfs *WFS) getNode(fullpath filer2.FullPath, fn func() fs.Node) fs.Node {
  181. wfs.nodesLock.Lock()
  182. defer wfs.nodesLock.Unlock()
  183. node, found := wfs.nodes[fullpath.AsInode()]
  184. if found {
  185. return node
  186. }
  187. node = fn()
  188. if node != nil {
  189. wfs.nodes[fullpath.AsInode()] = node
  190. }
  191. return node
  192. }
  193. func (wfs *WFS) forgetNode(fullpath filer2.FullPath) {
  194. wfs.nodesLock.Lock()
  195. defer wfs.nodesLock.Unlock()
  196. delete(wfs.nodes, fullpath.AsInode())
  197. }