You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

281 lines
7.8 KiB

7 years ago
5 years ago
5 years ago
6 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/storage/types"
  7. "github.com/chrislusf/seaweedfs/weed/wdclient"
  8. "math"
  9. "math/rand"
  10. "os"
  11. "path"
  12. "sync"
  13. "time"
  14. "google.golang.org/grpc"
  15. "github.com/chrislusf/seaweedfs/weed/util/grace"
  16. "github.com/seaweedfs/fuse"
  17. "github.com/seaweedfs/fuse/fs"
  18. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  19. "github.com/chrislusf/seaweedfs/weed/glog"
  20. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  21. "github.com/chrislusf/seaweedfs/weed/util"
  22. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  23. )
  24. type Option struct {
  25. MountDirectory string
  26. FilerAddresses []string
  27. filerIndex int
  28. FilerGrpcAddresses []string
  29. GrpcDialOption grpc.DialOption
  30. FilerMountRootPath string
  31. Collection string
  32. Replication string
  33. TtlSec int32
  34. DiskType types.DiskType
  35. ChunkSizeLimit int64
  36. ConcurrentWriters int
  37. CacheDir string
  38. CacheSizeMB int64
  39. DataCenter string
  40. Umask os.FileMode
  41. MountUid uint32
  42. MountGid uint32
  43. MountMode os.FileMode
  44. MountCtime time.Time
  45. MountMtime time.Time
  46. VolumeServerAccess string // how to access volume servers
  47. Cipher bool // whether encrypt data on volume server
  48. UidGidMapper *meta_cache.UidGidMapper
  49. }
  50. var _ = fs.FS(&WFS{})
  51. var _ = fs.FSStatfser(&WFS{})
  52. type WFS struct {
  53. option *Option
  54. // contains all open handles, protected by handlesLock
  55. handlesLock sync.Mutex
  56. handles map[uint64]*FileHandle
  57. bufPool sync.Pool
  58. stats statsCache
  59. root fs.Node
  60. fsNodeCache *FsCache
  61. chunkCache *chunk_cache.TieredChunkCache
  62. metaCache *meta_cache.MetaCache
  63. signature int32
  64. // throttle writers
  65. concurrentWriters *util.LimitedConcurrentExecutor
  66. Server *fs.Server
  67. }
  68. type statsCache struct {
  69. filer_pb.StatisticsResponse
  70. lastChecked int64 // unix time in seconds
  71. }
  72. func NewSeaweedFileSystem(option *Option) *WFS {
  73. wfs := &WFS{
  74. option: option,
  75. handles: make(map[uint64]*FileHandle),
  76. bufPool: sync.Pool{
  77. New: func() interface{} {
  78. return make([]byte, option.ChunkSizeLimit)
  79. },
  80. },
  81. signature: util.RandomInt32(),
  82. }
  83. wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
  84. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddresses[0] + option.FilerMountRootPath + util.Version()))[0:8]
  85. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  86. if option.CacheSizeMB > 0 {
  87. os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
  88. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  89. }
  90. wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
  91. fsNode := NodeWithId(filePath.AsInode())
  92. if err := wfs.Server.InvalidateNodeData(fsNode); err != nil {
  93. glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
  94. }
  95. dir, name := filePath.DirAndName()
  96. parent := NodeWithId(util.FullPath(dir).AsInode())
  97. if dir == option.FilerMountRootPath {
  98. parent = NodeWithId(1)
  99. }
  100. if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
  101. glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
  102. }
  103. })
  104. startTime := time.Now()
  105. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  106. grace.OnInterrupt(func() {
  107. wfs.metaCache.Shutdown()
  108. })
  109. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, id: 1}
  110. wfs.fsNodeCache = newFsCache(wfs.root)
  111. if wfs.option.ConcurrentWriters > 0 {
  112. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  113. }
  114. return wfs
  115. }
  116. func (wfs *WFS) Root() (fs.Node, error) {
  117. return wfs.root, nil
  118. }
  119. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (fileHandle *FileHandle) {
  120. fullpath := file.fullpath()
  121. glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
  122. inodeId := file.Id()
  123. wfs.handlesLock.Lock()
  124. existingHandle, found := wfs.handles[inodeId]
  125. wfs.handlesLock.Unlock()
  126. if found && existingHandle != nil {
  127. existingHandle.f.isOpen++
  128. existingHandle.dirtyPages.SetWriteOnly(writeOnly)
  129. glog.V(4).Infof("Acquired Handle %s open %d", fullpath, existingHandle.f.isOpen)
  130. return existingHandle
  131. }
  132. entry, _ := file.maybeLoadEntry(context.Background())
  133. file.entry = entry
  134. fileHandle = newFileHandle(file, uid, gid, writeOnly)
  135. file.isOpen++
  136. wfs.handlesLock.Lock()
  137. wfs.handles[inodeId] = fileHandle
  138. wfs.handlesLock.Unlock()
  139. fileHandle.handle = inodeId
  140. glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen)
  141. return
  142. }
  143. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  144. wfs.handlesLock.Lock()
  145. defer wfs.handlesLock.Unlock()
  146. glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  147. delete(wfs.handles, uint64(handleId))
  148. return
  149. }
  150. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  151. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  152. glog.V(4).Infof("reading fs stats: %+v", req)
  153. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  154. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  155. request := &filer_pb.StatisticsRequest{
  156. Collection: wfs.option.Collection,
  157. Replication: wfs.option.Replication,
  158. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  159. DiskType: string(wfs.option.DiskType),
  160. }
  161. glog.V(4).Infof("reading filer stats: %+v", request)
  162. resp, err := client.Statistics(context.Background(), request)
  163. if err != nil {
  164. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  165. return err
  166. }
  167. glog.V(4).Infof("read filer stats: %+v", resp)
  168. wfs.stats.TotalSize = resp.TotalSize
  169. wfs.stats.UsedSize = resp.UsedSize
  170. wfs.stats.FileCount = resp.FileCount
  171. wfs.stats.lastChecked = time.Now().Unix()
  172. return nil
  173. })
  174. if err != nil {
  175. glog.V(0).Infof("filer Statistics: %v", err)
  176. return err
  177. }
  178. }
  179. totalDiskSize := wfs.stats.TotalSize
  180. usedDiskSize := wfs.stats.UsedSize
  181. actualFileCount := wfs.stats.FileCount
  182. // Compute the total number of available blocks
  183. resp.Blocks = totalDiskSize / blockSize
  184. // Compute the number of used blocks
  185. numBlocks := uint64(usedDiskSize / blockSize)
  186. // Report the number of free and available blocks for the block size
  187. resp.Bfree = resp.Blocks - numBlocks
  188. resp.Bavail = resp.Blocks - numBlocks
  189. resp.Bsize = uint32(blockSize)
  190. // Report the total number of possible files in the file system (and those free)
  191. resp.Files = math.MaxInt64
  192. resp.Ffree = math.MaxInt64 - actualFileCount
  193. // Report the maximum length of a name and the minimum fragment size
  194. resp.Namelen = 1024
  195. resp.Frsize = uint32(blockSize)
  196. return nil
  197. }
  198. func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
  199. if entry.Attributes == nil {
  200. return
  201. }
  202. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
  203. }
  204. func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
  205. if entry.Attributes == nil {
  206. return
  207. }
  208. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
  209. }
  210. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  211. if wfs.option.VolumeServerAccess == "filerProxy" {
  212. return func(fileId string) (targetUrls []string, err error) {
  213. return []string{"http://" + wfs.getCurrentFiler() + "/?proxyChunkId=" + fileId}, nil
  214. }
  215. }
  216. return filer.LookupFn(wfs)
  217. }
  218. func (wfs *WFS) getCurrentFiler() string {
  219. return wfs.option.FilerAddresses[wfs.option.filerIndex]
  220. }
  // NodeWithId is a node that consists of nothing but its numeric id.
  type NodeWithId uint64

  // Id returns the node's id as a plain uint64.
  func (n NodeWithId) Id() uint64 {
  	id := uint64(n)
  	return id
  }
  225. func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error {
  226. return nil
  227. }