From 0a1b54b146867eb46840a1d578b6ccb7c34660cd Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 24 Feb 2026 14:30:45 -0800 Subject: [PATCH] s3api: order ListObjectVersions newest-first --- weed/s3api/s3api_object_versioning.go | 252 +++++++++++++++++- ...api_object_versioning_newest_first_test.go | 64 +++++ 2 files changed, 302 insertions(+), 14 deletions(-) create mode 100644 weed/s3api/s3api_object_versioning_newest_first_test.go diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go index 10b909442..a7d93069d 100644 --- a/weed/s3api/s3api_object_versioning.go +++ b/weed/s3api/s3api_object_versioning.go @@ -5,6 +5,7 @@ package s3api import ( "bytes" + "container/heap" "encoding/hex" "encoding/xml" "errors" @@ -201,10 +202,11 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error // versionListItem represents an item in the unified version/prefix list type versionListItem struct { - key string - versionId string - isPrefix bool - versionData interface{} // *VersionEntry or *DeleteMarkerEntry + key string + versionId string + isPrefix bool + lastModified time.Time + versionData interface{} // *VersionEntry or *DeleteMarkerEntry } // listObjectVersions lists all versions of an object @@ -213,6 +215,13 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM if maxKeys > 1000 { maxKeys = 1000 } + + // For ListObjectVersions without delimiter, provide newest-first ordering across keys. + // This aligns with object-lock retention refresh workflows that expect monotonically + // decreasing LastModified values across pages. + if delimiter == "" { + return s3a.listObjectVersionsNewestFirst(bucket, prefix, keyMarker, versionIdMarker, maxKeys) + } // Pre-allocate with capacity for maxKeys+1 to reduce reallocations // The extra 1 is for truncation detection allVersions := make([]interface{}, 0, maxKeys+1) @@ -241,7 +250,7 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM // - The alternative (collecting all) causes memory issues for buckets with many versions // - Pagination continues correctly; users can page through to see all versions maxCollect := maxKeys + 1 // +1 to detect truncation - err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix, keyMarker, versionIdMarker, delimiter, commonPrefixes, maxCollect) + err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix, keyMarker, versionIdMarker, delimiter, commonPrefixes, maxCollect, nil) if err != nil { glog.Errorf("listObjectVersions: findVersionsRecursively failed: %v", err) return nil, err @@ -265,6 +274,56 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM return result, nil } +// listObjectVersionsNewestFirst returns object versions ordered by LastModified (newest-first) +// across all keys. This mode scans the full keyspace to ensure globally consistent ordering. +func (s3a *S3ApiServer) listObjectVersionsNewestFirst(bucket, prefix, keyMarker, versionIdMarker string, maxKeys int) (*S3ListObjectVersionsResult, error) { + // Marker handling: when version-id-marker is empty, skip the entire key. + skipKeyMarkerOnly := keyMarker != "" && versionIdMarker == "" + + var markerItem *versionListItem + if keyMarker != "" && versionIdMarker != "" { + lastModified, err := s3a.getVersionLastModified(bucket, keyMarker, versionIdMarker) + if err != nil { + glog.V(2).Infof("listObjectVersionsNewestFirst: marker lookup failed for %s/%s@%s: %v", bucket, keyMarker, versionIdMarker, err) + lastModified = versionIdToTime(versionIdMarker) + } + if !lastModified.IsZero() { + markerItem = &versionListItem{ + key: keyMarker, + versionId: versionIdMarker, + lastModified: lastModified, + } + } + } + + pager := newVersionListPager(maxKeys, markerItem, skipKeyMarkerOnly, keyMarker) + + // Collect all versions without key-marker filtering to ensure global ordering. + allVersions := make([]interface{}, 0) + processedObjects := make(map[string]bool) + seenVersionIds := make(map[string]bool) + + bucketPath := s3a.bucketDir(bucket) + err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix, "", "", "", nil, 0, pager.consider) + if err != nil { + glog.Errorf("listObjectVersionsNewestFirst: findVersionsRecursively failed: %v", err) + return nil, err + } + + clear(processedObjects) + clear(seenVersionIds) + + combinedList := pager.itemsSorted() + + // Apply MaxKeys truncation and determine pagination markers + truncatedList, nextKeyMarker, nextVersionIdMarker, isTruncated := s3a.truncateAndSetMarkers(combinedList, maxKeys) + + // Build the final response by splitting items back into their respective fields + result := s3a.splitIntoResult(truncatedList, bucket, prefix, keyMarker, versionIdMarker, "", maxKeys, isTruncated, nextKeyMarker, nextVersionIdMarker) + + return result, nil +} + // buildSortedCombinedList merges versions and common prefixes into a single list // sorted lexicographically by key, with versions preceding prefixes for the same key. func (s3a *S3ApiServer) buildSortedCombinedList(allVersions []interface{}, commonPrefixes map[string]bool) []versionListItem { @@ -273,19 +332,23 @@ func (s3a *S3ApiServer) buildSortedCombinedList(allVersions []interface{}, commo // Add versions for _, version := range allVersions { var key, versionId string + var lastModified time.Time switch v := version.(type) { case *VersionEntry: key = v.Key versionId = v.VersionId + lastModified = v.LastModified case *DeleteMarkerEntry: key = v.Key versionId = v.VersionId + lastModified = v.LastModified } combinedList = append(combinedList, versionListItem{ - key: key, - versionId: versionId, - isPrefix: false, - versionData: version, + key: key, + versionId: versionId, + isPrefix: false, + lastModified: lastModified, + versionData: version, }) } @@ -313,6 +376,133 @@ func (s3a *S3ApiServer) buildSortedCombinedList(allVersions []interface{}, commo return combinedList } +// compareVersionListItemsByLastModified compares two list items using newest-first ordering. +// Returns negative if a should come before b, positive if b should come before a, 0 if equal. +func compareVersionListItemsByLastModified(a, b versionListItem) int { + if a.isPrefix != b.isPrefix { + if a.isPrefix { + return 1 + } + return -1 + } + + if !a.isPrefix { + if !a.lastModified.Equal(b.lastModified) { + if a.lastModified.After(b.lastModified) { + return -1 + } + return 1 + } + if a.key != b.key { + if a.key < b.key { + return -1 + } + return 1 + } + if a.versionId != b.versionId { + return compareVersionIds(a.versionId, b.versionId) + } + return 0 + } + + if a.key != b.key { + if a.key < b.key { + return -1 + } + return 1 + } + return 0 +} + +type versionListItemHeap []versionListItem + +func (h versionListItemHeap) Len() int { return len(h) } +func (h versionListItemHeap) Less(i, j int) bool { + // Oldest first so we can evict the oldest when we exceed maxKeys+1. + return compareVersionListItemsByLastModified(h[i], h[j]) > 0 +} +func (h versionListItemHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *versionListItemHeap) Push(x interface{}) { + *h = append(*h, x.(versionListItem)) +} +func (h *versionListItemHeap) Pop() interface{} { + old := *h + n := len(old) + item := old[n-1] + *h = old[:n-1] + return item +} + +type versionListPager struct { + maxKeys int + markerItem *versionListItem + skipKeyMarkerOnly bool + keyMarker string + items versionListItemHeap +} + +func newVersionListPager(maxKeys int, markerItem *versionListItem, skipKeyMarkerOnly bool, keyMarker string) *versionListPager { + return &versionListPager{ + maxKeys: maxKeys, + markerItem: markerItem, + skipKeyMarkerOnly: skipKeyMarkerOnly, + keyMarker: keyMarker, + items: make(versionListItemHeap, 0, maxKeys+1), + } +} + +func (p *versionListPager) consider(item versionListItem) { + if p.skipKeyMarkerOnly && item.key == p.keyMarker { + return + } + if p.markerItem != nil { + if compareVersionListItemsByLastModified(item, *p.markerItem) <= 0 { + return + } + } + + // Keep only the newest maxKeys+1 items. + if p.maxKeys <= 0 { + return + } + if len(p.items) < p.maxKeys+1 { + heap.Push(&p.items, item) + return + } + if compareVersionListItemsByLastModified(item, p.items[0]) < 0 { + heap.Pop(&p.items) + heap.Push(&p.items, item) + } +} + +func (p *versionListPager) itemsSorted() []versionListItem { + items := make([]versionListItem, len(p.items)) + copy(items, p.items) + sort.Slice(items, func(i, j int) bool { + return compareVersionListItemsByLastModified(items[i], items[j]) < 0 + }) + return items +} + +func versionIdToTime(versionId string) time.Time { + ts := getVersionTimestamp(versionId) + if ts == 0 { + return time.Time{} + } + return time.Unix(0, ts) +} + +func (s3a *S3ApiServer) getVersionLastModified(bucket, object, versionId string) (time.Time, error) { + entry, err := s3a.getSpecificObjectVersion(bucket, object, versionId) + if err != nil { + return time.Time{}, err + } + if entry == nil || entry.Attributes == nil { + return time.Time{}, fmt.Errorf("missing entry attributes for %s/%s@%s", bucket, object, versionId) + } + return time.Unix(entry.Attributes.Mtime, 0), nil +} + // truncateAndSetMarkers applies MaxKeys limit and determines pagination markers func (s3a *S3ApiServer) truncateAndSetMarkers(combinedList []versionListItem, maxKeys int) (truncated []versionListItem, nextKeyMarker, nextVersionIdMarker string, isTruncated bool) { isTruncated = len(combinedList) > maxKeys @@ -374,6 +564,7 @@ type versionCollector struct { seenVersionIds map[string]bool delimiter string commonPrefixes map[string]bool + emitItem func(item versionListItem) } // isFull returns true if we've collected enough versions @@ -447,7 +638,13 @@ func (vc *versionCollector) addVersion(version *ObjectVersion, objectKey string) LastModified: version.LastModified, Owner: vc.s3a.getObjectOwnerFromVersion(version, vc.bucket, objectKey), } - *vc.allVersions = append(*vc.allVersions, deleteMarker) + vc.emitVersion(versionListItem{ + key: objectKey, + versionId: version.VersionId, + isPrefix: false, + lastModified: version.LastModified, + versionData: deleteMarker, + }) } else { versionEntry := &VersionEntry{ Key: objectKey, @@ -459,8 +656,22 @@ func (vc *versionCollector) addVersion(version *ObjectVersion, objectKey string) Owner: vc.s3a.getObjectOwnerFromVersion(version, vc.bucket, objectKey), StorageClass: StorageClass(vc.s3a.getStorageClassFromExtended(entryExtended(version))), } - *vc.allVersions = append(*vc.allVersions, versionEntry) + vc.emitVersion(versionListItem{ + key: objectKey, + versionId: version.VersionId, + isPrefix: false, + lastModified: version.LastModified, + versionData: versionEntry, + }) + } +} + +func (vc *versionCollector) emitVersion(item versionListItem) { + if vc.emitItem != nil { + vc.emitItem(item) + return } + *vc.allVersions = append(*vc.allVersions, item.versionData) } // processVersionsDirectory handles a .versions directory entry @@ -530,7 +741,13 @@ func (vc *versionCollector) processExplicitDirectory(entryPath string, entry *fi Owner: vc.s3a.getObjectOwnerFromEntry(entry), StorageClass: StorageClass(vc.s3a.getStorageClassFromExtended(entry.Extended)), } - *vc.allVersions = append(*vc.allVersions, versionEntry) + vc.emitVersion(versionListItem{ + key: directoryKey, + versionId: "null", + isPrefix: false, + lastModified: versionEntry.LastModified, + versionData: versionEntry, + }) } // processRegularFile handles a regular file entry (pre-versioning or suspended-versioning object) @@ -591,14 +808,20 @@ func (vc *versionCollector) processRegularFile(currentPath, entryPath string, en Owner: vc.s3a.getObjectOwnerFromEntry(entry), StorageClass: StorageClass(vc.s3a.getStorageClassFromExtended(entry.Extended)), } - *vc.allVersions = append(*vc.allVersions, versionEntry) + vc.emitVersion(versionListItem{ + key: normalizedObjectKey, + versionId: "null", + isPrefix: false, + lastModified: versionEntry.LastModified, + versionData: versionEntry, + }) } // findVersionsRecursively searches for .versions directories and regular files recursively // with efficient pagination support. It skips objects before keyMarker and applies versionIdMarker filtering. // maxCollect limits the number of versions to collect for memory efficiency (must be > 0) // delimiter and commonPrefixes are used to group keys that share a common prefix -func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix, keyMarker, versionIdMarker, delimiter string, commonPrefixes map[string]bool, maxCollect int) error { +func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix, keyMarker, versionIdMarker, delimiter string, commonPrefixes map[string]bool, maxCollect int, emitItem func(item versionListItem)) error { vc := &versionCollector{ s3a: s3a, bucket: bucket, @@ -611,6 +834,7 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string seenVersionIds: seenVersionIds, delimiter: delimiter, commonPrefixes: commonPrefixes, + emitItem: emitItem, } return vc.collectVersions(currentPath, relativePath) diff --git a/weed/s3api/s3api_object_versioning_newest_first_test.go b/weed/s3api/s3api_object_versioning_newest_first_test.go new file mode 100644 index 000000000..a4e9c3d2d --- /dev/null +++ b/weed/s3api/s3api_object_versioning_newest_first_test.go @@ -0,0 +1,64 @@ +package s3api + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestVersionListPagerNewestFirstOrderingAndPagination(t *testing.T) { + base := time.Date(2026, 2, 6, 11, 3, 16, 0, time.UTC) + items := []versionListItem{ + { + key: "k10/repo/log_20260206110316", + versionId: "v1", + lastModified: base, + }, + { + key: "k10/repo/log_20260206110317", + versionId: "v2", + lastModified: base.Add(1 * time.Second), + }, + { + key: "k10/repo/log_20260206110315", + versionId: "v0", + lastModified: base.Add(-1 * time.Second), + }, + } + + // Page 1: newest-first + pager := newVersionListPager(1, nil, false, "") + for _, item := range []versionListItem{items[0], items[2], items[1]} { + pager.consider(item) + } + + sorted := pager.itemsSorted() + s3a := &S3ApiServer{} + truncated, nextKeyMarker, nextVersionIdMarker, isTruncated := s3a.truncateAndSetMarkers(sorted, 1) + + require.Len(t, truncated, 1) + require.Equal(t, items[1].key, truncated[0].key) + require.Equal(t, items[1].versionId, truncated[0].versionId) + require.True(t, isTruncated) + require.Equal(t, items[1].key, nextKeyMarker) + require.Equal(t, items[1].versionId, nextVersionIdMarker) + + // Page 2: start after marker (should return the next newest item) + marker := &versionListItem{ + key: items[1].key, + versionId: items[1].versionId, + lastModified: items[1].lastModified, + } + pager2 := newVersionListPager(1, marker, false, "") + for _, item := range items { + pager2.consider(item) + } + + sorted2 := pager2.itemsSorted() + truncated2, _, _, _ := s3a.truncateAndSetMarkers(sorted2, 1) + + require.Len(t, truncated2, 1) + require.Equal(t, items[0].key, truncated2[0].key) + require.Equal(t, items[0].versionId, truncated2[0].versionId) +}