You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
260 lines
9.5 KiB
260 lines
9.5 KiB
package command
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/chrislusf/seaweedfs/weed/filer"
|
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
|
"github.com/chrislusf/seaweedfs/weed/pb"
|
|
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/chrislusf/seaweedfs/weed/remote_storage"
|
|
"github.com/chrislusf/seaweedfs/weed/replication/source"
|
|
"github.com/chrislusf/seaweedfs/weed/security"
|
|
"github.com/chrislusf/seaweedfs/weed/util"
|
|
"github.com/golang/protobuf/proto"
|
|
"google.golang.org/grpc"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
type RemoteSyncOptions struct {
|
|
filerAddress *string
|
|
grpcDialOption grpc.DialOption
|
|
readChunkFromFiler *bool
|
|
debug *bool
|
|
timeAgo *time.Duration
|
|
dir *string
|
|
}
|
|
|
|
const (
|
|
RemoteSyncKeyPrefix = "remote.sync."
|
|
)
|
|
|
|
var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
|
|
|
|
func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
|
|
return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
|
|
return fn(client)
|
|
})
|
|
}
|
|
func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
|
|
return location.Url
|
|
}
|
|
|
|
var (
|
|
remoteSyncOptions RemoteSyncOptions
|
|
)
|
|
|
|
func init() {
|
|
cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
|
|
remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
|
|
remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
|
|
remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
|
|
remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
|
|
remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
|
|
}
|
|
|
|
var cmdFilerRemoteSynchronize = &Command{
|
|
UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
|
|
Short: "resumable continuously write back updates to remote storage if the directory is mounted to the remote storage",
|
|
Long: `resumable continuously write back updates to remote storage if the directory is mounted to the remote storage
|
|
|
|
filer.remote.sync listens on filer update events.
|
|
If any mounted remote file is updated, it will fetch the updated content,
|
|
and write to the remote storage.
|
|
`,
|
|
}
|
|
|
|
func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
|
|
|
|
util.LoadConfiguration("security", false)
|
|
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
|
|
remoteSyncOptions.grpcDialOption = grpcDialOption
|
|
|
|
// read filer remote storage mount mappings
|
|
mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress)
|
|
if readErr != nil {
|
|
fmt.Printf("read mount mapping: %v", readErr)
|
|
return false
|
|
}
|
|
|
|
filerSource := &source.FilerSource{}
|
|
filerSource.DoInitialize(
|
|
*remoteSyncOptions.filerAddress,
|
|
pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress),
|
|
"/", // does not matter
|
|
*remoteSyncOptions.readChunkFromFiler,
|
|
)
|
|
|
|
var found bool
|
|
for dir, remoteStorageMountLocation := range mappings.Mappings {
|
|
if *remoteSyncOptions.dir == dir {
|
|
found = true
|
|
storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name)
|
|
if readErr != nil {
|
|
fmt.Printf("read remote storage configuration for %s: %v", dir, readErr)
|
|
continue
|
|
}
|
|
fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir)
|
|
if err := util.Retry("filer.remote.sync "+dir, func() error {
|
|
return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
|
|
}); err != nil {
|
|
fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err)
|
|
}
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir)
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
|
|
|
|
dirHash := util.HashStringToLong(mountedDir)
|
|
|
|
// 1. specified by timeAgo
|
|
// 2. last offset timestamp for this directory
|
|
// 3. directory creation time
|
|
var lastOffsetTs time.Time
|
|
if *option.timeAgo == 0 {
|
|
mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
|
|
if err != nil {
|
|
return fmt.Errorf("lookup %s: %v", mountedDir, err)
|
|
}
|
|
|
|
lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
|
|
if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
|
|
lastOffsetTs = time.Unix(0, lastOffsetTsNs)
|
|
glog.V(0).Infof("resume from %v", lastOffsetTs)
|
|
} else {
|
|
lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
|
|
}
|
|
} else {
|
|
lastOffsetTs = time.Now().Add(-*option.timeAgo)
|
|
}
|
|
|
|
client, err := remote_storage.GetRemoteStorage(remoteStorage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
|
|
message := resp.EventNotification
|
|
if message.OldEntry == nil && message.NewEntry == nil {
|
|
return nil
|
|
}
|
|
if message.OldEntry == nil && message.NewEntry != nil {
|
|
if len(message.NewEntry.Chunks) == 0 {
|
|
return nil
|
|
}
|
|
fmt.Printf("create: %+v\n", resp)
|
|
if !shouldSendToRemote(message.NewEntry) {
|
|
fmt.Printf("skipping creating: %+v\n", resp)
|
|
return nil
|
|
}
|
|
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
|
|
reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
|
|
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
|
|
if writeErr != nil {
|
|
return writeErr
|
|
}
|
|
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
|
|
}
|
|
if message.OldEntry != nil && message.NewEntry == nil {
|
|
fmt.Printf("delete: %+v\n", resp)
|
|
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
|
|
return client.DeleteFile(dest)
|
|
}
|
|
if message.OldEntry != nil && message.NewEntry != nil {
|
|
oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
|
|
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
|
|
if !shouldSendToRemote(message.NewEntry) {
|
|
fmt.Printf("skipping updating: %+v\n", resp)
|
|
return nil
|
|
}
|
|
if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
|
|
if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) {
|
|
fmt.Printf("update meta: %+v\n", resp)
|
|
return client.UpdateFileMetadata(dest, message.NewEntry)
|
|
}
|
|
}
|
|
fmt.Printf("update: %+v\n", resp)
|
|
if err := client.DeleteFile(oldDest); err != nil {
|
|
return err
|
|
}
|
|
reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
|
|
remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
|
|
if writeErr != nil {
|
|
return writeErr
|
|
}
|
|
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
|
|
lastTime := time.Unix(0, lastTsNs)
|
|
glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
|
|
return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
|
|
})
|
|
|
|
return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption,
|
|
"filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
|
|
}
|
|
|
|
func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
|
|
var dest string
|
|
source := string(sourcePath[len(mountDir):])
|
|
if strings.HasSuffix(remoteMountLocation.Path, "/") {
|
|
dest = remoteMountLocation.Path + source[1:]
|
|
} else {
|
|
dest = remoteMountLocation.Path + source
|
|
}
|
|
return &filer_pb.RemoteStorageLocation{
|
|
Name: remoteMountLocation.Name,
|
|
Bucket: remoteMountLocation.Bucket,
|
|
Path: dest,
|
|
}
|
|
}
|
|
|
|
func isSameChunks(a, b []*filer_pb.FileChunk) bool {
|
|
if len(a) != len(b) {
|
|
return false
|
|
}
|
|
for i := 0; i < len(a); i++ {
|
|
x, y := a[i], b[i]
|
|
if !proto.Equal(x, y) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func shouldSendToRemote(entry *filer_pb.Entry) bool {
|
|
if entry.RemoteEntry == nil {
|
|
return true
|
|
}
|
|
if entry.RemoteEntry.Size != int64(filer.FileSize(entry)) {
|
|
return true
|
|
}
|
|
if entry.RemoteEntry.LastModifiedAt < entry.Attributes.Mtime {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
|
|
entry.RemoteEntry = remoteEntry
|
|
return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
|
_, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
|
|
Directory: dir,
|
|
Entry: entry,
|
|
})
|
|
return err
|
|
})
|
|
}
|