From 14d82c3deaee2b7eecd95c5432c9a60b23a94033 Mon Sep 17 00:00:00 2001 From: "zhihao.qu" Date: Tue, 14 Jun 2022 19:46:02 +0800 Subject: [PATCH] feat(filer.sync): add offset to path. --- weed/command/filer_sync.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 7aa9c1e8d..dc7c569c3 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -182,7 +182,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, // if first time, start from now // if has previously synced, resume from that point of time - sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature) + sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature) if err != nil { return err } @@ -214,7 +214,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, now := time.Now().Nanosecond() glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) lastLogTsNs = now - return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs) + return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs) }) return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), clientId, @@ -226,6 +226,16 @@ const ( SyncKeyPrefix = "sync." ) +// When each business is distinguished according to path, and offsets need to be maintained separately. +func getSignaturePrefixByPath(path string) string { + // compatible historical version + if path == "/" { + return SyncKeyPrefix + } else { + return SyncKeyPrefix + path + } +} + func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {