You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

297 lines
8.3 KiB

7 years ago
5 years ago
5 years ago
6 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/storage/types"
  7. "github.com/chrislusf/seaweedfs/weed/wdclient"
  8. "math"
  9. "math/rand"
  10. "os"
  11. "path"
  12. "path/filepath"
  13. "sync"
  14. "time"
  15. "google.golang.org/grpc"
  16. "github.com/chrislusf/seaweedfs/weed/util/grace"
  17. "github.com/seaweedfs/fuse"
  18. "github.com/seaweedfs/fuse/fs"
  19. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  20. "github.com/chrislusf/seaweedfs/weed/glog"
  21. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  22. "github.com/chrislusf/seaweedfs/weed/util"
  23. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  24. )
  25. type Option struct {
  26. MountDirectory string
  27. FilerAddresses []string
  28. filerIndex int
  29. FilerGrpcAddresses []string
  30. GrpcDialOption grpc.DialOption
  31. FilerMountRootPath string
  32. Collection string
  33. Replication string
  34. TtlSec int32
  35. DiskType types.DiskType
  36. ChunkSizeLimit int64
  37. ConcurrentWriters int
  38. CacheDir string
  39. CacheSizeMB int64
  40. DataCenter string
  41. Umask os.FileMode
  42. MountUid uint32
  43. MountGid uint32
  44. MountMode os.FileMode
  45. MountCtime time.Time
  46. MountMtime time.Time
  47. VolumeServerAccess string // how to access volume servers
  48. Cipher bool // whether encrypt data on volume server
  49. UidGidMapper *meta_cache.UidGidMapper
  50. uniqueCacheDir string
  51. uniqueCacheTempPageDir string
  52. }
  53. var _ = fs.FS(&WFS{})
  54. var _ = fs.FSStatfser(&WFS{})
  55. type WFS struct {
  56. option *Option
  57. // contains all open handles, protected by handlesLock
  58. handlesLock sync.Mutex
  59. handles map[uint64]*FileHandle
  60. bufPool sync.Pool
  61. stats statsCache
  62. root fs.Node
  63. fsNodeCache *FsCache
  64. chunkCache *chunk_cache.TieredChunkCache
  65. metaCache *meta_cache.MetaCache
  66. signature int32
  67. // throttle writers
  68. concurrentWriters *util.LimitedConcurrentExecutor
  69. Server *fs.Server
  70. }
  71. type statsCache struct {
  72. filer_pb.StatisticsResponse
  73. lastChecked int64 // unix time in seconds
  74. }
  75. func NewSeaweedFileSystem(option *Option) *WFS {
  76. wfs := &WFS{
  77. option: option,
  78. handles: make(map[uint64]*FileHandle),
  79. bufPool: sync.Pool{
  80. New: func() interface{} {
  81. return make([]byte, option.ChunkSizeLimit)
  82. },
  83. },
  84. signature: util.RandomInt32(),
  85. }
  86. wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
  87. wfs.option.setupUniqueCacheDirectory()
  88. if option.CacheSizeMB > 0 {
  89. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
  90. }
  91. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
  92. fsNode := NodeWithId(filePath.AsInode())
  93. if err := wfs.Server.InvalidateNodeData(fsNode); err != nil {
  94. glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
  95. }
  96. dir, name := filePath.DirAndName()
  97. parent := NodeWithId(util.FullPath(dir).AsInode())
  98. if dir == option.FilerMountRootPath {
  99. parent = NodeWithId(1)
  100. }
  101. if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
  102. glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
  103. }
  104. })
  105. startTime := time.Now()
  106. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  107. grace.OnInterrupt(func() {
  108. wfs.metaCache.Shutdown()
  109. })
  110. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, id: 1}
  111. wfs.fsNodeCache = newFsCache(wfs.root)
  112. if wfs.option.ConcurrentWriters > 0 {
  113. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  114. }
  115. return wfs
  116. }
  117. func (wfs *WFS) Root() (fs.Node, error) {
  118. return wfs.root, nil
  119. }
  120. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (fileHandle *FileHandle) {
  121. fullpath := file.fullpath()
  122. glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
  123. inodeId := file.Id()
  124. wfs.handlesLock.Lock()
  125. existingHandle, found := wfs.handles[inodeId]
  126. wfs.handlesLock.Unlock()
  127. if found && existingHandle != nil {
  128. existingHandle.f.isOpen++
  129. existingHandle.dirtyPages.SetWriteOnly(writeOnly)
  130. glog.V(4).Infof("Acquired Handle %s open %d", fullpath, existingHandle.f.isOpen)
  131. return existingHandle
  132. }
  133. entry, _ := file.maybeLoadEntry(context.Background())
  134. file.entry = entry
  135. fileHandle = newFileHandle(file, uid, gid, writeOnly)
  136. file.isOpen++
  137. wfs.handlesLock.Lock()
  138. wfs.handles[inodeId] = fileHandle
  139. wfs.handlesLock.Unlock()
  140. fileHandle.handle = inodeId
  141. glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen)
  142. return
  143. }
  144. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  145. wfs.handlesLock.Lock()
  146. defer wfs.handlesLock.Unlock()
  147. glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  148. delete(wfs.handles, uint64(handleId))
  149. return
  150. }
  151. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  152. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  153. glog.V(4).Infof("reading fs stats: %+v", req)
  154. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  155. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  156. request := &filer_pb.StatisticsRequest{
  157. Collection: wfs.option.Collection,
  158. Replication: wfs.option.Replication,
  159. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  160. DiskType: string(wfs.option.DiskType),
  161. }
  162. glog.V(4).Infof("reading filer stats: %+v", request)
  163. resp, err := client.Statistics(context.Background(), request)
  164. if err != nil {
  165. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  166. return err
  167. }
  168. glog.V(4).Infof("read filer stats: %+v", resp)
  169. wfs.stats.TotalSize = resp.TotalSize
  170. wfs.stats.UsedSize = resp.UsedSize
  171. wfs.stats.FileCount = resp.FileCount
  172. wfs.stats.lastChecked = time.Now().Unix()
  173. return nil
  174. })
  175. if err != nil {
  176. glog.V(0).Infof("filer Statistics: %v", err)
  177. return err
  178. }
  179. }
  180. totalDiskSize := wfs.stats.TotalSize
  181. usedDiskSize := wfs.stats.UsedSize
  182. actualFileCount := wfs.stats.FileCount
  183. // Compute the total number of available blocks
  184. resp.Blocks = totalDiskSize / blockSize
  185. // Compute the number of used blocks
  186. numBlocks := uint64(usedDiskSize / blockSize)
  187. // Report the number of free and available blocks for the block size
  188. resp.Bfree = resp.Blocks - numBlocks
  189. resp.Bavail = resp.Blocks - numBlocks
  190. resp.Bsize = uint32(blockSize)
  191. // Report the total number of possible files in the file system (and those free)
  192. resp.Files = math.MaxInt64
  193. resp.Ffree = math.MaxInt64 - actualFileCount
  194. // Report the maximum length of a name and the minimum fragment size
  195. resp.Namelen = 1024
  196. resp.Frsize = uint32(blockSize)
  197. return nil
  198. }
  199. func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
  200. if entry.Attributes == nil {
  201. return
  202. }
  203. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
  204. }
  205. func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
  206. if entry.Attributes == nil {
  207. return
  208. }
  209. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
  210. }
  211. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  212. if wfs.option.VolumeServerAccess == "filerProxy" {
  213. return func(fileId string) (targetUrls []string, err error) {
  214. return []string{"http://" + wfs.getCurrentFiler() + "/?proxyChunkId=" + fileId}, nil
  215. }
  216. }
  217. return filer.LookupFn(wfs)
  218. }
  219. func (wfs *WFS) getCurrentFiler() string {
  220. return wfs.option.FilerAddresses[wfs.option.filerIndex]
  221. }
  222. func (option *Option) setupUniqueCacheDirectory() {
  223. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddresses[0] + option.FilerMountRootPath + util.Version()))[0:8]
  224. option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
  225. option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "sw")
  226. os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
  227. }
  228. func (option *Option) getTempFilePageDir() string {
  229. return option.uniqueCacheTempPageDir
  230. }
  231. func (option *Option) getUniqueCacheDir() string {
  232. return option.uniqueCacheDir
  233. }
  // NodeWithId is a node that consists of nothing but its numeric id.
  type NodeWithId uint64

  // Id returns the node id as a plain uint64.
  func (n NodeWithId) Id() uint64 {
  	nodeId := uint64(n)
  	return nodeId
  }
  238. func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error {
  239. return nil
  240. }