You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

175 lines
4.9 KiB

5 years ago
5 years ago
5 years ago
4 years ago
4 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "math"
  7. "github.com/seaweedfs/fuse"
  8. "github.com/seaweedfs/fuse/fs"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
  14. newDir := newDirectory.(*Dir)
  15. newPath := util.NewFullPath(newDir.FullPath(), req.NewName)
  16. oldPath := util.NewFullPath(dir.FullPath(), req.OldName)
  17. glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)
  18. // find local old entry
  19. oldEntry, err := dir.wfs.metaCache.FindEntry(context.Background(), oldPath)
  20. if err != nil {
  21. glog.Errorf("dir Rename can not find source %s : %v", oldPath, err)
  22. return fuse.ENOENT
  23. }
  24. // update remote filer
  25. err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  26. ctx, cancel := context.WithCancel(context.Background())
  27. defer cancel()
  28. request := &filer_pb.AtomicRenameEntryRequest{
  29. OldDirectory: dir.FullPath(),
  30. OldName: req.OldName,
  31. NewDirectory: newDir.FullPath(),
  32. NewName: req.NewName,
  33. Signatures: []int32{dir.wfs.signature},
  34. }
  35. _, err := client.AtomicRenameEntry(ctx, request)
  36. if err != nil {
  37. glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err)
  38. return fuse.EXDEV
  39. }
  40. return nil
  41. })
  42. if err != nil {
  43. glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
  44. return fuse.EIO
  45. }
  46. err = dir.moveEntry(context.Background(), util.FullPath(dir.FullPath()), oldEntry, util.FullPath(newDir.FullPath()), req.NewName)
  47. if err != nil {
  48. glog.V(0).Infof("dir local Rename %s => %s : %v", oldPath, newPath, err)
  49. return fuse.EIO
  50. }
  51. return nil
  52. }
  53. func (dir *Dir) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
  54. oldName := entry.Name()
  55. oldPath := oldParent.Child(oldName)
  56. newPath := newParent.Child(newName)
  57. if err := dir.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
  58. oldFsNode := NodeWithId(oldPath.AsInode())
  59. newFsNode := NodeWithId(newPath.AsInode())
  60. newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode()))
  61. var newDir *Dir
  62. if found {
  63. newDir = newDirNode.(*Dir)
  64. }
  65. dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
  66. if file, ok := internalNode.(*File); ok {
  67. glog.V(4).Infof("internal file node %s", oldParent.Child(oldName))
  68. file.Name = newName
  69. file.id = uint64(newFsNode)
  70. if found {
  71. file.dir = newDir
  72. }
  73. }
  74. if dir, ok := internalNode.(*Dir); ok {
  75. glog.V(4).Infof("internal dir node %s", oldParent.Child(oldName))
  76. dir.name = newName
  77. dir.id = uint64(newFsNode)
  78. if found {
  79. dir.parent = newDir
  80. }
  81. }
  82. })
  83. // change file handle
  84. inodeId := oldPath.AsInode()
  85. dir.wfs.handlesLock.Lock()
  86. if existingHandle, found := dir.wfs.handles[inodeId]; found && existingHandle == nil {
  87. glog.V(4).Infof("opened file handle %s => %s", oldPath, newPath)
  88. delete(dir.wfs.handles, inodeId)
  89. dir.wfs.handles[newPath.AsInode()] = existingHandle
  90. }
  91. dir.wfs.handlesLock.Unlock()
  92. if entry.IsDirectory() {
  93. if err := dir.moveFolderSubEntries(ctx, oldParent, oldName, newParent, newName); err != nil {
  94. return err
  95. }
  96. }
  97. return nil
  98. }); err != nil {
  99. return fmt.Errorf("fail to move %s => %s: %v", oldPath, newPath, err)
  100. }
  101. return nil
  102. }
  103. func (dir *Dir) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, oldName string, newParent util.FullPath, newName string) error {
  104. currentDirPath := oldParent.Child(oldName)
  105. newDirPath := newParent.Child(newName)
  106. glog.V(1).Infof("moving folder %s => %s", currentDirPath, newDirPath)
  107. var moveErr error
  108. listErr := dir.wfs.metaCache.ListDirectoryEntries(ctx, currentDirPath, "", false, int64(math.MaxInt32), func(item *filer.Entry) bool {
  109. moveErr = dir.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name())
  110. if moveErr != nil {
  111. return false
  112. }
  113. return true
  114. })
  115. if listErr != nil {
  116. return listErr
  117. }
  118. if moveErr != nil {
  119. return moveErr
  120. }
  121. return nil
  122. }
  123. func (dir *Dir) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error {
  124. newPath := newParent.Child(newName)
  125. oldPath := oldParent.Child(entry.Name())
  126. entry.FullPath = newPath
  127. if err := dir.wfs.metaCache.InsertEntry(ctx, entry); err != nil {
  128. glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err)
  129. return fuse.EIO
  130. }
  131. if moveFolderSubEntries != nil {
  132. if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil {
  133. return moveChildrenErr
  134. }
  135. }
  136. if err := dir.wfs.metaCache.DeleteEntry(ctx, oldPath); err != nil {
  137. glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err)
  138. return fuse.EIO
  139. }
  140. return nil
  141. }