From 3f622409761bdcbe2bf60c8167202f3c5133da20 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 15 Dec 2025 22:51:39 -0800 Subject: [PATCH] s3: add pagination to getObjectVersionList and reduce memory (#7787) * s3: add pagination to getObjectVersionList and reduce memory This commit makes two improvements to S3 version listing: 1. Add pagination to getObjectVersionList: - Previously hardcoded limit of 1000 versions per object - Now paginates through all versions using startFrom marker - Fixes correctness issue where objects with >1000 versions would have some versions missing from list results 2. Reduce memory by not retaining full Entry: - Replace Entry field with OwnerID string in ObjectVersion struct - Extract owner ID when creating ObjectVersion - Avoids retaining Chunks array which can be large for big files - Clear seenVersionIds map after use to help GC 3. Update getObjectOwnerFromVersion: - Use new OwnerID field instead of accessing Entry.Extended - Maintains backward compatibility with fallback lookups * s3: propagate errors from list operation instead of returning partial results Address review feedback: when s3a.list fails during version listing, the function was logging at V(2) level and returning partial results with nil error. This hides the error and could lead to silent data inconsistencies. Fix by: 1. Log at Warningf level for better visibility 2. Return nil versions slice with the error to prevent partial results from being processed as complete --- weed/s3api/s3api_object_versioning.go | 136 +++++++++++++++----------- 1 file changed, 80 insertions(+), 56 deletions(-) diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go index 80bd805fe..f33d90c8b 100644 --- a/weed/s3api/s3api_object_versioning.go +++ b/weed/s3api/s3api_object_versioning.go @@ -61,6 +61,8 @@ type ListObjectVersionsResult struct { } // ObjectVersion represents a version of an S3 object +// Note: We intentionally do not store the full filer_pb.Entry here to avoid +// retaining large Chunks arrays in memory during list operations. type ObjectVersion struct { VersionId string IsLatest bool @@ -68,7 +70,7 @@ type ObjectVersion struct { LastModified time.Time ETag string Size int64 - Entry *filer_pb.Entry + OwnerID string // Owner ID extracted from entry metadata } // generateVersionId creates a unique version ID that preserves chronological order @@ -531,6 +533,7 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string } // getObjectVersionList returns all versions of a specific object +// Uses pagination to handle objects with more than 1000 versions func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVersion, error) { var versions []*ObjectVersion @@ -558,72 +561,96 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe } } - // List all version files in the .versions directory - entries, _, err := s3a.list(bucketDir+"/"+versionsObjectPath, "", "", false, 1000) - if err != nil { - glog.V(2).Infof("getObjectVersionList: failed to list version files: %v", err) - return versions, nil - } - - glog.V(2).Infof("getObjectVersionList: found %d entries in versions directory", len(entries)) - // Use a map to detect and prevent duplicate version IDs seenVersionIds := make(map[string]bool) + versionsDir := bucketDir + "/" + versionsObjectPath - for i, entry := range entries { - if entry.Extended == nil { - glog.V(2).Infof("getObjectVersionList: entry %d has no Extended metadata, skipping", i) - continue - } + // Paginate through all version files in the .versions directory + startFrom := "" + const pageSize = 1000 + totalEntries := 0 - versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey] - if !hasVersionId { - glog.V(2).Infof("getObjectVersionList: entry %d has no version ID, skipping", i) - continue + for { + entries, isLast, err := s3a.list(versionsDir, "", startFrom, false, pageSize) + if err != nil { + glog.Warningf("getObjectVersionList: failed to list version files in %s: %v", versionsDir, err) + return nil, err } - versionId := string(versionIdBytes) + totalEntries += len(entries) - // Check for duplicate version IDs and skip if already seen - if seenVersionIds[versionId] { - glog.Warningf("getObjectVersionList: duplicate version ID %s detected for object %s/%s, skipping", versionId, bucket, object) - continue - } - seenVersionIds[versionId] = true + for i, entry := range entries { + // Track last entry for pagination + startFrom = entry.Name - // Check if this version is the latest by comparing with directory metadata - isLatest := (versionId == latestVersionId) + if entry.Extended == nil { + glog.V(2).Infof("getObjectVersionList: entry %d has no Extended metadata, skipping", i) + continue + } - isDeleteMarkerBytes, _ := entry.Extended[s3_constants.ExtDeleteMarkerKey] - isDeleteMarker := string(isDeleteMarkerBytes) == "true" + versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey] + if !hasVersionId { + glog.V(2).Infof("getObjectVersionList: entry %d has no version ID, skipping", i) + continue + } - glog.V(2).Infof("getObjectVersionList: found version %s, isLatest=%v, isDeleteMarker=%v", versionId, isLatest, isDeleteMarker) + versionId := string(versionIdBytes) - version := &ObjectVersion{ - VersionId: versionId, - IsLatest: isLatest, - IsDeleteMarker: isDeleteMarker, - LastModified: time.Unix(entry.Attributes.Mtime, 0), - Entry: entry, - } + // Check for duplicate version IDs and skip if already seen + if seenVersionIds[versionId] { + glog.Warningf("getObjectVersionList: duplicate version ID %s detected for object %s/%s, skipping", versionId, bucket, object) + continue + } + seenVersionIds[versionId] = true - if !isDeleteMarker { - // Try to get ETag from Extended attributes first - if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag { - version.ETag = string(etagBytes) - } else { - // Fallback: calculate ETag from chunks - version.ETag = s3a.calculateETagFromChunks(entry.Chunks) + // Check if this version is the latest by comparing with directory metadata + isLatest := (versionId == latestVersionId) + + isDeleteMarkerBytes, _ := entry.Extended[s3_constants.ExtDeleteMarkerKey] + isDeleteMarker := string(isDeleteMarkerBytes) == "true" + + glog.V(2).Infof("getObjectVersionList: found version %s, isLatest=%v, isDeleteMarker=%v", versionId, isLatest, isDeleteMarker) + + // Extract owner ID from entry metadata to avoid retaining full Entry with Chunks + var ownerID string + if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists { + ownerID = string(ownerBytes) + } + + version := &ObjectVersion{ + VersionId: versionId, + IsLatest: isLatest, + IsDeleteMarker: isDeleteMarker, + LastModified: time.Unix(entry.Attributes.Mtime, 0), + OwnerID: ownerID, + } + + if !isDeleteMarker { + // Try to get ETag from Extended attributes first + if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag { + version.ETag = string(etagBytes) + } else { + // Fallback: calculate ETag from chunks + version.ETag = s3a.calculateETagFromChunks(entry.Chunks) + } + version.Size = int64(entry.Attributes.FileSize) } - version.Size = int64(entry.Attributes.FileSize) + + versions = append(versions, version) } - versions = append(versions, version) + // Stop if we've reached the last page + if isLast || len(entries) < pageSize { + break + } } + // Clear map to help GC + clear(seenVersionIds) + // Don't sort here - let the main listObjectVersions function handle sorting consistently - glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s (after deduplication from %d entries)", len(versions), bucket, object, len(entries)) + glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s (after deduplication from %d entries)", len(versions), bucket, object, totalEntries) for i, version := range versions { glog.V(2).Infof("getObjectVersionList: version %d: %s (isLatest=%v, isDeleteMarker=%v)", i, version.VersionId, version.IsLatest, version.IsDeleteMarker) } @@ -988,15 +1015,12 @@ func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb return latestVersionEntry, nil } -// getObjectOwnerFromVersion extracts object owner information from version entry metadata +// getObjectOwnerFromVersion extracts object owner information from version metadata func (s3a *S3ApiServer) getObjectOwnerFromVersion(version *ObjectVersion, bucket, objectKey string) CanonicalUser { - // First try to get owner from the version entry itself - if version.Entry != nil && version.Entry.Extended != nil { - if ownerBytes, exists := version.Entry.Extended[s3_constants.ExtAmzOwnerKey]; exists { - ownerId := string(ownerBytes) - ownerDisplayName := s3a.iam.GetAccountNameById(ownerId) - return CanonicalUser{ID: ownerId, DisplayName: ownerDisplayName} - } + // First try to get owner from the version's OwnerID field (extracted during listing) + if version.OwnerID != "" { + ownerDisplayName := s3a.iam.GetAccountNameById(version.OwnerID) + return CanonicalUser{ID: version.OwnerID, DisplayName: ownerDisplayName} } // Fallback: try to get owner from the current version of the object