@ -264,225 +264,236 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
// findVersionsRecursively searches for all .versions directories and regular files recursively
// findVersionsRecursively searches for all .versions directories and regular files recursively
func ( s3a * S3ApiServer ) findVersionsRecursively ( currentPath , relativePath string , allVersions * [ ] interface { } , processedObjects map [ string ] bool , seenVersionIds map [ string ] bool , bucket , prefix string ) error {
func ( s3a * S3ApiServer ) findVersionsRecursively ( currentPath , relativePath string , allVersions * [ ] interface { } , processedObjects map [ string ] bool , seenVersionIds map [ string ] bool , bucket , prefix string ) error {
// List entries in current directory
entries , _ , err := s3a . list ( currentPath , "" , "" , false , 1000 )
if err != nil {
return err
}
// List entries in current directory with pagination
startFrom := ""
for {
entries , isLast , err := s3a . list ( currentPath , "" , startFrom , false , filer . PaginationSize )
if err != nil {
return err
}
for _ , entry := range entries {
entryPath := path . Join ( relativePath , entry . Name )
// Skip if this doesn't match the prefix filter
if normalizedPrefix := strings . TrimPrefix ( prefix , "/" ) ; normalizedPrefix != "" {
// An entry is a candidate if:
// 1. Its path is a match for the prefix.
// 2. It is a directory that is an ancestor of the prefix path, so we must descend into it.
// Condition 1: The entry's path starts with the prefix.
isMatch := strings . HasPrefix ( entryPath , normalizedPrefix )
if ! isMatch && entry . IsDirectory {
// Also check if a directory entry matches a directory-style prefix (e.g., prefix "a/", entry "a").
isMatch = strings . HasPrefix ( entryPath + "/" , normalizedPrefix )
}
for _ , entry := range entries {
// Track last entry name for pagination
startFrom = entry . Name
// Condition 2: The prefix path starts with the entry's path (and it's a directory).
canDescend := entry . IsDirectory && strings . HasPrefix ( normalizedPrefix , entryPath )
entryPath := path . Join ( relativePath , entry . Name )
if ! isMatch && ! canDescend {
continue
}
}
// Skip if this doesn't match the prefix filter
if normalizedPrefix := strings . TrimPrefix ( prefix , "/" ) ; normalizedPrefix != "" {
// An entry is a candidate if:
// 1. Its path is a match for the prefix.
// 2. It is a directory that is an ancestor of the prefix path, so we must descend into it.
if entry . IsDirectory {
// Skip .uploads directory (multipart upload temporary files)
if strings . HasPrefix ( entry . Name , ".uploads" ) {
continue
}
// Condition 1: The entry's path starts with the prefix.
isMatch := strings . HasPrefix ( entryPath , normalizedPrefix )
if ! isMatch && entry . IsDirectory {
// Also check if a directory entry matches a directory-style prefix (e.g., prefix "a/", entry "a").
isMatch = strings . HasPrefix ( entryPath + "/" , normalizedPrefix )
}
// Check if this is a .versions directory
if strings . HasSuffix ( entry . Name , s3_constants . VersionsFolder ) {
// Extract object name from .versions directory name
objectKey := strings . TrimSuffix ( entryPath , s3_constants . VersionsFolder )
normalizedObjectKey := removeDuplicateSlashes ( objectKey )
// Mark both keys as processed for backward compatibility
processedObjects [ objectKey ] = true
processedObjects [ normalizedObjectKey ] = true
// Condition 2: The prefix path starts with the entry's path (and it's a directory).
canDescend := entry . IsDirectory && strings . HasPrefix ( normalizedPrefix , entryPath )
glog . V ( 2 ) . Infof ( "Found .versions directory for object %s (normalized: %s)" , objectKey , normalizedObjectKey )
if ! isMatch && ! canDescend {
continue
}
}
versions , err := s3a . getObjectVersionList ( bucket , normalizedObjectKey )
if err != nil {
glog . Warningf ( "Failed to get versions for object %s (normalized: %s): %v" , objectKey , normalizedObjectKey , err )
if entry . IsDirectory {
// Skip .uploads directory (multipart upload temporary files)
if strings . HasPrefix ( entry . Name , ".uploads" ) {
continue
continue
}
}
for _ , version := range versions {
// Check for duplicate version IDs and skip if already seen
// Use normalized key for deduplication
versionKey := normalizedObjectKey + ":" + version . VersionId
if seenVersionIds [ versionKey ] {
glog . Warningf ( "findVersionsRecursively: duplicate version %s for object %s detected, skipping" , version . VersionId , normalizedObjectKey )
// Check if this is a .versions directory
if strings . HasSuffix ( entry . Name , s3_constants . VersionsFolder ) {
// Extract object name from .versions directory name
objectKey := strings . TrimSuffix ( entryPath , s3_constants . VersionsFolder )
normalizedObjectKey := removeDuplicateSlashes ( objectKey )
// Mark both keys as processed for backward compatibility
processedObjects [ objectKey ] = true
processedObjects [ normalizedObjectKey ] = true
glog . V ( 2 ) . Infof ( "Found .versions directory for object %s (normalized: %s)" , objectKey , normalizedObjectKey )
versions , err := s3a . getObjectVersionList ( bucket , normalizedObjectKey )
if err != nil {
glog . Warningf ( "Failed to get versions for object %s (normalized: %s): %v" , objectKey , normalizedObjectKey , err )
continue
continue
}
}
seenVersionIds [ versionKey ] = true
if version . IsDeleteMarker {
glog . V ( 4 ) . Infof ( "Adding delete marker from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s" ,
normalizedObjectKey , version . VersionId , version . IsLatest , versionKey )
deleteMarker := & DeleteMarkerEntry {
Key : normalizedObjectKey , // Use normalized key for consistency
VersionId : version . VersionId ,
IsLatest : version . IsLatest ,
LastModified : version . LastModified ,
Owner : s3a . getObjectOwnerFromVersion ( version , bucket , normalizedObjectKey ) ,
for _ , version := range versions {
// Check for duplicate version IDs and skip if already seen
// Use normalized key for deduplication
versionKey := normalizedObjectKey + ":" + version . VersionId
if seenVersionIds [ versionKey ] {
glog . Warningf ( "findVersionsRecursively: duplicate version %s for object %s detected, skipping" , version . VersionId , normalizedObjectKey )
continue
}
}
* allVersions = append ( * allVersions , deleteMarker )
} else {
glog . V ( 4 ) . Infof ( "Adding version from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s" ,
normalizedObjectKey , version . VersionId , version . IsLatest , versionKey )
seenVersionIds [ versionKey ] = true
if version . IsDeleteMarker {
glog . V ( 4 ) . Infof ( "Adding delete marker from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s" ,
normalizedObjectKey , version . VersionId , version . IsLatest , versionKey )
deleteMarker := & DeleteMarkerEntry {
Key : normalizedObjectKey , // Use normalized key for consistency
VersionId : version . VersionId ,
IsLatest : version . IsLatest ,
LastModified : version . LastModified ,
Owner : s3a . getObjectOwnerFromVersion ( version , bucket , normalizedObjectKey ) ,
}
* allVersions = append ( * allVersions , deleteMarker )
} else {
glog . V ( 4 ) . Infof ( "Adding version from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s" ,
normalizedObjectKey , version . VersionId , version . IsLatest , versionKey )
versionEntry := & VersionEntry {
Key : normalizedObjectKey , // Use normalized key for consistency
VersionId : version . VersionId ,
IsLatest : version . IsLatest ,
LastModified : version . LastModified ,
ETag : version . ETag ,
Size : version . Size ,
Owner : s3a . getObjectOwnerFromVersion ( version , bucket , normalizedObjectKey ) ,
StorageClass : "STANDARD" ,
}
* allVersions = append ( * allVersions , versionEntry )
}
}
} else {
// This is a regular directory - check if it's an explicit S3 directory object
// Only include directories that were explicitly created via S3 API (have FolderMimeType)
// This excludes implicit directories created when uploading files like "test1/a"
if entry . Attributes . Mime == s3_constants . FolderMimeType {
directoryKey := entryPath
if ! strings . HasSuffix ( directoryKey , "/" ) {
directoryKey += "/"
}
// Add directory as a version entry with VersionId "null" (following S3/Minio behavior)
glog . V ( 2 ) . Infof ( "findVersionsRecursively: found explicit S3 directory %s" , directoryKey )
// Calculate ETag for empty directory
directoryETag := "\"d41d8cd98f00b204e9800998ecf8427e\""
versionEntry := & VersionEntry {
versionEntry := & VersionEntry {
Key : normalizedObjectKey , // Use normalized key for consistency
VersionId : version . VersionId ,
IsLatest : version . IsLatest ,
LastModified : version . LastModified ,
ETag : version . ETag ,
Size : version . Size ,
Owner : s3a . getObjectOwnerFromVersion ( version , bucket , normalizedObjectKey ) ,
Key : directoryKey ,
VersionId : "null" ,
IsLatest : true ,
LastModified : time . Unix ( entry . Attributes . Mtime , 0 ) ,
ETag : directory ETag,
Size : 0 , // Directories have size 0
Owner : s3a . getObjectOwnerFromEntry ( entr y ) ,
StorageClass : "STANDARD" ,
StorageClass : "STANDARD" ,
}
}
* allVersions = append ( * allVersions , versionEntry )
* allVersions = append ( * allVersions , versionEntry )
}
}
}
} else {
// This is a regular directory - check if it's an explicit S3 directory object
// Only include directories that were explicitly created via S3 API (have FolderMimeType)
// This excludes implicit directories created when uploading files like "test1/a"
if entry . Attributes . Mime == s3_constants . FolderMimeType {
directoryKey := entryPath
if ! strings . HasSuffix ( directoryKey , "/" ) {
directoryKey += "/"
}
// Add directory as a version entry with VersionId "null" (following S3/Minio behavior)
glog . V ( 2 ) . Infof ( "findVersionsRecursively: found explicit S3 directory %s" , directoryKey )
// Calculate ETag for empty directory
directoryETag := "\"d41d8cd98f00b204e9800998ecf8427e\""
versionEntry := & VersionEntry {
Key : directoryKey ,
VersionId : "null" ,
IsLatest : true ,
LastModified : time . Unix ( entry . Attributes . Mtime , 0 ) ,
ETag : directoryETag ,
Size : 0 , // Directories have size 0
Owner : s3a . getObjectOwnerFromEntry ( entry ) ,
StorageClass : "STANDARD" ,
// Recursively search subdirectories (regardless of whether they're explicit or implicit)
fullPath := path . Join ( currentPath , entry . Name )
err := s3a . findVersionsRecursively ( fullPath , entryPath , allVersions , processedObjects , seenVersionIds , bucket , prefix )
if err != nil {
glog . Warningf ( "Error searching subdirectory %s: %v" , entryPath , err )
continue
}
}
* allVersions = append ( * allVersions , versionEntry )
}
}
} else {
// This is a regular file - check if it's a pre-versioning object
objectKey := entryPath
// Normalize object key to ensure consistency with other version operations
normalizedObjectKey := removeDuplicateSlashes ( objectKey )
// Recursively search subdirectories (regardless of whether they're explicit or implicit)
fullPath := path . Join ( currentPath , entry . Name )
err := s3a . findVersionsRecursively ( fullPath , entryPath , allVersions , processedObjects , seenVersionIds , bucket , prefix )
if err != nil {
glog . Warningf ( "Error searching subdirectory %s: %v" , entryPath , err )
// Skip if this object already has a .versions directory (already processed )
// Check both normalized and original keys for backward compatibility
if processedObjects [ objectKey ] || processedObjects [ normalizedObjectKey ] {
glog . V ( 4 ) . Infof ( "Skipping already processed object: objectKey=%s, normalizedObjectKey=%s, processedObjects[objectKey]=%v, processedObjects[normalizedObjectKey]=%v" ,
objectKey , normalizedObjectKey , processedObjects [ objectKey ] , processedObjects [ normalizedObjectKey ] )
continue
continue
}
}
}
} else {
// This is a regular file - check if it's a pre-versioning object
objectKey := entryPath
// Normalize object key to ensure consistency with other version operations
normalizedObjectKey := removeDuplicateSlashes ( objectKey )
// Skip if this object already has a .versions directory (already processed)
// Check both normalized and original keys for backward compatibility
if processedObjects [ objectKey ] || processedObjects [ normalizedObjectKey ] {
glog . V ( 4 ) . Infof ( "Skipping already processed object: objectKey=%s, normalizedObjectKey=%s, processedObjects[objectKey]=%v, processedObjects[normalizedObjectKey]=%v" ,
objectKey , normalizedObjectKey , processedObjects [ objectKey ] , processedObjects [ normalizedObjectKey ] )
continue
}
glog . V ( 4 ) . Infof ( "Processing regular file: objectKey=%s, normalizedObjectKey=%s, NOT in processedObjects" , objectKey , normalizedObjectKey )
glog . V ( 4 ) . Infof ( "Processing regular file: objectKey=%s, normalizedObjectKey=%s, NOT in processedObjects" , objectKey , normalizedObjectKey )
// This is a pre-versioning or suspended-versioning object
// Check if this file has version metadata (ExtVersionIdKey)
hasVersionMeta := false
if entry . Extended != nil {
if versionIdBytes , ok := entry . Extended [ s3_constants . ExtVersionIdKey ] ; ok {
hasVersionMeta = true
glog . V ( 4 ) . Infof ( "Regular file %s has version metadata: %s" , normalizedObjectKey , string ( versionIdBytes ) )
// This is a pre-versioning or suspended-versioning object
// Check if this file has version metadata (ExtVersionIdKey)
hasVersionMeta := false
if entry . Extended != nil {
if versionIdBytes , ok := entry . Extended [ s3_constants . ExtVersionIdKey ] ; ok {
hasVersionMeta = true
glog . V ( 4 ) . Infof ( "Regular file %s has version metadata: %s" , normalizedObjectKey , string ( versionIdBytes ) )
}
}
}
}
// Check if a .versions directory exists for this object
versionsObjectPath := normalizedObjectKey + s3_constants . VersionsFolder
_ , versionsErr := s3a . getEntry ( currentPath , versionsObjectPath )
if versionsErr == nil {
// .versions directory exists
glog . V ( 4 ) . Infof ( "Found .versions directory for regular file %s, hasVersionMeta=%v" , normalizedObjectKey , hasVersionMeta )
// If this file has version metadata, it's a suspended versioning null version
// Include it and it will be the latest
if hasVersionMeta {
glog . V ( 4 ) . Infof ( "Including suspended versioning file %s (has version metadata)" , normalizedObjectKey )
// Continue to add it below
} else {
// No version metadata - this is a pre-versioning file
// Skip it if there's already a null version in .versions
versions , err := s3a . getObjectVersionList ( bucket , normalizedObjectKey )
if err == nil {
hasNullVersion := false
for _ , v := range versions {
if v . VersionId == "null" {
hasNullVersion = true
break
// Check if a .versions directory exists for this object
versionsObjectPath := normalizedObjectKey + s3_constants . VersionsFolder
_ , versionsErr := s3a . getEntry ( currentPath , versionsObjectPath )
if versionsErr == nil {
// .versions directory exists
glog . V ( 4 ) . Infof ( "Found .versions directory for regular file %s, hasVersionMeta=%v" , normalizedObjectKey , hasVersionMeta )
// If this file has version metadata, it's a suspended versioning null version
// Include it and it will be the latest
if hasVersionMeta {
glog . V ( 4 ) . Infof ( "Including suspended versioning file %s (has version metadata)" , normalizedObjectKey )
// Continue to add it below
} else {
// No version metadata - this is a pre-versioning file
// Skip it if there's already a null version in .versions
versions , err := s3a . getObjectVersionList ( bucket , normalizedObjectKey )
if err == nil {
hasNullVersion := false
for _ , v := range versions {
if v . VersionId == "null" {
hasNullVersion = true
break
}
}
if hasNullVersion {
glog . V ( 4 ) . Infof ( "Skipping pre-versioning file %s, null version exists in .versions" , normalizedObjectKey )
processedObjects [ objectKey ] = true
processedObjects [ normalizedObjectKey ] = true
continue
}
}
}
}
if hasNullVersion {
glog . V ( 4 ) . Infof ( "Skipping pre-versioning file %s, null version exists in .versions" , normalizedObjectKey )
processedObjects [ objectKey ] = true
processedObjects [ normalizedObjectKey ] = true
continue
}
glog . V ( 4 ) . Infof ( "Including pre-versioning file %s (no null version in .versions)" , normalizedObjectKey )
}
}
glog . V ( 4 ) . Infof ( "Including pre-versioning file %s (no null version in .versions)" , normalizedObjectKey )
} else {
glog . V ( 4 ) . Infof ( "No .versions directory for regular file %s, hasVersionMeta=%v" , normalizedObjectKey , hasVersionMeta )
}
}
} else {
glog . V ( 4 ) . Infof ( "No .versions directory for regular file %s, hasVersionMeta=%v" , normalizedObjectKey , hasVersionMeta )
}
// Add this file as a null version with IsLatest=true
isLatest := true
// Add this file as a null version with IsLatest=true
isLatest := true
// Check for duplicate version IDs and skip if already seen
// Use normalized key for deduplication to match how other version operations work
versionKey := normalizedObjectKey + ":null"
if seenVersionIds [ versionKey ] {
glog . Warningf ( "findVersionsRecursively: duplicate null version for object %s detected (versionKey=%s), skipping" , normalizedObjectKey , versionKey )
continue
}
seenVersionIds [ versionKey ] = true
etag := s3a . calculateETagFromChunks ( entry . Chunks )
glog . V ( 4 ) . Infof ( "Adding null version from regular file: objectKey=%s, normalizedObjectKey=%s, versionKey=%s, isLatest=%v, hasVersionMeta=%v" ,
objectKey , normalizedObjectKey , versionKey , isLatest , hasVersionMeta )
versionEntry := & VersionEntry {
Key : normalizedObjectKey , // Use normalized key for consistency
VersionId : "null" ,
IsLatest : isLatest ,
LastModified : time . Unix ( entry . Attributes . Mtime , 0 ) ,
ETag : etag ,
Size : int64 ( entry . Attributes . FileSize ) ,
Owner : s3a . getObjectOwnerFromEntry ( entry ) ,
StorageClass : "STANDARD" ,
// Check for duplicate version IDs and skip if already seen
// Use normalized key for deduplication to match how other version operations work
versionKey := normalizedObjectKey + ":null"
if seenVersionIds [ versionKey ] {
glog . Warningf ( "findVersionsRecursively: duplicate null version for object %s detected (versionKey=%s), skipping" , normalizedObjectKey , versionKey )
continue
}
seenVersionIds [ versionKey ] = true
etag := s3a . calculateETagFromChunks ( entry . Chunks )
glog . V ( 4 ) . Infof ( "Adding null version from regular file: objectKey=%s, normalizedObjectKey=%s, versionKey=%s, isLatest=%v, hasVersionMeta=%v" ,
objectKey , normalizedObjectKey , versionKey , isLatest , hasVersionMeta )
versionEntry := & VersionEntry {
Key : normalizedObjectKey , // Use normalized key for consistency
VersionId : "null" ,
IsLatest : isLatest ,
LastModified : time . Unix ( entry . Attributes . Mtime , 0 ) ,
ETag : etag ,
Size : int64 ( entry . Attributes . FileSize ) ,
Owner : s3a . getObjectOwnerFromEntry ( entry ) ,
StorageClass : "STANDARD" ,
}
* allVersions = append ( * allVersions , versionEntry )
}
}
* allVersions = append ( * allVersions , versionEntry )
}
// If we've reached the last page, stop pagination
if isLast {
break
}
}
}
}