Browse Source

s3api: order ListObjectVersions newest-first

codex/list-object-versions-newest-first
Chris Lu 2 weeks ago
parent
commit
0a1b54b146
  1. 252
      weed/s3api/s3api_object_versioning.go
  2. 64
      weed/s3api/s3api_object_versioning_newest_first_test.go

252
weed/s3api/s3api_object_versioning.go

@ -5,6 +5,7 @@ package s3api
import (
"bytes"
"container/heap"
"encoding/hex"
"encoding/xml"
"errors"
@ -201,10 +202,11 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error
// versionListItem represents an item in the unified version/prefix list
type versionListItem struct {
key string
versionId string
isPrefix bool
versionData interface{} // *VersionEntry or *DeleteMarkerEntry
key string
versionId string
isPrefix bool
lastModified time.Time
versionData interface{} // *VersionEntry or *DeleteMarkerEntry
}
// listObjectVersions lists all versions of an object
@ -213,6 +215,13 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
if maxKeys > 1000 {
maxKeys = 1000
}
// For ListObjectVersions without delimiter, provide newest-first ordering across keys.
// This aligns with object-lock retention refresh workflows that expect monotonically
// decreasing LastModified values across pages.
if delimiter == "" {
return s3a.listObjectVersionsNewestFirst(bucket, prefix, keyMarker, versionIdMarker, maxKeys)
}
// Pre-allocate with capacity for maxKeys+1 to reduce reallocations
// The extra 1 is for truncation detection
allVersions := make([]interface{}, 0, maxKeys+1)
@ -241,7 +250,7 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
// - The alternative (collecting all) causes memory issues for buckets with many versions
// - Pagination continues correctly; users can page through to see all versions
maxCollect := maxKeys + 1 // +1 to detect truncation
err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix, keyMarker, versionIdMarker, delimiter, commonPrefixes, maxCollect)
err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix, keyMarker, versionIdMarker, delimiter, commonPrefixes, maxCollect, nil)
if err != nil {
glog.Errorf("listObjectVersions: findVersionsRecursively failed: %v", err)
return nil, err
@ -265,6 +274,56 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
return result, nil
}
// listObjectVersionsNewestFirst returns object versions ordered by LastModified (newest-first)
// across all keys. This mode scans the full keyspace to ensure globally consistent ordering.
func (s3a *S3ApiServer) listObjectVersionsNewestFirst(bucket, prefix, keyMarker, versionIdMarker string, maxKeys int) (*S3ListObjectVersionsResult, error) {
// Marker handling: when version-id-marker is empty, skip the entire key.
skipKeyMarkerOnly := keyMarker != "" && versionIdMarker == ""
var markerItem *versionListItem
if keyMarker != "" && versionIdMarker != "" {
lastModified, err := s3a.getVersionLastModified(bucket, keyMarker, versionIdMarker)
if err != nil {
glog.V(2).Infof("listObjectVersionsNewestFirst: marker lookup failed for %s/%s@%s: %v", bucket, keyMarker, versionIdMarker, err)
lastModified = versionIdToTime(versionIdMarker)
}
if !lastModified.IsZero() {
markerItem = &versionListItem{
key: keyMarker,
versionId: versionIdMarker,
lastModified: lastModified,
}
}
}
pager := newVersionListPager(maxKeys, markerItem, skipKeyMarkerOnly, keyMarker)
// Collect all versions without key-marker filtering to ensure global ordering.
allVersions := make([]interface{}, 0)
processedObjects := make(map[string]bool)
seenVersionIds := make(map[string]bool)
bucketPath := s3a.bucketDir(bucket)
err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix, "", "", "", nil, 0, pager.consider)
if err != nil {
glog.Errorf("listObjectVersionsNewestFirst: findVersionsRecursively failed: %v", err)
return nil, err
}
clear(processedObjects)
clear(seenVersionIds)
combinedList := pager.itemsSorted()
// Apply MaxKeys truncation and determine pagination markers
truncatedList, nextKeyMarker, nextVersionIdMarker, isTruncated := s3a.truncateAndSetMarkers(combinedList, maxKeys)
// Build the final response by splitting items back into their respective fields
result := s3a.splitIntoResult(truncatedList, bucket, prefix, keyMarker, versionIdMarker, "", maxKeys, isTruncated, nextKeyMarker, nextVersionIdMarker)
return result, nil
}
// buildSortedCombinedList merges versions and common prefixes into a single list
// sorted lexicographically by key, with versions preceding prefixes for the same key.
func (s3a *S3ApiServer) buildSortedCombinedList(allVersions []interface{}, commonPrefixes map[string]bool) []versionListItem {
@ -273,19 +332,23 @@ func (s3a *S3ApiServer) buildSortedCombinedList(allVersions []interface{}, commo
// Add versions
for _, version := range allVersions {
var key, versionId string
var lastModified time.Time
switch v := version.(type) {
case *VersionEntry:
key = v.Key
versionId = v.VersionId
lastModified = v.LastModified
case *DeleteMarkerEntry:
key = v.Key
versionId = v.VersionId
lastModified = v.LastModified
}
combinedList = append(combinedList, versionListItem{
key: key,
versionId: versionId,
isPrefix: false,
versionData: version,
key: key,
versionId: versionId,
isPrefix: false,
lastModified: lastModified,
versionData: version,
})
}
@ -313,6 +376,133 @@ func (s3a *S3ApiServer) buildSortedCombinedList(allVersions []interface{}, commo
return combinedList
}
// compareVersionListItemsByLastModified compares two list items using newest-first ordering.
// Returns negative if a should come before b, positive if b should come before a, 0 if equal.
func compareVersionListItemsByLastModified(a, b versionListItem) int {
if a.isPrefix != b.isPrefix {
if a.isPrefix {
return 1
}
return -1
}
if !a.isPrefix {
if !a.lastModified.Equal(b.lastModified) {
if a.lastModified.After(b.lastModified) {
return -1
}
return 1
}
if a.key != b.key {
if a.key < b.key {
return -1
}
return 1
}
if a.versionId != b.versionId {
return compareVersionIds(a.versionId, b.versionId)
}
return 0
}
if a.key != b.key {
if a.key < b.key {
return -1
}
return 1
}
return 0
}
type versionListItemHeap []versionListItem
func (h versionListItemHeap) Len() int { return len(h) }
func (h versionListItemHeap) Less(i, j int) bool {
// Oldest first so we can evict the oldest when we exceed maxKeys+1.
return compareVersionListItemsByLastModified(h[i], h[j]) > 0
}
func (h versionListItemHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *versionListItemHeap) Push(x interface{}) {
*h = append(*h, x.(versionListItem))
}
func (h *versionListItemHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[:n-1]
return item
}
type versionListPager struct {
maxKeys int
markerItem *versionListItem
skipKeyMarkerOnly bool
keyMarker string
items versionListItemHeap
}
func newVersionListPager(maxKeys int, markerItem *versionListItem, skipKeyMarkerOnly bool, keyMarker string) *versionListPager {
return &versionListPager{
maxKeys: maxKeys,
markerItem: markerItem,
skipKeyMarkerOnly: skipKeyMarkerOnly,
keyMarker: keyMarker,
items: make(versionListItemHeap, 0, maxKeys+1),
}
}
func (p *versionListPager) consider(item versionListItem) {
if p.skipKeyMarkerOnly && item.key == p.keyMarker {
return
}
if p.markerItem != nil {
if compareVersionListItemsByLastModified(item, *p.markerItem) <= 0 {
return
}
}
// Keep only the newest maxKeys+1 items.
if p.maxKeys <= 0 {
return
}
if len(p.items) < p.maxKeys+1 {
heap.Push(&p.items, item)
return
}
if compareVersionListItemsByLastModified(item, p.items[0]) < 0 {
heap.Pop(&p.items)
heap.Push(&p.items, item)
}
}
func (p *versionListPager) itemsSorted() []versionListItem {
items := make([]versionListItem, len(p.items))
copy(items, p.items)
sort.Slice(items, func(i, j int) bool {
return compareVersionListItemsByLastModified(items[i], items[j]) < 0
})
return items
}
func versionIdToTime(versionId string) time.Time {
ts := getVersionTimestamp(versionId)
if ts == 0 {
return time.Time{}
}
return time.Unix(0, ts)
}
func (s3a *S3ApiServer) getVersionLastModified(bucket, object, versionId string) (time.Time, error) {
entry, err := s3a.getSpecificObjectVersion(bucket, object, versionId)
if err != nil {
return time.Time{}, err
}
if entry == nil || entry.Attributes == nil {
return time.Time{}, fmt.Errorf("missing entry attributes for %s/%s@%s", bucket, object, versionId)
}
return time.Unix(entry.Attributes.Mtime, 0), nil
}
// truncateAndSetMarkers applies MaxKeys limit and determines pagination markers
func (s3a *S3ApiServer) truncateAndSetMarkers(combinedList []versionListItem, maxKeys int) (truncated []versionListItem, nextKeyMarker, nextVersionIdMarker string, isTruncated bool) {
isTruncated = len(combinedList) > maxKeys
@ -374,6 +564,7 @@ type versionCollector struct {
seenVersionIds map[string]bool
delimiter string
commonPrefixes map[string]bool
emitItem func(item versionListItem)
}
// isFull returns true if we've collected enough versions
@ -447,7 +638,13 @@ func (vc *versionCollector) addVersion(version *ObjectVersion, objectKey string)
LastModified: version.LastModified,
Owner: vc.s3a.getObjectOwnerFromVersion(version, vc.bucket, objectKey),
}
*vc.allVersions = append(*vc.allVersions, deleteMarker)
vc.emitVersion(versionListItem{
key: objectKey,
versionId: version.VersionId,
isPrefix: false,
lastModified: version.LastModified,
versionData: deleteMarker,
})
} else {
versionEntry := &VersionEntry{
Key: objectKey,
@ -459,8 +656,22 @@ func (vc *versionCollector) addVersion(version *ObjectVersion, objectKey string)
Owner: vc.s3a.getObjectOwnerFromVersion(version, vc.bucket, objectKey),
StorageClass: StorageClass(vc.s3a.getStorageClassFromExtended(entryExtended(version))),
}
*vc.allVersions = append(*vc.allVersions, versionEntry)
vc.emitVersion(versionListItem{
key: objectKey,
versionId: version.VersionId,
isPrefix: false,
lastModified: version.LastModified,
versionData: versionEntry,
})
}
}
func (vc *versionCollector) emitVersion(item versionListItem) {
if vc.emitItem != nil {
vc.emitItem(item)
return
}
*vc.allVersions = append(*vc.allVersions, item.versionData)
}
// processVersionsDirectory handles a .versions directory entry
@ -530,7 +741,13 @@ func (vc *versionCollector) processExplicitDirectory(entryPath string, entry *fi
Owner: vc.s3a.getObjectOwnerFromEntry(entry),
StorageClass: StorageClass(vc.s3a.getStorageClassFromExtended(entry.Extended)),
}
*vc.allVersions = append(*vc.allVersions, versionEntry)
vc.emitVersion(versionListItem{
key: directoryKey,
versionId: "null",
isPrefix: false,
lastModified: versionEntry.LastModified,
versionData: versionEntry,
})
}
// processRegularFile handles a regular file entry (pre-versioning or suspended-versioning object)
@ -591,14 +808,20 @@ func (vc *versionCollector) processRegularFile(currentPath, entryPath string, en
Owner: vc.s3a.getObjectOwnerFromEntry(entry),
StorageClass: StorageClass(vc.s3a.getStorageClassFromExtended(entry.Extended)),
}
*vc.allVersions = append(*vc.allVersions, versionEntry)
vc.emitVersion(versionListItem{
key: normalizedObjectKey,
versionId: "null",
isPrefix: false,
lastModified: versionEntry.LastModified,
versionData: versionEntry,
})
}
// findVersionsRecursively searches for .versions directories and regular files recursively
// with efficient pagination support. It skips objects before keyMarker and applies versionIdMarker filtering.
// maxCollect limits the number of versions to collect for memory efficiency (must be > 0)
// delimiter and commonPrefixes are used to group keys that share a common prefix
func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix, keyMarker, versionIdMarker, delimiter string, commonPrefixes map[string]bool, maxCollect int) error {
func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix, keyMarker, versionIdMarker, delimiter string, commonPrefixes map[string]bool, maxCollect int, emitItem func(item versionListItem)) error {
vc := &versionCollector{
s3a: s3a,
bucket: bucket,
@ -611,6 +834,7 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string
seenVersionIds: seenVersionIds,
delimiter: delimiter,
commonPrefixes: commonPrefixes,
emitItem: emitItem,
}
return vc.collectVersions(currentPath, relativePath)

64
weed/s3api/s3api_object_versioning_newest_first_test.go

@ -0,0 +1,64 @@
package s3api
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestVersionListPagerNewestFirstOrderingAndPagination(t *testing.T) {
base := time.Date(2026, 2, 6, 11, 3, 16, 0, time.UTC)
items := []versionListItem{
{
key: "k10/repo/log_20260206110316",
versionId: "v1",
lastModified: base,
},
{
key: "k10/repo/log_20260206110317",
versionId: "v2",
lastModified: base.Add(1 * time.Second),
},
{
key: "k10/repo/log_20260206110315",
versionId: "v0",
lastModified: base.Add(-1 * time.Second),
},
}
// Page 1: newest-first
pager := newVersionListPager(1, nil, false, "")
for _, item := range []versionListItem{items[0], items[2], items[1]} {
pager.consider(item)
}
sorted := pager.itemsSorted()
s3a := &S3ApiServer{}
truncated, nextKeyMarker, nextVersionIdMarker, isTruncated := s3a.truncateAndSetMarkers(sorted, 1)
require.Len(t, truncated, 1)
require.Equal(t, items[1].key, truncated[0].key)
require.Equal(t, items[1].versionId, truncated[0].versionId)
require.True(t, isTruncated)
require.Equal(t, items[1].key, nextKeyMarker)
require.Equal(t, items[1].versionId, nextVersionIdMarker)
// Page 2: start after marker (should return the next newest item)
marker := &versionListItem{
key: items[1].key,
versionId: items[1].versionId,
lastModified: items[1].lastModified,
}
pager2 := newVersionListPager(1, marker, false, "")
for _, item := range items {
pager2.consider(item)
}
sorted2 := pager2.itemsSorted()
truncated2, _, _, _ := s3a.truncateAndSetMarkers(sorted2, 1)
require.Len(t, truncated2, 1)
require.Equal(t, items[0].key, truncated2[0].key)
require.Equal(t, items[0].versionId, truncated2[0].versionId)
}
Loading…
Cancel
Save