You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

130 lines
3.4 KiB

7 years ago
7 years ago
7 years ago
6 years ago
  1. package filesys
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "bazil.org/fuse"
  7. "bazil.org/fuse/fs"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. "github.com/karlseguin/ccache"
  12. "google.golang.org/grpc"
  13. )
  14. type Option struct {
  15. FilerGrpcAddress string
  16. FilerMountRootPath string
  17. Collection string
  18. Replication string
  19. TtlSec int32
  20. ChunkSizeLimit int64
  21. DataCenter string
  22. DirListingLimit int
  23. EntryCacheTtl time.Duration
  24. }
  25. type WFS struct {
  26. option *Option
  27. listDirectoryEntriesCache *ccache.Cache
  28. // contains all open handles
  29. handles []*FileHandle
  30. pathToHandleIndex map[string]int
  31. pathToHandleLock sync.Mutex
  32. // cache grpc connections
  33. grpcClients map[string]*grpc.ClientConn
  34. grpcClientsLock sync.Mutex
  35. }
  36. func NewSeaweedFileSystem(option *Option) *WFS {
  37. return &WFS{
  38. option: option,
  39. listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(int64(option.DirListingLimit) + 200).ItemsToPrune(100)),
  40. pathToHandleIndex: make(map[string]int),
  41. grpcClients: make(map[string]*grpc.ClientConn),
  42. }
  43. }
  44. func (wfs *WFS) Root() (fs.Node, error) {
  45. return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil
  46. }
  47. func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
  48. wfs.grpcClientsLock.Lock()
  49. existingConnection, found := wfs.grpcClients[wfs.option.FilerGrpcAddress]
  50. if found {
  51. wfs.grpcClientsLock.Unlock()
  52. client := filer_pb.NewSeaweedFilerClient(existingConnection)
  53. return fn(client)
  54. }
  55. grpcConnection, err := util.GrpcDial(wfs.option.FilerGrpcAddress)
  56. if err != nil {
  57. wfs.grpcClientsLock.Unlock()
  58. return fmt.Errorf("fail to dial %s: %v", wfs.option.FilerGrpcAddress, err)
  59. }
  60. wfs.grpcClients[wfs.option.FilerGrpcAddress] = grpcConnection
  61. wfs.grpcClientsLock.Unlock()
  62. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  63. return fn(client)
  64. }
  65. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  66. wfs.pathToHandleLock.Lock()
  67. defer wfs.pathToHandleLock.Unlock()
  68. fullpath := file.fullpath()
  69. index, found := wfs.pathToHandleIndex[fullpath]
  70. if found && wfs.handles[index] != nil {
  71. glog.V(4).Infoln(fullpath, "found fileHandle id", index)
  72. return wfs.handles[index]
  73. }
  74. if found && wfs.handles[index] != nil {
  75. glog.V(4).Infoln(fullpath, "reuse previous fileHandle id", index)
  76. wfs.handles[index].InitializeToFile(file, uid, gid)
  77. fileHandle.handle = uint64(index)
  78. return
  79. }
  80. fileHandle = newFileHandle(file, uid, gid)
  81. for i, h := range wfs.handles {
  82. if h == nil {
  83. wfs.handles[i] = fileHandle
  84. fileHandle.handle = uint64(i)
  85. wfs.pathToHandleIndex[fullpath] = i
  86. glog.V(4).Infoln(fullpath, "reuse fileHandle id", fileHandle.handle)
  87. return
  88. }
  89. }
  90. wfs.handles = append(wfs.handles, fileHandle)
  91. fileHandle.handle = uint64(len(wfs.handles) - 1)
  92. glog.V(4).Infoln(fullpath, "new fileHandle id", fileHandle.handle)
  93. wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
  94. return
  95. }
  96. func (wfs *WFS) ReleaseHandle(fullpath string, handleId fuse.HandleID) {
  97. wfs.pathToHandleLock.Lock()
  98. defer wfs.pathToHandleLock.Unlock()
  99. glog.V(4).Infof("%s releasing handle id %dcurrent handles lengh %d", fullpath, handleId, len(wfs.handles))
  100. delete(wfs.pathToHandleIndex, fullpath)
  101. if int(handleId) < len(wfs.handles) {
  102. wfs.handles[int(handleId)] = nil
  103. }
  104. return
  105. }