Chris Lu
6 years ago
13 changed files with 594 additions and 96 deletions
-
1weed/command/command.go
-
61weed/command/filer_replication.go
-
32weed/command/scaffold.go
-
6weed/filer2/filer.go
-
8weed/filer2/filer_notify.go
-
6weed/pb/filer.proto
-
188weed/pb/filer_pb/filer.pb.go
-
77weed/replication/notification_kafka.go
-
18weed/replication/notifications.go
-
31weed/replication/replicator.go
-
163weed/replication/sink/filer_sink.go
-
97weed/replication/source/filer_source.go
-
2weed/server/filer_grpc_server.go
@ -0,0 +1,61 @@ |
|||
package command |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/server" |
|||
"github.com/spf13/viper" |
|||
"github.com/chrislusf/seaweedfs/weed/replication" |
|||
) |
|||
|
|||
func init() { |
|||
cmdFilerReplicate.Run = runFilerReplicate // break init cycle
|
|||
} |
|||
|
|||
var cmdFilerReplicate = &Command{ |
|||
UsageLine: "filer.replicate", |
|||
Short: "replicate file changes to another destination", |
|||
Long: `replicate file changes to another destination |
|||
|
|||
filer.replicate listens on filer notifications. If any file is updated, it will fetch the updated content, |
|||
and write to the other destination. |
|||
|
|||
Run "weed scaffold -config replication" to generate a replication.toml file and customize the parameters. |
|||
|
|||
`, |
|||
} |
|||
|
|||
func runFilerReplicate(cmd *Command, args []string) bool { |
|||
|
|||
weed_server.LoadConfiguration("replication", true) |
|||
config := viper.GetViper() |
|||
|
|||
var notificationInput replication.NotificationInput |
|||
|
|||
for _, input := range replication.NotificationInputs { |
|||
if config.GetBool("notification." + input.GetName() + ".enabled") { |
|||
viperSub := config.Sub("notification." + input.GetName()) |
|||
if err := input.Initialize(viperSub); err != nil { |
|||
glog.Fatalf("Failed to initialize notification input for %s: %+v", |
|||
input.GetName(), err) |
|||
} |
|||
glog.V(0).Infof("Configure notification input to %s", input.GetName()) |
|||
notificationInput = input |
|||
break |
|||
} |
|||
} |
|||
|
|||
replicator := replication.NewReplicator(config.Sub("sink.filer")) |
|||
|
|||
for { |
|||
key, m, err := notificationInput.ReceiveMessage() |
|||
if err != nil { |
|||
glog.Errorf("receive %s: +v", key, err) |
|||
continue |
|||
} |
|||
if err = replicator.Replicate(key, m); err != nil { |
|||
glog.Errorf("replicate %s: +v", key, err) |
|||
} |
|||
} |
|||
|
|||
return true |
|||
} |
@ -0,0 +1,77 @@ |
|||
package replication |
|||
|
|||
import ( |
|||
"github.com/Shopify/sarama" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/golang/protobuf/proto" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
) |
|||
|
|||
func init() { |
|||
NotificationInputs = append(NotificationInputs, &KafkaInput{ |
|||
}) |
|||
} |
|||
|
|||
type KafkaInput struct { |
|||
topic string |
|||
consumer sarama.Consumer |
|||
messageChan chan *sarama.ConsumerMessage |
|||
} |
|||
|
|||
func (k *KafkaInput) GetName() string { |
|||
return "kafka" |
|||
} |
|||
|
|||
func (k *KafkaInput) Initialize(configuration util.Configuration) error { |
|||
glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) |
|||
glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic")) |
|||
return k.initialize( |
|||
configuration.GetStringSlice("hosts"), |
|||
configuration.GetString("topic"), |
|||
) |
|||
} |
|||
|
|||
func (k *KafkaInput) initialize(hosts []string, topic string) (err error) { |
|||
config := sarama.NewConfig() |
|||
config.Consumer.Return.Errors = true |
|||
k.consumer, err = sarama.NewConsumer(hosts, config) |
|||
k.topic = topic |
|||
k.messageChan = make(chan *sarama.ConsumerMessage, 1) |
|||
|
|||
partitions, err := k.consumer.Partitions(topic) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
|
|||
for _, partition := range partitions { |
|||
partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
go func() { |
|||
for { |
|||
select { |
|||
case err := <-partitionConsumer.Errors(): |
|||
fmt.Println(err) |
|||
case msg := <-partitionConsumer.Messages(): |
|||
k.messageChan <- msg |
|||
} |
|||
} |
|||
}() |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { |
|||
|
|||
msg := <-k.messageChan |
|||
|
|||
key = string(msg.Key) |
|||
message = &filer_pb.EventNotification{} |
|||
err = proto.Unmarshal(msg.Value, message) |
|||
|
|||
return |
|||
} |
@ -0,0 +1,18 @@ |
|||
package replication |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
) |
|||
|
|||
type NotificationInput interface { |
|||
// GetName gets the name to locate the configuration in sync.toml file
|
|||
GetName() string |
|||
// Initialize initializes the file store
|
|||
Initialize(configuration util.Configuration) error |
|||
ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) |
|||
} |
|||
|
|||
var ( |
|||
NotificationInputs []NotificationInput |
|||
) |
@ -0,0 +1,31 @@ |
|||
package replication |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/replication/sink" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
) |
|||
|
|||
type Replicator struct { |
|||
sink *sink.FilerSink |
|||
} |
|||
|
|||
func NewReplicator(config util.Configuration) *Replicator { |
|||
|
|||
sink := &sink.FilerSink{} |
|||
sink.Initialize(config) |
|||
|
|||
return &Replicator{ |
|||
sink: sink, |
|||
} |
|||
} |
|||
|
|||
func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification) error { |
|||
if message.OldEntry != nil && message.NewEntry == nil { |
|||
return r.sink.DeleteEntry(message.OldEntry, message.DeleteChunks) |
|||
} |
|||
if message.OldEntry == nil && message.NewEntry != nil { |
|||
return r.sink.CreateEntry(message.NewEntry) |
|||
} |
|||
return r.sink.UpdateEntry(message.OldEntry, message.NewEntry, message.DeleteChunks) |
|||
} |
@ -0,0 +1,163 @@ |
|||
package sink |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"fmt" |
|||
"strings" |
|||
"github.com/chrislusf/seaweedfs/weed/filer2" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"context" |
|||
"sync" |
|||
) |
|||
|
|||
type ReplicationSink interface { |
|||
DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool) error |
|||
CreateEntry(entry *filer_pb.Entry) error |
|||
UpdateEntry(oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error |
|||
} |
|||
|
|||
type FilerSink struct { |
|||
grpcAddress string |
|||
id string |
|||
dir string |
|||
} |
|||
|
|||
func (fs *FilerSink) Initialize(configuration util.Configuration) error { |
|||
return fs.initialize( |
|||
configuration.GetString("grpcAddress"), |
|||
configuration.GetString("id"), |
|||
configuration.GetString("directory"), |
|||
) |
|||
} |
|||
|
|||
func (fs *FilerSink) initialize(grpcAddress string, id string, dir string) (err error) { |
|||
fs.grpcAddress = grpcAddress |
|||
fs.id = id |
|||
fs.dir = dir |
|||
return nil |
|||
} |
|||
|
|||
func (fs *FilerSink) DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool) error { |
|||
return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { |
|||
|
|||
dir, name := filer2.FullPath(entry.Name).DirAndName() |
|||
|
|||
request := &filer_pb.DeleteEntryRequest{ |
|||
Directory: dir, |
|||
Name: name, |
|||
IsDirectory: entry.IsDirectory, |
|||
IsDeleteData: deleteIncludeChunks, |
|||
} |
|||
|
|||
glog.V(1).Infof("delete entry: %v", request) |
|||
_, err := client.DeleteEntry(context.Background(), request) |
|||
if err != nil { |
|||
glog.V(0).Infof("delete entry %s: %v", entry.Name, err) |
|||
return fmt.Errorf("delete entry %s: %v", entry.Name, err) |
|||
} |
|||
|
|||
return nil |
|||
}) |
|||
} |
|||
|
|||
func (fs *FilerSink) CreateEntry(entry *filer_pb.Entry) error { |
|||
|
|||
replicatedChunks, err := replicateChunks(entry.Chunks) |
|||
|
|||
if err != nil { |
|||
glog.V(0).Infof("replicate entry chunks %s: %v", entry.Name, err) |
|||
return fmt.Errorf("replicate entry chunks %s: %v", entry.Name, err) |
|||
} |
|||
|
|||
return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { |
|||
|
|||
dir, name := filer2.FullPath(entry.Name).DirAndName() |
|||
|
|||
request := &filer_pb.CreateEntryRequest{ |
|||
Directory: dir, |
|||
Entry: &filer_pb.Entry{ |
|||
Name: name, |
|||
IsDirectory: entry.IsDirectory, |
|||
Attributes: entry.Attributes, |
|||
Chunks: replicatedChunks, |
|||
}, |
|||
} |
|||
|
|||
glog.V(1).Infof("create: %v", request) |
|||
if _, err := client.CreateEntry(context.Background(), request); err != nil { |
|||
glog.V(0).Infof("create entry %s: %v", entry.Name, err) |
|||
return fmt.Errorf("create entry %s: %v", entry.Name, err) |
|||
} |
|||
|
|||
return nil |
|||
}) |
|||
} |
|||
|
|||
func (fs *FilerSink) UpdateEntry(oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error { |
|||
return nil |
|||
} |
|||
|
|||
func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { |
|||
|
|||
grpcConnection, err := util.GrpcDial(fs.grpcAddress) |
|||
if err != nil { |
|||
return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) |
|||
} |
|||
defer grpcConnection.Close() |
|||
|
|||
client := filer_pb.NewSeaweedFilerClient(grpcConnection) |
|||
|
|||
return fn(client) |
|||
} |
|||
|
|||
func volumeId(fileId string) string { |
|||
lastCommaIndex := strings.LastIndex(fileId, ",") |
|||
if lastCommaIndex > 0 { |
|||
return fileId[:lastCommaIndex] |
|||
} |
|||
return fileId |
|||
} |
|||
|
|||
func replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) { |
|||
if len(sourceChunks) == 0 { |
|||
return |
|||
} |
|||
var wg sync.WaitGroup |
|||
for _, s := range sourceChunks { |
|||
wg.Add(1) |
|||
go func(chunk *filer_pb.FileChunk) { |
|||
defer wg.Done() |
|||
replicatedChunk, e := replicateOneChunk(chunk) |
|||
if e != nil { |
|||
err = e |
|||
} |
|||
replicatedChunks = append(replicatedChunks, replicatedChunk) |
|||
}(s) |
|||
} |
|||
wg.Wait() |
|||
|
|||
return |
|||
} |
|||
|
|||
func replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) { |
|||
|
|||
fileId, err := fetchAndWrite(sourceChunk) |
|||
if err != nil { |
|||
return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err) |
|||
} |
|||
|
|||
return &filer_pb.FileChunk{ |
|||
FileId: fileId, |
|||
Offset: sourceChunk.Offset, |
|||
Size: sourceChunk.Size, |
|||
Mtime: sourceChunk.Mtime, |
|||
ETag: sourceChunk.ETag, |
|||
SourceFileId: sourceChunk.FileId, |
|||
}, nil |
|||
} |
|||
|
|||
func fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) { |
|||
|
|||
return |
|||
} |
@ -0,0 +1,97 @@ |
|||
package source |
|||
|
|||
import ( |
|||
"io" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"fmt" |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"strings" |
|||
"context" |
|||
) |
|||
|
|||
type ReplicationSource interface { |
|||
ReadPart(part string) io.ReadCloser |
|||
} |
|||
|
|||
type FilerSource struct { |
|||
grpcAddress string |
|||
id string |
|||
dir string |
|||
} |
|||
|
|||
func (fs *FilerSource) Initialize(configuration util.Configuration) error { |
|||
return fs.initialize( |
|||
configuration.GetString("grpcAddress"), |
|||
configuration.GetString("id"), |
|||
configuration.GetString("directory"), |
|||
) |
|||
} |
|||
|
|||
func (fs *FilerSource) initialize(grpcAddress string, id string, dir string) (err error) { |
|||
fs.grpcAddress = grpcAddress |
|||
fs.id = id |
|||
fs.dir = dir |
|||
return nil |
|||
} |
|||
|
|||
func (fs *FilerSource) ReadPart(part string) (readCloser io.ReadCloser, err error) { |
|||
|
|||
vid2Locations := make(map[string]*filer_pb.Locations) |
|||
|
|||
vid := volumeId(part) |
|||
|
|||
err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { |
|||
|
|||
glog.V(4).Infof("read lookup volume id locations: %v", vid) |
|||
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ |
|||
VolumeIds: []string{vid}, |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
vid2Locations = resp.LocationsMap |
|||
|
|||
return nil |
|||
}) |
|||
|
|||
if err != nil { |
|||
glog.V(1).Infof("replication lookup volume id: %v", vid, err) |
|||
return nil, fmt.Errorf("replicationlookup volume id %v: %v", vid, err) |
|||
} |
|||
|
|||
locations := vid2Locations[vid] |
|||
|
|||
if locations == nil || len(locations.Locations) == 0 { |
|||
glog.V(1).Infof("replication locate volume id: %v", vid, err) |
|||
return nil, fmt.Errorf("replication locate volume id %v: %v", vid, err) |
|||
} |
|||
|
|||
fileUrl := fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part) |
|||
|
|||
_, readCloser, err = util.DownloadUrl(fileUrl) |
|||
|
|||
return readCloser, err |
|||
} |
|||
|
|||
func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { |
|||
|
|||
grpcConnection, err := util.GrpcDial(fs.grpcAddress) |
|||
if err != nil { |
|||
return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) |
|||
} |
|||
defer grpcConnection.Close() |
|||
|
|||
client := filer_pb.NewSeaweedFilerClient(grpcConnection) |
|||
|
|||
return fn(client) |
|||
} |
|||
|
|||
func volumeId(fileId string) string { |
|||
lastCommaIndex := strings.LastIndex(fileId, ",") |
|||
if lastCommaIndex > 0 { |
|||
return fileId[:lastCommaIndex] |
|||
} |
|||
return fileId |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue