You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

271 lines
7.4 KiB

7 years ago
5 years ago
5 years ago
6 years ago
4 years ago
5 years ago
4 years ago
5 years ago
4 years ago
4 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/storage/types"
  7. "github.com/chrislusf/seaweedfs/weed/wdclient"
  8. "math"
  9. "os"
  10. "path"
  11. "sync"
  12. "time"
  13. "google.golang.org/grpc"
  14. "github.com/chrislusf/seaweedfs/weed/util/grace"
  15. "github.com/seaweedfs/fuse"
  16. "github.com/seaweedfs/fuse/fs"
  17. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  18. "github.com/chrislusf/seaweedfs/weed/glog"
  19. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  20. "github.com/chrislusf/seaweedfs/weed/util"
  21. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  22. )
  23. type Option struct {
  24. MountDirectory string
  25. FilerAddress string
  26. FilerGrpcAddress string
  27. GrpcDialOption grpc.DialOption
  28. FilerMountRootPath string
  29. Collection string
  30. Replication string
  31. TtlSec int32
  32. DiskType types.DiskType
  33. ChunkSizeLimit int64
  34. ConcurrentWriters int
  35. CacheDir string
  36. CacheSizeMB int64
  37. DataCenter string
  38. EntryCacheTtl time.Duration
  39. Umask os.FileMode
  40. ReadOnly bool
  41. MountUid uint32
  42. MountGid uint32
  43. MountMode os.FileMode
  44. MountCtime time.Time
  45. MountMtime time.Time
  46. VolumeServerAccess string // how to access volume servers
  47. Cipher bool // whether encrypt data on volume server
  48. UidGidMapper *meta_cache.UidGidMapper
  49. }
  50. var _ = fs.FS(&WFS{})
  51. var _ = fs.FSStatfser(&WFS{})
  52. type WFS struct {
  53. option *Option
  54. // contains all open handles, protected by handlesLock
  55. handlesLock sync.Mutex
  56. handles map[uint64]*FileHandle
  57. bufPool sync.Pool
  58. stats statsCache
  59. root fs.Node
  60. fsNodeCache *FsCache
  61. chunkCache *chunk_cache.TieredChunkCache
  62. metaCache *meta_cache.MetaCache
  63. signature int32
  64. // throttle writers
  65. concurrentWriters *util.LimitedConcurrentExecutor
  66. Server *fs.Server
  67. }
  68. type statsCache struct {
  69. filer_pb.StatisticsResponse
  70. lastChecked int64 // unix time in seconds
  71. }
  72. func NewSeaweedFileSystem(option *Option) *WFS {
  73. wfs := &WFS{
  74. option: option,
  75. handles: make(map[uint64]*FileHandle),
  76. bufPool: sync.Pool{
  77. New: func() interface{} {
  78. return make([]byte, option.ChunkSizeLimit)
  79. },
  80. },
  81. signature: util.RandomInt32(),
  82. }
  83. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:8]
  84. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  85. if option.CacheSizeMB > 0 {
  86. os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
  87. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  88. }
  89. wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
  90. fsNode := wfs.fsNodeCache.GetFsNode(filePath)
  91. if fsNode != nil {
  92. if file, ok := fsNode.(*File); ok {
  93. if err := wfs.Server.InvalidateNodeData(file); err != nil {
  94. glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
  95. }
  96. file.entry = nil
  97. }
  98. }
  99. dir, name := filePath.DirAndName()
  100. parent := wfs.root
  101. if dir != "/" {
  102. parent = wfs.fsNodeCache.GetFsNode(util.FullPath(dir))
  103. }
  104. if parent != nil {
  105. if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
  106. glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
  107. }
  108. }
  109. })
  110. startTime := time.Now()
  111. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  112. grace.OnInterrupt(func() {
  113. wfs.metaCache.Shutdown()
  114. })
  115. entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath))
  116. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
  117. wfs.fsNodeCache = newFsCache(wfs.root)
  118. if wfs.option.ConcurrentWriters > 0 {
  119. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  120. }
  121. return wfs
  122. }
  123. func (wfs *WFS) Root() (fs.Node, error) {
  124. return wfs.root, nil
  125. }
  126. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  127. fullpath := file.fullpath()
  128. glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
  129. wfs.handlesLock.Lock()
  130. defer wfs.handlesLock.Unlock()
  131. inodeId := file.fullpath().AsInode()
  132. if file.isOpen > 0 {
  133. existingHandle, found := wfs.handles[inodeId]
  134. if found && existingHandle != nil {
  135. file.isOpen++
  136. return existingHandle
  137. }
  138. }
  139. fileHandle = newFileHandle(file, uid, gid)
  140. file.maybeLoadEntry(context.Background())
  141. file.isOpen++
  142. wfs.handles[inodeId] = fileHandle
  143. fileHandle.handle = inodeId
  144. return
  145. }
  146. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  147. wfs.handlesLock.Lock()
  148. defer wfs.handlesLock.Unlock()
  149. glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  150. delete(wfs.handles, fullpath.AsInode())
  151. return
  152. }
  153. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  154. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  155. glog.V(4).Infof("reading fs stats: %+v", req)
  156. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  157. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  158. request := &filer_pb.StatisticsRequest{
  159. Collection: wfs.option.Collection,
  160. Replication: wfs.option.Replication,
  161. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  162. DiskType: string(wfs.option.DiskType),
  163. }
  164. glog.V(4).Infof("reading filer stats: %+v", request)
  165. resp, err := client.Statistics(context.Background(), request)
  166. if err != nil {
  167. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  168. return err
  169. }
  170. glog.V(4).Infof("read filer stats: %+v", resp)
  171. wfs.stats.TotalSize = resp.TotalSize
  172. wfs.stats.UsedSize = resp.UsedSize
  173. wfs.stats.FileCount = resp.FileCount
  174. wfs.stats.lastChecked = time.Now().Unix()
  175. return nil
  176. })
  177. if err != nil {
  178. glog.V(0).Infof("filer Statistics: %v", err)
  179. return err
  180. }
  181. }
  182. totalDiskSize := wfs.stats.TotalSize
  183. usedDiskSize := wfs.stats.UsedSize
  184. actualFileCount := wfs.stats.FileCount
  185. // Compute the total number of available blocks
  186. resp.Blocks = totalDiskSize / blockSize
  187. // Compute the number of used blocks
  188. numBlocks := uint64(usedDiskSize / blockSize)
  189. // Report the number of free and available blocks for the block size
  190. resp.Bfree = resp.Blocks - numBlocks
  191. resp.Bavail = resp.Blocks - numBlocks
  192. resp.Bsize = uint32(blockSize)
  193. // Report the total number of possible files in the file system (and those free)
  194. resp.Files = math.MaxInt64
  195. resp.Ffree = math.MaxInt64 - actualFileCount
  196. // Report the maximum length of a name and the minimum fragment size
  197. resp.Namelen = 1024
  198. resp.Frsize = uint32(blockSize)
  199. return nil
  200. }
  201. func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
  202. if entry.Attributes == nil {
  203. return
  204. }
  205. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
  206. }
  207. func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
  208. if entry.Attributes == nil {
  209. return
  210. }
  211. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
  212. }
  213. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  214. if wfs.option.VolumeServerAccess == "filerProxy" {
  215. return func(fileId string) (targetUrls []string, err error) {
  216. return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
  217. }
  218. }
  219. return filer.LookupFn(wfs)
  220. }