diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index 5238e7fc5..97147766e 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -210,6 +210,24 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) { return nil } + // Propagate delete markers as deletions on the remote. + // Delete markers are zero-content version entries, so they + // would be filtered out by the HasData check below. + if isDeleteMarker(message.NewEntry) { + if newParent, newName, ok := rewriteVersionedSourcePath(message.NewParentPath, message.NewEntry.Name); ok { + bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(newParent) + if !ok { + return nil + } + client, err := remote_storage.GetRemoteStorage(remoteStorage) + if err != nil { + return err + } + dest := toRemoteStorageLocation(bucket, util.NewFullPath(newParent, newName), remoteStorageMountLocation) + return syncDeleteMarker(client, option, message, dest) + } + return nil + } if !filer.HasData(message.NewEntry) { return nil } @@ -226,7 +244,16 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour glog.V(2).Infof("skipping creating: %+v", resp) return nil } - dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + // Rewrite internal versioning paths to the original S3 key + // to prevent double-versioning when central also has versioning enabled + parentPath, entryName := message.NewParentPath, message.NewEntry.Name + isRewrittenVersion := false + if newParent, newName, ok := rewriteVersionedSourcePath(parentPath, entryName); ok { + glog.V(0).Infof("rewrite versioned path %s/%s -> %s/%s", parentPath, entryName, newParent, newName) + parentPath, entryName = newParent, newName + isRewrittenVersion = true + } + dest := toRemoteStorageLocation(bucket, util.NewFullPath(parentPath, entryName), remoteStorageMountLocation) if message.NewEntry.IsDirectory { glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest)) return client.WriteDirectory(dest, message.NewEntry) @@ -236,12 +263,26 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour if writeErr != nil { return writeErr } + // Skip updateLocalEntry for versioned rewrites: the logical + // object (e.g. file.xml) has no filer entry in versioned + // buckets, and stamping the internal v_* entry with a + // RemoteEntry for the logical key is semantically wrong. + // Replay is safe because S3 PutObject is idempotent. + if isRewrittenVersion { + return nil + } return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry) } if filer_pb.IsDelete(resp) { if resp.Directory == option.bucketsDir { return handleDeleteBucket(message.OldEntry) } + // Skip deletion of internal version files; individual version + // deletes should not propagate to the remote object + if isVersionedPath(resp.Directory, message.OldEntry.Name, message.OldEntry.IsDirectory) { + glog.V(2).Infof("skipping delete of internal version path: %s/%s", resp.Directory, message.OldEntry.Name) + return nil + } bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory) if !ok { return nil @@ -276,6 +317,11 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) { return nil } + // Skip updates to internal version paths + if isVersionedPath(message.NewParentPath, message.NewEntry.Name, message.NewEntry.IsDirectory) { + glog.V(2).Infof("skipping update of internal version path: %s/%s", message.NewParentPath, message.NewEntry.Name) + return nil + } oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory) newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath) if oldOk && newOk { diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index b5cc02cfc..60f1f8f5b 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -138,6 +138,16 @@ func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.Rem if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) { return nil } + // Propagate delete markers as deletions on the remote. + // Delete markers are zero-content version entries, so they + // would be filtered out by the HasData check below. + if isDeleteMarker(message.NewEntry) { + if newParent, newName, ok := rewriteVersionedSourcePath(message.NewParentPath, message.NewEntry.Name); ok { + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(newParent, newName), remoteStorageMountLocation) + return syncDeleteMarker(client, option, message, dest) + } + return nil + } if !filer.HasData(message.NewEntry) { return nil } @@ -146,7 +156,16 @@ func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.Rem glog.V(2).Infof("skipping creating: %+v", resp) return nil } - dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + // Rewrite internal versioning paths to the original S3 key + // to prevent double-versioning when central also has versioning enabled + parentPath, entryName := message.NewParentPath, message.NewEntry.Name + isRewrittenVersion := false + if newParent, newName, ok := rewriteVersionedSourcePath(parentPath, entryName); ok { + glog.V(0).Infof("rewrite versioned path %s/%s -> %s/%s", parentPath, entryName, newParent, newName) + parentPath, entryName = newParent, newName + isRewrittenVersion = true + } + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(parentPath, entryName), remoteStorageMountLocation) if message.NewEntry.IsDirectory { glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest)) return client.WriteDirectory(dest, message.NewEntry) @@ -156,9 +175,23 @@ func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.Rem if writeErr != nil { return writeErr } + // Skip updateLocalEntry for versioned rewrites: the logical + // object (e.g. file.xml) has no filer entry in versioned + // buckets, and stamping the internal v_* entry with a + // RemoteEntry for the logical key is semantically wrong. + // Replay is safe because S3 PutObject is idempotent. + if isRewrittenVersion { + return nil + } return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry) } if filer_pb.IsDelete(resp) { + // Skip deletion of internal version files; individual version + // deletes should not propagate to the remote object + if isVersionedPath(resp.Directory, message.OldEntry.Name, message.OldEntry.IsDirectory) { + glog.V(2).Infof("skipping delete of internal version path: %s/%s", resp.Directory, message.OldEntry.Name) + return nil + } glog.V(2).Infof("delete: %+v", resp) dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) if message.OldEntry.IsDirectory { @@ -172,6 +205,11 @@ func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.Rem if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) { return nil } + // Skip updates to internal version paths + if isVersionedPath(message.NewParentPath, message.NewEntry.Name, message.NewEntry.IsDirectory) { + glog.V(2).Infof("skipping update of internal version path: %s/%s", message.NewParentPath, message.NewEntry.Name) + return nil + } oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) if !shouldSendToRemote(message.NewEntry) { @@ -292,3 +330,88 @@ func isMultipartUploadDir(dir string) bool { return strings.HasPrefix(dir, "/buckets/") && strings.Contains(dir, "/"+s3_constants.MultipartUploadsFolder+"/") } + +// isDeleteMarker returns true if the entry is an S3 delete marker +// (a zero-content version entry with ExtDeleteMarkerKey set to "true"). +func isDeleteMarker(entry *filer_pb.Entry) bool { + if entry == nil || entry.Extended == nil { + return false + } + return string(entry.Extended[s3_constants.ExtDeleteMarkerKey]) == "true" +} + +// syncDeleteMarker propagates a delete marker to the remote storage and +// persists a local sync marker so that replaying the same event is a no-op. +func syncDeleteMarker( + client remote_storage.RemoteStorageClient, + filerClient filer_pb.FilerClient, + message *filer_pb.EventNotification, + dest *remote_pb.RemoteStorageLocation, +) error { + glog.V(0).Infof("delete (marker) %s", remote_storage.FormatLocation(dest)) + if err := client.DeleteFile(dest); err != nil { + return err + } + return updateLocalEntry(filerClient, message.NewParentPath, message.NewEntry, &filer_pb.RemoteEntry{ + StorageName: dest.Name, + RemoteMtime: message.NewEntry.Attributes.GetMtime(), + }) +} + +// isVersionedPath returns true if the dir/name refers to an internal +// versioning path (.versions directory or a version file inside it). +// These paths are SeaweedFS-internal and must not be synced to remote +// storage as-is, because the remote S3 endpoint may apply its own +// versioning, leading to double-versioned paths. +// +// For directories: matches only when the entry name ends with the +// VersionsFolder suffix (e.g. "file.xml.versions"). +// For files: matches only when the parent directory ends with +// VersionsFolder and the file name has the "v_" prefix used by +// the internal version file naming convention. +func isVersionedPath(dir string, name string, isDir bool) bool { + if !strings.HasPrefix(dir, "/buckets/") { + return false + } + if isDir { + return strings.HasSuffix(name, s3_constants.VersionsFolder) + } + return strings.HasSuffix(dir, s3_constants.VersionsFolder) && strings.HasPrefix(name, "v_") +} + +// rewriteVersionedSourcePath rewrites an internal versioning path to the +// original S3 object key. When a file is uploaded to a versioned bucket, +// SeaweedFS stores it internally as: +// +// /buckets/{bucket}/{key}.versions/v_{versionId} +// +// This function strips the ".versions/v_{versionId}" suffix and returns +// the original parent directory and object name, so the remote destination +// points to the logical S3 key rather than the internal version storage path. +// +// Returns (newDir, newName, true) if the path was rewritten, or +// (dir, name, false) if the path is not a versioned path. +func rewriteVersionedSourcePath(dir string, name string) (string, string, bool) { + if !strings.HasPrefix(dir, "/buckets/") { + return dir, name, false + } + if !strings.HasSuffix(dir, s3_constants.VersionsFolder) { + return dir, name, false + } + if !strings.HasPrefix(name, "v_") { + return dir, name, false + } + // dir = "/buckets/bucket/path/to/file.xml.versions" + // name = "v_abc123" + // Original object: dir without ".versions" suffix → "/buckets/bucket/path/to/file.xml" + originalObjectPath := dir[:len(dir)-len(s3_constants.VersionsFolder)] + lastSlash := strings.LastIndex(originalObjectPath, "/") + if lastSlash < 0 { + return dir, name, false + } + newDir := originalObjectPath[:lastSlash] + if lastSlash == 0 { + newDir = "/" + } + return newDir, originalObjectPath[lastSlash+1:], true +} diff --git a/weed/command/filer_remote_sync_dir_test.go b/weed/command/filer_remote_sync_dir_test.go new file mode 100644 index 000000000..7b73038d0 --- /dev/null +++ b/weed/command/filer_remote_sync_dir_test.go @@ -0,0 +1,330 @@ +package command + +import ( + "strings" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// TestVersionedFilePathRewrittenForRemote verifies that the fix for +// https://github.com/seaweedfs/seaweedfs/discussions/8481#discussioncomment-16209342 +// works correctly: internal .versions/v_{id} paths are rewritten to the +// original S3 object key before syncing to the remote. +func TestVersionedFilePathRewrittenForRemote(t *testing.T) { + bucketsDir := "/buckets" + bucketName := "devicetransaction" + bucket := util.FullPath(bucketsDir).Child(bucketName) + + objectPath := "9e149757-2363-11f1-bfa6-11c8ff31b539/transactionlog-2026-03-19-16-30-00.xml" + versionId := "6761c63812bd9b64704acf08a3ba5800" + versionFileName := "v_" + versionId + + // The filer event for the version file creation + versionedParentPath := string(bucket) + "/" + objectPath + s3_constants.VersionsFolder + + // The CREATE event that remote_gateway receives + event := &filer_pb.SubscribeMetadataResponse{ + Directory: versionedParentPath, + EventNotification: &filer_pb.EventNotification{ + NewParentPath: versionedParentPath, + NewEntry: &filer_pb.Entry{ + Name: versionFileName, + Content: []byte("test content"), + }, + }, + } + + // Verify preconditions + if !filer_pb.IsCreate(event) { + t.Fatal("expected create event") + } + if isMultipartUploadFile(event.EventNotification.NewParentPath, event.EventNotification.NewEntry.Name) { + t.Fatal("should not be detected as multipart upload") + } + if !filer.HasData(event.EventNotification.NewEntry) { + t.Fatal("version file should have data") + } + if !shouldSendToRemote(event.EventNotification.NewEntry) { + t.Fatal("version file should be eligible for remote sync") + } + + // Apply the versioned path rewriting (as remote_gateway now does) + parentPath, entryName := event.EventNotification.NewParentPath, event.EventNotification.NewEntry.Name + if newParent, newName, ok := rewriteVersionedSourcePath(parentPath, entryName); ok { + parentPath, entryName = newParent, newName + } + + // Compute the remote destination with the rewritten path + remoteStorageMountLocation := &remote_pb.RemoteStorageLocation{ + Name: "central", + Bucket: bucketName, + Path: "/", + } + sourcePath := util.NewFullPath(parentPath, entryName) + dest := toRemoteStorageLocation(bucket, sourcePath, remoteStorageMountLocation) + + // Verify the destination does NOT contain internal .versions structure + if strings.Contains(dest.Path, s3_constants.VersionsFolder) { + t.Errorf("remote destination path still contains .versions: %s", dest.Path) + } + + expectedPath := "/" + objectPath + if dest.Path != expectedPath { + t.Errorf("remote destination path = %q, want %q", dest.Path, expectedPath) + } +} + +// TestVersionsDirectoryFilteredByHasData verifies that the .versions +// directory creation event is correctly filtered out (no data), so only +// the version file event needs path rewriting. +func TestVersionsDirectoryFilteredByHasData(t *testing.T) { + bucket := "/buckets/devicetransaction" + objectPath := "9e149757-2363-11f1-bfa6-11c8ff31b539/transactionlog-2026-03-19-16-30-00.xml" + + dirEvent := &filer_pb.SubscribeMetadataResponse{ + Directory: bucket + "/9e149757-2363-11f1-bfa6-11c8ff31b539", + EventNotification: &filer_pb.EventNotification{ + NewParentPath: bucket + "/9e149757-2363-11f1-bfa6-11c8ff31b539", + NewEntry: &filer_pb.Entry{ + Name: objectPath[strings.LastIndex(objectPath, "/")+1:] + s3_constants.VersionsFolder, + IsDirectory: true, + }, + }, + } + + if filer.HasData(dirEvent.EventNotification.NewEntry) { + t.Error(".versions directory should not have data") + } +} + +func TestIsVersionedPath(t *testing.T) { + tests := []struct { + label string + dir string + name string + isDir bool + expected bool + }{ + // Version file inside .versions directory (file, v_ prefix) + { + label: "version file in .versions dir", + dir: "/buckets/mybucket/path/to/file.xml" + s3_constants.VersionsFolder, + name: "v_6761c63812bd9b64704acf08a3ba5800", + isDir: false, + expected: true, + }, + // Regular file (not versioned) + { + label: "regular file", + dir: "/buckets/mybucket/path/to", + name: "file.xml", + isDir: false, + expected: false, + }, + // .versions directory entry itself + { + label: ".versions directory entry", + dir: "/buckets/mybucket/path/to", + name: "file.xml" + s3_constants.VersionsFolder, + isDir: true, + expected: true, + }, + // Non-version file inside .versions dir (no v_ prefix) — not internal + { + label: "non-version file in .versions dir", + dir: "/buckets/mybucket/file.xml" + s3_constants.VersionsFolder, + name: "some_other_file", + isDir: false, + expected: false, + }, + // User-created directory whose name ends with .versions — not + // treated as versioned when isDir=false (file inside it) + { + label: "file in user dir ending with .versions but no v_ prefix", + dir: "/buckets/mybucket/my" + s3_constants.VersionsFolder, + name: "data.txt", + isDir: false, + expected: false, + }, + // Regular directory (not .versions) + { + label: "regular directory", + dir: "/buckets/mybucket/path/to", + name: "subdir", + isDir: true, + expected: false, + }, + // Entry whose name ends with .versions but is a file, not a dir + { + label: "file named like .versions dir", + dir: "/buckets/mybucket/path/to", + name: "file.xml" + s3_constants.VersionsFolder, + isDir: false, + expected: false, + }, + // Non-bucket mount: .versions dir should not match + { + label: ".versions dir outside /buckets/", + dir: "/mnt/remote/path/to", + name: "file.xml" + s3_constants.VersionsFolder, + isDir: true, + expected: false, + }, + // Non-bucket mount: v_ file should not match + { + label: "v_ file outside /buckets/", + dir: "/data/archive/file.xml" + s3_constants.VersionsFolder, + name: "v_abc123", + isDir: false, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.label, func(t *testing.T) { + got := isVersionedPath(tt.dir, tt.name, tt.isDir) + if got != tt.expected { + t.Errorf("isVersionedPath(%q, %q, %v) = %v, want %v", + tt.dir, tt.name, tt.isDir, got, tt.expected) + } + }) + } +} + +// TestDeleteMarkerDetectedBeforeHasDataFilter verifies that a delete marker +// (zero-content version entry with ExtDeleteMarkerKey="true") is detected +// and can be propagated as a deletion, rather than being silently dropped +// by the HasData() check. +func TestDeleteMarkerDetectedBeforeHasDataFilter(t *testing.T) { + bucketsDir := "/buckets" + bucketName := "devicetransaction" + bucket := util.FullPath(bucketsDir).Child(bucketName) + + objectPath := "docs/report.pdf" + versionId := "aabb112233445566" + versionFileName := "v_" + versionId + versionedParentPath := string(bucket) + "/" + objectPath + s3_constants.VersionsFolder + + // A delete marker CREATE event: has ExtDeleteMarkerKey but no content + deleteMarkerEntry := &filer_pb.Entry{ + Name: versionFileName, + Extended: map[string][]byte{ + s3_constants.ExtDeleteMarkerKey: []byte("true"), + s3_constants.ExtVersionIdKey: []byte(versionId), + }, + // Content and Chunks are nil → HasData() returns false + } + + // Preconditions + if filer.HasData(deleteMarkerEntry) { + t.Fatal("delete marker should have no data") + } + if !isDeleteMarker(deleteMarkerEntry) { + t.Fatal("should be detected as a delete marker") + } + + // The versioned path should be rewritable to the original key + newParent, newName, ok := rewriteVersionedSourcePath(versionedParentPath, deleteMarkerEntry.Name) + if !ok { + t.Fatal("delete marker path should be rewritable") + } + + // Verify the rewritten path points to the original object + remoteStorageMountLocation := &remote_pb.RemoteStorageLocation{ + Name: "central", + Bucket: bucketName, + Path: "/", + } + dest := toRemoteStorageLocation(bucket, util.NewFullPath(newParent, newName), remoteStorageMountLocation) + + expectedPath := "/" + objectPath + if dest.Path != expectedPath { + t.Errorf("delete marker destination = %q, want %q", dest.Path, expectedPath) + } + if strings.Contains(dest.Path, s3_constants.VersionsFolder) { + t.Errorf("delete marker destination should not contain .versions: %s", dest.Path) + } +} + +func TestRewriteVersionedSourcePath(t *testing.T) { + tests := []struct { + name string + dir string + entryName string + wantDir string + wantName string + wantChanged bool + }{ + { + name: "version file in .versions dir", + dir: "/buckets/bucket/path/to/file.xml" + s3_constants.VersionsFolder, + entryName: "v_6761c63812bd9b64704acf08a3ba5800", + wantDir: "/buckets/bucket/path/to", + wantName: "file.xml", + wantChanged: true, + }, + { + name: "regular file", + dir: "/buckets/bucket/path/to", + entryName: "file.xml", + wantDir: "/buckets/bucket/path/to", + wantName: "file.xml", + wantChanged: false, + }, + { + name: "version file at bucket root", + dir: "/buckets/bucket/report.pdf" + s3_constants.VersionsFolder, + entryName: "v_abc123", + wantDir: "/buckets/bucket", + wantName: "report.pdf", + wantChanged: true, + }, + { + name: "non-bucket path not rewritten", + dir: "/file.xml" + s3_constants.VersionsFolder, + entryName: "v_abc123", + wantDir: "/file.xml" + s3_constants.VersionsFolder, + wantName: "v_abc123", + wantChanged: false, + }, + { + name: "non-bucket mount not rewritten", + dir: "/mnt/remote/file.xml" + s3_constants.VersionsFolder, + entryName: "v_abc123", + wantDir: "/mnt/remote/file.xml" + s3_constants.VersionsFolder, + wantName: "v_abc123", + wantChanged: false, + }, + { + name: "non-version file in .versions dir", + dir: "/buckets/bucket/file.xml" + s3_constants.VersionsFolder, + entryName: "some_other_file", + wantDir: "/buckets/bucket/file.xml" + s3_constants.VersionsFolder, + wantName: "some_other_file", + wantChanged: false, + }, + { + name: "dir not ending in .versions", + dir: "/buckets/bucket/path/to", + entryName: "v_abc123", + wantDir: "/buckets/bucket/path/to", + wantName: "v_abc123", + wantChanged: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotDir, gotName, gotChanged := rewriteVersionedSourcePath(tt.dir, tt.entryName) + if gotDir != tt.wantDir || gotName != tt.wantName || gotChanged != tt.wantChanged { + t.Errorf("rewriteVersionedSourcePath(%q, %q) = (%q, %q, %v), want (%q, %q, %v)", + tt.dir, tt.entryName, gotDir, gotName, gotChanged, tt.wantDir, tt.wantName, tt.wantChanged) + } + }) + } +}