You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

230 lines
7.0 KiB

5 years ago
5 years ago
3 years ago
5 years ago
5 years ago
3 years ago
5 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "path/filepath"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/filer"
  8. "github.com/seaweedfs/seaweedfs/weed/glog"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/util"
  11. )
  12. func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.AtomicRenameEntryRequest) (*filer_pb.AtomicRenameEntryResponse, error) {
  13. glog.V(1).Infof("AtomicRenameEntry %v", req)
  14. oldParent := util.FullPath(filepath.ToSlash(req.OldDirectory))
  15. newParent := util.FullPath(filepath.ToSlash(req.NewDirectory))
  16. if err := fs.filer.CanRename(oldParent, newParent); err != nil {
  17. return nil, err
  18. }
  19. ctx, err := fs.filer.BeginTransaction(ctx)
  20. if err != nil {
  21. return nil, err
  22. }
  23. oldEntry, err := fs.filer.FindEntry(ctx, oldParent.Child(req.OldName))
  24. if err != nil {
  25. fs.filer.RollbackTransaction(ctx)
  26. return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
  27. }
  28. moveErr := fs.moveEntry(ctx, nil, oldParent, oldEntry, newParent, req.NewName, req.Signatures)
  29. if moveErr != nil {
  30. fs.filer.RollbackTransaction(ctx)
  31. return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
  32. } else {
  33. if commitError := fs.filer.CommitTransaction(ctx); commitError != nil {
  34. fs.filer.RollbackTransaction(ctx)
  35. return nil, fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, commitError)
  36. }
  37. }
  38. return &filer_pb.AtomicRenameEntryResponse{}, nil
  39. }
  40. func (fs *FilerServer) StreamRenameEntry(req *filer_pb.StreamRenameEntryRequest, stream filer_pb.SeaweedFiler_StreamRenameEntryServer) (err error) {
  41. glog.V(1).Infof("StreamRenameEntry %v", req)
  42. oldParent := util.FullPath(filepath.ToSlash(req.OldDirectory))
  43. newParent := util.FullPath(filepath.ToSlash(req.NewDirectory))
  44. if err := fs.filer.CanRename(oldParent, newParent); err != nil {
  45. return err
  46. }
  47. ctx := context.Background()
  48. ctx, err = fs.filer.BeginTransaction(ctx)
  49. if err != nil {
  50. return err
  51. }
  52. oldEntry, err := fs.filer.FindEntry(ctx, oldParent.Child(req.OldName))
  53. if err != nil {
  54. fs.filer.RollbackTransaction(ctx)
  55. return fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
  56. }
  57. if oldEntry.IsDirectory() {
  58. // follow https://pubs.opengroup.org/onlinepubs/000095399/functions/rename.html
  59. targetDir := newParent.Child(req.NewName)
  60. newEntry, err := fs.filer.FindEntry(ctx, targetDir)
  61. if err == nil {
  62. if !newEntry.IsDirectory() {
  63. fs.filer.RollbackTransaction(ctx)
  64. return fmt.Errorf("%s is not directory", targetDir)
  65. }
  66. if entries, _, _ := fs.filer.ListDirectoryEntries(context.Background(), targetDir, "", false, 1, "", "", ""); len(entries) > 0 {
  67. return fmt.Errorf("%s is not empty", targetDir)
  68. }
  69. }
  70. }
  71. moveErr := fs.moveEntry(ctx, stream, oldParent, oldEntry, newParent, req.NewName, req.Signatures)
  72. if moveErr != nil {
  73. fs.filer.RollbackTransaction(ctx)
  74. return fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
  75. } else {
  76. if commitError := fs.filer.CommitTransaction(ctx); commitError != nil {
  77. fs.filer.RollbackTransaction(ctx)
  78. return fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, commitError)
  79. }
  80. }
  81. return nil
  82. }
  83. func (fs *FilerServer) moveEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
  84. if err := fs.moveSelfEntry(ctx, stream, oldParent, entry, newParent, newName, func() error {
  85. if entry.IsDirectory() {
  86. if err := fs.moveFolderSubEntries(ctx, stream, oldParent, entry, newParent, newName, signatures); err != nil {
  87. return err
  88. }
  89. }
  90. return nil
  91. }, signatures); err != nil {
  92. return fmt.Errorf("fail to move %s => %s: %v", oldParent.Child(entry.Name()), newParent.Child(newName), err)
  93. }
  94. return nil
  95. }
  96. func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
  97. currentDirPath := oldParent.Child(entry.Name())
  98. newDirPath := newParent.Child(newName)
  99. glog.V(1).Infof("moving folder %s => %s", currentDirPath, newDirPath)
  100. lastFileName := ""
  101. includeLastFile := false
  102. for {
  103. entries, hasMore, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "", "", "")
  104. if err != nil {
  105. return err
  106. }
  107. // println("found", len(entries), "entries under", currentDirPath)
  108. for _, item := range entries {
  109. lastFileName = item.Name()
  110. // println("processing", lastFileName)
  111. err := fs.moveEntry(ctx, stream, currentDirPath, item, newDirPath, item.Name(), signatures)
  112. if err != nil {
  113. return err
  114. }
  115. }
  116. if !hasMore {
  117. break
  118. }
  119. }
  120. return nil
  121. }
  122. func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error {
  123. oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
  124. glog.V(1).Infof("moving entry %s => %s", oldPath, newPath)
  125. if oldPath == newPath {
  126. glog.V(1).Infof("skip moving entry %s => %s", oldPath, newPath)
  127. return nil
  128. }
  129. // add to new directory
  130. newEntry := &filer.Entry{
  131. FullPath: newPath,
  132. Attr: entry.Attr,
  133. Chunks: entry.Chunks,
  134. Extended: entry.Extended,
  135. Content: entry.Content,
  136. HardLinkCounter: entry.HardLinkCounter,
  137. HardLinkId: entry.HardLinkId,
  138. Remote: entry.Remote,
  139. Quota: entry.Quota,
  140. }
  141. if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures, false); createErr != nil {
  142. return createErr
  143. }
  144. if stream != nil {
  145. if err := stream.Send(&filer_pb.StreamRenameEntryResponse{
  146. Directory: string(oldParent),
  147. EventNotification: &filer_pb.EventNotification{
  148. OldEntry: &filer_pb.Entry{
  149. Name: entry.Name(),
  150. },
  151. NewEntry: newEntry.ToProtoEntry(),
  152. DeleteChunks: false,
  153. NewParentPath: string(newParent),
  154. IsFromOtherCluster: false,
  155. Signatures: nil,
  156. },
  157. TsNs: time.Now().UnixNano(),
  158. }); err != nil {
  159. return err
  160. }
  161. }
  162. if moveFolderSubEntries != nil {
  163. if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil {
  164. return moveChildrenErr
  165. }
  166. }
  167. // delete old entry
  168. deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, signatures)
  169. if deleteErr != nil {
  170. return deleteErr
  171. }
  172. if stream != nil {
  173. if err := stream.Send(&filer_pb.StreamRenameEntryResponse{
  174. Directory: string(oldParent),
  175. EventNotification: &filer_pb.EventNotification{
  176. OldEntry: &filer_pb.Entry{
  177. Name: entry.Name(),
  178. },
  179. NewEntry: nil,
  180. DeleteChunks: false,
  181. NewParentPath: "",
  182. IsFromOtherCluster: false,
  183. Signatures: nil,
  184. },
  185. TsNs: time.Now().UnixNano(),
  186. }); err != nil {
  187. return err
  188. }
  189. }
  190. return nil
  191. }