@ -12,6 +12,7 @@ import ( 
			
		
	
		
			
				
						"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"  
			
		
	
		
			
				
						"github.com/chrislusf/seaweedfs/weed/replication/source"  
			
		
	
		
			
				
						"github.com/chrislusf/seaweedfs/weed/security"  
			
		
	
		
			
				
						statsCollect  "github.com/chrislusf/seaweedfs/weed/stats"  
			
		
	
		
			
				
						"github.com/chrislusf/seaweedfs/weed/util"  
			
		
	
		
			
				
						"github.com/chrislusf/seaweedfs/weed/util/grace"  
			
		
	
		
			
				
						"google.golang.org/grpc"  
			
		
	
	
		
			
				
					
						
							
								 
						
						
							
								 
						
						
					 
				
				@ -40,6 +41,7 @@ type SyncOptions struct { 
			
		
	
		
			
				
						bFromTsMs        * int64  
			
		
	
		
			
				
						aProxyByFiler    * bool  
			
		
	
		
			
				
						bProxyByFiler    * bool  
			
		
	
		
			
				
						metricsHttpPort  * int  
			
		
	
		
			
				
						clientId         int32  
			
		
	
		
			
				
					}  
			
		
	
		
			
				
					
 
			
		
	
	
		
			
				
					
						
							
								 
						
						
							
								 
						
						
					 
				
				@ -72,6 +74,7 @@ func init() { 
			
		
	
		
			
				
						syncOptions . bFromTsMs  =  cmdFilerSynchronize . Flag . Int64 ( "b.fromTsMs" ,  0 ,  "synchronization from timestamp on filer B. The unit is millisecond" )  
			
		
	
		
			
				
						syncCpuProfile  =  cmdFilerSynchronize . Flag . String ( "cpuprofile" ,  "" ,  "cpu profile output file" )  
			
		
	
		
			
				
						syncMemProfile  =  cmdFilerSynchronize . Flag . String ( "memprofile" ,  "" ,  "memory profile output file" )  
			
		
	
		
			
				
						syncOptions . metricsHttpPort  =  cmdFilerSynchronize . Flag . Int ( "metricsPort" ,  0 ,  "metrics listen port" )  
			
		
	
		
			
				
						syncOptions . clientId  =  util . RandomInt32 ( )  
			
		
	
		
			
				
					}  
			
		
	
		
			
				
					
 
			
		
	
	
		
			
				
					
						
							
								 
						
						
							
								 
						
						
					 
				
				@ -103,6 +106,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool { 
			
		
	
		
			
				
						filerA  :=  pb . ServerAddress ( * syncOptions . filerA )  
			
		
	
		
			
				
						filerB  :=  pb . ServerAddress ( * syncOptions . filerB )  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
						// start filer.sync metrics server
  
			
		
	
		
			
				
						go  statsCollect . StartMetricsServer ( * syncOptions . metricsHttpPort )  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
						// read a filer signature
  
			
		
	
		
			
				
						aFilerSignature ,  aFilerErr  :=  replication . ReadFilerSignature ( grpcDialOption ,  filerA )  
			
		
	
		
			
				
						if  aFilerErr  !=  nil  {  
			
		
	
	
		
			
				
					
						
							
								 
						
						
							
								 
						
						
					 
				
				@ -210,14 +216,17 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, 
			
		
	
		
			
				
						}  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
						var  lastLogTsNs  =  time . Now ( ) . Nanosecond ( )  
			
		
	
		
			
				
						var  clientName  =  fmt . Sprintf ( "syncFrom_%s_To_%s" ,  string ( sourceFiler ) ,  string ( targetFiler ) )  
			
		
	
		
			
				
						processEventFnWithOffset  :=  pb . AddOffsetFunc ( processEventFn ,  3 * time . Second ,  func ( counter  int64 ,  lastTsNs  int64 )  error  {  
			
		
	
		
			
				
							now  :=  time . Now ( ) . Nanosecond ( )  
			
		
	
		
			
				
							glog . V ( 0 ) . Infof ( "sync %s to %s progressed to %v %0.2f/sec" ,  sourceFiler ,  targetFiler ,  time . Unix ( 0 ,  lastTsNs ) ,  float64 ( counter ) / ( float64 ( now - lastLogTsNs ) / 1e9 ) )  
			
		
	
		
			
				
							lastLogTsNs  =  now  
			
		
	
		
			
				
							// collect synchronous offset
  
			
		
	
		
			
				
							statsCollect . FilerSyncOffsetGauge . WithLabelValues ( sourceFiler . String ( ) ,  targetFiler . String ( ) ,  clientName ,  sourcePath ) . Set ( float64 ( lastTsNs ) )  
			
		
	
		
			
				
							return  setOffset ( grpcDialOption ,  targetFiler ,  getSignaturePrefixByPath ( sourcePath ) ,  sourceFilerSignature ,  lastTsNs )  
			
		
	
		
			
				
						} )  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
						return  pb . FollowMetadata ( sourceFiler ,  grpcDialOption ,  "syncTo_" + string ( targetFiler ) ,  clientId ,  
			
		
	
		
			
				
						return  pb . FollowMetadata ( sourceFiler ,  grpcDialOption ,  clientName ,  clientId ,  
			
		
	
		
			
				
							sourcePath ,  nil ,  sourceFilerOffsetTsNs ,  0 ,  targetFilerSignature ,  processEventFnWithOffset ,  pb . RetryForeverOnError )  
			
		
	
		
			
				
					
 
			
		
	
		
			
				
					}