You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

137 lines
4.1 KiB

5 years ago
5 years ago
4 years ago
3 years ago
  1. package filesys
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/filer"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. "github.com/seaweedfs/fuse"
  9. "github.com/seaweedfs/fuse/fs"
  10. "io"
  11. "strings"
  12. )
  13. func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
  14. newDir := newDirectory.(*Dir)
  15. newPath := util.NewFullPath(newDir.FullPath(), req.NewName)
  16. oldPath := util.NewFullPath(dir.FullPath(), req.OldName)
  17. glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)
  18. // update remote filer
  19. err := dir.wfs.WithFilerClient(true, func(client filer_pb.SeaweedFilerClient) error {
  20. ctx, cancel := context.WithCancel(context.Background())
  21. defer cancel()
  22. request := &filer_pb.StreamRenameEntryRequest{
  23. OldDirectory: dir.FullPath(),
  24. OldName: req.OldName,
  25. NewDirectory: newDir.FullPath(),
  26. NewName: req.NewName,
  27. Signatures: []int32{dir.wfs.signature},
  28. }
  29. stream, err := client.StreamRenameEntry(ctx, request)
  30. if err != nil {
  31. glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err)
  32. return fuse.EIO
  33. }
  34. for {
  35. resp, recvErr := stream.Recv()
  36. if recvErr != nil {
  37. if recvErr == io.EOF {
  38. break
  39. } else {
  40. glog.V(0).Infof("dir Rename %s => %s receive: %v", oldPath, newPath, recvErr)
  41. if strings.Contains(recvErr.Error(), "not empty") {
  42. return fuse.EEXIST
  43. }
  44. if strings.Contains(recvErr.Error(), "not directory") {
  45. return fuse.ENOTDIR
  46. }
  47. return recvErr
  48. }
  49. }
  50. if err = dir.handleRenameResponse(ctx, resp); err != nil {
  51. glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
  52. return err
  53. }
  54. }
  55. return nil
  56. })
  57. return err
  58. }
  59. func (dir *Dir) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamRenameEntryResponse) error {
  60. // comes from filer StreamRenameEntry, can only be create or delete entry
  61. if resp.EventNotification.NewEntry != nil {
  62. // with new entry, the old entry name also exists. This is the first step to create new entry
  63. newEntry := filer.FromPbEntry(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry)
  64. if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry); err != nil {
  65. return err
  66. }
  67. oldParent, newParent := util.FullPath(resp.Directory), util.FullPath(resp.EventNotification.NewParentPath)
  68. oldName, newName := resp.EventNotification.OldEntry.Name, resp.EventNotification.NewEntry.Name
  69. oldPath := oldParent.Child(oldName)
  70. newPath := newParent.Child(newName)
  71. oldFsNode := NodeWithId(oldPath.AsInode())
  72. newFsNode := NodeWithId(newPath.AsInode())
  73. newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode()))
  74. var newDir *Dir
  75. if found {
  76. newDir = newDirNode.(*Dir)
  77. }
  78. dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
  79. if file, ok := internalNode.(*File); ok {
  80. glog.V(4).Infof("internal file node %s", oldParent.Child(oldName))
  81. file.Name = newName
  82. file.id = uint64(newFsNode)
  83. if found {
  84. file.dir = newDir
  85. }
  86. }
  87. if dir, ok := internalNode.(*Dir); ok {
  88. glog.V(4).Infof("internal dir node %s", oldParent.Child(oldName))
  89. dir.name = newName
  90. dir.id = uint64(newFsNode)
  91. if found {
  92. dir.parent = newDir
  93. }
  94. }
  95. })
  96. // change file handle
  97. inodeId := oldPath.AsInode()
  98. dir.wfs.handlesLock.Lock()
  99. if existingHandle, found := dir.wfs.handles[inodeId]; found && existingHandle != nil {
  100. glog.V(4).Infof("opened file handle %s => %s", oldPath, newPath)
  101. delete(dir.wfs.handles, inodeId)
  102. existingHandle.handle = newPath.AsInode()
  103. existingHandle.f.entry.Name = newName
  104. existingHandle.f.id = newPath.AsInode()
  105. dir.wfs.handles[newPath.AsInode()] = existingHandle
  106. }
  107. dir.wfs.handlesLock.Unlock()
  108. } else if resp.EventNotification.OldEntry != nil {
  109. // without new entry, only old entry name exists. This is the second step to delete old entry
  110. if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil); err != nil {
  111. return err
  112. }
  113. }
  114. return nil
  115. }