diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index 7dd68466e..e8491fce7 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -1,9 +1,12 @@
 package s3api

 import (
+    "cmp"
     "encoding/hex"
     "encoding/xml"
     "fmt"
+    "github.com/seaweedfs/seaweedfs/weed/stats"
+    "golang.org/x/exp/slices"
     "math"
     "path/filepath"
     "sort"
@@ -11,18 +14,18 @@ import (
     "strings"
     "time"

-    "github.com/google/uuid"
-    "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
-    "golang.org/x/exp/slices"
-
     "github.com/aws/aws-sdk-go/aws"
     "github.com/aws/aws-sdk-go/service/s3"
+    "github.com/google/uuid"
+    "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"

     "github.com/seaweedfs/seaweedfs/weed/filer"
     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 )

+const multipartExt = ".part"
+
 type InitiateMultipartUploadResult struct {
     XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult"`
     s3.CreateMultipartUploadOutput
@@ -71,74 +74,97 @@ type CompleteMultipartUploadResult struct {

 func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {

     glog.V(2).Infof("completeMultipartUpload input %v", input)
-
-    completedParts := parts.Parts
-    slices.SortFunc(completedParts, func(a, b CompletedPart) int {
-        return a.PartNumber - b.PartNumber
-    })
-
+    completedPartNumbers := []int{}
+    completedPartMap := make(map[int][]string)
+    for _, part := range parts.Parts {
+        if _, ok := completedPartMap[part.PartNumber]; !ok {
+            completedPartNumbers = append(completedPartNumbers, part.PartNumber)
+        }
+        completedPartMap[part.PartNumber] = append(completedPartMap[part.PartNumber], part.ETag)
+    }
     uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId

     entries, _, err := s3a.list(uploadDirectory, "", "", false, maxPartsList)
     if err != nil || len(entries) == 0 {
         glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries))
+        stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
         return nil, s3err.ErrNoSuchUpload
     }

     pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId)
     if err != nil {
         glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
+        stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
         return nil, s3err.ErrNoSuchUpload
     }
+    deleteEntries := []*filer_pb.Entry{}
     partEntries := make(map[int][]*filer_pb.Entry, len(entries))
     for _, entry := range entries {
+        foundEntry := false
         glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name)
-        if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
-            var partNumberString string
-            index := strings.Index(entry.Name, "_")
-            if index != -1 {
-                partNumberString = entry.Name[:index]
+        if entry.IsDirectory || !strings.HasSuffix(entry.Name, multipartExt) {
+            continue
+        }
+        partNumber, err := parsePartNumber(entry.Name)
+        if err != nil {
+            stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNumber).Inc()
+            glog.Errorf("completeMultipartUpload failed to pasre partNumber %s:%s", entry.Name, err)
+            continue
+        }
+        completedPartsByNumber, ok := completedPartMap[partNumber]
+        if !ok {
+            continue
+        }
+        for _, partETag := range completedPartsByNumber {
+            partETag = strings.Trim(partETag, `"`)
+            entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
+            if partETag != "" && len(partETag) == 32 && entryETag != "" {
+                if entryETag != partETag {
+                    glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
+                    stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagMismatch).Inc()
+                    continue
+                }
             } else {
-                partNumberString = entry.Name[:len(entry.Name)-len(".part")]
+                glog.Warningf("invalid complete etag %s, partEtag %s", partETag, entryETag)
+                stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagInvalid).Inc()
             }
-            partNumber, err := strconv.Atoi(partNumberString)
-            if err != nil {
-                glog.Errorf("completeMultipartUpload failed to pasre partNumber %s:%s", partNumberString, err)
+            if len(entry.Chunks) == 0 {
+                glog.Warningf("completeMultipartUpload %s empty chunks", entry.Name)
+                stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEmpty).Inc()
                 continue
             }
             //there maybe multi same part, because of client retry
             partEntries[partNumber] = append(partEntries[partNumber], entry)
+            foundEntry = true
+        }
+        if !foundEntry {
+            deleteEntries = append(deleteEntries, entry)
         }
-
     }

     mime := pentry.Attributes.Mime

     var finalParts []*filer_pb.FileChunk
     var offset int64
-    var deleteEntries []*filer_pb.Entry
-
-    for _, part := range completedParts {
-        entries := partEntries[part.PartNumber]
-        // check whether completedParts is more than received parts
-        if len(entries) == 0 {
-            glog.Errorf("part %d has no entry", part.PartNumber)
+    sort.Ints(completedPartNumbers)
+    for _, partNumber := range completedPartNumbers {
+        partEntriesByNumber, ok := partEntries[partNumber]
+        if !ok {
+            glog.Errorf("part %d has no entry", partNumber)
+            stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNotFound).Inc()
             return nil, s3err.ErrInvalidPart
         }
-        found := false
-        for _, entry := range entries {
+        if len(partEntriesByNumber) > 1 {
+            slices.SortFunc(partEntriesByNumber, func(a, b *filer_pb.Entry) int {
+                return cmp.Compare(b.Chunks[0].ModifiedTsNs, a.Chunks[0].ModifiedTsNs)
+            })
+        }
+        for _, entry := range partEntriesByNumber {
             if found {
                 deleteEntries = append(deleteEntries, entry)
-                continue
-            }
-
-            partETag := strings.Trim(part.ETag, `"`)
-            entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
-            glog.Warningf("complete etag %s, partEtag %s", partETag, entryETag)
-            if partETag != "" && len(partETag) == 32 && entryETag != "" && entryETag != partETag {
-                err = fmt.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
-                deleteEntries = append(deleteEntries, entry)
+                stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEntryMismatch).Inc()
                 continue
             }
             for _, chunk := range entry.GetChunks() {
@@ -154,13 +180,7 @@ func (s3a *S3ApiServer) completeMultipartUploa
             offset += int64(chunk.Size)
         }
         found = true
-        err = nil
     }
-    if err != nil {
-        glog.Errorf("%s", err)
-        return nil, s3err.ErrInvalidPart
-    }
-
     }

     entryName := filepath.Base(*input.Key)
@@ -223,29 +243,15 @@ func (s3a *S3ApiServer) completeMultipartUploa
     return
 }

-func findByPartNumber(fileName string, parts []CompletedPart) (etag string, found bool) {
-    partNumber, formatErr := strconv.Atoi(fileName[:4])
-    if formatErr != nil {
-        return
-    }
-    x := sort.Search(len(parts), func(i int) bool {
-        return parts[i].PartNumber >= partNumber
-    })
-    if x >= len(parts) {
-        return
-    }
-    if parts[x].PartNumber != partNumber {
-        return
+func parsePartNumber(fileName string) (int, error) {
+    var partNumberString string
+    index := strings.Index(fileName, "_")
+    if index != -1 {
+        partNumberString = fileName[:index]
+    } else {
+        partNumberString = fileName[:len(fileName)-len(multipartExt)]
     }
-    y := 0
-    for i, part := range parts[x:] {
-        if part.PartNumber == partNumber {
-            y = i
-        } else {
-            break
-        }
-    }
-    return parts[x+y].ETag, true
+    return strconv.Atoi(partNumberString)
 }

 func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code s3err.ErrorCode) {
@@ -361,7 +367,7 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP
         StorageClass: aws.String("STANDARD"),
     }

-    entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, uint32(*input.MaxParts))
+    entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d%s", *input.PartNumberMarker, multipartExt), false, uint32(*input.MaxParts))
     if err != nil {
         glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err)
         return nil, s3err.ErrNoSuchUpload
@@ -373,15 +379,8 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP
     output.IsTruncated = aws.Bool(!isLast)

     for _, entry := range entries {
-        if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
-            var partNumberString string
-            index := strings.Index(entry.Name, "_")
-            if index != -1 {
-                partNumberString = entry.Name[:index]
-            } else {
-                partNumberString = entry.Name[:len(entry.Name)-len(".part")]
-            }
-            partNumber, err := strconv.Atoi(partNumberString)
+        if strings.HasSuffix(entry.Name, multipartExt) && !entry.IsDirectory {
+            partNumber, err := parsePartNumber(entry.Name)
             if err != nil {
                 glog.Errorf("listObjectParts %s %s parse %s: %v", *input.Bucket, *input.UploadId, entry.Name, err)
                 continue
diff --git a/weed/s3api/filer_multipart_test.go b/weed/s3api/filer_multipart_test.go
index e76d903b8..7f75a40de 100644
--- a/weed/s3api/filer_multipart_test.go
+++ b/weed/s3api/filer_multipart_test.go
@@ -50,88 +50,27 @@ func TestListPartsResult(t *testing.T) {

 }

-func Test_findByPartNumber(t *testing.T) {
-    type args struct {
-        fileName string
-        parts    []CompletedPart
-    }
-
-    parts := []CompletedPart{
-        {
-            ETag:       "xxx",
-            PartNumber: 1,
-        },
-        {
-            ETag:       "lll",
-            PartNumber: 1,
-        },
-        {
-            ETag:       "yyy",
-            PartNumber: 3,
-        },
-        {
-            ETag:       "zzz",
-            PartNumber: 5,
-        },
-    }
-
+func Test_parsePartNumber(t *testing.T) {
     tests := []struct {
-        name      string
-        args      args
-        wantEtag  string
-        wantFound bool
+        name     string
+        fileName string
+        partNum  int
     }{
         {
             "first",
-            args{
-                "0001.part",
-                parts,
-            },
-            "lll",
-            true,
+            "0001_uuid.part",
+            1,
         },
         {
             "second",
-            args{
-                "0002.part",
-                parts,
-            },
-            "",
-            false,
-        },
-        {
-            "third",
-            args{
-                "0003.part",
-                parts,
-            },
-            "yyy",
-            true,
-        },
-        {
-            "fourth",
-            args{
-                "0004.part",
-                parts,
-            },
-            "",
-            false,
-        },
-        {
-            "fifth",
-            args{
-                "0005.part",
-                parts,
-            },
-            "zzz",
-            true,
+            "0002.part",
+            2,
         },
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            gotEtag, gotFound := findByPartNumber(tt.args.fileName, tt.args.parts)
-            assert.Equalf(t, tt.wantEtag, gotEtag, "findByPartNumber(%v, %v)", tt.args.fileName, tt.args.parts)
-            assert.Equalf(t, tt.wantFound, gotFound, "findByPartNumber(%v, %v)", tt.args.fileName, tt.args.parts)
+            partNumber, _ := parsePartNumber(tt.fileName)
+            assert.Equalf(t, tt.partNum, partNumber, "parsePartNumber(%v)", tt.fileName)
         })
     }
 }
diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go
index f61f68e08..83391f047 100644
--- a/weed/stats/metrics.go
+++ b/weed/stats/metrics.go
@@ -241,7 +241,13 @@ var (
             Name:      "request_total",
             Help:      "Counter of s3 requests.",
         }, []string{"type", "code", "bucket"})
-
+    S3HandlerCounter = prometheus.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: Namespace,
+            Subsystem: "s3",
+            Name:      "handler_total",
+            Help:      "Counter of s3 server handlers.",
+        }, []string{"type"})
     S3RequestHistogram = prometheus.NewHistogramVec(
         prometheus.HistogramOpts{
             Namespace: Namespace,
@@ -292,6 +298,7 @@ func init() {
     Gather.MustRegister(VolumeServerResourceGauge)

     Gather.MustRegister(S3RequestCounter)
+    Gather.MustRegister(S3HandlerCounter)
     Gather.MustRegister(S3RequestHistogram)
     Gather.MustRegister(S3TimeToFirstByteHistogram)
 }
diff --git a/weed/stats/metrics_names.go b/weed/stats/metrics_names.go
index ffb0c76f1..cfc0fbeb0 100644
--- a/weed/stats/metrics_names.go
+++ b/weed/stats/metrics_names.go
@@ -43,4 +43,13 @@ const (
     ErrorChunkAssign = "chunkAssign.failed"
     ErrorReadCache   = "read.cache.failed"
     ErrorReadStream  = "read.stream.failed"
+
+    // s3 handler
+    ErrorCompletedNoSuchUpload      = "errorCompletedNoSuchUpload"
+    ErrorCompletedPartEmpty         = "ErrorCompletedPartEmpty"
+    ErrorCompletedPartNumber        = "ErrorCompletedPartNumber"
+    ErrorCompletedPartNotFound      = "errorCompletedPartNotFound"
+    ErrorCompletedEtagInvalid       = "errorCompletedEtagInvalid"
+    ErrorCompletedEtagMismatch      = "errorCompletedEtagMismatch"
+    ErrorCompletedPartEntryMismatch = "errorCompletedPartEntryMismatch"
 )
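
Note (not part of the patch): the diff centralizes the part-file naming convention in the new parsePartNumber helper, where the part number is everything before the first "_" if present, otherwise everything before the ".part" extension. Below is a minimal, standalone Go sketch of that logic, copied from the change above; the "_uuid" suffix in the first sample name is only an illustrative placeholder for whatever follows the underscore.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const multipartExt = ".part"

// parsePartNumber mirrors the helper added in the patch: take the prefix
// before the first "_" if one exists, otherwise strip the ".part" extension,
// and parse the remainder as an integer part number.
func parsePartNumber(fileName string) (int, error) {
	var partNumberString string
	if index := strings.Index(fileName, "_"); index != -1 {
		partNumberString = fileName[:index]
	} else {
		partNumberString = fileName[:len(fileName)-len(multipartExt)]
	}
	return strconv.Atoi(partNumberString)
}

func main() {
	// "0001_uuid.part" is a hypothetical name with a suffix; "0002.part" has none.
	for _, name := range []string{"0001_uuid.part", "0002.part"} {
		n, err := parsePartNumber(name)
		fmt.Printf("%s -> part %d (err: %v)\n", name, n, err)
	}
}
```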