Browse Source

check cross device rename error

pull/1681/head
Chris Lu 4 years ago
parent
commit
3fedfec1e7
  1. 2
      go.mod
  2. 2
      go.sum
  3. 30
      weed/filer/filer_rename.go
  4. 2
      weed/filesys/dir_rename.go
  5. 11
      weed/server/filer_grpc_server_rename.go
  6. 9
      weed/server/filer_server_handlers_write.go

2
go.mod

@ -58,7 +58,7 @@ require (
github.com/prometheus/client_golang v1.3.0
github.com/rakyll/statik v0.1.7
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect
github.com/seaweedfs/fuse v1.0.7
github.com/seaweedfs/fuse v1.0.8
github.com/seaweedfs/goexif v1.0.2
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
github.com/spaolacci/murmur3 v1.1.0 // indirect

2
go.sum

@ -549,6 +549,8 @@ github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seaweedfs/fuse v1.0.7 h1:tESMXhI3gXzN+dlWsCUrkIZDiWA4dZX18rQMoqmvazw=
github.com/seaweedfs/fuse v1.0.7/go.mod h1:W7ubwr1l7KQsMeUpxFFOFOSxUL/ucTRMAlVYs4xdfQ8=
github.com/seaweedfs/fuse v1.0.8 h1:HBPJTC77OlxwSd2JiTwvLPn8bWTElqQp3xs9vf3C15s=
github.com/seaweedfs/fuse v1.0.8/go.mod h1:W7ubwr1l7KQsMeUpxFFOFOSxUL/ucTRMAlVYs4xdfQ8=
github.com/seaweedfs/goexif v1.0.2 h1:p+rTXYdQ2mgxd+1JaTrQ9N8DvYuw9UH9xgYmJ+Bb29E=
github.com/seaweedfs/goexif v1.0.2/go.mod h1:MrKs5LK0HXdffrdCZrW3OIMegL2xXpC6ThLyXMyjdrk=
github.com/secsy/goftp v0.0.0-20190720192957-f31499d7c79a h1:C6IhVTxNkhlb0tlCB6JfHOUv1f0xHPK7V8X4HlJZEJw=

30
weed/filer/filer_rename.go

@ -0,0 +1,30 @@
package filer
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/util"
"strings"
)
func (f *Filer) CanRename(source, target util.FullPath) error {
sourceBucket := f.DetectBucket(source)
targetBucket := f.DetectBucket(target)
if sourceBucket != targetBucket {
return fmt.Errorf("can not move across collection %s => %s", sourceBucket, targetBucket)
}
return nil
}
func (f *Filer) DetectBucket(source util.FullPath) (bucket string) {
if strings.HasPrefix(string(source), f.DirBucketsPath+"/") {
bucketAndObjectKey := string(source)[len(f.DirBucketsPath)+1:]
t := strings.Index(bucketAndObjectKey, "/")
if t < 0 {
bucket = bucketAndObjectKey
}
if t > 0 {
bucket = bucketAndObjectKey[:t]
}
}
return bucket
}

2
weed/filesys/dir_rename.go

@ -42,7 +42,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
_, err := client.AtomicRenameEntry(ctx, request)
if err != nil {
glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err)
return fuse.EIO
return fuse.EXDEV
}
return nil

11
weed/server/filer_grpc_server_rename.go

@ -15,13 +15,18 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
glog.V(1).Infof("AtomicRenameEntry %v", req)
oldParent := util.FullPath(filepath.ToSlash(req.OldDirectory))
newParent := util.FullPath(filepath.ToSlash(req.NewDirectory))
if err := fs.filer.CanRename(oldParent, newParent); err != nil {
return nil, err
}
ctx, err := fs.filer.BeginTransaction(ctx)
if err != nil {
return nil, err
}
oldParent := util.FullPath(filepath.ToSlash(req.OldDirectory))
oldEntry, err := fs.filer.FindEntry(ctx, oldParent.Child(req.OldName))
if err != nil {
fs.filer.RollbackTransaction(ctx)
@ -29,7 +34,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
}
var events MoveEvents
moveErr := fs.moveEntry(ctx, oldParent, oldEntry, util.FullPath(filepath.ToSlash(req.NewDirectory)), req.NewName, &events)
moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, &events)
if moveErr != nil {
fs.filer.RollbackTransaction(ctx)
return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)

9
weed/server/filer_server_handlers_write.go

@ -111,14 +111,7 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
// required by buckets folder
bucketDefaultReplication, fsync := "", false
if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:]
t := strings.Index(bucketAndObjectKey, "/")
if t < 0 {
collection = bucketAndObjectKey
}
if t > 0 {
collection = bucketAndObjectKey[:t]
}
collection = fs.filer.DetectBucket(util.FullPath(requestURI))
bucketDefaultReplication, fsync = fs.filer.ReadBucketOption(collection)
}
if replication == "" {

Loading…
Cancel
Save