Browse Source

Merge branch 'master' of https://github.com/chrislusf/seaweedfs

pull/3185/head
chrislu 3 years ago
parent
commit
b347b2fb54
  1. 14
      weed/command/filer_sync.go

14
weed/command/filer_sync.go

@ -182,7 +182,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption,
// if first time, start from now // if first time, start from now
// if has previously synced, resume from that point of time // if has previously synced, resume from that point of time
sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature)
sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature)
if err != nil { if err != nil {
return err return err
} }
@ -214,7 +214,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption,
now := time.Now().Nanosecond() now := time.Now().Nanosecond()
glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
lastLogTsNs = now lastLogTsNs = now
return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs)
}) })
return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), clientId, return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), clientId,
@ -226,6 +226,16 @@ const (
SyncKeyPrefix = "sync." SyncKeyPrefix = "sync."
) )
// When each business is distinguished according to path, and offsets need to be maintained separately.
func getSignaturePrefixByPath(path string) string {
// compatible historical version
if path == "/" {
return SyncKeyPrefix
} else {
return SyncKeyPrefix + path
}
}
func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {

Loading…
Cancel
Save