You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

202 lines
4.3 KiB

7 years ago
7 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "path"
  7. "sync"
  8. "bazil.org/fuse/fs"
  9. "bazil.org/fuse"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  12. "time"
  13. )
  14. type Dir struct {
  15. Path string
  16. NodeMap map[string]fs.Node
  17. NodeMapLock sync.Mutex
  18. wfs *WFS
  19. }
  20. func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error {
  21. attr.Mode = os.ModeDir | 0777
  22. return nil
  23. }
  24. func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
  25. resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
  26. err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  27. request := &filer_pb.CreateEntryRequest{
  28. Directory: dir.Path,
  29. Entry: &filer_pb.Entry{
  30. Name: req.Name,
  31. IsDirectory: req.Mode&os.ModeDir > 0,
  32. Attributes: &filer_pb.FuseAttributes{
  33. Mtime: time.Now().Unix(),
  34. FileMode: uint32(req.Mode),
  35. Uid: req.Uid,
  36. Gid: req.Gid,
  37. },
  38. },
  39. }
  40. glog.V(1).Infof("create: %v", request)
  41. if _, err := client.CreateEntry(ctx, request); err != nil {
  42. return fmt.Errorf("create file: %v", err)
  43. }
  44. return nil
  45. })
  46. if err == nil {
  47. node := &File{Name: req.Name, dir: dir, wfs: dir.wfs}
  48. dir.NodeMap[req.Name] = node
  49. return node, node, nil
  50. }
  51. return nil, nil, err
  52. }
  53. func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
  54. dir.NodeMapLock.Lock()
  55. defer dir.NodeMapLock.Unlock()
  56. err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  57. request := &filer_pb.CreateEntryRequest{
  58. Directory: dir.Path,
  59. Entry: &filer_pb.Entry{
  60. Name: req.Name,
  61. IsDirectory: true,
  62. Attributes: &filer_pb.FuseAttributes{
  63. Mtime: time.Now().Unix(),
  64. FileMode: uint32(req.Mode),
  65. Uid: req.Uid,
  66. Gid: req.Gid,
  67. },
  68. },
  69. }
  70. glog.V(1).Infof("mkdir: %v", request)
  71. if _, err := client.CreateEntry(ctx, request); err != nil {
  72. return fmt.Errorf("make dir: %v", err)
  73. }
  74. return nil
  75. })
  76. if err == nil {
  77. node := &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs}
  78. dir.NodeMap[req.Name] = node
  79. return node, nil
  80. }
  81. return nil, err
  82. }
  83. func (dir *Dir) Lookup(ctx context.Context, name string) (node fs.Node, err error) {
  84. dir.NodeMapLock.Lock()
  85. defer dir.NodeMapLock.Unlock()
  86. if dir.NodeMap == nil {
  87. dir.NodeMap = make(map[string]fs.Node)
  88. }
  89. if node, ok := dir.NodeMap[name]; ok {
  90. return node, nil
  91. }
  92. var entry *filer_pb.Entry
  93. err = dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  94. request := &filer_pb.LookupDirectoryEntryRequest{
  95. Directory: dir.Path,
  96. Name: name,
  97. }
  98. glog.V(1).Infof("lookup directory entry: %v", request)
  99. resp, err := client.LookupDirectoryEntry(ctx, request)
  100. if err != nil {
  101. return err
  102. }
  103. entry = resp.Entry
  104. return nil
  105. })
  106. if entry != nil {
  107. if entry.IsDirectory {
  108. node = &Dir{Path: path.Join(dir.Path, name), wfs: dir.wfs}
  109. } else {
  110. node = &File{Chunks: entry.Chunks, Name: name, dir: dir, wfs: dir.wfs}
  111. }
  112. dir.NodeMap[name] = node
  113. return node, nil
  114. }
  115. return nil, fuse.ENOENT
  116. }
  117. func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
  118. err = dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  119. request := &filer_pb.ListEntriesRequest{
  120. Directory: dir.Path,
  121. }
  122. glog.V(1).Infof("read directory: %v", request)
  123. resp, err := client.ListEntries(ctx, request)
  124. if err != nil {
  125. return err
  126. }
  127. for _, entry := range resp.Entries {
  128. if entry.IsDirectory {
  129. dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir}
  130. ret = append(ret, dirent)
  131. } else {
  132. dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File}
  133. ret = append(ret, dirent)
  134. dir.wfs.listDirectoryEntriesCache.Set(dir.Path+"/"+entry.Name, entry.Attributes, 3*time.Second)
  135. }
  136. }
  137. return nil
  138. })
  139. return ret, err
  140. }
  141. func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
  142. dir.NodeMapLock.Lock()
  143. defer dir.NodeMapLock.Unlock()
  144. return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  145. request := &filer_pb.DeleteEntryRequest{
  146. Directory: dir.Path,
  147. Name: req.Name,
  148. IsDirectory: req.Dir,
  149. }
  150. glog.V(1).Infof("remove directory entry: %v", request)
  151. _, err := client.DeleteEntry(ctx, request)
  152. if err != nil {
  153. return err
  154. }
  155. delete(dir.NodeMap, req.Name)
  156. return nil
  157. })
  158. }