diff --git a/test/s3/multipart/aws_upload.go b/test/s3/multipart/aws_upload.go
new file mode 100644
index 000000000..8c15cf6ed
--- /dev/null
+++ b/test/s3/multipart/aws_upload.go
@@ -0,0 +1,175 @@
+package main
+
+// copied from https://github.com/apoorvam/aws-s3-multipart-upload
+
+import (
+ "bytes"
+ "flag"
+ "fmt"
+ "net/http"
+ "os"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/awserr"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/s3"
+)
+
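+ // placeholder credentials and bucket settings; the endpoint is overridden below to point at a local SeaweedFS S3 gateway on localhost:8333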
+const (
+ maxPartSize = int64(5 * 1024 * 1024)
+ maxRetries = 3
+ awsAccessKeyID = "Your access key"
+ awsSecretAccessKey = "Your secret key"
+ awsBucketRegion = "S3 bucket region"
+ awsBucketName = "newBucket"
+)
+
+var (
+ filename = flag.String("f", "", "the file name")
+)
+
+func main() {
+ flag.Parse()
+
+ creds := credentials.NewStaticCredentials(awsAccessKeyID, awsSecretAccessKey, "")
+ _, err := creds.Get()
+ if err != nil {
+ fmt.Printf("bad credentials: %s", err)
+ return
+ }
+ cfg := aws.NewConfig().WithRegion(awsBucketRegion).WithCredentials(creds).WithDisableSSL(true).WithEndpoint("localhost:8333")
+ svc := s3.New(session.New(), cfg)
+
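+ // read the whole file into memory so it can be sliced into parts; fine for a small test file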
+ file, err := os.Open(*filename)
+ if err != nil {
+ fmt.Printf("err opening file: %s", err)
+ return
+ }
+ defer file.Close()
+ fileInfo, _ := file.Stat()
+ size := fileInfo.Size()
+ buffer := make([]byte, size)
+ file.Read(buffer)
+ // detect the content type only after the file has been read; sniffing the empty buffer would always yield application/octet-stream
+ fileType := http.DetectContentType(buffer)
+
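+ // step 1: initiate the multipart upload; the returned UploadId identifies it in the later calls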
+ path := "/media/" + file.Name()
+ input := &s3.CreateMultipartUploadInput{
+ Bucket: aws.String(awsBucketName),
+ Key: aws.String(path),
+ ContentType: aws.String(fileType),
+ }
+
+ resp, err := svc.CreateMultipartUpload(input)
+ if err != nil {
+ fmt.Println(err.Error())
+ return
+ }
+ fmt.Println("Created multipart upload request")
+
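+ // step 2: upload the buffer in chunks of at most maxPartSize bytes, collecting the ETag of each part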
+ var curr, partLength int64
+ var remaining = size
+ var completedParts []*s3.CompletedPart
+ partNumber := 1
+ for curr = 0; remaining != 0; curr += partLength {
+ if remaining < maxPartSize {
+ partLength = remaining
+ } else {
+ partLength = maxPartSize
+ }
+ completedPart, err := uploadPart(svc, resp, buffer[curr:curr+partLength], partNumber)
+ if err != nil {
+ fmt.Println(err.Error())
+ err := abortMultipartUpload(svc, resp)
+ if err != nil {
+ fmt.Println(err.Error())
+ }
+ return
+ }
+ remaining -= partLength
+ partNumber++
+ completedParts = append(completedParts, completedPart)
+ }
+
+ // step 3: list the parts back from the server to verify what was uploaded
+ parts, err := svc.ListParts(&s3.ListPartsInput{
+ Bucket: input.Bucket,
+ Key: input.Key,
+ MaxParts: nil,
+ PartNumberMarker: nil,
+ RequestPayer: nil,
+ UploadId: resp.UploadId,
+ })
+ if err != nil {
+ fmt.Println(err.Error())
+ return
+ }
+ fmt.Printf("list parts: %d\n", len(parts.Parts))
+ for i, part := range parts.Parts {
+ fmt.Printf("part %d: %v\n", i, part)
+ }
+
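+ // step 4: complete the multipart upload so the server assembles the parts into the final object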
+ completeResponse, err := completeMultipartUpload(svc, resp, completedParts)
+ if err != nil {
+ fmt.Println(err.Error())
+ return
+ }
+
+ fmt.Printf("Successfully uploaded file: %s\n", completeResponse.String())
+}
+
+func completeMultipartUpload(svc *s3.S3, resp *s3.CreateMultipartUploadOutput, completedParts []*s3.CompletedPart) (*s3.CompleteMultipartUploadOutput, error) {
+ completeInput := &s3.CompleteMultipartUploadInput{
+ Bucket: resp.Bucket,
+ Key: resp.Key,
+ UploadId: resp.UploadId,
+ MultipartUpload: &s3.CompletedMultipartUpload{
+ Parts: completedParts,
+ },
+ }
+ return svc.CompleteMultipartUpload(completeInput)
+}
+
+func uploadPart(svc *s3.S3, resp *s3.CreateMultipartUploadOutput, fileBytes []byte, partNumber int) (*s3.CompletedPart, error) {
+ tryNum := 1
+ partInput := &s3.UploadPartInput{
+ Body: bytes.NewReader(fileBytes),
+ Bucket: resp.Bucket,
+ Key: resp.Key,
+ PartNumber: aws.Int64(int64(partNumber)),
+ UploadId: resp.UploadId,
+ ContentLength: aws.Int64(int64(len(fileBytes))),
+ }
+
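+ // retry each part upload up to maxRetries times before failing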
+ for tryNum <= maxRetries {
+ uploadResult, err := svc.UploadPart(partInput)
+ if err != nil {
+ if tryNum == maxRetries {
+ if aerr, ok := err.(awserr.Error); ok {
+ return nil, aerr
+ }
+ return nil, err
+ }
+ fmt.Printf("Retrying to upload part #%v\n", partNumber)
+ tryNum++
+ } else {
+ fmt.Printf("Uploaded part #%v\n", partNumber)
+ return &s3.CompletedPart{
+ ETag: uploadResult.ETag,
+ PartNumber: aws.Int64(int64(partNumber)),
+ }, nil
+ }
+ }
+ return nil, nil
+}
+
+func abortMultipartUpload(svc *s3.S3, resp *s3.CreateMultipartUploadOutput) error {
+ fmt.Println("Aborting multipart upload for UploadId#" + *resp.UploadId)
+ abortInput := &s3.AbortMultipartUploadInput{
+ Bucket: resp.Bucket,
+ Key: resp.Key,
+ UploadId: resp.UploadId,
+ }
+ _, err := svc.AbortMultipartUpload(abortInput)
+ return err
+}
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index 4eb9bf32c..c7385cb0b 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -56,7 +56,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
- entries, err := s3a.list(uploadDirectory, "", "", false, 0)
+ entries, _, err := s3a.list(uploadDirectory, "", "", false, 0)
if err != nil || len(entries) == 0 {
glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries))
return nil, ErrNoSuchUpload
@@ -156,7 +156,7 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput
},
}
- entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, uint32(*input.MaxUploads))
+ entries, _, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, uint32(*input.MaxUploads))
if err != nil {
glog.Errorf("listMultipartUploads %s error: %v", *input.Bucket, err)
return
@@ -177,26 +177,37 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput
type ListPartsResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListPartsResult"`
- s3.ListPartsOutput
+
+ // copied from s3.ListPartsOutput; when that struct is embedded, its Parts field does not marshal into <Part></Part> elements
+ Bucket *string `type:"string"`
+ IsTruncated *bool `type:"boolean"`
+ Key *string `min:"1" type:"string"`
+ MaxParts *int64 `type:"integer"`
+ NextPartNumberMarker *int64 `type:"integer"`
+ PartNumberMarker *int64 `type:"integer"`
+ Part []*s3.Part `locationName:"Part" type:"list" flattened:"true"`
+ StorageClass *string `type:"string" enum:"StorageClass"`
+ UploadId *string `type:"string"`
}
func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListPartsResult, code ErrorCode) {
output = &ListPartsResult{
- ListPartsOutput: s3.ListPartsOutput{
- Bucket: input.Bucket,
- Key: objectKey(input.Key),
- UploadId: input.UploadId,
- MaxParts: input.MaxParts, // the maximum number of parts to return.
- PartNumberMarker: input.PartNumberMarker, // the part number starts after this, exclusive
- },
+ Bucket: input.Bucket,
+ Key: objectKey(input.Key),
+ UploadId: input.UploadId,
+ MaxParts: input.MaxParts, // the maximum number of parts to return.
+ PartNumberMarker: input.PartNumberMarker, // the part number starts after this, exclusive
+ StorageClass: aws.String("STANDARD"),
}
- entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, uint32(*input.MaxParts))
+ entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, uint32(*input.MaxParts))
if err != nil {
glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err)
return nil, ErrNoSuchUpload
}
+ output.IsTruncated = aws.Bool(!isLast)
+
for _, entry := range entries {
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
partNumberString := entry.Name[:len(entry.Name)-len(".part")]
@@ -205,12 +216,15 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP
glog.Errorf("listObjectParts %s %s parse %s: %v", *input.Bucket, *input.UploadId, entry.Name, err)
continue
}
- output.Parts = append(output.Parts, &s3.Part{
+ output.Part = append(output.Part, &s3.Part{
PartNumber: aws.Int64(int64(partNumber)),
LastModified: aws.Time(time.Unix(entry.Attributes.Mtime, 0).UTC()),
Size: aws.Int64(int64(filer.FileSize(entry))),
ETag: aws.String("\"" + filer.ETag(entry) + "\""),
})
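+ // record the last part number emitted so a truncated listing can be resumed after it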
+ if !isLast {
+ output.NextPartNumberMarker = aws.Int64(int64(partNumber))
+ }
}
}
diff --git a/weed/s3api/filer_multipart_test.go b/weed/s3api/filer_multipart_test.go
index 835665dd6..f2568b6bc 100644
--- a/weed/s3api/filer_multipart_test.go
+++ b/weed/s3api/filer_multipart_test.go
@@ -4,6 +4,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"testing"
+ "time"
)
func TestInitiateMultipartUploadResult(t *testing.T) {
@@ -24,3 +25,25 @@ func TestInitiateMultipartUploadResult(t *testing.T) {
}
}
+
+func TestListPartsResult(t *testing.T) {
+
+ expected := `<?xml version="1.0" encoding="UTF-8"?>
+<ListPartsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Part><ETag>&#34;12345678&#34;</ETag><LastModified>1970-01-01T00:00:00Z</LastModified><PartNumber>1</PartNumber><Size>123</Size></Part></ListPartsResult>`
+ response := &ListPartsResult{
+ Part: []*s3.Part{
+ {
+ PartNumber: aws.Int64(int64(1)),
+ LastModified: aws.Time(time.Unix(0, 0).UTC()),
+ Size: aws.Int64(int64(123)),
+ ETag: aws.String("\"12345678\""),
+ },
+ },
+ }
+
+ encoded := string(encodeResponse(response))
+ if encoded != expected {
+ t.Errorf("unexpected output: %s\nexpecting:%s", encoded, expected)
+ }
+
+}
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go
index 7f49c320e..ebdbe8245 100644
--- a/weed/s3api/filer_util.go
+++ b/weed/s3api/filer_util.go
@@ -21,10 +21,13 @@ func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chun
}
-func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit uint32) (entries []*filer_pb.Entry, err error) {
+func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit uint32) (entries []*filer_pb.Entry, isLast bool, err error) {
- err = filer_pb.List(s3a, parentDirectoryPath, prefix, func(entry *filer_pb.Entry, isLast bool) error {
+ err = filer_pb.List(s3a, parentDirectoryPath, prefix, func(entry *filer_pb.Entry, isLastEntry bool) error {
entries = append(entries, entry)
+ if isLastEntry {
+ isLast = true
+ }
return nil
}, startFrom, inclusive, limit)
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index 816db04f9..a014242c0 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -25,7 +25,7 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques
var response ListAllMyBucketsResult
- entries, err := s3a.list(s3a.option.BucketsPath, "", "", false, math.MaxInt32)
+ entries, _, err := s3a.list(s3a.option.BucketsPath, "", "", false, math.MaxInt32)
if err != nil {
writeErrorResponse(w, ErrInternalError, r.URL)