diff --git a/weed/Makefile b/weed/Makefile index fd0843c22..8f1257d09 100644 --- a/weed/Makefile +++ b/weed/Makefile @@ -33,3 +33,7 @@ debug_webdav: debug_s3: go build -gcflags="all=-N -l" dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 s3 + +debug_filer_copy: + go build -gcflags="all=-N -l" + dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h diff --git a/weed/command/command.go b/weed/command/command.go index bbc2e0423..5506e6969 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -15,6 +15,7 @@ var Commands = []*Command{ cmdDownload, cmdExport, cmdFiler, + cmdFilerBackup, cmdFilerCat, cmdFilerMetaTail, cmdFilerReplicate, diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go new file mode 100644 index 000000000..8cb7441f6 --- /dev/null +++ b/weed/command/filer_backup.go @@ -0,0 +1,158 @@ +package command + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" + "io" + "time" +) + +type FilerBackupOptions struct { + isActivePassive *bool + filer *string + path *string + debug *bool + proxyByFiler *bool + timeAgo *time.Duration +} + +var ( + filerBackupOptions FilerBackupOptions +) + +func init() { + cmdFilerBackup.Run = runFilerBackup // break init cycle + filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster") + filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer") + filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers") + filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files") + filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") +} + +var cmdFilerBackup = &Command{ + UsageLine: "filer.backup -filer=: ", + Short: "resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml", + Long: `resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml + + filer.backup listens on filer notifications. If any file is updated, it will fetch the updated content, + and write to the destination. This is to replace filer.replicate command since additional message queue is not needed. + + If restarted and "-timeAgo" is not set, the synchronization will resume from the previous checkpoints, persisted every minute. + A fresh sync will start from the earliest metadata logs. To reset the checkpoints, just set "-timeAgo" to a high value. + +`, +} + +func runFilerBackup(cmd *Command, args []string) bool { + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + util.LoadConfiguration("security", false) + util.LoadConfiguration("replication", true) + + for { + err := doFilerBackup(grpcDialOption, &filerBackupOptions) + if err != nil { + glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err) + time.Sleep(1747 * time.Millisecond) + } + } + + return true +} + +const ( + BackupKeyPrefix = "backup." +) + +func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions) error { + + // find data sink + config := util.GetViper() + dataSink := findSink(config) + if dataSink == nil { + return fmt.Errorf("no data sink configured in replication.toml") + } + + sourceFiler := *backupOption.filer + sourcePath := *backupOption.path + timeAgo := *backupOption.timeAgo + targetPath := dataSink.GetSinkToDirectory() + debug := *backupOption.debug + + // get start time for the data sink + startFrom := time.Unix(0, 0) + sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory()) + if timeAgo.Milliseconds() == 0 { + lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId)) + if err != nil { + glog.V(0).Infof("starting from %v", startFrom) + } else { + startFrom = time.Unix(0, lastOffsetTsNs) + glog.V(0).Infof("resuming from %v", startFrom) + } + } else { + startFrom = time.Now().Add(-timeAgo) + glog.V(0).Infof("start time is set to %v", startFrom) + } + + // create filer sink + filerSource := &source.FilerSource{} + filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, *backupOption.proxyByFiler) + dataSink.SetSourceFiler(filerSource) + + processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug) + + return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ + ClientName: "backup_" + dataSink.GetName(), + PathPrefix: sourcePath, + SinceNs: startFrom.UnixNano(), + }) + if err != nil { + return fmt.Errorf("listen: %v", err) + } + + var counter int64 + var lastWriteTime time.Time + for { + resp, listenErr := stream.Recv() + + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + + if err := processEventFn(resp); err != nil { + return fmt.Errorf("processEventFn: %v", err) + } + + counter++ + if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { + glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) + counter = 0 + lastWriteTime = time.Now() + if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil { + return fmt.Errorf("setOffset: %v", err) + } + } + + } + + }) + +} + diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index e8c06b208..885c95540 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -74,18 +74,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { } } - var dataSink sink.ReplicationSink - for _, sk := range sink.Sinks { - if config.GetBool("sink." + sk.GetName() + ".enabled") { - if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil { - glog.Fatalf("Failed to initialize sink for %s: %+v", - sk.GetName(), err) - } - glog.V(0).Infof("Configure sink to %s", sk.GetName()) - dataSink = sk - break - } - } + dataSink := findSink(config) if dataSink == nil { println("no data sink configured in replication.toml:") @@ -135,6 +124,22 @@ func runFilerReplicate(cmd *Command, args []string) bool { } +func findSink(config *util.ViperProxy) sink.ReplicationSink { + var dataSink sink.ReplicationSink + for _, sk := range sink.Sinks { + if config.GetBool("sink." + sk.GetName() + ".enabled") { + if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil { + glog.Fatalf("Failed to initialize sink for %s: %+v", + sk.GetName(), err) + } + glog.V(0).Infof("Configure sink to %s", sk.GetName()) + dataSink = sk + break + } + } + return dataSink +} + func validateOneEnabledInput(config *util.ViperProxy) { enabledInput := "" for _, input := range sub.NotificationInputs { diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index f8beb6fda..0f34e5701 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication" + "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/security" @@ -137,7 +138,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so // if first time, start from now // if has previously synced, resume from that point of time - sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature) + sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature) if err != nil { return err } @@ -151,93 +152,17 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) filerSink.SetSourceFiler(filerSource) + persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug) + processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification - - var sourceOldKey, sourceNewKey util.FullPath - if message.OldEntry != nil { - sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name) - } - if message.NewEntry != nil { - sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name) - } - for _, sig := range message.Signatures { if sig == targetFilerSignature && targetFilerSignature != 0 { fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message) return nil } } - if debug { - fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature) - } - - if !strings.HasPrefix(resp.Directory, sourcePath) { - return nil - } - - // handle deletions - if message.OldEntry != nil && message.NewEntry == nil { - if !strings.HasPrefix(string(sourceOldKey), sourcePath) { - return nil - } - key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) - return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) - } - - // handle new entries - if message.OldEntry == nil && message.NewEntry != nil { - if !strings.HasPrefix(string(sourceNewKey), sourcePath) { - return nil - } - key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):]) - return filerSink.CreateEntry(key, message.NewEntry, message.Signatures) - } - - // this is something special? - if message.OldEntry == nil && message.NewEntry == nil { - return nil - } - - // handle updates - if strings.HasPrefix(string(sourceOldKey), sourcePath) { - // old key is in the watched directory - if strings.HasPrefix(string(sourceNewKey), sourcePath) { - // new key is also in the watched directory - oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) - message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):]) - foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures) - if foundExisting { - return err - } - - // not able to find old entry - if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil { - return fmt.Errorf("delete old entry %v: %v", oldKey, err) - } - - // create the new entry - newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):]) - return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures) - - } else { - // new key is outside of the watched directory - key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) - return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) - } - } else { - // old key is outside of the watched directory - if strings.HasPrefix(string(sourceNewKey), sourcePath) { - // new key is in the watched directory - key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):]) - return filerSink.CreateEntry(key, message.NewEntry, message.Signatures) - } else { - // new key is also outside of the watched directory - // skip - } - } - - return nil + return persistEventFn(resp) } return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { @@ -275,7 +200,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) counter = 0 lastWriteTime = time.Now() - if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil { + if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil { return err } } @@ -290,11 +215,11 @@ const ( SyncKeyPrefix = "sync." ) -func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) { +func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - syncKey := []byte(SyncKeyPrefix + "____") - util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature)) + syncKey := []byte(signaturePrefix + "____") + util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey}) if err != nil { @@ -317,11 +242,11 @@ func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature } -func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error { +func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error { return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - syncKey := []byte(SyncKeyPrefix + "____") - util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature)) + syncKey := []byte(signaturePrefix + "____") + util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) valueBuf := make([]byte, 8) util.Uint64toBytes(valueBuf, uint64(offsetTsNs)) @@ -343,3 +268,107 @@ func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignatur }) } + +func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error { + // process function + processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + + var sourceOldKey, sourceNewKey util.FullPath + if message.OldEntry != nil { + sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name) + } + if message.NewEntry != nil { + sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name) + } + + if debug { + glog.V(0).Infof("received %v", resp) + } + + if !strings.HasPrefix(resp.Directory, sourcePath) { + return nil + } + + // handle deletions + if message.OldEntry != nil && message.NewEntry == nil { + if !strings.HasPrefix(string(sourceOldKey), sourcePath) { + return nil + } + key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath) + return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) + } + + // handle new entries + if message.OldEntry == nil && message.NewEntry != nil { + if !strings.HasPrefix(string(sourceNewKey), sourcePath) { + return nil + } + key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) + return dataSink.CreateEntry(key, message.NewEntry, message.Signatures) + } + + // this is something special? + if message.OldEntry == nil && message.NewEntry == nil { + return nil + } + + // handle updates + if strings.HasPrefix(string(sourceOldKey), sourcePath) { + // old key is in the watched directory + if strings.HasPrefix(string(sourceNewKey), sourcePath) { + // new key is also in the watched directory + if !dataSink.IsIncremental() { + oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):]) + message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):]) + foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures) + if foundExisting { + return err + } + + // not able to find old entry + if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil { + return fmt.Errorf("delete old entry %v: %v", oldKey, err) + } + } + // create the new entry + newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) + return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures) + + } else { + // new key is outside of the watched directory + if !dataSink.IsIncremental() { + key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath) + return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) + } + } + } else { + // old key is outside of the watched directory + if strings.HasPrefix(string(sourceNewKey), sourcePath) { + // new key is in the watched directory + key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) + return dataSink.CreateEntry(key, message.NewEntry, message.Signatures) + } else { + // new key is also outside of the watched directory + // skip + } + } + + return nil + } + return processEventFn +} + +func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string { + if !dataSink.IsIncremental() { + return util.Join(targetPath, string(sourceKey)[len(sourcePath):]) + } + var mTime int64 + if message.NewEntry != nil { + mTime = message.NewEntry.Attributes.Mtime + } else if message.OldEntry != nil { + mTime = message.OldEntry.Attributes.Mtime + } + dateKey := time.Unix(mTime, 0).Format("2006-01-02") + return util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):]) +}