Browse Source
Merge pull request #2241 from chrislusf/add_remote_storage
Merge pull request #2241 from chrislusf/add_remote_storage
WIP: remote storagepull/2252/head
Chris Lu
3 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 2814 additions and 1260 deletions
-
29other/java/client/src/main/proto/filer.proto
-
35unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
-
4weed/Makefile
-
1weed/command/command.go
-
1weed/command/filer.go
-
51weed/command/filer_backup.go
-
48weed/command/filer_meta_backup.go
-
36weed/command/filer_meta_tail.go
-
260weed/command/filer_remote_sync.go
-
6weed/command/filer_replication.go
-
49weed/command/filer_sync.go
-
31weed/command/imports.go
-
1weed/command/shell.go
-
6weed/filer/entry.go
-
2weed/filer/filer.go
-
14weed/filer/filer_on_meta_event.go
-
182weed/filer/filer_remote_storage.go
-
34weed/filer/filer_remote_storage_test.go
-
5weed/filer/filer_search.go
-
29weed/filer/read_remote.go
-
32weed/filer/stream.go
-
41weed/filesys/meta_cache/meta_cache_subscribe.go
-
29weed/pb/filer.proto
-
1809weed/pb/filer_pb/filer.pb.go
-
4weed/pb/filer_pb/filer_pb_helper.go
-
94weed/pb/filer_pb_tail.go
-
24weed/pb/volume_server.proto
-
536weed/pb/volume_server_pb/volume_server.pb.go
-
75weed/remote_storage/remote_storage.go
-
215weed/remote_storage/s3/s3_storage_client.go
-
2weed/replication/sink/filersink/filer_sink.go
-
41weed/s3api/auth_credentials_subscribe.go
-
2weed/server/filer_server.go
-
17weed/server/filer_server_handlers_read.go
-
1weed/server/master_server.go
-
49weed/server/volume_grpc_remote.go
-
19weed/shell/command_remote_configure.go
-
232weed/shell/command_remote_mount.go
-
13weed/shell/command_volume_balance_test.go
-
2weed/shell/command_volume_list_test.go
-
7weed/shell/commands.go
-
6weed/topology/store_replicate.go
@ -0,0 +1,260 @@ |
|||
package command |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/remote_storage" |
|||
"github.com/chrislusf/seaweedfs/weed/replication/source" |
|||
"github.com/chrislusf/seaweedfs/weed/security" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/golang/protobuf/proto" |
|||
"google.golang.org/grpc" |
|||
"strings" |
|||
"time" |
|||
) |
|||
|
|||
// RemoteSyncOptions carries the command-line flags and the gRPC dial
// option for the filer.remote.sync command. The pointer fields are
// populated from flags in init() and read after flag parsing.
type RemoteSyncOptions struct {
	filerAddress       *string           // filer host:port to connect to
	grpcDialOption     grpc.DialOption   // TLS/credentials for gRPC connections
	readChunkFromFiler *bool             // if true, read chunk data via the filer instead of volume servers
	debug              *bool             // verbose printing of updated remote files
	timeAgo            *time.Duration    // how far back to start syncing; 0 means resume from checkpoint
	dir                *string           // the mounted directory to synchronize
}
|||
|
|||
const (
	// RemoteSyncKeyPrefix prefixes the per-directory offset checkpoint keys
	// stored on the filer.
	RemoteSyncKeyPrefix = "remote.sync."
)

