You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

249 lines
6.6 KiB

5 years ago
7 years ago
5 years ago
6 years ago
5 years ago
6 years ago
5 years ago
5 years ago
6 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/karlseguin/ccache"
  11. "google.golang.org/grpc"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  15. "github.com/chrislusf/seaweedfs/weed/pb/pb_cache"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. "github.com/seaweedfs/fuse"
  18. "github.com/seaweedfs/fuse/fs"
  19. )
  // Option holds the settings a WFS is created with (see NewSeaweedFileSystem).
  type Option struct {
  	// FilerGrpcAddress is the filer address used for gRPC calls. AdjustedUrl
  	// also uses the text before its first ":" as the filer host.
  	FilerGrpcAddress string
  	// GrpcDialOption is passed along with FilerGrpcAddress when obtaining the
  	// cached gRPC connection.
  	GrpcDialOption grpc.DialOption
  	// FilerMountRootPath becomes the name of the root Dir node.
  	FilerMountRootPath string
  	// Collection, Replication and TtlSec are sent to the filer in the
  	// statistics request issued by Statfs (TtlSec is formatted as "<n>s").
  	Collection string
  	Replication string
  	TtlSec int32
  	// ChunkSizeLimit is the length in bytes of each buffer handed out by the
  	// WFS buffer pool.
  	ChunkSizeLimit int64
  	// ChunkCacheCountLimit is the argument given to pb_cache.NewChunkCache.
  	ChunkCacheCountLimit int64
  	// NOTE(review): DataCenter is not read in this part of the file; presumably
  	// the preferred data center for writes — confirm against its users.
  	DataCenter string
  	// DirListCacheLimit sizes the entry cache: its MaxSize is three times this value.
  	DirListCacheLimit int64
  	// NOTE(review): EntryCacheTtl is not read in this part of the file;
  	// presumably the ttl callers pass to cacheSet — confirm.
  	EntryCacheTtl time.Duration
  	// NOTE(review): the Umask and Mount* fields below are not read in this part
  	// of the file; presumably they describe the permissions, ownership and
  	// timestamps reported for the mount point — confirm against their users.
  	Umask os.FileMode
  	MountUid uint32
  	MountGid uint32
  	MountMode os.FileMode
  	MountCtime time.Time
  	MountMtime time.Time
  	// When OutsideContainerClusterMode is true, AdjustedUrl replaces the host
  	// of a "host:port" address with the filer's host.
  	OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
  	Cipher bool // whether to encrypt data on the volume server
  }
  41. var _ = fs.FS(&WFS{})
  42. var _ = fs.FSStatfser(&WFS{})
  // WFS is the file system object served over FUSE: it implements fs.FS and
  // fs.FSStatfser and carries the state shared by its nodes and handles.
  type WFS struct {
  	// option is the configuration supplied to NewSeaweedFileSystem.
  	option *Option
  	// listDirectoryEntriesCache stores *filer_pb.Entry values keyed by the
  	// string form of their full path (see cacheGet / cacheSet / cacheDelete).
  	listDirectoryEntriesCache *ccache.Cache
  	// handles contains all open handles; it and pathToHandleIndex are accessed
  	// under handlesLock. A handle's id is its index in handles, and a nil
  	// element marks a free slot that AcquireHandle may reuse.
  	handlesLock sync.Mutex
  	handles []*FileHandle
  	// pathToHandleIndex maps a file's full path to its index in handles.
  	pathToHandleIndex map[util.FullPath]int
  	// bufPool hands out []byte buffers of option.ChunkSizeLimit bytes.
  	bufPool sync.Pool
  	// stats caches the last filer statistics used by Statfs.
  	stats statsCache
  	// root is the node returned by Root.
  	root fs.Node
  	// fsNodeCache is created from root by newFsCache.
  	fsNodeCache *FsCache
  	// chunkCache is created by pb_cache.NewChunkCache with option.ChunkCacheCountLimit.
  	chunkCache *pb_cache.ChunkCache
  }
  // statsCache keeps the most recent filer statistics together with the time
  // they were fetched; Statfs refreshes them once they are more than 20 seconds old.
  type statsCache struct {
  	// Embedded response; Statfs fills in TotalSize, UsedSize and FileCount.
  	filer_pb.StatisticsResponse
  	lastChecked int64 // unix time in seconds
  }
  60. func NewSeaweedFileSystem(option *Option) *WFS {
  61. wfs := &WFS{
  62. option: option,
  63. listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
  64. pathToHandleIndex: make(map[util.FullPath]int),
  65. bufPool: sync.Pool{
  66. New: func() interface{} {
  67. return make([]byte, option.ChunkSizeLimit)
  68. },
  69. },
  70. chunkCache: pb_cache.NewChunkCache(option.ChunkCacheCountLimit),
  71. }
  72. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
  73. wfs.fsNodeCache = newFsCache(wfs.root)
  74. return wfs
  75. }
  76. func (wfs *WFS) Root() (fs.Node, error) {
  77. return wfs.root, nil
  78. }
  79. func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  80. err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  81. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  82. return fn(client)
  83. }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
  84. if err == nil {
  85. return nil
  86. }
  87. return err
  88. }
  89. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  90. fullpath := file.fullpath()
  91. glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid)
  92. wfs.handlesLock.Lock()
  93. defer wfs.handlesLock.Unlock()
  94. index, found := wfs.pathToHandleIndex[fullpath]
  95. if found && wfs.handles[index] != nil {
  96. glog.V(2).Infoln(fullpath, "found fileHandle id", index)
  97. return wfs.handles[index]
  98. }
  99. fileHandle = newFileHandle(file, uid, gid)
  100. for i, h := range wfs.handles {
  101. if h == nil {
  102. wfs.handles[i] = fileHandle
  103. fileHandle.handle = uint64(i)
  104. wfs.pathToHandleIndex[fullpath] = i
  105. glog.V(4).Infof("%s reuse fh %d", fullpath, fileHandle.handle)
  106. return
  107. }
  108. }
  109. wfs.handles = append(wfs.handles, fileHandle)
  110. fileHandle.handle = uint64(len(wfs.handles) - 1)
  111. wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
  112. glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
  113. return
  114. }
  115. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  116. wfs.handlesLock.Lock()
  117. defer wfs.handlesLock.Unlock()
  118. glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  119. delete(wfs.pathToHandleIndex, fullpath)
  120. if int(handleId) < len(wfs.handles) {
  121. wfs.handles[int(handleId)] = nil
  122. }
  123. return
  124. }
  125. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  126. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  127. glog.V(4).Infof("reading fs stats: %+v", req)
  128. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  129. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  130. request := &filer_pb.StatisticsRequest{
  131. Collection: wfs.option.Collection,
  132. Replication: wfs.option.Replication,
  133. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  134. }
  135. glog.V(4).Infof("reading filer stats: %+v", request)
  136. resp, err := client.Statistics(context.Background(), request)
  137. if err != nil {
  138. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  139. return err
  140. }
  141. glog.V(4).Infof("read filer stats: %+v", resp)
  142. wfs.stats.TotalSize = resp.TotalSize
  143. wfs.stats.UsedSize = resp.UsedSize
  144. wfs.stats.FileCount = resp.FileCount
  145. wfs.stats.lastChecked = time.Now().Unix()
  146. return nil
  147. })
  148. if err != nil {
  149. glog.V(0).Infof("filer Statistics: %v", err)
  150. return err
  151. }
  152. }
  153. totalDiskSize := wfs.stats.TotalSize
  154. usedDiskSize := wfs.stats.UsedSize
  155. actualFileCount := wfs.stats.FileCount
  156. // Compute the total number of available blocks
  157. resp.Blocks = totalDiskSize / blockSize
  158. // Compute the number of used blocks
  159. numBlocks := uint64(usedDiskSize / blockSize)
  160. // Report the number of free and available blocks for the block size
  161. resp.Bfree = resp.Blocks - numBlocks
  162. resp.Bavail = resp.Blocks - numBlocks
  163. resp.Bsize = uint32(blockSize)
  164. // Report the total number of possible files in the file system (and those free)
  165. resp.Files = math.MaxInt64
  166. resp.Ffree = math.MaxInt64 - actualFileCount
  167. // Report the maximum length of a name and the minimum fragment size
  168. resp.Namelen = 1024
  169. resp.Frsize = uint32(blockSize)
  170. return nil
  171. }
  172. func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry {
  173. item := wfs.listDirectoryEntriesCache.Get(string(path))
  174. if item != nil && !item.Expired() {
  175. return item.Value().(*filer_pb.Entry)
  176. }
  177. return nil
  178. }
  179. func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
  180. if entry == nil {
  181. wfs.listDirectoryEntriesCache.Delete(string(path))
  182. } else {
  183. wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl)
  184. }
  185. }
  186. func (wfs *WFS) cacheDelete(path util.FullPath) {
  187. wfs.listDirectoryEntriesCache.Delete(string(path))
  188. }
  189. func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
  190. if !wfs.option.OutsideContainerClusterMode {
  191. return hostAndPort
  192. }
  193. commaIndex := strings.Index(hostAndPort, ":")
  194. if commaIndex < 0 {
  195. return hostAndPort
  196. }
  197. filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":")
  198. return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:])
  199. }