Konstantin Lebedev 4 years ago
parent
commit
94eac4f00e
  1. 5
      weed/filer/filer_notify.go
  2. 35
      weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
  3. 29
      weed/replication/sub/notification_gocdk_pub_sub.go

5
weed/filer/filer_notify.go

@ -55,7 +55,10 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry
if notification.Queue != nil { if notification.Queue != nil {
glog.V(3).Infof("notifying entry update %v", fullpath) glog.V(3).Infof("notifying entry update %v", fullpath)
notification.Queue.SendMessage(fullpath, eventNotification)
if err := notification.Queue.SendMessage(fullpath, eventNotification); err != nil {
// throw message
glog.Error(err)
}
} }
f.logMetaEvent(ctx, fullpath, eventNotification) f.logMetaEvent(ctx, fullpath, eventNotification)

35
weed/notification/gocdk_pub_sub/gocdk_pub_sub.go

@ -17,10 +17,14 @@ package gocdk_pub_sub
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/streadway/amqp"
"gocloud.dev/pubsub" "gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs" _ "gocloud.dev/pubsub/awssnssqs"
"gocloud.dev/pubsub/rabbitpubsub"
"net/url"
"path"
"time"
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification" "github.com/chrislusf/seaweedfs/weed/notification"
@ -29,12 +33,18 @@ import (
_ "gocloud.dev/pubsub/gcppubsub" _ "gocloud.dev/pubsub/gcppubsub"
_ "gocloud.dev/pubsub/natspubsub" _ "gocloud.dev/pubsub/natspubsub"
_ "gocloud.dev/pubsub/rabbitpubsub" _ "gocloud.dev/pubsub/rabbitpubsub"
"os"
) )
func init() { func init() {
notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{}) notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{})
} }
func getPath(rawUrl string) string {
parsedUrl, _ := url.Parse(rawUrl)
return path.Join(parsedUrl.Host, parsedUrl.Path)
}
type GoCDKPubSub struct { type GoCDKPubSub struct {
topicURL string topicURL string
topic *pubsub.Topic topic *pubsub.Topic
@ -44,6 +54,28 @@ func (k *GoCDKPubSub) GetName() string {
return "gocdk_pub_sub" return "gocdk_pub_sub"
} }
func (k *GoCDKPubSub) doReconnect() {
var conn *amqp.Connection
if k.topic.As(&conn) {
go func() {
<-conn.NotifyClose(make(chan *amqp.Error))
conn.Close()
k.topic.Shutdown(context.Background())
for {
glog.Info("Try reconnect")
conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
if err == nil {
k.topic = rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil)
k.doReconnect()
break
}
glog.Error(err)
time.Sleep(time.Second)
}
}()
}
}
func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error { func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error {
k.topicURL = configuration.GetString(prefix + "topic_url") k.topicURL = configuration.GetString(prefix + "topic_url")
glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL) glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL)
@ -52,6 +84,7 @@ func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string
glog.Fatalf("Failed to open topic: %v", err) glog.Fatalf("Failed to open topic: %v", err)
} }
k.topic = topic k.topic = topic
k.doReconnect()
return nil return nil
} }

29
weed/replication/sub/notification_gocdk_pub_sub.go

@ -9,9 +9,12 @@ import (
"github.com/streadway/amqp" "github.com/streadway/amqp"
"gocloud.dev/pubsub" "gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs" _ "gocloud.dev/pubsub/awssnssqs"
"gocloud.dev/pubsub/rabbitpubsub"
"net/url" "net/url"
"os"
"path" "path"
"strings" "strings"
"time"
// _ "gocloud.dev/pubsub/azuresb" // _ "gocloud.dev/pubsub/azuresb"
_ "gocloud.dev/pubsub/gcppubsub" _ "gocloud.dev/pubsub/gcppubsub"
@ -74,6 +77,7 @@ func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl str
type GoCDKPubSubInput struct { type GoCDKPubSubInput struct {
sub *pubsub.Subscription sub *pubsub.Subscription
subURL string
} }
func (k *GoCDKPubSubInput) GetName() string { func (k *GoCDKPubSubInput) GetName() string {
@ -82,9 +86,9 @@ func (k *GoCDKPubSubInput) GetName() string {
func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error { func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
topicUrl := configuration.GetString(prefix + "topic_url") topicUrl := configuration.GetString(prefix + "topic_url")
subURL := configuration.GetString(prefix + "sub_url")
glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
sub, err := pubsub.OpenSubscription(context.Background(), subURL)
k.subURL = configuration.GetString(prefix + "sub_url")
glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL)
sub, err := pubsub.OpenSubscription(context.Background(), k.subURL)
if err != nil { if err != nil {
return err return err
} }
@ -95,10 +99,10 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s
return err return err
} }
defer ch.Close() defer ch.Close()
_, err = ch.QueueInspect(getPath(subURL))
_, err = ch.QueueInspect(getPath(k.subURL))
if err != nil { if err != nil {
if strings.HasPrefix(err.Error(), "Exception (404) Reason") { if strings.HasPrefix(err.Error(), "Exception (404) Reason") {
if err := QueueDeclareAndBind(conn, topicUrl, subURL); err != nil {
if err := QueueDeclareAndBind(conn, topicUrl, k.subURL); err != nil {
return err return err
} }
} else { } else {
@ -111,14 +115,25 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s
} }
func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
msg, err := k.sub.Receive(context.Background())
ctx := context.Background()
msg, err := k.sub.Receive(ctx)
if err != nil { if err != nil {
var conn *amqp.Connection var conn *amqp.Connection
if k.sub.As(&conn) && conn.IsClosed() { if k.sub.As(&conn) && conn.IsClosed() {
glog.Fatalln(err)
conn.Close()
k.sub.Shutdown(ctx)
conn, err = amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
if err != nil {
glog.Error(err)
time.Sleep(time.Second)
return
} }
k.sub = rabbitpubsub.OpenSubscription(conn, getPath(k.subURL), nil)
return return
} }
// This is permanent cached sub err
glog.Fatal(err)
}
onFailureFn = func() { onFailureFn = func() {
if msg.Nackable() { if msg.Nackable() {
isRedelivered := false isRedelivered := false

Loading…
Cancel
Save