From f6ec9941cbaa2329c772444dfb64b335057a4705 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 28 Mar 2026 12:58:21 -0700 Subject: [PATCH] lifecycle worker: NoncurrentVersionExpiration support (#8810) * lifecycle worker: add NoncurrentVersionExpiration support Add version-aware scanning to the rule-based execution path. When the walker encounters a .versions directory, processVersionsDirectory(): - Lists all version entries (v_) - Sorts by version timestamp (newest first) - Walks non-current versions with ShouldExpireNoncurrentVersion() which handles both NoncurrentDays and NewerNoncurrentVersions - Extracts successor time from version IDs (both old/new format) - Skips delete markers in noncurrent version counting - Falls back to entry Mtime when version ID timestamp is unavailable Helper functions: - sortVersionsByTimestamp: insertion sort by version ID timestamp - getEntryVersionTimestamp: extracts timestamp with Mtime fallback * lifecycle worker: address review feedback for noncurrent versions - Use sentinel errLimitReached in versions directory handler - Set NoncurrentIndex on ObjectInfo for proper NewerNoncurrentVersions evaluation * lifecycle worker: fail closed on XML parse error, guard zero Mtime - Fail closed when lifecycle XML exists but fails to parse, instead of falling back to TTL which could apply broader rules - Guard Mtime > 0 before using time.Unix(mtime, 0) to avoid mapping unset Mtime to 1970, which would misorder versions and cause premature expiration * lifecycle worker: count delete markers toward NoncurrentIndex Noncurrent delete markers should count toward the NewerNoncurrentVersions retention threshold so data versions get the correct position index. Previously, skipping delete markers without incrementing the index could retain too many versions after delete/recreate cycles. * lifecycle worker: fix version ordering, error propagation, and fail-closed scope 1. 
Use full version ID comparison (CompareVersionIds) for sorting .versions entries, not just decoded timestamps. Two versions with the same timestamp prefix but different random suffixes were previously misordered, potentially treating the newest version as noncurrent and deleting it. 2. Propagate .versions listing failures to the caller instead of swallowing them with (nil, 0). Transient filer errors on a .versions directory now surface in the job result. 3. Narrow the fail-closed path to only malformed lifecycle XML (errMalformedLifecycleXML). Transient filer LookupEntry errors now fall back to TTL with a warning, matching the original intent of "fail closed on bad config, not on network blips." * lifecycle worker: only skip .uploads at bucket root * lifecycle worker: sort.Slice, mixed-format test, XML presence tracking - Replace manual insertion sort with sort.Slice in sortVersionsByVersionId - Add TestCompareVersionIdsMixedFormats covering old/new format ordering - Distinguish "no lifecycle XML" (nil) from "XML present but no effective rules" (non-nil empty slice) so buckets with all-disabled rules don't incorrectly fall back to filer.conf TTL expiration * lifecycle worker: guard nil Attributes, use TrimSuffix in test - Guard entry.Attributes != nil before accessing GetFileSize() and Mtime in both listExpiredObjectsByRules and processVersionsDirectory - Use strings.TrimPrefix/TrimSuffix in TestVersionsDirectoryNaming to match the production code pattern * lifecycle worker: skip TTL scan when XML present, fix test assertions - When lifecycle XML is present but has no effective rules, skip object scanning entirely instead of falling back to TTL path - Test sort output against concrete expected names instead of re-using the same comparator as the sort itself --------- Co-authored-by: Copilot --- weed/plugin/worker/lifecycle/execution.go | 183 +++++++++++++++++-- weed/plugin/worker/lifecycle/rules.go | 24 ++- weed/plugin/worker/lifecycle/version_test.go | 112 
++++++++++++ weed/s3api/s3lifecycle/version_time.go | 57 ++++++ 4 files changed, 359 insertions(+), 17 deletions(-) create mode 100644 weed/plugin/worker/lifecycle/version_test.go diff --git a/weed/plugin/worker/lifecycle/execution.go b/weed/plugin/worker/lifecycle/execution.go index 9c4d58a29..3dd47021a 100644 --- a/weed/plugin/worker/lifecycle/execution.go +++ b/weed/plugin/worker/lifecycle/execution.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "path" + "sort" "strings" "time" @@ -42,15 +43,26 @@ func (h *Handler) executeLifecycleForBucket( result := &executionResult{} // Try to load lifecycle rules from stored XML first (full rule evaluation). - // Fall back to filer.conf TTL-only evaluation if no XML exists. + // Fall back to filer.conf TTL-only evaluation only if no XML is configured. + // If XML exists but is malformed, fail closed (don't fall back to TTL, + // which could apply broader rules and delete objects the XML rules would keep). + // Transient filer errors fall back to TTL with a warning. lifecycleRules, xmlErr := loadLifecycleRulesFromBucket(ctx, filerClient, bucketsPath, bucket) + if xmlErr != nil && errors.Is(xmlErr, errMalformedLifecycleXML) { + glog.Errorf("s3_lifecycle: bucket %s: %v (skipping bucket)", bucket, xmlErr) + return result, xmlErr + } if xmlErr != nil { - glog.V(1).Infof("s3_lifecycle: bucket %s: failed to load lifecycle XML: %v, falling back to TTL", bucket, xmlErr) + glog.V(1).Infof("s3_lifecycle: bucket %s: transient error loading lifecycle XML: %v, falling back to TTL", bucket, xmlErr) } - useRuleEval := xmlErr == nil && len(lifecycleRules) > 0 - - if !useRuleEval { - // Fall back: check filer.conf TTL rules. + // lifecycleRules is non-nil when XML was present (even if empty/all disabled). + // Only fall back to TTL when XML was truly absent (nil). 
+ xmlPresent := xmlErr == nil && lifecycleRules != nil + useRuleEval := xmlPresent && len(lifecycleRules) > 0 + + if !useRuleEval && !xmlPresent { + // Fall back to filer.conf TTL rules only when no lifecycle XML exists. + // When XML is present but has no effective rules, skip TTL fallback. fc, err := loadFilerConf(ctx, filerClient) if err != nil { return result, fmt.Errorf("load filer conf: %w", err) @@ -81,9 +93,11 @@ func (h *Handler) executeLifecycleForBucket( var err error if useRuleEval { expired, scanned, err = listExpiredObjectsByRules(ctx, filerClient, bucketsPath, bucket, lifecycleRules, remaining) - } else { + } else if !xmlPresent { + // TTL-only scan when no lifecycle XML exists. expired, scanned, err = listExpiredObjects(ctx, filerClient, bucketsPath, bucket, remaining) } + // When xmlPresent but no effective rules (all disabled), skip object scanning. result.objectsScanned = scanned if err != nil { return result, fmt.Errorf("list expired objects: %w", err) @@ -380,11 +394,26 @@ func listExpiredObjectsByRules( limitReached := false err := filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error { if entry.IsDirectory { - // Skip .uploads and .versions directories. - if entry.Name != s3_constants.MultipartUploadsFolder && - !strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { - dirsToProcess = append(dirsToProcess, path.Join(dir, entry.Name)) + if dir == bucketPath && entry.Name == s3_constants.MultipartUploadsFolder { + return nil // skip .uploads at bucket root only } + if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { + // Process versioned object. + versionsDir := path.Join(dir, entry.Name) + vExpired, vScanned, vErr := processVersionsDirectory(ctx, client, versionsDir, bucketPath, rules, now, needTags, limit-int64(len(expired))) + if vErr != nil { + glog.V(1).Infof("s3_lifecycle: %v", vErr) + return vErr + } + expired = append(expired, vExpired...) 
+ scanned += vScanned + if limit > 0 && int64(len(expired)) >= limit { + limitReached = true + return errLimitReached + } + return nil + } + dirsToProcess = append(dirsToProcess, path.Join(dir, entry.Name)) return nil } scanned++ @@ -402,12 +431,14 @@ func listExpiredObjectsByRules( objInfo := s3lifecycle.ObjectInfo{ Key: relKey, IsLatest: true, // non-versioned objects are always "latest" - Size: int64(entry.Attributes.GetFileSize()), } - if entry.Attributes != nil && entry.Attributes.Mtime > 0 { - objInfo.ModTime = time.Unix(entry.Attributes.Mtime, 0) - } else if entry.Attributes != nil && entry.Attributes.Crtime > 0 { - objInfo.ModTime = time.Unix(entry.Attributes.Crtime, 0) + if entry.Attributes != nil { + objInfo.Size = int64(entry.Attributes.GetFileSize()) + if entry.Attributes.Mtime > 0 { + objInfo.ModTime = time.Unix(entry.Attributes.Mtime, 0) + } else if entry.Attributes.Crtime > 0 { + objInfo.ModTime = time.Unix(entry.Attributes.Crtime, 0) + } } if needTags { objInfo.Tags = s3lifecycle.ExtractTags(entry.Extended) @@ -436,3 +467,123 @@ func listExpiredObjectsByRules( return expired, scanned, nil } + +// processVersionsDirectory evaluates NoncurrentVersionExpiration rules +// against all versions in a .versions directory. +func processVersionsDirectory( + ctx context.Context, + client filer_pb.SeaweedFilerClient, + versionsDir, bucketPath string, + rules []s3lifecycle.Rule, + now time.Time, + needTags bool, + limit int64, +) ([]expiredObject, int64, error) { + var expired []expiredObject + var scanned int64 + + // Check if any rule has NoncurrentVersionExpiration. + hasNoncurrentRules := false + for _, r := range rules { + if r.Status == "Enabled" && r.NoncurrentVersionExpirationDays > 0 { + hasNoncurrentRules = true + break + } + } + if !hasNoncurrentRules { + return nil, 0, nil + } + + // List all versions in this directory. 
+ var versions []*filer_pb.Entry + listErr := filer_pb.SeaweedList(ctx, client, versionsDir, "", func(entry *filer_pb.Entry, isLast bool) error { + if !entry.IsDirectory { + versions = append(versions, entry) + } + return nil + }, "", false, 10000) + if listErr != nil { + return nil, 0, fmt.Errorf("list versions in %s: %w", versionsDir, listErr) + } + if len(versions) <= 1 { + return nil, 0, nil // only one version (the latest), nothing to expire + } + + // Sort by version timestamp, newest first. + sortVersionsByVersionId(versions) + + // Derive the object key from the .versions directory path. + // e.g., /buckets/mybucket/path/to/key.versions -> path/to/key + relDir := strings.TrimPrefix(versionsDir, bucketPath+"/") + objKey := strings.TrimSuffix(relDir, s3_constants.VersionsFolder) + + // Walk versions: first is latest, rest are non-current. + noncurrentIndex := 0 + for i := 1; i < len(versions); i++ { + entry := versions[i] + scanned++ + + // Skip delete markers from expiration evaluation, but count + // them toward NewerNoncurrentVersions so data versions get + // the correct noncurrent index. + if isDeleteMarker(entry) { + noncurrentIndex++ + continue + } + + // Determine successor's timestamp (the version that replaced this one). 
+ successorEntry := versions[i-1] + successorVersionId := strings.TrimPrefix(successorEntry.Name, "v_") + successorTime := s3lifecycle.GetVersionTimestamp(successorVersionId) + if successorTime.IsZero() && successorEntry.Attributes != nil && successorEntry.Attributes.Mtime > 0 { + successorTime = time.Unix(successorEntry.Attributes.Mtime, 0) + } + + objInfo := s3lifecycle.ObjectInfo{ + Key: objKey, + IsLatest: false, + SuccessorModTime: successorTime, + NumVersions: len(versions), + NoncurrentIndex: noncurrentIndex, + } + if entry.Attributes != nil { + objInfo.Size = int64(entry.Attributes.GetFileSize()) + if entry.Attributes.Mtime > 0 { + objInfo.ModTime = time.Unix(entry.Attributes.Mtime, 0) + } + } + if needTags { + objInfo.Tags = s3lifecycle.ExtractTags(entry.Extended) + } + + // Evaluate using the detailed ShouldExpireNoncurrentVersion which + // handles NewerNoncurrentVersions. + for _, rule := range rules { + if s3lifecycle.ShouldExpireNoncurrentVersion(rule, objInfo, noncurrentIndex, now) { + expired = append(expired, expiredObject{dir: versionsDir, name: entry.Name}) + break + } + } + + noncurrentIndex++ + + if limit > 0 && int64(len(expired)) >= limit { + break + } + } + + return expired, scanned, nil +} + +// sortVersionsByVersionId sorts version entries newest-first using full +// version ID comparison (matching compareVersionIds in s3api_version_id.go). +// This uses the complete version ID string, not just the decoded timestamp, +// so entries with the same timestamp prefix are correctly ordered by their +// random suffix. 
+func sortVersionsByVersionId(versions []*filer_pb.Entry) { + sort.Slice(versions, func(i, j int) bool { + vidI := strings.TrimPrefix(versions[i].Name, "v_") + vidJ := strings.TrimPrefix(versions[j].Name, "v_") + return s3lifecycle.CompareVersionIds(vidI, vidJ) < 0 + }) +} diff --git a/weed/plugin/worker/lifecycle/rules.go b/weed/plugin/worker/lifecycle/rules.go index d8ea1ceaf..c3855f22c 100644 --- a/weed/plugin/worker/lifecycle/rules.go +++ b/weed/plugin/worker/lifecycle/rules.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/xml" + "errors" "fmt" "time" @@ -65,8 +66,18 @@ type abortMPU struct { DaysAfterInitiation int `xml:"DaysAfterInitiation"` } +// errMalformedLifecycleXML indicates the lifecycle XML exists but could not be parsed. +// Callers should fail closed (not fall back to TTL) to avoid broader deletions. +var errMalformedLifecycleXML = errors.New("malformed lifecycle XML") + // loadLifecycleRulesFromBucket reads the lifecycle XML from a bucket's // metadata and converts it to evaluator-friendly rules. +// +// Returns: +// - (rules, nil) when lifecycle XML is configured and parseable +// - (nil, nil) when no lifecycle XML is configured (caller should use TTL fallback) +// - (nil, errMalformedLifecycleXML) when XML exists but is malformed (fail closed) +// - (nil, err) for transient filer errors (caller should use TTL fallback with warning) func loadLifecycleRulesFromBucket( ctx context.Context, client filer_pb.SeaweedFilerClient, @@ -78,6 +89,7 @@ func loadLifecycleRulesFromBucket( Name: bucket, }) if err != nil { + // Transient filer error — not the same as malformed XML. 
return nil, fmt.Errorf("lookup bucket %s: %w", bucket, err) } if resp.Entry == nil || resp.Entry.Extended == nil { @@ -87,7 +99,17 @@ func loadLifecycleRulesFromBucket( if len(xmlData) == 0 { return nil, nil } - return parseLifecycleXML(xmlData) + rules, parseErr := parseLifecycleXML(xmlData) + if parseErr != nil { + return nil, fmt.Errorf("%w: bucket %s: %v", errMalformedLifecycleXML, bucket, parseErr) + } + // Return non-nil empty slice when XML was present but yielded no rules + // (e.g., all rules disabled). This lets callers distinguish "no XML" (nil) + // from "XML present, no effective rules" (empty slice). + if rules == nil { + rules = []s3lifecycle.Rule{} + } + return rules, nil } // parseLifecycleXML parses lifecycle configuration XML and converts it diff --git a/weed/plugin/worker/lifecycle/version_test.go b/weed/plugin/worker/lifecycle/version_test.go new file mode 100644 index 000000000..43cc0d93b --- /dev/null +++ b/weed/plugin/worker/lifecycle/version_test.go @@ -0,0 +1,112 @@ +package lifecycle + +import ( + "fmt" + "math" + "strings" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle" +) + +// makeVersionId creates a new-format version ID from a timestamp. +func makeVersionId(t time.Time) string { + inverted := math.MaxInt64 - t.UnixNano() + return fmt.Sprintf("%016x", inverted) + "0000000000000000" +} + +func TestSortVersionsByVersionId(t *testing.T) { + t1 := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + t2 := time.Date(2026, 2, 1, 0, 0, 0, 0, time.UTC) + t3 := time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC) + + vid1 := makeVersionId(t1) + vid2 := makeVersionId(t2) + vid3 := makeVersionId(t3) + + entries := []*filer_pb.Entry{ + {Name: "v_" + vid1}, + {Name: "v_" + vid3}, + {Name: "v_" + vid2}, + } + + sortVersionsByVersionId(entries) + + // Should be sorted newest first: t3, t2, t1. 
+ expected := []string{"v_" + vid3, "v_" + vid2, "v_" + vid1} + for i, want := range expected { + if entries[i].Name != want { + t.Errorf("entries[%d].Name = %s, want %s", i, entries[i].Name, want) + } + } +} + +func TestSortVersionsByVersionId_SameTimestampDifferentSuffix(t *testing.T) { + // Two versions with the same timestamp prefix but different random suffix. + // The sort must still produce a deterministic order. + base := makeVersionId(time.Date(2026, 6, 1, 0, 0, 0, 0, time.UTC)) + vid1 := base[:16] + "aaaaaaaaaaaaaaaa" + vid2 := base[:16] + "bbbbbbbbbbbbbbbb" + + entries := []*filer_pb.Entry{ + {Name: "v_" + vid2}, + {Name: "v_" + vid1}, + } + + sortVersionsByVersionId(entries) + + // New format: smaller hex = newer. vid1 ("aaa...") < vid2 ("bbb...") so vid1 is newer. + if strings.TrimPrefix(entries[0].Name, "v_") != vid1 { + t.Errorf("expected vid1 (newer) first, got %s", entries[0].Name) + } +} + +func TestCompareVersionIdsMixedFormats(t *testing.T) { + // Old format: raw nanosecond timestamp (below threshold ~0x17...). + // New format: inverted timestamp (above threshold ~0x68...). + oldTs := time.Date(2023, 6, 15, 12, 0, 0, 0, time.UTC) + newTs := time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC) + + oldFormatId := fmt.Sprintf("%016x", oldTs.UnixNano()) + "abcdef0123456789" + newFormatId := makeVersionId(newTs) // uses inverted timestamp + + // newTs is more recent, so newFormatId should sort as "newer". + cmp := s3lifecycle.CompareVersionIds(newFormatId, oldFormatId) + if cmp >= 0 { + t.Errorf("expected new-format ID (2026) to be newer than old-format ID (2023), got cmp=%d", cmp) + } + + // Reverse comparison. + cmp2 := s3lifecycle.CompareVersionIds(oldFormatId, newFormatId) + if cmp2 <= 0 { + t.Errorf("expected old-format ID (2023) to be older than new-format ID (2026), got cmp=%d", cmp2) + } + + // Sort a mixed slice: should be newest-first. 
+ entries := []*filer_pb.Entry{ + {Name: "v_" + oldFormatId}, + {Name: "v_" + newFormatId}, + } + sortVersionsByVersionId(entries) + + if strings.TrimPrefix(entries[0].Name, "v_") != newFormatId { + t.Errorf("expected new-format (newer) entry first after sort") + } +} + +func TestVersionsDirectoryNaming(t *testing.T) { + if s3_constants.VersionsFolder != ".versions" { + t.Fatalf("unexpected VersionsFolder constant: %q", s3_constants.VersionsFolder) + } + + versionsDir := "/buckets/mybucket/path/to/key.versions" + bucketPath := "/buckets/mybucket" + relDir := strings.TrimPrefix(versionsDir, bucketPath+"/") + objKey := strings.TrimSuffix(relDir, s3_constants.VersionsFolder) + if objKey != "path/to/key" { + t.Errorf("expected 'path/to/key', got %q", objKey) + } +} diff --git a/weed/s3api/s3lifecycle/version_time.go b/weed/s3api/s3lifecycle/version_time.go index d4f4c5f94..fb6cfbbf5 100644 --- a/weed/s3api/s3lifecycle/version_time.go +++ b/weed/s3api/s3lifecycle/version_time.go @@ -40,3 +40,60 @@ func getVersionTimestampNanos(versionId string) int64 { } return int64(timestampPart) } + +// isNewFormatVersionId returns true if the version ID uses inverted timestamps. +func isNewFormatVersionId(versionId string) bool { + if len(versionId) < 16 || versionId == "null" { + return false + } + timestampPart, err := strconv.ParseUint(versionId[:16], 16, 64) + if err != nil { + return false + } + return timestampPart > versionIdFormatThreshold && timestampPart <= math.MaxInt64 +} + +// CompareVersionIds compares two version IDs for sorting (newest first). +// Returns negative if a is newer, positive if b is newer, 0 if equal. +// Handles both old and new format version IDs and uses full lexicographic +// comparison (not just timestamps) to break ties from the random suffix. 
+func CompareVersionIds(a, b string) int {
+	// Identical IDs tie; the "null" version always ranks as the oldest.
+	switch {
+	case a == b:
+		return 0
+	case a == "null":
+		return 1
+	case b == "null":
+		return -1
+	}
+
+	newA, newB := isNewFormatVersionId(a), isNewFormatVersionId(b)
+	if newA != newB {
+		// Mixed formats: order by the timestamp decoded from each ID
+		// rather than by the raw strings; the larger timestamp is newer.
+		tsA, tsB := getVersionTimestampNanos(a), getVersionTimestampNanos(b)
+		switch {
+		case tsA > tsB:
+			return -1
+		case tsA < tsB:
+			return 1
+		default:
+			return 0
+		}
+	}
+
+	// Same format and a != b, so the string comparison can never tie.
+	// New format (inverted timestamps): smaller hex = newer.
+	order := 1
+	if a < b {
+		order = -1
+	}
+	if !newA {
+		// Old format: smaller hex = older, so the result is flipped.
+		order = -order
+	}
+	return order
+}