From e88392b50f9a8f7626e832d6957318bca03b5f48 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 15 Aug 2022 00:30:19 -0700 Subject: [PATCH] Fix s3 pagination (#3436) * Revert previous changes * s3: use cursor to track tree traversal fix https://github.com/seaweedfs/seaweedfs/issues/3166 * special cases for empty prefix and empty directory * use constants * address empty folder * undo local changes * fix IsTruncated * adjust counting directories * fix cases when prefix is a directory * s3: handle directory object works for aws --endpoint-url http://127.0.0.1:8333/ s3api list-objects-v2 --bucket test --prefix "fakedir" --- weed/s3api/s3api_objects_list_handlers.go | 244 +++++++++--------- .../s3api/s3api_objects_list_handlers_test.go | 52 ++++ 2 files changed, 181 insertions(+), 115 deletions(-) diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index 448b082eb..620969fd6 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -9,7 +9,6 @@ import ( "io" "net/http" "net/url" - "path/filepath" "strconv" "strings" "time" @@ -126,105 +125,87 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ writeSuccessResponseXML(w, r, response) } -func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) { +func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, originalMarker string, delimiter string) (response ListBucketResult, err error) { // convert full path prefix into directory name and prefix for entry name - reqDir, prefix := filepath.Split(originalPrefix) - if strings.HasPrefix(reqDir, "/") { - reqDir = reqDir[1:] - } + requestDir, prefix, marker := normalizePrefixMarker(originalPrefix, originalMarker) bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket) - bucketPrefixLen := len(bucketPrefix) - reqDir = fmt.Sprintf("%s%s", bucketPrefix, reqDir) - if strings.HasSuffix(reqDir, "/") { - reqDir = strings.TrimSuffix(reqDir, "/") + reqDir := bucketPrefix[:len(bucketPrefix)-1] + if requestDir != "" { + reqDir = fmt.Sprintf("%s%s", bucketPrefix, requestDir) } var contents []ListEntry var commonPrefixes []PrefixEntry - var isTruncated bool var doErr error var nextMarker string + cursor := &ListingCursor{ + maxKeys: maxKeys, + } // check filer err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, false, false, bucketPrefixLen, func(dir string, entry *filer_pb.Entry) { + nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, cursor, marker, delimiter, false, func(dir string, entry *filer_pb.Entry) { if entry.IsDirectory { - if delimiter == "/" { + // https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html + if delimiter == "/" { // A response can contain CommonPrefixes only if you specify a delimiter. commonPrefixes = append(commonPrefixes, PrefixEntry{ - Prefix: fmt.Sprintf("%s/%s/", dir, entry.Name)[bucketPrefixLen:], + Prefix: fmt.Sprintf("%s/%s/", dir, entry.Name)[len(bucketPrefix):], }) - } - if !(entry.IsDirectoryKeyObject() && strings.HasSuffix(entry.Name, "/")) { - return - } - } - storageClass := "STANDARD" - if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok { - storageClass = string(v) - } - contents = append(contents, ListEntry{ - Key: fmt.Sprintf("%s/%s", dir, entry.Name)[bucketPrefixLen:], - LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(), - ETag: "\"" + filer.ETag(entry) + "\"", - Size: int64(filer.FileSize(entry)), - Owner: CanonicalUser{ - ID: fmt.Sprintf("%x", entry.Attributes.Uid), - DisplayName: entry.Attributes.UserName, - }, - StorageClass: StorageClass(storageClass), - }) - }) - glog.V(4).Infof("end doListFilerEntries isTruncated:%v nextMarker:%v reqDir: %v prefix: %v", isTruncated, nextMarker, reqDir, prefix) - if doErr != nil { - return doErr - } - - if !isTruncated { - nextMarker = "" - } - - if len(contents) == 0 && len(commonPrefixes) == 0 && maxKeys > 0 { - if strings.HasSuffix(originalPrefix, "/") && prefix == "" { - reqDir, prefix = filepath.Split(strings.TrimSuffix(reqDir, "/")) - reqDir = strings.TrimSuffix(reqDir, "/") - } - _, _, _, doErr = s3a.doListFilerEntries(client, reqDir, prefix, 1, prefix, delimiter, true, false, bucketPrefixLen, func(dir string, entry *filer_pb.Entry) { - if entry.IsDirectoryKeyObject() && entry.Name == prefix { - storageClass := "STANDARD" - if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok { - storageClass = string(v) - } + //All of the keys (up to 1,000) rolled up into a common prefix count as a single return when calculating the number of returns. + cursor.maxKeys-- + } else if entry.IsDirectoryKeyObject() { contents = append(contents, ListEntry{ - Key: fmt.Sprintf("%s/%s/", dir, entry.Name)[bucketPrefixLen:], + Key: fmt.Sprintf("%s/%s/", dir, entry.Name)[len(bucketPrefix):], LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(), - ETag: "\"" + fmt.Sprintf("%x", entry.Attributes.Md5) + "\"", - Size: int64(filer.FileSize(entry)), + ETag: "\"" + filer.ETag(entry) + "\"", Owner: CanonicalUser{ ID: fmt.Sprintf("%x", entry.Attributes.Uid), DisplayName: entry.Attributes.UserName, }, - StorageClass: StorageClass(storageClass), + StorageClass: "STANDARD", }) + cursor.maxKeys-- } - }) - if doErr != nil { - return doErr + } else { + storageClass := "STANDARD" + if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok { + storageClass = string(v) + } + contents = append(contents, ListEntry{ + Key: fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):], + LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(), + ETag: "\"" + filer.ETag(entry) + "\"", + Size: int64(filer.FileSize(entry)), + Owner: CanonicalUser{ + ID: fmt.Sprintf("%x", entry.Attributes.Uid), + DisplayName: entry.Attributes.UserName, + }, + StorageClass: StorageClass(storageClass), + }) + cursor.maxKeys-- } + }) + if doErr != nil { + return doErr } - if len(nextMarker) > 0 { - nextMarker = nextMarker[bucketPrefixLen:] + if !cursor.isTruncated { + nextMarker = "" + } else { + if requestDir != "" { + nextMarker = requestDir + "/" + nextMarker + } } response = ListBucketResult{ Name: bucket, Prefix: originalPrefix, - Marker: marker, + Marker: originalMarker, NextMarker: nextMarker, MaxKeys: maxKeys, Delimiter: delimiter, - IsTruncated: isTruncated, + IsTruncated: cursor.isTruncated, Contents: contents, CommonPrefixes: commonPrefixes, } @@ -235,7 +216,64 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m return } -func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, maxKeys int, marker, delimiter string, inclusiveStartFrom bool, subEntries bool, bucketPrefixLen int, eachEntryFn func(dir string, entry *filer_pb.Entry)) (counter int, isTruncated bool, nextMarker string, err error) { +type ListingCursor struct { + maxKeys int + isTruncated bool +} + +// the prefix and marker may be in different directories +// normalizePrefixMarker ensures the prefix and marker both starts from the same directory +func normalizePrefixMarker(prefix, marker string) (alignedDir, alignedPrefix, alignedMarker string) { + // alignedDir should not end with "/" + // alignedDir, alignedPrefix, alignedMarker should only have "/" in middle + prefix = strings.TrimLeft(prefix, "/") + marker = strings.TrimLeft(marker, "/") + if prefix == "" { + return "", "", marker + } + if marker == "" { + alignedDir, alignedPrefix = toDirAndName(prefix) + return + } + if !strings.HasPrefix(marker, prefix) { + // something wrong + return "", prefix, marker + } + if strings.HasPrefix(marker, prefix+"/") { + alignedDir = prefix + alignedPrefix = "" + alignedMarker = marker[len(alignedDir)+1:] + return + } + + alignedDir, alignedPrefix = toDirAndName(prefix) + if alignedDir != "" { + alignedMarker = marker[len(alignedDir)+1:] + } else { + alignedMarker = marker + } + return +} +func toDirAndName(dirAndName string) (dir, name string) { + sepIndex := strings.LastIndex(dirAndName, "/") + if sepIndex >= 0 { + dir, name = dirAndName[0:sepIndex], dirAndName[sepIndex+1:] + } else { + name = dirAndName + } + return +} +func toParentAndDescendants(dirAndName string) (dir, name string) { + sepIndex := strings.Index(dirAndName, "/") + if sepIndex >= 0 { + dir, name = dirAndName[0:sepIndex], dirAndName[sepIndex+1:] + } else { + name = dirAndName + } + return +} + +func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, cursor *ListingCursor, marker, delimiter string, inclusiveStartFrom bool, eachEntryFn func(dir string, entry *filer_pb.Entry)) (nextMarker string, err error) { // invariants // prefix and marker should be under dir, marker may contain "/" // maxKeys should be updated for each recursion @@ -243,37 +281,23 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d if prefix == "/" && delimiter == "/" { return } - if maxKeys <= 0 { + if cursor.maxKeys <= 0 { return } if strings.Contains(marker, "/") { - if strings.HasSuffix(marker, "/") { - marker = strings.TrimSuffix(marker, "/") - } - sepIndex := strings.Index(marker, "/") - if sepIndex != -1 { - subPrefix, subMarker := marker[0:sepIndex], marker[sepIndex+1:] - var subDir string - if len(dir) > bucketPrefixLen && dir[bucketPrefixLen:] == subPrefix { - subDir = dir - } else { - subDir = fmt.Sprintf("%s/%s", dir, subPrefix) - } - subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, subDir, "", maxKeys, subMarker, delimiter, false, false, bucketPrefixLen, eachEntryFn) - if subErr != nil { - err = subErr - return - } - counter += subCounter - isTruncated = isTruncated || subIsTruncated - maxKeys -= subCounter - nextMarker = subNextMarker - // finished processing this sub directory - marker = subPrefix + subDir, subMarker := toParentAndDescendants(marker) + // println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker) + subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", cursor, subMarker, delimiter, false, eachEntryFn) + if subErr != nil { + err = subErr + return } + nextMarker = subDir + "/" + subNextMarker + // finished processing this sub directory + marker = subDir } - if maxKeys <= 0 { + if cursor.maxKeys <= 0 { return } @@ -281,7 +305,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d request := &filer_pb.ListEntriesRequest{ Directory: dir, Prefix: prefix, - Limit: uint32(maxKeys + 2), // bucket root directory needs to skip additional s3_constants.MultipartUploadsFolder folder + Limit: uint32(cursor.maxKeys + 2), // bucket root directory needs to skip additional s3_constants.MultipartUploadsFolder folder StartFromFileName: marker, InclusiveStartFrom: inclusiveStartFrom, } @@ -304,38 +328,31 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d return } } - if counter >= maxKeys { - isTruncated = true + if cursor.maxKeys <= 0 { + cursor.isTruncated = true return } entry := resp.Entry - nextMarker = dir + "/" + entry.Name + nextMarker = entry.Name if entry.IsDirectory { // println("ListEntries", dir, "dir:", entry.Name) if entry.Name == s3_constants.MultipartUploadsFolder { // FIXME no need to apply to all directories. this extra also affects maxKeys continue } - if delimiter == "" { + if delimiter != "/" { eachEntryFn(dir, entry) - // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter) - subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, false, true, bucketPrefixLen, eachEntryFn) + subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", cursor, "", delimiter, false, eachEntryFn) if subErr != nil { err = fmt.Errorf("doListFilerEntries2: %v", subErr) return } - // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated) - if subCounter == 0 && entry.IsDirectoryKeyObject() { - entry.Name += "/" - eachEntryFn(dir, entry) - counter++ - } - counter += subCounter - nextMarker = subNextMarker - if subIsTruncated { - isTruncated = true + // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "subNextMarker", subNextMarker) + nextMarker = entry.Name + "/" + subNextMarker + if cursor.isTruncated { return } - } else if delimiter == "/" { + // println("doListFilerEntries2 nextMarker", nextMarker) + } else { var isEmpty bool if !s3a.option.AllowEmptyFolder && !entry.IsDirectoryKeyObject() { if isEmpty, err = s3a.ensureDirectoryAllEmpty(client, dir, entry.Name); err != nil { @@ -343,15 +360,12 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d } } if !isEmpty { - nextMarker += "/" eachEntryFn(dir, entry) - counter++ } } - } else if !(delimiter == "/" && subEntries) { - // println("ListEntries", dir, "file:", entry.Name) + } else { eachEntryFn(dir, entry) - counter++ + // println("ListEntries", dir, "file:", entry.Name, "maxKeys", cursor.maxKeys) } } return diff --git a/weed/s3api/s3api_objects_list_handlers_test.go b/weed/s3api/s3api_objects_list_handlers_test.go index 8734ff2ff..585aa40b1 100644 --- a/weed/s3api/s3api_objects_list_handlers_test.go +++ b/weed/s3api/s3api_objects_list_handlers_test.go @@ -2,6 +2,7 @@ package s3api import ( "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "github.com/stretchr/testify/assert" "testing" "time" ) @@ -37,3 +38,54 @@ func TestListObjectsHandler(t *testing.T) { t.Errorf("unexpected output: %s\nexpecting:%s", encoded, expected) } } + +func Test_normalizePrefixMarker(t *testing.T) { + type args struct { + prefix string + marker string + } + tests := []struct { + name string + args args + wantAlignedDir string + wantAlignedPrefix string + wantAlignedMarker string + }{ + {"prefix is a directory", + args{"/parentDir/data/", + ""}, + "parentDir/data", + "", + "", + }, + {"normal case", + args{"/parentDir/data/0", + "parentDir/data/0e/0e149049a2137b0cc12e"}, + "parentDir/data", + "0", + "0e/0e149049a2137b0cc12e", + }, + {"empty prefix", + args{"", + "parentDir/data/0e/0e149049a2137b0cc12e"}, + "", + "", + "parentDir/data/0e/0e149049a2137b0cc12e", + }, + {"empty directory", + args{"parent", + "parentDir/data/0e/0e149049a2137b0cc12e"}, + "", + "parent", + "parentDir/data/0e/0e149049a2137b0cc12e", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotAlignedDir, gotAlignedPrefix, gotAlignedMarker := normalizePrefixMarker(tt.args.prefix, tt.args.marker) + assert.Equalf(t, tt.wantAlignedDir, gotAlignedDir, "normalizePrefixMarker(%v, %v)", tt.args.prefix, tt.args.marker) + assert.Equalf(t, tt.wantAlignedPrefix, gotAlignedPrefix, "normalizePrefixMarker(%v, %v)", tt.args.prefix, tt.args.marker) + assert.Equalf(t, tt.wantAlignedMarker, gotAlignedMarker, "normalizePrefixMarker(%v, %v)", tt.args.prefix, tt.args.marker) + }) + } +}