You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

513 lines
20 KiB

4 years ago
4 years ago
2 years ago
4 years ago
4 years ago
4 years ago
4 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
  1. package command
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/replication"
  10. "github.com/seaweedfs/seaweedfs/weed/replication/sink"
  11. "github.com/seaweedfs/seaweedfs/weed/replication/sink/filersink"
  12. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  13. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  14. "github.com/seaweedfs/seaweedfs/weed/security"
  15. statsCollect "github.com/seaweedfs/seaweedfs/weed/stats"
  16. "github.com/seaweedfs/seaweedfs/weed/util"
  17. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  18. "google.golang.org/grpc"
  19. "os"
  20. "regexp"
  21. "strings"
  22. "sync/atomic"
  23. "time"
  24. )
  25. type SyncOptions struct {
  26. isActivePassive *bool
  27. filerA *string
  28. filerB *string
  29. aPath *string
  30. aExcludePaths *string
  31. bPath *string
  32. bExcludePaths *string
  33. aReplication *string
  34. bReplication *string
  35. aCollection *string
  36. bCollection *string
  37. aTtlSec *int
  38. bTtlSec *int
  39. aDiskType *string
  40. bDiskType *string
  41. aDebug *bool
  42. bDebug *bool
  43. aFromTsMs *int64
  44. bFromTsMs *int64
  45. aProxyByFiler *bool
  46. bProxyByFiler *bool
  47. metricsHttpIp *string
  48. metricsHttpPort *int
  49. concurrency *int
  50. aDoDeleteFiles *bool
  51. bDoDeleteFiles *bool
  52. clientId int32
  53. clientEpoch atomic.Int32
  54. }
  55. const (
  56. SyncKeyPrefix = "sync."
  57. DefaultConcurrencyLimit = 32
  58. )
  59. var (
  60. syncOptions SyncOptions
  61. syncCpuProfile *string
  62. syncMemProfile *string
  63. )
  64. func init() {
  65. cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
  66. syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
  67. syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
  68. syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
  69. syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
  70. syncOptions.aExcludePaths = cmdFilerSynchronize.Flag.String("a.excludePaths", "", "exclude directories to sync on filer A")
  71. syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
  72. syncOptions.bExcludePaths = cmdFilerSynchronize.Flag.String("b.excludePaths", "", "exclude directories to sync on filer B")
  73. syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
  74. syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
  75. syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
  76. syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
  77. syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
  78. syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
  79. syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A")
  80. syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B")
  81. syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
  82. syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
  83. syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
  84. syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
  85. syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
  86. syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
  87. syncOptions.concurrency = cmdFilerSynchronize.Flag.Int("concurrency", DefaultConcurrencyLimit, "The maximum number of files that will be synced concurrently.")
  88. syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
  89. syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
  90. syncOptions.metricsHttpIp = cmdFilerSynchronize.Flag.String("metricsIp", "", "metrics listen ip")
  91. syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
  92. syncOptions.aDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("a.doDeleteFiles", true, "delete and update files when synchronizing on filer A")
  93. syncOptions.bDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("b.doDeleteFiles", true, "delete and update files when synchronizing on filer B")
  94. syncOptions.clientId = util.RandomInt32()
  95. }
  96. var cmdFilerSynchronize = &Command{
  97. UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
  98. Short: "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
  99. Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers
  100. filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
  101. and write to the other destination. Different from filer.replicate:
  102. * filer.sync only works between two filers.
  103. * filer.sync does not need any special message queue setup.
  104. * filer.sync supports both active-active and active-passive modes.
  105. If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
  106. A fresh sync will start from the earliest metadata logs.
  107. `,
  108. }
  109. func runFilerSynchronize(cmd *Command, args []string) bool {
  110. util.LoadConfiguration("security", false)
  111. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  112. grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
  113. filerA := pb.ServerAddress(*syncOptions.filerA)
  114. filerB := pb.ServerAddress(*syncOptions.filerB)
  115. // start filer.sync metrics server
  116. go statsCollect.StartMetricsServer(*syncOptions.metricsHttpIp, *syncOptions.metricsHttpPort)
  117. // read a filer signature
  118. aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA)
  119. if aFilerErr != nil {
  120. glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr)
  121. return true
  122. }
  123. // read b filer signature
  124. bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB)
  125. if bFilerErr != nil {
  126. glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr)
  127. return true
  128. }
  129. go func() {
  130. // a->b
  131. // set synchronization start timestamp to offset
  132. initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs, getSignaturePrefixByPath(*syncOptions.aPath))
  133. if initOffsetError != nil {
  134. glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
  135. os.Exit(2)
  136. }
  137. for {
  138. syncOptions.clientEpoch.Add(1)
  139. err := doSubscribeFilerMetaChanges(
  140. syncOptions.clientId,
  141. syncOptions.clientEpoch.Load(),
  142. grpcDialOption,
  143. filerA,
  144. *syncOptions.aPath,
  145. util.StringSplit(*syncOptions.aExcludePaths, ","),
  146. *syncOptions.aProxyByFiler,
  147. filerB,
  148. *syncOptions.bPath,
  149. *syncOptions.bReplication,
  150. *syncOptions.bCollection,
  151. *syncOptions.bTtlSec,
  152. *syncOptions.bProxyByFiler,
  153. *syncOptions.bDiskType,
  154. *syncOptions.bDebug,
  155. *syncOptions.concurrency,
  156. *syncOptions.bDoDeleteFiles,
  157. aFilerSignature,
  158. bFilerSignature)
  159. if err != nil {
  160. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
  161. time.Sleep(1747 * time.Millisecond)
  162. }
  163. }
  164. }()
  165. if !*syncOptions.isActivePassive {
  166. // b->a
  167. // set synchronization start timestamp to offset
  168. initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs, getSignaturePrefixByPath(*syncOptions.bPath))
  169. if initOffsetError != nil {
  170. glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
  171. os.Exit(2)
  172. }
  173. go func() {
  174. for {
  175. syncOptions.clientEpoch.Add(1)
  176. err := doSubscribeFilerMetaChanges(
  177. syncOptions.clientId,
  178. syncOptions.clientEpoch.Load(),
  179. grpcDialOption,
  180. filerB,
  181. *syncOptions.bPath,
  182. util.StringSplit(*syncOptions.bExcludePaths, ","),
  183. *syncOptions.bProxyByFiler,
  184. filerA,
  185. *syncOptions.aPath,
  186. *syncOptions.aReplication,
  187. *syncOptions.aCollection,
  188. *syncOptions.aTtlSec,
  189. *syncOptions.aProxyByFiler,
  190. *syncOptions.aDiskType,
  191. *syncOptions.aDebug,
  192. *syncOptions.concurrency,
  193. *syncOptions.aDoDeleteFiles,
  194. bFilerSignature,
  195. aFilerSignature)
  196. if err != nil {
  197. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
  198. time.Sleep(2147 * time.Millisecond)
  199. }
  200. }
  201. }()
  202. }
  203. select {}
  204. return true
  205. }
  206. // initOffsetFromTsMs Initialize offset
  207. func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64, signaturePrefix string) error {
  208. if fromTsMs <= 0 {
  209. return nil
  210. }
  211. // convert to nanosecond
  212. fromTsNs := fromTsMs * 1000_000
  213. // If not successful, exit the program.
  214. setOffsetErr := setOffset(grpcDialOption, targetFiler, signaturePrefix, sourceFilerSignature, fromTsNs)
  215. if setOffsetErr != nil {
  216. return setOffsetErr
  217. }
  218. glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB)
  219. return nil
  220. }
  221. func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
  222. replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32) error {
  223. // if first time, start from now
  224. // if has previously synced, resume from that point of time
  225. sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature)
  226. if err != nil {
  227. return err
  228. }
  229. glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
  230. // create filer sink
  231. filerSource := &source.FilerSource{}
  232. filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
  233. filerSink := &filersink.FilerSink{}
  234. filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
  235. filerSink.SetSourceFiler(filerSource)
  236. persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, doDeleteFiles, debug)
  237. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  238. message := resp.EventNotification
  239. for _, sig := range message.Signatures {
  240. if sig == targetFilerSignature && targetFilerSignature != 0 {
  241. fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
  242. return nil
  243. }
  244. }
  245. return persistEventFn(resp)
  246. }
  247. if concurrency < 0 || concurrency > 1024 {
  248. glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrencyLimit)
  249. concurrency = DefaultConcurrencyLimit
  250. }
  251. processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs)
  252. var lastLogTsNs = time.Now().UnixNano()
  253. var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
  254. processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
  255. processor.AddSyncJob(resp)
  256. return nil
  257. }, 3*time.Second, func(counter int64, lastTsNs int64) error {
  258. offsetTsNs := processor.processedTsWatermark.Load()
  259. if offsetTsNs == 0 {
  260. return nil
  261. }
  262. // use processor.processedTsWatermark instead of the lastTsNs from the most recent job
  263. now := time.Now().UnixNano()
  264. glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
  265. lastLogTsNs = now
  266. // collect synchronous offset
  267. statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs))
  268. return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs)
  269. })
  270. metadataFollowOption := &pb.MetadataFollowOption{
  271. ClientName: clientName,
  272. ClientId: clientId,
  273. ClientEpoch: clientEpoch,
  274. SelfSignature: targetFilerSignature,
  275. PathPrefix: sourcePath,
  276. AdditionalPathPrefixes: nil,
  277. DirectoriesToWatch: nil,
  278. StartTsNs: sourceFilerOffsetTsNs,
  279. StopTsNs: 0,
  280. EventErrorType: pb.RetryForeverOnError,
  281. }
  282. return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)
  283. }
  284. // When each business is distinguished according to path, and offsets need to be maintained separately.
  285. func getSignaturePrefixByPath(path string) string {
  286. // compatible historical version
  287. if path == "/" {
  288. return SyncKeyPrefix
  289. } else {
  290. return SyncKeyPrefix + path
  291. }
  292. }
  293. func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
  294. readErr = pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  295. syncKey := []byte(signaturePrefix + "____")
  296. util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
  297. resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
  298. if err != nil {
  299. return err
  300. }
  301. if len(resp.Error) != 0 {
  302. return errors.New(resp.Error)
  303. }
  304. if len(resp.Value) < 8 {
  305. return nil
  306. }
  307. lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
  308. return nil
  309. })
  310. return
  311. }
  312. func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
  313. return pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  314. syncKey := []byte(signaturePrefix + "____")
  315. util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
  316. valueBuf := make([]byte, 8)
  317. util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
  318. resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
  319. Key: syncKey,
  320. Value: valueBuf,
  321. })
  322. if err != nil {
  323. return err
  324. }
  325. if len(resp.Error) != 0 {
  326. return errors.New(resp.Error)
  327. }
  328. return nil
  329. })
  330. }
  331. func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, reExcludeFileName *regexp.Regexp, dataSink sink.ReplicationSink, doDeleteFiles bool, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
  332. // process function
  333. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  334. message := resp.EventNotification
  335. var sourceOldKey, sourceNewKey util.FullPath
  336. if message.OldEntry != nil {
  337. sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
  338. }
  339. if message.NewEntry != nil {
  340. sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
  341. }
  342. if debug {
  343. glog.V(0).Infof("received %v", resp)
  344. }
  345. if strings.Contains(resp.Directory, "/"+s3_constants.MultipartUploadsFolder+"/") {
  346. return nil
  347. }
  348. if !strings.HasPrefix(resp.Directory, sourcePath) {
  349. return nil
  350. }
  351. for _, excludePath := range excludePaths {
  352. if strings.HasPrefix(resp.Directory, excludePath) {
  353. return nil
  354. }
  355. }
  356. if reExcludeFileName != nil && reExcludeFileName.MatchString(message.NewEntry.Name) {
  357. return nil
  358. }
  359. if dataSink.IsIncremental() {
  360. doDeleteFiles = false
  361. }
  362. // handle deletions
  363. if filer_pb.IsDelete(resp) {
  364. if !doDeleteFiles {
  365. return nil
  366. }
  367. if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
  368. return nil
  369. }
  370. key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
  371. return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  372. }
  373. // handle new entries
  374. if filer_pb.IsCreate(resp) {
  375. if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
  376. return nil
  377. }
  378. key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  379. if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
  380. return fmt.Errorf("create entry1 : %v", err)
  381. } else {
  382. return nil
  383. }
  384. }
  385. // this is something special?
  386. if filer_pb.IsEmpty(resp) {
  387. return nil
  388. }
  389. // handle updates
  390. if strings.HasPrefix(string(sourceOldKey), sourcePath) {
  391. // old key is in the watched directory
  392. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  393. // new key is also in the watched directory
  394. if doDeleteFiles {
  395. oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  396. message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
  397. foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
  398. if foundExisting {
  399. return err
  400. }
  401. // not able to find old entry
  402. if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
  403. return fmt.Errorf("delete old entry %v: %v", oldKey, err)
  404. }
  405. }
  406. // create the new entry
  407. newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  408. if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil {
  409. return fmt.Errorf("create entry2 : %v", err)
  410. } else {
  411. return nil
  412. }
  413. } else {
  414. // new key is outside of the watched directory
  415. if doDeleteFiles {
  416. key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
  417. return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  418. }
  419. }
  420. } else {
  421. // old key is outside of the watched directory
  422. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  423. // new key is in the watched directory
  424. key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  425. if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
  426. return fmt.Errorf("create entry3 : %v", err)
  427. } else {
  428. return nil
  429. }
  430. } else {
  431. // new key is also outside of the watched directory
  432. // skip
  433. }
  434. }
  435. return nil
  436. }
  437. return processEventFn
  438. }
  439. func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) (key string) {
  440. if !dataSink.IsIncremental() {
  441. key = util.Join(targetPath, string(sourceKey)[len(sourcePath):])
  442. } else {
  443. var mTime int64
  444. if message.NewEntry != nil {
  445. mTime = message.NewEntry.Attributes.Mtime
  446. } else if message.OldEntry != nil {
  447. mTime = message.OldEntry.Attributes.Mtime
  448. }
  449. dateKey := time.Unix(mTime, 0).Format("2006-01-02")
  450. key = util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
  451. }
  452. return escapeKey(key)
  453. }