You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

317 lines
8.6 KiB

7 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "math"
  7. "math/rand"
  8. "os"
  9. "path"
  10. "sync"
  11. "time"
  12. "github.com/chrislusf/seaweedfs/weed/filer"
  13. "github.com/chrislusf/seaweedfs/weed/storage/types"
  14. "github.com/chrislusf/seaweedfs/weed/wdclient"
  15. "google.golang.org/grpc"
  16. "github.com/chrislusf/seaweedfs/weed/util/grace"
  17. "github.com/seaweedfs/fuse"
  18. "github.com/seaweedfs/fuse/fs"
  19. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  20. "github.com/chrislusf/seaweedfs/weed/glog"
  21. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  22. "github.com/chrislusf/seaweedfs/weed/util"
  23. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  24. )
  25. type Option struct {
  26. MountDirectory string
  27. FilerAddresses []pb.ServerAddress
  28. filerIndex int
  29. GrpcDialOption grpc.DialOption
  30. FilerMountRootPath string
  31. Collection string
  32. Replication string
  33. TtlSec int32
  34. DiskType types.DiskType
  35. ChunkSizeLimit int64
  36. ConcurrentWriters int
  37. CacheDir string
  38. CacheSizeMB int64
  39. DataCenter string
  40. Umask os.FileMode
  41. MountUid uint32
  42. MountGid uint32
  43. MountMode os.FileMode
  44. MountCtime time.Time
  45. MountMtime time.Time
  46. MountParentInode uint64
  47. VolumeServerAccess string // how to access volume servers
  48. Cipher bool // whether encrypt data on volume server
  49. UidGidMapper *meta_cache.UidGidMapper
  50. uniqueCacheDir string
  51. }
  52. var _ = fs.FS(&WFS{})
  53. var _ = fs.FSStatfser(&WFS{})
  54. type WFS struct {
  55. option *Option
  56. // contains all open handles, protected by handlesLock
  57. handlesLock sync.Mutex
  58. handles map[uint64]*FileHandle
  59. bufPool sync.Pool
  60. stats statsCache
  61. root fs.Node
  62. fsNodeCache *FsCache
  63. chunkCache *chunk_cache.TieredChunkCache
  64. metaCache *meta_cache.MetaCache
  65. signature int32
  66. // throttle writers
  67. concurrentWriters *util.LimitedConcurrentExecutor
  68. Server *fs.Server
  69. }
  70. type statsCache struct {
  71. filer_pb.StatisticsResponse
  72. lastChecked int64 // unix time in seconds
  73. }
  74. func NewSeaweedFileSystem(option *Option) *WFS {
  75. wfs := &WFS{
  76. option: option,
  77. handles: make(map[uint64]*FileHandle),
  78. bufPool: sync.Pool{
  79. New: func() interface{} {
  80. return make([]byte, option.ChunkSizeLimit)
  81. },
  82. },
  83. signature: util.RandomInt32(),
  84. }
  85. wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
  86. wfs.option.setupUniqueCacheDirectory()
  87. if option.CacheSizeMB > 0 {
  88. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
  89. }
  90. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath, entry *filer_pb.Entry) {
  91. fsNode := NodeWithId(filePath.AsInode(entry.FileMode()))
  92. if err := wfs.Server.InvalidateNodeData(fsNode); err != nil {
  93. glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
  94. }
  95. dir, name := filePath.DirAndName()
  96. parent := NodeWithId(util.FullPath(dir).AsInode(os.ModeDir))
  97. if dir == option.FilerMountRootPath {
  98. parent = NodeWithId(1)
  99. }
  100. if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
  101. glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
  102. }
  103. })
  104. grace.OnInterrupt(func() {
  105. wfs.metaCache.Shutdown()
  106. })
  107. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, id: 1}
  108. wfs.fsNodeCache = newFsCache(wfs.root)
  109. if wfs.option.ConcurrentWriters > 0 {
  110. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  111. }
  112. return wfs
  113. }
  114. func (wfs *WFS) StartBackgroundTasks() {
  115. startTime := time.Now()
  116. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  117. }
  118. func (wfs *WFS) Root() (fs.Node, error) {
  119. return wfs.root, nil
  120. }
  121. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  122. fullpath := file.fullpath()
  123. glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
  124. inodeId := file.Id()
  125. wfs.handlesLock.Lock()
  126. existingHandle, found := wfs.handles[inodeId]
  127. if found && existingHandle != nil && existingHandle.f.isOpen > 0 {
  128. existingHandle.f.isOpen++
  129. wfs.handlesLock.Unlock()
  130. glog.V(4).Infof("Reuse AcquiredHandle %s open %d", fullpath, existingHandle.f.isOpen)
  131. return existingHandle
  132. }
  133. wfs.handlesLock.Unlock()
  134. entry, _ := file.maybeLoadEntry(context.Background())
  135. file.entry = entry
  136. fileHandle = newFileHandle(file, uid, gid)
  137. wfs.handlesLock.Lock()
  138. file.isOpen++
  139. wfs.handles[inodeId] = fileHandle
  140. wfs.handlesLock.Unlock()
  141. fileHandle.handle = inodeId
  142. glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen)
  143. return
  144. }
  145. func (wfs *WFS) Fsync(file *File, header fuse.Header) error {
  146. inodeId := file.Id()
  147. wfs.handlesLock.Lock()
  148. existingHandle, found := wfs.handles[inodeId]
  149. wfs.handlesLock.Unlock()
  150. if found && existingHandle != nil {
  151. existingHandle.Lock()
  152. defer existingHandle.Unlock()
  153. if existingHandle.f.isOpen > 0 {
  154. return existingHandle.doFlush(context.Background(), header)
  155. }
  156. }
  157. return nil
  158. }
  159. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  160. wfs.handlesLock.Lock()
  161. defer wfs.handlesLock.Unlock()
  162. glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  163. delete(wfs.handles, uint64(handleId))
  164. return
  165. }
  166. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  167. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  168. glog.V(4).Infof("reading fs stats: %+v", req)
  169. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  170. err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  171. request := &filer_pb.StatisticsRequest{
  172. Collection: wfs.option.Collection,
  173. Replication: wfs.option.Replication,
  174. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  175. DiskType: string(wfs.option.DiskType),
  176. }
  177. glog.V(4).Infof("reading filer stats: %+v", request)
  178. resp, err := client.Statistics(context.Background(), request)
  179. if err != nil {
  180. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  181. return err
  182. }
  183. glog.V(4).Infof("read filer stats: %+v", resp)
  184. wfs.stats.TotalSize = resp.TotalSize
  185. wfs.stats.UsedSize = resp.UsedSize
  186. wfs.stats.FileCount = resp.FileCount
  187. wfs.stats.lastChecked = time.Now().Unix()
  188. return nil
  189. })
  190. if err != nil {
  191. glog.V(0).Infof("filer Statistics: %v", err)
  192. return err
  193. }
  194. }
  195. totalDiskSize := wfs.stats.TotalSize
  196. usedDiskSize := wfs.stats.UsedSize
  197. actualFileCount := wfs.stats.FileCount
  198. // Compute the total number of available blocks
  199. resp.Blocks = totalDiskSize / blockSize
  200. // Compute the number of used blocks
  201. numBlocks := uint64(usedDiskSize / blockSize)
  202. // Report the number of free and available blocks for the block size
  203. resp.Bfree = resp.Blocks - numBlocks
  204. resp.Bavail = resp.Blocks - numBlocks
  205. resp.Bsize = uint32(blockSize)
  206. // Report the total number of possible files in the file system (and those free)
  207. resp.Files = math.MaxInt64
  208. resp.Ffree = math.MaxInt64 - actualFileCount
  209. // Report the maximum length of a name and the minimum fragment size
  210. resp.Namelen = 1024
  211. resp.Frsize = uint32(blockSize)
  212. return nil
  213. }
  214. func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
  215. if entry.Attributes == nil {
  216. return
  217. }
  218. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
  219. }
  220. func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
  221. if entry.Attributes == nil {
  222. return
  223. }
  224. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
  225. }
  226. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  227. if wfs.option.VolumeServerAccess == "filerProxy" {
  228. return func(fileId string) (targetUrls []string, err error) {
  229. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  230. }
  231. }
  232. return filer.LookupFn(wfs)
  233. }
  234. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  235. return wfs.option.FilerAddresses[wfs.option.filerIndex]
  236. }
  237. func (option *Option) setupUniqueCacheDirectory() {
  238. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  239. option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
  240. }
  241. func (option *Option) getUniqueCacheDir() string {
  242. return option.uniqueCacheDir
  243. }
  244. type NodeWithId uint64
  245. func (n NodeWithId) Id() uint64 {
  246. return uint64(n)
  247. }
  248. func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error {
  249. return nil
  250. }