From c0c9a8bad51d448042a1251c5daa73aecf79109a Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Thu, 4 Apr 2019 17:22:45 -0400 Subject: [PATCH 1/3] replication: add GoCDK PubSub support --- .../sub/notification_gocdk_pub_sub.go | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 weed/replication/sub/notification_gocdk_pub_sub.go diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go new file mode 100644 index 000000000..c8b16e308 --- /dev/null +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -0,0 +1,50 @@ +package sub + +import ( + "context" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "gocloud.dev/pubsub" + _ "gocloud.dev/pubsub/awssnssqs" + _ "gocloud.dev/pubsub/azuresb" + _ "gocloud.dev/pubsub/gcppubsub" + _ "gocloud.dev/pubsub/natspubsub" + _ "gocloud.dev/pubsub/rabbitpubsub" +) + +func init() { + NotificationInputs = append(NotificationInputs, &GoCDKPubSubInput{}) +} + +type GoCDKPubSubInput struct { + sub *pubsub.Subscription +} + +func (k *GoCDKPubSubInput) GetName() string { + return "gocdk_pub_sub" +} + +func (k *GoCDKPubSubInput) Initialize(config util.Configuration) error { + subURL := config.GetString("sub_url") + glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", subURL) + sub, err := pubsub.OpenSubscription(context.Background(), subURL) + if err != nil { + return err + } + k.sub = sub + return nil +} + +func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { + msg, err := k.sub.Receive(context.Background()) + key = msg.Metadata["key"] + message = &filer_pb.EventNotification{} + err = proto.Unmarshal(msg.Body, message) + if err != nil { + return "", nil, err + } + return key, message, nil +} From a6a5d804014f5bbcf339fd45a94141f0f8c06232 Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Fri, 5 Apr 2019 08:13:32 -0400 Subject: [PATCH 2/3] fix typo --- weed/replication/sub/notification_gocdk_pub_sub.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index c8b16e308..9c76e6918 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -29,7 +29,7 @@ func (k *GoCDKPubSubInput) GetName() string { func (k *GoCDKPubSubInput) Initialize(config util.Configuration) error { subURL := config.GetString("sub_url") - glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", subURL) + glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL) sub, err := pubsub.OpenSubscription(context.Background(), subURL) if err != nil { return err From 72920efc20fd758048f21a82d621a6d7bbc08066 Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Fri, 5 Apr 2019 13:43:38 -0400 Subject: [PATCH 3/3] added entry to scaffold --- weed/command/scaffold.go | 1 + 1 file changed, 1 insertion(+) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index b21641f6b..106c2dace 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -190,6 +190,7 @@ enabled = false # The exchange must have already been created by some other means, like # the RabbitMQ management plugin. topic_url = "rabbit://myexchange" +sub_url = "rabbit://myqueue" ` REPLICATION_TOML_EXAMPLE = `