From 6b54ff991249d2ce5dfbe28ffb503fc770d5db6c Mon Sep 17 00:00:00 2001
From: Konstantin Lebedev <lebedev_k@tochka.com>
Date: Wed, 27 Jan 2021 15:01:33 +0500
Subject: [PATCH] replication to create time date directory

---
 docker/replication.toml                       |  5 ++---
 weed/command/filer_replication.go             |  1 +
 weed/command/scaffold.go                      |  7 ++++++-
 weed/replication/replicator.go                | 13 ++++++++++++-
 .../sink/backupsink/backup_sink.go            | 18 ++++++++++++++++++
 weed/replication/sink/localsink/local_sink.go | 19 ++++++-------------
 6 files changed, 45 insertions(+), 18 deletions(-)
 create mode 100644 weed/replication/sink/backupsink/backup_sink.go

diff --git a/docker/replication.toml b/docker/replication.toml
index 2cee755a5..75e34a899 100644
--- a/docker/replication.toml
+++ b/docker/replication.toml
@@ -6,7 +6,6 @@ grpcAddress = "filer:18888"
 # i.e., all files with this "prefix" are sent to notification message queue.
 directory = "/buckets"
 
-[sink.local]
+[sink.backup]
 enabled = true
-directory = "/data"
-todays_date_format = "2006-02-01"
\ No newline at end of file
+directory = "/data"
\ No newline at end of file
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index e8c06b208..f2754139b 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -9,6 +9,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/replication/sink"
 	_ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
 	_ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
+	_ "github.com/chrislusf/seaweedfs/weed/replication/sink/backupsink"
 	_ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
 	_ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
 	_ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 1705c6ae4..9cbb1fa1b 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -353,9 +353,14 @@ directory = "/buckets"
 
 [sink.local]
 enabled = false
-directory = "/backup"
+directory = "/data"
 todays_date_format = ""  # set this to 2006-02-01 for incremental backup
 
+[sink.backup]
+enabled = false
+# all replicated files are under create time date directory tree
+directory = "/backup"
+
 [sink.filer]
 enabled = false
 grpcAddress = "localhost:18888"
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index c4228434f..81d546c3c 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -6,6 +6,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/pb"
 	"google.golang.org/grpc"
 	"strings"
+	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -40,7 +41,17 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
 		glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir)
 		return nil
 	}
-	newKey := util.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):])
+	var dateKey string
+	if r.sink.GetName() == "backup" {
+		var crTime int64
+		if message.NewEntry != nil {
+			crTime = message.NewEntry.Attributes.Crtime
+		} else if message.OldEntry != nil {
+			crTime = message.OldEntry.Attributes.Crtime
+		}
+		dateKey = time.Unix(crTime, 0).Format("2006-01-02")
+	}
+	newKey := util.Join(r.sink.GetSinkToDirectory(), dateKey, key[len(r.source.Dir):])
 	glog.V(3).Infof("replicate %s => %s", key, newKey)
 	key = newKey
 	if message.OldEntry != nil && message.NewEntry == nil {
diff --git a/weed/replication/sink/backupsink/backup_sink.go b/weed/replication/sink/backupsink/backup_sink.go
new file mode 100644
index 000000000..df0a778d1
--- /dev/null
+++ b/weed/replication/sink/backupsink/backup_sink.go
@@ -0,0 +1,18 @@
+package backupsink
+
+import (
+	"github.com/chrislusf/seaweedfs/weed/replication/sink"
+	"github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
+)
+
+type BackupSink struct {
+	localsink.LocalSink
+}
+
+func (backupsink *BackupSink) GetName() string {
+	return "backup"
+}
+
+func init() {
+	sink.Sinks = append(sink.Sinks, &BackupSink{})
+}
diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go
index 5ca562ec8..21c625c3f 100644
--- a/weed/replication/sink/localsink/local_sink.go
+++ b/weed/replication/sink/localsink/local_sink.go
@@ -12,13 +12,11 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
-	"time"
 )
 
 type LocalSink struct {
-	dir              string
-	todaysDateFormat string
-	filerSource      *source.FilerSource
+	Dir         string
+	filerSource *source.FilerSource
 }
 
 func init() {
@@ -37,24 +35,19 @@ func (localsink *LocalSink) isMultiPartEntry(key string) bool {
 	return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/")
 }
 
-func (localsink *LocalSink) initialize(dir string, todaysDateFormat string) error {
-	localsink.dir = dir
-	localsink.todaysDateFormat = todaysDateFormat
+func (localsink *LocalSink) initialize(dir string) error {
+	localsink.Dir = dir
 	return nil
 }
 
 func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
 	dir := configuration.GetString(prefix + "directory")
-	todaysDateFormat := configuration.GetString(prefix + "todays_date_format")
 	glog.V(4).Infof("sink.local.directory: %v", dir)
-	return localsink.initialize(dir, todaysDateFormat)
+	return localsink.initialize(dir)
 }
 
 func (localsink *LocalSink) GetSinkToDirectory() string {
-	if localsink.todaysDateFormat != "" {
-		return filepath.Join(localsink.dir, time.Now().Format(localsink.todaysDateFormat))
-	}
-	return localsink.dir
+	return localsink.Dir
 }
 
 func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {