From e8ef501f02217d29e35f4a52c86be3d93939ae6f Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Wed, 3 Oct 2018 23:36:52 -0700
Subject: [PATCH] add s3 replication sink

---
 weed/command/filer_replication.go             |  17 +-
 weed/command/scaffold.go                      |  14 +-
 weed/replication/replicator.go                |  29 ++-
 weed/replication/sink/filersink/filer_sink.go |  33 ++--
 weed/replication/sink/replication_sink.go     |  12 +-
 weed/replication/sink/s3sink/s3_sink.go       | 123 +++++++++++++
 weed/replication/sink/s3sink/s3_write.go      | 165 ++++++++++++++++++
 weed/replication/source/filer_source.go       |  22 ++-
 8 files changed, 369 insertions(+), 46 deletions(-)
 create mode 100644 weed/replication/sink/s3sink/s3_sink.go
 create mode 100644 weed/replication/sink/s3sink/s3_write.go

diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index b19597245..5e41cbb55 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -6,6 +6,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/server"
 	"github.com/spf13/viper"
 	"strings"
+	"github.com/chrislusf/seaweedfs/weed/replication/sink"
 )
 
 func init() {
@@ -57,7 +58,21 @@ func runFilerReplicate(cmd *Command, args []string) bool {
 		}
 	}
 
-	replicator := replication.NewReplicator(config.Sub("source.filer"), config.Sub("sink.filer"))
+	var dataSink sink.ReplicationSink
+	for _, sk := range sink.Sinks {
+		if config.GetBool("sink." + sk.GetName() + ".enabled") {
+			viperSub := config.Sub("sink." + sk.GetName())
+			if err := sk.Initialize(viperSub); err != nil {
+				glog.Fatalf("Failed to initialize sink for %s: %+v",
+					sk.GetName(), err)
+			}
+			glog.V(0).Infof("Configure sink to %s", sk.GetName())
+			dataSink = sk
+			break
+		}
+	}
+
+	replicator := replication.NewReplicator(config.Sub("source.filer"), dataSink)
 
 	for {
 		key, m, err := notificationInput.ReceiveMessage()
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 321b50424..e0bd48c8c 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -161,7 +161,7 @@ grpcAddress = "localhost:18888"
 directory = "/buckets"    # all files under this directory tree are replicated
 
 [notification.kafka]
-enabled = true
+enabled = false
 hosts = [
   "localhost:9092"
 ]
@@ -170,12 +170,22 @@ offsetFile = "./last.offset"
 offsetSaveIntervalSeconds = 10
 
 [sink.filer]
-enabled = true
+enabled = false
 grpcAddress = "localhost:18888"
 directory = "/backup"    # all replicated files are under this directory tree
 replication = ""
 collection = ""
 ttlSec = 0
 
+[sink.s3]
+# See https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
+# By default, credentials are loaded from the shared credentials file (~/.aws/credentials).
+enabled = false
+aws_access_key_id = ""        # if empty, loads from the shared credentials file (~/.aws/credentials)
+aws_secret_access_key = ""    # if empty, loads from the shared credentials file (~/.aws/credentials)
+region = "us-east-2"
+bucket = "your_bucket_name"    # an existing bucket
+directory = ""                 # destination directory (do not prefix or suffix with "/")
+
 `
 )
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index 834da6217..d5a57ecac 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -6,8 +6,7 @@ import (
 
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/replication/sink"
-	"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
-	"github.com/chrislusf/seaweedfs/weed/replication/source"
+	"github.com/chrislusf/seaweedfs/weed/replication/source"
 	"github.com/chrislusf/seaweedfs/weed/util"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 )
@@ -17,18 +16,15 @@ type Replicator struct {
 	source *source.FilerSource
 }
 
-func NewReplicator(sourceConfig, sinkConfig util.Configuration) *Replicator {
-
-	sink := &filersink.FilerSink{}
-	sink.Initialize(sinkConfig)
+func NewReplicator(sourceConfig util.Configuration, dataSink sink.ReplicationSink) *Replicator {
 
 	source := &source.FilerSource{}
 	source.Initialize(sourceConfig)
 
-	sink.SetSourceFiler(source)
+	dataSink.SetSourceFiler(source)
 
 	return &Replicator{
-		sink:   sink,
+		sink:   dataSink,
 		source: source,
 	}
 }
@@ -39,23 +35,20 @@ func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification)
 	}
 	key = filepath.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):])
 	if message.OldEntry != nil && message.NewEntry == nil {
-		return r.sink.DeleteEntry(key, message.OldEntry, message.DeleteChunks)
+		return r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks)
 	}
 	if message.OldEntry == nil && message.NewEntry != nil {
 		return r.sink.CreateEntry(key, message.NewEntry)
 	}
-	if existingEntry, err := r.sink.LookupEntry(key); err == nil {
-		if message.OldEntry == nil && message.NewEntry == nil {
-			glog.V(0).Infof("message %+v existingEntry: %+v", message, existingEntry)
-			return r.sink.DeleteEntry(key, existingEntry, true)
-		}
-		return r.sink.UpdateEntry(key, message.OldEntry, message.NewEntry, existingEntry, message.DeleteChunks)
-	}
-
-	glog.V(0).Infof("key:%s, message %+v", key, message)
 	if message.OldEntry == nil && message.NewEntry == nil {
+		glog.V(0).Infof("weird message %+v", message)
 		return nil
 	}
 
+	foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewEntry, message.DeleteChunks)
+	if foundExisting {
+		return err
+	}
+
+	return r.sink.CreateEntry(key, message.NewEntry)
 }
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index c98c99f34..d526da1c9 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -9,6 +9,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/replication/source"
 	"github.com/chrislusf/seaweedfs/weed/util"
+	"github.com/chrislusf/seaweedfs/weed/replication/sink"
 )
 
 type FilerSink struct {
@@ -21,6 +22,14 @@ type FilerSink struct {
 	dataCenter  string
 }
 
+func init() {
+	sink.Sinks = append(sink.Sinks, &FilerSink{})
+}
+
+func (fs *FilerSink) GetName() string {
+	return "filer"
+}
+
 func (fs *FilerSink) GetSinkToDirectory() string {
 	return fs.dir
 }
@@ -49,7 +58,7 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string,
 	return nil
 }
 
-func (fs *FilerSink) DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error {
+func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error {
 	return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
 		dir, name := filer2.FullPath(key).DirAndName()
@@ -57,7 +66,7 @@ func (fs *FilerSink) DeleteEntry(key string, entry *filer_pb.Entry, deleteInclud
 		request := &filer_pb.DeleteEntryRequest{
 			Directory:    dir,
 			Name:         name,
-			IsDirectory:  entry.IsDirectory,
+			IsDirectory:  isDirectory,
 			IsDeleteData: deleteIncludeChunks,
 		}
 
@@ -121,13 +130,14 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error {
 	})
 }
 
-func (fs *FilerSink) LookupEntry(key string) (entry *filer_pb.Entry, err error) {
+func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
 
 	ctx := context.Background()
 
 	dir, name := filer2.FullPath(key).DirAndName()
 
 	// read existing entry
+	var existingEntry *filer_pb.Entry
 	err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
 		request := &filer_pb.LookupDirectoryEntryRequest{
@@ -142,24 +152,15 @@ func (fs *FilerSink) LookupEntry(key string) (entry *filer_pb.Entry, err error)
 			return err
 		}
 
-		entry = resp.Entry
+		existingEntry = resp.Entry
 
 		return nil
 	})
 
 	if err != nil {
-		return nil, fmt.Errorf("lookup %s: %v", key, err)
+		return false, fmt.Errorf("lookup %s: %v", key, err)
 	}
 
-	return entry, nil
-}
-
-func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry, existingEntry *filer_pb.Entry, deleteIncludeChunks bool) (err error) {
-
-	ctx := context.Background()
-
-	dir, _ := filer2.FullPath(key).DirAndName()
-
 	glog.V(0).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
 
 	if filer2.ETag(newEntry.Chunks) == filer2.ETag(existingEntry.Chunks) {
@@ -179,13 +180,13 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry, existingEntry *
 		// replicate the chunks that are new in the source
 		replicatedChunks, err := fs.replicateChunks(newChunks)
 		if err != nil {
-			return fmt.Errorf("replicte %s chunks error: %v", key, err)
+			return true, fmt.Errorf("replicate %s chunks error: %v", key, err)
 		}
 		existingEntry.Chunks = append(existingEntry.Chunks, replicatedChunks...)
 	}
 
 	// save updated meta data
-	return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+	return true, fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
 		request := &filer_pb.UpdateEntryRequest{
 			Directory: dir,
diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go
index c33f3251b..0a86139d3 100644
--- a/weed/replication/sink/replication_sink.go
+++ b/weed/replication/sink/replication_sink.go
@@ -3,13 +3,19 @@ package sink
 import (
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/replication/source"
+	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
 type ReplicationSink interface {
-	DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error
+	GetName() string
+	Initialize(configuration util.Configuration) error
+	DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error
 	CreateEntry(key string, entry *filer_pb.Entry) error
-	UpdateEntry(key string, oldEntry, newEntry, existingEntry *filer_pb.Entry, deleteIncludeChunks bool) error
-	LookupEntry(key string) (entry *filer_pb.Entry, err error)
+	UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error)
 	GetSinkToDirectory() string
 	SetSourceFiler(s *source.FilerSource)
 }
+
+var (
+	Sinks []ReplicationSink
+)
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
new file mode 100644
index 000000000..c8300a108
--- /dev/null
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -0,0 +1,123 @@
+package S3Sink
+
+import (
+	"fmt"
+
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/replication/source"
+	"github.com/chrislusf/seaweedfs/weed/util"
+	"github.com/aws/aws-sdk-go/service/s3/s3iface"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"sync"
+	"github.com/chrislusf/seaweedfs/weed/filer2"
+	"github.com/chrislusf/seaweedfs/weed/replication/sink"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/credentials"
+)
+
+type S3Sink struct {
+	err         error
+	conn        s3iface.S3API
+	region      string
+	bucket      string
+	dir         string
+	filerSource *source.FilerSource
+}
+
+func init() {
+	sink.Sinks = append(sink.Sinks, &S3Sink{})
+}
+
+func (s3sink *S3Sink) GetName() string {
+	return "s3"
+}
+
+func (s3sink *S3Sink) GetSinkToDirectory() string {
+	return s3sink.dir
+}
+
+func (s3sink *S3Sink) Initialize(configuration util.Configuration) error {
+	return s3sink.initialize(
+		configuration.GetString("aws_access_key_id"),
+		configuration.GetString("aws_secret_access_key"),
+		configuration.GetString("region"),
+		configuration.GetString("bucket"),
+		configuration.GetString("directory"),
+	)
+}
+
+func (s3sink *S3Sink) SetSourceFiler(s *source.FilerSource) {
+	s3sink.filerSource = s
+}
+
+func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string) error {
+	s3sink.region = region
+	s3sink.bucket = bucket
+	s3sink.dir = dir
+
+	config := &aws.Config{
+		Region: aws.String(s3sink.region),
+	}
+	if awsAccessKeyId != "" && awsSecretAccessKey != "" {
+		config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
+	}
+
+	sess, err := session.NewSession(config)
+	if err != nil {
+		return fmt.Errorf("create aws session: %v", err)
+	}
+	s3sink.conn = s3.New(sess)
+
+	return nil
+}
+
+func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error {
+
+	if isDirectory {
+		key = key + "/"
+	}
+
+	return s3sink.deleteObject(key)
+
+}
+
+func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
+
+	uploadId, err := s3sink.createMultipartUpload(key, entry)
+	if err != nil {
+		return err
+	}
+
+	totalSize := filer2.TotalSize(entry.Chunks)
+	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
+
+	// upload parts concurrently; each goroutine writes only its own slot,
+	// so no locking is needed and parts stay ordered by part number
+	parts := make([]*s3.CompletedPart, len(chunkViews))
+	errs := make([]error, len(chunkViews))
+	var wg sync.WaitGroup
+	for chunkIndex, chunk := range chunkViews {
+		wg.Add(1)
+		go func(chunk *filer2.ChunkView, index int) {
+			defer wg.Done()
+			parts[index], errs[index] = s3sink.uploadPart(key, uploadId, index+1, chunk)
+		}(chunk, chunkIndex)
+	}
+	wg.Wait()
+
+	for _, uploadErr := range errs {
+		if uploadErr != nil {
+			s3sink.abortMultipartUpload(key, uploadId)
+			return uploadErr
+		}
+	}
+
+	return s3sink.completeMultipartUpload(key, uploadId, parts)
+
+}
+
+func (s3sink *S3Sink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
+	// TODO improve efficiency
+	return false, nil
+}
diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go
new file mode 100644
index 000000000..df73e34a7
--- /dev/null
+++ b/weed/replication/sink/s3sink/s3_write.go
@@ -0,0 +1,165 @@
+package S3Sink
+
+import (
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/aws/aws-sdk-go/aws/awserr"
+	"fmt"
+	"io"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/filer2"
+	"github.com/chrislusf/seaweedfs/weed/util"
+	"bytes"
+)
+
+func (s3sink *S3Sink) deleteObject(key string) error {
+	input := &s3.DeleteObjectInput{
+		Bucket: aws.String(s3sink.bucket),
+		Key:    aws.String(key),
+	}
+
+	result, err := s3sink.conn.DeleteObject(input)
+
+	if err == nil {
+		glog.V(0).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
+	} else {
+		glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
+	}
+
+	return err
+
+}
+
+func (s3sink *S3Sink) createMultipartUpload(key string, entry *filer_pb.Entry) (uploadId string, err error) {
+	input := &s3.CreateMultipartUploadInput{
+		Bucket:      aws.String(s3sink.bucket),
+		Key:         aws.String(key),
+		ContentType: aws.String(entry.Attributes.Mime),
+	}
+
+	result, err := s3sink.conn.CreateMultipartUpload(input)
+
+	if err == nil {
+		glog.V(0).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
+	} else {
+		glog.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err)
+		return "", err
+	}
+
+	return *result.UploadId, err
+}
+
+func (s3sink *S3Sink) abortMultipartUpload(key, uploadId string) error {
+	input := &s3.AbortMultipartUploadInput{
+		Bucket:   aws.String(s3sink.bucket),
+		Key:      aws.String(key),
+		UploadId: aws.String(uploadId),
+	}
+
+	result, err := s3sink.conn.AbortMultipartUpload(input)
+	if err != nil {
+		if aerr, ok := err.(awserr.Error); ok {
+			switch aerr.Code() {
+			case s3.ErrCodeNoSuchUpload:
+				glog.Errorf("[%s] abortMultipartUpload %s: %v %v", s3sink.bucket, key, s3.ErrCodeNoSuchUpload, aerr.Error())
+			default:
+				glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
+			}
+		} else {
+			// the error does not implement awserr.Error; log the raw error
+			glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, err)
+		}
+		return err
+	}
+
+	glog.V(0).Infof("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, result)
+
+	return nil
+}
+
+// completeMultipartUpload tells S3 to assemble the uploaded parts into the final object
+func (s3sink *S3Sink) completeMultipartUpload(key, uploadId string, parts []*s3.CompletedPart) error {
+	input := &s3.CompleteMultipartUploadInput{
+		Bucket:   aws.String(s3sink.bucket),
+		Key:      aws.String(key),
+		UploadId: aws.String(uploadId),
+		MultipartUpload: &s3.CompletedMultipartUpload{
+			Parts: parts,
+		},
+	}
+
+	result, err := s3sink.conn.CompleteMultipartUpload(input)
+	if err == nil {
+		glog.V(0).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
+	} else {
+		glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
+	}
+
+	return err
+}
+
+// uploadPart uploads one chunk of the source file as one part
+func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer2.ChunkView) (*s3.CompletedPart, error) {
+
+	readSeeker, err := s3sink.buildReadSeeker(chunk)
+	if err != nil {
+		glog.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
+		return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
+	}
+
+	input := &s3.UploadPartInput{
+		Body:       readSeeker,
+		Bucket:     aws.String(s3sink.bucket),
+		Key:        aws.String(key),
+		PartNumber: aws.Int64(int64(partId)),
+		UploadId:   aws.String(uploadId),
+	}
+
+	result, err := s3sink.conn.UploadPart(input)
+	if err != nil {
+		glog.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err)
+		// return before dereferencing result, which is nil on error
+		return nil, err
+	}
+	glog.V(0).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
+
+	part := &s3.CompletedPart{
+		ETag:       result.ETag,
+		PartNumber: aws.Int64(int64(partId)),
+	}
+
+	return part, nil
+}
+
+// uploadPartCopy copies a byte range from an existing object as one part
+func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySource string, sourceStart, sourceStop int) error {
+	input := &s3.UploadPartCopyInput{
+		Bucket:          aws.String(s3sink.bucket),
+		CopySource:      aws.String(fmt.Sprintf("/%s/%s", s3sink.bucket, copySource)),
+		CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", sourceStart, sourceStop)),
+		Key:             aws.String(key),
+		PartNumber:      aws.Int64(partId),
+		UploadId:        aws.String(uploadId),
+	}
+
+	result, err := s3sink.conn.UploadPartCopy(input)
+	if err == nil {
+		glog.V(0).Infof("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, result)
+	} else {
+		glog.Errorf("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, err)
+	}
+
+	return err
+}
+
+func (s3sink *S3Sink) buildReadSeeker(chunk *filer2.ChunkView) (io.ReadSeeker, error) {
+	fileUrl, err := s3sink.filerSource.LookupFileId(chunk.FileId)
+	if err != nil {
+		return nil, err
+	}
+	buf := make([]byte, chunk.Size)
+	// note: read errors from ReadUrl are ignored here; a failed read
+	// leaves zeroed bytes in buf and would corrupt the uploaded part
+	util.ReadUrl(fileUrl, chunk.Offset, int(chunk.Size), buf)
+	return bytes.NewReader(buf), nil
+}
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 69e74a63c..efe71e706 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -34,7 +34,7 @@ func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) {
 	return nil
 }
 
-func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) {
+func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) {
 	vid2Locations := make(map[string]*filer_pb.Locations)
 
@@ -56,18 +56,28 @@ func (fs *FilerSource) ReadPart(part string) (filename string, header http.Heade
 	})
 
 	if err != nil {
-		glog.V(1).Infof("replication lookup volume id %s: %v", vid, err)
-		return "", nil, nil, fmt.Errorf("replication lookup volume id %s: %v", vid, err)
+		glog.V(1).Infof("LookupFileId volume id %s: %v", vid, err)
+		return "", fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
 	}
 
 	locations := vid2Locations[vid]
 
 	if locations == nil || len(locations.Locations) == 0 {
-		glog.V(1).Infof("replication locate volume id %s: %v", vid, err)
-		return "", nil, nil, fmt.Errorf("replication locate volume id %s: %v", vid, err)
+		glog.V(1).Infof("LookupFileId locate volume id %s: %v", vid, err)
+		return "", fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
 	}
 
-	fileUrl := fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part)
+	fileUrl = fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part)
+
+	return
+}
+
+func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, readCloser io.ReadCloser, err error) {
+
+	fileUrl, err := fs.LookupFileId(part)
+	if err != nil {
+		return "", nil, nil, err
+	}
 
 	filename, header, readCloser, err = util.DownloadFile(fileUrl)
 
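
For reference: with the ReplicationSink interface and the sink.Sinks registry
introduced above, another sink plugs in without touching runFilerReplicate.
Below is a minimal sketch of a hypothetical no-op sink, assuming only the
interface as defined in this patch; the package and type names are invented
for illustration and are not part of the change.

	package noopsink

	import (
		"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
		"github.com/chrislusf/seaweedfs/weed/replication/sink"
		"github.com/chrislusf/seaweedfs/weed/replication/source"
		"github.com/chrislusf/seaweedfs/weed/util"
	)

	// NoopSink discards every replication event; useful as a template.
	type NoopSink struct{}

	func init() {
		// self-register so runFilerReplicate can discover this sink
		// through the "sink.noop.enabled" config key
		sink.Sinks = append(sink.Sinks, &NoopSink{})
	}

	func (ns *NoopSink) GetName() string                            { return "noop" }
	func (ns *NoopSink) GetSinkToDirectory() string                 { return "/" }
	func (ns *NoopSink) Initialize(config util.Configuration) error { return nil }
	func (ns *NoopSink) SetSourceFiler(s *source.FilerSource)       {}

	func (ns *NoopSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool) error {
		return nil
	}

	func (ns *NoopSink) CreateEntry(key string, entry *filer_pb.Entry) error {
		return nil
	}

	func (ns *NoopSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
		// returning false tells the replicator to fall back to CreateEntry
		return false, nil
	}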