diff --git a/weed/plugin/worker/lifecycle/execution.go b/weed/plugin/worker/lifecycle/execution.go index be47b6add..183e7648e 100644 --- a/weed/plugin/worker/lifecycle/execution.go +++ b/weed/plugin/worker/lifecycle/execution.go @@ -7,6 +7,7 @@ import ( "math" "path" "sort" + "strconv" "strings" "time" @@ -174,6 +175,9 @@ func (h *Handler) executeLifecycleForBucket( }) } + // Clean up .versions directories left empty after version deletion. + cleanupEmptyVersionsDirectories(ctx, filerClient, expired) + remaining -= result.objectsExpired + result.errors if remaining < 0 { remaining = 0 @@ -541,8 +545,22 @@ func listExpiredObjectsByRules( return nil // skip .uploads at bucket root only } if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { - // Process versioned object. versionsDir := path.Join(dir, entry.Name) + + // Evaluate Expiration rules against the latest version. + // In versioned buckets, data lives in .versions/ directories, + // so we must evaluate the latest version here — it is never + // seen as a regular file entry in the parent directory. + if obj, ok := latestVersionExpiredByRules(ctx, client, entry, versionsDir, bucketPath, rules, now, needTags); ok { + expired = append(expired, obj) + scanned++ + if limit > 0 && int64(len(expired)) >= limit { + limitReached = true + return errLimitReached + } + } + + // Process noncurrent versions. vExpired, vScanned, vErr := processVersionsDirectory(ctx, client, versionsDir, bucketPath, rules, now, needTags, limit-int64(len(expired))) if vErr != nil { glog.V(1).Infof("s3_lifecycle: %v", vErr) @@ -718,6 +736,134 @@ func processVersionsDirectory( return expired, scanned, nil } +// latestVersionExpiredByRules evaluates Expiration rules (Days/Date) against +// the latest version in a .versions directory. In versioned buckets all data +// lives inside .versions/ directories, so the latest version is never seen as +// a regular file entry during the bucket walk. Without this check, Expiration +// rules would never fire for versioned objects (issue #8757). +// +// The .versions directory entry caches metadata about the latest version in +// its Extended attributes, so we can evaluate expiration without an extra +// filer round-trip. +func latestVersionExpiredByRules( + ctx context.Context, + client filer_pb.SeaweedFilerClient, + dirEntry *filer_pb.Entry, + versionsDir, bucketPath string, + rules []s3lifecycle.Rule, + now time.Time, + needTags bool, +) (expiredObject, bool) { + if dirEntry.Extended == nil { + return expiredObject{}, false + } + + // Skip if the latest version is a delete marker — those are handled + // by the ExpiredObjectDeleteMarker rule in cleanupDeleteMarkers. + if string(dirEntry.Extended[s3_constants.ExtLatestVersionIsDeleteMarker]) == "true" { + return expiredObject{}, false + } + + latestFileName := string(dirEntry.Extended[s3_constants.ExtLatestVersionFileNameKey]) + if latestFileName == "" { + return expiredObject{}, false + } + + // Derive the object key: /buckets/b/path/key.versions → path/key + relDir := strings.TrimPrefix(versionsDir, bucketPath+"/") + objKey := strings.TrimSuffix(relDir, s3_constants.VersionsFolder) + + objInfo := s3lifecycle.ObjectInfo{ + Key: objKey, + IsLatest: true, + } + + // Populate ModTime from cached metadata. + if mtimeStr := string(dirEntry.Extended[s3_constants.ExtLatestVersionMtimeKey]); mtimeStr != "" { + if mtime, err := strconv.ParseInt(mtimeStr, 10, 64); err == nil { + objInfo.ModTime = time.Unix(mtime, 0) + } + } + if objInfo.ModTime.IsZero() && dirEntry.Attributes != nil && dirEntry.Attributes.Mtime > 0 { + objInfo.ModTime = time.Unix(dirEntry.Attributes.Mtime, 0) + } + + // Populate Size from cached metadata. + if sizeStr := string(dirEntry.Extended[s3_constants.ExtLatestVersionSizeKey]); sizeStr != "" { + if size, err := strconv.ParseInt(sizeStr, 10, 64); err == nil { + objInfo.Size = size + } + } + + if needTags { + // Tags are stored on the version file entry, not the .versions + // directory. Fetch the actual version file to get them. + resp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{ + Directory: versionsDir, + Name: latestFileName, + }) + if err == nil && resp.Entry != nil { + objInfo.Tags = s3lifecycle.ExtractTags(resp.Entry.Extended) + } + } + + result := s3lifecycle.Evaluate(rules, objInfo, now) + if result.Action == s3lifecycle.ActionDeleteObject { + return expiredObject{dir: versionsDir, name: latestFileName}, true + } + + return expiredObject{}, false +} + +// cleanupEmptyVersionsDirectories removes .versions directories that became +// empty after their contents were deleted. This is called after +// deleteExpiredObjects to avoid leaving orphaned directories. +func cleanupEmptyVersionsDirectories( + ctx context.Context, + client filer_pb.SeaweedFilerClient, + deleted []expiredObject, +) int { + // Collect unique .versions directories that had entries deleted. + versionsDirs := map[string]struct{}{} + for _, obj := range deleted { + if strings.HasSuffix(obj.dir, s3_constants.VersionsFolder) { + versionsDirs[obj.dir] = struct{}{} + } + } + + cleaned := 0 + for vDir := range versionsDirs { + if ctx.Err() != nil { + break + } + // Check if the directory is now empty. + empty := true + listErr := filer_pb.SeaweedList(ctx, client, vDir, "", func(entry *filer_pb.Entry, isLast bool) error { + empty = false + return errLimitReached // stop after first entry + }, "", false, 1) + + if listErr != nil && !errors.Is(listErr, errLimitReached) { + glog.V(1).Infof("s3_lifecycle: failed to check if versions dir %s is empty: %v", vDir, listErr) + continue + } + + if !empty { + continue + } + + // Remove the empty .versions directory. + parentDir, dirName := path.Split(vDir) + parentDir = strings.TrimSuffix(parentDir, "/") + if err := filer_pb.DoRemove(ctx, client, parentDir, dirName, false, true, true, false, nil); err != nil { + glog.V(1).Infof("s3_lifecycle: failed to clean up empty versions dir %s: %v", vDir, err) + } else { + cleaned++ + } + } + return cleaned +} + // sortVersionsByVersionId sorts version entries newest-first using full // version ID comparison (matching compareVersionIds in s3api_version_id.go). // This uses the complete version ID string, not just the decoded timestamp, diff --git a/weed/plugin/worker/lifecycle/integration_test.go b/weed/plugin/worker/lifecycle/integration_test.go index e84635713..60b11175c 100644 --- a/weed/plugin/worker/lifecycle/integration_test.go +++ b/weed/plugin/worker/lifecycle/integration_test.go @@ -527,3 +527,255 @@ func TestIntegration_DeleteExpiredObjects(t *testing.T) { t.Error("to-keep.txt should still exist") } } + +// TestIntegration_VersionedBucket_ExpirationDays verifies that Expiration.Days +// rules correctly detect and delete the latest version in a versioned bucket +// where all data lives in .versions/ directories (issue #8757). +func TestIntegration_VersionedBucket_ExpirationDays(t *testing.T) { + server, client := startTestFiler(t) + bucketsPath := "/buckets" + bucket := "versioned-expire" + bucketDir := bucketsPath + "/" + bucket + + now := time.Now() + old := now.Add(-60 * 24 * time.Hour) // 60 days ago — should expire + recent := now.Add(-5 * 24 * time.Hour) // 5 days ago — should NOT expire + + vidOld := testVersionId(old) + vidRecent := testVersionId(recent) + + server.putEntry(bucketsPath, &filer_pb.Entry{Name: bucket, IsDirectory: true}) + + // --- Single-version object (old, should expire) --- + server.putEntry(bucketDir, &filer_pb.Entry{ + Name: "old-file.txt" + s3_constants.VersionsFolder, IsDirectory: true, + Extended: map[string][]byte{ + s3_constants.ExtLatestVersionIdKey: []byte(vidOld), + s3_constants.ExtLatestVersionFileNameKey: []byte("v_" + vidOld), + s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(old.Unix(), 10)), + s3_constants.ExtLatestVersionSizeKey: []byte("3400000000"), + s3_constants.ExtLatestVersionIsDeleteMarker: []byte("false"), + }, + }) + oldVersionsDir := bucketDir + "/old-file.txt" + s3_constants.VersionsFolder + server.putEntry(oldVersionsDir, &filer_pb.Entry{ + Name: "v_" + vidOld, + Attributes: &filer_pb.FuseAttributes{Mtime: old.Unix(), FileSize: 3400000000}, + Extended: map[string][]byte{ + s3_constants.ExtVersionIdKey: []byte(vidOld), + }, + }) + + // --- Single-version object (recent, should NOT expire) --- + server.putEntry(bucketDir, &filer_pb.Entry{ + Name: "recent-file.txt" + s3_constants.VersionsFolder, IsDirectory: true, + Extended: map[string][]byte{ + s3_constants.ExtLatestVersionIdKey: []byte(vidRecent), + s3_constants.ExtLatestVersionFileNameKey: []byte("v_" + vidRecent), + s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(recent.Unix(), 10)), + s3_constants.ExtLatestVersionSizeKey: []byte("3400000000"), + s3_constants.ExtLatestVersionIsDeleteMarker: []byte("false"), + }, + }) + recentVersionsDir := bucketDir + "/recent-file.txt" + s3_constants.VersionsFolder + server.putEntry(recentVersionsDir, &filer_pb.Entry{ + Name: "v_" + vidRecent, + Attributes: &filer_pb.FuseAttributes{Mtime: recent.Unix(), FileSize: 3400000000}, + Extended: map[string][]byte{ + s3_constants.ExtVersionIdKey: []byte(vidRecent), + }, + }) + + // --- Object with delete marker as latest (should NOT be expired by Expiration.Days) --- + vidMarker := testVersionId(old) + server.putEntry(bucketDir, &filer_pb.Entry{ + Name: "deleted-obj.txt" + s3_constants.VersionsFolder, IsDirectory: true, + Extended: map[string][]byte{ + s3_constants.ExtLatestVersionIdKey: []byte(vidMarker), + s3_constants.ExtLatestVersionFileNameKey: []byte("v_" + vidMarker), + s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(old.Unix(), 10)), + s3_constants.ExtLatestVersionIsDeleteMarker: []byte("true"), + }, + }) + + rules := []s3lifecycle.Rule{{ + ID: "expire-30d", Status: "Enabled", + ExpirationDays: 30, + }} + + expired, scanned, err := listExpiredObjectsByRules(context.Background(), client, bucketsPath, bucket, rules, 100) + if err != nil { + t.Fatalf("listExpiredObjectsByRules: %v", err) + } + + // Only old-file.txt's latest version should be expired. + // recent-file.txt is too young; deleted-obj.txt is a delete marker. + if len(expired) != 1 { + t.Fatalf("expected 1 expired, got %d: %+v", len(expired), expired) + } + if expired[0].dir != oldVersionsDir { + t.Errorf("expected dir=%s, got %s", oldVersionsDir, expired[0].dir) + } + if expired[0].name != "v_"+vidOld { + t.Errorf("expected name=v_%s, got %s", vidOld, expired[0].name) + } + // The old-file.txt latest version should count as scanned. + if scanned < 1 { + t.Errorf("expected at least 1 scanned, got %d", scanned) + } +} + +// TestIntegration_VersionedBucket_ExpirationDays_DeleteAndCleanup verifies +// end-to-end deletion and .versions directory cleanup for a single-version +// versioned object expired by Expiration.Days. +func TestIntegration_VersionedBucket_ExpirationDays_DeleteAndCleanup(t *testing.T) { + server, client := startTestFiler(t) + bucketsPath := "/buckets" + bucket := "versioned-cleanup" + bucketDir := bucketsPath + "/" + bucket + + now := time.Now() + old := now.Add(-60 * 24 * time.Hour) + vidOld := testVersionId(old) + + server.putEntry(bucketsPath, &filer_pb.Entry{Name: bucket, IsDirectory: true}) + + // Single-version object that should expire. + versionsDir := bucketDir + "/data.bin" + s3_constants.VersionsFolder + server.putEntry(bucketDir, &filer_pb.Entry{ + Name: "data.bin" + s3_constants.VersionsFolder, IsDirectory: true, + Extended: map[string][]byte{ + s3_constants.ExtLatestVersionIdKey: []byte(vidOld), + s3_constants.ExtLatestVersionFileNameKey: []byte("v_" + vidOld), + s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(old.Unix(), 10)), + s3_constants.ExtLatestVersionSizeKey: []byte("1024"), + s3_constants.ExtLatestVersionIsDeleteMarker: []byte("false"), + }, + }) + server.putEntry(versionsDir, &filer_pb.Entry{ + Name: "v_" + vidOld, + Attributes: &filer_pb.FuseAttributes{Mtime: old.Unix(), FileSize: 1024}, + Extended: map[string][]byte{ + s3_constants.ExtVersionIdKey: []byte(vidOld), + }, + }) + + rules := []s3lifecycle.Rule{{ + ID: "expire-30d", Status: "Enabled", + ExpirationDays: 30, + }} + + // Step 1: Detect expired. + expired, _, err := listExpiredObjectsByRules(context.Background(), client, bucketsPath, bucket, rules, 100) + if err != nil { + t.Fatalf("list: %v", err) + } + if len(expired) != 1 { + t.Fatalf("expected 1 expired, got %d", len(expired)) + } + + // Step 2: Delete the expired version file. + deleted, errs, delErr := deleteExpiredObjects(context.Background(), client, expired) + if delErr != nil { + t.Fatalf("delete: %v", delErr) + } + if deleted != 1 || errs != 0 { + t.Errorf("expected 1 deleted 0 errors, got %d deleted %d errors", deleted, errs) + } + + // Version file should be gone. + if server.hasEntry(versionsDir, "v_"+vidOld) { + t.Error("version file should have been removed") + } + + // Step 3: Cleanup empty .versions directory. + cleaned := cleanupEmptyVersionsDirectories(context.Background(), client, expired) + if cleaned != 1 { + t.Errorf("expected 1 directory cleaned, got %d", cleaned) + } + + // The .versions directory itself should be gone. + if server.hasEntry(bucketDir, "data.bin"+s3_constants.VersionsFolder) { + t.Error(".versions directory should have been removed after cleanup") + } +} + +// TestIntegration_VersionedBucket_MultiVersion_ExpirationDays verifies that +// when a multi-version object's latest version expires, only the latest +// version is deleted and noncurrent versions remain. +func TestIntegration_VersionedBucket_MultiVersion_ExpirationDays(t *testing.T) { + server, client := startTestFiler(t) + bucketsPath := "/buckets" + bucket := "versioned-multi" + bucketDir := bucketsPath + "/" + bucket + + now := time.Now() + tOld := now.Add(-60 * 24 * time.Hour) + tNoncurrent := now.Add(-90 * 24 * time.Hour) + vidLatest := testVersionId(tOld) + vidNoncurrent := testVersionId(tNoncurrent) + + server.putEntry(bucketsPath, &filer_pb.Entry{Name: bucket, IsDirectory: true}) + + versionsDir := bucketDir + "/multi.txt" + s3_constants.VersionsFolder + server.putEntry(bucketDir, &filer_pb.Entry{ + Name: "multi.txt" + s3_constants.VersionsFolder, IsDirectory: true, + Extended: map[string][]byte{ + s3_constants.ExtLatestVersionIdKey: []byte(vidLatest), + s3_constants.ExtLatestVersionFileNameKey: []byte("v_" + vidLatest), + s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(tOld.Unix(), 10)), + s3_constants.ExtLatestVersionSizeKey: []byte("500"), + s3_constants.ExtLatestVersionIsDeleteMarker: []byte("false"), + }, + }) + server.putEntry(versionsDir, &filer_pb.Entry{ + Name: "v_" + vidLatest, + Attributes: &filer_pb.FuseAttributes{Mtime: tOld.Unix(), FileSize: 500}, + Extended: map[string][]byte{s3_constants.ExtVersionIdKey: []byte(vidLatest)}, + }) + server.putEntry(versionsDir, &filer_pb.Entry{ + Name: "v_" + vidNoncurrent, + Attributes: &filer_pb.FuseAttributes{Mtime: tNoncurrent.Unix(), FileSize: 500}, + Extended: map[string][]byte{s3_constants.ExtVersionIdKey: []byte(vidNoncurrent)}, + }) + + rules := []s3lifecycle.Rule{{ + ID: "expire-30d", Status: "Enabled", + ExpirationDays: 30, + }} + + expired, _, err := listExpiredObjectsByRules(context.Background(), client, bucketsPath, bucket, rules, 100) + if err != nil { + t.Fatalf("list: %v", err) + } + // Only the latest version should be detected as expired. + if len(expired) != 1 { + t.Fatalf("expected 1 expired (latest only), got %d", len(expired)) + } + if expired[0].name != "v_"+vidLatest { + t.Errorf("expected latest version expired, got %s", expired[0].name) + } + + // Delete it. + deleted, errs, delErr := deleteExpiredObjects(context.Background(), client, expired) + if delErr != nil { + t.Fatalf("delete: %v", delErr) + } + if deleted != 1 || errs != 0 { + t.Errorf("expected 1 deleted 0 errors, got %d deleted %d errors", deleted, errs) + } + + // Noncurrent version should still exist. + if !server.hasEntry(versionsDir, "v_"+vidNoncurrent) { + t.Error("noncurrent version should still exist") + } + + // .versions directory should NOT be cleaned up (not empty). + cleaned := cleanupEmptyVersionsDirectories(context.Background(), client, expired) + if cleaned != 0 { + t.Errorf("expected 0 directories cleaned (not empty), got %d", cleaned) + } + if !server.hasEntry(bucketDir, "multi.txt"+s3_constants.VersionsFolder) { + t.Error(".versions directory should still exist (has noncurrent version)") + } +} diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index c5833dacd..5abbd5d22 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -952,6 +952,27 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr collectionTtls := fc.GetCollectionTtls(collectionName) changed := false + // Check whether the bucket has versioning enabled. Versioned buckets must + // NOT use the TTL fast-path because: + // 1. TTL volumes expire as a unit, destroying all data — including + // noncurrent versions that should be preserved. + // 2. Filer-backend TTL (RocksDB compaction, Redis expire) removes entries + // without triggering chunk deletion, leaving orphaned volume data. + // 3. On AWS S3, Expiration.Days on a versioned bucket creates a delete + // marker — it does not delete data. TTL has no such nuance. + // For versioned buckets the lifecycle worker handles all rule evaluation + // at scan time, which correctly operates on individual versions. + bucketVersioning, versioningErr := s3a.getBucketVersioningStatus(bucket) + if versioningErr != s3err.ErrNone { + // Fail closed: if we cannot determine versioning status, treat the + // bucket as versioned to avoid creating TTL entries that would + // destroy noncurrent versions. + glog.V(1).Infof("PutBucketLifecycleConfigurationHandler: could not determine versioning status for %s (err %v), skipping TTL fast-path", bucket, versioningErr) + } + isVersioned := versioningErr != s3err.ErrNone || + bucketVersioning == s3_constants.VersioningEnabled || + bucketVersioning == s3_constants.VersioningSuspended + for _, rule := range lifeCycleConfig.Rules { if rule.Status != Enabled { continue @@ -963,6 +984,10 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr return } + if isVersioned { + continue // all rules evaluated by lifecycle worker at scan time + } + var rulePrefix string switch { case rule.Filter.andSet: diff --git a/weed/s3api/s3api_bucket_lifecycle_response_test.go b/weed/s3api/s3api_bucket_lifecycle_response_test.go index df7e4d397..c28eff9f8 100644 --- a/weed/s3api/s3api_bucket_lifecycle_response_test.go +++ b/weed/s3api/s3api_bucket_lifecycle_response_test.go @@ -10,6 +10,7 @@ import ( "github.com/gorilla/mux" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -124,3 +125,39 @@ func (f failingReadCloser) Read(_ []byte) (int, error) { func (f failingReadCloser) Close() error { return nil } + +// TestShouldSkipTTLFastPathForVersionedBuckets verifies the versioning guard +// logic that PutBucketLifecycleConfigurationHandler uses to decide whether +// to create filer.conf TTL entries. On AWS S3, Expiration.Days on a versioned +// bucket creates a delete marker — it does not delete data. TTL volumes +// would destroy all versions indiscriminately, so the lifecycle worker must +// handle versioned buckets at scan time instead. (issue #8757) +// +// Note: an integration test that invokes PutBucketLifecycleConfigurationHandler +// directly is not feasible here because the handler requires filer gRPC +// connectivity (ReadFilerConfFromFilers, WithFilerClient) before it reaches +// the versioning check. The lifecycle worker integration tests in +// weed/plugin/worker/lifecycle/ cover the end-to-end versioned-bucket behavior. +func TestShouldSkipTTLFastPathForVersionedBuckets(t *testing.T) { + tests := []struct { + name string + versioning string + expectSkip bool + }{ + {"versioning_enabled_skips_ttl", s3_constants.VersioningEnabled, true}, + {"versioning_suspended_skips_ttl", s3_constants.VersioningSuspended, true}, + {"unversioned_allows_ttl", "", false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // This mirrors the guard in PutBucketLifecycleConfigurationHandler: + // isVersioned := versioningErr != s3err.ErrNone || + // bucketVersioning == s3_constants.VersioningEnabled || + // bucketVersioning == s3_constants.VersioningSuspended + // When isVersioned is true, the TTL fast-path is skipped entirely. + isVersioned := tt.versioning == s3_constants.VersioningEnabled || + tt.versioning == s3_constants.VersioningSuspended + assert.Equal(t, tt.expectSkip, isVersioned) + }) + } +}