diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 4f698e375..0b6eaf94e 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -97,13 +97,19 @@ func runFilerReplicate(cmd *Command, args []string) bool { replicator := replication.NewReplicator(config, "source.filer.", dataSink) for { - key, m, err := notificationInput.ReceiveMessage() + key, m, onSuccessFn, onFailureFn, err := notificationInput.ReceiveMessage() if err != nil { glog.Errorf("receive %s: %+v", key, err) + if onFailureFn != nil { + onFailureFn() + } continue } if key == "" { // long poll received no messages + if onSuccessFn != nil { + onSuccessFn() + } continue } if m.OldEntry != nil && m.NewEntry == nil { @@ -115,8 +121,14 @@ func runFilerReplicate(cmd *Command, args []string) bool { } if err = replicator.Replicate(context.Background(), key, m); err != nil { glog.Errorf("replicate %s: %+v", key, err) + if onFailureFn != nil { + onFailureFn() + } } else { glog.V(1).Infof("replicated %s", key) + if onSuccessFn != nil { + onSuccessFn() + } } } diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go index 1dd386ba7..642834c72 100644 --- a/weed/replication/sub/notification_aws_sqs.go +++ b/weed/replication/sub/notification_aws_sqs.go @@ -68,7 +68,7 @@ func (k *AwsSqsInput) initialize(awsAccessKeyId, awsSecretAccessKey, region, que return nil } -func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { +func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { // receive message result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{ diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index 9726096e5..09a96d5d5 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -38,13 +38,24 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s return nil } -func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { +func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { msg, err := k.sub.Receive(context.Background()) + if err != nil { + return + } + onSuccessFn = func() { + msg.Ack() + } + onFailureFn = func() { + if msg.Nackable() { + msg.Nack() + } + } key = msg.Metadata["key"] message = &filer_pb.EventNotification{} err = proto.Unmarshal(msg.Body, message) if err != nil { - return "", nil, err + return "", nil, onSuccessFn, onFailureFn, err } - return key, message, nil + return key, message, onSuccessFn, onFailureFn, nil } diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go index a950bb42b..f7c767d4a 100644 --- a/weed/replication/sub/notification_google_pub_sub.go +++ b/weed/replication/sub/notification_google_pub_sub.go @@ -85,16 +85,22 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId go k.sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { k.messageChan <- m - m.Ack() }) return err } -func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { +func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { m := <-k.messageChan + onSuccessFn = func() { + m.Ack() + } + onFailureFn = func() { + m.Nack() + } + // process the message key = m.Attributes["key"] message = &filer_pb.EventNotification{} diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go index fa9cfad9b..622a759ea 100644 --- a/weed/replication/sub/notification_kafka.go +++ b/weed/replication/sub/notification_kafka.go @@ -97,7 +97,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, return nil } -func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { +func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { msg := <-k.messageChan diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go index 8a2668f98..d5a910db9 100644 --- a/weed/replication/sub/notifications.go +++ b/weed/replication/sub/notifications.go @@ -10,7 +10,7 @@ type NotificationInput interface { GetName() string // Initialize initializes the file store Initialize(configuration util.Configuration, prefix string) error - ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) + ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) } var (