You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

242 lines
6.5 KiB

7 years ago
5 years ago
5 years ago
6 years ago
4 years ago
5 years ago
4 years ago
5 years ago
4 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/storage"
  6. "math"
  7. "os"
  8. "path"
  9. "sync"
  10. "time"
  11. "google.golang.org/grpc"
  12. "github.com/chrislusf/seaweedfs/weed/util/grace"
  13. "github.com/seaweedfs/fuse"
  14. "github.com/seaweedfs/fuse/fs"
  15. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  16. "github.com/chrislusf/seaweedfs/weed/glog"
  17. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  20. )
  21. type Option struct {
  22. FilerGrpcAddress string
  23. GrpcDialOption grpc.DialOption
  24. FilerMountRootPath string
  25. Collection string
  26. Replication string
  27. TtlSec int32
  28. VolumeType storage.VolumeType
  29. ChunkSizeLimit int64
  30. ConcurrentWriters int
  31. CacheDir string
  32. CacheSizeMB int64
  33. DataCenter string
  34. EntryCacheTtl time.Duration
  35. Umask os.FileMode
  36. MountUid uint32
  37. MountGid uint32
  38. MountMode os.FileMode
  39. MountCtime time.Time
  40. MountMtime time.Time
  41. OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
  42. Cipher bool // whether encrypt data on volume server
  43. UidGidMapper *meta_cache.UidGidMapper
  44. }
  45. var _ = fs.FS(&WFS{})
  46. var _ = fs.FSStatfser(&WFS{})
  47. type WFS struct {
  48. option *Option
  49. // contains all open handles, protected by handlesLock
  50. handlesLock sync.Mutex
  51. handles map[uint64]*FileHandle
  52. bufPool sync.Pool
  53. stats statsCache
  54. root fs.Node
  55. fsNodeCache *FsCache
  56. chunkCache *chunk_cache.TieredChunkCache
  57. metaCache *meta_cache.MetaCache
  58. signature int32
  59. // throttle writers
  60. concurrentWriters *util.LimitedConcurrentExecutor
  61. }
  62. type statsCache struct {
  63. filer_pb.StatisticsResponse
  64. lastChecked int64 // unix time in seconds
  65. }
  66. func NewSeaweedFileSystem(option *Option) *WFS {
  67. wfs := &WFS{
  68. option: option,
  69. handles: make(map[uint64]*FileHandle),
  70. bufPool: sync.Pool{
  71. New: func() interface{} {
  72. return make([]byte, option.ChunkSizeLimit)
  73. },
  74. },
  75. signature: util.RandomInt32(),
  76. }
  77. cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
  78. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  79. if option.CacheSizeMB > 0 {
  80. os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
  81. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  82. }
  83. wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
  84. fsNode := wfs.fsNodeCache.GetFsNode(filePath)
  85. if fsNode != nil {
  86. if file, ok := fsNode.(*File); ok {
  87. file.clearEntry()
  88. }
  89. }
  90. })
  91. startTime := time.Now()
  92. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  93. grace.OnInterrupt(func() {
  94. wfs.metaCache.Shutdown()
  95. })
  96. entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath))
  97. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
  98. wfs.fsNodeCache = newFsCache(wfs.root)
  99. if wfs.option.ConcurrentWriters > 0 {
  100. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  101. }
  102. return wfs
  103. }
  104. func (wfs *WFS) Root() (fs.Node, error) {
  105. return wfs.root, nil
  106. }
  107. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  108. fullpath := file.fullpath()
  109. glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
  110. wfs.handlesLock.Lock()
  111. defer wfs.handlesLock.Unlock()
  112. inodeId := file.fullpath().AsInode()
  113. if file.isOpen > 0 {
  114. existingHandle, found := wfs.handles[inodeId]
  115. if found && existingHandle != nil {
  116. file.isOpen++
  117. return existingHandle
  118. }
  119. }
  120. fileHandle = newFileHandle(file, uid, gid)
  121. file.maybeLoadEntry(context.Background())
  122. file.isOpen++
  123. wfs.handles[inodeId] = fileHandle
  124. fileHandle.handle = inodeId
  125. return
  126. }
  127. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  128. wfs.handlesLock.Lock()
  129. defer wfs.handlesLock.Unlock()
  130. glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  131. delete(wfs.handles, fullpath.AsInode())
  132. return
  133. }
  134. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  135. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  136. glog.V(4).Infof("reading fs stats: %+v", req)
  137. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  138. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  139. request := &filer_pb.StatisticsRequest{
  140. Collection: wfs.option.Collection,
  141. Replication: wfs.option.Replication,
  142. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  143. VolumeType: string(wfs.option.VolumeType),
  144. }
  145. glog.V(4).Infof("reading filer stats: %+v", request)
  146. resp, err := client.Statistics(context.Background(), request)
  147. if err != nil {
  148. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  149. return err
  150. }
  151. glog.V(4).Infof("read filer stats: %+v", resp)
  152. wfs.stats.TotalSize = resp.TotalSize
  153. wfs.stats.UsedSize = resp.UsedSize
  154. wfs.stats.FileCount = resp.FileCount
  155. wfs.stats.lastChecked = time.Now().Unix()
  156. return nil
  157. })
  158. if err != nil {
  159. glog.V(0).Infof("filer Statistics: %v", err)
  160. return err
  161. }
  162. }
  163. totalDiskSize := wfs.stats.TotalSize
  164. usedDiskSize := wfs.stats.UsedSize
  165. actualFileCount := wfs.stats.FileCount
  166. // Compute the total number of available blocks
  167. resp.Blocks = totalDiskSize / blockSize
  168. // Compute the number of used blocks
  169. numBlocks := uint64(usedDiskSize / blockSize)
  170. // Report the number of free and available blocks for the block size
  171. resp.Bfree = resp.Blocks - numBlocks
  172. resp.Bavail = resp.Blocks - numBlocks
  173. resp.Bsize = uint32(blockSize)
  174. // Report the total number of possible files in the file system (and those free)
  175. resp.Files = math.MaxInt64
  176. resp.Ffree = math.MaxInt64 - actualFileCount
  177. // Report the maximum length of a name and the minimum fragment size
  178. resp.Namelen = 1024
  179. resp.Frsize = uint32(blockSize)
  180. return nil
  181. }
  182. func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
  183. if entry.Attributes == nil {
  184. return
  185. }
  186. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
  187. }
  188. func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
  189. if entry.Attributes == nil {
  190. return
  191. }
  192. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
  193. }