// compile-time check that RemoteSyncOptions satisfies filer_pb.FilerClient
var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
|||
|
|||
func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { |
|||
return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|||
return fn(client) |
|||
}) |
|||
} |
|||
// AdjustedUrl returns the location URL unchanged; this client needs no
// address rewriting.
func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
	return location.Url
}
|||
|
|||
var (
	// remoteSyncOptions holds the parsed flags for the filer.remote.sync command.
	remoteSyncOptions RemoteSyncOptions
)
|||
|
|||
// init wires the command entry point and registers its flags.
func init() {
	cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
	remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
	remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
	remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
	remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
	remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
}
|||
|
|||
var cmdFilerRemoteSynchronize = &Command{ |
|||
UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud", |
|||
Short: "resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage", |
|||
Long: `resumeable continuously write back updates to remote storage if the directory is mounted to the remote storage |
|||
|
|||
filer.remote.sync listens on filer update events. |
|||
If any mounted remote file is updated, it will fetch the updated content, |
|||
and write to the remote storage. |
|||
`, |
|||
} |
|||
|
|||
// runFilerRemoteSynchronize is the command entry point. It loads the
// security configuration, reads the filer's remote-storage mount
// mappings, and — when the requested directory is mounted — follows filer
// metadata updates and replays them to the remote storage, retrying on
// failure. Returns false when the mapping cannot be read or the directory
// is not mounted.
func runFilerRemoteSynchronize(cmd *Command, args []string) bool {

	util.LoadConfiguration("security", false)
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
	remoteSyncOptions.grpcDialOption = grpcDialOption

	// read filer remote storage mount mappings
	mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress)
	if readErr != nil {
		fmt.Printf("read mount mapping: %v", readErr)
		return false
	}

	filerSource := &source.FilerSource{}
	filerSource.DoInitialize(
		*remoteSyncOptions.filerAddress,
		pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress),
		"/", // does not matter
		*remoteSyncOptions.readChunkFromFiler,
	)

	var found bool
	for dir, remoteStorageMountLocation := range mappings.Mappings {
		if *remoteSyncOptions.dir == dir {
			found = true
			storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name)
			if readErr != nil {
				fmt.Printf("read remote storage configuration for %s: %v", dir, readErr)
				continue
			}
			fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir)
			// retry the follow loop so transient filer/remote failures do
			// not terminate the command
			if err := util.Retry("filer.remote.sync "+dir, func() error {
				return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
			}); err != nil {
				fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err)
			}
			break
		}
	}
	if !found {
		fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir)
		return false
	}

	return true
}
|||
|
|||
func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error { |
|||
|
|||
dirHash := util.HashStringToLong(mountedDir) |
|||
|
|||
// 1. specified by timeAgo
|
|||
// 2. last offset timestamp for this directory
|
|||
// 3. directory creation time
|
|||
var lastOffsetTs time.Time |
|||
if *option.timeAgo == 0 { |
|||
mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir)) |
|||
if err != nil { |
|||
return fmt.Errorf("lookup %s: %v", mountedDir, err) |
|||
} |
|||
|
|||
lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash)) |
|||
if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { |
|||
lastOffsetTs = time.Unix(0, lastOffsetTsNs) |
|||
glog.V(0).Infof("resume from %v", lastOffsetTs) |
|||
} else { |
|||
lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0) |
|||
} |
|||
} else { |
|||
lastOffsetTs = time.Now().Add(-*option.timeAgo) |
|||
} |
|||
|
|||
client, err := remote_storage.GetRemoteStorage(remoteStorage) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { |
|||
message := resp.EventNotification |
|||
if message.OldEntry == nil && message.NewEntry == nil { |
|||
return nil |
|||
} |
|||
if message.OldEntry == nil && message.NewEntry != nil { |
|||
if len(message.NewEntry.Chunks) == 0 { |
|||
return nil |
|||
} |
|||
fmt.Printf("create: %+v\n", resp) |
|||
if !shouldSendToRemote(message.NewEntry) { |
|||
fmt.Printf("skipping creating: %+v\n", resp) |
|||
return nil |
|||
} |
|||
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) |
|||
reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks) |
|||
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) |
|||
if writeErr != nil { |
|||
return writeErr |
|||
} |
|||
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) |
|||
} |
|||
if message.OldEntry != nil && message.NewEntry == nil { |
|||
fmt.Printf("delete: %+v\n", resp) |
|||
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) |
|||
return client.DeleteFile(dest) |
|||
} |
|||
if message.OldEntry != nil && message.NewEntry != nil { |
|||
oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) |
|||
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) |
|||
if !shouldSendToRemote(message.NewEntry) { |
|||
fmt.Printf("skipping updating: %+v\n", resp) |
|||
return nil |
|||
} |
|||
if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { |
|||
if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) { |
|||
fmt.Printf("update meta: %+v\n", resp) |
|||
return client.UpdateFileMetadata(dest, message.NewEntry) |
|||
} |
|||
} |
|||
fmt.Printf("update: %+v\n", resp) |
|||
if err := client.DeleteFile(oldDest); err != nil { |
|||
return err |
|||
} |
|||
reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks) |
|||
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) |
|||
if writeErr != nil { |
|||
return writeErr |
|||
} |
|||
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { |
|||
lastTime := time.Unix(0, lastTsNs) |
|||
glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) |
|||
return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs) |
|||
}) |
|||
|
|||
return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, |
|||
"filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) |
|||
} |
|||
|
|||
func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation { |
|||
var dest string |
|||
source := string(sourcePath[len(mountDir):]) |
|||
if strings.HasSuffix(remoteMountLocation.Path, "/") { |
|||
dest = remoteMountLocation.Path + source[1:] |
|||
} else { |
|||
dest = remoteMountLocation.Path + source |
|||
} |
|||
return &filer_pb.RemoteStorageLocation{ |
|||
Name: remoteMountLocation.Name, |
|||
Bucket: remoteMountLocation.Bucket, |
|||
Path: dest, |
|||
} |
|||
} |
|||
|
|||
func isSameChunks(a, b []*filer_pb.FileChunk) bool { |
|||
if len(a) != len(b) { |
|||
return false |
|||
} |
|||
for i := 0; i < len(a); i++ { |
|||
x, y := a[i], b[i] |
|||
if !proto.Equal(x, y) { |
|||
return false |
|||
} |
|||
} |
|||
return true |
|||
} |
|||
|
|||
func shouldSendToRemote(entry *filer_pb.Entry) bool { |
|||
if entry.RemoteEntry == nil { |
|||
return true |
|||
} |
|||
if entry.RemoteEntry.Size != int64(filer.FileSize(entry)) { |
|||
return true |
|||
} |
|||
if entry.RemoteEntry.LastModifiedAt < entry.Attributes.Mtime { |
|||
return true |
|||
} |
|||
return false |
|||
} |
|||
|
|||
func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error { |
|||
entry.RemoteEntry = remoteEntry |
|||
return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { |
|||
_, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ |
|||
Directory: dir, |
|||
Entry: entry, |
|||
}) |
|||
return err |
|||
}) |
|||
} |
@ -0,0 +1,31 @@ |
|||
package command |
|||
|
|||
import ( |
|||
_ "net/http/pprof" |
|||
|
|||
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3" |
|||
|
|||
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink" |
|||
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" |
|||
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" |
|||
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" |
|||
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" |
|||
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" |
|||
|
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/hbase" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/redis" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2" |
|||
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" |
|||
|
|||
) |
@ -0,0 +1,182 @@ |
|||
package filer |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/pb" |
|||
"github.com/chrislusf/seaweedfs/weed/remote_storage" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/golang/protobuf/proto" |
|||
"google.golang.org/grpc" |
|||
"math" |
|||
"strings" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/viant/ptrie" |
|||
) |
|||
|
|||
// Files under DirectoryEtcRemote: one "<storageName>.conf" per remote
// storage configuration, plus a single mount-mapping file.
const REMOTE_STORAGE_CONF_SUFFIX = ".conf"
const REMOTE_STORAGE_MOUNT_FILE = "mount.mapping"

