diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index de6b35ae8..096654dd9 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -360,12 +360,14 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl // Check if versioning is configured for this bucket BEFORE creating any files versioningState, vErr := s3a.getVersioningState(*input.Bucket) if vErr == nil && versioningState == s3_constants.VersioningEnabled { - // For versioned buckets, create a version and return the version ID - versionId := generateVersionId() + // Use full object key (not just entryName) to ensure correct .versions directory is checked + normalizedKey := strings.TrimPrefix(*input.Key, "/") + useInvertedFormat := s3a.getVersionIdFormat(*input.Bucket, normalizedKey) + versionId := generateVersionId(useInvertedFormat) versionFileName := s3a.getVersionFileName(versionId) versionDir := dirName + "/" + entryName + s3_constants.VersionsFolder - // Move the completed object to the versions directory + // Create the version file in the .versions directory err = s3a.mkFile(versionDir, versionFileName, finalParts, func(versionEntry *filer_pb.Entry) { if versionEntry.Extended == nil { versionEntry.Extended = make(map[string][]byte) diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index 66d4ded80..682a19eb9 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -272,8 +272,8 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request var etag string if shouldCreateVersionForCopy(dstVersioningState) { - // For versioned destination, create a new version - dstVersionId = generateVersionId() + // For versioned destination, create a new version using appropriate format + dstVersionId = s3a.generateVersionIdForObject(dstBucket, dstObject) glog.V(2).Infof("CopyObjectHandler: creating version %s for destination %s/%s", dstVersionId, dstBucket, dstObject) // Add version metadata to the entry diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 1554907ab..928c4a9bb 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -946,13 +946,16 @@ func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object } func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) { - // Generate version ID - versionId = generateVersionId() - // Normalize object path to ensure consistency with toFilerPath behavior normalizedObject := removeDuplicateSlashes(object) - glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject) + // Check if .versions directory exists to determine format + useInvertedFormat := s3a.getVersionIdFormat(bucket, normalizedObject) + + // Generate version ID using the appropriate format + versionId = generateVersionId(useInvertedFormat) + + glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s (normalized: %s, inverted=%v)", versionId, bucket, object, normalizedObject, useInvertedFormat) // Create the version file name versionFileName := s3a.getVersionFileName(versionId) @@ -961,17 +964,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin // We need to construct the object path relative to the bucket versionObjectPath := normalizedObject + s3_constants.VersionsFolder + "/" + versionFileName versionFilePath := s3a.toFilerPath(bucket, versionObjectPath) - - // Ensure the .versions directory exists before uploading bucketDir := s3a.option.BucketsPath + "/" + bucket - versionsDir := normalizedObject + s3_constants.VersionsFolder - err := s3a.mkdir(bucketDir, versionsDir, func(entry *filer_pb.Entry) { - entry.Attributes.Mime = s3_constants.FolderMimeType - }) - if err != nil { - glog.Errorf("putVersionedObject: failed to create .versions directory: %v", err) - return "", "", s3err.ErrInternalError, SSEResponseMetadata{} - } body := dataReader if objectContentType == "" { @@ -989,6 +982,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin // Get the uploaded entry to add versioning metadata // Use retry logic to handle filer consistency delays var versionEntry *filer_pb.Entry + var err error maxRetries := 8 for attempt := 1; attempt <= maxRetries; attempt++ { versionEntry, err = s3a.getEntry(bucketDir, versionObjectPath) diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go index c82f797f1..c6f5771e2 100644 --- a/weed/s3api/s3api_object_versioning.go +++ b/weed/s3api/s3api_object_versioning.go @@ -1,8 +1,9 @@ package s3api +// This file contains the core S3 versioning operations. +// Version ID format handling is in s3api_version_id.go + import ( - "crypto/rand" - "encoding/hex" "encoding/xml" "fmt" "net/http" @@ -73,50 +74,21 @@ type ObjectVersion struct { OwnerID string // Owner ID extracted from entry metadata } -// generateVersionId creates a unique version ID that preserves chronological order -func generateVersionId() string { - // Use nanosecond timestamp to ensure chronological ordering - // Format as 16-digit hex (first 16 chars of version ID) - now := time.Now().UnixNano() - timestampHex := fmt.Sprintf("%016x", now) - - // Generate random 8 bytes for uniqueness (last 16 chars of version ID) - randBytes := make([]byte, 8) - if _, err := rand.Read(randBytes); err != nil { - glog.Errorf("Failed to generate random bytes for version ID: %v", err) - // Fallback to timestamp-only if random generation fails - return timestampHex + "0000000000000000" - } - - // Combine timestamp (16 chars) + random (16 chars) = 32 chars total - randomHex := hex.EncodeToString(randBytes) - versionId := timestampHex + randomHex - - return versionId -} - -// getVersionedObjectDir returns the directory path for storing object versions -func (s3a *S3ApiServer) getVersionedObjectDir(bucket, object string) string { - return path.Join(s3a.option.BucketsPath, bucket, object+s3_constants.VersionsFolder) -} - -// getVersionFileName returns the filename for a specific version -func (s3a *S3ApiServer) getVersionFileName(versionId string) string { - return fmt.Sprintf("v_%s", versionId) -} - // createDeleteMarker creates a delete marker for versioned delete operations func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error) { - versionId := generateVersionId() + // Clean up the object path first + cleanObject := strings.TrimPrefix(object, "/") - glog.V(2).Infof("createDeleteMarker: creating delete marker %s for %s/%s", versionId, bucket, object) + // Check if .versions directory exists to determine format + useInvertedFormat := s3a.getVersionIdFormat(bucket, cleanObject) + versionId := generateVersionId(useInvertedFormat) + + glog.V(2).Infof("createDeleteMarker: creating delete marker %s for %s/%s (inverted=%v)", versionId, bucket, object, useInvertedFormat) // Create the version file name for the delete marker versionFileName := s3a.getVersionFileName(versionId) // Store delete marker in the .versions directory - // Make sure to clean up the object path to remove leading slashes - cleanObject := strings.TrimPrefix(object, "/") bucketDir := s3a.option.BucketsPath + "/" + bucket versionsDir := bucketDir + "/" + cleanObject + s3_constants.VersionsFolder @@ -159,7 +131,7 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM // The extra 1 is for truncation detection allVersions := make([]interface{}, 0, maxKeys+1) - glog.V(1).Infof("listObjectVersions: listing versions for bucket %s, prefix '%s'", bucket, prefix) + glog.V(1).Infof("listObjectVersions: listing versions for bucket %s, prefix '%s', keyMarker '%s', versionIdMarker '%s'", bucket, prefix, keyMarker, versionIdMarker) // Track objects that have been processed to avoid duplicates processedObjects := make(map[string]bool) @@ -168,14 +140,19 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM seenVersionIds := make(map[string]bool) // Recursively find all .versions directories in the bucket - // When keyMarker is set, we need to collect all versions since filtering happens after sorting - // Pass 0 (unlimited) when keyMarker is set, otherwise maxKeys+1 for truncation detection + // Pass keyMarker and versionIdMarker to enable efficient pagination (skip entries before marker) bucketPath := path.Join(s3a.option.BucketsPath, bucket) - maxCollect := maxKeys + 1 - if keyMarker != "" { - maxCollect = 0 // Collect all versions when paginating, filter after sort - } - err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix, maxCollect) + + // Memory optimization: limit collection to maxKeys+1 versions. + // This works correctly for objects using the NEW inverted-timestamp format, where + // filesystem order (lexicographic) matches sorted order (newest-first). + // For OLD format objects (raw timestamps), filesystem order is oldest-first, so + // limiting collection may return older versions instead of newest. However: + // - New objects going forward use the new format + // - The alternative (collecting all) causes memory issues for buckets with many versions + // - Pagination continues correctly; users can page through to see all versions + maxCollect := maxKeys + 1 // +1 to detect truncation + err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix, keyMarker, versionIdMarker, maxCollect) if err != nil { glog.Errorf("listObjectVersions: findVersionsRecursively failed: %v", err) return nil, err @@ -187,31 +164,27 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM glog.V(1).Infof("listObjectVersions: found %d total versions", len(allVersions)) - // Sort by key, then by LastModified (newest first), then by VersionId for deterministic ordering + // Sort by key, then by version (newest first) + // Uses compareVersionIds to handle both old and new format version IDs sort.Slice(allVersions, func(i, j int) bool { var keyI, keyJ string - var lastModifiedI, lastModifiedJ time.Time var versionIdI, versionIdJ string switch v := allVersions[i].(type) { case *VersionEntry: keyI = v.Key - lastModifiedI = v.LastModified versionIdI = v.VersionId case *DeleteMarkerEntry: keyI = v.Key - lastModifiedI = v.LastModified versionIdI = v.VersionId } switch v := allVersions[j].(type) { case *VersionEntry: keyJ = v.Key - lastModifiedJ = v.LastModified versionIdJ = v.VersionId case *DeleteMarkerEntry: keyJ = v.Key - lastModifiedJ = v.LastModified versionIdJ = v.VersionId } @@ -220,53 +193,11 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM return keyI < keyJ } - // Then by modification time (newest first) - but use nanosecond precision for ties - timeDiff := lastModifiedI.Sub(lastModifiedJ) - if timeDiff.Abs() > time.Millisecond { - return lastModifiedI.After(lastModifiedJ) - } - - // For very close timestamps (within 1ms), use version ID for deterministic ordering - // Sort version IDs in reverse lexicographic order to maintain newest-first semantics - return versionIdI > versionIdJ + // Then by version ID (newest first) + // compareVersionIds handles both old (raw timestamp) and new (inverted timestamp) formats + return compareVersionIds(versionIdI, versionIdJ) < 0 }) - // Apply key-marker and version-id-marker filtering - // S3 pagination: skip versions at or before the marker, return versions AFTER the marker - // Versions are sorted: key ascending, then versionId descending (newest first for same key) - // - // S3 behavior: - // - If key-marker is specified without version-id-marker: start after ALL versions of key-marker - // - If both are specified: start after the specific version of key-marker - if keyMarker != "" { - filteredVersions := make([]interface{}, 0, len(allVersions)) - for _, version := range allVersions { - var key, versionId string - switch v := version.(type) { - case *VersionEntry: - key = v.Key - versionId = v.VersionId - case *DeleteMarkerEntry: - key = v.Key - versionId = v.VersionId - } - - // Include this version if it's AFTER the marker - if key > keyMarker { - // Key is after marker key: always include - filteredVersions = append(filteredVersions, version) - } else if key == keyMarker && versionIdMarker != "" && versionId < versionIdMarker { - // Same key, but version is after the marker version (versionIds sorted descending) - filteredVersions = append(filteredVersions, version) - } - // else: key < keyMarker OR (key == keyMarker with no versionIdMarker or version already seen) - // skip this version (it was in a previous page) - } - glog.V(1).Infof("listObjectVersions: after applying markers (key=%s, versionId=%s), %d -> %d versions", - keyMarker, versionIdMarker, len(allVersions), len(filteredVersions)) - allVersions = filteredVersions - } - // Build result using S3ListObjectVersionsResult to avoid conflicts with XSD structs result := &S3ListObjectVersionsResult{ Name: bucket, @@ -320,255 +251,310 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM return result, nil } -// findVersionsRecursively searches for all .versions directories and regular files recursively -// maxCollect limits the number of versions to collect for memory efficiency (0 = unlimited) -func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix string, maxCollect int) error { - // List entries in current directory with pagination +// versionCollector holds state for collecting object versions during recursive traversal +type versionCollector struct { + s3a *S3ApiServer + bucket string + prefix string + keyMarker string + versionIdMarker string + maxCollect int + allVersions *[]interface{} + processedObjects map[string]bool + seenVersionIds map[string]bool +} + +// isFull returns true if we've collected enough versions +func (vc *versionCollector) isFull() bool { + return vc.maxCollect > 0 && len(*vc.allVersions) >= vc.maxCollect +} + +// matchesPrefixFilter checks if an entry path matches the prefix filter +func (vc *versionCollector) matchesPrefixFilter(entryPath string, isDirectory bool) bool { + normalizedPrefix := strings.TrimPrefix(vc.prefix, "/") + if normalizedPrefix == "" { + return true + } + + // Entry matches if its path starts with the prefix + isMatch := strings.HasPrefix(entryPath, normalizedPrefix) + if !isMatch && isDirectory { + // Directory might match with trailing slash + isMatch = strings.HasPrefix(entryPath+"/", normalizedPrefix) + } + + // For directories, also check if we need to descend (prefix is deeper) + canDescend := isDirectory && strings.HasPrefix(normalizedPrefix, entryPath) + + return isMatch || canDescend +} + +// shouldSkipObjectForMarker returns true if the object should be skipped based on keyMarker +func (vc *versionCollector) shouldSkipObjectForMarker(objectKey string) bool { + if vc.keyMarker == "" { + return false + } + return objectKey < vc.keyMarker +} + +// shouldSkipVersionForMarker returns true if a version should be skipped based on markers +// For the keyMarker object, skip versions that are newer than or equal to versionIdMarker +// (these were already returned in previous pages). +// Handles both old (raw timestamp) and new (inverted timestamp) version ID formats. +func (vc *versionCollector) shouldSkipVersionForMarker(objectKey, versionId string) bool { + if vc.keyMarker == "" || objectKey != vc.keyMarker { + return false + } + // Object matches keyMarker - apply version filtering + if vc.versionIdMarker == "" { + // No versionIdMarker means skip ALL versions of this key (they were all returned in previous pages) + return true + } + // Skip versions that are newer than or equal to versionIdMarker + // compareVersionIds returns negative if versionId is newer than marker + // We skip if versionId is newer (negative) or equal (zero) to the marker + cmp := compareVersionIds(versionId, vc.versionIdMarker) + return cmp <= 0 +} + +// addVersion adds a version or delete marker to results +func (vc *versionCollector) addVersion(version *ObjectVersion, objectKey string) { + if version.IsDeleteMarker { + deleteMarker := &DeleteMarkerEntry{ + Key: objectKey, + VersionId: version.VersionId, + IsLatest: version.IsLatest, + LastModified: version.LastModified, + Owner: vc.s3a.getObjectOwnerFromVersion(version, vc.bucket, objectKey), + } + *vc.allVersions = append(*vc.allVersions, deleteMarker) + } else { + versionEntry := &VersionEntry{ + Key: objectKey, + VersionId: version.VersionId, + IsLatest: version.IsLatest, + LastModified: version.LastModified, + ETag: version.ETag, + Size: version.Size, + Owner: vc.s3a.getObjectOwnerFromVersion(version, vc.bucket, objectKey), + StorageClass: "STANDARD", + } + *vc.allVersions = append(*vc.allVersions, versionEntry) + } +} + +// processVersionsDirectory handles a .versions directory entry +func (vc *versionCollector) processVersionsDirectory(entryPath string) error { + objectKey := strings.TrimSuffix(entryPath, s3_constants.VersionsFolder) + normalizedObjectKey := removeDuplicateSlashes(objectKey) + + // Mark as processed + vc.processedObjects[objectKey] = true + vc.processedObjects[normalizedObjectKey] = true + + // Skip objects before keyMarker + if vc.shouldSkipObjectForMarker(normalizedObjectKey) { + glog.V(4).Infof("processVersionsDirectory: skipping object %s (before keyMarker %s)", normalizedObjectKey, vc.keyMarker) + return nil + } + + glog.V(2).Infof("processVersionsDirectory: found object %s", normalizedObjectKey) + + versions, err := vc.s3a.getObjectVersionList(vc.bucket, normalizedObjectKey) + if err != nil { + glog.Warningf("processVersionsDirectory: failed to get versions for %s: %v", normalizedObjectKey, err) + return nil // Continue with other entries + } + + for _, version := range versions { + if vc.isFull() { + return nil + } + + versionKey := normalizedObjectKey + ":" + version.VersionId + if vc.seenVersionIds[versionKey] { + continue + } + + // Skip versions that were already returned in previous pages + if vc.shouldSkipVersionForMarker(normalizedObjectKey, version.VersionId) { + continue + } + + vc.seenVersionIds[versionKey] = true + vc.addVersion(version, normalizedObjectKey) + } + + return nil +} + +// processExplicitDirectory handles an explicit S3 directory object +func (vc *versionCollector) processExplicitDirectory(entryPath string, entry *filer_pb.Entry) { + directoryKey := entryPath + if !strings.HasSuffix(directoryKey, "/") { + directoryKey += "/" + } + + // Skip directories at or before keyMarker + if vc.keyMarker != "" && directoryKey <= vc.keyMarker { + return + } + + versionEntry := &VersionEntry{ + Key: directoryKey, + VersionId: "null", + IsLatest: true, + LastModified: time.Unix(entry.Attributes.Mtime, 0), + ETag: "\"d41d8cd98f00b204e9800998ecf8427e\"", // Empty content ETag + Size: 0, + Owner: vc.s3a.getObjectOwnerFromEntry(entry), + StorageClass: "STANDARD", + } + *vc.allVersions = append(*vc.allVersions, versionEntry) +} + +// processRegularFile handles a regular file entry (pre-versioning or suspended-versioning object) +func (vc *versionCollector) processRegularFile(currentPath, entryPath string, entry *filer_pb.Entry) { + objectKey := entryPath + normalizedObjectKey := removeDuplicateSlashes(objectKey) + + // Skip files before keyMarker + if vc.shouldSkipObjectForMarker(normalizedObjectKey) { + return + } + + // For keyMarker match, skip if this null version was already returned + if vc.shouldSkipVersionForMarker(normalizedObjectKey, "null") { + return + } + + // Skip if already processed via .versions directory + if vc.processedObjects[objectKey] || vc.processedObjects[normalizedObjectKey] { + return + } + + // Check if this file has version metadata + hasVersionMeta := entry.Extended != nil && entry.Extended[s3_constants.ExtVersionIdKey] != nil + + // Check if a .versions directory exists for this object + versionsEntryName := entry.Name + s3_constants.VersionsFolder + _, versionsErr := vc.s3a.getEntry(currentPath, versionsEntryName) + if versionsErr == nil && !hasVersionMeta { + // .versions exists but file has no version metadata - check for null version in .versions + versions, err := vc.s3a.getObjectVersionList(vc.bucket, normalizedObjectKey) + if err == nil { + for _, v := range versions { + if v.VersionId == "null" { + // Null version exists in .versions, skip this file + vc.processedObjects[objectKey] = true + vc.processedObjects[normalizedObjectKey] = true + return + } + } + } + } + + // Check for duplicate + versionKey := normalizedObjectKey + ":null" + if vc.seenVersionIds[versionKey] { + return + } + vc.seenVersionIds[versionKey] = true + + versionEntry := &VersionEntry{ + Key: normalizedObjectKey, + VersionId: "null", + IsLatest: true, + LastModified: time.Unix(entry.Attributes.Mtime, 0), + ETag: vc.s3a.calculateETagFromChunks(entry.Chunks), + Size: int64(entry.Attributes.FileSize), + Owner: vc.s3a.getObjectOwnerFromEntry(entry), + StorageClass: "STANDARD", + } + *vc.allVersions = append(*vc.allVersions, versionEntry) +} + +// findVersionsRecursively searches for .versions directories and regular files recursively +// with efficient pagination support. It skips objects before keyMarker and applies versionIdMarker filtering. +// maxCollect limits the number of versions to collect for memory efficiency (must be > 0) +func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix, keyMarker, versionIdMarker string, maxCollect int) error { + vc := &versionCollector{ + s3a: s3a, + bucket: bucket, + prefix: prefix, + keyMarker: keyMarker, + versionIdMarker: versionIdMarker, + maxCollect: maxCollect, + allVersions: allVersions, + processedObjects: processedObjects, + seenVersionIds: seenVersionIds, + } + + return vc.collectVersions(currentPath, relativePath) +} + +// collectVersions recursively collects versions from the given path +func (vc *versionCollector) collectVersions(currentPath, relativePath string) error { startFrom := "" for { - // Early termination: stop if we've collected enough versions - if maxCollect > 0 && len(*allVersions) >= maxCollect { + if vc.isFull() { return nil } - entries, isLast, err := s3a.list(currentPath, "", startFrom, false, filer.PaginationSize) + entries, isLast, err := vc.s3a.list(currentPath, "", startFrom, false, filer.PaginationSize) if err != nil { return err } for _, entry := range entries { - // Early termination check inside loop - if maxCollect > 0 && len(*allVersions) >= maxCollect { + if vc.isFull() { return nil } - // Track last entry name for pagination startFrom = entry.Name - entryPath := path.Join(relativePath, entry.Name) - // Skip if this doesn't match the prefix filter - if normalizedPrefix := strings.TrimPrefix(prefix, "/"); normalizedPrefix != "" { - // An entry is a candidate if: - // 1. Its path is a match for the prefix. - // 2. It is a directory that is an ancestor of the prefix path, so we must descend into it. - - // Condition 1: The entry's path starts with the prefix. - isMatch := strings.HasPrefix(entryPath, normalizedPrefix) - if !isMatch && entry.IsDirectory { - // Also check if a directory entry matches a directory-style prefix (e.g., prefix "a/", entry "a"). - isMatch = strings.HasPrefix(entryPath+"/", normalizedPrefix) - } - - // Condition 2: The prefix path starts with the entry's path (and it's a directory). - canDescend := entry.IsDirectory && strings.HasPrefix(normalizedPrefix, entryPath) - - if !isMatch && !canDescend { - continue - } + if !vc.matchesPrefixFilter(entryPath, entry.IsDirectory) { + continue } if entry.IsDirectory { - // Skip .uploads directory (multipart upload temporary files) - if strings.HasPrefix(entry.Name, ".uploads") { - continue - } - - // Check if this is a .versions directory - if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { - // Extract object name from .versions directory name - objectKey := strings.TrimSuffix(entryPath, s3_constants.VersionsFolder) - normalizedObjectKey := removeDuplicateSlashes(objectKey) - // Mark both keys as processed for backward compatibility - processedObjects[objectKey] = true - processedObjects[normalizedObjectKey] = true - - glog.V(2).Infof("Found .versions directory for object %s (normalized: %s)", objectKey, normalizedObjectKey) - - versions, err := s3a.getObjectVersionList(bucket, normalizedObjectKey) - if err != nil { - glog.Warningf("Failed to get versions for object %s (normalized: %s): %v", objectKey, normalizedObjectKey, err) - continue - } - - for _, version := range versions { - // Check for duplicate version IDs and skip if already seen - // Use normalized key for deduplication - versionKey := normalizedObjectKey + ":" + version.VersionId - if seenVersionIds[versionKey] { - glog.Warningf("findVersionsRecursively: duplicate version %s for object %s detected, skipping", version.VersionId, normalizedObjectKey) - continue - } - seenVersionIds[versionKey] = true - - if version.IsDeleteMarker { - glog.V(4).Infof("Adding delete marker from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s", - normalizedObjectKey, version.VersionId, version.IsLatest, versionKey) - deleteMarker := &DeleteMarkerEntry{ - Key: normalizedObjectKey, // Use normalized key for consistency - VersionId: version.VersionId, - IsLatest: version.IsLatest, - LastModified: version.LastModified, - Owner: s3a.getObjectOwnerFromVersion(version, bucket, normalizedObjectKey), - } - *allVersions = append(*allVersions, deleteMarker) - } else { - glog.V(4).Infof("Adding version from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s", - normalizedObjectKey, version.VersionId, version.IsLatest, versionKey) - versionEntry := &VersionEntry{ - Key: normalizedObjectKey, // Use normalized key for consistency - VersionId: version.VersionId, - IsLatest: version.IsLatest, - LastModified: version.LastModified, - ETag: version.ETag, - Size: version.Size, - Owner: s3a.getObjectOwnerFromVersion(version, bucket, normalizedObjectKey), - StorageClass: "STANDARD", - } - *allVersions = append(*allVersions, versionEntry) - } - } - } else { - // This is a regular directory - check if it's an explicit S3 directory object - // Only include directories that were explicitly created via S3 API (have FolderMimeType) - // This excludes implicit directories created when uploading files like "test1/a" - if entry.Attributes.Mime == s3_constants.FolderMimeType { - directoryKey := entryPath - if !strings.HasSuffix(directoryKey, "/") { - directoryKey += "/" - } - - // Add directory as a version entry with VersionId "null" (following S3/Minio behavior) - glog.V(2).Infof("findVersionsRecursively: found explicit S3 directory %s", directoryKey) - - // Calculate ETag for empty directory - directoryETag := "\"d41d8cd98f00b204e9800998ecf8427e\"" - - versionEntry := &VersionEntry{ - Key: directoryKey, - VersionId: "null", - IsLatest: true, - LastModified: time.Unix(entry.Attributes.Mtime, 0), - ETag: directoryETag, - Size: 0, // Directories have size 0 - Owner: s3a.getObjectOwnerFromEntry(entry), - StorageClass: "STANDARD", - } - *allVersions = append(*allVersions, versionEntry) - } - - // Recursively search subdirectories (regardless of whether they're explicit or implicit) - fullPath := path.Join(currentPath, entry.Name) - err := s3a.findVersionsRecursively(fullPath, entryPath, allVersions, processedObjects, seenVersionIds, bucket, prefix, maxCollect) - if err != nil { - glog.Warningf("Error searching subdirectory %s: %v", entryPath, err) - continue - } - // Check if we've collected enough after recursion - if maxCollect > 0 && len(*allVersions) >= maxCollect { - return nil - } + if err := vc.processDirectory(currentPath, entryPath, entry); err != nil { + return err } } else { - // This is a regular file - check if it's a pre-versioning object - objectKey := entryPath - - // Normalize object key to ensure consistency with other version operations - normalizedObjectKey := removeDuplicateSlashes(objectKey) - - // Skip if this object already has a .versions directory (already processed) - // Check both normalized and original keys for backward compatibility - if processedObjects[objectKey] || processedObjects[normalizedObjectKey] { - glog.V(4).Infof("Skipping already processed object: objectKey=%s, normalizedObjectKey=%s, processedObjects[objectKey]=%v, processedObjects[normalizedObjectKey]=%v", - objectKey, normalizedObjectKey, processedObjects[objectKey], processedObjects[normalizedObjectKey]) - continue - } - - glog.V(4).Infof("Processing regular file: objectKey=%s, normalizedObjectKey=%s, NOT in processedObjects", objectKey, normalizedObjectKey) - - // This is a pre-versioning or suspended-versioning object - // Check if this file has version metadata (ExtVersionIdKey) - hasVersionMeta := false - if entry.Extended != nil { - if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok { - hasVersionMeta = true - glog.V(4).Infof("Regular file %s has version metadata: %s", normalizedObjectKey, string(versionIdBytes)) - } - } - - // Check if a .versions directory exists for this object - // Use entry.Name (relative to currentPath) to avoid duplicating subdirectory segments - versionsEntryName := entry.Name + s3_constants.VersionsFolder - _, versionsErr := s3a.getEntry(currentPath, versionsEntryName) - if versionsErr == nil { - // .versions directory exists - glog.V(4).Infof("Found .versions directory for regular file %s, hasVersionMeta=%v", normalizedObjectKey, hasVersionMeta) - - // If this file has version metadata, it's a suspended versioning null version - // Include it and it will be the latest - if hasVersionMeta { - glog.V(4).Infof("Including suspended versioning file %s (has version metadata)", normalizedObjectKey) - // Continue to add it below - } else { - // No version metadata - this is a pre-versioning file - // Skip it if there's already a null version in .versions - versions, err := s3a.getObjectVersionList(bucket, normalizedObjectKey) - if err == nil { - hasNullVersion := false - for _, v := range versions { - if v.VersionId == "null" { - hasNullVersion = true - break - } - } - if hasNullVersion { - glog.V(4).Infof("Skipping pre-versioning file %s, null version exists in .versions", normalizedObjectKey) - processedObjects[objectKey] = true - processedObjects[normalizedObjectKey] = true - continue - } - } - glog.V(4).Infof("Including pre-versioning file %s (no null version in .versions)", normalizedObjectKey) - } - } else { - glog.V(4).Infof("No .versions directory for regular file %s, hasVersionMeta=%v", normalizedObjectKey, hasVersionMeta) - } - - // Add this file as a null version with IsLatest=true - isLatest := true - - // Check for duplicate version IDs and skip if already seen - // Use normalized key for deduplication to match how other version operations work - versionKey := normalizedObjectKey + ":null" - if seenVersionIds[versionKey] { - glog.Warningf("findVersionsRecursively: duplicate null version for object %s detected (versionKey=%s), skipping", normalizedObjectKey, versionKey) - continue - } - seenVersionIds[versionKey] = true - - etag := s3a.calculateETagFromChunks(entry.Chunks) - - glog.V(4).Infof("Adding null version from regular file: objectKey=%s, normalizedObjectKey=%s, versionKey=%s, isLatest=%v, hasVersionMeta=%v", - objectKey, normalizedObjectKey, versionKey, isLatest, hasVersionMeta) - - versionEntry := &VersionEntry{ - Key: normalizedObjectKey, // Use normalized key for consistency - VersionId: "null", - IsLatest: isLatest, - LastModified: time.Unix(entry.Attributes.Mtime, 0), - ETag: etag, - Size: int64(entry.Attributes.FileSize), - Owner: s3a.getObjectOwnerFromEntry(entry), - StorageClass: "STANDARD", - } - *allVersions = append(*allVersions, versionEntry) + vc.processRegularFile(currentPath, entryPath, entry) } } - // If we've reached the last page, stop pagination if isLast { break } } + return nil +} + +// processDirectory handles directory entries +func (vc *versionCollector) processDirectory(currentPath, entryPath string, entry *filer_pb.Entry) error { + // Skip .uploads directory + if strings.HasPrefix(entry.Name, ".uploads") { + return nil + } + + // Handle .versions directory + if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { + return vc.processVersionsDirectory(entryPath) + } + + // Handle explicit S3 directory object + if entry.Attributes.Mime == s3_constants.FolderMimeType { + vc.processExplicitDirectory(entryPath, entry) + } + + // Recursively search subdirectory + fullPath := path.Join(currentPath, entry.Name) + if err := vc.collectVersions(fullPath, entryPath); err != nil { + glog.Warningf("Error searching subdirectory %s: %v", entryPath, err) + } return nil } @@ -860,13 +846,14 @@ func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string) continue } - // Compare version IDs chronologically (our version IDs start with timestamp) - if latestVersionId == "" || versionId > latestVersionId { + // Compare version IDs chronologically using unified comparator (handles both old and new formats) + // compareVersionIds returns negative if first arg is newer + if latestVersionId == "" || compareVersionIds(versionId, latestVersionId) < 0 { glog.V(1).Infof("updateLatestVersionAfterDeletion: found newer version %s (file: %s)", versionId, entry.Name) latestVersionId = versionId latestVersionFileName = entry.Name } else { - glog.V(1).Infof("updateLatestVersionAfterDeletion: skipping older version %s", versionId) + glog.V(1).Infof("updateLatestVersionAfterDeletion: skipping older or equal version %s", versionId) } } @@ -1091,3 +1078,4 @@ func (s3a *S3ApiServer) getObjectOwnerFromEntry(entry *filer_pb.Entry) Canonical // Fallback: return anonymous if no owner found return CanonicalUser{ID: s3_constants.AccountAnonymousId, DisplayName: "anonymous"} } + diff --git a/weed/s3api/s3api_version_id.go b/weed/s3api/s3api_version_id.go new file mode 100644 index 000000000..0ea3e6f89 --- /dev/null +++ b/weed/s3api/s3api_version_id.go @@ -0,0 +1,187 @@ +package s3api + +import ( + "crypto/rand" + "encoding/hex" + "fmt" + "math" + "strconv" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + s3_constants "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" +) + +// Version ID format constants +// New format uses inverted timestamps so newer versions sort first lexicographically +// Old format used raw timestamps where older versions sorted first +const ( + // Threshold to distinguish old vs new format version IDs + // Around year 2024-2025: + // - Old format (raw ns): ~1.7×10¹⁸ ≈ 0x17... (BELOW threshold) + // - New format (MaxInt64 - ns): ~7.5×10¹⁸ ≈ 0x68... (ABOVE threshold) + // We use 0x4000000000000000 (~4.6×10¹⁸) as threshold + versionIdFormatThreshold = 0x4000000000000000 +) + +// generateVersionId creates a unique version ID +// If useInvertedFormat is true, uses inverted timestamps so newer versions sort first +// If false, uses raw timestamps (old format) for backward compatibility +func generateVersionId(useInvertedFormat bool) string { + now := time.Now().UnixNano() + var timestampHex string + + if useInvertedFormat { + // INVERTED timestamp: newer versions have SMALLER values + // This makes lexicographic sorting return newest versions first + invertedTimestamp := math.MaxInt64 - now + timestampHex = fmt.Sprintf("%016x", invertedTimestamp) + } else { + // Raw timestamp: older versions have SMALLER values (old format) + timestampHex = fmt.Sprintf("%016x", now) + } + + // Generate random 8 bytes for uniqueness (last 16 chars of version ID) + randBytes := make([]byte, 8) + if _, err := rand.Read(randBytes); err != nil { + glog.Errorf("Failed to generate random bytes for version ID: %v", err) + // Fallback to timestamp-only if random generation fails + return timestampHex + "0000000000000000" + } + + // Combine timestamp (16 chars) + random (16 chars) = 32 chars total + randomHex := hex.EncodeToString(randBytes) + return timestampHex + randomHex +} + +// isNewFormatVersionId returns true if the version ID uses the new inverted timestamp format +func isNewFormatVersionId(versionId string) bool { + if len(versionId) < 16 || versionId == "null" { + return false + } + // Parse the first 16 hex chars as the timestamp portion + timestampPart, err := strconv.ParseUint(versionId[:16], 16, 64) + if err != nil { + return false + } + // New format has inverted timestamps (MaxInt64 - ns), which are ABOVE the threshold (~0x68...) + // Old format has raw timestamps, which are BELOW the threshold (~0x17...) + return timestampPart > versionIdFormatThreshold +} + +// getVersionTimestamp extracts the actual timestamp from a version ID, +// handling both old (raw) and new (inverted) formats +func getVersionTimestamp(versionId string) int64 { + if len(versionId) < 16 || versionId == "null" { + return 0 + } + timestampPart, err := strconv.ParseUint(versionId[:16], 16, 64) + if err != nil { + return 0 + } + if timestampPart > versionIdFormatThreshold { + // New format: inverted timestamp (above threshold), convert back + return int64(math.MaxInt64 - timestampPart) + } + // Validate old format timestamp is within int64 range + if timestampPart > math.MaxInt64 { + return 0 + } + // Old format: raw timestamp (below threshold) + return int64(timestampPart) +} + +// compareVersionIds compares two version IDs for sorting (newest first) +// Returns: negative if a is newer, positive if b is newer, 0 if equal +// Handles both old and new format version IDs +func compareVersionIds(a, b string) int { + if a == b { + return 0 + } + if a == "null" { + return 1 // null versions sort last + } + if b == "null" { + return -1 + } + + aIsNew := isNewFormatVersionId(a) + bIsNew := isNewFormatVersionId(b) + + if aIsNew == bIsNew { + // Same format - compare lexicographically + // For new format: smaller value = newer (correct) + // For old format: smaller value = older (need to invert) + if aIsNew { + // New format: lexicographic order is correct (smaller = newer) + if a < b { + return -1 + } + return 1 + } else { + // Old format: lexicographic order is inverted (smaller = older) + if a < b { + return 1 + } + return -1 + } + } + + // Mixed formats - compare by actual timestamp + aTime := getVersionTimestamp(a) + bTime := getVersionTimestamp(b) + if aTime > bTime { + return -1 // a is newer + } + if aTime < bTime { + return 1 // b is newer + } + return 0 +} + +// getVersionedObjectDir returns the directory path for storing object versions +func (s3a *S3ApiServer) getVersionedObjectDir(bucket, object string) string { + return s3a.option.BucketsPath + "/" + bucket + "/" + object + s3_constants.VersionsFolder +} + +// getVersionFileName returns the filename for a specific version +func (s3a *S3ApiServer) getVersionFileName(versionId string) string { + return fmt.Sprintf("v_%s", versionId) +} + +// getVersionIdFormat checks the .versions directory to determine which version ID format to use. +// Returns true if inverted format (new format) should be used. +// For new .versions directories, returns true (use new format). +// For existing directories, infers format from the latest version ID. +func (s3a *S3ApiServer) getVersionIdFormat(bucket, object string) bool { + cleanObject := strings.TrimPrefix(object, "/") + bucketDir := s3a.option.BucketsPath + "/" + bucket + versionsPath := cleanObject + s3_constants.VersionsFolder + + // Try to get the .versions directory entry + versionsEntry, err := s3a.getEntry(bucketDir, versionsPath) + if err != nil { + // .versions directory doesn't exist yet - use new format + return true + } + + // Infer format from the latest version ID stored in metadata + if versionsEntry.Extended != nil { + if latestVersionId, exists := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; exists { + return isNewFormatVersionId(string(latestVersionId)) + } + } + + // No latest version metadata - this is likely a new or empty directory + // Use new format + return true +} + +// generateVersionIdForObject generates a version ID using the appropriate format for the object. +// For new objects, uses inverted format. For existing versioned objects, uses their existing format. +func (s3a *S3ApiServer) generateVersionIdForObject(bucket, object string) string { + useInvertedFormat := s3a.getVersionIdFormat(bucket, object) + return generateVersionId(useInvertedFormat) +} + diff --git a/weed/s3api/s3api_version_id_test.go b/weed/s3api/s3api_version_id_test.go new file mode 100644 index 000000000..5c612ac58 --- /dev/null +++ b/weed/s3api/s3api_version_id_test.go @@ -0,0 +1,375 @@ +package s3api + +import ( + "math" + "testing" + "time" +) + +// TestVersionIdFormatDetection tests that old and new format version IDs are correctly identified +func TestVersionIdFormatDetection(t *testing.T) { + tests := []struct { + name string + versionId string + expectNew bool + }{ + // New format (inverted timestamps) - values > 0x4000000000000000 + { + name: "new format - inverted timestamp", + versionId: "68a1b2c3d4e5f6780000000000000000", // > 0x4000... + expectNew: true, + }, + { + name: "new format - high value", + versionId: "7fffffffffffffff0000000000000000", // near max + expectNew: true, + }, + // Old format (raw timestamps) - values < 0x4000000000000000 + { + name: "old format - raw timestamp", + versionId: "179a1b2c3d4e5f670000000000000000", // ~2024-2025 + expectNew: false, + }, + { + name: "old format - low value", + versionId: "10000000000000000000000000000000", + expectNew: false, + }, + // Edge cases + { + name: "null version", + versionId: "null", + expectNew: false, + }, + { + name: "short version ID", + versionId: "abc123", + expectNew: false, + }, + { + name: "empty version ID", + versionId: "", + expectNew: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := isNewFormatVersionId(tt.versionId) + if got != tt.expectNew { + t.Errorf("isNewFormatVersionId(%s) = %v, want %v", tt.versionId, got, tt.expectNew) + } + }) + } +} + +// TestGenerateVersionIdFormats tests that generateVersionId produces correct format based on parameter +func TestGenerateVersionIdFormats(t *testing.T) { + // Generate old format version ID + oldFormatId := generateVersionId(false) + if len(oldFormatId) != 32 { + t.Errorf("old format version ID length = %d, want 32", len(oldFormatId)) + } + if isNewFormatVersionId(oldFormatId) { + t.Errorf("generateVersionId(false) produced new format ID: %s", oldFormatId) + } + + // Generate new format version ID + newFormatId := generateVersionId(true) + if len(newFormatId) != 32 { + t.Errorf("new format version ID length = %d, want 32", len(newFormatId)) + } + if !isNewFormatVersionId(newFormatId) { + t.Errorf("generateVersionId(true) produced old format ID: %s", newFormatId) + } +} + +// TestGetVersionTimestamp tests timestamp extraction from both formats +func TestGetVersionTimestamp(t *testing.T) { + now := time.Now().UnixNano() + + // Generate old and new format IDs + oldId := generateVersionId(false) + newId := generateVersionId(true) + + oldTs := getVersionTimestamp(oldId) + newTs := getVersionTimestamp(newId) + + // Both should be close to current time (within 1 second) + tolerance := int64(time.Second) + + if abs(oldTs-now) > tolerance { + t.Errorf("old format timestamp diff too large: got %d, want ~%d", oldTs, now) + } + if abs(newTs-now) > tolerance { + t.Errorf("new format timestamp diff too large: got %d, want ~%d", newTs, now) + } + + // null should return 0 + if ts := getVersionTimestamp("null"); ts != 0 { + t.Errorf("getVersionTimestamp(null) = %d, want 0", ts) + } +} + +func abs(x int64) int64 { + if x < 0 { + return -x + } + return x +} + +// TestCompareVersionIdsSameFormatOld tests sorting of old format version IDs (newest first) +func TestCompareVersionIdsSameFormatOld(t *testing.T) { + // Old format: larger hex value = newer (raw timestamp) + older := "1700000000000000" + "0000000000000000" // older timestamp + newer := "1800000000000000" + "0000000000000000" // newer timestamp + + // Verify both are old format + if isNewFormatVersionId(older) || isNewFormatVersionId(newer) { + t.Fatal("test setup error: expected old format IDs") + } + + // compareVersionIds should return negative if first arg is newer + result := compareVersionIds(newer, older) + if result >= 0 { + t.Errorf("compareVersionIds(newer, older) = %d, want negative", result) + } + + result = compareVersionIds(older, newer) + if result <= 0 { + t.Errorf("compareVersionIds(older, newer) = %d, want positive", result) + } + + result = compareVersionIds(older, older) + if result != 0 { + t.Errorf("compareVersionIds(same, same) = %d, want 0", result) + } +} + +// TestCompareVersionIdsSameFormatNew tests sorting of new format version IDs (newest first) +func TestCompareVersionIdsSameFormatNew(t *testing.T) { + // New format: smaller hex value = newer (inverted timestamp) + // MaxInt64 - newer_ts < MaxInt64 - older_ts + newer := "6800000000000000" + "0000000000000000" // smaller = newer + older := "6900000000000000" + "0000000000000000" // larger = older + + // Verify both are new format + if !isNewFormatVersionId(older) || !isNewFormatVersionId(newer) { + t.Fatal("test setup error: expected new format IDs") + } + + // compareVersionIds should return negative if first arg is newer + result := compareVersionIds(newer, older) + if result >= 0 { + t.Errorf("compareVersionIds(newer, older) = %d, want negative", result) + } + + result = compareVersionIds(older, newer) + if result <= 0 { + t.Errorf("compareVersionIds(older, newer) = %d, want positive", result) + } +} + +// TestCompareVersionIdsMixedFormats tests sorting when comparing old and new format IDs +func TestCompareVersionIdsMixedFormats(t *testing.T) { + // Create IDs where we know the actual timestamps + // Old format: raw timestamp + oldFormatTs := int64(1700000000000000000) // some timestamp + oldFormatId := createOldFormatVersionId(oldFormatTs) + + // New format: inverted timestamp (created 1 second later) + newFormatTs := oldFormatTs + int64(time.Second) + newFormatId := createNewFormatVersionId(newFormatTs) + + // Verify formats + if isNewFormatVersionId(oldFormatId) { + t.Fatalf("expected old format for %s", oldFormatId) + } + if !isNewFormatVersionId(newFormatId) { + t.Fatalf("expected new format for %s", newFormatId) + } + + // New format ID is newer (created 1 second later) + result := compareVersionIds(newFormatId, oldFormatId) + if result >= 0 { + t.Errorf("compareVersionIds(newer_new_format, older_old_format) = %d, want negative", result) + } + + result = compareVersionIds(oldFormatId, newFormatId) + if result <= 0 { + t.Errorf("compareVersionIds(older_old_format, newer_new_format) = %d, want positive", result) + } +} + +// TestCompareVersionIdsNullHandling tests that null versions sort last +func TestCompareVersionIdsNullHandling(t *testing.T) { + regular := generateVersionId(true) + + // null should sort after regular versions + if result := compareVersionIds("null", regular); result <= 0 { + t.Errorf("compareVersionIds(null, regular) = %d, want positive (null sorts last)", result) + } + + if result := compareVersionIds(regular, "null"); result >= 0 { + t.Errorf("compareVersionIds(regular, null) = %d, want negative (null sorts last)", result) + } +} + +// Helper to create old format version ID from timestamp +func createOldFormatVersionId(ts int64) string { + return sprintf16x(uint64(ts)) + "0000000000000000" +} + +// Helper to create new format version ID from timestamp +func createNewFormatVersionId(ts int64) string { + inverted := uint64(math.MaxInt64 - ts) + return sprintf16x(inverted) + "0000000000000000" +} + +func sprintf16x(v uint64) string { + return sprintf("%016x", v) +} + +func sprintf(format string, v uint64) string { + result := make([]byte, 16) + for i := 15; i >= 0; i-- { + digit := v & 0xf + if digit < 10 { + result[i] = byte('0' + digit) + } else { + result[i] = byte('a' + digit - 10) + } + v >>= 4 + } + return string(result) +} + +// TestOldFormatBackwardCompatibility ensures old format versions work correctly in sorting +func TestOldFormatBackwardCompatibility(t *testing.T) { + // Simulate a bucket that was created before the inverted format was introduced + // All versions should be old format and should sort correctly (newest first) + + // Create 5 old format version IDs with known timestamps + baseTs := int64(1700000000000000000) + versions := make([]string, 5) + for i := 0; i < 5; i++ { + ts := baseTs + int64(i)*int64(time.Minute) // each 1 minute apart + versions[i] = createOldFormatVersionId(ts) + } + + // Verify all are old format + for i, v := range versions { + if isNewFormatVersionId(v) { + t.Fatalf("version %d should be old format: %s", i, v) + } + } + + // Verify sorting: versions[4] is newest, versions[0] is oldest + // compareVersionIds(newer, older) should return negative + for i := 0; i < 4; i++ { + newer := versions[i+1] + older := versions[i] + result := compareVersionIds(newer, older) + if result >= 0 { + t.Errorf("compareVersionIds(versions[%d], versions[%d]) = %d, want negative (newer first)", i+1, i, result) + } + } + + // Verify extracted timestamps are correct + for i, v := range versions { + expectedTs := baseTs + int64(i)*int64(time.Minute) + gotTs := getVersionTimestamp(v) + if gotTs != expectedTs { + t.Errorf("getVersionTimestamp(versions[%d]) = %d, want %d", i, gotTs, expectedTs) + } + } +} + +// TestNewFormatSorting ensures new format versions sort correctly (newest first) +func TestNewFormatSorting(t *testing.T) { + // Create 5 new format version IDs with known timestamps + baseTs := int64(1700000000000000000) + versions := make([]string, 5) + for i := 0; i < 5; i++ { + ts := baseTs + int64(i)*int64(time.Minute) // each 1 minute apart + versions[i] = createNewFormatVersionId(ts) + } + + // Verify all are new format + for i, v := range versions { + if !isNewFormatVersionId(v) { + t.Fatalf("version %d should be new format: %s", i, v) + } + } + + // Verify sorting: versions[4] is newest, versions[0] is oldest + for i := 0; i < 4; i++ { + newer := versions[i+1] + older := versions[i] + result := compareVersionIds(newer, older) + if result >= 0 { + t.Errorf("compareVersionIds(versions[%d], versions[%d]) = %d, want negative (newer first)", i+1, i, result) + } + } + + // Verify extracted timestamps are correct + for i, v := range versions { + expectedTs := baseTs + int64(i)*int64(time.Minute) + gotTs := getVersionTimestamp(v) + if gotTs != expectedTs { + t.Errorf("getVersionTimestamp(versions[%d]) = %d, want %d", i, gotTs, expectedTs) + } + } +} + +// TestMixedFormatTransition simulates a bucket transitioning from old to new format +func TestMixedFormatTransition(t *testing.T) { + baseTs := int64(1700000000000000000) + + // First 3 versions created with old format (before upgrade) + oldVersions := make([]string, 3) + for i := 0; i < 3; i++ { + ts := baseTs + int64(i)*int64(time.Minute) + oldVersions[i] = createOldFormatVersionId(ts) + } + + // Next 3 versions created with new format (after upgrade) + newVersions := make([]string, 3) + for i := 0; i < 3; i++ { + ts := baseTs + int64(3+i)*int64(time.Minute) // continue from where old left off + newVersions[i] = createNewFormatVersionId(ts) + } + + // All versions in chronological order (oldest to newest) + allVersions := append(oldVersions, newVersions...) + + // Verify mixed formats + for i := 0; i < 3; i++ { + if isNewFormatVersionId(allVersions[i]) { + t.Errorf("allVersions[%d] should be old format", i) + } + } + for i := 3; i < 6; i++ { + if !isNewFormatVersionId(allVersions[i]) { + t.Errorf("allVersions[%d] should be new format", i) + } + } + + // Verify sorting works correctly across the format boundary + for i := 0; i < 5; i++ { + newer := allVersions[i+1] + older := allVersions[i] + result := compareVersionIds(newer, older) + if result >= 0 { + t.Errorf("compareVersionIds(allVersions[%d], allVersions[%d]) = %d, want negative (newer first)", i+1, i, result) + } + } + + // Verify the newest (new format) version sorts before oldest (old format) when comparing directly + newest := allVersions[5] // newest, new format + oldest := allVersions[0] // oldest, old format + if result := compareVersionIds(newest, oldest); result >= 0 { + t.Errorf("compareVersionIds(newest_new_format, oldest_old_format) = %d, want negative", result) + } +} +