From 51ec0d21225ac0557ab675198a42b33b30172153 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 19 Mar 2026 21:18:52 -0700 Subject: [PATCH] fix(remote_gateway): prevent double-versioning when syncing to versioned central bucket (#8710) * fix(remote_gateway): prevent double-versioning when syncing to versioned central bucket When a file is uploaded to a versioned bucket on edge, SeaweedFS stores it internally as {object}.versions/v_{versionId}. The remote_gateway was syncing this internal path directly to the central S3 endpoint. When central's bucket also has versioning enabled, this caused central to apply its own versioning on top, producing corrupt paths like: object.versions/v_{edgeId}.versions/v_{centralId} Fix: rewrite internal .versions/v_{id} paths to the original S3 object key before uploading to the remote. Skip version file delete/update events that are internal bookkeeping. Fixes https://github.com/seaweedfs/seaweedfs/discussions/8481#discussioncomment-16209342 * fix(remote_gateway): propagate delete markers to remote as deletions Delete markers are zero-content version entries (ExtDeleteMarkerKey=true) created by S3 DELETE on a versioned bucket. Previously they were silently dropped by the HasData() filter, so deletions on edge never reached central. Now: detect delete markers before the HasData check, rewrite the .versions path to the original S3 key, and issue client.DeleteFile() on the remote. * fix(remote_gateway): tighten isVersionedPath to avoid false positives Address PR review feedback: - Add isDir parameter to isVersionedPath so it only matches the exact internal shapes: directories whose name ends with .versions (isDir=true), and files with the v_ prefix inside a .versions parent (isDir=false). Previously the function was too broad and could match user-created paths like "my.versions/data.txt". - Update all 4 call sites to pass the entry's IsDirectory field. - Rename TestVersionedDirectoryNotFilteredByHasData to TestVersionsDirectoryFilteredByHasData so the name reflects the actual assertion (directories ARE filtered by HasData). - Expand TestIsVersionedPath with isDir cases and false-positive checks. * fix(remote_gateway): persist sync marker after delete-marker propagation The delete-marker branch was calling client.DeleteFile() and returning without updating the local entry, making event replay re-issue the remote delete. Now call updateLocalEntry after a successful DeleteFile to stamp the delete-marker entry with a RemoteEntry, matching the pattern used by the normal create path. * refactor(remote_gateway): extract syncDeleteMarker and fix root path edge case - Extract syncDeleteMarker() shared helper used by both bucketed and mounted-dir event processors, replacing the duplicated delete + persist local marker logic. - Fix rewriteVersionedSourcePath for root-level objects: when lastSlash is 0 (e.g. "/file.xml.versions"), return "/" as the parent dir instead of an empty string. - The strings.Contains(dir, ".versions/") condition flagged in review was already removed in a prior commit that tightened isVersionedPath. * fix(remote_gateway): skip updateLocalEntry for versioned path rewrites After rewriting a .versions/v_{id} path to the logical S3 key and uploading, the code was calling updateLocalEntry on the original v_* entry, stamping it with a RemoteEntry for the logical key. This is semantically wrong: the logical object has no filer entry in versioned buckets, and the internal v_* entry should not carry a RemoteEntry for a different path. Skip updateLocalEntry when the path was rewritten from a versioned source. Replay safety is preserved because S3 PutObject is idempotent. * fix(remote_gateway): scope versioning checks to /buckets/ namespace isVersionedPath and rewriteVersionedSourcePath could wrongly match paths in non-bucket mounts (e.g. /mnt/remote/file.xml.versions). Add the same /buckets/ prefix guard used by isMultipartUploadDir so the .versions / v_ logic only applies within the bucket namespace. --- weed/command/filer_remote_gateway_buckets.go | 48 ++- weed/command/filer_remote_sync_dir.go | 125 ++++++- weed/command/filer_remote_sync_dir_test.go | 330 +++++++++++++++++++ 3 files changed, 501 insertions(+), 2 deletions(-) create mode 100644 weed/command/filer_remote_sync_dir_test.go diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index 5238e7fc5..97147766e 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -210,6 +210,24 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) { return nil } + // Propagate delete markers as deletions on the remote. + // Delete markers are zero-content version entries, so they + // would be filtered out by the HasData check below. + if isDeleteMarker(message.NewEntry) { + if newParent, newName, ok := rewriteVersionedSourcePath(message.NewParentPath, message.NewEntry.Name); ok { + bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(newParent) + if !ok { + return nil + } + client, err := remote_storage.GetRemoteStorage(remoteStorage) + if err != nil { + return err + } + dest := toRemoteStorageLocation(bucket, util.NewFullPath(newParent, newName), remoteStorageMountLocation) + return syncDeleteMarker(client, option, message, dest) + } + return nil + } if !filer.HasData(message.NewEntry) { return nil } @@ -226,7 +244,16 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour glog.V(2).Infof("skipping creating: %+v", resp) return nil } - dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + // Rewrite internal versioning paths to the original S3 key + // to prevent double-versioning when central also has versioning enabled + parentPath, entryName := message.NewParentPath, message.NewEntry.Name + isRewrittenVersion := false + if newParent, newName, ok := rewriteVersionedSourcePath(parentPath, entryName); ok { + glog.V(0).Infof("rewrite versioned path %s/%s -> %s/%s", parentPath, entryName, newParent, newName) + parentPath, entryName = newParent, newName + isRewrittenVersion = true + } + dest := toRemoteStorageLocation(bucket, util.NewFullPath(parentPath, entryName), remoteStorageMountLocation) if message.NewEntry.IsDirectory { glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest)) return client.WriteDirectory(dest, message.NewEntry) @@ -236,12 +263,26 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour if writeErr != nil { return writeErr } + // Skip updateLocalEntry for versioned rewrites: the logical + // object (e.g. file.xml) has no filer entry in versioned + // buckets, and stamping the internal v_* entry with a + // RemoteEntry for the logical key is semantically wrong. + // Replay is safe because S3 PutObject is idempotent. + if isRewrittenVersion { + return nil + } return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry) } if filer_pb.IsDelete(resp) { if resp.Directory == option.bucketsDir { return handleDeleteBucket(message.OldEntry) } + // Skip deletion of internal version files; individual version + // deletes should not propagate to the remote object + if isVersionedPath(resp.Directory, message.OldEntry.Name, message.OldEntry.IsDirectory) { + glog.V(2).Infof("skipping delete of internal version path: %s/%s", resp.Directory, message.OldEntry.Name) + return nil + } bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory) if !ok { return nil @@ -276,6 +317,11 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) { return nil } + // Skip updates to internal version paths + if isVersionedPath(message.NewParentPath, message.NewEntry.Name, message.NewEntry.IsDirectory) { + glog.V(2).Infof("skipping update of internal version path: %s/%s", message.NewParentPath, message.NewEntry.Name) + return nil + } oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory) newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath) if oldOk && newOk { diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index b5cc02cfc..60f1f8f5b 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -138,6 +138,16 @@ func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.Rem if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) { return nil } + // Propagate delete markers as deletions on the remote. + // Delete markers are zero-content version entries, so they + // would be filtered out by the HasData check below. + if isDeleteMarker(message.NewEntry) { + if newParent, newName, ok := rewriteVersionedSourcePath(message.NewParentPath, message.NewEntry.Name); ok { + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(newParent, newName), remoteStorageMountLocation) + return syncDeleteMarker(client, option, message, dest) + } + return nil + } if !filer.HasData(message.NewEntry) { return nil } @@ -146,7 +156,16 @@ func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.Rem glog.V(2).Infof("skipping creating: %+v", resp) return nil } - dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + // Rewrite internal versioning paths to the original S3 key + // to prevent double-versioning when central also has versioning enabled + parentPath, entryName := message.NewParentPath, message.NewEntry.Name + isRewrittenVersion := false + if newParent, newName, ok := rewriteVersionedSourcePath(parentPath, entryName); ok { + glog.V(0).Infof("rewrite versioned path %s/%s -> %s/%s", parentPath, entryName, newParent, newName) + parentPath, entryName = newParent, newName + isRewrittenVersion = true + } + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(parentPath, entryName), remoteStorageMountLocation) if message.NewEntry.IsDirectory { glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest)) return client.WriteDirectory(dest, message.NewEntry) @@ -156,9 +175,23 @@ func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.Rem if writeErr != nil { return writeErr } + // Skip updateLocalEntry for versioned rewrites: the logical + // object (e.g. file.xml) has no filer entry in versioned + // buckets, and stamping the internal v_* entry with a + // RemoteEntry for the logical key is semantically wrong. + // Replay is safe because S3 PutObject is idempotent. + if isRewrittenVersion { + return nil + } return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry) } if filer_pb.IsDelete(resp) { + // Skip deletion of internal version files; individual version + // deletes should not propagate to the remote object + if isVersionedPath(resp.Directory, message.OldEntry.Name, message.OldEntry.IsDirectory) { + glog.V(2).Infof("skipping delete of internal version path: %s/%s", resp.Directory, message.OldEntry.Name) + return nil + } glog.V(2).Infof("delete: %+v", resp) dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) if message.OldEntry.IsDirectory { @@ -172,6 +205,11 @@ func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.Rem if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) { return nil } + // Skip updates to internal version paths + if isVersionedPath(message.NewParentPath, message.NewEntry.Name, message.NewEntry.IsDirectory) { + glog.V(2).Infof("skipping update of internal version path: %s/%s", message.NewParentPath, message.NewEntry.Name) + return nil + } oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) if !shouldSendToRemote(message.NewEntry) { @@ -292,3 +330,88 @@ func isMultipartUploadDir(dir string) bool { return strings.HasPrefix(dir, "/buckets/") && strings.Contains(dir, "/"+s3_constants.MultipartUploadsFolder+"/") } + +// isDeleteMarker returns true if the entry is an S3 delete marker +// (a zero-content version entry with ExtDeleteMarkerKey set to "true"). +func isDeleteMarker(entry *filer_pb.Entry) bool { + if entry == nil || entry.Extended == nil { + return false + } + return string(entry.Extended[s3_constants.ExtDeleteMarkerKey]) == "true" +} + +// syncDeleteMarker propagates a delete marker to the remote storage and +// persists a local sync marker so that replaying the same event is a no-op. +func syncDeleteMarker( + client remote_storage.RemoteStorageClient, + filerClient filer_pb.FilerClient, + message *filer_pb.EventNotification, + dest *remote_pb.RemoteStorageLocation, +) error { + glog.V(0).Infof("delete (marker) %s", remote_storage.FormatLocation(dest)) + if err := client.DeleteFile(dest); err != nil { + return err + } + return updateLocalEntry(filerClient, message.NewParentPath, message.NewEntry, &filer_pb.RemoteEntry{ + StorageName: dest.Name, + RemoteMtime: message.NewEntry.Attributes.GetMtime(), + }) +} + +// isVersionedPath returns true if the dir/name refers to an internal +// versioning path (.versions directory or a version file inside it). +// These paths are SeaweedFS-internal and must not be synced to remote +// storage as-is, because the remote S3 endpoint may apply its own +// versioning, leading to double-versioned paths. +// +// For directories: matches only when the entry name ends with the +// VersionsFolder suffix (e.g. "file.xml.versions"). +// For files: matches only when the parent directory ends with +// VersionsFolder and the file name has the "v_" prefix used by +// the internal version file naming convention. +func isVersionedPath(dir string, name string, isDir bool) bool { + if !strings.HasPrefix(dir, "/buckets/") { + return false + } + if isDir { + return strings.HasSuffix(name, s3_constants.VersionsFolder) + } + return strings.HasSuffix(dir, s3_constants.VersionsFolder) && strings.HasPrefix(name, "v_") +} + +// rewriteVersionedSourcePath rewrites an internal versioning path to the +// original S3 object key. When a file is uploaded to a versioned bucket, +// SeaweedFS stores it internally as: +// +// /buckets/{bucket}/{key}.versions/v_{versionId} +// +// This function strips the ".versions/v_{versionId}" suffix and returns +// the original parent directory and object name, so the remote destination +// points to the logical S3 key rather than the internal version storage path. +// +// Returns (newDir, newName, true) if the path was rewritten, or +// (dir, name, false) if the path is not a versioned path. +func rewriteVersionedSourcePath(dir string, name string) (string, string, bool) { + if !strings.HasPrefix(dir, "/buckets/") { + return dir, name, false + } + if !strings.HasSuffix(dir, s3_constants.VersionsFolder) { + return dir, name, false + } + if !strings.HasPrefix(name, "v_") { + return dir, name, false + } + // dir = "/buckets/bucket/path/to/file.xml.versions" + // name = "v_abc123" + // Original object: dir without ".versions" suffix → "/buckets/bucket/path/to/file.xml" + originalObjectPath := dir[:len(dir)-len(s3_constants.VersionsFolder)] + lastSlash := strings.LastIndex(originalObjectPath, "/") + if lastSlash < 0 { + return dir, name, false + } + newDir := originalObjectPath[:lastSlash] + if lastSlash == 0 { + newDir = "/" + } + return newDir, originalObjectPath[lastSlash+1:], true +} diff --git a/weed/command/filer_remote_sync_dir_test.go b/weed/command/filer_remote_sync_dir_test.go new file mode 100644 index 000000000..7b73038d0 --- /dev/null +++ b/weed/command/filer_remote_sync_dir_test.go @@ -0,0 +1,330 @@ +package command + +import ( + "strings" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// TestVersionedFilePathRewrittenForRemote verifies that the fix for +// https://github.com/seaweedfs/seaweedfs/discussions/8481#discussioncomment-16209342 +// works correctly: internal .versions/v_{id} paths are rewritten to the +// original S3 object key before syncing to the remote. +func TestVersionedFilePathRewrittenForRemote(t *testing.T) { + bucketsDir := "/buckets" + bucketName := "devicetransaction" + bucket := util.FullPath(bucketsDir).Child(bucketName) + + objectPath := "9e149757-2363-11f1-bfa6-11c8ff31b539/transactionlog-2026-03-19-16-30-00.xml" + versionId := "6761c63812bd9b64704acf08a3ba5800" + versionFileName := "v_" + versionId + + // The filer event for the version file creation + versionedParentPath := string(bucket) + "/" + objectPath + s3_constants.VersionsFolder + + // The CREATE event that remote_gateway receives + event := &filer_pb.SubscribeMetadataResponse{ + Directory: versionedParentPath, + EventNotification: &filer_pb.EventNotification{ + NewParentPath: versionedParentPath, + NewEntry: &filer_pb.Entry{ + Name: versionFileName, + Content: []byte("test content"), + }, + }, + } + + // Verify preconditions + if !filer_pb.IsCreate(event) { + t.Fatal("expected create event") + } + if isMultipartUploadFile(event.EventNotification.NewParentPath, event.EventNotification.NewEntry.Name) { + t.Fatal("should not be detected as multipart upload") + } + if !filer.HasData(event.EventNotification.NewEntry) { + t.Fatal("version file should have data") + } + if !shouldSendToRemote(event.EventNotification.NewEntry) { + t.Fatal("version file should be eligible for remote sync") + } + + // Apply the versioned path rewriting (as remote_gateway now does) + parentPath, entryName := event.EventNotification.NewParentPath, event.EventNotification.NewEntry.Name + if newParent, newName, ok := rewriteVersionedSourcePath(parentPath, entryName); ok { + parentPath, entryName = newParent, newName + } + + // Compute the remote destination with the rewritten path + remoteStorageMountLocation := &remote_pb.RemoteStorageLocation{ + Name: "central", + Bucket: bucketName, + Path: "/", + } + sourcePath := util.NewFullPath(parentPath, entryName) + dest := toRemoteStorageLocation(bucket, sourcePath, remoteStorageMountLocation) + + // Verify the destination does NOT contain internal .versions structure + if strings.Contains(dest.Path, s3_constants.VersionsFolder) { + t.Errorf("remote destination path still contains .versions: %s", dest.Path) + } + + expectedPath := "/" + objectPath + if dest.Path != expectedPath { + t.Errorf("remote destination path = %q, want %q", dest.Path, expectedPath) + } +} + +// TestVersionsDirectoryFilteredByHasData verifies that the .versions +// directory creation event is correctly filtered out (no data), so only +// the version file event needs path rewriting. +func TestVersionsDirectoryFilteredByHasData(t *testing.T) { + bucket := "/buckets/devicetransaction" + objectPath := "9e149757-2363-11f1-bfa6-11c8ff31b539/transactionlog-2026-03-19-16-30-00.xml" + + dirEvent := &filer_pb.SubscribeMetadataResponse{ + Directory: bucket + "/9e149757-2363-11f1-bfa6-11c8ff31b539", + EventNotification: &filer_pb.EventNotification{ + NewParentPath: bucket + "/9e149757-2363-11f1-bfa6-11c8ff31b539", + NewEntry: &filer_pb.Entry{ + Name: objectPath[strings.LastIndex(objectPath, "/")+1:] + s3_constants.VersionsFolder, + IsDirectory: true, + }, + }, + } + + if filer.HasData(dirEvent.EventNotification.NewEntry) { + t.Error(".versions directory should not have data") + } +} + +func TestIsVersionedPath(t *testing.T) { + tests := []struct { + label string + dir string + name string + isDir bool + expected bool + }{ + // Version file inside .versions directory (file, v_ prefix) + { + label: "version file in .versions dir", + dir: "/buckets/mybucket/path/to/file.xml" + s3_constants.VersionsFolder, + name: "v_6761c63812bd9b64704acf08a3ba5800", + isDir: false, + expected: true, + }, + // Regular file (not versioned) + { + label: "regular file", + dir: "/buckets/mybucket/path/to", + name: "file.xml", + isDir: false, + expected: false, + }, + // .versions directory entry itself + { + label: ".versions directory entry", + dir: "/buckets/mybucket/path/to", + name: "file.xml" + s3_constants.VersionsFolder, + isDir: true, + expected: true, + }, + // Non-version file inside .versions dir (no v_ prefix) — not internal + { + label: "non-version file in .versions dir", + dir: "/buckets/mybucket/file.xml" + s3_constants.VersionsFolder, + name: "some_other_file", + isDir: false, + expected: false, + }, + // User-created directory whose name ends with .versions — not + // treated as versioned when isDir=false (file inside it) + { + label: "file in user dir ending with .versions but no v_ prefix", + dir: "/buckets/mybucket/my" + s3_constants.VersionsFolder, + name: "data.txt", + isDir: false, + expected: false, + }, + // Regular directory (not .versions) + { + label: "regular directory", + dir: "/buckets/mybucket/path/to", + name: "subdir", + isDir: true, + expected: false, + }, + // Entry whose name ends with .versions but is a file, not a dir + { + label: "file named like .versions dir", + dir: "/buckets/mybucket/path/to", + name: "file.xml" + s3_constants.VersionsFolder, + isDir: false, + expected: false, + }, + // Non-bucket mount: .versions dir should not match + { + label: ".versions dir outside /buckets/", + dir: "/mnt/remote/path/to", + name: "file.xml" + s3_constants.VersionsFolder, + isDir: true, + expected: false, + }, + // Non-bucket mount: v_ file should not match + { + label: "v_ file outside /buckets/", + dir: "/data/archive/file.xml" + s3_constants.VersionsFolder, + name: "v_abc123", + isDir: false, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.label, func(t *testing.T) { + got := isVersionedPath(tt.dir, tt.name, tt.isDir) + if got != tt.expected { + t.Errorf("isVersionedPath(%q, %q, %v) = %v, want %v", + tt.dir, tt.name, tt.isDir, got, tt.expected) + } + }) + } +} + +// TestDeleteMarkerDetectedBeforeHasDataFilter verifies that a delete marker +// (zero-content version entry with ExtDeleteMarkerKey="true") is detected +// and can be propagated as a deletion, rather than being silently dropped +// by the HasData() check. +func TestDeleteMarkerDetectedBeforeHasDataFilter(t *testing.T) { + bucketsDir := "/buckets" + bucketName := "devicetransaction" + bucket := util.FullPath(bucketsDir).Child(bucketName) + + objectPath := "docs/report.pdf" + versionId := "aabb112233445566" + versionFileName := "v_" + versionId + versionedParentPath := string(bucket) + "/" + objectPath + s3_constants.VersionsFolder + + // A delete marker CREATE event: has ExtDeleteMarkerKey but no content + deleteMarkerEntry := &filer_pb.Entry{ + Name: versionFileName, + Extended: map[string][]byte{ + s3_constants.ExtDeleteMarkerKey: []byte("true"), + s3_constants.ExtVersionIdKey: []byte(versionId), + }, + // Content and Chunks are nil → HasData() returns false + } + + // Preconditions + if filer.HasData(deleteMarkerEntry) { + t.Fatal("delete marker should have no data") + } + if !isDeleteMarker(deleteMarkerEntry) { + t.Fatal("should be detected as a delete marker") + } + + // The versioned path should be rewritable to the original key + newParent, newName, ok := rewriteVersionedSourcePath(versionedParentPath, deleteMarkerEntry.Name) + if !ok { + t.Fatal("delete marker path should be rewritable") + } + + // Verify the rewritten path points to the original object + remoteStorageMountLocation := &remote_pb.RemoteStorageLocation{ + Name: "central", + Bucket: bucketName, + Path: "/", + } + dest := toRemoteStorageLocation(bucket, util.NewFullPath(newParent, newName), remoteStorageMountLocation) + + expectedPath := "/" + objectPath + if dest.Path != expectedPath { + t.Errorf("delete marker destination = %q, want %q", dest.Path, expectedPath) + } + if strings.Contains(dest.Path, s3_constants.VersionsFolder) { + t.Errorf("delete marker destination should not contain .versions: %s", dest.Path) + } +} + +func TestRewriteVersionedSourcePath(t *testing.T) { + tests := []struct { + name string + dir string + entryName string + wantDir string + wantName string + wantChanged bool + }{ + { + name: "version file in .versions dir", + dir: "/buckets/bucket/path/to/file.xml" + s3_constants.VersionsFolder, + entryName: "v_6761c63812bd9b64704acf08a3ba5800", + wantDir: "/buckets/bucket/path/to", + wantName: "file.xml", + wantChanged: true, + }, + { + name: "regular file", + dir: "/buckets/bucket/path/to", + entryName: "file.xml", + wantDir: "/buckets/bucket/path/to", + wantName: "file.xml", + wantChanged: false, + }, + { + name: "version file at bucket root", + dir: "/buckets/bucket/report.pdf" + s3_constants.VersionsFolder, + entryName: "v_abc123", + wantDir: "/buckets/bucket", + wantName: "report.pdf", + wantChanged: true, + }, + { + name: "non-bucket path not rewritten", + dir: "/file.xml" + s3_constants.VersionsFolder, + entryName: "v_abc123", + wantDir: "/file.xml" + s3_constants.VersionsFolder, + wantName: "v_abc123", + wantChanged: false, + }, + { + name: "non-bucket mount not rewritten", + dir: "/mnt/remote/file.xml" + s3_constants.VersionsFolder, + entryName: "v_abc123", + wantDir: "/mnt/remote/file.xml" + s3_constants.VersionsFolder, + wantName: "v_abc123", + wantChanged: false, + }, + { + name: "non-version file in .versions dir", + dir: "/buckets/bucket/file.xml" + s3_constants.VersionsFolder, + entryName: "some_other_file", + wantDir: "/buckets/bucket/file.xml" + s3_constants.VersionsFolder, + wantName: "some_other_file", + wantChanged: false, + }, + { + name: "dir not ending in .versions", + dir: "/buckets/bucket/path/to", + entryName: "v_abc123", + wantDir: "/buckets/bucket/path/to", + wantName: "v_abc123", + wantChanged: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotDir, gotName, gotChanged := rewriteVersionedSourcePath(tt.dir, tt.entryName) + if gotDir != tt.wantDir || gotName != tt.wantName || gotChanged != tt.wantChanged { + t.Errorf("rewriteVersionedSourcePath(%q, %q) = (%q, %q, %v), want (%q, %q, %v)", + tt.dir, tt.entryName, gotDir, gotName, gotChanged, tt.wantDir, tt.wantName, tt.wantChanged) + } + }) + } +}