// FilerRemoteStorage indexes which directories are mounted to remote
// storage and the named storage configurations.
type FilerRemoteStorage struct {
	rules             ptrie.Trie                     // directory path (with trailing "/") -> *filer_pb.RemoteStorageLocation
	storageNameToConf map[string]*filer_pb.RemoteConf // storage name -> configuration
}
|||
|
|||
func NewFilerRemoteStorage() (rs *FilerRemoteStorage) { |
|||
rs = &FilerRemoteStorage{ |
|||
rules: ptrie.New(), |
|||
storageNameToConf: make(map[string]*filer_pb.RemoteConf), |
|||
} |
|||
return rs |
|||
} |
|||
|
|||
func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) { |
|||
// execute this on filer
|
|||
|
|||
entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, math.MaxInt64, "", "", "") |
|||
if err != nil { |
|||
if err == filer_pb.ErrNotFound { |
|||
return nil |
|||
} |
|||
glog.Errorf("read remote storage %s: %v", DirectoryEtcRemote, err) |
|||
return |
|||
} |
|||
|
|||
for _, entry := range entries { |
|||
if entry.Name() == REMOTE_STORAGE_MOUNT_FILE { |
|||
if err := rs.loadRemoteStorageMountMapping(entry.Content); err != nil { |
|||
return err |
|||
} |
|||
continue |
|||
} |
|||
if !strings.HasSuffix(entry.Name(), REMOTE_STORAGE_CONF_SUFFIX) { |
|||
return nil |
|||
} |
|||
conf := &filer_pb.RemoteConf{} |
|||
if err := proto.Unmarshal(entry.Content, conf); err != nil { |
|||
return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, entry.Name(), err) |
|||
} |
|||
rs.storageNameToConf[conf.Name] = conf |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err error) { |
|||
mappings := &filer_pb.RemoteStorageMapping{} |
|||
if err := proto.Unmarshal(data, mappings); err != nil { |
|||
return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, err) |
|||
} |
|||
for dir, storageLocation := range mappings.Mappings { |
|||
rs.mapDirectoryToRemoteStorage(util.FullPath(dir), storageLocation) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
// mapDirectoryToRemoteStorage registers dir as a mount point. A trailing
// "/" is appended to the trie key so only true descendants of dir match
// (e.g. "/a/b/cc" does not match a mount at "/a/b/c").
func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *filer_pb.RemoteStorageLocation) {
	rs.rules.Put([]byte(dir+"/"), loc)
}
|||
|
|||
// FindMountDirectory returns the mounted ancestor directory of p and its
// remote location. Keys are stored with a trailing "/" (see
// mapDirectoryToRemoteStorage), which is stripped from the returned
// mountDir. Zero values are returned when p is under no mount point.
func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *filer_pb.RemoteStorageLocation) {
	rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
		mountDir = util.FullPath(string(key[:len(key)-1]))
		remoteLocation = value.(*filer_pb.RemoteStorageLocation)
		return true
	})
	return
}
|||
|
|||
func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) { |
|||
var storageLocation *filer_pb.RemoteStorageLocation |
|||
rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool { |
|||
storageLocation = value.(*filer_pb.RemoteStorageLocation) |
|||
return true |
|||
}) |
|||
|
|||
if storageLocation == nil { |
|||
found = false |
|||
return |
|||
} |
|||
|
|||
return rs.GetRemoteStorageClient(storageLocation.Name) |
|||
} |
|||
|
|||
func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) { |
|||
remoteConf, found = rs.storageNameToConf[storageName] |
|||
if !found { |
|||
return |
|||
} |
|||
|
|||
var err error |
|||
if client, err = remote_storage.GetRemoteStorage(remoteConf); err == nil { |
|||
found = true |
|||
return |
|||
} |
|||
return |
|||
} |
|||
|
|||
func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *filer_pb.RemoteStorageMapping, err error) { |
|||
mappings = &filer_pb.RemoteStorageMapping{ |
|||
Mappings: make(map[string]*filer_pb.RemoteStorageLocation), |
|||
} |
|||
if len(oldContent) > 0 { |
|||
if err = proto.Unmarshal(oldContent, mappings); err != nil { |
|||
glog.Warningf("unmarshal existing mappings: %v", err) |
|||
} |
|||
} |
|||
return |
|||
} |
|||
|
|||
func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *filer_pb.RemoteStorageLocation) (newContent []byte, err error) { |
|||
mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent) |
|||
if unmarshalErr != nil { |
|||
// skip
|
|||
} |
|||
|
|||
// set the new mapping
|
|||
mappings.Mappings[dir] = storageLocation |
|||
|
|||
if newContent, err = proto.Marshal(mappings); err != nil { |
|||
return oldContent, fmt.Errorf("marshal mappings: %v", err) |
|||
} |
|||
|
|||
return |
|||
} |
|||
|
|||
|
|||
func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) { |
|||
var oldContent []byte |
|||
if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|||
oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE) |
|||
return readErr |
|||
}); readErr != nil { |
|||
return nil, readErr |
|||
} |
|||
|
|||
mappings, readErr = UnmarshalRemoteStorageMappings(oldContent) |
|||
if readErr != nil { |
|||
return nil, fmt.Errorf("unmarshal mappings: %v", readErr) |
|||
} |
|||
|
|||
return |
|||
} |
|||
|
|||
func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *filer_pb.RemoteConf, readErr error) { |
|||
var oldContent []byte |
|||
if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
|||
oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX) |
|||
return readErr |
|||
}); readErr != nil { |
|||
return nil, readErr |
|||
} |
|||
|
|||
// unmarshal storage configuration
|
|||
conf = &filer_pb.RemoteConf{} |
|||
if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil { |
|||
readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr) |
|||
return |
|||
} |
|||
|
|||
return |
|||
} |
@ -0,0 +1,34 @@ |
|||
package filer |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/stretchr/testify/assert" |
|||
"testing" |
|||
) |
|||
|
|||
// TestFilerRemoteStorage_FindRemoteStorageClient verifies mount-point
// prefix matching: only paths strictly inside the mounted directory
// resolve to a storage client; the mount point itself, its ancestors, and
// look-alike siblings ("/a/b/cc" vs mount "/a/b/c") must not match.
func TestFilerRemoteStorage_FindRemoteStorageClient(t *testing.T) {
	conf := &filer_pb.RemoteConf{
		Name: "s7",
		Type: "s3",
	}
	rs := NewFilerRemoteStorage()
	rs.storageNameToConf[conf.Name] = conf

	rs.mapDirectoryToRemoteStorage("/a/b/c", &filer_pb.RemoteStorageLocation{
		Name:   "s7",
		Bucket: "some",
		Path:   "/dir",
	})

	// strict descendant of the mount point: matches
	_, _, found := rs.FindRemoteStorageClient("/a/b/c/d/e/f")
	assert.Equal(t, true, found, "find storage client")

	// ancestor of the mount point: no match
	_, _, found2 := rs.FindRemoteStorageClient("/a/b")
	assert.Equal(t, false, found2, "should not find storage client")

	// the mount point itself (no trailing path): no match
	_, _, found3 := rs.FindRemoteStorageClient("/a/b/c")
	assert.Equal(t, false, found3, "should not find storage client")

	// sibling sharing a string prefix but not a path prefix: no match
	_, _, found4 := rs.FindRemoteStorageClient("/a/b/cc")
	assert.Equal(t, false, found4, "should not find storage client")
}
@ -0,0 +1,29 @@ |
|||
package filer |
|||
|
|||
import ( |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
) |
|||
|
|||
// IsInRemoteOnly reports whether the entry's content exists only on remote
// storage: no local chunks, but a non-empty remote copy.
func (entry *Entry) IsInRemoteOnly() bool {
	return len(entry.Chunks) == 0 && entry.Remote != nil && entry.Remote.Size > 0
}
|||
|
|||
func (f *Filer) ReadRemote(entry *Entry, offset int64, size int64) (data[]byte, err error) { |
|||
client, _, found := f.RemoteStorage.GetRemoteStorageClient(entry.Remote.StorageName) |
|||
if !found { |
|||
return nil, fmt.Errorf("remote storage %v not found", entry.Remote.StorageName) |
|||
} |
|||
|
|||
mountDir, remoteLoation := f.RemoteStorage.FindMountDirectory(entry.FullPath) |
|||
|
|||
remoteFullPath := remoteLoation.Path + string(entry.FullPath[len(mountDir):]) |
|||
|
|||
sourceLoc := &filer_pb.RemoteStorageLocation{ |
|||
Name: remoteLoation.Name, |
|||
Bucket: remoteLoation.Bucket, |
|||
Path: remoteFullPath, |
|||
} |
|||
|
|||
return client.ReadFile(sourceLoc, offset, size) |
|||
} |
1809
weed/pb/filer_pb/filer.pb.go
File diff suppressed because it is too large
View File
File diff suppressed because it is too large
View File
@ -0,0 +1,94 @@ |
|||
package pb |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"google.golang.org/grpc" |
|||
"io" |
|||
"time" |
|||
) |
|||
|
|||
type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error |
|||
|
|||
func FollowMetadata(filerAddress string, grpcDialOption grpc.DialOption, |
|||
clientName string, pathPrefix string, lastTsNs int64, selfSignature int32, |
|||
processEventFn ProcessMetadataFunc, fatalOnError bool) error { |
|||
|
|||
err := WithFilerClient(filerAddress, grpcDialOption, makeFunc( |
|||
clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError)) |
|||
if err != nil { |
|||
return fmt.Errorf("subscribing filer meta change: %v", err) |
|||
} |
|||
return err |
|||
} |
|||
|
|||
func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, |
|||
clientName string, pathPrefix string, lastTsNs int64, selfSignature int32, |
|||
processEventFn ProcessMetadataFunc, fatalOnError bool) error { |
|||
|
|||
err := filerClient.WithFilerClient(makeFunc( |
|||
clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError)) |
|||
if err != nil { |
|||
return fmt.Errorf("subscribing filer meta change: %v", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
// makeFunc builds the per-connection subscription loop: it opens a
// SubscribeMetadata stream starting at lastTsNs and feeds every event to
// processEventFn. A processing error is logged (fatal when fatalOnError)
// but does NOT stop the stream; lastTsNs advances after each event so a
// reconnect with the same closure resumes after the last received event.
func makeFunc(clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
	processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error {
	return func(client filer_pb.SeaweedFilerClient) error {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
			ClientName: clientName,
			PathPrefix: pathPrefix,
			SinceNs:    lastTsNs,
			Signature:  selfSignature,
		})
		if err != nil {
			return fmt.Errorf("subscribe: %v", err)
		}

		for {
			resp, listenErr := stream.Recv()
			if listenErr == io.EOF {
				// server closed the stream cleanly
				return nil
			}
			if listenErr != nil {
				return listenErr
			}

			if err := processEventFn(resp); err != nil {
				if fatalOnError {
					glog.Fatalf("process %v: %v", resp, err)
				} else {
					glog.Errorf("process %v: %v", resp, err)
				}
			}
			lastTsNs = resp.TsNs
		}
	}
}
|||
|
|||
func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc { |
|||
var counter int64 |
|||
var lastWriteTime time.Time |
|||
return func(resp *filer_pb.SubscribeMetadataResponse) error { |
|||
if err := processEventFn(resp); err != nil { |
|||
return err |
|||
} |
|||
counter++ |
|||
if lastWriteTime.Add(offsetInterval).Before(time.Now()) { |
|||
counter = 0 |
|||
lastWriteTime = time.Now() |
|||
if err := offsetFunc(counter, resp.TsNs); err != nil { |
|||
return err |
|||
} |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
} |
@ -0,0 +1,75 @@ |
|||
package remote_storage |
|||
|
|||
import ( |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"io" |
|||
"strings" |
|||
"sync" |
|||
) |
|||
|
|||
func ParseLocation(remote string) (loc *filer_pb.RemoteStorageLocation) { |
|||
loc = &filer_pb.RemoteStorageLocation{} |
|||
if strings.HasSuffix(string(remote), "/") { |
|||
remote = remote[:len(remote)-1] |
|||
} |
|||
parts := strings.SplitN(string(remote), "/", 3) |
|||
if len(parts) >= 1 { |
|||
loc.Name = parts[0] |
|||
} |
|||
if len(parts) >= 2 { |
|||
loc.Bucket = parts[1] |
|||
} |
|||
loc.Path = string(remote[len(loc.Name)+1+len(loc.Bucket):]) |
|||
if loc.Path == "" { |
|||
loc.Path = "/" |
|||
} |
|||
return |
|||
} |
|||
|
|||
// VisitFunc is invoked by Traverse for every remote entry found.
type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error

// RemoteStorageClient abstracts one remote storage backend (e.g. s3).
type RemoteStorageClient interface {
	Traverse(loc *filer_pb.RemoteStorageLocation, visitFn VisitFunc) error
	ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error)
	WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error)
	UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error)
	DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error)
}

