Chris Lu
3 years ago
9 changed files with 1262 additions and 857 deletions
  13  other/java/client/src/main/proto/filer.proto
   6  weed/filer/entry.go
  47  weed/filer/filer_remote_storage.go
  13  weed/pb/filer.proto
1694  weed/pb/filer_pb/filer.pb.go
  50  weed/remote_storage/remote_storage.go
  94  weed/remote_storage/s3/s3_storage_client.go
   2  weed/replication/sink/filersink/filer_sink.go
 200  weed/shell/command_remote_mount.go
weed/filer/filer_remote_storage.go
@@ -0,0 +1,47 @@
package filer

import (
	"bytes"
	"context"
	"fmt"
	"github.com/golang/protobuf/proto"
	"io"
	"math"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/golang/protobuf/jsonpb"
	"github.com/viant/ptrie"
)

type FilerRemoteStorage struct {
	rules ptrie.Trie
}

func NewFilerRemoteStorage() (fc *FilerRemoteStorage) {
	fc = &FilerRemoteStorage{
		rules: ptrie.New(),
	}
	return fc
}

func (fc *FilerRemoteStorage) loadFromFiler(filer *Filer) (err error) {
	entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, math.MaxInt64, "", "", "")
	if err != nil {
		if err == filer_pb.ErrNotFound {
			return nil
		}
		glog.Errorf("read remote storage %s: %v", DirectoryEtcRemote, err)
		return
	}

	for _, entry := range entries {
		conf := &filer_pb.RemoteConf{}
		if err := proto.Unmarshal(entry.Content, conf); err != nil {
			return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, entry.Name(), err)
		}
		fc.MountRemoteStorage(dir, conf)
	}
	return nil
}
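The mount rules live in a ptrie prefix trie so that a later path lookup can match the longest mounted directory. MountRemoteStorage itself is referenced above but is not visible in this hunk; a rough sketch of what it might look like, assuming ptrie's Put(key, value) API and a trie keyed simply by the mounted directory, is:

// Assumed shape only; not part of the hunk above.
// Stores the remote configuration under the mounted directory so that
// later lookups can match filer paths by prefix.
func (fc *FilerRemoteStorage) MountRemoteStorage(dir string, conf *filer_pb.RemoteConf) {
	fc.rules.Put([]byte(dir), conf)
}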
weed/pb/filer_pb/filer.pb.go  (1694 lines changed)
File diff suppressed because it is too large
weed/remote_storage/remote_storage.go
@@ -0,0 +1,50 @@
package remote_storage

import (
	"fmt"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"sync"
)

type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error

type RemoteStorageClient interface {
	Traverse(rootDir string, visitFn VisitFunc) error
}

type RemoteStorageClientMaker interface {
	Make(remoteConf *filer_pb.RemoteConf) (RemoteStorageClient, error)
}

var (
	RemoteStorageClientMakers = make(map[string]RemoteStorageClientMaker)
	remoteStorageClients      = make(map[string]RemoteStorageClient)
	remoteStorageClientsLock  sync.Mutex
)

func makeRemoteStorageClient(remoteConf *filer_pb.RemoteConf) (RemoteStorageClient, error) {
	maker, found := RemoteStorageClientMakers[remoteConf.Type]
	if !found {
		return nil, fmt.Errorf("remote storage type %s not found", remoteConf.Type)
	}
	return maker.Make(remoteConf)
}

func GetRemoteStorage(remoteConf *filer_pb.RemoteConf) (RemoteStorageClient, error) {
	remoteStorageClientsLock.Lock()
	defer remoteStorageClientsLock.Unlock()

	existingRemoteStorageClient, found := remoteStorageClients[remoteConf.Name]
	if found {
		return existingRemoteStorageClient, nil
	}

	newRemoteStorageClient, err := makeRemoteStorageClient(remoteConf)
	if err != nil {
		return nil, fmt.Errorf("make remote storage client %s: %v", remoteConf.Name, err)
	}

	remoteStorageClients[remoteConf.Name] = newRemoteStorageClient

	return newRemoteStorageClient, nil
}
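The maker registry plus GetRemoteStorage forms a small plugin pattern: each backend package registers a RemoteStorageClientMaker under its type name in init(), and GetRemoteStorage lazily builds and caches one client per configured storage name. A minimal sketch of a hypothetical backend (the "mock" type and its behavior are illustrative only, not part of this change) could look like this:

package mock

import (
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/remote_storage"
)

func init() {
	// register this backend so GetRemoteStorage can build it from a RemoteConf with Type == "mock"
	remote_storage.RemoteStorageClientMakers["mock"] = new(mockRemoteStorageMaker)
}

type mockRemoteStorageMaker struct{}

func (m mockRemoteStorageMaker) Make(conf *filer_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
	return &mockRemoteStorageClient{conf: conf}, nil
}

type mockRemoteStorageClient struct {
	conf *filer_pb.RemoteConf
}

// Traverse reports a single fake file so callers can exercise the VisitFunc contract.
func (m *mockRemoteStorageClient) Traverse(rootDir string, visitFn remote_storage.VisitFunc) error {
	return visitFn(rootDir, "hello.txt", false, &filer_pb.RemoteEntry{
		Size:        5,
		StorageName: m.conf.Name,
	})
}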
weed/remote_storage/s3/s3_storage_client.go
@@ -0,0 +1,94 @@
package s3

import (
	"fmt"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/remote_storage"
	"github.com/chrislusf/seaweedfs/weed/util"
	"strings"
)

func init() {
	remote_storage.RemoteStorageClientMakers["s3"] = new(s3RemoteStorageMaker)
}

type s3RemoteStorageMaker struct{}

func (s s3RemoteStorageMaker) Make(conf *filer_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
	client := &s3RemoteStorageClient{
		conf: conf,
	}
	config := &aws.Config{
		Region:           aws.String(conf.S3Region),
		Endpoint:         aws.String(conf.S3Endpoint),
		S3ForcePathStyle: aws.Bool(true),
	}
	if conf.S3AccessKey != "" && conf.S3SecretKey != "" {
		config.Credentials = credentials.NewStaticCredentials(conf.S3AccessKey, conf.S3SecretKey, "")
	}

	sess, err := session.NewSession(config)
	if err != nil {
		return nil, fmt.Errorf("create aws session: %v", err)
	}
	client.conn = s3.New(sess)
	return client, nil
}

type s3RemoteStorageClient struct {
	conf *filer_pb.RemoteConf
	conn s3iface.S3API
}

func (s s3RemoteStorageClient) Traverse(rootDir string, visitFn remote_storage.VisitFunc) (err error) {
	// rootDir is /<bucket>/<optional prefix>
	if !strings.HasPrefix(rootDir, "/") {
		return fmt.Errorf("remote directory %s should start with /", rootDir)
	}
	bucket := strings.Split(rootDir[1:], "/")[0]
	prefix := rootDir[1+len(bucket):]
	if len(prefix) > 0 && strings.HasPrefix(prefix, "/") {
		prefix = prefix[1:]
	}

	listInput := &s3.ListObjectsV2Input{
		Bucket:              aws.String(bucket),
		ContinuationToken:   nil,
		Delimiter:           nil, // not aws.String("/"), iterate through all entries
		EncodingType:        nil,
		ExpectedBucketOwner: nil,
		FetchOwner:          nil,
		MaxKeys:             nil, // aws.Int64(1000),
		Prefix:              aws.String(prefix),
		RequestPayer:        nil,
		StartAfter:          nil,
	}
	isLastPage := false
	for !isLastPage && err == nil {
		listErr := s.conn.ListObjectsV2Pages(listInput, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
			for _, content := range page.Contents {
				key := *content.Key
				dir, name := util.FullPath("/" + key).DirAndName()
				if visitErr := visitFn(dir, name, false, &filer_pb.RemoteEntry{
					LastModifiedAt: (*content.LastModified).Unix(),
					Size:           *content.Size,
					ETag:           *content.ETag,
					StorageName:    s.conf.Name,
				}); visitErr != nil {
					// stop paging and report the visitor's error to the caller
					err = visitErr
					return false
				}
			}
			listInput.ContinuationToken = page.NextContinuationToken
			isLastPage = lastPage
			return true
		})
		if listErr != nil {
			err = fmt.Errorf("list %v: %v", rootDir, listErr)
		}
	}
	return
}
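For context, the s3 client is obtained through the generic registry and then driven entirely through Traverse. A minimal sketch of that flow, where the storage name, credentials, region, and bucket path are placeholders, not values from this change:

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/remote_storage"
	_ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3" // registers the "s3" maker via init()
)

func main() {
	conf := &filer_pb.RemoteConf{
		Name:        "s3_1",
		Type:        "s3",
		S3AccessKey: "xxx",
		S3SecretKey: "yyy",
		S3Region:    "us-east-1",
	}
	client, err := remote_storage.GetRemoteStorage(conf)
	if err != nil {
		panic(err)
	}
	// walk every object under /bucket/dir1 and print its path and size
	err = client.Traverse("/bucket/dir1", func(dir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
		fmt.Printf("%s/%s (%d bytes)\n", dir, name, remoteEntry.Size)
		return nil
	})
	if err != nil {
		panic(err)
	}
}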
weed/shell/command_remote_mount.go
@@ -0,0 +1,200 @@
package shell

import (
	"context"
	"flag"
	"fmt"
	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/remote_storage"
	_ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/golang/protobuf/proto"
	"io"
	"strings"
)

func init() {
	Commands = append(Commands, &commandRemoteMount{})
}

type commandRemoteMount struct {
}

func (c *commandRemoteMount) Name() string {
	return "remote.mount"
}

func (c *commandRemoteMount) Help() string {
	return `mount remote storage and pull its metadata

	# assume a remote storage is configured with the name "s3_1"
	remote.configure -name=s3_1 -type=s3 -access_key=xxx -secret_key=yyy

	# mount and pull one bucket
	remote.mount -dir=xxx -remote=s3_1/bucket
	# mount and pull one directory in the bucket
	remote.mount -dir=xxx -remote=s3_1/bucket/dir1

`
}

func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

	dir := remoteMountCommand.String("dir", "", "a directory in filer")
	nonEmpty := remoteMountCommand.Bool("nonempty", false, "allows the mounting over a non-empty directory")
	remote := remoteMountCommand.String("remote", "", "a directory in remote storage, ex. <storageName>/<bucket>/path/to/dir")

	if err = remoteMountCommand.Parse(args); err != nil {
		return nil
	}

	// find configuration for remote storage
	remoteConf, remotePath, err := c.findRemoteStorageConfiguration(commandEnv, writer, *remote)
	if err != nil {
		return fmt.Errorf("find configuration for %s: %v", *remote, err)
	}

	// pull metadata from remote
	if err = c.pullMetadata(commandEnv, writer, *dir, *nonEmpty, remoteConf, remotePath); err != nil {
		return fmt.Errorf("pull metadata: %v", err)
	}

	// store a mount configuration in filer

	return nil
}

func (c *commandRemoteMount) findRemoteStorageConfiguration(commandEnv *CommandEnv, writer io.Writer, remote string) (conf *filer_pb.RemoteConf, remotePath string, err error) {

	// find remote configuration
	parts := strings.Split(remote, "/")
	if len(parts) == 0 {
		err = fmt.Errorf("wrong remote storage location %s", remote)
		return
	}
	storageName := parts[0]
	remotePath = remote[len(storageName):]

	// read storage configuration data
	var confBytes []byte
	err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		confBytes, err = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, storageName)
		return err
	})
	if err != nil {
		err = fmt.Errorf("no remote storage configuration for %s : %v", storageName, err)
		return
	}

	// unmarshal storage configuration
	conf = &filer_pb.RemoteConf{}
	if unMarshalErr := proto.Unmarshal(confBytes, conf); unMarshalErr != nil {
		err = fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, storageName, unMarshalErr)
		return
	}

	return
}

func (c *commandRemoteMount) pullMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *filer_pb.RemoteConf, remotePath string) error {

	// find the existing directory, and ensure it is empty unless -nonempty is set
	var mountToDir *filer_pb.Entry
	err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		parent, name := util.FullPath(dir).DirAndName()
		resp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
			Directory: parent,
			Name:      name,
		})
		if lookupErr != nil {
			return fmt.Errorf("lookup %s: %v", dir, lookupErr)
		}
		mountToDir = resp.Entry

		// assume empty until the one-entry listing below finds something
		mountToDirIsEmpty := true
		listErr := filer_pb.SeaweedList(client, dir, "", func(entry *filer_pb.Entry, isLast bool) error {
			mountToDirIsEmpty = false
			return nil
		}, "", false, 1)

		if listErr != nil {
			return fmt.Errorf("list %s: %v", dir, listErr)
		}

		if !mountToDirIsEmpty {
			if !nonEmpty {
				return fmt.Errorf("dir %s is not empty", dir)
			}
		}

		return nil
	})
	if err != nil {
		return err
	}

	// visit remote storage
	remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
	if err != nil {
		return err
	}

	err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		ctx := context.Background()
		err = remoteStorage.Traverse(remotePath, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
			localDir := dir + remoteDir
			println(string(util.NewFullPath(localDir, name)))

			lookupResponse, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
				Directory: localDir,
				Name:      name,
			})
			if lookupErr != nil && lookupErr != filer_pb.ErrNotFound {
				return lookupErr
			}
			// a not-found lookup leaves lookupResponse nil
			var existingEntry *filer_pb.Entry
			if lookupResponse != nil {
				existingEntry = lookupResponse.Entry
			}

			if existingEntry == nil {
				_, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
					Directory: localDir,
					Entry: &filer_pb.Entry{
						Name:        name,
						IsDirectory: isDirectory,
						Attributes: &filer_pb.FuseAttributes{
							FileSize: uint64(remoteEntry.Size),
							Mtime:    remoteEntry.LastModifiedAt,
							FileMode: uint32(0644),
						},
						RemoteEntry: remoteEntry,
					},
				})
				return createErr
			} else {
				if existingEntry.RemoteEntry == nil || existingEntry.RemoteEntry.ETag != remoteEntry.ETag {
					existingEntry.RemoteEntry = remoteEntry
					existingEntry.Attributes.FileSize = uint64(remoteEntry.Size)
					existingEntry.Attributes.Mtime = remoteEntry.LastModifiedAt
					_, updateErr := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
						Directory: localDir,
						Entry:     existingEntry,
					})
					return updateErr
				}
			}
			return nil
		})
		return err
	})

	if err != nil {
		return err
	}

	return nil
}
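For reference, the -remote flag value is split on its first path segment: the leading component selects the storage configuration stored under /etc/remote, and the remainder becomes the path handed to Traverse. A tiny standalone sketch of that split (the sample value is illustrative):

package main

import (
	"fmt"
	"strings"
)

func main() {
	remote := "s3_1/bucket/dir1" // value passed via -remote

	// same split as findRemoteStorageConfiguration: first segment is the
	// storage name, the remainder (keeping its leading "/") is the remote path
	storageName := strings.Split(remote, "/")[0]
	remotePath := remote[len(storageName):]

	fmt.Println(storageName) // s3_1
	fmt.Println(remotePath)  // /bucket/dir1
}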