Browse Source

Merge pull request #1761 from kmlebedev/todaysDateFormat

replication to todays date directory
pull/1762/head
Chris Lu 4 years ago
committed by GitHub
parent
commit
2266b5c528
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      docker/notification.toml
  2. 1
      docker/replication.toml
  3. 5
      weed/command/scaffold.go
  4. 17
      weed/replication/sink/localsink/local_sink.go

3
docker/notification.toml

@ -11,6 +11,7 @@ enabled = true
# This URL will Dial the RabbitMQ server at the URL in the environment # This URL will Dial the RabbitMQ server at the URL in the environment
# variable RABBIT_SERVER_URL and open the exchange "myexchange". # variable RABBIT_SERVER_URL and open the exchange "myexchange".
# The exchange must have already been created by some other means, like # The exchange must have already been created by some other means, like
# the RabbitMQ management plugin.
# the RabbitMQ management plugin. Сreate myexchange of type fanout and myqueue then
# create binding myexchange => myqueue
topic_url = "rabbit://swexchange" topic_url = "rabbit://swexchange"
sub_url = "rabbit://swqueue" sub_url = "rabbit://swqueue"

1
docker/replication.toml

@ -9,3 +9,4 @@ directory = "/buckets"
[sink.local] [sink.local]
enabled = true enabled = true
directory = "/data" directory = "/data"
todays_date_format = "2006-02-01"

5
weed/command/scaffold.go

@ -329,7 +329,8 @@ enabled = false
# This URL will Dial the RabbitMQ server at the URL in the environment # This URL will Dial the RabbitMQ server at the URL in the environment
# variable RABBIT_SERVER_URL and open the exchange "myexchange". # variable RABBIT_SERVER_URL and open the exchange "myexchange".
# The exchange must have already been created by some other means, like # The exchange must have already been created by some other means, like
# the RabbitMQ management plugin.
# the RabbitMQ management plugin. Сreate myexchange of type fanout and myqueue then
# create binding myexchange => myqueue
topic_url = "rabbit://myexchange" topic_url = "rabbit://myexchange"
sub_url = "rabbit://myqueue" sub_url = "rabbit://myqueue"
` `
@ -353,6 +354,8 @@ directory = "/buckets"
[sink.local] [sink.local]
enabled = false enabled = false
directory = "/backup" directory = "/backup"
# all replicated files are under todays date directory tree
todays_date_format = ""
[sink.filer] [sink.filer]
enabled = false enabled = false

17
weed/replication/sink/localsink/local_sink.go

@ -12,11 +12,13 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
) )
type LocalSink struct { type LocalSink struct {
dir string
filerSource *source.FilerSource
dir string
todaysDateFormat string
filerSource *source.FilerSource
} }
func init() { func init() {
@ -35,18 +37,23 @@ func (localsink *LocalSink) isMultiPartEntry(key string) bool {
return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/") return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/")
} }
func (localsink *LocalSink) initialize(dir string) error {
func (localsink *LocalSink) initialize(dir string, todaysDateFormat string) error {
localsink.dir = dir localsink.dir = dir
localsink.todaysDateFormat = todaysDateFormat
return nil return nil
} }
func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error { func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
dir := configuration.GetString(prefix + "directory") dir := configuration.GetString(prefix + "directory")
todaysDateFormat := configuration.GetString(prefix + "todays_date_format")
glog.V(4).Infof("sink.local.directory: %v", dir) glog.V(4).Infof("sink.local.directory: %v", dir)
return localsink.initialize(dir)
return localsink.initialize(dir, todaysDateFormat)
} }
func (localsink *LocalSink) GetSinkToDirectory() string { func (localsink *LocalSink) GetSinkToDirectory() string {
if localsink.todaysDateFormat != "" {
return filepath.Join(localsink.dir, time.Now().Format(localsink.todaysDateFormat))
}
return localsink.dir return localsink.dir
} }
@ -56,7 +63,7 @@ func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeCh
} }
glog.V(4).Infof("Delete Entry key: %s", key) glog.V(4).Infof("Delete Entry key: %s", key)
if err := os.Remove(key); err != nil { if err := os.Remove(key); err != nil {
return err
glog.V(0).Infof("remove entry key %s: %s", key, err)
} }
return nil return nil
} }

Loading…
Cancel
Save