// RemoteStorageClientMaker constructs a client from its configuration;
// one maker is registered per storage type in RemoteStorageClientMakers.
type RemoteStorageClientMaker interface {
	Make(remoteConf *filer_pb.RemoteConf) (RemoteStorageClient, error)
}

var (
	// RemoteStorageClientMakers maps storage type (e.g. "s3") to its maker.
	RemoteStorageClientMakers = make(map[string]RemoteStorageClientMaker)
	// remoteStorageClients caches clients by storage name; guarded by
	// remoteStorageClientsLock.
	remoteStorageClients     = make(map[string]RemoteStorageClient)
	remoteStorageClientsLock sync.Mutex
)
|||
|
|||
func makeRemoteStorageClient(remoteConf *filer_pb.RemoteConf) (RemoteStorageClient, error) { |
|||
maker, found := RemoteStorageClientMakers[remoteConf.Type] |
|||
if !found { |
|||
return nil, fmt.Errorf("remote storage type %s not found", remoteConf.Type) |
|||
} |
|||
return maker.Make(remoteConf) |
|||
} |
|||
|
|||
func GetRemoteStorage(remoteConf *filer_pb.RemoteConf) (RemoteStorageClient, error) { |
|||
remoteStorageClientsLock.Lock() |
|||
defer remoteStorageClientsLock.Unlock() |
|||
|
|||
existingRemoteStorageClient, found := remoteStorageClients[remoteConf.Name] |
|||
if found { |
|||
return existingRemoteStorageClient, nil |
|||
} |
|||
|
|||
newRemoteStorageClient, err := makeRemoteStorageClient(remoteConf) |
|||
if err != nil { |
|||
return nil, fmt.Errorf("make remote storage client %s: %v", remoteConf.Name, err) |
|||
} |
|||
|
|||
remoteStorageClients[remoteConf.Name] = newRemoteStorageClient |
|||
|
|||
return newRemoteStorageClient, nil |
|||
} |
@ -0,0 +1,215 @@ |
|||
package s3 |
|||
|
|||
import ( |
|||
"fmt" |
|||
"github.com/aws/aws-sdk-go/aws" |
|||
"github.com/aws/aws-sdk-go/aws/credentials" |
|||
"github.com/aws/aws-sdk-go/aws/session" |
|||
"github.com/aws/aws-sdk-go/service/s3" |
|||
"github.com/aws/aws-sdk-go/service/s3/s3iface" |
|||
"github.com/aws/aws-sdk-go/service/s3/s3manager" |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/remote_storage" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"io" |
|||
) |
|||
|
|||
// init registers the s3 client maker so "s3"-typed remote configurations
// can be instantiated via remote_storage.GetRemoteStorage.
func init() {
	remote_storage.RemoteStorageClientMakers["s3"] = new(s3RemoteStorageMaker)
}
|||
|
|||
// s3RemoteStorageMaker creates S3-backed RemoteStorageClient instances.
type s3RemoteStorageMaker struct{}

