You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

192 lines
5.0 KiB

7 years ago
6 years ago
6 years ago
6 years ago
6 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/joeslay/seaweedfs/weed/glog"
  10. "github.com/joeslay/seaweedfs/weed/pb/filer_pb"
  11. "github.com/joeslay/seaweedfs/weed/util"
  12. "github.com/karlseguin/ccache"
  13. "github.com/seaweedfs/fuse"
  14. "github.com/seaweedfs/fuse/fs"
  15. "google.golang.org/grpc"
  16. )
  17. type Option struct {
  18. FilerGrpcAddress string
  19. GrpcDialOption grpc.DialOption
  20. FilerMountRootPath string
  21. Collection string
  22. Replication string
  23. TtlSec int32
  24. ChunkSizeLimit int64
  25. DataCenter string
  26. DirListingLimit int
  27. EntryCacheTtl time.Duration
  28. Umask os.FileMode
  29. MountUid uint32
  30. MountGid uint32
  31. MountMode os.FileMode
  32. MountCtime time.Time
  33. MountMtime time.Time
  34. }
  35. var _ = fs.FS(&WFS{})
  36. var _ = fs.FSStatfser(&WFS{})
  37. type WFS struct {
  38. option *Option
  39. listDirectoryEntriesCache *ccache.Cache
  40. // contains all open handles
  41. handles []*FileHandle
  42. pathToHandleIndex map[string]int
  43. pathToHandleLock sync.Mutex
  44. bufPool sync.Pool
  45. stats statsCache
  46. }
  47. type statsCache struct {
  48. filer_pb.StatisticsResponse
  49. lastChecked int64 // unix time in seconds
  50. }
  51. func NewSeaweedFileSystem(option *Option) *WFS {
  52. wfs := &WFS{
  53. option: option,
  54. listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(1024 * 8).ItemsToPrune(100)),
  55. pathToHandleIndex: make(map[string]int),
  56. bufPool: sync.Pool{
  57. New: func() interface{} {
  58. return make([]byte, option.ChunkSizeLimit)
  59. },
  60. },
  61. }
  62. return wfs
  63. }
  64. func (wfs *WFS) Root() (fs.Node, error) {
  65. return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil
  66. }
  67. func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
  68. return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
  69. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  70. return fn(client)
  71. }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
  72. }
  73. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  74. wfs.pathToHandleLock.Lock()
  75. defer wfs.pathToHandleLock.Unlock()
  76. fullpath := file.fullpath()
  77. index, found := wfs.pathToHandleIndex[fullpath]
  78. if found && wfs.handles[index] != nil {
  79. glog.V(2).Infoln(fullpath, "found fileHandle id", index)
  80. return wfs.handles[index]
  81. }
  82. fileHandle = newFileHandle(file, uid, gid)
  83. for i, h := range wfs.handles {
  84. if h == nil {
  85. wfs.handles[i] = fileHandle
  86. fileHandle.handle = uint64(i)
  87. wfs.pathToHandleIndex[fullpath] = i
  88. glog.V(4).Infoln(fullpath, "reuse fileHandle id", fileHandle.handle)
  89. return
  90. }
  91. }
  92. wfs.handles = append(wfs.handles, fileHandle)
  93. fileHandle.handle = uint64(len(wfs.handles) - 1)
  94. glog.V(2).Infoln(fullpath, "new fileHandle id", fileHandle.handle)
  95. wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
  96. return
  97. }
  98. func (wfs *WFS) ReleaseHandle(fullpath string, handleId fuse.HandleID) {
  99. wfs.pathToHandleLock.Lock()
  100. defer wfs.pathToHandleLock.Unlock()
  101. glog.V(4).Infof("%s releasing handle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  102. delete(wfs.pathToHandleIndex, fullpath)
  103. if int(handleId) < len(wfs.handles) {
  104. wfs.handles[int(handleId)] = nil
  105. }
  106. return
  107. }
  108. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  109. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  110. glog.V(4).Infof("reading fs stats: %+v", req)
  111. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  112. err := wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
  113. request := &filer_pb.StatisticsRequest{
  114. Collection: wfs.option.Collection,
  115. Replication: wfs.option.Replication,
  116. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  117. }
  118. glog.V(4).Infof("reading filer stats: %+v", request)
  119. resp, err := client.Statistics(ctx, request)
  120. if err != nil {
  121. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  122. return err
  123. }
  124. glog.V(4).Infof("read filer stats: %+v", resp)
  125. wfs.stats.TotalSize = resp.TotalSize
  126. wfs.stats.UsedSize = resp.UsedSize
  127. wfs.stats.FileCount = resp.FileCount
  128. wfs.stats.lastChecked = time.Now().Unix()
  129. return nil
  130. })
  131. if err != nil {
  132. glog.V(0).Infof("filer Statistics: %v", err)
  133. return err
  134. }
  135. }
  136. totalDiskSize := wfs.stats.TotalSize
  137. usedDiskSize := wfs.stats.UsedSize
  138. actualFileCount := wfs.stats.FileCount
  139. // Compute the total number of available blocks
  140. resp.Blocks = totalDiskSize / blockSize
  141. // Compute the number of used blocks
  142. numBlocks := uint64(usedDiskSize / blockSize)
  143. // Report the number of free and available blocks for the block size
  144. resp.Bfree = resp.Blocks - numBlocks
  145. resp.Bavail = resp.Blocks - numBlocks
  146. resp.Bsize = uint32(blockSize)
  147. // Report the total number of possible files in the file system (and those free)
  148. resp.Files = math.MaxInt64
  149. resp.Ffree = math.MaxInt64 - actualFileCount
  150. // Report the maximum length of a name and the minimum fragment size
  151. resp.Namelen = 1024
  152. resp.Frsize = uint32(blockSize)
  153. return nil
  154. }