Browse Source

Merge pull request #1762 from kmlebedev/backupsink

replication to create time date directory
pull/1778/head
Chris Lu 4 years ago
committed by GitHub
parent
commit
822f1ade9d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      docker/replication.toml
  2. 1
      weed/command/filer_replication.go
  3. 7
      weed/command/scaffold.go
  4. 13
      weed/replication/replicator.go
  5. 18
      weed/replication/sink/localincrementalsink/local_incremental_sink.go
  6. 19
      weed/replication/sink/localsink/local_sink.go

3
docker/replication.toml

@ -6,7 +6,6 @@ grpcAddress = "filer:18888"
# i.e., all files with this "prefix" are sent to notification message queue. # i.e., all files with this "prefix" are sent to notification message queue.
directory = "/buckets" directory = "/buckets"
[sink.local]
[sink.local_incremental]
enabled = true enabled = true
directory = "/data" directory = "/data"
todays_date_format = "2006-02-01"

1
weed/command/filer_replication.go

@ -11,6 +11,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/localincrementalsink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub" "github.com/chrislusf/seaweedfs/weed/replication/sub"

7
weed/command/scaffold.go

@ -353,9 +353,14 @@ directory = "/buckets"
[sink.local] [sink.local]
enabled = false enabled = false
directory = "/backup"
directory = "/data"
todays_date_format = "" # set this to 2006-02-01 for incremental backup todays_date_format = "" # set this to 2006-02-01 for incremental backup
[sink.local_incremental]
enabled = false
# all replicated files are under modification time date directory tree
directory = "/backup"
[sink.filer] [sink.filer]
enabled = false enabled = false
grpcAddress = "localhost:18888" grpcAddress = "localhost:18888"

13
weed/replication/replicator.go

@ -6,6 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb"
"google.golang.org/grpc" "google.golang.org/grpc"
"strings" "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@ -40,7 +41,17 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir) glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir)
return nil return nil
} }
newKey := util.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):])
var dateKey string
if r.sink.GetName() == "local_incremental" {
var mTime int64
if message.NewEntry != nil {
mTime = message.NewEntry.Attributes.Mtime
} else if message.OldEntry != nil {
mTime = message.OldEntry.Attributes.Mtime
}
dateKey = time.Unix(mTime, 0).Format("2006-01-02")
}
newKey := util.Join(r.sink.GetSinkToDirectory(), dateKey, key[len(r.source.Dir):])
glog.V(3).Infof("replicate %s => %s", key, newKey) glog.V(3).Infof("replicate %s => %s", key, newKey)
key = newKey key = newKey
if message.OldEntry != nil && message.NewEntry == nil { if message.OldEntry != nil && message.NewEntry == nil {

18
weed/replication/sink/localincrementalsink/local_incremental_sink.go

@ -0,0 +1,18 @@
package localincrementalsink
import (
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
)
type LocalIncSink struct {
localsink.LocalSink
}
func (localincsink *LocalIncSink) GetName() string {
return "local_incremental"
}
func init() {
sink.Sinks = append(sink.Sinks, &LocalIncSink{})
}

19
weed/replication/sink/localsink/local_sink.go

@ -12,13 +12,11 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
) )
type LocalSink struct { type LocalSink struct {
dir string
todaysDateFormat string
filerSource *source.FilerSource
Dir string
filerSource *source.FilerSource
} }
func init() { func init() {
@ -37,24 +35,19 @@ func (localsink *LocalSink) isMultiPartEntry(key string) bool {
return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/") return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/")
} }
func (localsink *LocalSink) initialize(dir string, todaysDateFormat string) error {
localsink.dir = dir
localsink.todaysDateFormat = todaysDateFormat
func (localsink *LocalSink) initialize(dir string) error {
localsink.Dir = dir
return nil return nil
} }
func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error { func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
dir := configuration.GetString(prefix + "directory") dir := configuration.GetString(prefix + "directory")
todaysDateFormat := configuration.GetString(prefix + "todays_date_format")
glog.V(4).Infof("sink.local.directory: %v", dir) glog.V(4).Infof("sink.local.directory: %v", dir)
return localsink.initialize(dir, todaysDateFormat)
return localsink.initialize(dir)
} }
func (localsink *LocalSink) GetSinkToDirectory() string { func (localsink *LocalSink) GetSinkToDirectory() string {
if localsink.todaysDateFormat != "" {
return filepath.Join(localsink.dir, time.Now().Format(localsink.todaysDateFormat))
}
return localsink.dir
return localsink.Dir
} }
func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {

Loading…
Cancel
Save