// Make builds an S3 client from the remote configuration. Static
// credentials are used only when both access key and secret key are set;
// otherwise the default AWS credential chain applies. Path-style
// addressing is forced for S3-compatible endpoints.
func (s s3RemoteStorageMaker) Make(conf *filer_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
	client := &s3RemoteStorageClient{
		conf: conf,
	}
	config := &aws.Config{
		Region:           aws.String(conf.S3Region),
		Endpoint:         aws.String(conf.S3Endpoint),
		S3ForcePathStyle: aws.Bool(true),
	}
	if conf.S3AccessKey != "" && conf.S3SecretKey != "" {
		config.Credentials = credentials.NewStaticCredentials(conf.S3AccessKey, conf.S3SecretKey, "")
	}

	sess, err := session.NewSession(config)
	if err != nil {
		return nil, fmt.Errorf("create aws session: %v", err)
	}
	client.conn = s3.New(sess)
	return client, nil
}
|||
|
|||
// s3RemoteStorageClient talks to one configured S3-compatible endpoint.
type s3RemoteStorageClient struct {
	conf *filer_pb.RemoteConf // the configuration this client was built from
	conn s3iface.S3API        // interface type to allow mocking in tests
}

// compile-time interface conformance check
var _ = remote_storage.RemoteStorageClient(&s3RemoteStorageClient{})
|||
|
|||
func (s *s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) { |
|||
|
|||
pathKey := remote.Path[1:] |
|||
|
|||
listInput := &s3.ListObjectsV2Input{ |
|||
Bucket: aws.String(remote.Bucket), |
|||
ContinuationToken: nil, |
|||
Delimiter: nil, // not aws.String("/"), iterate through all entries
|
|||
EncodingType: nil, |
|||
ExpectedBucketOwner: nil, |
|||
FetchOwner: nil, |
|||
MaxKeys: nil, // aws.Int64(1000),
|
|||
Prefix: aws.String(pathKey), |
|||
RequestPayer: nil, |
|||
StartAfter: nil, |
|||
} |
|||
isLastPage := false |
|||
for !isLastPage && err == nil { |
|||
listErr := s.conn.ListObjectsV2Pages(listInput, func(page *s3.ListObjectsV2Output, lastPage bool) bool { |
|||
for _, content := range page.Contents { |
|||
key := *content.Key |
|||
if len(pathKey) == 0 { |
|||
key = "/" + key |
|||
} else { |
|||
key = key[len(pathKey):] |
|||
} |
|||
dir, name := util.FullPath(key).DirAndName() |
|||
if err := visitFn(dir, name, false, &filer_pb.RemoteEntry{ |
|||
LastModifiedAt: (*content.LastModified).Unix(), |
|||
Size: *content.Size, |
|||
ETag: *content.ETag, |
|||
StorageName: s.conf.Name, |
|||
}); err != nil { |
|||
return false |
|||
} |
|||
} |
|||
listInput.ContinuationToken = page.NextContinuationToken |
|||
isLastPage = lastPage |
|||
return true |
|||
}) |
|||
if listErr != nil { |
|||
err = fmt.Errorf("list %v: %v", remote, listErr) |
|||
} |
|||
} |
|||
return |
|||
} |
|||
func (s *s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) { |
|||
downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) { |
|||
u.PartSize = int64(4 * 1024 * 1024) |
|||
u.Concurrency = 1 |
|||
}) |
|||
|
|||
dataSlice := make([]byte, int(size)) |
|||
writerAt := aws.NewWriteAtBuffer(dataSlice) |
|||
|
|||
_, err = downloader.Download(writerAt, &s3.GetObjectInput{ |
|||
Bucket: aws.String(loc.Bucket), |
|||
Key: aws.String(loc.Path[1:]), |
|||
Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)), |
|||
}) |
|||
if err != nil { |
|||
return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err) |
|||
} |
|||
|
|||
return writerAt.Bytes(), nil |
|||
} |
|||
|
|||
func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) { |
|||
|
|||
fileSize := int64(filer.FileSize(entry)) |
|||
|
|||
partSize := int64(8 * 1024 * 1024) // The minimum/default allowed part size is 5MB
|
|||
for partSize*1000 < fileSize { |
|||
partSize *= 4 |
|||
} |
|||
|
|||
// Create an uploader with the session and custom options
|
|||
uploader := s3manager.NewUploaderWithClient(s.conn, func(u *s3manager.Uploader) { |
|||
u.PartSize = partSize |
|||
u.Concurrency = 5 |
|||
}) |
|||
|
|||
// process tagging
|
|||
tags := "" |
|||
for k, v := range entry.Extended { |
|||
if len(tags) > 0 { |
|||
tags = tags + "&" |
|||
} |
|||
tags = tags + k + "=" + string(v) |
|||
} |
|||
|
|||
// Upload the file to S3.
|
|||
_, err = uploader.Upload(&s3manager.UploadInput{ |
|||
Bucket: aws.String(loc.Bucket), |
|||
Key: aws.String(loc.Path[1:]), |
|||
Body: reader, |
|||
ACL: aws.String("private"), |
|||
ServerSideEncryption: aws.String("AES256"), |
|||
StorageClass: aws.String("STANDARD_IA"), |
|||
Tagging: aws.String(tags), |
|||
}) |
|||
|
|||
//in case it fails to upload
|
|||
if err != nil { |
|||
return nil, fmt.Errorf("upload to s3 %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err) |
|||
} |
|||
|
|||
// read back the remote entry
|
|||
return s.readFileRemoteEntry(loc) |
|||
|
|||
} |
|||
|
|||
func toTagging(attributes map[string][]byte) *s3.Tagging { |
|||
tagging := &s3.Tagging{} |
|||
for k, v := range attributes { |
|||
tagging.TagSet = append(tagging.TagSet, &s3.Tag{ |
|||
Key: aws.String(k), |
|||
Value: aws.String(string(v)), |
|||
}) |
|||
} |
|||
return tagging |
|||
} |
|||
|
|||
func (s *s3RemoteStorageClient) readFileRemoteEntry(loc *filer_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) { |
|||
resp, err := s.conn.HeadObject(&s3.HeadObjectInput{ |
|||
Bucket: aws.String(loc.Bucket), |
|||
Key: aws.String(loc.Path[1:]), |
|||
}) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
return &filer_pb.RemoteEntry{ |
|||
LastModifiedAt: resp.LastModified.Unix(), |
|||
Size: *resp.ContentLength, |
|||
ETag: *resp.ETag, |
|||
StorageName: s.conf.Name, |
|||
}, nil |
|||
|
|||
} |
|||
|
|||
func (s *s3RemoteStorageClient) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) { |
|||
tagging := toTagging(entry.Extended) |
|||
if len(tagging.TagSet) > 0 { |
|||
_, err = s.conn.PutObjectTagging(&s3.PutObjectTaggingInput{ |
|||
Bucket: aws.String(loc.Bucket), |
|||
Key: aws.String(loc.Path[1:]), |
|||
Tagging: toTagging(entry.Extended), |
|||
}) |
|||
} else { |
|||
_, err = s.conn.DeleteObjectTagging(&s3.DeleteObjectTaggingInput{ |
|||
Bucket: aws.String(loc.Bucket), |
|||
Key: aws.String(loc.Path[1:]), |
|||
}) |
|||
} |
|||
return |
|||
} |
|||
func (s *s3RemoteStorageClient) DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error) { |
|||
_, err = s.conn.DeleteObject(&s3.DeleteObjectInput{ |
|||
Bucket: aws.String(loc.Bucket), |
|||
Key: aws.String(loc.Path[1:]), |
|||
}) |
|||
return |
|||
} |
@ -0,0 +1,49 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/remote_storage" |
|||
"github.com/chrislusf/seaweedfs/weed/storage/needle" |
|||
"github.com/chrislusf/seaweedfs/weed/storage/types" |
|||
) |
|||
|
|||
func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) { |
|||
resp = &volume_server_pb.FetchAndWriteNeedleResponse{} |
|||
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) |
|||
if v == nil { |
|||
return nil, fmt.Errorf("not found volume id %d", req.VolumeId) |
|||
} |
|||
|
|||
remoteConf := &filer_pb.RemoteConf{ |
|||
Type: req.RemoteType, |
|||
Name: req.RemoteName, |
|||
S3AccessKey: req.S3AccessKey, |
|||
S3SecretKey: req.S3SecretKey, |
|||
S3Region: req.S3Region, |
|||
S3Endpoint: req.S3Endpoint, |
|||
} |
|||
|
|||
client, getClientErr := remote_storage.GetRemoteStorage(remoteConf) |
|||
if getClientErr != nil { |
|||
return nil, fmt.Errorf("get remote client: %v", getClientErr) |
|||
} |
|||
|
|||
remoteStorageLocation := &filer_pb.RemoteStorageLocation{ |
|||
Name: req.RemoteName, |
|||
Bucket: req.RemoteBucket, |
|||
Path: req.RemoteKey, |
|||
} |
|||
data, ReadRemoteErr := client.ReadFile(remoteStorageLocation, req.Offset, req.Size) |
|||
if ReadRemoteErr != nil { |
|||
return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr) |
|||
} |
|||
|
|||
if err = v.WriteNeedleBlob(types.NeedleId(req.NeedleId), data, types.Size(req.Size)); err != nil { |
|||
return nil, fmt.Errorf("write blob needle %d size %d: %v", req.NeedleId, req.Size, err) |
|||
} |
|||
|
|||
return resp, nil |
|||
} |
@ -0,0 +1,232 @@ |
|||
package shell |
|||
|
|||
import ( |
|||
"context" |
|||
"flag" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/filer" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/remote_storage" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/golang/protobuf/jsonpb" |
|||
"io" |
|||
) |
|||
|
|||
// init registers the remote.mount command with the shell's command registry.
func init() {
	Commands = append(Commands, &commandRemoteMount{})
}
|||
|
|||
// commandRemoteMount implements the "remote.mount" shell command: it mounts
// a remote storage location onto a filer directory and pulls the remote
// metadata. The command itself is stateless.
type commandRemoteMount struct {
}
|||
|
|||
// Name returns the shell command name used to invoke this command.
func (c *commandRemoteMount) Name() string {
	return "remote.mount"
}
|||
|
|||
// Help returns the usage text shown by the shell's help command.
func (c *commandRemoteMount) Help() string {
	return `mount remote storage and pull its metadata

	# assume a remote storage is configured to name "s3_1"
	remote.configure -name=s3_1 -type=s3 -access_key=xxx -secret_key=yyy

	# mount and pull one bucket
	remote.mount -dir=xxx -remote=s3_1/bucket
	# mount and pull one directory in the bucket
	remote.mount -dir=xxx -remote=s3_1/bucket/dir1

`
}
|||
|
|||
func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { |
|||
|
|||
remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) |
|||
|
|||
dir := remoteMountCommand.String("dir", "", "a directory in filer") |
|||
nonEmpty := remoteMountCommand.Bool("nonempty", false, "allows the mounting over a non-empty directory") |
|||
remote := remoteMountCommand.String("remote", "", "a directory in remote storage, ex. <storageName>/<bucket>/path/to/dir") |
|||
|
|||
if err = remoteMountCommand.Parse(args); err != nil { |
|||
return nil |
|||
} |
|||
|
|||
if *dir == "" { |
|||
return c.listExistingRemoteStorageMounts(commandEnv, writer) |
|||
} |
|||
|
|||
remoteStorageLocation := remote_storage.ParseLocation(*remote) |
|||
|
|||
// find configuration for remote storage
|
|||
// remotePath is /<bucket>/path/to/dir
|
|||
remoteConf, err := c.findRemoteStorageConfiguration(commandEnv, writer, remoteStorageLocation) |
|||
if err != nil { |
|||
return fmt.Errorf("find configuration for %s: %v", *remote, err) |
|||
} |
|||
|
|||
// pull metadata from remote
|
|||
if err = c.pullMetadata(commandEnv, writer, *dir, *nonEmpty, remoteConf, remoteStorageLocation); err != nil { |
|||
return fmt.Errorf("pull metadata: %v", err) |
|||
} |
|||
|
|||
// store a mount configuration in filer
|
|||
if err = c.saveMountMapping(commandEnv, writer, *dir, remoteStorageLocation); err != nil { |
|||
return fmt.Errorf("save mount mapping: %v", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (c *commandRemoteMount) listExistingRemoteStorageMounts(commandEnv *CommandEnv, writer io.Writer) (err error) { |
|||
|
|||
// read current mapping
|
|||
mappings, readErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress) |
|||
if readErr != nil { |
|||
return readErr |
|||
} |
|||
|
|||
m := jsonpb.Marshaler{ |
|||
EmitDefaults: false, |
|||
Indent: " ", |
|||
} |
|||
|
|||
err = m.Marshal(writer, mappings) |
|||
fmt.Fprintln(writer) |
|||
|
|||
return |
|||
|
|||
} |
|||
|
|||
// findRemoteStorageConfiguration loads the remote storage configuration
// named by remote.Name from the filer. writer is currently unused but kept
// for signature consistency with the other command helpers.
func (c *commandRemoteMount) findRemoteStorageConfiguration(commandEnv *CommandEnv, writer io.Writer, remote *filer_pb.RemoteStorageLocation) (conf *filer_pb.RemoteConf, err error) {

	return filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote.Name)

}
|||
|
|||
func (c *commandRemoteMount) pullMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *filer_pb.RemoteConf, remote *filer_pb.RemoteStorageLocation) error { |
|||
|
|||
// find existing directory, and ensure the directory is empty
|
|||
err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { |
|||
parent, name := util.FullPath(dir).DirAndName() |
|||
_, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ |
|||
Directory: parent, |
|||
Name: name, |
|||
}) |
|||
if lookupErr != nil { |
|||
return fmt.Errorf("lookup %s: %v", dir, lookupErr) |
|||
} |
|||
|
|||
mountToDirIsEmpty := true |
|||
listErr := filer_pb.SeaweedList(client, dir, "", func(entry *filer_pb.Entry, isLast bool) error { |
|||
mountToDirIsEmpty = false |
|||
return nil |
|||
}, "", false, 1) |
|||
|
|||
if listErr != nil { |
|||
return fmt.Errorf("list %s: %v", dir, listErr) |
|||
} |
|||
|
|||
if !mountToDirIsEmpty { |
|||
if !nonEmpty { |
|||
return fmt.Errorf("dir %s is not empty", dir) |
|||
} |
|||
} |
|||
|
|||
return nil |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
// visit remote storage
|
|||
remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { |
|||
ctx := context.Background() |
|||
err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error { |
|||
localDir := dir + remoteDir |
|||
println(util.NewFullPath(localDir, name)) |
|||
|
|||
lookupResponse, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ |
|||
Directory: localDir, |
|||
Name: name, |
|||
}) |
|||
var existingEntry *filer_pb.Entry |
|||
if lookupErr != nil { |
|||
if lookupErr != filer_pb.ErrNotFound { |
|||
return lookupErr |
|||
} |
|||
} else { |
|||
existingEntry = lookupResponse.Entry |
|||
} |
|||
|
|||
if existingEntry == nil { |
|||
_, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ |
|||
Directory: localDir, |
|||
Entry: &filer_pb.Entry{ |
|||
Name: name, |
|||
IsDirectory: isDirectory, |
|||
Attributes: &filer_pb.FuseAttributes{ |
|||
FileSize: uint64(remoteEntry.Size), |
|||
Mtime: remoteEntry.LastModifiedAt, |
|||
FileMode: uint32(0644), |
|||
}, |
|||
RemoteEntry: remoteEntry, |
|||
}, |
|||
}) |
|||
return createErr |
|||
} else { |
|||
if existingEntry.RemoteEntry == nil || existingEntry.RemoteEntry.ETag != remoteEntry.ETag { |
|||
existingEntry.RemoteEntry = remoteEntry |
|||
existingEntry.Attributes.FileSize = uint64(remoteEntry.Size) |
|||
existingEntry.Attributes.Mtime = remoteEntry.LastModifiedAt |
|||
_, updateErr := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{ |
|||
Directory: localDir, |
|||
Entry: existingEntry, |
|||
}) |
|||
return updateErr |
|||
} |
|||
} |
|||
return nil |
|||
}) |
|||
return err |
|||
}) |
|||
|
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (c *commandRemoteMount) saveMountMapping(commandEnv *CommandEnv, writer io.Writer, dir string, remoteStorageLocation *filer_pb.RemoteStorageLocation) (err error) { |
|||
|
|||
// read current mapping
|
|||
var oldContent, newContent []byte |
|||
err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { |
|||
oldContent, err = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE) |
|||
return err |
|||
}) |
|||
if err != nil { |
|||
if err != filer_pb.ErrNotFound { |
|||
return fmt.Errorf("read existing mapping: %v", err) |
|||
} |
|||
} |
|||
|
|||
// add new mapping
|
|||
newContent, err = filer.AddRemoteStorageMapping(oldContent, dir, remoteStorageLocation) |
|||
if err != nil { |
|||
return fmt.Errorf("add mapping %s~%s: %v", dir, remoteStorageLocation, err) |
|||
} |
|||
|
|||
// save back
|
|||
err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { |
|||
return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE, newContent) |
|||
}) |
|||
if err != nil { |
|||
return fmt.Errorf("save mapping: %v", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue