You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

279 lines
7.8 KiB

7 years ago
5 years ago
5 years ago
6 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/storage/types"
  7. "github.com/chrislusf/seaweedfs/weed/wdclient"
  8. "math"
  9. "os"
  10. "path"
  11. "sync"
  12. "time"
  13. "google.golang.org/grpc"
  14. "github.com/chrislusf/seaweedfs/weed/util/grace"
  15. "github.com/seaweedfs/fuse"
  16. "github.com/seaweedfs/fuse/fs"
  17. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  18. "github.com/chrislusf/seaweedfs/weed/glog"
  19. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  20. "github.com/chrislusf/seaweedfs/weed/util"
  21. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  22. )
  23. type Option struct {
  24. MountDirectory string
  25. FilerAddresses []string
  26. filerIndex int
  27. FilerGrpcAddresses []string
  28. GrpcDialOption grpc.DialOption
  29. FilerMountRootPath string
  30. Collection string
  31. Replication string
  32. TtlSec int32
  33. DiskType types.DiskType
  34. ChunkSizeLimit int64
  35. ConcurrentWriters int
  36. CacheDir string
  37. CacheSizeMB int64
  38. DataCenter string
  39. Umask os.FileMode
  40. MountUid uint32
  41. MountGid uint32
  42. MountMode os.FileMode
  43. MountCtime time.Time
  44. MountMtime time.Time
  45. VolumeServerAccess string // how to access volume servers
  46. Cipher bool // whether encrypt data on volume server
  47. UidGidMapper *meta_cache.UidGidMapper
  48. }
  49. var _ = fs.FS(&WFS{})
  50. var _ = fs.FSStatfser(&WFS{})
  51. type WFS struct {
  52. option *Option
  53. // contains all open handles, protected by handlesLock
  54. handlesLock sync.Mutex
  55. handles map[uint64]*FileHandle
  56. bufPool sync.Pool
  57. stats statsCache
  58. root fs.Node
  59. fsNodeCache *FsCache
  60. chunkCache *chunk_cache.TieredChunkCache
  61. metaCache *meta_cache.MetaCache
  62. signature int32
  63. // throttle writers
  64. concurrentWriters *util.LimitedConcurrentExecutor
  65. Server *fs.Server
  66. }
  67. type statsCache struct {
  68. filer_pb.StatisticsResponse
  69. lastChecked int64 // unix time in seconds
  70. }
  71. func NewSeaweedFileSystem(option *Option) *WFS {
  72. wfs := &WFS{
  73. option: option,
  74. handles: make(map[uint64]*FileHandle),
  75. bufPool: sync.Pool{
  76. New: func() interface{} {
  77. return make([]byte, option.ChunkSizeLimit)
  78. },
  79. },
  80. signature: util.RandomInt32(),
  81. }
  82. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddresses[0] + option.FilerMountRootPath + util.Version()))[0:8]
  83. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  84. if option.CacheSizeMB > 0 {
  85. os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
  86. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  87. }
  88. wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
  89. fsNode := NodeWithId(filePath.AsInode())
  90. if err := wfs.Server.InvalidateNodeData(fsNode); err != nil {
  91. glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
  92. }
  93. dir, name := filePath.DirAndName()
  94. parent := NodeWithId(util.FullPath(dir).AsInode())
  95. if dir == option.FilerMountRootPath {
  96. parent = NodeWithId(1)
  97. }
  98. if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
  99. glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
  100. }
  101. })
  102. startTime := time.Now()
  103. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  104. grace.OnInterrupt(func() {
  105. wfs.metaCache.Shutdown()
  106. })
  107. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, id: 1}
  108. wfs.fsNodeCache = newFsCache(wfs.root)
  109. if wfs.option.ConcurrentWriters > 0 {
  110. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  111. }
  112. return wfs
  113. }
  114. func (wfs *WFS) Root() (fs.Node, error) {
  115. return wfs.root, nil
  116. }
  117. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (fileHandle *FileHandle) {
  118. fullpath := file.fullpath()
  119. glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
  120. inodeId := file.Id()
  121. wfs.handlesLock.Lock()
  122. existingHandle, found := wfs.handles[inodeId]
  123. wfs.handlesLock.Unlock()
  124. if found && existingHandle != nil {
  125. existingHandle.f.isOpen++
  126. existingHandle.dirtyPages.SetWriteOnly(writeOnly)
  127. glog.V(4).Infof("Acquired Handle %s open %d", fullpath, existingHandle.f.isOpen)
  128. return existingHandle
  129. }
  130. entry, _ := file.maybeLoadEntry(context.Background())
  131. file.entry = entry
  132. fileHandle = newFileHandle(file, uid, gid, writeOnly)
  133. file.isOpen++
  134. wfs.handlesLock.Lock()
  135. wfs.handles[inodeId] = fileHandle
  136. wfs.handlesLock.Unlock()
  137. fileHandle.handle = inodeId
  138. glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen)
  139. return
  140. }
  141. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  142. wfs.handlesLock.Lock()
  143. defer wfs.handlesLock.Unlock()
  144. glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  145. delete(wfs.handles, uint64(handleId))
  146. return
  147. }
  148. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  149. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  150. glog.V(4).Infof("reading fs stats: %+v", req)
  151. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  152. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  153. request := &filer_pb.StatisticsRequest{
  154. Collection: wfs.option.Collection,
  155. Replication: wfs.option.Replication,
  156. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  157. DiskType: string(wfs.option.DiskType),
  158. }
  159. glog.V(4).Infof("reading filer stats: %+v", request)
  160. resp, err := client.Statistics(context.Background(), request)
  161. if err != nil {
  162. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  163. return err
  164. }
  165. glog.V(4).Infof("read filer stats: %+v", resp)
  166. wfs.stats.TotalSize = resp.TotalSize
  167. wfs.stats.UsedSize = resp.UsedSize
  168. wfs.stats.FileCount = resp.FileCount
  169. wfs.stats.lastChecked = time.Now().Unix()
  170. return nil
  171. })
  172. if err != nil {
  173. glog.V(0).Infof("filer Statistics: %v", err)
  174. return err
  175. }
  176. }
  177. totalDiskSize := wfs.stats.TotalSize
  178. usedDiskSize := wfs.stats.UsedSize
  179. actualFileCount := wfs.stats.FileCount
  180. // Compute the total number of available blocks
  181. resp.Blocks = totalDiskSize / blockSize
  182. // Compute the number of used blocks
  183. numBlocks := uint64(usedDiskSize / blockSize)
  184. // Report the number of free and available blocks for the block size
  185. resp.Bfree = resp.Blocks - numBlocks
  186. resp.Bavail = resp.Blocks - numBlocks
  187. resp.Bsize = uint32(blockSize)
  188. // Report the total number of possible files in the file system (and those free)
  189. resp.Files = math.MaxInt64
  190. resp.Ffree = math.MaxInt64 - actualFileCount
  191. // Report the maximum length of a name and the minimum fragment size
  192. resp.Namelen = 1024
  193. resp.Frsize = uint32(blockSize)
  194. return nil
  195. }
  196. func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
  197. if entry.Attributes == nil {
  198. return
  199. }
  200. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
  201. }
  202. func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
  203. if entry.Attributes == nil {
  204. return
  205. }
  206. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
  207. }
  208. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  209. if wfs.option.VolumeServerAccess == "filerProxy" {
  210. return func(fileId string) (targetUrls []string, err error) {
  211. return []string{"http://" + wfs.getCurrentFiler() + "/?proxyChunkId=" + fileId}, nil
  212. }
  213. }
  214. return filer.LookupFn(wfs)
  215. }
  216. func (wfs *WFS) getCurrentFiler() string {
  217. return wfs.option.FilerAddresses[wfs.option.filerIndex]
  218. }
  219. type NodeWithId uint64
  220. func (n NodeWithId) Id() uint64 {
  221. return uint64(n)
  222. }
  223. func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error {
  224. return nil
  225. }