You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

181 lines
4.8 KiB

7 years ago
7 years ago
7 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "sync"
  7. "time"
  8. "bazil.org/fuse"
  9. "bazil.org/fuse/fs"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. "github.com/karlseguin/ccache"
  14. "google.golang.org/grpc"
  15. )
  16. type Option struct {
  17. FilerGrpcAddress string
  18. FilerMountRootPath string
  19. Collection string
  20. Replication string
  21. TtlSec int32
  22. ChunkSizeLimit int64
  23. DataCenter string
  24. DirListingLimit int
  25. EntryCacheTtl time.Duration
  26. }
  27. var _ = fs.FS(&WFS{})
  28. var _ = fs.FSStatfser(&WFS{})
  29. type WFS struct {
  30. option *Option
  31. listDirectoryEntriesCache *ccache.Cache
  32. // contains all open handles
  33. handles []*FileHandle
  34. pathToHandleIndex map[string]int
  35. pathToHandleLock sync.Mutex
  36. bufPool sync.Pool
  37. stats statsCache
  38. }
  39. type statsCache struct {
  40. filer_pb.StatisticsResponse
  41. lastChecked int64 // unix time in seconds
  42. }
  43. func NewSeaweedFileSystem(option *Option) *WFS {
  44. return &WFS{
  45. option: option,
  46. listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(int64(option.DirListingLimit) + 200).ItemsToPrune(100)),
  47. pathToHandleIndex: make(map[string]int),
  48. bufPool: sync.Pool{
  49. New: func() interface{} {
  50. return make([]byte, option.ChunkSizeLimit)
  51. },
  52. },
  53. }
  54. }
  55. func (wfs *WFS) Root() (fs.Node, error) {
  56. return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil
  57. }
  58. func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  59. return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  60. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  61. return fn(client)
  62. }, wfs.option.FilerGrpcAddress)
  63. }
  64. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  65. wfs.pathToHandleLock.Lock()
  66. defer wfs.pathToHandleLock.Unlock()
  67. fullpath := file.fullpath()
  68. index, found := wfs.pathToHandleIndex[fullpath]
  69. if found && wfs.handles[index] != nil {
  70. glog.V(2).Infoln(fullpath, "found fileHandle id", index)
  71. return wfs.handles[index]
  72. }
  73. fileHandle = newFileHandle(file, uid, gid)
  74. for i, h := range wfs.handles {
  75. if h == nil {
  76. wfs.handles[i] = fileHandle
  77. fileHandle.handle = uint64(i)
  78. wfs.pathToHandleIndex[fullpath] = i
  79. glog.V(4).Infoln(fullpath, "reuse fileHandle id", fileHandle.handle)
  80. return
  81. }
  82. }
  83. wfs.handles = append(wfs.handles, fileHandle)
  84. fileHandle.handle = uint64(len(wfs.handles) - 1)
  85. glog.V(2).Infoln(fullpath, "new fileHandle id", fileHandle.handle)
  86. wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
  87. return
  88. }
  89. func (wfs *WFS) ReleaseHandle(fullpath string, handleId fuse.HandleID) {
  90. wfs.pathToHandleLock.Lock()
  91. defer wfs.pathToHandleLock.Unlock()
  92. glog.V(4).Infof("%s releasing handle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  93. delete(wfs.pathToHandleIndex, fullpath)
  94. if int(handleId) < len(wfs.handles) {
  95. wfs.handles[int(handleId)] = nil
  96. }
  97. return
  98. }
  99. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  100. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  101. glog.V(4).Infof("reading fs stats: %+v", req)
  102. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  103. err := wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  104. request := &filer_pb.StatisticsRequest{
  105. Collection: wfs.option.Collection,
  106. Replication: wfs.option.Replication,
  107. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  108. }
  109. glog.V(4).Infof("reading filer stats: %+v", request)
  110. resp, err := client.Statistics(ctx, request)
  111. if err != nil {
  112. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  113. return err
  114. }
  115. glog.V(4).Infof("read filer stats: %+v", resp)
  116. wfs.stats.TotalSize = resp.TotalSize
  117. wfs.stats.UsedSize = resp.UsedSize
  118. wfs.stats.FileCount = resp.FileCount
  119. wfs.stats.lastChecked = time.Now().Unix()
  120. return nil
  121. })
  122. if err != nil {
  123. glog.V(0).Infof("filer Statistics: %v", err)
  124. return err
  125. }
  126. }
  127. totalDiskSize := wfs.stats.TotalSize
  128. usedDiskSize := wfs.stats.UsedSize
  129. actualFileCount := wfs.stats.FileCount
  130. // Compute the total number of available blocks
  131. resp.Blocks = totalDiskSize / blockSize
  132. // Compute the number of used blocks
  133. numblocks := uint64(usedDiskSize / blockSize)
  134. // Report the number of free and available blocks for the block size
  135. resp.Bfree = resp.Blocks - numblocks
  136. resp.Bavail = resp.Blocks - numblocks
  137. resp.Bsize = uint32(blockSize)
  138. // Report the total number of possible files in the file system (and those free)
  139. resp.Files = math.MaxInt64
  140. resp.Ffree = math.MaxInt64 - actualFileCount
  141. // Report the maximum length of a name and the minimum fragment size
  142. resp.Namelen = 1024
  143. resp.Frsize = uint32(blockSize)
  144. return nil
  145. }