|
|
@ -105,30 +105,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour |
|
|
|
return fmt.Errorf("read mount info: %v", detectErr) |
|
|
|
} |
|
|
|
|
|
|
|
// 1. specified by timeAgo
|
|
|
|
// 2. last offset timestamp for this directory
|
|
|
|
// 3. directory creation time
|
|
|
|
var lastOffsetTs time.Time |
|
|
|
if *option.timeAgo == 0 { |
|
|
|
mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir)) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("lookup %s: %v", mountedDir, err) |
|
|
|
} |
|
|
|
|
|
|
|
lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir) |
|
|
|
if mountedDirEntry != nil { |
|
|
|
if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { |
|
|
|
lastOffsetTs = time.Unix(0, lastOffsetTsNs) |
|
|
|
glog.V(0).Infof("resume from %v", lastOffsetTs) |
|
|
|
} else { |
|
|
|
lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0) |
|
|
|
} |
|
|
|
} else { |
|
|
|
lastOffsetTs = time.Now() |
|
|
|
} |
|
|
|
} else { |
|
|
|
lastOffsetTs = time.Now().Add(-*option.timeAgo) |
|
|
|
} |
|
|
|
lastOffsetTs := collectLastSyncOffset(option, mountedDir) |
|
|
|
|
|
|
|
client, err := remote_storage.GetRemoteStorage(remoteStorage) |
|
|
|
if err != nil { |
|
|
@ -251,6 +228,35 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour |
|
|
|
mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) |
|
|
|
} |
|
|
|
|
|
|
|
func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) (time.Time) { |
|
|
|
// 1. specified by timeAgo
|
|
|
|
// 2. last offset timestamp for this directory
|
|
|
|
// 3. directory creation time
|
|
|
|
var lastOffsetTs time.Time |
|
|
|
if *option.timeAgo == 0 { |
|
|
|
mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir)) |
|
|
|
if err != nil { |
|
|
|
glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err) |
|
|
|
return time.Now() |
|
|
|
} |
|
|
|
|
|
|
|
lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir) |
|
|
|
if mountedDirEntry != nil { |
|
|
|
if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { |
|
|
|
lastOffsetTs = time.Unix(0, lastOffsetTsNs) |
|
|
|
glog.V(0).Infof("resume from %v", lastOffsetTs) |
|
|
|
} else { |
|
|
|
lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0) |
|
|
|
} |
|
|
|
} else { |
|
|
|
lastOffsetTs = time.Now() |
|
|
|
} |
|
|
|
} else { |
|
|
|
lastOffsetTs = time.Now().Add(-*option.timeAgo) |
|
|
|
} |
|
|
|
return lastOffsetTs |
|
|
|
} |
|
|
|
|
|
|
|
func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation { |
|
|
|
source := string(sourcePath[len(mountDir):]) |
|
|
|
dest := util.FullPath(remoteMountLocation.Path).Child(source) |
|
|
|