You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

188 lines
4.9 KiB

7 years ago
7 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "github.com/karlseguin/ccache"
  13. "github.com/seaweedfs/fuse"
  14. "github.com/seaweedfs/fuse/fs"
  15. "google.golang.org/grpc"
  16. )
  17. type Option struct {
  18. FilerGrpcAddress string
  19. FilerMountRootPath string
  20. Collection string
  21. Replication string
  22. TtlSec int32
  23. ChunkSizeLimit int64
  24. DataCenter string
  25. DirListingLimit int
  26. EntryCacheTtl time.Duration
  27. MountUid uint32
  28. MountGid uint32
  29. MountMode os.FileMode
  30. }
  31. var _ = fs.FS(&WFS{})
  32. var _ = fs.FSStatfser(&WFS{})
  33. type WFS struct {
  34. option *Option
  35. listDirectoryEntriesCache *ccache.Cache
  36. // contains all open handles
  37. handles []*FileHandle
  38. pathToHandleIndex map[string]int
  39. pathToHandleLock sync.Mutex
  40. bufPool sync.Pool
  41. stats statsCache
  42. }
  43. type statsCache struct {
  44. filer_pb.StatisticsResponse
  45. lastChecked int64 // unix time in seconds
  46. }
  47. func NewSeaweedFileSystem(option *Option) *WFS {
  48. wfs := &WFS{
  49. option: option,
  50. listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(1024 * 8).ItemsToPrune(100)),
  51. pathToHandleIndex: make(map[string]int),
  52. bufPool: sync.Pool{
  53. New: func() interface{} {
  54. return make([]byte, option.ChunkSizeLimit)
  55. },
  56. },
  57. }
  58. return wfs
  59. }
  60. func (wfs *WFS) Root() (fs.Node, error) {
  61. return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil
  62. }
  63. func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  64. return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  65. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  66. return fn(client)
  67. }, wfs.option.FilerGrpcAddress)
  68. }
  69. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  70. wfs.pathToHandleLock.Lock()
  71. defer wfs.pathToHandleLock.Unlock()
  72. fullpath := file.fullpath()
  73. index, found := wfs.pathToHandleIndex[fullpath]
  74. if found && wfs.handles[index] != nil {
  75. glog.V(2).Infoln(fullpath, "found fileHandle id", index)
  76. return wfs.handles[index]
  77. }
  78. fileHandle = newFileHandle(file, uid, gid)
  79. for i, h := range wfs.handles {
  80. if h == nil {
  81. wfs.handles[i] = fileHandle
  82. fileHandle.handle = uint64(i)
  83. wfs.pathToHandleIndex[fullpath] = i
  84. glog.V(4).Infoln(fullpath, "reuse fileHandle id", fileHandle.handle)
  85. return
  86. }
  87. }
  88. wfs.handles = append(wfs.handles, fileHandle)
  89. fileHandle.handle = uint64(len(wfs.handles) - 1)
  90. glog.V(2).Infoln(fullpath, "new fileHandle id", fileHandle.handle)
  91. wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
  92. return
  93. }
  94. func (wfs *WFS) ReleaseHandle(fullpath string, handleId fuse.HandleID) {
  95. wfs.pathToHandleLock.Lock()
  96. defer wfs.pathToHandleLock.Unlock()
  97. glog.V(4).Infof("%s releasing handle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  98. delete(wfs.pathToHandleIndex, fullpath)
  99. if int(handleId) < len(wfs.handles) {
  100. wfs.handles[int(handleId)] = nil
  101. }
  102. return
  103. }
  104. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  105. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  106. glog.V(4).Infof("reading fs stats: %+v", req)
  107. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  108. err := wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  109. request := &filer_pb.StatisticsRequest{
  110. Collection: wfs.option.Collection,
  111. Replication: wfs.option.Replication,
  112. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  113. }
  114. glog.V(4).Infof("reading filer stats: %+v", request)
  115. resp, err := client.Statistics(ctx, request)
  116. if err != nil {
  117. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  118. return err
  119. }
  120. glog.V(4).Infof("read filer stats: %+v", resp)
  121. wfs.stats.TotalSize = resp.TotalSize
  122. wfs.stats.UsedSize = resp.UsedSize
  123. wfs.stats.FileCount = resp.FileCount
  124. wfs.stats.lastChecked = time.Now().Unix()
  125. return nil
  126. })
  127. if err != nil {
  128. glog.V(0).Infof("filer Statistics: %v", err)
  129. return err
  130. }
  131. }
  132. totalDiskSize := wfs.stats.TotalSize
  133. usedDiskSize := wfs.stats.UsedSize
  134. actualFileCount := wfs.stats.FileCount
  135. // Compute the total number of available blocks
  136. resp.Blocks = totalDiskSize / blockSize
  137. // Compute the number of used blocks
  138. numBlocks := uint64(usedDiskSize / blockSize)
  139. // Report the number of free and available blocks for the block size
  140. resp.Bfree = resp.Blocks - numBlocks
  141. resp.Bavail = resp.Blocks - numBlocks
  142. resp.Bsize = uint32(blockSize)
  143. // Report the total number of possible files in the file system (and those free)
  144. resp.Files = math.MaxInt64
  145. resp.Ffree = math.MaxInt64 - actualFileCount
  146. // Report the maximum length of a name and the minimum fragment size
  147. resp.Namelen = 1024
  148. resp.Frsize = uint32(blockSize)
  149. return nil
  150. }