You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							230 lines
						
					
					
						
							5.4 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							230 lines
						
					
					
						
							5.4 KiB
						
					
					
				
								package webhook
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"context"
							 | 
						|
									"errors"
							 | 
						|
									"fmt"
							 | 
						|
									"time"
							 | 
						|
								
							 | 
						|
									"github.com/ThreeDotsLabs/watermill"
							 | 
						|
									"github.com/ThreeDotsLabs/watermill/message"
							 | 
						|
									"github.com/ThreeDotsLabs/watermill/message/router/middleware"
							 | 
						|
									"github.com/ThreeDotsLabs/watermill/message/router/plugin"
							 | 
						|
									"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/glog"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/notification"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/util"
							 | 
						|
									"google.golang.org/protobuf/proto"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								func init() {
							 | 
						|
									notification.MessageQueues = append(notification.MessageQueues, &Queue{})
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								type Queue struct {
							 | 
						|
									router       *message.Router
							 | 
						|
									queueChannel *gochannel.GoChannel
							 | 
						|
									config       *config
							 | 
						|
									client       client
							 | 
						|
									filter       *filter
							 | 
						|
								
							 | 
						|
									ctx    context.Context
							 | 
						|
									cancel context.CancelFunc
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (w *Queue) GetName() string {
							 | 
						|
									return queueName
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (w *Queue) SendMessage(key string, msg proto.Message) error {
							 | 
						|
									eventNotification, ok := msg.(*filer_pb.EventNotification)
							 | 
						|
									if !ok {
							 | 
						|
										return nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if w.filter != nil && !w.filter.shouldPublish(key, eventNotification) {
							 | 
						|
										return nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									m := newWebhookMessage(key, msg)
							 | 
						|
									if m == nil {
							 | 
						|
										return nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									wMsg, err := m.toWaterMillMessage()
							 | 
						|
									if err != nil {
							 | 
						|
										return err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return w.queueChannel.Publish(pubSubTopicName, wMsg)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (w *webhookMessage) toWaterMillMessage() (*message.Message, error) {
							 | 
						|
									payload, err := proto.Marshal(w.Notification)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									msg := message.NewMessage(watermill.NewUUID(), payload)
							 | 
						|
									// Set event type and key as metadata
							 | 
						|
									msg.Metadata.Set("event_type", w.EventType)
							 | 
						|
									msg.Metadata.Set("key", w.Key)
							 | 
						|
								
							 | 
						|
									return msg, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (w *Queue) Initialize(configuration util.Configuration, prefix string) error {
							 | 
						|
									c := newConfigWithDefaults(configuration, prefix)
							 | 
						|
								
							 | 
						|
									if err := c.validate(); err != nil {
							 | 
						|
										return err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return w.initialize(c)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (w *Queue) initialize(cfg *config) error {
							 | 
						|
									w.ctx, w.cancel = context.WithCancel(context.Background())
							 | 
						|
									w.config = cfg
							 | 
						|
									w.filter = newFilter(cfg)
							 | 
						|
								
							 | 
						|
									hClient, err := newHTTPClient(cfg)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to create webhook http client: %w", err)
							 | 
						|
									}
							 | 
						|
									w.client = hClient
							 | 
						|
								
							 | 
						|
									if err = w.setupWatermillQueue(cfg); err != nil {
							 | 
						|
										return fmt.Errorf("failed to setup watermill queue: %w", err)
							 | 
						|
									}
							 | 
						|
									if err = w.logDeadLetterMessages(); err != nil {
							 | 
						|
										return err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (w *Queue) setupWatermillQueue(cfg *config) error {
							 | 
						|
									logger := watermill.NewStdLogger(false, false)
							 | 
						|
									pubSubConfig := gochannel.Config{
							 | 
						|
										OutputChannelBuffer: int64(cfg.bufferSize),
							 | 
						|
										Persistent:          false,
							 | 
						|
									}
							 | 
						|
									w.queueChannel = gochannel.NewGoChannel(pubSubConfig, logger)
							 | 
						|
								
							 | 
						|
									router, err := message.NewRouter(
							 | 
						|
										message.RouterConfig{
							 | 
						|
											CloseTimeout: 60 * time.Second,
							 | 
						|
										},
							 | 
						|
										logger,
							 | 
						|
									)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to create router: %w", err)
							 | 
						|
									}
							 | 
						|
									w.router = router
							 | 
						|
								
							 | 
						|
									retryMiddleware := middleware.Retry{
							 | 
						|
										MaxRetries:          cfg.maxRetries,
							 | 
						|
										InitialInterval:     time.Duration(cfg.backoffSeconds) * time.Second,
							 | 
						|
										MaxInterval:         time.Duration(cfg.maxBackoffSeconds) * time.Second,
							 | 
						|
										Multiplier:          2.0,
							 | 
						|
										RandomizationFactor: 0.3,
							 | 
						|
										Logger:              logger,
							 | 
						|
									}.Middleware
							 | 
						|
								
							 | 
						|
									poisonQueue, err := middleware.PoisonQueue(w.queueChannel, deadLetterTopic)
							 | 
						|
									if err != nil {
							 | 
						|
										return fmt.Errorf("failed to create poison queue: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									router.AddPlugin(plugin.SignalsHandler)
							 | 
						|
									router.AddMiddleware(retryMiddleware, poisonQueue)
							 | 
						|
								
							 | 
						|
									for i := 0; i < cfg.nWorkers; i++ {
							 | 
						|
										router.AddNoPublisherHandler(
							 | 
						|
											pubSubHandlerNameTemplate(i),
							 | 
						|
											pubSubTopicName,
							 | 
						|
											w.queueChannel,
							 | 
						|
											w.handleWebhook,
							 | 
						|
										)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									go func() {
							 | 
						|
										// cancels the queue context so the dead letter logger exists in case context not canceled by the shutdown signal already
							 | 
						|
										defer w.cancel()
							 | 
						|
								
							 | 
						|
										if err := router.Run(w.ctx); err != nil && !errors.Is(err, context.Canceled) {
							 | 
						|
											glog.Errorf("webhook pubsub worker stopped with error: %v", err)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										glog.Info("webhook pubsub worker stopped")
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (w *Queue) handleWebhook(msg *message.Message) error {
							 | 
						|
									var n filer_pb.EventNotification
							 | 
						|
									if err := proto.Unmarshal(msg.Payload, &n); err != nil {
							 | 
						|
										glog.Errorf("failed to unmarshal protobuf message: %v", err)
							 | 
						|
										return err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Reconstruct webhook message from metadata and payload
							 | 
						|
									webhookMsg := &webhookMessage{
							 | 
						|
										Key:          msg.Metadata.Get("key"),
							 | 
						|
										EventType:    msg.Metadata.Get("event_type"),
							 | 
						|
										Notification: &n,
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if err := w.client.sendMessage(webhookMsg); err != nil {
							 | 
						|
										glog.Errorf("failed to send message to webhook %s: %v", webhookMsg.Key, err)
							 | 
						|
										return err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (w *Queue) logDeadLetterMessages() error {
							 | 
						|
									ch, err := w.queueChannel.Subscribe(w.ctx, deadLetterTopic)
							 | 
						|
									if err != nil {
							 | 
						|
										return err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									go func() {
							 | 
						|
										for {
							 | 
						|
											select {
							 | 
						|
											case msg, ok := <-ch:
							 | 
						|
												if !ok {
							 | 
						|
													glog.Info("dead letter channel closed")
							 | 
						|
													return
							 | 
						|
												}
							 | 
						|
												if msg == nil {
							 | 
						|
													glog.Errorf("received nil message from dead letter channel")
							 | 
						|
													continue
							 | 
						|
												}
							 | 
						|
												key := "unknown"
							 | 
						|
												if msg.Metadata != nil {
							 | 
						|
													if keyValue, exists := msg.Metadata["key"]; exists {
							 | 
						|
														key = keyValue
							 | 
						|
													}
							 | 
						|
												}
							 | 
						|
												payload := ""
							 | 
						|
												if msg.Payload != nil {
							 | 
						|
													var n filer_pb.EventNotification
							 | 
						|
													if err := proto.Unmarshal(msg.Payload, &n); err != nil {
							 | 
						|
														payload = fmt.Sprintf("failed to unmarshal payload: %v", err)
							 | 
						|
													} else {
							 | 
						|
														payload = n.String()
							 | 
						|
													}
							 | 
						|
												}
							 | 
						|
												glog.Errorf("received dead letter message: %s, key: %s", payload, key)
							 | 
						|
											case <-w.ctx.Done():
							 | 
						|
												return
							 | 
						|
											}
							 | 
						|
										}
							 | 
						|
									}()
							 | 
						|
								
							 | 
						|
									return nil
							 | 
						|
								}
							 |