You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

271 lines
7.0 KiB

5 years ago
7 years ago
5 years ago
6 years ago
5 years ago
6 years ago
5 years ago
5 years ago
5 years ago
6 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/karlseguin/ccache"
  11. "google.golang.org/grpc"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/seaweedfs/fuse"
  17. "github.com/seaweedfs/fuse/fs"
  18. )
  19. type Option struct {
  20. FilerGrpcAddress string
  21. GrpcDialOption grpc.DialOption
  22. FilerMountRootPath string
  23. Collection string
  24. Replication string
  25. TtlSec int32
  26. ChunkSizeLimit int64
  27. DataCenter string
  28. DirListCacheLimit int64
  29. EntryCacheTtl time.Duration
  30. Umask os.FileMode
  31. MountUid uint32
  32. MountGid uint32
  33. MountMode os.FileMode
  34. MountCtime time.Time
  35. MountMtime time.Time
  36. OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
  37. Cipher bool // whether encrypt data on volume server
  38. }
  39. var _ = fs.FS(&WFS{})
  40. var _ = fs.FSStatfser(&WFS{})
  41. type WFS struct {
  42. option *Option
  43. listDirectoryEntriesCache *ccache.Cache
  44. // contains all open handles, protected by handlesLock
  45. handlesLock sync.Mutex
  46. handles []*FileHandle
  47. pathToHandleIndex map[util.FullPath]int
  48. bufPool sync.Pool
  49. stats statsCache
  50. // nodes, protected by nodesLock
  51. nodesLock sync.Mutex
  52. nodes map[uint64]fs.Node
  53. root fs.Node
  54. }
  55. type statsCache struct {
  56. filer_pb.StatisticsResponse
  57. lastChecked int64 // unix time in seconds
  58. }
  59. func NewSeaweedFileSystem(option *Option) *WFS {
  60. wfs := &WFS{
  61. option: option,
  62. listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
  63. pathToHandleIndex: make(map[util.FullPath]int),
  64. bufPool: sync.Pool{
  65. New: func() interface{} {
  66. return make([]byte, option.ChunkSizeLimit)
  67. },
  68. },
  69. nodes: make(map[uint64]fs.Node),
  70. }
  71. wfs.root = &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}
  72. wfs.getNode(util.FullPath(wfs.option.FilerMountRootPath), func() fs.Node {
  73. return wfs.root
  74. })
  75. return wfs
  76. }
  77. func (wfs *WFS) Root() (fs.Node, error) {
  78. return wfs.root, nil
  79. }
  80. func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  81. err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
  82. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  83. return fn(client)
  84. }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
  85. if err == nil {
  86. return nil
  87. }
  88. return err
  89. }
  90. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  91. fullpath := file.fullpath()
  92. glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid)
  93. wfs.handlesLock.Lock()
  94. defer wfs.handlesLock.Unlock()
  95. index, found := wfs.pathToHandleIndex[fullpath]
  96. if found && wfs.handles[index] != nil {
  97. glog.V(2).Infoln(fullpath, "found fileHandle id", index)
  98. return wfs.handles[index]
  99. }
  100. fileHandle = newFileHandle(file, uid, gid)
  101. for i, h := range wfs.handles {
  102. if h == nil {
  103. wfs.handles[i] = fileHandle
  104. fileHandle.handle = uint64(i)
  105. wfs.pathToHandleIndex[fullpath] = i
  106. glog.V(4).Infof("%s reuse fh %d", fullpath, fileHandle.handle)
  107. return
  108. }
  109. }
  110. wfs.handles = append(wfs.handles, fileHandle)
  111. fileHandle.handle = uint64(len(wfs.handles) - 1)
  112. wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
  113. glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
  114. return
  115. }
  116. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  117. wfs.handlesLock.Lock()
  118. defer wfs.handlesLock.Unlock()
  119. glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  120. delete(wfs.pathToHandleIndex, fullpath)
  121. if int(handleId) < len(wfs.handles) {
  122. wfs.handles[int(handleId)] = nil
  123. }
  124. return
  125. }
  126. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  127. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  128. glog.V(4).Infof("reading fs stats: %+v", req)
  129. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  130. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  131. request := &filer_pb.StatisticsRequest{
  132. Collection: wfs.option.Collection,
  133. Replication: wfs.option.Replication,
  134. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  135. }
  136. glog.V(4).Infof("reading filer stats: %+v", request)
  137. resp, err := client.Statistics(context.Background(), request)
  138. if err != nil {
  139. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  140. return err
  141. }
  142. glog.V(4).Infof("read filer stats: %+v", resp)
  143. wfs.stats.TotalSize = resp.TotalSize
  144. wfs.stats.UsedSize = resp.UsedSize
  145. wfs.stats.FileCount = resp.FileCount
  146. wfs.stats.lastChecked = time.Now().Unix()
  147. return nil
  148. })
  149. if err != nil {
  150. glog.V(0).Infof("filer Statistics: %v", err)
  151. return err
  152. }
  153. }
  154. totalDiskSize := wfs.stats.TotalSize
  155. usedDiskSize := wfs.stats.UsedSize
  156. actualFileCount := wfs.stats.FileCount
  157. // Compute the total number of available blocks
  158. resp.Blocks = totalDiskSize / blockSize
  159. // Compute the number of used blocks
  160. numBlocks := uint64(usedDiskSize / blockSize)
  161. // Report the number of free and available blocks for the block size
  162. resp.Bfree = resp.Blocks - numBlocks
  163. resp.Bavail = resp.Blocks - numBlocks
  164. resp.Bsize = uint32(blockSize)
  165. // Report the total number of possible files in the file system (and those free)
  166. resp.Files = math.MaxInt64
  167. resp.Ffree = math.MaxInt64 - actualFileCount
  168. // Report the maximum length of a name and the minimum fragment size
  169. resp.Namelen = 1024
  170. resp.Frsize = uint32(blockSize)
  171. return nil
  172. }
  173. func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry {
  174. item := wfs.listDirectoryEntriesCache.Get(string(path))
  175. if item != nil && !item.Expired() {
  176. return item.Value().(*filer_pb.Entry)
  177. }
  178. return nil
  179. }
  180. func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
  181. if entry == nil {
  182. wfs.listDirectoryEntriesCache.Delete(string(path))
  183. } else {
  184. wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl)
  185. }
  186. }
  187. func (wfs *WFS) cacheDelete(path util.FullPath) {
  188. wfs.listDirectoryEntriesCache.Delete(string(path))
  189. }
  190. func (wfs *WFS) getNode(fullpath util.FullPath, fn func() fs.Node) fs.Node {
  191. wfs.nodesLock.Lock()
  192. defer wfs.nodesLock.Unlock()
  193. node, found := wfs.nodes[fullpath.AsInode()]
  194. if found {
  195. return node
  196. }
  197. node = fn()
  198. if node != nil {
  199. wfs.nodes[fullpath.AsInode()] = node
  200. }
  201. return node
  202. }
  203. func (wfs *WFS) forgetNode(fullpath util.FullPath) {
  204. wfs.nodesLock.Lock()
  205. defer wfs.nodesLock.Unlock()
  206. delete(wfs.nodes, fullpath.AsInode())
  207. }
  208. func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
  209. if !wfs.option.OutsideContainerClusterMode {
  210. return hostAndPort
  211. }
  212. commaIndex := strings.Index(hostAndPort, ":")
  213. if commaIndex < 0 {
  214. return hostAndPort
  215. }
  216. filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":")
  217. return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:])
  218. }