@ -26,7 +26,9 @@ type SyncOptions struct {
filerA * string
filerA * string
filerB * string
filerB * string
aPath * string
aPath * string
aExcludePaths * string
bPath * string
bPath * string
bExcludePaths * string
aReplication * string
aReplication * string
bReplication * string
bReplication * string
aCollection * string
aCollection * string
@ -58,7 +60,9 @@ func init() {
syncOptions . filerA = cmdFilerSynchronize . Flag . String ( "a" , "" , "filer A in one SeaweedFS cluster" )
syncOptions . filerA = cmdFilerSynchronize . Flag . String ( "a" , "" , "filer A in one SeaweedFS cluster" )
syncOptions . filerB = cmdFilerSynchronize . Flag . String ( "b" , "" , "filer B in the other SeaweedFS cluster" )
syncOptions . filerB = cmdFilerSynchronize . Flag . String ( "b" , "" , "filer B in the other SeaweedFS cluster" )
syncOptions . aPath = cmdFilerSynchronize . Flag . String ( "a.path" , "/" , "directory to sync on filer A" )
syncOptions . aPath = cmdFilerSynchronize . Flag . String ( "a.path" , "/" , "directory to sync on filer A" )
syncOptions . aExcludePaths = cmdFilerSynchronize . Flag . String ( "a.excludePaths" , "" , "exclude directories to sync on filer A" )
syncOptions . bPath = cmdFilerSynchronize . Flag . String ( "b.path" , "/" , "directory to sync on filer B" )
syncOptions . bPath = cmdFilerSynchronize . Flag . String ( "b.path" , "/" , "directory to sync on filer B" )
syncOptions . bExcludePaths = cmdFilerSynchronize . Flag . String ( "b.excludePaths" , "" , "exclude directories to sync on filer B" )
syncOptions . aReplication = cmdFilerSynchronize . Flag . String ( "a.replication" , "" , "replication on filer A" )
syncOptions . aReplication = cmdFilerSynchronize . Flag . String ( "a.replication" , "" , "replication on filer A" )
syncOptions . bReplication = cmdFilerSynchronize . Flag . String ( "b.replication" , "" , "replication on filer B" )
syncOptions . bReplication = cmdFilerSynchronize . Flag . String ( "b.replication" , "" , "replication on filer B" )
syncOptions . aCollection = cmdFilerSynchronize . Flag . String ( "a.collection" , "" , "collection on filer A" )
syncOptions . aCollection = cmdFilerSynchronize . Flag . String ( "a.collection" , "" , "collection on filer A" )
@ -133,9 +137,24 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
}
}
for {
for {
syncOptions . clientEpoch ++
syncOptions . clientEpoch ++
err := doSubscribeFilerMetaChanges ( syncOptions . clientId , syncOptions . clientEpoch , grpcDialOption , filerA , * syncOptions . aPath , * syncOptions . aProxyByFiler , filerB ,
* syncOptions . bPath , * syncOptions . bReplication , * syncOptions . bCollection , * syncOptions . bTtlSec , * syncOptions . bProxyByFiler , * syncOptions . bDiskType ,
* syncOptions . bDebug , aFilerSignature , bFilerSignature )
err := doSubscribeFilerMetaChanges (
syncOptions . clientId ,
syncOptions . clientEpoch ,
grpcDialOption ,
filerA ,
* syncOptions . aPath ,
strings . Split ( * syncOptions . aExcludePaths , "," ) ,
* syncOptions . aProxyByFiler ,
filerB ,
* syncOptions . bPath ,
* syncOptions . bReplication ,
* syncOptions . bCollection ,
* syncOptions . bTtlSec ,
* syncOptions . bProxyByFiler ,
* syncOptions . bDiskType ,
* syncOptions . bDebug ,
aFilerSignature ,
bFilerSignature )
if err != nil {
if err != nil {
glog . Errorf ( "sync from %s to %s: %v" , * syncOptions . filerA , * syncOptions . filerB , err )
glog . Errorf ( "sync from %s to %s: %v" , * syncOptions . filerA , * syncOptions . filerB , err )
time . Sleep ( 1747 * time . Millisecond )
time . Sleep ( 1747 * time . Millisecond )
@ -154,9 +173,24 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
go func ( ) {
go func ( ) {
for {
for {
syncOptions . clientEpoch ++
syncOptions . clientEpoch ++
err := doSubscribeFilerMetaChanges ( syncOptions . clientId , syncOptions . clientEpoch , grpcDialOption , filerB , * syncOptions . bPath , * syncOptions . bProxyByFiler , filerA ,
* syncOptions . aPath , * syncOptions . aReplication , * syncOptions . aCollection , * syncOptions . aTtlSec , * syncOptions . aProxyByFiler , * syncOptions . aDiskType ,
* syncOptions . aDebug , bFilerSignature , aFilerSignature )
err := doSubscribeFilerMetaChanges (
syncOptions . clientId ,
syncOptions . clientEpoch ,
grpcDialOption ,
filerB ,
* syncOptions . bPath ,
strings . Split ( * syncOptions . bExcludePaths , "," ) ,
* syncOptions . bProxyByFiler ,
filerA ,
* syncOptions . aPath ,
* syncOptions . aReplication ,
* syncOptions . aCollection ,
* syncOptions . aTtlSec ,
* syncOptions . aProxyByFiler ,
* syncOptions . aDiskType ,
* syncOptions . aDebug ,
bFilerSignature ,
aFilerSignature )
if err != nil {
if err != nil {
glog . Errorf ( "sync from %s to %s: %v" , * syncOptions . filerB , * syncOptions . filerA , err )
glog . Errorf ( "sync from %s to %s: %v" , * syncOptions . filerB , * syncOptions . filerA , err )
time . Sleep ( 2147 * time . Millisecond )
time . Sleep ( 2147 * time . Millisecond )
@ -186,7 +220,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd
return nil
return nil
}
}
func doSubscribeFilerMetaChanges ( clientId int32 , clientEpoch int32 , grpcDialOption grpc . DialOption , sourceFiler pb . ServerAddress , sourcePath string , sourceReadChunkFromFiler bool , targetFiler pb . ServerAddress , targetPath string ,
func doSubscribeFilerMetaChanges ( clientId int32 , clientEpoch int32 , grpcDialOption grpc . DialOption , sourceFiler pb . ServerAddress , sourcePath string , sourceExcludePaths [ ] string , source ReadChunkFromFiler bool , targetFiler pb . ServerAddress , targetPath string ,
replicationStr , collection string , ttlSec int , sinkWriteChunkByFiler bool , diskType string , debug bool , sourceFilerSignature int32 , targetFilerSignature int32 ) error {
replicationStr , collection string , ttlSec int , sinkWriteChunkByFiler bool , diskType string , debug bool , sourceFilerSignature int32 , targetFilerSignature int32 ) error {
// if first time, start from now
// if first time, start from now
@ -205,7 +239,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
filerSink . DoInitialize ( targetFiler . ToHttpAddress ( ) , targetFiler . ToGrpcAddress ( ) , targetPath , replicationStr , collection , ttlSec , diskType , grpcDialOption , sinkWriteChunkByFiler )
filerSink . DoInitialize ( targetFiler . ToHttpAddress ( ) , targetFiler . ToGrpcAddress ( ) , targetPath , replicationStr , collection , ttlSec , diskType , grpcDialOption , sinkWriteChunkByFiler )
filerSink . SetSourceFiler ( filerSource )
filerSink . SetSourceFiler ( filerSource )
persistEventFn := genProcessFunction ( sourcePath , targetPath , filerSink , debug )
persistEventFn := genProcessFunction ( sourcePath , targetPath , sourceExcludePaths , filerSink , debug )
processEventFn := func ( resp * filer_pb . SubscribeMetadataResponse ) error {
processEventFn := func ( resp * filer_pb . SubscribeMetadataResponse ) error {
message := resp . EventNotification
message := resp . EventNotification
@ -302,7 +336,7 @@ func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature
}
}
func genProcessFunction ( sourcePath string , targetPath string , dataSink sink . ReplicationSink , debug bool ) func ( resp * filer_pb . SubscribeMetadataResponse ) error {
func genProcessFunction ( sourcePath string , targetPath string , excludePaths [ ] string , dataSink sink . ReplicationSink , debug bool ) func ( resp * filer_pb . SubscribeMetadataResponse ) error {
// process function
// process function
processEventFn := func ( resp * filer_pb . SubscribeMetadataResponse ) error {
processEventFn := func ( resp * filer_pb . SubscribeMetadataResponse ) error {
message := resp . EventNotification
message := resp . EventNotification
@ -322,7 +356,11 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
if ! strings . HasPrefix ( resp . Directory , sourcePath ) {
if ! strings . HasPrefix ( resp . Directory , sourcePath ) {
return nil
return nil
}
}
for _ , excludePath := range excludePaths {
if strings . HasPrefix ( resp . Directory , excludePath ) {
return nil
}
}
// handle deletions
// handle deletions
if filer_pb . IsDelete ( resp ) {
if filer_pb . IsDelete ( resp ) {
if ! strings . HasPrefix ( string ( sourceOldKey ) , sourcePath ) {
if ! strings . HasPrefix ( string ( sourceOldKey ) , sourcePath ) {