Browse Source

Fix: ListObjectVersions delimiter support (#7987)

* Fix: Add delimiter support to ListObjectVersions with proper truncation

- Implemented delimiter support to group keys into CommonPrefixes
- Fixed critical truncation bug: now merges versions and common prefixes into single sorted list before truncation
- Ensures total items never exceed MaxKeys (prevents infinite pagination loops)
- Properly sets NextKeyMarker and NextVersionIdMarker for pagination
- Added integration tests in test/s3/versioning/s3_versioning_delimiter_test.go
- Verified behavior matches S3 API specification

* Fix: Add delimiter support to ListObjectVersions with proper truncation

- Implemented delimiter support to group keys into CommonPrefixes
- Fixed critical truncation bug: now merges versions and common prefixes before truncation
- Added safety guard for maxKeys=0 to prevent panics
- Condensed verbose comments for better readability
- Added robust Go integration tests with nil checks for AWS SDK pointers
- Verified behavior matches S3 API specification
- Resolved compilation error in integration tests
- Refined pagination comments and ensured exclusive KeyMarker behavior
- Refactored listObjectVersions into helper methods for better maintainability
pull/7988/head
Chris Lu 2 days ago
committed by GitHub
parent
commit
217d8b9e0e
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 274
      test/s3/versioning/s3_versioning_delimiter_test.go
  2. 230
      weed/s3api/s3api_object_versioning.go

274
test/s3/versioning/s3_versioning_delimiter_test.go

@ -0,0 +1,274 @@
package s3api
import (
"context"
"fmt"
"testing"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestListObjectVersionsDelimiter tests delimiter functionality in ListObjectVersions
func TestListObjectVersionsDelimiter(t *testing.T) {
client := getS3Client(t)
bucketName := getNewBucketName()
// Create bucket and enable versioning
createBucket(t, client, bucketName)
defer deleteBucket(t, client, bucketName)
enableVersioning(t, client, bucketName)
// Create test structure:
// - folder1/file1.txt
// - folder1/file2.txt
// - folder2/file3.txt
// - root-file.txt
testObjects := []string{
"folder1/file1.txt",
"folder1/file2.txt",
"folder2/file3.txt",
"root-file.txt",
}
for _, key := range testObjects {
putObject(t, client, bucketName, key, fmt.Sprintf("Content of %s", key))
}
t.Run("Delimiter groups folders correctly", func(t *testing.T) {
// List with delimiter='/' and no prefix
// Should return: root-file.txt and CommonPrefixes: folder1/, folder2/
resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
Delimiter: aws.String("/"),
})
require.NoError(t, err)
// Extract keys and prefixes
versionKeys := make([]string, 0)
for _, v := range resp.Versions {
versionKeys = append(versionKeys, *v.Key)
}
prefixValues := make([]string, 0)
for _, p := range resp.CommonPrefixes {
prefixValues = append(prefixValues, *p.Prefix)
}
// Verify results
assert.Contains(t, versionKeys, "root-file.txt", "Should include root-file.txt")
assert.Contains(t, prefixValues, "folder1/", "Should include folder1/ prefix")
assert.Contains(t, prefixValues, "folder2/", "Should include folder2/ prefix")
assert.NotContains(t, versionKeys, "folder1/file1.txt", "folder1/file1.txt should be grouped under folder1/")
assert.NotContains(t, versionKeys, "folder2/file3.txt", "folder2/file3.txt should be grouped under folder2/")
t.Logf("✓ Versions: %v", versionKeys)
t.Logf("✓ CommonPrefixes: %v", prefixValues)
})
t.Run("Prefix filtering with delimiter", func(t *testing.T) {
// List with delimiter='/' and prefix='folder1/'
// Should return: folder1/file1.txt, folder1/file2.txt
resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
Prefix: aws.String("folder1/"),
Delimiter: aws.String("/"),
})
require.NoError(t, err)
versionKeys := make([]string, 0)
for _, v := range resp.Versions {
versionKeys = append(versionKeys, *v.Key)
}
assert.Len(t, versionKeys, 2, "Should have 2 versions")
assert.Contains(t, versionKeys, "folder1/file1.txt")
assert.Contains(t, versionKeys, "folder1/file2.txt")
assert.Empty(t, resp.CommonPrefixes, "Should have no common prefixes")
t.Logf("✓ Prefix filtering works: %v", versionKeys)
})
t.Run("Without delimiter returns all versions", func(t *testing.T) {
// List without delimiter - should return all files
resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
})
require.NoError(t, err)
assert.Len(t, resp.Versions, 4, "Should have all 4 versions")
assert.Empty(t, resp.CommonPrefixes, "Should have no common prefixes without delimiter")
t.Logf("✓ Without delimiter returns all %d versions", len(resp.Versions))
})
}
// TestListObjectVersionsDelimiterTruncation tests MaxKeys with delimiter
func TestListObjectVersionsDelimiterTruncation(t *testing.T) {
client := getS3Client(t)
bucketName := getNewBucketName()
// Create bucket and enable versioning
createBucket(t, client, bucketName)
defer deleteBucket(t, client, bucketName)
enableVersioning(t, client, bucketName)
// Create multiple folders to test truncation
for i := 0; i < 5; i++ {
key := fmt.Sprintf("folder%d/file.txt", i)
putObject(t, client, bucketName, key, fmt.Sprintf("Content %d", i))
}
// Add a root file
putObject(t, client, bucketName, "root.txt", "Root content")
t.Run("MaxKeys limits total items", func(t *testing.T) {
resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
Delimiter: aws.String("/"),
MaxKeys: aws.Int32(3),
})
require.NoError(t, err)
assert.NotNil(t, resp.IsTruncated, "IsTruncated should not be nil")
assert.True(t, *resp.IsTruncated, "Should be truncated")
assert.NotNil(t, resp.NextKeyMarker, "Should have NextKeyMarker for pagination")
count := len(resp.Versions) + len(resp.CommonPrefixes)
t.Logf("✓ MaxKeys truncation: %d items (versions: %d, prefixes: %d)",
count, len(resp.Versions), len(resp.CommonPrefixes))
})
t.Run("Pagination with delimiter", func(t *testing.T) {
// Collect all items through pagination
allKeys := make([]string, 0)
allPrefixes := make([]string, 0)
var keyMarker *string
var versionMarker *string
for {
input := &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
Delimiter: aws.String("/"),
MaxKeys: aws.Int32(2),
}
if keyMarker != nil {
input.KeyMarker = keyMarker
}
if versionMarker != nil {
input.VersionIdMarker = versionMarker
}
resp, err := client.ListObjectVersions(context.TODO(), input)
require.NoError(t, err)
// Collect versions
for _, v := range resp.Versions {
allKeys = append(allKeys, *v.Key)
}
// Collect prefixes
for _, p := range resp.CommonPrefixes {
allPrefixes = append(allPrefixes, *p.Prefix)
}
require.NotNil(t, resp.IsTruncated, "IsTruncated should not be nil")
if !*resp.IsTruncated {
break
}
keyMarker = resp.NextKeyMarker
versionMarker = resp.NextVersionIdMarker
}
// Should have collected all items
itemsCount := len(allKeys) + len(allPrefixes)
assert.GreaterOrEqual(t, itemsCount, 6, "Should collect all items through pagination")
t.Logf("✓ Pagination collected %d total items (keys: %d, prefixes: %d)",
itemsCount, len(allKeys), len(allPrefixes))
})
t.Run("CommonPrefixes are filtered by keyMarker (exclusive)", func(t *testing.T) {
// List with keyMarker that should skip some prefixes
// We have folder0/, folder1/, folder2/, folder3/, folder4/
// Setting keyMarker to "folder2/" should return folder3/, folder4/ and root.txt (if it's > folder2/)
resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
Delimiter: aws.String("/"),
KeyMarker: aws.String("folder2/"),
})
require.NoError(t, err)
prefixValues := make([]string, 0)
for _, p := range resp.CommonPrefixes {
prefixValues = append(prefixValues, *p.Prefix)
}
assert.NotContains(t, prefixValues, "folder0/", "Should skip folder0/")
assert.NotContains(t, prefixValues, "folder1/", "Should skip folder1/")
assert.NotContains(t, prefixValues, "folder2/", "Should skip folder2/ (exclusive marker)")
assert.Contains(t, prefixValues, "folder3/", "Should include folder3/")
assert.Contains(t, prefixValues, "folder4/", "Should include folder4/")
t.Logf("✓ CommonPrefixes filtered by keyMarker: %v", prefixValues)
})
}
// TestListObjectVersionsDelimiterWithMultipleVersions tests delimiter with multiple versions of same object
func TestListObjectVersionsDelimiterWithMultipleVersions(t *testing.T) {
client := getS3Client(t)
bucketName := getNewBucketName()
// Create bucket and enable versioning
createBucket(t, client, bucketName)
defer deleteBucket(t, client, bucketName)
enableVersioning(t, client, bucketName)
// Create multiple versions of objects in different folders
for i := 1; i <= 3; i++ {
putObject(t, client, bucketName, "folder1/file.txt", fmt.Sprintf("Version %d", i))
putObject(t, client, bucketName, "folder2/file.txt", fmt.Sprintf("Version %d", i))
}
t.Run("Delimiter groups all versions under prefix", func(t *testing.T) {
resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
Delimiter: aws.String("/"),
})
require.NoError(t, err)
// Should have no versions at root level, only common prefixes
assert.Empty(t, resp.Versions, "Should have no versions at root")
assert.Len(t, resp.CommonPrefixes, 2, "Should have 2 common prefixes")
prefixValues := make([]string, 0)
for _, p := range resp.CommonPrefixes {
prefixValues = append(prefixValues, *p.Prefix)
}
assert.Contains(t, prefixValues, "folder1/")
assert.Contains(t, prefixValues, "folder2/")
t.Logf("✓ All versions grouped under prefixes: %v", prefixValues)
})
t.Run("Listing within prefix shows all versions", func(t *testing.T) {
resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
Prefix: aws.String("folder1/"),
Delimiter: aws.String("/"),
})
require.NoError(t, err)
assert.Len(t, resp.Versions, 3, "Should have 3 versions of folder1/file.txt")
assert.Empty(t, resp.CommonPrefixes, "Should have no common prefixes")
// Verify all versions are for the same key
for _, v := range resp.Versions {
assert.Equal(t, "folder1/file.txt", *v.Key)
}
t.Logf("✓ Found %d versions within prefix", len(resp.Versions))
})
}

