|
|
|
@ -247,13 +247,72 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa |
|
|
|
return nil, s3err.ErrInternalError |
|
|
|
} |
|
|
|
|
|
|
|
output = &CompleteMultipartUploadResult{ |
|
|
|
CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{ |
|
|
|
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), |
|
|
|
Bucket: input.Bucket, |
|
|
|
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), |
|
|
|
Key: objectKey(input.Key), |
|
|
|
}, |
|
|
|
// Check if versioning is enabled for this bucket
|
|
|
|
versioningEnabled, vErr := s3a.isVersioningEnabled(*input.Bucket) |
|
|
|
if vErr == nil && versioningEnabled { |
|
|
|
// For versioned buckets, create a version and return the version ID
|
|
|
|
versionId := generateVersionId() |
|
|
|
versionFileName := s3a.getVersionFileName(versionId) |
|
|
|
versionDir := dirName + "/" + entryName + ".versions" |
|
|
|
|
|
|
|
// Move the completed object to the versions directory
|
|
|
|
err = s3a.mkFile(versionDir, versionFileName, finalParts, func(versionEntry *filer_pb.Entry) { |
|
|
|
if versionEntry.Extended == nil { |
|
|
|
versionEntry.Extended = make(map[string][]byte) |
|
|
|
} |
|
|
|
versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) |
|
|
|
versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId) |
|
|
|
for k, v := range pentry.Extended { |
|
|
|
if k != "key" { |
|
|
|
versionEntry.Extended[k] = v |
|
|
|
} |
|
|
|
} |
|
|
|
if pentry.Attributes.Mime != "" { |
|
|
|
versionEntry.Attributes.Mime = pentry.Attributes.Mime |
|
|
|
} else if mime != "" { |
|
|
|
versionEntry.Attributes.Mime = mime |
|
|
|
} |
|
|
|
versionEntry.Attributes.FileSize = uint64(offset) |
|
|
|
}) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
glog.Errorf("completeMultipartUpload: failed to create version %s: %v", versionId, err) |
|
|
|
return nil, s3err.ErrInternalError |
|
|
|
} |
|
|
|
|
|
|
|
// Create a delete marker for the main object (latest version)
|
|
|
|
err = s3a.mkFile(dirName, entryName, nil, func(mainEntry *filer_pb.Entry) { |
|
|
|
if mainEntry.Extended == nil { |
|
|
|
mainEntry.Extended = make(map[string][]byte) |
|
|
|
} |
|
|
|
mainEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) |
|
|
|
mainEntry.Extended[s3_constants.ExtDeleteMarkerKey] = []byte("false") // This is the latest version, not a delete marker
|
|
|
|
}) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
glog.Errorf("completeMultipartUpload: failed to update main entry: %v", err) |
|
|
|
return nil, s3err.ErrInternalError |
|
|
|
} |
|
|
|
|
|
|
|
output = &CompleteMultipartUploadResult{ |
|
|
|
CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{ |
|
|
|
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), |
|
|
|
Bucket: input.Bucket, |
|
|
|
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), |
|
|
|
Key: objectKey(input.Key), |
|
|
|
VersionId: aws.String(versionId), |
|
|
|
}, |
|
|
|
} |
|
|
|
} else { |
|
|
|
// For non-versioned buckets, return response without VersionId
|
|
|
|
output = &CompleteMultipartUploadResult{ |
|
|
|
CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{ |
|
|
|
Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), |
|
|
|
Bucket: input.Bucket, |
|
|
|
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), |
|
|
|
Key: objectKey(input.Key), |
|
|
|
}, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
for _, deleteEntry := range deleteEntries { |
|
|
|
|