Browse Source

[s3] fix s3 test_multipart_resend_first_finishes_last (#5471)

* try fix s3 test
https://github.com/seaweedfs/seaweedfs/pull/5466

* add error handler metrics

* refactor

* refactor multipartExt

* delete bad entry parts
pull/5474/head
Konstantin Lebedev 9 months ago
committed by GitHub
parent
commit
d42a04cceb
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 149
      weed/s3api/filer_multipart.go
  2. 81
      weed/s3api/filer_multipart_test.go
  3. 9
      weed/stats/metrics.go
  4. 9
      weed/stats/metrics_names.go

149
weed/s3api/filer_multipart.go

@ -1,9 +1,12 @@
package s3api package s3api
import ( import (
"cmp"
"encoding/hex" "encoding/hex"
"encoding/xml" "encoding/xml"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/stats"
"golang.org/x/exp/slices"
"math" "math"
"path/filepath" "path/filepath"
"sort" "sort"
@ -11,18 +14,18 @@ import (
"strings" "strings"
"time" "time"
"github.com/google/uuid"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"golang.org/x/exp/slices"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/google/uuid"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
) )
const multipartExt = ".part"
type InitiateMultipartUploadResult struct { type InitiateMultipartUploadResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult"` XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult"`
s3.CreateMultipartUploadOutput s3.CreateMultipartUploadOutput
@ -71,74 +74,97 @@ type CompleteMultipartUploadResult struct {
func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) { func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
glog.V(2).Infof("completeMultipartUpload input %v", input) glog.V(2).Infof("completeMultipartUpload input %v", input)
completedParts := parts.Parts
slices.SortFunc(completedParts, func(a, b CompletedPart) int {
return a.PartNumber - b.PartNumber
})
completedPartNumbers := []int{}
completedPartMap := make(map[int][]string)
for _, part := range parts.Parts {
if _, ok := completedPartMap[part.PartNumber]; !ok {
completedPartNumbers = append(completedPartNumbers, part.PartNumber)
}
completedPartMap[part.PartNumber] = append(completedPartMap[part.PartNumber], part.ETag)
}
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
entries, _, err := s3a.list(uploadDirectory, "", "", false, maxPartsList) entries, _, err := s3a.list(uploadDirectory, "", "", false, maxPartsList)
if err != nil || len(entries) == 0 { if err != nil || len(entries) == 0 {
glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries)) glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries))
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload return nil, s3err.ErrNoSuchUpload
} }
pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId) pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId)
if err != nil { if err != nil {
glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err) glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload return nil, s3err.ErrNoSuchUpload
} }
deleteEntries := []*filer_pb.Entry{}
partEntries := make(map[int][]*filer_pb.Entry, len(entries)) partEntries := make(map[int][]*filer_pb.Entry, len(entries))
for _, entry := range entries { for _, entry := range entries {
foundEntry := false
glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name) glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name)
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
var partNumberString string
index := strings.Index(entry.Name, "_")
if index != -1 {
partNumberString = entry.Name[:index]
if entry.IsDirectory || !strings.HasSuffix(entry.Name, multipartExt) {
continue
}
partNumber, err := parsePartNumber(entry.Name)
if err != nil {
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNumber).Inc()
glog.Errorf("completeMultipartUpload failed to pasre partNumber %s:%s", entry.Name, err)
continue
}
completedPartsByNumber, ok := completedPartMap[partNumber]
if !ok {
continue
}
for _, partETag := range completedPartsByNumber {
partETag = strings.Trim(partETag, `"`)
entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
if partETag != "" && len(partETag) == 32 && entryETag != "" {
if entryETag != partETag {
glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagMismatch).Inc()
continue
}
} else { } else {
partNumberString = entry.Name[:len(entry.Name)-len(".part")]
glog.Warningf("invalid complete etag %s, partEtag %s", partETag, entryETag)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagInvalid).Inc()
} }
partNumber, err := strconv.Atoi(partNumberString)
if err != nil {
glog.Errorf("completeMultipartUpload failed to pasre partNumber %s:%s", partNumberString, err)
if len(entry.Chunks) == 0 {
glog.Warningf("completeMultipartUpload %s empty chunks", entry.Name)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEmpty).Inc()
continue continue
} }
//there maybe multi same part, because of client retry //there maybe multi same part, because of client retry
partEntries[partNumber] = append(partEntries[partNumber], entry) partEntries[partNumber] = append(partEntries[partNumber], entry)
foundEntry = true
}
if !foundEntry {
deleteEntries = append(deleteEntries, entry)
} }
} }
mime := pentry.Attributes.Mime mime := pentry.Attributes.Mime
var finalParts []*filer_pb.FileChunk var finalParts []*filer_pb.FileChunk
var offset int64 var offset int64
var deleteEntries []*filer_pb.Entry
for _, part := range completedParts {
entries := partEntries[part.PartNumber]
// check whether completedParts is more than received parts
if len(entries) == 0 {
glog.Errorf("part %d has no entry", part.PartNumber)
sort.Ints(completedPartNumbers)
for _, partNumber := range completedPartNumbers {
partEntriesByNumber, ok := partEntries[partNumber]
if !ok {
glog.Errorf("part %d has no entry", partNumber)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNotFound).Inc()
return nil, s3err.ErrInvalidPart return nil, s3err.ErrInvalidPart
} }
found := false found := false
for _, entry := range entries {
if len(partEntriesByNumber) > 1 {
slices.SortFunc(partEntriesByNumber, func(a, b *filer_pb.Entry) int {
return cmp.Compare(b.Chunks[0].ModifiedTsNs, a.Chunks[0].ModifiedTsNs)
})
}
for _, entry := range partEntriesByNumber {
if found { if found {
deleteEntries = append(deleteEntries, entry) deleteEntries = append(deleteEntries, entry)
continue
}
partETag := strings.Trim(part.ETag, `"`)
entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
glog.Warningf("complete etag %s, partEtag %s", partETag, entryETag)
if partETag != "" && len(partETag) == 32 && entryETag != "" && entryETag != partETag {
err = fmt.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
deleteEntries = append(deleteEntries, entry)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEntryMismatch).Inc()
continue continue
} }
for _, chunk := range entry.GetChunks() { for _, chunk := range entry.GetChunks() {
@ -154,13 +180,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
offset += int64(chunk.Size) offset += int64(chunk.Size)
} }
found = true found = true
err = nil
} }
if err != nil {
glog.Errorf("%s", err)
return nil, s3err.ErrInvalidPart
}
} }
entryName := filepath.Base(*input.Key) entryName := filepath.Base(*input.Key)
@ -223,29 +243,15 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
return return
} }
func findByPartNumber(fileName string, parts []CompletedPart) (etag string, found bool) {
partNumber, formatErr := strconv.Atoi(fileName[:4])
if formatErr != nil {
return
}
x := sort.Search(len(parts), func(i int) bool {
return parts[i].PartNumber >= partNumber
})
if x >= len(parts) {
return
}
if parts[x].PartNumber != partNumber {
return
func parsePartNumber(fileName string) (int, error) {
var partNumberString string
index := strings.Index(fileName, "_")
if index != -1 {
partNumberString = fileName[:index]
} else {
partNumberString = fileName[:len(fileName)-len(multipartExt)]
} }
y := 0
for i, part := range parts[x:] {
if part.PartNumber == partNumber {
y = i
} else {
break
}
}
return parts[x+y].ETag, true
return strconv.Atoi(partNumberString)
} }
func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code s3err.ErrorCode) { func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code s3err.ErrorCode) {
@ -361,7 +367,7 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP
StorageClass: aws.String("STANDARD"), StorageClass: aws.String("STANDARD"),
} }
entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, uint32(*input.MaxParts))
entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d%s", *input.PartNumberMarker, multipartExt), false, uint32(*input.MaxParts))
if err != nil { if err != nil {
glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err) glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err)
return nil, s3err.ErrNoSuchUpload return nil, s3err.ErrNoSuchUpload
@ -373,15 +379,8 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP
output.IsTruncated = aws.Bool(!isLast) output.IsTruncated = aws.Bool(!isLast)
for _, entry := range entries { for _, entry := range entries {
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
var partNumberString string
index := strings.Index(entry.Name, "_")
if index != -1 {
partNumberString = entry.Name[:index]
} else {
partNumberString = entry.Name[:len(entry.Name)-len(".part")]
}
partNumber, err := strconv.Atoi(partNumberString)
if strings.HasSuffix(entry.Name, multipartExt) && !entry.IsDirectory {
partNumber, err := parsePartNumber(entry.Name)
if err != nil { if err != nil {
glog.Errorf("listObjectParts %s %s parse %s: %v", *input.Bucket, *input.UploadId, entry.Name, err) glog.Errorf("listObjectParts %s %s parse %s: %v", *input.Bucket, *input.UploadId, entry.Name, err)
continue continue

81
weed/s3api/filer_multipart_test.go

@ -50,88 +50,27 @@ func TestListPartsResult(t *testing.T) {
} }
func Test_findByPartNumber(t *testing.T) {
type args struct {
fileName string
parts []CompletedPart
}
parts := []CompletedPart{
{
ETag: "xxx",
PartNumber: 1,
},
{
ETag: "lll",
PartNumber: 1,
},
{
ETag: "yyy",
PartNumber: 3,
},
{
ETag: "zzz",
PartNumber: 5,
},
}
func Test_parsePartNumber(t *testing.T) {
tests := []struct { tests := []struct {
name string
args args
wantEtag string
wantFound bool
name string
fileName string
partNum int
}{ }{
{ {
"first", "first",
args{
"0001.part",
parts,
},
"lll",
true,
"0001_uuid.part",
1,
}, },
{ {
"second", "second",
args{
"0002.part",
parts,
},
"",
false,
},
{
"third",
args{
"0003.part",
parts,
},
"yyy",
true,
},
{
"fourth",
args{
"0004.part",
parts,
},
"",
false,
},
{
"fifth",
args{
"0005.part",
parts,
},
"zzz",
true,
"0002.part",
2,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
gotEtag, gotFound := findByPartNumber(tt.args.fileName, tt.args.parts)
assert.Equalf(t, tt.wantEtag, gotEtag, "findByPartNumber(%v, %v)", tt.args.fileName, tt.args.parts)
assert.Equalf(t, tt.wantFound, gotFound, "findByPartNumber(%v, %v)", tt.args.fileName, tt.args.parts)
partNumber, _ := parsePartNumber(tt.fileName)
assert.Equalf(t, tt.partNum, partNumber, "parsePartNumber(%v)", tt.fileName)
}) })
} }
} }

9
weed/stats/metrics.go

@ -241,7 +241,13 @@ var (
Name: "request_total", Name: "request_total",
Help: "Counter of s3 requests.", Help: "Counter of s3 requests.",
}, []string{"type", "code", "bucket"}) }, []string{"type", "code", "bucket"})
S3HandlerCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: "s3",
Name: "handler_total",
Help: "Counter of s3 server handlers.",
}, []string{"type"})
S3RequestHistogram = prometheus.NewHistogramVec( S3RequestHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{ prometheus.HistogramOpts{
Namespace: Namespace, Namespace: Namespace,
@ -292,6 +298,7 @@ func init() {
Gather.MustRegister(VolumeServerResourceGauge) Gather.MustRegister(VolumeServerResourceGauge)
Gather.MustRegister(S3RequestCounter) Gather.MustRegister(S3RequestCounter)
Gather.MustRegister(S3HandlerCounter)
Gather.MustRegister(S3RequestHistogram) Gather.MustRegister(S3RequestHistogram)
Gather.MustRegister(S3TimeToFirstByteHistogram) Gather.MustRegister(S3TimeToFirstByteHistogram)
} }

9
weed/stats/metrics_names.go

@ -43,4 +43,13 @@ const (
ErrorChunkAssign = "chunkAssign.failed" ErrorChunkAssign = "chunkAssign.failed"
ErrorReadCache = "read.cache.failed" ErrorReadCache = "read.cache.failed"
ErrorReadStream = "read.stream.failed" ErrorReadStream = "read.stream.failed"
// s3 handler
ErrorCompletedNoSuchUpload = "errorCompletedNoSuchUpload"
ErrorCompletedPartEmpty = "ErrorCompletedPartEmpty"
ErrorCompletedPartNumber = "ErrorCompletedPartNumber"
ErrorCompletedPartNotFound = "errorCompletedPartNotFound"
ErrorCompletedEtagInvalid = "errorCompletedEtagInvalid"
ErrorCompletedEtagMismatch = "errorCompletedEtagMismatch"
ErrorCompletedPartEntryMismatch = "errorCompletedPartEntryMismatch"
) )
Loading…
Cancel
Save