230
weed/s3api/s3api_object_versioning.go

@ -199,6 +199,14 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error
return versionId, nil
}
// versionListItem represents an item in the unified version/prefix list
type versionListItem struct {
key string
versionId string
isPrefix bool
versionData interface{} // *VersionEntry or *DeleteMarkerEntry
}
// listObjectVersions lists all versions of an object
func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*S3ListObjectVersionsResult, error) {
// S3 API limits max-keys to 1000
@ -217,6 +225,9 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
// Track version IDs globally to prevent duplicates throughout the listing
seenVersionIds := make(map[string]bool)
// Map to track common prefixes (deduplicated)
commonPrefixes := make(map[string]bool)
// Recursively find all .versions directories in the bucket
// Pass keyMarker and versionIdMarker to enable efficient pagination (skip entries before marker)
bucketPath := path.Join(s3a.option.BucketsPath, bucket)
@ -230,103 +241,124 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
// - The alternative (collecting all) causes memory issues for buckets with many versions
// - Pagination continues correctly; users can page through to see all versions
maxCollect := maxKeys + 1 // +1 to detect truncation
err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix, keyMarker, versionIdMarker, maxCollect)
err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix, keyMarker, versionIdMarker, delimiter, commonPrefixes, maxCollect)
if err != nil {
glog.Errorf("listObjectVersions: findVersionsRecursively failed: %v", err)
return nil, err
}
// Clear maps to help GC reclaim memory sooner
clear(processedObjects)
clear(seenVersionIds)
glog.V(1).Infof("listObjectVersions: found %d total versions", len(allVersions))
// Combine versions and prefixes into a single sorted list
combinedList := s3a.buildSortedCombinedList(allVersions, commonPrefixes)
glog.V(1).Infof("listObjectVersions: collected %d combined items (versions+prefixes)", len(combinedList))
// Sort by key, then by version (newest first)
// Uses compareVersionIds to handle both old and new format version IDs
sort.Slice(allVersions, func(i, j int) bool {
var keyI, keyJ string
var versionIdI, versionIdJ string
// Apply MaxKeys truncation and determine pagination markers
truncatedList, nextKeyMarker, nextVersionIdMarker, isTruncated := s3a.truncateAndSetMarkers(combinedList, maxKeys)
glog.V(1).Infof("listObjectVersions: after truncation - %d items (truncated: %v)", len(truncatedList), isTruncated)
switch v := allVersions[i].(type) {
case *VersionEntry:
keyI = v.Key
versionIdI = v.VersionId
case *DeleteMarkerEntry:
keyI = v.Key
versionIdI = v.VersionId
}
// Build the final response by splitting items back into their respective fields
result := s3a.splitIntoResult(truncatedList, bucket, prefix, keyMarker, versionIdMarker, delimiter, maxKeys, isTruncated, nextKeyMarker, nextVersionIdMarker)
glog.V(1).Infof("listObjectVersions: final result - %d versions, %d delete markers, %d common prefixes", len(result.Versions), len(result.DeleteMarkers), len(result.CommonPrefixes))
switch v := allVersions[j].(type) {
return result, nil
}
// buildSortedCombinedList merges versions and common prefixes into a single list
// sorted lexicographically by key, with versions preceding prefixes for the same key.
func (s3a *S3ApiServer) buildSortedCombinedList(allVersions []interface{}, commonPrefixes map[string]bool) []versionListItem {
combinedList := make([]versionListItem, 0, len(allVersions)+len(commonPrefixes))
// Add versions
for _, version := range allVersions {
var key, versionId string
switch v := version.(type) {
case *VersionEntry:
keyJ = v.Key
versionIdJ = v.VersionId
key = v.Key
versionId = v.VersionId
case *DeleteMarkerEntry:
keyJ = v.Key
versionIdJ = v.VersionId
key = v.Key
versionId = v.VersionId
}
// First sort by object key
if keyI != keyJ {
return keyI < keyJ
combinedList = append(combinedList, versionListItem{
key: key,
versionId: versionId,
isPrefix: false,
versionData: version,
})
}
// Add common prefixes
for prefix := range commonPrefixes {
combinedList = append(combinedList, versionListItem{
key: prefix,
isPrefix: true,
})
}
// Single sort for the entire combined list
sort.Slice(combinedList, func(i, j int) bool {
if combinedList[i].key != combinedList[j].key {
return combinedList[i].key < combinedList[j].key
}
// Then by version ID (newest first)
// compareVersionIds handles both old (raw timestamp) and new (inverted timestamp) formats
return compareVersionIds(versionIdI, versionIdJ) < 0
// For same key, versions come before prefixes
if combinedList[i].isPrefix != combinedList[j].isPrefix {
return !combinedList[i].isPrefix
}
// For same key with both being versions, sort by version ID (newest first)
return compareVersionIds(combinedList[i].versionId, combinedList[j].versionId) < 0
})
// Build result using S3ListObjectVersionsResult to avoid conflicts with XSD structs
result := &S3ListObjectVersionsResult{
Name: bucket,
Prefix: prefix,
KeyMarker: keyMarker,
MaxKeys: maxKeys,
Delimiter: delimiter,
IsTruncated: len(allVersions) > maxKeys,
}
glog.V(1).Infof("listObjectVersions: building response with %d versions (truncated: %v)", len(allVersions), result.IsTruncated)
// Limit results and properly release excess memory
if len(allVersions) > maxKeys {
result.IsTruncated = true
return combinedList
}
// Set next markers from the last item we'll return
switch v := allVersions[maxKeys-1].(type) {
case *VersionEntry:
result.NextKeyMarker = v.Key
result.NextVersionIdMarker = v.VersionId
case *DeleteMarkerEntry:
result.NextKeyMarker = v.Key
result.NextVersionIdMarker = v.VersionId
// truncateAndSetMarkers applies MaxKeys limit and determines pagination markers
func (s3a *S3ApiServer) truncateAndSetMarkers(combinedList []versionListItem, maxKeys int) (truncated []versionListItem, nextKeyMarker, nextVersionIdMarker string, isTruncated bool) {
isTruncated = len(combinedList) > maxKeys
if isTruncated && maxKeys > 0 {
// Set markers from the last item we'll return
lastItem := combinedList[maxKeys-1]
nextKeyMarker = lastItem.key
if !lastItem.isPrefix {
nextVersionIdMarker = lastItem.versionId
}
// Create a new slice with exact capacity to allow GC to reclaim excess memory
truncated := make([]interface{}, maxKeys)
copy(truncated, allVersions[:maxKeys])
allVersions = truncated
// Truncate the list
combinedList = combinedList[:maxKeys]
}
return combinedList, nextKeyMarker, nextVersionIdMarker, isTruncated
}
// Always initialize empty slices so boto3 gets the expected fields even when empty
result.Versions = make([]VersionEntry, 0)
result.DeleteMarkers = make([]DeleteMarkerEntry, 0)
// Add versions to result
for i, version := range allVersions {
switch v := version.(type) {
case *VersionEntry:
glog.V(2).Infof("listObjectVersions: adding version %d: key=%s, versionId=%s", i, v.Key, v.VersionId)
result.Versions = append(result.Versions, *v)
case *DeleteMarkerEntry:
glog.V(2).Infof("listObjectVersions: adding delete marker %d: key=%s, versionId=%s", i, v.Key, v.VersionId)
result.DeleteMarkers = append(result.DeleteMarkers, *v)
// splitIntoResult builds the final S3ListObjectVersionsResult from the combined list
func (s3a *S3ApiServer) splitIntoResult(combinedList []versionListItem, bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int, isTruncated bool, nextKeyMarker, nextVersionIdMarker string) *S3ListObjectVersionsResult {
result := &S3ListObjectVersionsResult{
Name: bucket,
Prefix: prefix,
KeyMarker: keyMarker,
VersionIdMarker: versionIdMarker,
MaxKeys: maxKeys,
Delimiter: delimiter,
IsTruncated: isTruncated,
NextKeyMarker: nextKeyMarker,
NextVersionIdMarker: nextVersionIdMarker,
Versions: make([]VersionEntry, 0),
DeleteMarkers: make([]DeleteMarkerEntry, 0),
CommonPrefixes: make([]PrefixEntry, 0),
}
for _, item := range combinedList {
if item.isPrefix {
result.CommonPrefixes = append(result.CommonPrefixes, PrefixEntry{Prefix: item.key})
} else {
switch v := item.versionData.(type) {
case *VersionEntry:
result.Versions = append(result.Versions, *v)
case *DeleteMarkerEntry:
result.DeleteMarkers = append(result.DeleteMarkers, *v)
}
}
}
glog.V(1).Infof("listObjectVersions: final result - %d versions, %d delete markers", len(result.Versions), len(result.DeleteMarkers))
return result, nil
return result
}
// versionCollector holds state for collecting object versions during recursive traversal
@ -340,11 +372,20 @@ type versionCollector struct {
allVersions *[]interface{}
processedObjects map[string]bool
seenVersionIds map[string]bool
delimiter string
commonPrefixes map[string]bool
}
// isFull returns true if we've collected enough versions
func (vc *versionCollector) isFull() bool {
return vc.maxCollect > 0 && len(*vc.allVersions) >= vc.maxCollect
if vc.maxCollect <= 0 {
return false
}
currentCount := len(*vc.allVersions)
if vc.commonPrefixes != nil {
currentCount += len(vc.commonPrefixes)
}
return currentCount >= vc.maxCollect
}
// matchesPrefixFilter checks if an entry path matches the prefix filter
@ -384,7 +425,9 @@ func (vc *versionCollector) shouldSkipVersionForMarker(objectKey, versionId stri
}
// Object matches keyMarker - apply version filtering
if vc.versionIdMarker == "" {
// No versionIdMarker means skip ALL versions of this key (they were all returned in previous pages)
// When a keyMarker is provided without a versionIdMarker, S3 pagination
// starts after the keyMarker object. Returning true here ensures that
// all versions of the keyMarker object are skipped.
return true
}
// Skip versions that are newer than or equal to versionIdMarker
@ -554,7 +597,8 @@ func (vc *versionCollector) processRegularFile(currentPath, entryPath string, en
// findVersionsRecursively searches for .versions directories and regular files recursively
// with efficient pagination support. It skips objects before keyMarker and applies versionIdMarker filtering.
// maxCollect limits the number of versions to collect for memory efficiency (must be > 0)
func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix, keyMarker, versionIdMarker string, maxCollect int) error {
// delimiter and commonPrefixes are used to group keys that share a common prefix
func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix, keyMarker, versionIdMarker, delimiter string, commonPrefixes map[string]bool, maxCollect int) error {
vc := &versionCollector{
s3a: s3a,
bucket: bucket,
@ -565,6 +609,8 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string
allVersions: allVersions,
processedObjects: processedObjects,
seenVersionIds: seenVersionIds,
delimiter: delimiter,
commonPrefixes: commonPrefixes,
}
return vc.collectVersions(currentPath, relativePath)
@ -594,6 +640,36 @@ func (vc *versionCollector) collectVersions(currentPath, relativePath string) er
continue
}
// Group into common prefixes if delimiter is found after the prefix
if vc.delimiter != "" {
fullKey := entryPath
if entry.IsDirectory {
fullKey += "/"
}
if strings.HasPrefix(fullKey, vc.prefix) {
remainder := fullKey[len(vc.prefix):]
if idx := strings.Index(remainder, vc.delimiter); idx >= 0 {
commonPrefix := vc.prefix + remainder[:idx+len(vc.delimiter)]
// Add to CommonPrefixes set if it hasn't been returned yet
if !vc.commonPrefixes[commonPrefix] {
// Filter by keyMarker to ensure proper pagination behavior
if vc.keyMarker != "" && commonPrefix <= vc.keyMarker {
continue
}
if vc.isFull() {
return nil
}
vc.commonPrefixes[commonPrefix] = true
}
// Skip further processing (recursion or addition) for this entry
// because it has been rolled up into the CommonPrefix
continue
}
}
}
if entry.IsDirectory {
if err := vc.processDirectory(currentPath, entryPath, entry); err != nil {
return err

Loading…
Cancel
Save