Browse Source

bootstrap filer replication with weed filer.export -targetStore=notification

pull/763/head
Chris Lu 6 years ago
parent
commit
a64613172d
  1. 50
      weed/command/filer_export.go
  2. 12
      weed/filer2/entry.go
  3. 16
      weed/filer2/filer_notify.go
  4. 4
      weed/filer2/filer_notify_test.go

50
weed/command/filer_export.go

@ -5,6 +5,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/server"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/chrislusf/seaweedfs/weed/notification"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
) )
func init() { func init() {
@ -22,14 +24,18 @@ var cmdFilerExport = &Command{
If target store is empty, only the directory tree will be listed. If target store is empty, only the directory tree will be listed.
If target store is "notification", the list of entries will be sent to notification.
This is usually used to bootstrap filer replication to a remote system.
`, `,
} }
var ( var (
// filerExportOutputFile = cmdFilerExport.Flag.String("output", "", "the output file. If empty, only list out the directory tree") // filerExportOutputFile = cmdFilerExport.Flag.String("output", "", "the output file. If empty, only list out the directory tree")
filerExportSourceStore = cmdFilerExport.Flag.String("sourceStore", "", "the source store name in filer.toml") filerExportSourceStore = cmdFilerExport.Flag.String("sourceStore", "", "the source store name in filer.toml")
filerExportTargetStore = cmdFilerExport.Flag.String("targetStore", "", "the target store name in filer.toml")
filerExportTargetStore = cmdFilerExport.Flag.String("targetStore", "", "the target store name in filer.toml, or \"notification\" to export all files to message queue")
dirListLimit = cmdFilerExport.Flag.Int("dirListLimit", 100000, "limit directory list size") dirListLimit = cmdFilerExport.Flag.Int("dirListLimit", 100000, "limit directory list size")
dryRun = cmdFilerExport.Flag.Bool("dryRun", false, "not actually moving data")
) )
type statistics struct { type statistics struct {
@ -48,7 +54,7 @@ func runFilerExport(cmd *Command, args []string) bool {
if store.GetName() == *filerExportSourceStore { if store.GetName() == *filerExportSourceStore {
viperSub := config.Sub(store.GetName()) viperSub := config.Sub(store.GetName())
if err := store.Initialize(viperSub); err != nil { if err := store.Initialize(viperSub); err != nil {
glog.Fatalf("Failed to initialize store for %s: %+v",
glog.Fatalf("Failed to initialize source store for %s: %+v",
store.GetName(), err) store.GetName(), err)
} else { } else {
sourceStore = store sourceStore = store
@ -61,7 +67,7 @@ func runFilerExport(cmd *Command, args []string) bool {
if store.GetName() == *filerExportTargetStore { if store.GetName() == *filerExportTargetStore {
viperSub := config.Sub(store.GetName()) viperSub := config.Sub(store.GetName())
if err := store.Initialize(viperSub); err != nil { if err := store.Initialize(viperSub); err != nil {
glog.Fatalf("Failed to initialize store for %s: %+v",
glog.Fatalf("Failed to initialize target store for %s: %+v",
store.GetName(), err) store.GetName(), err)
} else { } else {
targetStore = store targetStore = store
@ -79,14 +85,44 @@ func runFilerExport(cmd *Command, args []string) bool {
return false return false
} }
if targetStore == nil && *filerExportTargetStore != "" && *filerExportTargetStore != "notification" {
glog.Errorf("Failed to find target store %s", *filerExportTargetStore)
println("existing data sources are:")
for _, store := range filer2.Stores {
println(" " + store.GetName())
}
return false
}
stat := statistics{} stat := statistics{}
var fn func(level int, entry *filer2.Entry) error var fn func(level int, entry *filer2.Entry) error
if targetStore == nil {
if *filerExportTargetStore == "notification" {
weed_server.LoadConfiguration("notification", false)
v := viper.GetViper()
notification.LoadConfiguration(v.Sub("notification"))
fn = func(level int, entry *filer2.Entry) error {
printout(level, entry)
if *dryRun {
return nil
}
return notification.Queue.SendMessage(
string(entry.FullPath),
&filer_pb.EventNotification{
NewEntry: entry.ToProtoEntry(),
},
)
}
} else if targetStore == nil {
fn = printout fn = printout
} else { } else {
fn = func(level int, entry *filer2.Entry) error { fn = func(level int, entry *filer2.Entry) error {
printout(level, entry)
if *dryRun {
return nil
}
return targetStore.InsertEntry(entry) return targetStore.InsertEntry(entry)
} }
} }
@ -126,7 +162,11 @@ func doTraverse(stat *statistics, filerStore filer2.FilerStore, parentPath filer
func printout(level int, entry *filer2.Entry) error { func printout(level int, entry *filer2.Entry) error {
for i := 0; i < level; i++ { for i := 0; i < level; i++ {
print(" ")
if i == level-1 {
print("+-")
} else {
print("| ")
}
} }
println(entry.FullPath.Name()) println(entry.FullPath.Name())
return nil return nil

12
weed/filer2/entry.go

@ -43,3 +43,15 @@ func (entry *Entry) Timestamp() time.Time {
return entry.Mtime return entry.Mtime
} }
} }
func (entry *Entry) ToProtoEntry() *filer_pb.Entry {
if entry == nil {
return nil
}
return &filer_pb.Entry{
Name: string(entry.FullPath),
IsDirectory: entry.IsDirectory(),
Attributes: EntryAttributeToPb(entry),
Chunks: entry.Chunks,
}
}

16
weed/filer2/filer_notify.go

@ -23,23 +23,11 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool)
notification.Queue.SendMessage( notification.Queue.SendMessage(
key, key,
&filer_pb.EventNotification{ &filer_pb.EventNotification{
OldEntry: toProtoEntry(oldEntry),
NewEntry: toProtoEntry(newEntry),
OldEntry: oldEntry.ToProtoEntry(),
NewEntry: newEntry.ToProtoEntry(),
DeleteChunks: deleteChunks, DeleteChunks: deleteChunks,
}, },
) )
} }
} }
func toProtoEntry(entry *Entry) *filer_pb.Entry {
if entry == nil {
return nil
}
return &filer_pb.Entry{
Name: string(entry.FullPath),
IsDirectory: entry.IsDirectory(),
Attributes: EntryAttributeToPb(entry),
Chunks: entry.Chunks,
}
}

4
weed/filer2/filer_notify_test.go

@ -32,8 +32,8 @@ func TestProtoMarshalText(t *testing.T) {
} }
notification := &filer_pb.EventNotification{ notification := &filer_pb.EventNotification{
OldEntry: toProtoEntry(oldEntry),
NewEntry: toProtoEntry(nil),
OldEntry: oldEntry.ToProtoEntry(),
NewEntry: nil,
DeleteChunks: true, DeleteChunks: true,
} }

Loading…
Cancel
Save