@@ -1,17 +1,14 @@
 package S3Sink
 
 import (
-	"bytes"
-	"context"
 	"fmt"
 	"strings"
-	"sync"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/credentials"
 	"github.com/aws/aws-sdk-go/aws/session"
-	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/aws/aws-sdk-go/service/s3/s3iface"
+	"github.com/aws/aws-sdk-go/service/s3/s3manager"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -109,57 +106,48 @@ func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks b
 }
 
-func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
+func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) (err error) {
 	key = cleanKey(key)
 
 	if entry.IsDirectory {
 		return nil
 	}
 
-	uploadId, err := s3sink.createMultipartUpload(key, entry)
-	if err != nil {
-		return fmt.Errorf("createMultipartUpload: %v", err)
-	}
-
-	totalSize := filer.FileSize(entry)
-
-	chunkViews := filer.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
-
-	parts := make([]*s3.CompletedPart, len(chunkViews))
-
-	if len(parts) > 0 {
-		var wg sync.WaitGroup
-		for chunkIndex, chunk := range chunkViews {
-			partId := chunkIndex + 1
-			wg.Add(1)
-			go func(chunk *filer.ChunkView, index int) {
-				defer wg.Done()
-				if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil {
-					err = uploadErr
-					glog.Errorf("uploadPart: %v", uploadErr)
-				} else {
-					parts[index] = part
-				}
-			}(chunk, chunkIndex)
-		}
-		wg.Wait()
-	} else if len(entry.Content) > 0 {
-		// for small files
-		if part, uploadErr := s3sink.doUploadPart(key, uploadId, 1, bytes.NewReader(entry.Content)); uploadErr != nil {
-			err = uploadErr
-			glog.Errorf("uploadPart: %v", uploadErr)
-		} else {
-			parts = make([]*s3.CompletedPart, 1)
-			parts[0] = part
-		}
-	}
-
-	if err != nil {
-		s3sink.abortMultipartUpload(key, uploadId)
-		return fmt.Errorf("uploadPart: %v", err)
-	}
-
-	return s3sink.completeMultipartUpload(context.Background(), key, uploadId, parts)
+	reader := filer.NewFileReader(s3sink.filerSource, entry)
+
+	fileSize := int64(filer.FileSize(entry))
+
+	partSize := int64(8 * 1024 * 1024) // The minimum/default allowed part size is 5MB
+	for partSize*1000 < fileSize {
+		partSize *= 4
+	}
+
+	// Create an uploader with the session and custom options
+	uploader := s3manager.NewUploaderWithClient(s3sink.conn, func(u *s3manager.Uploader) {
+		u.PartSize = partSize
+		u.Concurrency = 8
+	})
+
+	// process tagging
+	tags := ""
+	if true {
+		for k, v := range entry.Extended {
+			if len(tags) > 0 {
+				tags = tags + "&"
+			}
+			tags = tags + k + "=" + string(v)
+		}
+	}
+
+	// Upload the file to S3.
+	_, err = uploader.Upload(&s3manager.UploadInput{
+		Bucket:  aws.String(s3sink.bucket),
+		Key:     aws.String(key),
+		Body:    reader,
+		Tagging: aws.String(tags),
+	})
+
+	return
 }
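
Reviewer note: the replacement hands the whole object to the SDK's s3manager.Uploader instead of hand-rolling multipart calls, and grows the part size until roughly 1000 parts cover the file, staying well under S3's 10,000-part cap. A minimal standalone sketch of that sizing loop; the uploadPartSize helper name is hypothetical, introduced here only for illustration:

package main

import "fmt"

// uploadPartSize mirrors the loop in the new CreateEntry: start at the 8MB
// floor (comfortably above S3's 5MB minimum part size) and quadruple until
// 1000 parts of this size can cover the whole file.
func uploadPartSize(fileSize int64) int64 {
	partSize := int64(8 * 1024 * 1024)
	for partSize*1000 < fileSize {
		partSize *= 4
	}
	return partSize
}

func main() {
	for _, size := range []int64{
		100 << 20, // 100MB -> 8MB parts
		10 << 30,  // 10GB  -> 32MB parts
		100 << 30, // 100GB -> 128MB parts
	} {
		fmt.Printf("file %d bytes -> part size %d bytes\n", size, uploadPartSize(size))
	}
}

Quadrupling keeps the loop to a handful of iterations even for multi-terabyte files, and u.Concurrency = 8 bounds how many parts are in flight at once.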
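A second note, on the tagging block: the Tagging field expects a URL query encoded string of the form k1=v1&k2=v2, and the new code joins entry.Extended keys and values raw. Whether those bytes are always query-safe is an assumption about what the filer stores; if they are not, a url.Values based variant like the hypothetical buildTags below (not part of this change) would escape them:

package main

import (
	"fmt"
	"net/url"
)

// buildTags renders metadata as the "k1=v1&k2=v2" string that S3 tagging
// expects. url.Values.Encode query-escapes keys and values and sorts the
// pairs by key, so the output is deterministic.
func buildTags(extended map[string][]byte) string {
	values := url.Values{}
	for k, v := range extended {
		values.Set(k, string(v))
	}
	return values.Encode()
}

func main() {
	fmt.Println(buildTags(map[string][]byte{
		"owner":  []byte("team a"), // the space gets escaped as "+"
		"source": []byte("filer"),
	}))
	// Prints: owner=team+a&source=filer
}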