@ -182,7 +182,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption,
// if first time, start from now
// if first time, start from now
// if has previously synced, resume from that point of time
// if has previously synced, resume from that point of time
sourceFilerOffsetTsNs , err := getOffset ( grpcDialOption , targetFiler , SyncKeyPrefix , sourceFilerSignature )
sourceFilerOffsetTsNs , err := getOffset ( grpcDialOption , targetFiler , getSignaturePrefixByPath ( sourcePath ) , sourceFilerSignature )
if err != nil {
if err != nil {
return err
return err
}
}
@ -214,7 +214,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption,
now := time . Now ( ) . Nanosecond ( )
now := time . Now ( ) . Nanosecond ( )
glog . V ( 0 ) . Infof ( "sync %s to %s progressed to %v %0.2f/sec" , sourceFiler , targetFiler , time . Unix ( 0 , lastTsNs ) , float64 ( counter ) / ( float64 ( now - lastLogTsNs ) / 1e9 ) )
glog . V ( 0 ) . Infof ( "sync %s to %s progressed to %v %0.2f/sec" , sourceFiler , targetFiler , time . Unix ( 0 , lastTsNs ) , float64 ( counter ) / ( float64 ( now - lastLogTsNs ) / 1e9 ) )
lastLogTsNs = now
lastLogTsNs = now
return setOffset ( grpcDialOption , targetFiler , SyncKeyPrefix , sourceFilerSignature , lastTsNs )
return setOffset ( grpcDialOption , targetFiler , getSignaturePrefixByPath ( sourcePath ) , sourceFilerSignature , lastTsNs )
} )
} )
return pb . FollowMetadata ( sourceFiler , grpcDialOption , "syncTo_" + string ( targetFiler ) , clientId ,
return pb . FollowMetadata ( sourceFiler , grpcDialOption , "syncTo_" + string ( targetFiler ) , clientId ,
@ -226,6 +226,16 @@ const (
SyncKeyPrefix = "sync."
SyncKeyPrefix = "sync."
)
)
// When each business is distinguished according to path, and offsets need to be maintained separately.
func getSignaturePrefixByPath ( path string ) string {
// compatible historical version
if path == "/" {
return SyncKeyPrefix
} else {
return SyncKeyPrefix + path
}
}
func getOffset ( grpcDialOption grpc . DialOption , filer pb . ServerAddress , signaturePrefix string , signature int32 ) ( lastOffsetTsNs int64 , readErr error ) {
func getOffset ( grpcDialOption grpc . DialOption , filer pb . ServerAddress , signaturePrefix string , signature int32 ) ( lastOffsetTsNs int64 , readErr error ) {
readErr = pb . WithFilerClient ( false , filer , grpcDialOption , func ( client filer_pb . SeaweedFilerClient ) error {
readErr = pb . WithFilerClient ( false , filer , grpcDialOption , func ( client filer_pb . SeaweedFilerClient ) error {