Browse Source

filer.remote.sync: split into buckets mode and single directory mode

pull/2321/head
Chris Lu 3 years ago
parent
commit
4b28c5f6c3
  1. 233
      weed/command/filer_remote_sync.go
  2. 323
      weed/command/filer_remote_sync_buckets.go
  3. 220
      weed/command/filer_remote_sync_dir.go

233
weed/command/filer_remote_sync.go

@ -3,19 +3,14 @@ package command
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc" "google.golang.org/grpc"
"os"
"strings"
"time" "time"
) )
@ -26,6 +21,11 @@ type RemoteSyncOptions struct {
debug *bool debug *bool
timeAgo *time.Duration timeAgo *time.Duration
dir *string dir *string
createBucketAt *string
mappings *remote_pb.RemoteStorageMapping
remoteConfs map[string]*remote_pb.RemoteConf
bucketsDir string
} }
const ( const (
@ -51,19 +51,28 @@ func init() {
cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster") remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer") remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
remoteSyncOptions.createBucketAt = cmdFilerRemoteSynchronize.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in")
remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files") remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
} }
var cmdFilerRemoteSynchronize = &Command{ var cmdFilerRemoteSynchronize = &Command{
UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
Short: "resumable continuously write back updates to remote storage if the directory is mounted to the remote storage",
Long: `resumable continuously write back updates to remote storage if the directory is mounted to the remote storage
UsageLine: "filer.remote.sync -dir=/mount/s3_on_cloud or -createBucketAt=clound1",
Short: "resumable continuously write back updates to remote storage",
Long: `resumable continuously write back updates to remote storage
filer.remote.sync listens on filer update events. filer.remote.sync listens on filer update events.
If any mounted remote file is updated, it will fetch the updated content, If any mounted remote file is updated, it will fetch the updated content,
and write to the remote storage. and write to the remote storage.
There are two modes:
1)Write back one mounted folder to remote storage
weed filer.remote.sync -dir=/mount/s3_on_cloud
2)Watch /buckets folder and write back all changes.
Any new buckets will be created in this remote storage.
weed filer.remote.sync -createBucketAt=cloud1
`, `,
} }
@ -84,6 +93,7 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
*remoteSyncOptions.readChunkFromFiler, *remoteSyncOptions.readChunkFromFiler,
) )
if dir != "" {
fmt.Printf("synchronize %s to remote storage...\n", dir) fmt.Printf("synchronize %s to remote storage...\n", dir)
util.RetryForever("filer.remote.sync "+dir, func() error { util.RetryForever("filer.remote.sync "+dir, func() error {
return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir) return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir)
@ -93,208 +103,31 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
} }
return true return true
}) })
return true
} }
func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error {
// read filer remote storage mount mappings
_, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, *option.filerAddress, mountedDir)
if detectErr != nil {
return fmt.Errorf("read mount info: %v", detectErr)
}
eachEntryFunc, err := makeEventProcessor(remoteStorage, mountedDir, remoteStorageMountLocation, filerSource)
storageName := *remoteSyncOptions.createBucketAt
remoteSyncOptions.bucketsDir = "/buckets"
// check buckets again
remoteSyncOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil { if err != nil {
return err return err
} }
processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
lastTime := time.Unix(0, lastTsNs)
glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir, lastTsNs)
})
lastOffsetTs := collectLastSyncOffset(option, mountedDir)
return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}
func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
client, err := remote_storage.GetRemoteStorage(remoteStorage)
if err != nil {
return nil, err
}
handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
if message.NewEntry == nil {
return nil
}
if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
if readErr != nil {
return fmt.Errorf("unmarshal mappings: %v", readErr)
}
if remoteLoc, found := mappings.Mappings[mountedDir]; found {
if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path {
glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc)
}
} else {
glog.V(0).Infof("unmounted %s exiting ...", mountedDir)
os.Exit(0)
}
}
if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX {
conf := &remote_pb.RemoteConf{}
if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
}
remoteStorage = conf
if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil {
client = newClient
} else {
return err
}
}
remoteSyncOptions.bucketsDir = resp.DirBuckets
return nil return nil
}
eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
return handleEtcRemoteChanges(resp)
}
if message.OldEntry == nil && message.NewEntry == nil {
return nil
}
if message.OldEntry == nil && message.NewEntry != nil {
if !filer.HasData(message.NewEntry) {
return nil
}
glog.V(2).Infof("create: %+v", resp)
if !shouldSendToRemote(message.NewEntry) {
glog.V(2).Infof("skipping creating: %+v", resp)
return nil
}
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
if message.NewEntry.IsDirectory {
glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
return client.WriteDirectory(dest, message.NewEntry)
}
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
reader := filer.NewFileReader(filerSource, message.NewEntry)
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
if writeErr != nil {
return writeErr
}
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
}
if message.OldEntry != nil && message.NewEntry == nil {
glog.V(2).Infof("delete: %+v", resp)
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
if message.OldEntry.IsDirectory {
glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
return client.RemoveDirectory(dest)
}
glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
return client.DeleteFile(dest)
}
if message.OldEntry != nil && message.NewEntry != nil {
oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
if !shouldSendToRemote(message.NewEntry) {
glog.V(2).Infof("skipping updating: %+v", resp)
return nil
}
if message.NewEntry.IsDirectory {
return client.WriteDirectory(dest, message.NewEntry)
}
if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
if filer.IsSameData(message.OldEntry, message.NewEntry) {
glog.V(2).Infof("update meta: %+v", resp)
return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry)
}
}
glog.V(2).Infof("update: %+v", resp)
glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
if err := client.DeleteFile(oldDest); err != nil {
return err
}
reader := filer.NewFileReader(filerSource, message.NewEntry)
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
if writeErr != nil {
return writeErr
}
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
}
return nil
}
return eachEntryFunc, nil
}
})
func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) time.Time {
// 1. specified by timeAgo
// 2. last offset timestamp for this directory
// 3. directory creation time
var lastOffsetTs time.Time
if *option.timeAgo == 0 {
mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
if storageName != "" {
fmt.Printf("synchronize %s, default new bucket creation in %s ...\n", remoteSyncOptions.bucketsDir, storageName)
util.RetryForever("filer.remote.sync buckets "+storageName, func() error {
return followBucketUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, storageName)
}, func(err error) bool {
if err != nil { if err != nil {
glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err)
return time.Now()
}
lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir)
if mountedDirEntry != nil {
if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
lastOffsetTs = time.Unix(0, lastOffsetTsNs)
glog.V(0).Infof("resume from %v", lastOffsetTs)
} else {
lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
}
} else {
lastOffsetTs = time.Now()
}
} else {
lastOffsetTs = time.Now().Add(-*option.timeAgo)
}
return lastOffsetTs
}
func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
source := string(sourcePath[len(mountDir):])
dest := util.FullPath(remoteMountLocation.Path).Child(source)
return &remote_pb.RemoteStorageLocation{
Name: remoteMountLocation.Name,
Bucket: remoteMountLocation.Bucket,
Path: string(dest),
}
glog.Errorf("synchronize %s to %s: %v", remoteSyncOptions.bucketsDir, storageName, err)
} }
func shouldSendToRemote(entry *filer_pb.Entry) bool {
if entry.RemoteEntry == nil {
return true
}
if entry.RemoteEntry.LastLocalSyncTsNs/1e9 < entry.Attributes.Mtime {
return true return true
}
return false
})
} }
func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
entry.RemoteEntry = remoteEntry
return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
_, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
Directory: dir,
Entry: entry,
})
return err
})
return true
} }

