|
@ -25,7 +25,10 @@ import ( |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
const multipartExt = ".part" |
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
|
multipartExt = ".part" |
|
|
|
|
|
multiPartMinSize = 5 * 1024 * 1024 |
|
|
|
|
|
) |
|
|
|
|
|
|
|
|
type InitiateMultipartUploadResult struct { |
|
|
type InitiateMultipartUploadResult struct { |
|
|
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult"` |
|
|
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult"` |
|
@ -75,6 +78,10 @@ type CompleteMultipartUploadResult struct { |
|
|
func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) { |
|
|
func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) { |
|
|
|
|
|
|
|
|
glog.V(2).Infof("completeMultipartUpload input %v", input) |
|
|
glog.V(2).Infof("completeMultipartUpload input %v", input) |
|
|
|
|
|
if len(parts.Parts) == 0 { |
|
|
|
|
|
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc() |
|
|
|
|
|
return nil, s3err.ErrNoSuchUpload |
|
|
|
|
|
} |
|
|
completedPartNumbers := []int{} |
|
|
completedPartNumbers := []int{} |
|
|
completedPartMap := make(map[int][]string) |
|
|
completedPartMap := make(map[int][]string) |
|
|
for _, part := range parts.Parts { |
|
|
for _, part := range parts.Parts { |
|
@ -83,8 +90,9 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa |
|
|
} |
|
|
} |
|
|
completedPartMap[part.PartNumber] = append(completedPartMap[part.PartNumber], part.ETag) |
|
|
completedPartMap[part.PartNumber] = append(completedPartMap[part.PartNumber], part.ETag) |
|
|
} |
|
|
} |
|
|
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId |
|
|
|
|
|
|
|
|
sort.Ints(completedPartNumbers) |
|
|
|
|
|
|
|
|
|
|
|
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId |
|
|
entries, _, err := s3a.list(uploadDirectory, "", "", false, maxPartsList) |
|
|
entries, _, err := s3a.list(uploadDirectory, "", "", false, maxPartsList) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries)) |
|
|
glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries)) |
|
@ -118,6 +126,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa |
|
|
} |
|
|
} |
|
|
deleteEntries := []*filer_pb.Entry{} |
|
|
deleteEntries := []*filer_pb.Entry{} |
|
|
partEntries := make(map[int][]*filer_pb.Entry, len(entries)) |
|
|
partEntries := make(map[int][]*filer_pb.Entry, len(entries)) |
|
|
|
|
|
entityTooSmall := false |
|
|
for _, entry := range entries { |
|
|
for _, entry := range entries { |
|
|
foundEntry := false |
|
|
foundEntry := false |
|
|
glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name) |
|
|
glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name) |
|
@ -156,16 +165,23 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa |
|
|
partEntries[partNumber] = append(partEntries[partNumber], entry) |
|
|
partEntries[partNumber] = append(partEntries[partNumber], entry) |
|
|
foundEntry = true |
|
|
foundEntry = true |
|
|
} |
|
|
} |
|
|
if !foundEntry { |
|
|
|
|
|
|
|
|
if foundEntry { |
|
|
|
|
|
if len(completedPartNumbers) > 1 && partNumber != completedPartNumbers[len(completedPartNumbers)-1] && |
|
|
|
|
|
entry.Attributes.FileSize < multiPartMinSize { |
|
|
|
|
|
glog.Warningf("completeMultipartUpload %s part file size less 5mb", entry.Name) |
|
|
|
|
|
entityTooSmall = true |
|
|
|
|
|
} |
|
|
|
|
|
} else { |
|
|
deleteEntries = append(deleteEntries, entry) |
|
|
deleteEntries = append(deleteEntries, entry) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if entityTooSmall { |
|
|
|
|
|
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompleteEntityTooSmall).Inc() |
|
|
|
|
|
return nil, s3err.ErrEntityTooSmall |
|
|
|
|
|
} |
|
|
mime := pentry.Attributes.Mime |
|
|
mime := pentry.Attributes.Mime |
|
|
|
|
|
|
|
|
var finalParts []*filer_pb.FileChunk |
|
|
var finalParts []*filer_pb.FileChunk |
|
|
var offset int64 |
|
|
var offset int64 |
|
|
sort.Ints(completedPartNumbers) |
|
|
|
|
|
for _, partNumber := range completedPartNumbers { |
|
|
for _, partNumber := range completedPartNumbers { |
|
|
partEntriesByNumber, ok := partEntries[partNumber] |
|
|
partEntriesByNumber, ok := partEntries[partNumber] |
|
|
if !ok { |
|
|
if !ok { |
|
|