|
@ -47,6 +47,8 @@ type SyncOptions struct { |
|
|
metricsHttpIp *string |
|
|
metricsHttpIp *string |
|
|
metricsHttpPort *int |
|
|
metricsHttpPort *int |
|
|
concurrency *int |
|
|
concurrency *int |
|
|
|
|
|
aDoDeleteFiles *bool |
|
|
|
|
|
bDoDeleteFiles *bool |
|
|
clientId int32 |
|
|
clientId int32 |
|
|
clientEpoch int32 |
|
|
clientEpoch int32 |
|
|
} |
|
|
} |
|
@ -90,6 +92,8 @@ func init() { |
|
|
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file") |
|
|
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file") |
|
|
syncOptions.metricsHttpIp = cmdFilerSynchronize.Flag.String("metricsIp", "", "metrics listen ip") |
|
|
syncOptions.metricsHttpIp = cmdFilerSynchronize.Flag.String("metricsIp", "", "metrics listen ip") |
|
|
syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port") |
|
|
syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port") |
|
|
|
|
|
syncOptions.aDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("a.doDeleteFiles", true, "delete and update files when synchronizing on filer A") |
|
|
|
|
|
syncOptions.bDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("b.doDeleteFiles", true, "delete and update files when synchronizing on filer B") |
|
|
syncOptions.clientId = util.RandomInt32() |
|
|
syncOptions.clientId = util.RandomInt32() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -164,6 +168,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool { |
|
|
*syncOptions.bDiskType, |
|
|
*syncOptions.bDiskType, |
|
|
*syncOptions.bDebug, |
|
|
*syncOptions.bDebug, |
|
|
*syncOptions.concurrency, |
|
|
*syncOptions.concurrency, |
|
|
|
|
|
*syncOptions.bDoDeleteFiles, |
|
|
aFilerSignature, |
|
|
aFilerSignature, |
|
|
bFilerSignature) |
|
|
bFilerSignature) |
|
|
if err != nil { |
|
|
if err != nil { |
|
@ -201,6 +206,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool { |
|
|
*syncOptions.aDiskType, |
|
|
*syncOptions.aDiskType, |
|
|
*syncOptions.aDebug, |
|
|
*syncOptions.aDebug, |
|
|
*syncOptions.concurrency, |
|
|
*syncOptions.concurrency, |
|
|
|
|
|
*syncOptions.aDoDeleteFiles, |
|
|
bFilerSignature, |
|
|
bFilerSignature, |
|
|
aFilerSignature) |
|
|
aFilerSignature) |
|
|
if err != nil { |
|
|
if err != nil { |
|
@ -233,7 +239,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, |
|
|
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, |
|
|
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, sourceFilerSignature int32, targetFilerSignature int32) error { |
|
|
|
|
|
|
|
|
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32) error { |
|
|
|
|
|
|
|
|
// if first time, start from now
|
|
|
// if first time, start from now
|
|
|
// if has previously synced, resume from that point of time
|
|
|
// if has previously synced, resume from that point of time
|
|
@ -251,7 +257,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti |
|
|
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) |
|
|
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) |
|
|
filerSink.SetSourceFiler(filerSource) |
|
|
filerSink.SetSourceFiler(filerSource) |
|
|
|
|
|
|
|
|
persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, true, debug) |
|
|
|
|
|
|
|
|
persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, doDeleteFiles, debug) |
|
|
|
|
|
|
|
|
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { |
|
|
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { |
|
|
message := resp.EventNotification |
|
|
message := resp.EventNotification |
|
|