You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

212 lines
5.7 KiB

5 years ago
7 years ago
6 years ago
5 years ago
6 years ago
5 years ago
6 years ago
5 years ago
6 years ago
5 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/karlseguin/ccache"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/filer2"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. "github.com/seaweedfs/fuse"
  16. "github.com/seaweedfs/fuse/fs"
  17. )
  18. type Option struct {
  19. FilerGrpcAddress string
  20. GrpcDialOption grpc.DialOption
  21. FilerMountRootPath string
  22. Collection string
  23. Replication string
  24. TtlSec int32
  25. ChunkSizeLimit int64
  26. DataCenter string
  27. DirListCacheLimit int64
  28. EntryCacheTtl time.Duration
  29. Umask os.FileMode
  30. MountUid uint32
  31. MountGid uint32
  32. MountMode os.FileMode
  33. MountCtime time.Time
  34. MountMtime time.Time
  35. }
  36. var _ = fs.FS(&WFS{})
  37. var _ = fs.FSStatfser(&WFS{})
  38. type WFS struct {
  39. option *Option
  40. listDirectoryEntriesCache *ccache.Cache
  41. // contains all open handles
  42. handles []*FileHandle
  43. pathToHandleIndex map[filer2.FullPath]int
  44. pathToHandleLock sync.Mutex
  45. bufPool sync.Pool
  46. stats statsCache
  47. }
  48. type statsCache struct {
  49. filer_pb.StatisticsResponse
  50. lastChecked int64 // unix time in seconds
  51. }
  52. func NewSeaweedFileSystem(option *Option) *WFS {
  53. wfs := &WFS{
  54. option: option,
  55. listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
  56. pathToHandleIndex: make(map[filer2.FullPath]int),
  57. bufPool: sync.Pool{
  58. New: func() interface{} {
  59. return make([]byte, option.ChunkSizeLimit)
  60. },
  61. },
  62. }
  63. return wfs
  64. }
  65. func (wfs *WFS) Root() (fs.Node, error) {
  66. return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil
  67. }
  68. func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
  69. return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
  70. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  71. return fn(client)
  72. }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
  73. }
  74. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  75. wfs.pathToHandleLock.Lock()
  76. defer wfs.pathToHandleLock.Unlock()
  77. fullpath := file.fullpath()
  78. index, found := wfs.pathToHandleIndex[fullpath]
  79. if found && wfs.handles[index] != nil {
  80. glog.V(2).Infoln(fullpath, "found fileHandle id", index)
  81. return wfs.handles[index]
  82. }
  83. fileHandle = newFileHandle(file, uid, gid)
  84. for i, h := range wfs.handles {
  85. if h == nil {
  86. wfs.handles[i] = fileHandle
  87. fileHandle.handle = uint64(i)
  88. wfs.pathToHandleIndex[fullpath] = i
  89. glog.V(4).Infoln(fullpath, "reuse fileHandle id", fileHandle.handle)
  90. return
  91. }
  92. }
  93. wfs.handles = append(wfs.handles, fileHandle)
  94. fileHandle.handle = uint64(len(wfs.handles) - 1)
  95. glog.V(2).Infoln(fullpath, "new fileHandle id", fileHandle.handle)
  96. wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
  97. return
  98. }
  99. func (wfs *WFS) ReleaseHandle(fullpath filer2.FullPath, handleId fuse.HandleID) {
  100. wfs.pathToHandleLock.Lock()
  101. defer wfs.pathToHandleLock.Unlock()
  102. glog.V(4).Infof("%s releasing handle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  103. delete(wfs.pathToHandleIndex, fullpath)
  104. if int(handleId) < len(wfs.handles) {
  105. wfs.handles[int(handleId)] = nil
  106. }
  107. return
  108. }
  109. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  110. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  111. glog.V(4).Infof("reading fs stats: %+v", req)
  112. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  113. err := wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
  114. request := &filer_pb.StatisticsRequest{
  115. Collection: wfs.option.Collection,
  116. Replication: wfs.option.Replication,
  117. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  118. }
  119. glog.V(4).Infof("reading filer stats: %+v", request)
  120. resp, err := client.Statistics(ctx, request)
  121. if err != nil {
  122. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  123. return err
  124. }
  125. glog.V(4).Infof("read filer stats: %+v", resp)
  126. wfs.stats.TotalSize = resp.TotalSize
  127. wfs.stats.UsedSize = resp.UsedSize
  128. wfs.stats.FileCount = resp.FileCount
  129. wfs.stats.lastChecked = time.Now().Unix()
  130. return nil
  131. })
  132. if err != nil {
  133. glog.V(0).Infof("filer Statistics: %v", err)
  134. return err
  135. }
  136. }
  137. totalDiskSize := wfs.stats.TotalSize
  138. usedDiskSize := wfs.stats.UsedSize
  139. actualFileCount := wfs.stats.FileCount
  140. // Compute the total number of available blocks
  141. resp.Blocks = totalDiskSize / blockSize
  142. // Compute the number of used blocks
  143. numBlocks := uint64(usedDiskSize / blockSize)
  144. // Report the number of free and available blocks for the block size
  145. resp.Bfree = resp.Blocks - numBlocks
  146. resp.Bavail = resp.Blocks - numBlocks
  147. resp.Bsize = uint32(blockSize)
  148. // Report the total number of possible files in the file system (and those free)
  149. resp.Files = math.MaxInt64
  150. resp.Ffree = math.MaxInt64 - actualFileCount
  151. // Report the maximum length of a name and the minimum fragment size
  152. resp.Namelen = 1024
  153. resp.Frsize = uint32(blockSize)
  154. return nil
  155. }
  156. func (wfs *WFS) cacheGet(path filer2.FullPath) *filer_pb.Entry {
  157. item := wfs.listDirectoryEntriesCache.Get(string(path))
  158. if item != nil && !item.Expired() {
  159. return item.Value().(*filer_pb.Entry)
  160. }
  161. return nil
  162. }
  163. func (wfs *WFS) cacheSet(path filer2.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
  164. if entry == nil {
  165. wfs.listDirectoryEntriesCache.Delete(string(path))
  166. }else{
  167. wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl)
  168. }
  169. }
  170. func (wfs *WFS) cacheDelete(path filer2.FullPath) {
  171. wfs.listDirectoryEntriesCache.Delete(string(path))
  172. }