From 228b133afa570d18cf14cd5081427f1ed3a50efe Mon Sep 17 00:00:00 2001 From: bernardx Date: Sat, 3 Sep 2022 14:03:23 +0800 Subject: [PATCH] new 'concurrency' parameter for filer.sync (#3579) Co-authored-by: XIAOYQ --- weed/command/filer_sync.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index d6f1d63d8..7133b0aef 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -44,10 +44,16 @@ type SyncOptions struct { aProxyByFiler *bool bProxyByFiler *bool metricsHttpPort *int + concurrency *int clientId int32 clientEpoch int32 } +const ( + SyncKeyPrefix = "sync." + DefaultConcurrcyLimit = 32 +) + var ( syncOptions SyncOptions syncCpuProfile *string @@ -77,6 +83,7 @@ func init() { syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files") syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond") syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond") + syncOptions.concurrency = cmdFilerSynchronize.Flag.Int("concurrency", DefaultConcurrcyLimit, "The maximum number of files that will be synced concurrently.") syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file") syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file") syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port") @@ -153,6 +160,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool { *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug, + *syncOptions.concurrency, aFilerSignature, bFilerSignature) if err != nil { @@ -189,6 +197,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool { *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug, + *syncOptions.concurrency, bFilerSignature, aFilerSignature) if err != nil { @@ -221,7 +230,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd } func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, - replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error { + replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, sourceFilerSignature int32, targetFilerSignature int32) error { // if first time, start from now // if has previously synced, resume from that point of time @@ -251,7 +260,12 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti } return persistEventFn(resp) } - processor := NewMetadataProcessor(processEventFn, 128) + + if concurrency < 0 || concurrency > 1024 { + glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrcyLimit) + concurrency = DefaultConcurrcyLimit + } + processor := NewMetadataProcessor(processEventFn, concurrency) var lastLogTsNs = time.Now().UnixNano() var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) @@ -276,10 +290,6 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti } -const ( - SyncKeyPrefix = "sync." -) - // When each business is distinguished according to path, and offsets need to be maintained separately. func getSignaturePrefixByPath(path string) string { // compatible historical version