You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

120 lines
3.3 KiB

7 years ago
  1. package filesys
  2. import (
  3. "bazil.org/fuse/fs"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  6. "github.com/karlseguin/ccache"
  7. "sync"
  8. "bazil.org/fuse"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. "strings"
  12. )
  13. type WFS struct {
  14. filerGrpcAddress string
  15. filerMountRootPath string
  16. listDirectoryEntriesCache *ccache.Cache
  17. collection string
  18. replication string
  19. ttlSec int32
  20. chunkSizeLimit int64
  21. dataCenter string
  22. // contains all open handles
  23. handles []*FileHandle
  24. pathToHandleIndex map[string]int
  25. pathToHandleLock sync.Mutex
  26. }
  27. func NewSeaweedFileSystem(filerGrpcAddress string, filerMountRootPath string, collection string, replication string, ttlSec int32, chunkSizeLimitMB int, dataCenter string) *WFS {
  28. if filerMountRootPath != "/" && strings.HasSuffix(filerMountRootPath, "/") {
  29. filerMountRootPath = filerMountRootPath[0:len(filerMountRootPath)-1]
  30. }
  31. return &WFS{
  32. filerGrpcAddress: filerGrpcAddress,
  33. filerMountRootPath: filerMountRootPath,
  34. listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(6000).ItemsToPrune(100)),
  35. collection: collection,
  36. replication: replication,
  37. ttlSec: ttlSec,
  38. chunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
  39. dataCenter: dataCenter,
  40. pathToHandleIndex: make(map[string]int),
  41. }
  42. }
  43. func (wfs *WFS) Root() (fs.Node, error) {
  44. return &Dir{Path: wfs.filerMountRootPath, wfs: wfs}, nil
  45. }
  46. func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  47. grpcConnection, err := util.GrpcDial(wfs.filerGrpcAddress)
  48. if err != nil {
  49. return fmt.Errorf("fail to dial %s: %v", wfs.filerGrpcAddress, err)
  50. }
  51. defer grpcConnection.Close()
  52. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  53. return fn(client)
  54. }
  55. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (handle *FileHandle) {
  56. wfs.pathToHandleLock.Lock()
  57. defer wfs.pathToHandleLock.Unlock()
  58. fullpath := file.fullpath()
  59. index, found := wfs.pathToHandleIndex[fullpath]
  60. if found && wfs.handles[index] != nil {
  61. glog.V(4).Infoln(fullpath, "found handle id", index)
  62. return wfs.handles[index]
  63. }
  64. // create a new handler
  65. handle = &FileHandle{
  66. f: file,
  67. dirtyPages: newDirtyPages(file),
  68. Uid: uid,
  69. Gid: gid,
  70. }
  71. if found && wfs.handles[index] != nil {
  72. glog.V(4).Infoln(fullpath, "reuse previous handle id", index)
  73. wfs.handles[index] = handle
  74. handle.handle = uint64(index)
  75. return
  76. }
  77. for i, h := range wfs.handles {
  78. if h == nil {
  79. wfs.handles[i] = handle
  80. handle.handle = uint64(i)
  81. wfs.pathToHandleIndex[fullpath] = i
  82. glog.V(4).Infoln(fullpath, "reuse handle id", handle.handle)
  83. return
  84. }
  85. }
  86. wfs.handles = append(wfs.handles, handle)
  87. handle.handle = uint64(len(wfs.handles) - 1)
  88. glog.V(4).Infoln(fullpath, "new handle id", handle.handle)
  89. wfs.pathToHandleIndex[fullpath] = int(handle.handle)
  90. return
  91. }
  92. func (wfs *WFS) ReleaseHandle(handleId fuse.HandleID) {
  93. wfs.pathToHandleLock.Lock()
  94. defer wfs.pathToHandleLock.Unlock()
  95. glog.V(4).Infoln("releasing handle id", handleId, "current handles lengh", len(wfs.handles))
  96. if int(handleId) < len(wfs.handles) {
  97. wfs.handles[int(handleId)] = nil
  98. }
  99. return
  100. }