Browse Source

notification: add Go CDK pubsub support

Add the gocdk_pub_sub package, which supports the Go Cloud Development
Kit pubsub API.

Link in all current providers.

Update the notification scaffold.
pull/895/head
Jonathan Amsterdam 6 years ago
parent
commit
8db82e2b75
  1. 10
      weed/command/scaffold.go
  2. 71
      weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
  3. 4
      weed/server/filer_server.go

10
weed/command/scaffold.go

@ -180,6 +180,16 @@ google_application_credentials = "/path/to/x.json" # path to json credential fil
project_id = "" # an existing project id project_id = "" # an existing project id
topic = "seaweedfs_filer_topic" # a topic, auto created if does not exists topic = "seaweedfs_filer_topic" # a topic, auto created if does not exists
[notification.gocdk_pub_sub]
# The Go Cloud Development Kit (https://gocloud.dev).
# PubSub API (https://godoc.org/gocloud.dev/pubsub).
# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ.
enabled = false
# This URL will Dial the RabbitMQ server at the URL in the environment
# variable RABBIT_SERVER_URL and open the exchange "myexchange".
# The exchange must have already been created by some other means, like
# the RabbitMQ management plugin.
topic_url = "rabbit://myexchange"
` `
REPLICATION_TOML_EXAMPLE = ` REPLICATION_TOML_EXAMPLE = `

71
weed/notification/gocdk_pub_sub/gocdk_pub_sub.go

@ -0,0 +1,71 @@
// Package gocdk_pub_sub supports the Go CDK (Cloud Development Kit) PubSub API,
// which in turn supports many providers, including Amazon SNS/SQS, Azure Service Bus,
// Google Cloud PubSub, and RabbitMQ.
//
// In the config, select a provider and topic using a URL. See
// https://godoc.org/gocloud.dev/pubsub and its sub-packages for details.
//
// The Go CDK PubSub API does not support administrative operations like topic
// creation. Create the topic using a UI, CLI or provider-specific API before running
// weed.
//
// The Go CDK obtains credentials via environment variables and other
// provider-specific default mechanisms. See the provider's documentation for
// details.
package gocdk_pub_sub
import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
_ "gocloud.dev/pubsub/azuresb"
_ "gocloud.dev/pubsub/gcppubsub"
_ "gocloud.dev/pubsub/natspubsub"
_ "gocloud.dev/pubsub/rabbitpubsub"
)
func init() {
notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{})
}
type GoCDKPubSub struct {
topicURL string
topic *pubsub.Topic
}
func (k *GoCDKPubSub) GetName() string {
return "gocdk_pub_sub"
}
func (k *GoCDKPubSub) Initialize(config util.Configuration) error {
k.topicURL = config.GetString("topic_url")
glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL)
topic, err := pubsub.OpenTopic(context.Background(), k.topicURL)
if err != nil {
glog.Fatalf("Failed to open topic: %v", err)
}
k.topic = topic
return nil
}
func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error {
bytes, err := proto.Marshal(message)
if err != nil {
return err
}
ctx := context.Background()
err = k.topic.Send(ctx, &pubsub.Message{
Body: bytes,
Metadata: map[string]string{"key": key},
})
if err != nil {
return fmt.Errorf("send message via Go CDK pubsub %s: %v", k.topicURL, err)
}
return nil
}

4
weed/server/filer_server.go

@ -1,10 +1,11 @@
package weed_server package weed_server
import ( import (
"google.golang.org/grpc"
"net/http" "net/http"
"os" "os"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2"
_ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra" _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
@ -15,6 +16,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification" "github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
_ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka" _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
_ "github.com/chrislusf/seaweedfs/weed/notification/log" _ "github.com/chrislusf/seaweedfs/weed/notification/log"

Loading…
Cancel
Save