Browse Source

refactoring

pull/747/head
Chris Lu 6 years ago
parent
commit
9fe24991d5
  1. 13
      weed/command/filer_replication.go
  2. 12
      weed/replication/replicator.go
  3. 2
      weed/replication/sink/filersink/fetch_write.go
  4. 10
      weed/replication/sink/filersink/filer_sink.go
  5. 14
      weed/replication/sink/replication_sink.go

13
weed/command/filer_replication.go

@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/replication" "github.com/chrislusf/seaweedfs/weed/replication"
"github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/server"
"github.com/spf13/viper" "github.com/spf13/viper"
"strings"
) )
func init() { func init() {
@ -44,6 +45,18 @@ func runFilerReplicate(cmd *Command, args []string) bool {
} }
} }
// avoid recursive replication
if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") {
sourceConfig, sinkConfig := config.Sub("source.filer"), config.Sub("sink.filer")
if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") {
fromDir := sourceConfig.GetString("directory")
toDir := sinkConfig.GetString("directory")
if strings.HasPrefix(toDir, fromDir) {
glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
}
}
}
replicator := replication.NewReplicator(config.Sub("source.filer"), config.Sub("sink.filer")) replicator := replication.NewReplicator(config.Sub("source.filer"), config.Sub("sink.filer"))
for { for {

12
weed/replication/replicator.go

@ -3,9 +3,9 @@ package replication
import ( import (
"strings" "strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
"github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
@ -17,20 +17,12 @@ type Replicator struct {
func NewReplicator(sourceConfig, sinkConfig util.Configuration) *Replicator { func NewReplicator(sourceConfig, sinkConfig util.Configuration) *Replicator {
sink := &sink.FilerSink{}
sink := &filersink.FilerSink{}
sink.Initialize(sinkConfig) sink.Initialize(sinkConfig)
source := &source.FilerSource{} source := &source.FilerSource{}
source.Initialize(sourceConfig) source.Initialize(sourceConfig)
if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") {
fromDir := sourceConfig.GetString("directory")
toDir := sinkConfig.GetString("directory")
if strings.HasPrefix(toDir, fromDir) {
glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
}
}
sink.SetSourceFiler(source) sink.SetSourceFiler(source)
return &Replicator{ return &Replicator{

2
weed/replication/sink/fetch_write.go → weed/replication/sink/filersink/fetch_write.go

@ -1,4 +1,4 @@
package sink
package filersink
import ( import (
"context" "context"

10
weed/replication/sink/filer_sink.go → weed/replication/sink/filersink/filer_sink.go

@ -1,4 +1,4 @@
package sink
package filersink
import ( import (
"context" "context"
@ -11,14 +11,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
) )
type ReplicationSink interface {
DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error
CreateEntry(key string, entry *filer_pb.Entry) error
UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error
GetSinkToDirectory() string
SetSourceFiler(s *source.FilerSource)
}
type FilerSink struct { type FilerSink struct {
filerSource *source.FilerSource filerSource *source.FilerSource
grpcAddress string grpcAddress string

14
weed/replication/sink/replication_sink.go

@ -0,0 +1,14 @@
package sink
import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
)
type ReplicationSink interface {
DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error
CreateEntry(key string, entry *filer_pb.Entry) error
UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error
GetSinkToDirectory() string
SetSourceFiler(s *source.FilerSource)
}
Loading…
Cancel
Save