You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

279 lines
7.7 KiB

7 years ago
5 years ago
5 years ago
6 years ago
5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/storage/types"
  7. "github.com/chrislusf/seaweedfs/weed/wdclient"
  8. "math"
  9. "os"
  10. "path"
  11. "sync"
  12. "time"
  13. "google.golang.org/grpc"
  14. "github.com/chrislusf/seaweedfs/weed/util/grace"
  15. "github.com/seaweedfs/fuse"
  16. "github.com/seaweedfs/fuse/fs"
  17. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  18. "github.com/chrislusf/seaweedfs/weed/glog"
  19. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  20. "github.com/chrislusf/seaweedfs/weed/util"
  21. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  22. )
  23. type Option struct {
  24. MountDirectory string
  25. FilerAddress string
  26. FilerGrpcAddress string
  27. GrpcDialOption grpc.DialOption
  28. FilerMountRootPath string
  29. Collection string
  30. Replication string
  31. TtlSec int32
  32. DiskType types.DiskType
  33. ChunkSizeLimit int64
  34. ConcurrentWriters int
  35. CacheDir string
  36. CacheSizeMB int64
  37. DataCenter string
  38. Umask os.FileMode
  39. MountUid uint32
  40. MountGid uint32
  41. MountMode os.FileMode
  42. MountCtime time.Time
  43. MountMtime time.Time
  44. VolumeServerAccess string // how to access volume servers
  45. Cipher bool // whether encrypt data on volume server
  46. UidGidMapper *meta_cache.UidGidMapper
  47. }
  48. var _ = fs.FS(&WFS{})
  49. var _ = fs.FSStatfser(&WFS{})
  50. type WFS struct {
  51. option *Option
  52. // contains all open handles, protected by handlesLock
  53. handlesLock sync.Mutex
  54. handles map[uint64]*FileHandle
  55. bufPool sync.Pool
  56. stats statsCache
  57. root fs.Node
  58. fsNodeCache *FsCache
  59. chunkCache *chunk_cache.TieredChunkCache
  60. metaCache *meta_cache.MetaCache
  61. signature int32
  62. // throttle writers
  63. concurrentWriters *util.LimitedConcurrentExecutor
  64. Server *fs.Server
  65. }
  66. type statsCache struct {
  67. filer_pb.StatisticsResponse
  68. lastChecked int64 // unix time in seconds
  69. }
  70. func NewSeaweedFileSystem(option *Option) *WFS {
  71. wfs := &WFS{
  72. option: option,
  73. handles: make(map[uint64]*FileHandle),
  74. bufPool: sync.Pool{
  75. New: func() interface{} {
  76. return make([]byte, option.ChunkSizeLimit)
  77. },
  78. },
  79. signature: util.RandomInt32(),
  80. }
  81. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:8]
  82. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  83. if option.CacheSizeMB > 0 {
  84. os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
  85. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  86. }
  87. wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
  88. fsNode := NodeWithId(filePath.AsInode())
  89. if err := wfs.Server.InvalidateNodeData(fsNode); err != nil {
  90. glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
  91. }
  92. dir, name := filePath.DirAndName()
  93. parent := NodeWithId(util.FullPath(dir).AsInode())
  94. if dir == option.FilerMountRootPath {
  95. parent = NodeWithId(1)
  96. }
  97. if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
  98. glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
  99. }
  100. })
  101. startTime := time.Now()
  102. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  103. grace.OnInterrupt(func() {
  104. wfs.metaCache.Shutdown()
  105. })
  106. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, id: 1}
  107. wfs.fsNodeCache = newFsCache(wfs.root)
  108. if wfs.option.ConcurrentWriters > 0 {
  109. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  110. }
  111. return wfs
  112. }
  113. func (wfs *WFS) Root() (fs.Node, error) {
  114. return wfs.root, nil
  115. }
  116. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (fileHandle *FileHandle) {
  117. fullpath := file.fullpath()
  118. glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
  119. inodeId := file.Id()
  120. wfs.handlesLock.Lock()
  121. existingHandle, found := wfs.handles[inodeId]
  122. wfs.handlesLock.Unlock()
  123. if found && existingHandle != nil {
  124. existingHandle.f.isOpen++
  125. if existingHandle.writeOnly {
  126. existingHandle.writeOnly = writeOnly
  127. }
  128. glog.V(4).Infof("Acquired Handle %s open %d", fullpath, existingHandle.f.isOpen)
  129. return existingHandle
  130. }
  131. entry, _ := file.maybeLoadEntry(context.Background())
  132. file.entry = entry
  133. fileHandle = newFileHandle(file, uid, gid)
  134. fileHandle.writeOnly = writeOnly
  135. file.isOpen++
  136. wfs.handlesLock.Lock()
  137. wfs.handles[inodeId] = fileHandle
  138. wfs.handlesLock.Unlock()
  139. fileHandle.handle = inodeId
  140. glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen)
  141. return
  142. }
  143. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  144. wfs.handlesLock.Lock()
  145. defer wfs.handlesLock.Unlock()
  146. glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  147. delete(wfs.handles, uint64(handleId))
  148. return
  149. }
  150. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  151. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  152. glog.V(4).Infof("reading fs stats: %+v", req)
  153. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  154. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  155. request := &filer_pb.StatisticsRequest{
  156. Collection: wfs.option.Collection,
  157. Replication: wfs.option.Replication,
  158. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  159. DiskType: string(wfs.option.DiskType),
  160. }
  161. glog.V(4).Infof("reading filer stats: %+v", request)
  162. resp, err := client.Statistics(context.Background(), request)
  163. if err != nil {
  164. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  165. return err
  166. }
  167. glog.V(4).Infof("read filer stats: %+v", resp)
  168. wfs.stats.TotalSize = resp.TotalSize
  169. wfs.stats.UsedSize = resp.UsedSize
  170. wfs.stats.FileCount = resp.FileCount
  171. wfs.stats.lastChecked = time.Now().Unix()
  172. return nil
  173. })
  174. if err != nil {
  175. glog.V(0).Infof("filer Statistics: %v", err)
  176. return err
  177. }
  178. }
  179. totalDiskSize := wfs.stats.TotalSize
  180. usedDiskSize := wfs.stats.UsedSize
  181. actualFileCount := wfs.stats.FileCount
  182. // Compute the total number of available blocks
  183. resp.Blocks = totalDiskSize / blockSize
  184. // Compute the number of used blocks
  185. numBlocks := uint64(usedDiskSize / blockSize)
  186. // Report the number of free and available blocks for the block size
  187. resp.Bfree = resp.Blocks - numBlocks
  188. resp.Bavail = resp.Blocks - numBlocks
  189. resp.Bsize = uint32(blockSize)
  190. // Report the total number of possible files in the file system (and those free)
  191. resp.Files = math.MaxInt64
  192. resp.Ffree = math.MaxInt64 - actualFileCount
  193. // Report the maximum length of a name and the minimum fragment size
  194. resp.Namelen = 1024
  195. resp.Frsize = uint32(blockSize)
  196. return nil
  197. }
  198. func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
  199. if entry.Attributes == nil {
  200. return
  201. }
  202. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
  203. }
  204. func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
  205. if entry.Attributes == nil {
  206. return
  207. }
  208. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
  209. }
  210. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  211. if wfs.option.VolumeServerAccess == "filerProxy" {
  212. return func(fileId string) (targetUrls []string, err error) {
  213. return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
  214. }
  215. }
  216. return filer.LookupFn(wfs)
  217. }
  218. type NodeWithId uint64
  219. func (n NodeWithId) Id() uint64 {
  220. return uint64(n)
  221. }
  222. func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error {
  223. return nil
  224. }