Chris Lu
6 years ago
9 changed files with 282 additions and 4 deletions
-
9weed/command/filer_replication.go
-
15weed/command/scaffold.go
-
51weed/filer2/filer_notify_test.go
-
91weed/notification/aws_sqs/aws_sqs_pub.go
-
4weed/replication/sink/s3sink/s3_sink.go
-
111weed/replication/sub/notification_aws_sqs.go
-
2weed/replication/sub/notification_kafka.go
-
2weed/replication/sub/notifications.go
-
1weed/server/filer_server.go
@ -0,0 +1,51 @@ |
|||
package filer2 |
|||
|
|||
import ( |
|||
"testing" |
|||
"time" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/golang/protobuf/proto" |
|||
) |
|||
|
|||
func TestProtoMarshalText(t *testing.T) { |
|||
|
|||
oldEntry := &Entry{ |
|||
FullPath: FullPath("/this/path/to"), |
|||
Attr: Attr{ |
|||
Mtime: time.Now(), |
|||
Mode: 0644, |
|||
Uid: 1, |
|||
Mime: "text/json", |
|||
TtlSec: 25, |
|||
}, |
|||
Chunks: []*filer_pb.FileChunk{ |
|||
&filer_pb.FileChunk{ |
|||
FileId: "234,2423423422", |
|||
Offset: 234234, |
|||
Size: 234, |
|||
Mtime: 12312423, |
|||
ETag: "2342342354", |
|||
SourceFileId: "23234,2342342342", |
|||
}, |
|||
}, |
|||
} |
|||
|
|||
notification := &filer_pb.EventNotification{ |
|||
OldEntry: toProtoEntry(oldEntry), |
|||
NewEntry: toProtoEntry(nil), |
|||
DeleteChunks: true, |
|||
} |
|||
|
|||
text := proto.MarshalTextString(notification) |
|||
|
|||
notification2 := &filer_pb.EventNotification{} |
|||
proto.UnmarshalText(text, notification2) |
|||
|
|||
if notification2.OldEntry.Chunks[0].SourceFileId != notification.OldEntry.Chunks[0].SourceFileId { |
|||
t.Fatalf("marshal/unmarshal error: %s", text) |
|||
} |
|||
|
|||
println(text) |
|||
|
|||
} |
@ -0,0 +1,91 @@ |
|||
package aws_sqs |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/notification" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/golang/protobuf/proto" |
|||
"github.com/aws/aws-sdk-go/service/sqs" |
|||
"github.com/aws/aws-sdk-go/aws" |
|||
"github.com/aws/aws-sdk-go/aws/credentials" |
|||
"github.com/aws/aws-sdk-go/aws/session" |
|||
"fmt" |
|||
"github.com/aws/aws-sdk-go/aws/awserr" |
|||
) |
|||
|
|||
func init() { |
|||
notification.MessageQueues = append(notification.MessageQueues, &AwsSqsPub{}) |
|||
} |
|||
|
|||
type AwsSqsPub struct { |
|||
svc *sqs.SQS |
|||
queueUrl string |
|||
} |
|||
|
|||
func (k *AwsSqsPub) GetName() string { |
|||
return "aws_sqs" |
|||
} |
|||
|
|||
func (k *AwsSqsPub) Initialize(configuration util.Configuration) (err error) { |
|||
glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString("region")) |
|||
glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) |
|||
return k.initialize( |
|||
configuration.GetString("aws_access_key_id"), |
|||
configuration.GetString("aws_secret_access_key"), |
|||
configuration.GetString("region"), |
|||
configuration.GetString("sqs_queue_name"), |
|||
) |
|||
} |
|||
|
|||
func (k *AwsSqsPub) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) { |
|||
|
|||
config := &aws.Config{ |
|||
Region: aws.String(region), |
|||
} |
|||
if awsAccessKeyId != "" && aswSecretAccessKey != "" { |
|||
config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "") |
|||
} |
|||
|
|||
sess, err := session.NewSession(config) |
|||
if err != nil { |
|||
return fmt.Errorf("create aws session: %v", err) |
|||
} |
|||
k.svc = sqs.New(sess) |
|||
|
|||
result, err := k.svc.GetQueueUrl(&sqs.GetQueueUrlInput{ |
|||
QueueName: aws.String(queueName), |
|||
}) |
|||
if err != nil { |
|||
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist { |
|||
return fmt.Errorf("unable to find queue %s", queueName) |
|||
} |
|||
return fmt.Errorf("get queue %s url: %v", queueName, err) |
|||
} |
|||
|
|||
k.queueUrl = *result.QueueUrl |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (k *AwsSqsPub) SendMessage(key string, message proto.Message) (err error) { |
|||
|
|||
text := proto.MarshalTextString(message) |
|||
|
|||
_, err = k.svc.SendMessage(&sqs.SendMessageInput{ |
|||
DelaySeconds: aws.Int64(10), |
|||
MessageAttributes: map[string]*sqs.MessageAttributeValue{ |
|||
"key": &sqs.MessageAttributeValue{ |
|||
DataType: aws.String("String"), |
|||
StringValue: aws.String(key), |
|||
}, |
|||
}, |
|||
MessageBody: aws.String(text), |
|||
QueueUrl: &k.queueUrl, |
|||
}) |
|||
|
|||
if err != nil { |
|||
return fmt.Errorf("send message to sqs %s: %v", k.queueUrl, err) |
|||
} |
|||
|
|||
return nil |
|||
} |
@ -0,0 +1,111 @@ |
|||
package sub |
|||
|
|||
import ( |
|||
"fmt" |
|||
|
|||
"github.com/chrislusf/seaweedfs/weed/glog" |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/chrislusf/seaweedfs/weed/util" |
|||
"github.com/golang/protobuf/proto" |
|||
"github.com/aws/aws-sdk-go/aws" |
|||
"github.com/aws/aws-sdk-go/aws/credentials" |
|||
"github.com/aws/aws-sdk-go/aws/session" |
|||
"github.com/aws/aws-sdk-go/service/sqs" |
|||
"github.com/aws/aws-sdk-go/aws/awserr" |
|||
) |
|||
|
|||
func init() { |
|||
NotificationInputs = append(NotificationInputs, &AwsSqsInput{}) |
|||
} |
|||
|
|||
type AwsSqsInput struct { |
|||
svc *sqs.SQS |
|||
queueUrl string |
|||
} |
|||
|
|||
func (k *AwsSqsInput) GetName() string { |
|||
return "aws_sqs" |
|||
} |
|||
|
|||
func (k *AwsSqsInput) Initialize(configuration util.Configuration) error { |
|||
glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString("region")) |
|||
glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) |
|||
return k.initialize( |
|||
configuration.GetString("aws_access_key_id"), |
|||
configuration.GetString("aws_secret_access_key"), |
|||
configuration.GetString("region"), |
|||
configuration.GetString("sqs_queue_name"), |
|||
) |
|||
} |
|||
|
|||
func (k *AwsSqsInput) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) { |
|||
|
|||
config := &aws.Config{ |
|||
Region: aws.String(region), |
|||
} |
|||
if awsAccessKeyId != "" && aswSecretAccessKey != "" { |
|||
config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "") |
|||
} |
|||
|
|||
sess, err := session.NewSession(config) |
|||
if err != nil { |
|||
return fmt.Errorf("create aws session: %v", err) |
|||
} |
|||
k.svc = sqs.New(sess) |
|||
|
|||
result, err := k.svc.GetQueueUrl(&sqs.GetQueueUrlInput{ |
|||
QueueName: aws.String(queueName), |
|||
}) |
|||
if err != nil { |
|||
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist { |
|||
return fmt.Errorf("unable to find queue %s", queueName) |
|||
} |
|||
return fmt.Errorf("get queue %s url: %v", queueName, err) |
|||
} |
|||
|
|||
k.queueUrl = *result.QueueUrl |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { |
|||
|
|||
// receive message
|
|||
result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{ |
|||
AttributeNames: []*string{ |
|||
aws.String(sqs.MessageSystemAttributeNameSentTimestamp), |
|||
}, |
|||
MessageAttributeNames: []*string{ |
|||
aws.String(sqs.QueueAttributeNameAll), |
|||
}, |
|||
QueueUrl: &k.queueUrl, |
|||
MaxNumberOfMessages: aws.Int64(1), |
|||
VisibilityTimeout: aws.Int64(20), // 20 seconds
|
|||
WaitTimeSeconds: aws.Int64(20), |
|||
}) |
|||
if err != nil { |
|||
err = fmt.Errorf("receive message from sqs %s: %v", k.queueUrl, err) |
|||
return |
|||
} |
|||
if len(result.Messages) == 0 { |
|||
return |
|||
} |
|||
|
|||
// process the message
|
|||
key = *result.Messages[0].Attributes["key"] |
|||
text := *result.Messages[0].Body |
|||
message = &filer_pb.EventNotification{} |
|||
err = proto.UnmarshalText(text, message) |
|||
|
|||
// delete the message
|
|||
_, err = k.svc.DeleteMessage(&sqs.DeleteMessageInput{ |
|||
QueueUrl: &k.queueUrl, |
|||
ReceiptHandle: result.Messages[0].ReceiptHandle, |
|||
}) |
|||
|
|||
if err != nil { |
|||
glog.V(1).Infof("delete message from sqs %s: %v", k.queueUrl, err) |
|||
} |
|||
|
|||
return |
|||
} |
@ -1,4 +1,4 @@ |
|||
package replication |
|||
package sub |
|||
|
|||
import ( |
|||
"encoding/json" |
@ -1,4 +1,4 @@ |
|||
package replication |
|||
package sub |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
Write
Preview
Loading…
Cancel
Save
Reference in new issue