|
|
@ -4,6 +4,7 @@ import ( |
|
|
|
"context" |
|
|
|
"fmt" |
|
|
|
"path/filepath" |
|
|
|
"time" |
|
|
|
|
|
|
|
"github.com/chrislusf/seaweedfs/weed/filer" |
|
|
|
"github.com/chrislusf/seaweedfs/weed/glog" |
|
|
@ -33,7 +34,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom |
|
|
|
return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err) |
|
|
|
} |
|
|
|
|
|
|
|
moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, req.Signatures) |
|
|
|
moveErr := fs.moveEntry(ctx, nil, oldParent, oldEntry, newParent, req.NewName, req.Signatures) |
|
|
|
if moveErr != nil { |
|
|
|
fs.filer.RollbackTransaction(ctx) |
|
|
|
return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr) |
|
|
@ -47,11 +48,49 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom |
|
|
|
return &filer_pb.AtomicRenameEntryResponse{}, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { |
|
|
|
func (fs *FilerServer) StreamRenameEntry(req *filer_pb.StreamRenameEntryRequest, stream filer_pb.SeaweedFiler_StreamRenameEntryServer) (err error) { |
|
|
|
|
|
|
|
if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error { |
|
|
|
glog.V(1).Infof("StreamRenameEntry %v", req) |
|
|
|
|
|
|
|
oldParent := util.FullPath(filepath.ToSlash(req.OldDirectory)) |
|
|
|
newParent := util.FullPath(filepath.ToSlash(req.NewDirectory)) |
|
|
|
|
|
|
|
if err := fs.filer.CanRename(oldParent, newParent); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
ctx := context.Background() |
|
|
|
|
|
|
|
ctx, err = fs.filer.BeginTransaction(ctx) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
oldEntry, err := fs.filer.FindEntry(ctx, oldParent.Child(req.OldName)) |
|
|
|
if err != nil { |
|
|
|
fs.filer.RollbackTransaction(ctx) |
|
|
|
return fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err) |
|
|
|
} |
|
|
|
|
|
|
|
moveErr := fs.moveEntry(ctx, stream, oldParent, oldEntry, newParent, req.NewName, req.Signatures) |
|
|
|
if moveErr != nil { |
|
|
|
fs.filer.RollbackTransaction(ctx) |
|
|
|
return fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr) |
|
|
|
} else { |
|
|
|
if commitError := fs.filer.CommitTransaction(ctx); commitError != nil { |
|
|
|
fs.filer.RollbackTransaction(ctx) |
|
|
|
return fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, commitError) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (fs *FilerServer) moveEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { |
|
|
|
|
|
|
|
if err := fs.moveSelfEntry(ctx, stream, oldParent, entry, newParent, newName, func() error { |
|
|
|
if entry.IsDirectory() { |
|
|
|
if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, signatures); err != nil { |
|
|
|
if err := fs.moveFolderSubEntries(ctx, stream, oldParent, entry, newParent, newName, signatures); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
@ -63,7 +102,7 @@ func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, e |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { |
|
|
|
func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { |
|
|
|
|
|
|
|
currentDirPath := oldParent.Child(entry.Name()) |
|
|
|
newDirPath := newParent.Child(newName) |
|
|
@ -84,7 +123,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. |
|
|
|
for _, item := range entries { |
|
|
|
lastFileName = item.Name() |
|
|
|
// println("processing", lastFileName)
|
|
|
|
err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), signatures) |
|
|
|
err := fs.moveEntry(ctx, stream, currentDirPath, item, newDirPath, item.Name(), signatures) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
@ -96,7 +135,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error { |
|
|
|
func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error { |
|
|
|
|
|
|
|
oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName) |
|
|
|
|
|
|
@ -118,6 +157,24 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat |
|
|
|
if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures); createErr != nil { |
|
|
|
return createErr |
|
|
|
} |
|
|
|
if stream != nil { |
|
|
|
if err := stream.Send(&filer_pb.StreamRenameEntryResponse{ |
|
|
|
Directory: string(newParent), |
|
|
|
EventNotification: &filer_pb.EventNotification{ |
|
|
|
OldEntry: &filer_pb.Entry{ |
|
|
|
Name: entry.Name(), |
|
|
|
}, |
|
|
|
NewEntry: newEntry.ToProtoEntry(), |
|
|
|
DeleteChunks: false, |
|
|
|
NewParentPath: string(newParent), |
|
|
|
IsFromOtherCluster: false, |
|
|
|
Signatures: nil, |
|
|
|
}, |
|
|
|
TsNs: time.Now().UnixNano(), |
|
|
|
}); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if moveFolderSubEntries != nil { |
|
|
|
if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil { |
|
|
@ -130,6 +187,24 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat |
|
|
|
if deleteErr != nil { |
|
|
|
return deleteErr |
|
|
|
} |
|
|
|
if stream != nil { |
|
|
|
if err := stream.Send(&filer_pb.StreamRenameEntryResponse{ |
|
|
|
Directory: string(oldParent), |
|
|
|
EventNotification: &filer_pb.EventNotification{ |
|
|
|
OldEntry: &filer_pb.Entry{ |
|
|
|
Name: entry.Name(), |
|
|
|
}, |
|
|
|
NewEntry: nil, |
|
|
|
DeleteChunks: false, |
|
|
|
NewParentPath: "", |
|
|
|
IsFromOtherCluster: false, |
|
|
|
Signatures: nil, |
|
|
|
}, |
|
|
|
TsNs: time.Now().UnixNano(), |
|
|
|
}); err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
|
|
|
|