diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/filer_remote_storage.go index 573dcf3e7..bb5a3604a 100644 --- a/weed/filer/filer_remote_storage.go +++ b/weed/filer/filer_remote_storage.go @@ -144,6 +144,23 @@ func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *fil return } +func RemoveRemoteStorageMapping(oldContent []byte, dir string) (newContent []byte, err error) { + mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent) + if unmarshalErr != nil { + return nil, unmarshalErr + } + + // set the new mapping + delete(mappings.Mappings, dir) + + if newContent, err = proto.Marshal(mappings); err != nil { + return oldContent, fmt.Errorf("marshal mappings: %v", err) + } + + return +} + + func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) { var oldContent []byte diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go index 12f918137..b34bcabb8 100644 --- a/weed/pb/filer_pb/filer_client.go +++ b/weed/pb/filer_pb/filer_client.go @@ -204,36 +204,39 @@ func Touch(filerClient FilerClient, parentDirectoryPath string, entryName string func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error { return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return DoMkdir(client, parentDirectoryPath, dirName, fn) + }) +} - entry := &Entry{ - Name: dirName, - IsDirectory: true, - Attributes: &FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(0777 | os.ModeDir), - Uid: OS_UID, - Gid: OS_GID, - }, - } +func DoMkdir(client SeaweedFilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error { + entry := &Entry{ + Name: dirName, + IsDirectory: true, + Attributes: &FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(0777 | os.ModeDir), + Uid: OS_UID, + Gid: OS_GID, + }, + } - if fn != nil { - fn(entry) - } + if fn != nil { + fn(entry) + } - request := &CreateEntryRequest{ - Directory: parentDirectoryPath, - Entry: entry, - } + request := &CreateEntryRequest{ + Directory: parentDirectoryPath, + Entry: entry, + } - glog.V(1).Infof("mkdir: %v", request) - if err := CreateEntry(client, request); err != nil { - glog.V(0).Infof("mkdir %v: %v", request, err) - return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err) - } + glog.V(1).Infof("mkdir: %v", request) + if err := CreateEntry(client, request); err != nil { + glog.V(0).Infof("mkdir %v: %v", request, err) + return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err) + } - return nil - }) + return nil } func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk, fn func(entry *Entry)) error { @@ -273,31 +276,33 @@ func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string func Remove(filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signatures []int32) error { return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { + return DoRemove(client, parentDirectoryPath, name, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster, signatures) + }) +} - deleteEntryRequest := &DeleteEntryRequest{ - Directory: parentDirectoryPath, - Name: name, - IsDeleteData: isDeleteData, - IsRecursive: isRecursive, - IgnoreRecursiveError: ignoreRecursiveErr, - IsFromOtherCluster: isFromOtherCluster, - Signatures: signatures, +func DoRemove(client SeaweedFilerClient, parentDirectoryPath string, name string, isDeleteData bool, isRecursive bool, ignoreRecursiveErr bool, isFromOtherCluster bool, signatures []int32) error { + deleteEntryRequest := &DeleteEntryRequest{ + Directory: parentDirectoryPath, + Name: name, + IsDeleteData: isDeleteData, + IsRecursive: isRecursive, + IgnoreRecursiveError: ignoreRecursiveErr, + IsFromOtherCluster: isFromOtherCluster, + Signatures: signatures, + } + if resp, err := client.DeleteEntry(context.Background(), deleteEntryRequest); err != nil { + if strings.Contains(err.Error(), ErrNotFound.Error()) { + return nil } - if resp, err := client.DeleteEntry(context.Background(), deleteEntryRequest); err != nil { - if strings.Contains(err.Error(), ErrNotFound.Error()) { + return err + } else { + if resp.Error != "" { + if strings.Contains(resp.Error, ErrNotFound.Error()) { return nil } - return err - } else { - if resp.Error != "" { - if strings.Contains(resp.Error, ErrNotFound.Error()) { - return nil - } - return errors.New(resp.Error) - } + return errors.New(resp.Error) } + } - return nil - - }) + return nil } diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go index 37b235a55..91f8de2e2 100644 --- a/weed/shell/command_remote_mount.go +++ b/weed/shell/command_remote_mount.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/remote_storage" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" "io" ) @@ -50,7 +51,8 @@ func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io } if *dir == "" { - return c.listExistingRemoteStorageMounts(commandEnv, writer) + _, err = listExistingRemoteStorageMounts(commandEnv, writer) + return err } remoteStorageLocation := remote_storage.ParseLocation(*remote) @@ -75,24 +77,29 @@ func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io return nil } -func (c *commandRemoteMount) listExistingRemoteStorageMounts(commandEnv *CommandEnv, writer io.Writer) (err error) { +func listExistingRemoteStorageMounts(commandEnv *CommandEnv, writer io.Writer) (mappings *filer_pb.RemoteStorageMapping, err error) { // read current mapping - mappings, readErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) - if readErr != nil { - return readErr + mappings, err = filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) + if err != nil { + return mappings, err } + jsonPrintln(writer, mappings) + + return + +} + +func jsonPrintln(writer io.Writer, message proto.Message) error { m := jsonpb.Marshaler{ EmitDefaults: false, Indent: " ", } - err = m.Marshal(writer, mappings) + err := m.Marshal(writer, message) fmt.Fprintln(writer) - - return - + return err } func (c *commandRemoteMount) findRemoteStorageConfiguration(commandEnv *CommandEnv, writer io.Writer, remote *filer_pb.RemoteStorageLocation) (conf *filer_pb.RemoteConf, err error) { diff --git a/weed/shell/command_remote_unmount.go b/weed/shell/command_remote_unmount.go new file mode 100644 index 000000000..b16da44f1 --- /dev/null +++ b/weed/shell/command_remote_unmount.go @@ -0,0 +1,146 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "io" +) + +func init() { + Commands = append(Commands, &commandRemoteUnmount{}) +} + +type commandRemoteUnmount struct { +} + +func (c *commandRemoteUnmount) Name() string { + return "remote.unmount" +} + +func (c *commandRemoteUnmount) Help() string { + return `unmount remote storage + + # assume a remote storage is configured to name "s3_1" + remote.configure -name=s3_1 -type=s3 -access_key=xxx -secret_key=yyy + # mount and pull one bucket + remote.mount -dir=xxx -remote=s3_1/bucket + + # unmount the mounted directory and remove its cache + remote.unmount -dir=xxx + +` +} + +func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + + dir := remoteMountCommand.String("dir", "", "a directory in filer") + + if err = remoteMountCommand.Parse(args); err != nil { + return nil + } + + mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) + if listErr != nil { + return listErr + } + if *dir == "" { + return jsonPrintln(writer, mappings) + } + + _, found := mappings.Mappings[*dir] + if !found { + return fmt.Errorf("directory %s is not mounted", *dir) + } + + // purge mounted data + if err = c.purgeMountedData(commandEnv, *dir); err != nil { + return fmt.Errorf("purge mounted data: %v", err) + } + + // store a mount configuration in filer + if err = c.deleteMountMapping(commandEnv, *dir); err != nil { + return fmt.Errorf("delete mount mapping: %v", err) + } + + return nil +} + +func (c *commandRemoteUnmount) findRemoteStorageConfiguration(commandEnv *CommandEnv, writer io.Writer, remote *filer_pb.RemoteStorageLocation) (conf *filer_pb.RemoteConf, err error) { + + return filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote.Name) + +} + +func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir string) error { + + // find existing directory, and ensure the directory is empty + err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + parent, name := util.FullPath(dir).DirAndName() + lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ + Directory: parent, + Name: name, + }) + if lookupErr != nil { + return fmt.Errorf("lookup %s: %v", dir, lookupErr) + } + + oldEntry := lookupResp.Entry + + deleteError := filer_pb.DoRemove(client, parent, name, true, true, true, false, nil) + if deleteError != nil { + return fmt.Errorf("delete %s: %v", dir, deleteError) + } + + mkdirErr := filer_pb.DoMkdir(client, parent, name, func(entry *filer_pb.Entry) { + entry.Attributes = oldEntry.Attributes + entry.Extended = oldEntry.Extended + }) + if mkdirErr != nil { + return fmt.Errorf("mkdir %s: %v", dir, mkdirErr) + } + + return nil + }) + if err != nil { + return err + } + + return nil +} + +func (c *commandRemoteUnmount) deleteMountMapping(commandEnv *CommandEnv, dir string) (err error) { + + // read current mapping + var oldContent, newContent []byte + err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + oldContent, err = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE) + return err + }) + if err != nil { + if err != filer_pb.ErrNotFound { + return fmt.Errorf("read existing mapping: %v", err) + } + } + + // add new mapping + newContent, err = filer.RemoveRemoteStorageMapping(oldContent, dir) + if err != nil { + return fmt.Errorf("delete mount %s: %v", dir, err) + } + + // save back + err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE, newContent) + }) + if err != nil { + return fmt.Errorf("save mapping: %v", err) + } + + return nil +}