323
weed/command/filer_remote_sync_buckets.go

@ -0,0 +1,323 @@
package command
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"math"
"strings"
"time"
)
func followBucketUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, storageName string) error {
// read filer remote storage mount mappings
if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
return fmt.Errorf("read mount info: %v", detectErr)
}
eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
if err != nil {
return err
}
processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
lastTime := time.Unix(0, lastTsNs)
glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, option.bucketsDir, lastTsNs)
})
lastOffsetTs := collectLastSyncOffset(option, option.bucketsDir)
return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}
func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
handleCreateBucket := func(entry *filer_pb.Entry) error {
if !entry.IsDirectory {
return nil
}
client, err := option.findRemoteStorageClient(entry.Name)
if err != nil {
return err
}
glog.V(0).Infof("create bucket %s", entry.Name)
if err := client.CreateBucket(entry.Name); err != nil {
return err
}
return nil
}
handleDeleteBucket := func(entry *filer_pb.Entry) error {
if !entry.IsDirectory {
return nil
}
client, err := option.findRemoteStorageClient(entry.Name)
if err != nil {
return err
}
glog.V(0).Infof("delete bucket %s", entry.Name)
if err := client.DeleteBucket(entry.Name); err != nil {
return err
}
return nil
}
handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
if message.NewEntry == nil {
return nil
}
if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
if readErr != nil {
return fmt.Errorf("unmarshal mappings: %v", readErr)
}
option.mappings = newMappings
}
if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
conf := &remote_pb.RemoteConf{}
if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
}
option.remoteConfs[conf.Name] = conf
}
return nil
}
eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
return handleEtcRemoteChanges(resp)
}
if message.OldEntry == nil && message.NewEntry == nil {
return nil
}
if message.OldEntry == nil && message.NewEntry != nil {
if message.NewParentPath == option.bucketsDir {
return handleCreateBucket(message.NewEntry)
}
if !filer.HasData(message.NewEntry) {
return nil
}
bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
if !ok {
return nil
}
client, err := remote_storage.GetRemoteStorage(remoteStorage)
if err != nil {
return err
}
glog.V(2).Infof("create: %+v", resp)
if !shouldSendToRemote(message.NewEntry) {
glog.V(2).Infof("skipping creating: %+v", resp)
return nil
}
dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
if message.NewEntry.IsDirectory {
glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
return client.WriteDirectory(dest, message.NewEntry)
}
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
reader := filer.NewFileReader(filerSource, message.NewEntry)
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
if writeErr != nil {
return writeErr
}
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
}
if message.OldEntry != nil && message.NewEntry == nil {
if resp.Directory == option.bucketsDir {
return handleDeleteBucket(message.OldEntry)
}
bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
if !ok {
return nil
}
client, err := remote_storage.GetRemoteStorage(remoteStorage)
if err != nil {
return err
}
glog.V(2).Infof("delete: %+v", resp)
dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
if message.OldEntry.IsDirectory {
glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
return client.RemoveDirectory(dest)
}
glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
return client.DeleteFile(dest)
}
if message.OldEntry != nil && message.NewEntry != nil {
if resp.Directory == option.bucketsDir {
if message.NewParentPath == option.bucketsDir {
if message.OldEntry.Name == message.NewEntry.Name {
return nil
}
if err := handleCreateBucket(message.NewEntry); err != nil {
return err
}
if err := handleDeleteBucket(message.OldEntry); err != nil {
return err
}
}
}
oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
if oldOk && newOk {
if !shouldSendToRemote(message.NewEntry) {
glog.V(2).Infof("skipping updating: %+v", resp)
return nil
}
client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
if err != nil {
return err
}
if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
// update the same entry
if message.NewEntry.IsDirectory {
// update directory property
return nil
}
if filer.IsSameData(message.OldEntry, message.NewEntry) {
glog.V(2).Infof("update meta: %+v", resp)
oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
} else {
newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
reader := filer.NewFileReader(filerSource, message.NewEntry)
glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
if writeErr != nil {
return writeErr
}
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
}
}
}
// the following is entry rename
if oldOk {
client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
if err != nil {
return err
}
oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
if message.OldEntry.IsDirectory {
return client.RemoveDirectory(oldDest)
}
glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
if err := client.DeleteFile(oldDest); err != nil {
return err
}
}
if newOk {
if !shouldSendToRemote(message.NewEntry) {
glog.V(2).Infof("skipping updating: %+v", resp)
return nil
}
client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
if err != nil {
return err
}
newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
if message.NewEntry.IsDirectory {
return client.WriteDirectory(newDest, message.NewEntry)
}
reader := filer.NewFileReader(filerSource, message.NewEntry)
glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
if writeErr != nil {
return writeErr
}
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
}
}
return nil
}
return eachEntryFunc, nil
}
func (option *RemoteSyncOptions)findRemoteStorageClient(bucketName string) (remote_storage.RemoteStorageClient, error) {
bucket := util.FullPath(option.bucketsDir).Child(bucketName)
remoteStorageMountLocation, isMounted := option.mappings.Mappings[string(bucket)]
if !isMounted {
return nil, fmt.Errorf("%s is not mounted", bucket)
}
remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
if !hasClient {
return nil, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
}
client, err := remote_storage.GetRemoteStorage(remoteConf)
if err != nil {
return nil, err
}
return client, nil
}
func (option *RemoteSyncOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
if !ok {
return "", nil, nil, false
}
var isMounted bool
remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
if !isMounted {
glog.Warningf("%s is not mounted", bucket)
return "", nil, nil, false
}
var hasClient bool
remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
if !hasClient {
glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
return "", nil, nil, false
}
return bucket, remoteStorageMountLocation, remoteConf, true
}
func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
if !strings.HasPrefix(dir, bucketsDir+"/") {
return "", false
}
parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2)
return util.FullPath(bucketsDir).Child(parts[0]), true
}
func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) {
if mappings, err := filer.ReadMountMappings(option.grpcDialOption, *option.filerAddress); err != nil {
return err
} else {
option.mappings = mappings
}
option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
return nil
}
conf := &remote_pb.RemoteConf{}
if err := proto.Unmarshal(entry.Content, conf); err != nil {
return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
}
option.remoteConfs[conf.Name] = conf
return nil
}, "", false, math.MaxUint32)
return
}

