Browse Source
Fix: Eliminate duplicate versioned objects in S3 list operations (#7850)
Fix: Eliminate duplicate versioned objects in S3 list operations (#7850)
* Fix: Eliminate duplicate versioned objects in S3 list operations - Move versioned directory processing outside of pagination loop to process only once - Add deduplication during .versions directory collection phase - Fix directory handling to not add directories to results in recursive mode - Directly add versioned entries to contents array instead of using callback Fixes issue where AWS S3 list operations returned duplicated versioned objects (e.g., 1000 duplicate entries from 4 unique objects). Now correctly returns only the unique logical entries without duplication. Verified with: aws s3api list-objects --endpoint-url http://localhost:8333 --bucket pm-itatiaiucu-01 Returns exactly 4 entries (ClientInfo.xml and Repository from 2 Veeam backup folders) * Refactor: Process .versions directories immediately when encountered Instead of collecting .versions directories and processing them after the pagination loop, process them immediately when encountered during traversal. Benefits: - Simpler code: removed versionedDirEntry struct and collection array - More efficient: no need to store and iterate through collected entries - Same O(V) complexity but with less memory overhead - Clearer logic: processing happens in one pass during traversal Since each .versions directory is only visited once during recursive traversal (we never traverse into them), there's no need for deferred processing or deduplication. 
* Add comprehensive tests for versioned objects list - TestListObjectsWithVersionedObjects: Tests listing with various delimiters - TestVersionedObjectsNoDuplication: Core test validating no 250x duplication - TestVersionedObjectsWithDeleteMarker: Tests delete marker filtering - TestVersionedObjectsMaxKeys: Tests pagination with versioned objects - TestVersionsDirectoryNotTraversed: Ensures .versions never traversed - Fix existing test signature to match updated doListFilerEntries * style: Fix formatting alignment in versioned objects tests * perf: Optimize path extraction using string indexing Replace multiple strings.Split/Join calls with efficient strings.Index slicing to extract bucket-relative path from directory string. Reduces unnecessary allocations and improves performance in versioned objects listing path construction. * refactor: Address code review feedback from Gemini Code Assist 1. Fix misleading comment about versioned directory processing location. Versioned directories are processed immediately in doListFilerEntries, not deferred to ListObjectsV1Handler. 2. Simplify path extraction logic using explicit bucket path construction instead of index-based string slicing for better readability and maintainability. 3. Add clarifying comment to test callback explaining why production logic is duplicated - necessary because listFilerEntries is not easily testable with filer client injection. 
* fmt * refactor: Address code review feedback from Copilot - Fix misleading comment about versioned directory processing location (note that processing happens within doListFilerEntries, not at top level) - Add maxKeys validation checks in all test callbacks for consistency - Add maxKeys check before calling eachEntryFn for versioned objects - Improve test documentation to clarify testing approach and avoid apologetic tone * refactor: Address code review feedback from Gemini Code Assist - Remove redundant maxKeys check before eachEntryFn call on line 541 (the loop already checks maxKeys <= 0 at line 502, ensuring quota exists) - Fix pagination pattern consistency in all test callbacks - TestVersionedObjectsNoDuplication: Use cursor.maxKeys <= 0 check and decrement - TestVersionedObjectsWithDeleteMarker: Use cursor.maxKeys <= 0 check and decrement - TestVersionsDirectoryNotTraversed: Use cursor.maxKeys <= 0 check and decrement - Ensures consistent pagination logic across all callbacks matching production behavior * refactor: Address code review suggestions for code quality - Adjust log verbosity from V(5) to V(4) for file additions to reduce noise while maintaining useful debug output during troubleshooting - Remove unused isRecursive parameter from doListFilerEntries function signature and all call sites (not used for any logic decisions) - Consolidate redundant comments about versioned directory handling to reduce documentation duplication These changes improve code maintainability and clarity. 
* fmt * refactor: Add pagination test and optimize stream processing - Add comprehensive test validation to TestVersionedObjectsMaxKeys that verifies truncation is correctly set when maxKeys is exhausted with more entries available, ensuring proper pagination state - Optimize stream processing in doListFilerEntries by using 'break' instead of 'continue' when quota is exhausted (cursor.maxKeys <= 0). This avoids receiving and discarding entries from the stream when we've already reached the requested limit, improving efficiency.
committed by
GitHub
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 466 additions and 68 deletions
-
99 weed/s3api/s3api_object_handlers_list.go
-
2 weed/s3api/s3api_object_handlers_list_test.go
-
433 weed/s3api/s3api_object_handlers_list_versioned_test.go
@ -0,0 +1,433 @@ |
|||
package s3api |
|||
|
|||
import ( |
|||
"context" |
|||
"encoding/hex" |
|||
"fmt" |
|||
"strconv" |
|||
"strings" |
|||
"testing" |
|||
"time" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" |
|||
"github.com/stretchr/testify/assert" |
|||
grpc "google.golang.org/grpc" |
|||
) |
|||
|
|||
// TestListObjectsWithVersionedObjects tests that versioned objects are properly listed.
// This validates the fix for the duplicate versioned objects issue: each logical object
// is represented in the filer by a "<name>.versions" directory whose Extended attributes
// carry the latest-version metadata, and listing must surface each exactly once.
func TestListObjectsWithVersionedObjects(t *testing.T) {
	now := time.Now().Unix()

	// Create test filer client with versioned objects.
	// The mock maps a directory path to the entries returned when listing it.
	filerClient := &testFilerClient{
		entriesByDir: map[string][]*filer_pb.Entry{
			"/buckets/test-bucket": {
				// Regular directory (becomes a common prefix when delimiter is "/")
				{
					Name:        "folder1",
					IsDirectory: true,
					Attributes: &filer_pb.FuseAttributes{
						Mtime: now,
					},
				},
				// .versions directory with metadata for versioned object
				{
					Name:        "file1.txt" + s3_constants.VersionsFolder,
					IsDirectory: true,
					Attributes: &filer_pb.FuseAttributes{
						Mtime: now,
					},
					Extended: map[string][]byte{
						s3_constants.ExtLatestVersionIdKey:    []byte("v1-abc123"),
						s3_constants.ExtLatestVersionSizeKey:  []byte("1234"),
						s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)),
						s3_constants.ExtLatestVersionETagKey:  []byte(fmt.Sprintf("\"%s\"", hex.EncodeToString([]byte("test-etag-1")))),
					},
				},
				// Another .versions directory
				{
					Name:        "file2.txt" + s3_constants.VersionsFolder,
					IsDirectory: true,
					Attributes: &filer_pb.FuseAttributes{
						Mtime: now,
					},
					Extended: map[string][]byte{
						s3_constants.ExtLatestVersionIdKey:    []byte("v2-def456"),
						s3_constants.ExtLatestVersionSizeKey:  []byte("5678"),
						s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)),
						s3_constants.ExtLatestVersionETagKey:  []byte(fmt.Sprintf("\"%s\"", hex.EncodeToString([]byte("test-etag-2")))),
					},
				},
			},
			"/buckets/test-bucket/folder1": {
				// Versioned object in subdirectory (only visible in the recursive, no-delimiter case)
				{
					Name:        "nested.txt" + s3_constants.VersionsFolder,
					IsDirectory: true,
					Attributes: &filer_pb.FuseAttributes{
						Mtime: now,
					},
					Extended: map[string][]byte{
						s3_constants.ExtLatestVersionIdKey:    []byte("v3-ghi789"),
						s3_constants.ExtLatestVersionSizeKey:  []byte("9012"),
						s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)),
						s3_constants.ExtLatestVersionETagKey:  []byte(fmt.Sprintf("\"%s\"", hex.EncodeToString([]byte("test-etag-3")))),
					},
				},
			},
		},
	}

	s3a := &S3ApiServer{
		option: &S3ApiServerOption{
			BucketsPath: "/buckets",
		},
	}

	tests := []struct {
		name             string
		bucket           string
		prefix           string
		delimiter        string
		expectedCount    int
		expectedKeys     []string
		expectedPrefixes []string
	}{
		{
			name:          "List all objects including versioned (no delimiter)",
			bucket:        "test-bucket",
			prefix:        "",
			delimiter:     "",
			expectedCount: 3, // file1.txt, file2.txt, folder1/nested.txt
			expectedKeys: []string{
				"file1.txt",
				"file2.txt",
				"folder1/nested.txt",
			},
			expectedPrefixes: []string{},
		},
		{
			name:          "List bucket root with delimiter",
			bucket:        "test-bucket",
			prefix:        "",
			delimiter:     "/",
			expectedCount: 2, // file1.txt, file2.txt (folder1/ becomes common prefix)
			expectedKeys: []string{
				"file1.txt",
				"file2.txt",
			},
			expectedPrefixes: []string{
				"folder1/",
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Directly call doListFilerEntries with a callback to exercise the versioned objects
			// listing logic. The callback mirrors the production listFilerEntries behavior for
			// path extraction and accumulation so that this test validates the internal listing
			// implementation in isolation from the HTTP layer.
			cursor := &ListingCursor{maxKeys: uint16(tt.expectedCount + 10)}
			contents := []ListEntry{}
			commonPrefixes := []PrefixEntry{}
			// bucketPrefix includes a trailing "/"; bucketPrefix[:len(bucketPrefix)-1] below
			// strips it to get the bare bucket directory path.
			bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, tt.bucket)

			_, err := s3a.doListFilerEntries(filerClient, bucketPrefix[:len(bucketPrefix)-1], tt.prefix, cursor, "", tt.delimiter, false, tt.bucket, func(dir string, entry *filer_pb.Entry) {
				// Honor the remaining quota, matching the production pagination pattern.
				if cursor.maxKeys <= 0 {
					return
				}

				if entry.IsDirectory {
					if tt.delimiter == "/" {
						// Extract relative path from bucket prefix
						relDir := strings.TrimPrefix(dir, bucketPrefix[:len(bucketPrefix)-1])
						if relDir != "" && relDir[0] == '/' {
							relDir = relDir[1:]
						}
						prefix := relDir
						if prefix != "" {
							prefix += "/"
						}
						prefix += entry.Name + "/"

						commonPrefixes = append(commonPrefixes, PrefixEntry{
							Prefix: prefix,
						})
						cursor.maxKeys--
					}
				} else {
					// Extract key from dir and entry name
					relDir := strings.TrimPrefix(dir, bucketPrefix[:len(bucketPrefix)-1])
					if relDir != "" && relDir[0] == '/' {
						relDir = relDir[1:]
					}
					key := entry.Name
					if relDir != "" {
						key = relDir + "/" + entry.Name
					}

					contents = append(contents, ListEntry{
						Key: key,
					})
					cursor.maxKeys--
				}
			})

			assert.NoError(t, err, "doListFilerEntries should not return error")
			assert.Equal(t, tt.expectedCount, len(contents), "Should return correct number of objects")
			assert.Equal(t, len(tt.expectedPrefixes), len(commonPrefixes), "Should return correct number of common prefixes")

			// Verify keys (order-insensitive; the mock's emission order is not part of the contract)
			actualKeys := make([]string, len(contents))
			for i, entry := range contents {
				actualKeys[i] = entry.Key
			}
			assert.ElementsMatch(t, tt.expectedKeys, actualKeys, "Should return expected keys")

			// Verify common prefixes
			actualPrefixes := make([]string, len(commonPrefixes))
			for i, prefix := range commonPrefixes {
				actualPrefixes[i] = prefix.Prefix
			}
			assert.ElementsMatch(t, tt.expectedPrefixes, actualPrefixes, "Should return expected prefixes")

			// Verify each versioned object has correct version metadata
			for _, entry := range contents {
				assert.NotEmpty(t, entry.Key, "Versioned object should have key")
			}
		})
	}
}
|||
|
|||
// TestVersionedObjectsNoDuplication ensures that .versions directories are only processed once
|
|||
// This is the core test for the bug fix - previously versioned objects were duplicated 250x
|
|||
func TestVersionedObjectsNoDuplication(t *testing.T) { |
|||
now := time.Now().Unix() |
|||
|
|||
// Create a single .versions directory
|
|||
filerClient := &testFilerClient{ |
|||
entriesByDir: map[string][]*filer_pb.Entry{ |
|||
"/buckets/test-bucket": { |
|||
{ |
|||
Name: "test.txt" + s3_constants.VersionsFolder, |
|||
IsDirectory: true, |
|||
Attributes: &filer_pb.FuseAttributes{ |
|||
Mtime: now, |
|||
}, |
|||
Extended: map[string][]byte{ |
|||
s3_constants.ExtLatestVersionIdKey: []byte("v1-test"), |
|||
s3_constants.ExtLatestVersionSizeKey: []byte("100"), |
|||
s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)), |
|||
s3_constants.ExtLatestVersionETagKey: []byte("\"test-etag\""), |
|||
}, |
|||
}, |
|||
}, |
|||
}, |
|||
} |
|||
|
|||
s3a := &S3ApiServer{ |
|||
option: &S3ApiServerOption{ |
|||
BucketsPath: "/buckets", |
|||
}, |
|||
} |
|||
|
|||
cursor := &ListingCursor{maxKeys: uint16(1000)} |
|||
contents := []ListEntry{} |
|||
_, err := s3a.doListFilerEntries(filerClient, "/buckets/test-bucket", "", cursor, "", "", false, "test-bucket", func(dir string, entry *filer_pb.Entry) { |
|||
if cursor.maxKeys <= 0 { |
|||
return |
|||
} |
|||
contents = append(contents, ListEntry{Key: entry.Name}) |
|||
cursor.maxKeys-- |
|||
}) |
|||
|
|||
assert.NoError(t, err) |
|||
assert.Equal(t, 1, len(contents), "Should return exactly 1 object (no duplicates)") |
|||
assert.Equal(t, "test.txt", contents[0].Key, "Should return correct key") |
|||
} |
|||
|
|||
// TestVersionedObjectsWithDeleteMarker tests that objects with delete markers are not listed
|
|||
func TestVersionedObjectsWithDeleteMarker(t *testing.T) { |
|||
now := time.Now().Unix() |
|||
|
|||
filerClient := &testFilerClient{ |
|||
entriesByDir: map[string][]*filer_pb.Entry{ |
|||
"/buckets/test-bucket": { |
|||
// Active versioned object
|
|||
{ |
|||
Name: "active.txt" + s3_constants.VersionsFolder, |
|||
IsDirectory: true, |
|||
Attributes: &filer_pb.FuseAttributes{ |
|||
Mtime: now, |
|||
}, |
|||
Extended: map[string][]byte{ |
|||
s3_constants.ExtLatestVersionIdKey: []byte("v1-active"), |
|||
s3_constants.ExtLatestVersionSizeKey: []byte("100"), |
|||
s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)), |
|||
s3_constants.ExtLatestVersionETagKey: []byte("\"etag-active\""), |
|||
}, |
|||
}, |
|||
// Deleted object (has delete marker)
|
|||
{ |
|||
Name: "deleted.txt" + s3_constants.VersionsFolder, |
|||
IsDirectory: true, |
|||
Attributes: &filer_pb.FuseAttributes{ |
|||
Mtime: now, |
|||
}, |
|||
Extended: map[string][]byte{ |
|||
s3_constants.ExtLatestVersionIdKey: []byte("v1-deleted"), |
|||
s3_constants.ExtLatestVersionIsDeleteMarker: []byte("true"), |
|||
}, |
|||
}, |
|||
}, |
|||
}, |
|||
} |
|||
|
|||
s3a := &S3ApiServer{ |
|||
option: &S3ApiServerOption{ |
|||
BucketsPath: "/buckets", |
|||
}, |
|||
} |
|||
|
|||
cursor := &ListingCursor{maxKeys: uint16(1000)} |
|||
contents := []ListEntry{} |
|||
_, err := s3a.doListFilerEntries(filerClient, "/buckets/test-bucket", "", cursor, "", "", false, "test-bucket", func(dir string, entry *filer_pb.Entry) { |
|||
if cursor.maxKeys <= 0 { |
|||
return |
|||
} |
|||
contents = append(contents, ListEntry{Key: entry.Name}) |
|||
cursor.maxKeys-- |
|||
}) |
|||
|
|||
assert.NoError(t, err) |
|||
assert.Equal(t, 1, len(contents), "Should only return active object, not deleted") |
|||
assert.Equal(t, "active.txt", contents[0].Key, "Should return the active object") |
|||
} |
|||
|
|||
// TestVersionedObjectsMaxKeys tests pagination with versioned objects
|
|||
func TestVersionedObjectsMaxKeys(t *testing.T) { |
|||
now := time.Now().Unix() |
|||
|
|||
// Create 5 versioned objects
|
|||
entries := make([]*filer_pb.Entry, 5) |
|||
for i := 0; i < 5; i++ { |
|||
entries[i] = &filer_pb.Entry{ |
|||
Name: fmt.Sprintf("file%d.txt"+s3_constants.VersionsFolder, i), |
|||
IsDirectory: true, |
|||
Attributes: &filer_pb.FuseAttributes{ |
|||
Mtime: now, |
|||
}, |
|||
Extended: map[string][]byte{ |
|||
s3_constants.ExtLatestVersionIdKey: []byte(fmt.Sprintf("v%d", i)), |
|||
s3_constants.ExtLatestVersionSizeKey: []byte("100"), |
|||
s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)), |
|||
s3_constants.ExtLatestVersionETagKey: []byte(fmt.Sprintf("\"etag-%d\"", i)), |
|||
}, |
|||
} |
|||
} |
|||
|
|||
filerClient := &testFilerClient{ |
|||
entriesByDir: map[string][]*filer_pb.Entry{ |
|||
"/buckets/test-bucket": entries, |
|||
}, |
|||
} |
|||
|
|||
s3a := &S3ApiServer{ |
|||
option: &S3ApiServerOption{ |
|||
BucketsPath: "/buckets", |
|||
}, |
|||
} |
|||
|
|||
cursor := &ListingCursor{maxKeys: uint16(3)} |
|||
contents := []ListEntry{} |
|||
_, err := s3a.doListFilerEntries(filerClient, "/buckets/test-bucket", "", cursor, "", "", false, "test-bucket", func(dir string, entry *filer_pb.Entry) { |
|||
if cursor.maxKeys <= 0 { |
|||
return |
|||
} |
|||
contents = append(contents, ListEntry{Key: entry.Name}) |
|||
cursor.maxKeys-- |
|||
}) |
|||
|
|||
assert.NoError(t, err) |
|||
assert.Equal(t, 3, len(contents), "Should respect maxKeys limit") |
|||
assert.True(t, cursor.isTruncated, "Should set IsTruncated when there are more results") |
|||
|
|||
// Verify truncation is properly set when maxKeys is exceeded
|
|||
// (The test mock doesn't implement marker-based pagination, but we can verify
|
|||
// that the cursor state is correct for actual pagination to work)
|
|||
assert.True(t, cursor.isTruncated, "IsTruncated should be true when maxKeys is exhausted with more entries available") |
|||
} |
|||
|
|||
// TestVersionsDirectoryNotTraversed ensures .versions directories are never traversed.
// It uses a spy filer client that records every directory passed to ListEntries and
// asserts that the internal .versions directory is never requested.
func TestVersionsDirectoryNotTraversed(t *testing.T) {
	now := time.Now().Unix()
	traversedDirs := make(map[string]bool)

	// Custom filer client that tracks which directories are accessed
	customClient := &customTestFilerClient{
		testFilerClient: testFilerClient{
			entriesByDir: map[string][]*filer_pb.Entry{
				"/buckets/test-bucket": {
					{
						Name:        "object.txt" + s3_constants.VersionsFolder,
						IsDirectory: true,
						Attributes: &filer_pb.FuseAttributes{
							Mtime: now,
						},
						Extended: map[string][]byte{
							s3_constants.ExtLatestVersionIdKey:    []byte("v1"),
							s3_constants.ExtLatestVersionSizeKey:  []byte("100"),
							s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)),
							s3_constants.ExtLatestVersionETagKey:  []byte("\"etag\""),
						},
					},
				},
				// This directory should NEVER be accessed
				// NOTE(review): the literal ".versions" suffix here assumes
				// s3_constants.VersionsFolder == ".versions" — confirm against the constant.
				"/buckets/test-bucket/object.txt.versions": {
					{
						Name:        "should-not-see-this",
						IsDirectory: false,
					},
				},
			},
		},
		traversedDirs: &traversedDirs,
	}

	s3a := &S3ApiServer{
		option: &S3ApiServerOption{
			BucketsPath: "/buckets",
		},
	}

	cursor := &ListingCursor{maxKeys: uint16(1000)}
	contents := []ListEntry{}
	_, err := s3a.doListFilerEntries(customClient, "/buckets/test-bucket", "", cursor, "", "", false, "test-bucket", func(dir string, entry *filer_pb.Entry) {
		// Honor the remaining quota, matching the production pagination pattern.
		if cursor.maxKeys <= 0 {
			return
		}
		contents = append(contents, ListEntry{Key: entry.Name})
		cursor.maxKeys--
	})

	assert.NoError(t, err)
	assert.Equal(t, 1, len(contents))

	// Verify .versions directory was NEVER traversed
	_, wasTraversed := traversedDirs["/buckets/test-bucket/object.txt.versions"]
	assert.False(t, wasTraversed, ".versions directory should never be traversed")
}
|||
|
|||
// customTestFilerClient tracks which directories are accessed.
// It embeds testFilerClient for the actual mock listing behavior and records
// every requested directory so tests can assert traversal patterns.
type customTestFilerClient struct {
	testFilerClient
	// traversedDirs points at the test-owned set of visited directory paths.
	// NOTE(review): Go maps are reference types, so the extra pointer level is
	// not strictly needed — a plain map[string]bool field would behave the same.
	traversedDirs *map[string]bool
}
|||
|
|||
func (c *customTestFilerClient) ListEntries(ctx context.Context, in *filer_pb.ListEntriesRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[filer_pb.ListEntriesResponse], error) { |
|||
(*c.traversedDirs)[in.Directory] = true |
|||
return c.testFilerClient.ListEntries(ctx, in, opts...) |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue