Browse Source

avoid data race on GoCDKPubSub.topic (#3596)

pull/3599/head
Konstantin Lebedev 2 years ago
committed by GitHub
parent
commit
9b2b7d4f5a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      weed/notification/gocdk_pub_sub/gocdk_pub_sub.go

29
weed/notification/gocdk_pub_sub/gocdk_pub_sub.go

@ -27,6 +27,7 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"net/url" "net/url"
"path" "path"
"sync"
"time" "time"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
@ -51,31 +52,42 @@ func getPath(rawUrl string) string {
type GoCDKPubSub struct { type GoCDKPubSub struct {
topicURL string topicURL string
topic *pubsub.Topic topic *pubsub.Topic
topicLock sync.RWMutex
} }
func (k *GoCDKPubSub) GetName() string { func (k *GoCDKPubSub) GetName() string {
return "gocdk_pub_sub" return "gocdk_pub_sub"
} }
func (k *GoCDKPubSub) setTopic(topic *pubsub.Topic) {
k.topicLock.Lock()
k.topic = topic
k.topicLock.Unlock()
k.doReconnect()
}
func (k *GoCDKPubSub) doReconnect() { func (k *GoCDKPubSub) doReconnect() {
var conn *amqp.Connection var conn *amqp.Connection
k.topicLock.RLock()
defer k.topicLock.RUnlock()
if k.topic.As(&conn) { if k.topic.As(&conn) {
go func() {
<-conn.NotifyClose(make(chan *amqp.Error))
conn.Close()
go func(c *amqp.Connection) {
<-c.NotifyClose(make(chan *amqp.Error))
c.Close()
k.topicLock.RLock()
k.topic.Shutdown(context.Background()) k.topic.Shutdown(context.Background())
k.topicLock.RUnlock()
for { for {
glog.Info("Try reconnect") glog.Info("Try reconnect")
conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL")) conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
if err == nil { if err == nil {
k.topic = rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil)
k.doReconnect()
k.setTopic(rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil))
break break
} }
glog.Error(err) glog.Error(err)
time.Sleep(time.Second) time.Sleep(time.Second)
} }
}()
}(conn)
} }
} }
@ -86,8 +98,7 @@ func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string
if err != nil { if err != nil {
glog.Fatalf("Failed to open topic: %v", err) glog.Fatalf("Failed to open topic: %v", err)
} }
k.topic = topic
k.doReconnect()
k.setTopic(topic)
return nil return nil
} }
@ -96,6 +107,8 @@ func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error {
if err != nil { if err != nil {
return err return err
} }
k.topicLock.RLock()
defer k.topicLock.RUnlock()
err = k.topic.Send(context.Background(), &pubsub.Message{ err = k.topic.Send(context.Background(), &pubsub.Message{
Body: bytes, Body: bytes,
Metadata: map[string]string{"key": key}, Metadata: map[string]string{"key": key},

Loading…
Cancel
Save