@ -12,6 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
statsCollect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"google.golang.org/grpc"
@ -40,6 +41,7 @@ type SyncOptions struct {
bFromTsMs * int64
aProxyByFiler * bool
bProxyByFiler * bool
metricsHttpPort * int
clientId int32
}
@ -72,6 +74,7 @@ func init() {
syncOptions . bFromTsMs = cmdFilerSynchronize . Flag . Int64 ( "b.fromTsMs" , 0 , "synchronization from timestamp on filer B. The unit is millisecond" )
syncCpuProfile = cmdFilerSynchronize . Flag . String ( "cpuprofile" , "" , "cpu profile output file" )
syncMemProfile = cmdFilerSynchronize . Flag . String ( "memprofile" , "" , "memory profile output file" )
syncOptions . metricsHttpPort = cmdFilerSynchronize . Flag . Int ( "metricsPort" , 0 , "metrics listen port" )
syncOptions . clientId = util . RandomInt32 ( )
}
@ -103,6 +106,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
filerA := pb . ServerAddress ( * syncOptions . filerA )
filerB := pb . ServerAddress ( * syncOptions . filerB )
// start filer.sync metrics server
go statsCollect . StartMetricsServer ( * syncOptions . metricsHttpPort )
// read a filer signature
aFilerSignature , aFilerErr := replication . ReadFilerSignature ( grpcDialOption , filerA )
if aFilerErr != nil {
@ -210,14 +216,17 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption,
}
var lastLogTsNs = time . Now ( ) . Nanosecond ( )
var clientName = fmt . Sprintf ( "syncTo_%s_From_%s" , string ( targetFiler ) , string ( sourceFiler ) )
processEventFnWithOffset := pb . AddOffsetFunc ( processEventFn , 3 * time . Second , func ( counter int64 , lastTsNs int64 ) error {
now := time . Now ( ) . Nanosecond ( )
glog . V ( 0 ) . Infof ( "sync %s to %s progressed to %v %0.2f/sec" , sourceFiler , targetFiler , time . Unix ( 0 , lastTsNs ) , float64 ( counter ) / ( float64 ( now - lastLogTsNs ) / 1e9 ) )
lastLogTsNs = now
// collect synchronous offset
statsCollect . FilerSyncOffsetGauge . WithLabelValues ( sourceFiler . String ( ) , targetFiler . String ( ) , clientName , sourcePath ) . Set ( float64 ( lastTsNs ) )
return setOffset ( grpcDialOption , targetFiler , getSignaturePrefixByPath ( sourcePath ) , sourceFilerSignature , lastTsNs )
} )
return pb . FollowMetadata ( sourceFiler , grpcDialOption , "syncTo_" + string ( targetFiler ) , clientId ,
return pb . FollowMetadata ( sourceFiler , grpcDialOption , clientName , clientId ,
sourcePath , nil , sourceFilerOffsetTsNs , 0 , targetFilerSignature , processEventFnWithOffset , pb . RetryForeverOnError )
}