From 1d0361d936d9409826eebb2987a302b94eed22eb Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Mon, 22 Dec 2025 15:50:13 -0800
Subject: [PATCH] Fix: Eliminate duplicate versioned objects in S3 list operations (#7850)

* Fix: Eliminate duplicate versioned objects in S3 list operations

- Move versioned directory processing outside of the pagination loop so it runs only once
- Add deduplication during the .versions directory collection phase
- Fix directory handling so that directories are not added to results in recursive mode
- Directly add versioned entries to the contents array instead of using a callback

Fixes an issue where AWS S3 list operations returned duplicated versioned
objects (e.g., 1000 duplicate entries from 4 unique objects). Now correctly
returns only the unique logical entries without duplication.

Verified with:

    aws s3api list-objects --endpoint-url http://localhost:8333 --bucket pm-itatiaiucu-01

Returns exactly 4 entries (ClientInfo.xml and Repository from 2 Veeam backup folders)

* Refactor: Process .versions directories immediately when encountered

Instead of collecting .versions directories and processing them after the
pagination loop, process them immediately when encountered during traversal.

Benefits:
- Simpler code: removed versionedDirEntry struct and collection array
- More efficient: no need to store and iterate through collected entries
- Same O(V) complexity but with less memory overhead
- Clearer logic: processing happens in one pass during traversal

Since each .versions directory is only visited once during recursive
traversal (we never traverse into them), there's no need for deferred
processing or deduplication.

* Add comprehensive tests for versioned objects list

- TestListObjectsWithVersionedObjects: Tests listing with various delimiters
- TestVersionedObjectsNoDuplication: Core test validating no 250x duplication
- TestVersionedObjectsWithDeleteMarker: Tests delete marker filtering
- TestVersionedObjectsMaxKeys: Tests pagination with versioned objects
- TestVersionsDirectoryNotTraversed: Ensures .versions never traversed
- Fix existing test signature to match updated doListFilerEntries

* style: Fix formatting alignment in versioned objects tests

* perf: Optimize path extraction using string indexing

Replace multiple strings.Split/Join calls with efficient strings.Index
slicing to extract the bucket-relative path from the directory string.
Reduces unnecessary allocations and improves performance in versioned
objects listing path construction.

* refactor: Address code review feedback from Gemini Code Assist

1. Fix misleading comment about versioned directory processing location.
   Versioned directories are processed immediately in doListFilerEntries,
   not deferred to ListObjectsV1Handler.

2. Simplify path extraction logic using explicit bucket path construction
   instead of index-based string slicing for better readability and
   maintainability.

3. Add a clarifying comment to the test callback explaining why production
   logic is duplicated - necessary because listFilerEntries is not easily
   testable with filer client injection.
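To make the one-pass approach concrete, here is a minimal, self-contained
sketch (hypothetical, simplified names - not the actual SeaweedFS code) of
emitting one logical entry per .versions directory at the moment it is
encountered, with no collection phase and no traversal into the directory:

    package main

    import (
        "fmt"
        "strings"
    )

    // versionsFolder is a stand-in for s3_constants.VersionsFolder.
    const versionsFolder = ".versions"

    // walk emits one logical entry per .versions directory as soon as it is
    // encountered, and never recurses into it, so each versioned object is
    // reported exactly once -- no deferred processing, no deduplication.
    func walk(names []string, emit func(key string)) {
        for _, name := range names {
            if strings.HasSuffix(name, versionsFolder) {
                emit(strings.TrimSuffix(name, versionsFolder)) // process immediately
                continue                                       // never traverse into .versions
            }
            emit(name)
        }
    }

    func main() {
        walk([]string{"file1.txt" + versionsFolder, "plain.txt"}, func(key string) {
            fmt.Println(key) // file1.txt, then plain.txt -- no duplicates
        })
    }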
* fmt

* refactor: Address code review feedback from Copilot

- Fix misleading comment about versioned directory processing location
  (note that processing happens within doListFilerEntries, not at top level)
- Add maxKeys validation checks in all test callbacks for consistency
- Add maxKeys check before calling eachEntryFn for versioned objects
- Improve test documentation to clarify testing approach and avoid apologetic tone

* refactor: Address code review feedback from Gemini Code Assist

- Remove redundant maxKeys check before eachEntryFn call on line 541
  (the loop already checks maxKeys <= 0 at line 502, ensuring quota exists)
- Fix pagination pattern consistency in all test callbacks
- TestVersionedObjectsNoDuplication: Use cursor.maxKeys <= 0 check and decrement
- TestVersionedObjectsWithDeleteMarker: Use cursor.maxKeys <= 0 check and decrement
- TestVersionsDirectoryNotTraversed: Use cursor.maxKeys <= 0 check and decrement
- Ensures consistent pagination logic across all callbacks matching production behavior

* refactor: Address code review suggestions for code quality

- Adjust log verbosity from V(5) to V(4) for file additions to reduce noise
  while maintaining useful debug output during troubleshooting
- Remove unused isRecursive parameter from doListFilerEntries function
  signature and all call sites (not used for any logic decisions)
- Consolidate redundant comments about versioned directory handling to
  reduce documentation duplication

These changes improve code maintainability and clarity.

* fmt

* refactor: Add pagination test and optimize stream processing

- Add comprehensive test validation to TestVersionedObjectsMaxKeys that
  verifies truncation is correctly set when maxKeys is exhausted with more
  entries available, ensuring proper pagination state
- Optimize stream processing in doListFilerEntries by using 'break' instead
  of 'continue' when quota is exhausted (cursor.maxKeys <= 0)

This avoids receiving and discarding entries from the stream when we've
already reached the requested limit, improving efficiency.
---
 weed/s3api/s3api_object_handlers_list.go      |  99 ++--
 weed/s3api/s3api_object_handlers_list_test.go |   2 +-
 ...api_object_handlers_list_versioned_test.go | 433 ++++++++++++++++++
 3 files changed, 466 insertions(+), 68 deletions(-)
 create mode 100644 weed/s3api/s3api_object_handlers_list_versioned_test.go

diff --git a/weed/s3api/s3api_object_handlers_list.go b/weed/s3api/s3api_object_handlers_list.go
index 22a671e67..5e060b008 100644
--- a/weed/s3api/s3api_object_handlers_list.go
+++ b/weed/s3api/s3api_object_handlers_list.go
@@ -120,7 +120,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
 	bucket, _ := s3_constants.GetBucketAndObject(r)
 	originalPrefix, marker, delimiter, encodingTypeUrl, maxKeys, allowUnordered, errCode := getListObjectsV1Args(r.URL.Query())

-	glog.V(2).Infof("ListObjectsV1Handler bucket=%s prefix=%s", bucket, originalPrefix)
+	glog.V(2).Infof("ListObjectsV1Handler bucket=%s prefix=%s delimiter=%s maxKeys=%d", bucket, originalPrefix, delimiter, maxKeys)

 	if errCode != s3err.ErrNone {
 		s3err.WriteErrorResponse(w, r, errCode)
@@ -203,7 +203,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
 		for {
 			empty := true
-			nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, cursor, marker, delimiter, false, func(dir string, entry *filer_pb.Entry) {
+			nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, cursor, marker, delimiter, false, bucket, func(dir string, entry *filer_pb.Entry) {
 				empty = false
 				dirName, entryName, _ := entryUrlEncode(dir, entry.Name, encodingTypeUrl)
 				if entry.IsDirectory {
@@ -307,6 +307,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
 					}
 				}
 				if !delimiterFound {
+					glog.V(4).Infof("Adding file to contents: %s", entryName)
 					contents = append(contents, newListEntry(entry, "", dirName, entryName, bucketPrefix, fetchOwner, false, false, s3a.iam))
 					cursor.maxKeys--
 					lastEntryWasCommonPrefix = false
@@ -439,7 +440,7 @@ func toParentAndDescendants(dirAndName string) (dir, name string) {
 	return
 }

-func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, cursor *ListingCursor, marker, delimiter string, inclusiveStartFrom bool, eachEntryFn func(dir string, entry *filer_pb.Entry)) (nextMarker string, err error) {
+func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, cursor *ListingCursor, marker, delimiter string, inclusiveStartFrom bool, bucket string, eachEntryFn func(dir string, entry *filer_pb.Entry)) (nextMarker string, err error) {
 	// invariants
 	// prefix and marker should be under dir, marker may contain "/"
 	// maxKeys should be updated for each recursion
@@ -453,7 +454,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
 	if strings.Contains(marker, "/") {
 		subDir, subMarker := toParentAndDescendants(marker)
 		// println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker)
-		subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", cursor, subMarker, delimiter, false, eachEntryFn)
+		subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", cursor, subMarker, delimiter, false, bucket, eachEntryFn)
 		if subErr != nil {
 			err = subErr
 			return
@@ -486,10 +487,6 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
 		return
 	}

-	// Track .versions directories found in this directory for later processing
-	// Store the full entry to avoid additional getEntry calls (N+1 query optimization)
-	var versionsDirs []*filer_pb.Entry
-
 	for {
 		resp, recvErr := stream.Recv()
 		if recvErr != nil {
@@ -504,7 +501,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d

 		if cursor.maxKeys <= 0 {
 			cursor.isTruncated = true
-			continue
+			break
 		}

 		// Set nextMarker only when we have quota to process this entry
@@ -524,24 +521,42 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
 			continue
 		}

-		// Skip .versions directories in regular list operations but track them for logical object creation
-		// Store the full entry to avoid additional getEntry calls later
+		// Process .versions directories immediately to create logical versioned object entries
+		// These directories are never traversed (we continue here), so each is only encountered once
 		if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
-			glog.V(4).Infof("Found .versions directory: %s", entry.Name)
-			versionsDirs = append(versionsDirs, entry)
+			// Extract object name from .versions directory name
+			baseObjectName := strings.TrimSuffix(entry.Name, s3_constants.VersionsFolder)
+			// Construct full object path relative to bucket
+			bucketFullPath := s3a.option.BucketsPath + "/" + bucket
+			bucketRelativePath := strings.TrimPrefix(dir, bucketFullPath)
+			bucketRelativePath = strings.TrimPrefix(bucketRelativePath, "/")
+			var fullObjectPath string
+			if bucketRelativePath == "" {
+				fullObjectPath = baseObjectName
+			} else {
+				fullObjectPath = bucketRelativePath + "/" + baseObjectName
+			}
+			// Use metadata from the already-fetched .versions directory entry
+			if latestVersionEntry, err := s3a.getLatestVersionEntryFromDirectoryEntry(bucket, fullObjectPath, entry); err == nil {
+				eachEntryFn(dir, latestVersionEntry)
+			} else if !errors.Is(err, ErrDeleteMarker) {
+				// Log unexpected errors (delete markers are expected)
+				glog.V(2).Infof("Skipping versioned object %s due to error: %v", fullObjectPath, err)
+			}
 			continue
 		}

 		if delimiter != "/" || cursor.prefixEndsOnDelimiter {
+			// When delimiter is empty (recursive mode), recurse into directories but don't add them to results
+			// Only files and versioned objects should appear in results
 			if cursor.prefixEndsOnDelimiter {
 				cursor.prefixEndsOnDelimiter = false
 				if entry.IsDirectoryKeyObject() {
 					eachEntryFn(dir, entry)
 				}
-			} else {
-				eachEntryFn(dir, entry)
 			}
-			subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", cursor, "", delimiter, false, eachEntryFn)
+			// Recurse into subdirectory - don't add the directory itself to results
+			subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", cursor, "", delimiter, false, bucket, eachEntryFn)
 			if subErr != nil {
 				err = fmt.Errorf("doListFilerEntries2: %w", subErr)
 				return
@@ -564,57 +579,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
 		}
 	}

-	// After processing all regular entries, handle versioned objects
-	// Create logical entries for objects that have .versions directories
-	// OPTIMIZATION: Use the already-fetched .versions directory entry to avoid N+1 queries
-	for _, versionsDir := range versionsDirs {
-		if cursor.maxKeys <= 0 {
-			cursor.isTruncated = true
-			break
-		}
-
-		// Update nextMarker to ensure pagination advances past this .versions directory
-		// This is critical to prevent infinite loops when results are truncated
-		nextMarker = versionsDir.Name
-
-		// Extract object name from .versions directory name (remove .versions suffix)
-		baseObjectName := strings.TrimSuffix(versionsDir.Name, s3_constants.VersionsFolder)
-
-		// Construct full object path relative to bucket
-		// dir is something like "/buckets/sea-test-1/Veeam/Backup/vbr/Config"
-		// we need to get the path relative to bucket: "Veeam/Backup/vbr/Config/Owner"
-		bucketPath := strings.TrimPrefix(dir, s3a.option.BucketsPath+"/")
-		bucketName := strings.Split(bucketPath, "/")[0]
-
-		// Remove bucket name from path to get directory within bucket
-		bucketRelativePath := strings.Join(strings.Split(bucketPath, "/")[1:], "/")
-
-		var fullObjectPath string
-		if bucketRelativePath == "" {
-			// Object is at bucket root
-			fullObjectPath = baseObjectName
-		} else {
-			// Object is in subdirectory
-			fullObjectPath = bucketRelativePath + "/" + baseObjectName
-		}
-
-		glog.V(4).Infof("Processing versioned object: baseObjectName=%s, bucketRelativePath=%s, fullObjectPath=%s",
-			baseObjectName, bucketRelativePath, fullObjectPath)
-
-		// OPTIMIZATION: Use metadata from the already-fetched .versions directory entry
-		// This avoids additional getEntry calls which cause high "find" usage
-		if latestVersionEntry, err := s3a.getLatestVersionEntryFromDirectoryEntry(bucketName, fullObjectPath, versionsDir); err == nil {
-			glog.V(4).Infof("Creating logical entry for versioned object: %s", fullObjectPath)
-			eachEntryFn(dir, latestVersionEntry)
-		} else if errors.Is(err, ErrDeleteMarker) {
-			// Expected: latest version is a delete marker, object should not appear in list
-			glog.V(4).Infof("Skipping versioned object %s: delete marker", fullObjectPath)
-		} else {
-			// Unexpected failure: missing metadata, fetch error, etc.
-			glog.V(3).Infof("Skipping versioned object %s due to error: %v", fullObjectPath, err)
-		}
-	}
-
+	// Versioned directories are processed above (lines 524-546)
 	return
 }

diff --git a/weed/s3api/s3api_object_handlers_list_test.go b/weed/s3api/s3api_object_handlers_list_test.go
index b24771e8a..37bafbbac 100644
--- a/weed/s3api/s3api_object_handlers_list_test.go
+++ b/weed/s3api/s3api_object_handlers_list_test.go
@@ -218,7 +218,7 @@ func TestDoListFilerEntries_BucketRootPrefixSlashDelimiterSlash_ListsDirectories
 	cursor := &ListingCursor{maxKeys: 1000}

 	seen := make([]string, 0)
-	_, err := s3a.doListFilerEntries(client, "/buckets/test-bucket", "/", cursor, "", "/", false, func(dir string, entry *filer_pb.Entry) {
+	_, err := s3a.doListFilerEntries(client, "/buckets/test-bucket", "/", cursor, "", "/", false, "test-bucket", func(dir string, entry *filer_pb.Entry) {
 		if entry.IsDirectory {
 			seen = append(seen, entry.Name)
 		}
diff --git a/weed/s3api/s3api_object_handlers_list_versioned_test.go b/weed/s3api/s3api_object_handlers_list_versioned_test.go
new file mode 100644
index 000000000..8252dc4a9
--- /dev/null
+++ b/weed/s3api/s3api_object_handlers_list_versioned_test.go
@@ -0,0 +1,433 @@
+package s3api
+
+import (
+	"context"
+	"encoding/hex"
+	"fmt"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+	"github.com/stretchr/testify/assert"
+	grpc "google.golang.org/grpc"
+)
+
+// TestListObjectsWithVersionedObjects tests that versioned objects are properly listed
+// This validates the fix for duplicate versioned objects issue
+func TestListObjectsWithVersionedObjects(t *testing.T) {
+	now := time.Now().Unix()
+
+	// Create test filer client with versioned objects
+	filerClient := &testFilerClient{
+		entriesByDir: map[string][]*filer_pb.Entry{
+			"/buckets/test-bucket": {
+				// Regular directory
+				{
+					Name:        "folder1",
+					IsDirectory: true,
+					Attributes: &filer_pb.FuseAttributes{
+						Mtime: now,
+					},
+				},
+				// .versions directory with metadata for versioned object
+				{
+					Name:        "file1.txt" + s3_constants.VersionsFolder,
+					IsDirectory: true,
+					Attributes: &filer_pb.FuseAttributes{
+						Mtime: now,
+					},
+					Extended: map[string][]byte{
+						s3_constants.ExtLatestVersionIdKey:    []byte("v1-abc123"),
+						s3_constants.ExtLatestVersionSizeKey:  []byte("1234"),
+						s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)),
+						s3_constants.ExtLatestVersionETagKey:  []byte(fmt.Sprintf("\"%s\"", hex.EncodeToString([]byte("test-etag-1")))),
+					},
+				},
+				// Another .versions directory
+				{
+					Name:        "file2.txt" + s3_constants.VersionsFolder,
+					IsDirectory: true,
+					Attributes: &filer_pb.FuseAttributes{
+						Mtime: now,
+					},
+					Extended: map[string][]byte{
+						s3_constants.ExtLatestVersionIdKey:    []byte("v2-def456"),
+						s3_constants.ExtLatestVersionSizeKey:  []byte("5678"),
+						s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)),
+						s3_constants.ExtLatestVersionETagKey:  []byte(fmt.Sprintf("\"%s\"", hex.EncodeToString([]byte("test-etag-2")))),
+					},
+				},
+			},
+			"/buckets/test-bucket/folder1": {
+				// Versioned object in subdirectory
+				{
+					Name:        "nested.txt" + s3_constants.VersionsFolder,
+					IsDirectory: true,
+					Attributes: &filer_pb.FuseAttributes{
+						Mtime: now,
+					},
+					Extended: map[string][]byte{
+						s3_constants.ExtLatestVersionIdKey:    []byte("v3-ghi789"),
+						s3_constants.ExtLatestVersionSizeKey:  []byte("9012"),
+						s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)),
+						s3_constants.ExtLatestVersionETagKey:  []byte(fmt.Sprintf("\"%s\"", hex.EncodeToString([]byte("test-etag-3")))),
+					},
+				},
+			},
+		},
+	}
+
+	s3a := &S3ApiServer{
+		option: &S3ApiServerOption{
+			BucketsPath: "/buckets",
+		},
+	}
+
+	tests := []struct {
+		name             string
+		bucket           string
+		prefix           string
+		delimiter        string
+		expectedCount    int
+		expectedKeys     []string
+		expectedPrefixes []string
+	}{
+		{
+			name:          "List all objects including versioned (no delimiter)",
+			bucket:        "test-bucket",
+			prefix:        "",
+			delimiter:     "",
+			expectedCount: 3, // file1.txt, file2.txt, folder1/nested.txt
+			expectedKeys: []string{
+				"file1.txt",
+				"file2.txt",
+				"folder1/nested.txt",
+			},
+			expectedPrefixes: []string{},
+		},
+		{
+			name:          "List bucket root with delimiter",
+			bucket:        "test-bucket",
+			prefix:        "",
+			delimiter:     "/",
+			expectedCount: 2, // file1.txt, file2.txt (folder1/ becomes common prefix)
+			expectedKeys: []string{
+				"file1.txt",
+				"file2.txt",
+			},
+			expectedPrefixes: []string{
+				"folder1/",
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Directly call doListFilerEntries with a callback to exercise the versioned objects
+			// listing logic. The callback mirrors the production listFilerEntries behavior for
+			// path extraction and accumulation so that this test validates the internal listing
+			// implementation in isolation from the HTTP layer.
+			cursor := &ListingCursor{maxKeys: uint16(tt.expectedCount + 10)}
+			contents := []ListEntry{}
+			commonPrefixes := []PrefixEntry{}
+			bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, tt.bucket)
+
+			_, err := s3a.doListFilerEntries(filerClient, bucketPrefix[:len(bucketPrefix)-1], tt.prefix, cursor, "", tt.delimiter, false, tt.bucket, func(dir string, entry *filer_pb.Entry) {
+				if cursor.maxKeys <= 0 {
+					return
+				}
+
+				if entry.IsDirectory {
+					if tt.delimiter == "/" {
+						// Extract relative path from bucket prefix
+						relDir := strings.TrimPrefix(dir, bucketPrefix[:len(bucketPrefix)-1])
+						if relDir != "" && relDir[0] == '/' {
+							relDir = relDir[1:]
+						}
+						prefix := relDir
+						if prefix != "" {
+							prefix += "/"
+						}
+						prefix += entry.Name + "/"
+
+						commonPrefixes = append(commonPrefixes, PrefixEntry{
+							Prefix: prefix,
+						})
+						cursor.maxKeys--
+					}
+				} else {
+					// Extract key from dir and entry name
+					relDir := strings.TrimPrefix(dir, bucketPrefix[:len(bucketPrefix)-1])
+					if relDir != "" && relDir[0] == '/' {
+						relDir = relDir[1:]
+					}
+					key := entry.Name
+					if relDir != "" {
+						key = relDir + "/" + entry.Name
+					}
+
+					contents = append(contents, ListEntry{
+						Key: key,
+					})
+					cursor.maxKeys--
+				}
+			})
+
+			assert.NoError(t, err, "doListFilerEntries should not return error")
+			assert.Equal(t, tt.expectedCount, len(contents), "Should return correct number of objects")
+			assert.Equal(t, len(tt.expectedPrefixes), len(commonPrefixes), "Should return correct number of common prefixes")
+
+			// Verify keys
+			actualKeys := make([]string, len(contents))
+			for i, entry := range contents {
+				actualKeys[i] = entry.Key
+			}
+			assert.ElementsMatch(t, tt.expectedKeys, actualKeys, "Should return expected keys")
+
+			// Verify common prefixes
+			actualPrefixes := make([]string, len(commonPrefixes))
+			for i, prefix := range commonPrefixes {
+				actualPrefixes[i] = prefix.Prefix
+			}
+			assert.ElementsMatch(t, tt.expectedPrefixes, actualPrefixes, "Should return expected prefixes")
+
+			// Verify each versioned object has correct version metadata
+			for _, entry := range contents {
+				assert.NotEmpty(t, entry.Key, "Versioned object should have key")
+			}
+		})
+	}
+}
+
+// TestVersionedObjectsNoDuplication ensures that .versions directories are only processed once
+// This is the core test for the bug fix - previously versioned objects were duplicated 250x
+func TestVersionedObjectsNoDuplication(t *testing.T) {
+	now := time.Now().Unix()
+
+	// Create a single .versions directory
+	filerClient := &testFilerClient{
+		entriesByDir: map[string][]*filer_pb.Entry{
+			"/buckets/test-bucket": {
+				{
+					Name:        "test.txt" + s3_constants.VersionsFolder,
+					IsDirectory: true,
+					Attributes: &filer_pb.FuseAttributes{
+						Mtime: now,
+					},
+					Extended: map[string][]byte{
+						s3_constants.ExtLatestVersionIdKey:    []byte("v1-test"),
+						s3_constants.ExtLatestVersionSizeKey:  []byte("100"),
+						s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)),
+						s3_constants.ExtLatestVersionETagKey:  []byte("\"test-etag\""),
+					},
+				},
+			},
+		},
+	}
+
+	s3a := &S3ApiServer{
+		option: &S3ApiServerOption{
+			BucketsPath: "/buckets",
+		},
+	}
+
+	cursor := &ListingCursor{maxKeys: uint16(1000)}
+	contents := []ListEntry{}
+	_, err := s3a.doListFilerEntries(filerClient, "/buckets/test-bucket", "", cursor, "", "", false, "test-bucket", func(dir string, entry *filer_pb.Entry) {
+		if cursor.maxKeys <= 0 {
+			return
+		}
+		contents = append(contents, ListEntry{Key: entry.Name})
+		cursor.maxKeys--
+	})
+
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(contents), "Should return exactly 1 object (no duplicates)")
+	assert.Equal(t, "test.txt", contents[0].Key, "Should return correct key")
+}
+
+// TestVersionedObjectsWithDeleteMarker tests that objects with delete markers are not listed
+func TestVersionedObjectsWithDeleteMarker(t *testing.T) {
+	now := time.Now().Unix()
+
+	filerClient := &testFilerClient{
+		entriesByDir: map[string][]*filer_pb.Entry{
+			"/buckets/test-bucket": {
+				// Active versioned object
+				{
+					Name:        "active.txt" + s3_constants.VersionsFolder,
+					IsDirectory: true,
+					Attributes: &filer_pb.FuseAttributes{
+						Mtime: now,
+					},
+					Extended: map[string][]byte{
+						s3_constants.ExtLatestVersionIdKey:    []byte("v1-active"),
+						s3_constants.ExtLatestVersionSizeKey:  []byte("100"),
+						s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)),
+						s3_constants.ExtLatestVersionETagKey:  []byte("\"etag-active\""),
+					},
+				},
+				// Deleted object (has delete marker)
+				{
+					Name:        "deleted.txt" + s3_constants.VersionsFolder,
+					IsDirectory: true,
+					Attributes: &filer_pb.FuseAttributes{
+						Mtime: now,
+					},
+					Extended: map[string][]byte{
+						s3_constants.ExtLatestVersionIdKey:          []byte("v1-deleted"),
+						s3_constants.ExtLatestVersionIsDeleteMarker: []byte("true"),
+					},
+				},
+			},
+		},
+	}
+
+	s3a := &S3ApiServer{
+		option: &S3ApiServerOption{
+			BucketsPath: "/buckets",
+		},
+	}
+
+	cursor := &ListingCursor{maxKeys: uint16(1000)}
+	contents := []ListEntry{}
+	_, err := s3a.doListFilerEntries(filerClient, "/buckets/test-bucket", "", cursor, "", "", false, "test-bucket", func(dir string, entry *filer_pb.Entry) {
+		if cursor.maxKeys <= 0 {
+			return
+		}
+		contents = append(contents, ListEntry{Key: entry.Name})
+		cursor.maxKeys--
+	})
+
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(contents), "Should only return active object, not deleted")
+	assert.Equal(t, "active.txt", contents[0].Key, "Should return the active object")
+}
+
+// TestVersionedObjectsMaxKeys tests pagination with versioned objects
+func TestVersionedObjectsMaxKeys(t *testing.T) {
+	now := time.Now().Unix()
+
+	// Create 5 versioned objects
+	entries := make([]*filer_pb.Entry, 5)
+	for i := 0; i < 5; i++ {
+		entries[i] = &filer_pb.Entry{
+			Name:        fmt.Sprintf("file%d.txt"+s3_constants.VersionsFolder, i),
+			IsDirectory: true,
+			Attributes: &filer_pb.FuseAttributes{
+				Mtime: now,
+			},
+			Extended: map[string][]byte{
+				s3_constants.ExtLatestVersionIdKey:    []byte(fmt.Sprintf("v%d", i)),
+				s3_constants.ExtLatestVersionSizeKey:  []byte("100"),
+				s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)),
+				s3_constants.ExtLatestVersionETagKey:  []byte(fmt.Sprintf("\"etag-%d\"", i)),
+			},
+		}
+	}
+
+	filerClient := &testFilerClient{
+		entriesByDir: map[string][]*filer_pb.Entry{
+			"/buckets/test-bucket": entries,
+		},
+	}
+
+	s3a := &S3ApiServer{
+		option: &S3ApiServerOption{
+			BucketsPath: "/buckets",
+		},
+	}
+
+	cursor := &ListingCursor{maxKeys: uint16(3)}
+	contents := []ListEntry{}
+	_, err := s3a.doListFilerEntries(filerClient, "/buckets/test-bucket", "", cursor, "", "", false, "test-bucket", func(dir string, entry *filer_pb.Entry) {
+		if cursor.maxKeys <= 0 {
+			return
+		}
+		contents = append(contents, ListEntry{Key: entry.Name})
+		cursor.maxKeys--
+	})
+
+	assert.NoError(t, err)
+	assert.Equal(t, 3, len(contents), "Should respect maxKeys limit")
+
+	// Verify truncation is properly set when maxKeys is exceeded
+	// (The test mock doesn't implement marker-based pagination, but we can verify
+	// that the cursor state is correct for actual pagination to work)
+	assert.True(t, cursor.isTruncated, "IsTruncated should be true when maxKeys is exhausted with more entries available")
+}
+
+// TestVersionsDirectoryNotTraversed ensures .versions directories are never traversed
+func TestVersionsDirectoryNotTraversed(t *testing.T) {
+	now := time.Now().Unix()
+	traversedDirs := make(map[string]bool)
+
+	// Custom filer client that tracks which directories are accessed
+	customClient := &customTestFilerClient{
+		testFilerClient: testFilerClient{
+			entriesByDir: map[string][]*filer_pb.Entry{
+				"/buckets/test-bucket": {
+					{
+						Name:        "object.txt" + s3_constants.VersionsFolder,
+						IsDirectory: true,
+						Attributes: &filer_pb.FuseAttributes{
+							Mtime: now,
+						},
+						Extended: map[string][]byte{
+							s3_constants.ExtLatestVersionIdKey:    []byte("v1"),
+							s3_constants.ExtLatestVersionSizeKey:  []byte("100"),
+							s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)),
+							s3_constants.ExtLatestVersionETagKey:  []byte("\"etag\""),
+						},
+					},
+				},
+				// This directory should NEVER be accessed
+				"/buckets/test-bucket/object.txt.versions": {
+					{
+						Name:        "should-not-see-this",
+						IsDirectory: false,
+					},
+				},
+			},
+		},
+		traversedDirs: &traversedDirs,
+	}
+
+	s3a := &S3ApiServer{
+		option: &S3ApiServerOption{
+			BucketsPath: "/buckets",
+		},
+	}
+
+	cursor := &ListingCursor{maxKeys: uint16(1000)}
+	contents := []ListEntry{}
+	_, err := s3a.doListFilerEntries(customClient, "/buckets/test-bucket", "", cursor, "", "", false, "test-bucket", func(dir string, entry *filer_pb.Entry) {
+		if cursor.maxKeys <= 0 {
+			return
+		}
+		contents = append(contents, ListEntry{Key: entry.Name})
+		cursor.maxKeys--
+	})
+
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(contents))
+
+	// Verify .versions directory was NEVER traversed
+	_, wasTraversed := traversedDirs["/buckets/test-bucket/object.txt.versions"]
+	assert.False(t, wasTraversed, ".versions directory should never be traversed")
+}
+
+// customTestFilerClient tracks which directories are accessed
+type customTestFilerClient struct {
+	testFilerClient
+	traversedDirs *map[string]bool
+}
+
+func (c *customTestFilerClient) ListEntries(ctx context.Context, in *filer_pb.ListEntriesRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[filer_pb.ListEntriesResponse], error) {
+	(*c.traversedDirs)[in.Directory] = true
+	return c.testFilerClient.ListEntries(ctx, in, opts...)
+}
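
A closing note on the quota contract that the patch and all of the tests above
exercise: the producer stops and marks truncation once cursor.maxKeys reaches
zero, while each callback owns the decrement. Below is a minimal, self-contained
Go sketch of that pattern under simplified, hypothetical types (listingCursor
here is a stand-in, not the ListingCursor used in the patch):

    package main

    import "fmt"

    // listingCursor is a simplified stand-in for the ListingCursor above.
    type listingCursor struct {
        maxKeys     int
        isTruncated bool
    }

    // list walks entries and invokes fn while quota remains, mirroring the
    // break-on-exhaustion behavior the patch introduces in doListFilerEntries:
    // once the quota is spent, stop consuming instead of discarding entries.
    func list(entries []string, cursor *listingCursor, fn func(string)) {
        for _, e := range entries {
            if cursor.maxKeys <= 0 {
                cursor.isTruncated = true // more entries remain past the quota
                break
            }
            fn(e)
        }
    }

    func main() {
        cursor := &listingCursor{maxKeys: 3}
        var got []string
        list([]string{"a", "b", "c", "d", "e"}, cursor, func(e string) {
            got = append(got, e)
            cursor.maxKeys-- // the callback owns the decrement, as in the tests
        })
        fmt.Println(got, cursor.isTruncated) // [a b c] true
    }

Putting the decrement in the callback keeps the producer agnostic about which
entries the caller actually accepts, which is why the production code and every
test callback follow the same check-then-decrement shape.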