220
weed/command/filer_remote_sync_dir.go

@ -0,0 +1,220 @@
package command
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"os"
"strings"
"time"
)
func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error {
// read filer remote storage mount mappings
_, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, *option.filerAddress, mountedDir)
if detectErr != nil {
return fmt.Errorf("read mount info: %v", detectErr)
}
eachEntryFunc, err := makeEventProcessor(remoteStorage, mountedDir, remoteStorageMountLocation, filerSource)
if err != nil {
return err
}
processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
lastTime := time.Unix(0, lastTsNs)
glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir, lastTsNs)
})
lastOffsetTs := collectLastSyncOffset(option, mountedDir)
return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}
func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
client, err := remote_storage.GetRemoteStorage(remoteStorage)
if err != nil {
return nil, err
}
handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
if message.NewEntry == nil {
return nil
}
if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
if readErr != nil {
return fmt.Errorf("unmarshal mappings: %v", readErr)
}
if remoteLoc, found := mappings.Mappings[mountedDir]; found {
if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path {
glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc)
}
} else {
glog.V(0).Infof("unmounted %s exiting ...", mountedDir)
os.Exit(0)
}
}
if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX {
conf := &remote_pb.RemoteConf{}
if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
}
remoteStorage = conf
if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil {
client = newClient
} else {
return err
}
}
return nil
}
eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
return handleEtcRemoteChanges(resp)
}
if message.OldEntry == nil && message.NewEntry == nil {
return nil
}
if message.OldEntry == nil && message.NewEntry != nil {
if !filer.HasData(message.NewEntry) {
return nil
}
glog.V(2).Infof("create: %+v", resp)
if !shouldSendToRemote(message.NewEntry) {
glog.V(2).Infof("skipping creating: %+v", resp)
return nil
}
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
if message.NewEntry.IsDirectory {
glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
return client.WriteDirectory(dest, message.NewEntry)
}
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
reader := filer.NewFileReader(filerSource, message.NewEntry)
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
if writeErr != nil {
return writeErr
}
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
}
if message.OldEntry != nil && message.NewEntry == nil {
glog.V(2).Infof("delete: %+v", resp)
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
if message.OldEntry.IsDirectory {
glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
return client.RemoveDirectory(dest)
}
glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
return client.DeleteFile(dest)
}
if message.OldEntry != nil && message.NewEntry != nil {
oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
if !shouldSendToRemote(message.NewEntry) {
glog.V(2).Infof("skipping updating: %+v", resp)
return nil
}
if message.NewEntry.IsDirectory {
return client.WriteDirectory(dest, message.NewEntry)
}
if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
if filer.IsSameData(message.OldEntry, message.NewEntry) {
glog.V(2).Infof("update meta: %+v", resp)
return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry)
}
}
glog.V(2).Infof("update: %+v", resp)
glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
if err := client.DeleteFile(oldDest); err != nil {
return err
}
reader := filer.NewFileReader(filerSource, message.NewEntry)
glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
if writeErr != nil {
return writeErr
}
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
}
return nil
}
return eachEntryFunc, nil
}
func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) time.Time {
// 1. specified by timeAgo
// 2. last offset timestamp for this directory
// 3. directory creation time
var lastOffsetTs time.Time
if *option.timeAgo == 0 {
mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
if err != nil {
glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err)
return time.Now()
}
lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir)
if mountedDirEntry != nil {
if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
lastOffsetTs = time.Unix(0, lastOffsetTsNs)
glog.V(0).Infof("resume from %v", lastOffsetTs)
} else {
lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
}
} else {
lastOffsetTs = time.Now()
}
} else {
lastOffsetTs = time.Now().Add(-*option.timeAgo)
}
return lastOffsetTs
}
func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
source := string(sourcePath[len(mountDir):])
dest := util.FullPath(remoteMountLocation.Path).Child(source)
return &remote_pb.RemoteStorageLocation{
Name: remoteMountLocation.Name,
Bucket: remoteMountLocation.Bucket,
Path: string(dest),
}
}
func shouldSendToRemote(entry *filer_pb.Entry) bool {
if entry.RemoteEntry == nil {
return true
}
if entry.RemoteEntry.LastLocalSyncTsNs/1e9 < entry.Attributes.Mtime {
return true
}
return false
}
func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
entry.RemoteEntry = remoteEntry
return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
_, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
Directory: dir,
Entry: entry,
})
return err
})
}
Loading…
Cancel
Save