From c839ce1b193b6ab079afff88c3b52666bc879340 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 26 Aug 2022 23:47:12 -0700 Subject: [PATCH] s3 sink use s3 upload manager fix https://github.com/seaweedfs/seaweedfs/issues/3531 --- weed/replication/sink/s3sink/s3_sink.go | 76 +++++++++++-------------- 1 file changed, 32 insertions(+), 44 deletions(-) diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index 9c4a7498b..6b3a9c1ef 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -1,17 +1,14 @@ package S3Sink import ( - "bytes" - "context" "fmt" - "strings" - "sync" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "strings" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -109,57 +106,48 @@ func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks b } -func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { +func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) (err error) { key = cleanKey(key) if entry.IsDirectory { return nil } - uploadId, err := s3sink.createMultipartUpload(key, entry) - if err != nil { - return fmt.Errorf("createMultipartUpload: %v", err) + reader := filer.NewFileReader(s3sink.filerSource, entry) + + fileSize := int64(filer.FileSize(entry)) + + partSize := int64(8 * 1024 * 1024) // The minimum/default allowed part size is 5MB + for partSize*1000 < fileSize { + partSize *= 4 } - totalSize := filer.FileSize(entry) - - chunkViews := filer.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) - - parts := make([]*s3.CompletedPart, len(chunkViews)) - - if len(parts) > 0 { - var wg sync.WaitGroup - for chunkIndex, chunk := range chunkViews { - partId := chunkIndex + 1 - wg.Add(1) - go func(chunk *filer.ChunkView, index int) { - defer wg.Done() - if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil { - err = uploadErr - glog.Errorf("uploadPart: %v", uploadErr) - } else { - parts[index] = part - } - }(chunk, chunkIndex) - } - wg.Wait() - } else if len(entry.Content) > 0 { - // for small files - if part, uploadErr := s3sink.doUploadPart(key, uploadId, 1, bytes.NewReader(entry.Content)); uploadErr != nil { - err = uploadErr - glog.Errorf("uploadPart: %v", uploadErr) - } else { - parts = make([]*s3.CompletedPart, 1) - parts[0] = part + // Create an uploader with the session and custom options + uploader := s3manager.NewUploaderWithClient(s3sink.conn, func(u *s3manager.Uploader) { + u.PartSize = partSize + u.Concurrency = 8 + }) + + // process tagging + tags := "" + if true { + for k, v := range entry.Extended { + if len(tags) > 0 { + tags = tags + "&" + } + tags = tags + k + "=" + string(v) } } - if err != nil { - s3sink.abortMultipartUpload(key, uploadId) - return fmt.Errorf("uploadPart: %v", err) - } + // Upload the file to S3. + _, err = uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(s3sink.bucket), + Key: aws.String(key), + Body: reader, + Tagging: aws.String(tags), + }) - return s3sink.completeMultipartUpload(context.Background(), key, uploadId, parts) + return }