diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 80be0b88e..31383916e 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -240,6 +240,43 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr return nil } +func (f *Filer) RenameEntry(ctx context.Context, oldEntry, newEntry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32, skipCreateParentDir bool, maxFilenameLength uint32) error { + if string(newEntry.FullPath) == "/" { + return nil + } + + if newEntry.FullPath.IsLongerFileName(maxFilenameLength) { + return fmt.Errorf("entry name too long") + } + + currentEntry, _ := f.FindEntry(ctx, newEntry.FullPath) + if currentEntry != nil && o_excl { + glog.V(3).Infof("EEXIST: entry %s already exists", newEntry.FullPath) + return fmt.Errorf("EEXIST: entry %s already exists", newEntry.FullPath) + } + + if currentEntry == nil && !skipCreateParentDir { + dirParts := strings.Split(string(newEntry.FullPath), "/") + if err := f.ensureParentDirectoryEntry(ctx, newEntry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil { + return err + } + } + + glog.V(4).Infof("renameEntry %s: old entry: %v", newEntry.FullPath, oldEntry.FullPath) + if err := f.renameEntry(ctx, oldEntry, newEntry, currentEntry != nil); err != nil { + glog.Errorf("rename entry %s: %v", newEntry.FullPath, err) + return fmt.Errorf("rename entry %s: %v", newEntry.FullPath, err) + } + + f.NotifyUpdateEvent(ctx, oldEntry, newEntry, true, isFromOtherCluster, signatures) + + f.deleteChunksIfNotNew(currentEntry, newEntry) + + glog.V(4).Infof("RenameEntry %s: renamed", newEntry.FullPath) + + return nil +} + func (f *Filer) ensureParentDirectoryEntry(ctx context.Context, entry *Entry, dirParts []string, level int, isFromOtherCluster bool) (err error) { if level == 0 { @@ -313,6 +350,23 @@ func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err er return f.Store.UpdateEntry(ctx, entry) } +func (f *Filer) renameEntry(ctx context.Context, oldEntry, newEntry *Entry, newEntryExists bool) error { + if oldEntry.IsDirectory() && !newEntry.IsDirectory() { + glog.Errorf("existing %s is a directory", oldEntry.FullPath) + return fmt.Errorf("existing %s is a directory", oldEntry.FullPath) + } + if !oldEntry.IsDirectory() && newEntry.IsDirectory() { + glog.Errorf("existing %s is a file", oldEntry.FullPath) + return fmt.Errorf("existing %s is a file", oldEntry.FullPath) + } + + newEntry.Attr.Crtime = oldEntry.Attr.Crtime + if newEntryExists { + return f.Store.UpdateEntry(ctx, newEntry) + } + return f.Store.InsertEntry(ctx, newEntry) +} + var ( Root = &Entry{ FullPath: "/", diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index db00dd496..172b93a60 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -173,8 +173,8 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.Seawee Remote: entry.Remote, Quota: entry.Quota, } - if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures, false, fs.filer.MaxFilenameLength); createErr != nil { - return createErr + if renameErr := fs.filer.RenameEntry(ctx, entry, newEntry, false, false, signatures, false, fs.filer.MaxFilenameLength); renameErr != nil { + return renameErr } if stream != nil { if err := stream.Send(&filer_pb.StreamRenameEntryResponse{