diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index deb458525..7aa9c1e8d 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -15,6 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/grace" "google.golang.org/grpc" + "os" "strings" "time" ) @@ -35,6 +36,8 @@ type SyncOptions struct { bDiskType *string aDebug *bool bDebug *bool + aFromTsMs *int64 + bFromTsMs *int64 aProxyByFiler *bool bProxyByFiler *bool clientId int32 @@ -65,6 +68,8 @@ func init() { syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers") syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files") syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files") + syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond") + syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond") syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file") syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file") syncOptions.clientId = util.RandomInt32() @@ -97,10 +102,32 @@ func runFilerSynchronize(cmd *Command, args []string) bool { filerA := pb.ServerAddress(*syncOptions.filerA) filerB := pb.ServerAddress(*syncOptions.filerB) + + // read a filer signature + aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA) + if aFilerErr != nil { + glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr) + return true + } + // read b filer signature + bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB) + if bFilerErr != nil { + glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr) + return true + } + go func() { + // a->b + // set synchronization start timestamp to offset + initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs) + if initOffsetError != nil { + glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError) + os.Exit(2) + } for { err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB, - *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug) + *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, + *syncOptions.bDebug, aFilerSignature, bFilerSignature) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) time.Sleep(1747 * time.Millisecond) @@ -109,10 +136,18 @@ func runFilerSynchronize(cmd *Command, args []string) bool { }() if !*syncOptions.isActivePassive { + // b->a + // set synchronization start timestamp to offset + initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs) + if initOffsetError != nil { + glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError) + os.Exit(2) + } go func() { for { err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA, - *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug) + *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, + *syncOptions.aDebug, bFilerSignature, aFilerSignature) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) time.Sleep(2147 * time.Millisecond) @@ -126,19 +161,24 @@ func runFilerSynchronize(cmd *Command, args []string) bool { return true } -func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, - replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error { - - // read source filer signature - sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler) - if sourceErr != nil { - return sourceErr +// initOffsetFromTsMs Initialize offset +func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64) error { + if fromTsMs <= 0 { + return nil } - // read target filer signature - targetFilerSignature, targetErr := replication.ReadFilerSignature(grpcDialOption, targetFiler) - if targetErr != nil { - return targetErr + // convert to nanosecond + fromTsNs := fromTsMs * 1000_000 + // If not successful, exit the program. + setOffsetErr := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, fromTsNs) + if setOffsetErr != nil { + return setOffsetErr } + glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB) + return nil +} + +func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, + replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error { // if first time, start from now // if has previously synced, resume from that point of time