diff --git a/weed/plugin/worker/lifecycle/execution.go b/weed/plugin/worker/lifecycle/execution.go index 9c4d58a29..3dd47021a 100644 --- a/weed/plugin/worker/lifecycle/execution.go +++ b/weed/plugin/worker/lifecycle/execution.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "path" + "sort" "strings" "time" @@ -42,15 +43,26 @@ func (h *Handler) executeLifecycleForBucket( result := &executionResult{} // Try to load lifecycle rules from stored XML first (full rule evaluation). - // Fall back to filer.conf TTL-only evaluation if no XML exists. + // Fall back to filer.conf TTL-only evaluation only if no XML is configured. + // If XML exists but is malformed, fail closed (don't fall back to TTL, + // which could apply broader rules and delete objects the XML rules would keep). + // Transient filer errors fall back to TTL with a warning. lifecycleRules, xmlErr := loadLifecycleRulesFromBucket(ctx, filerClient, bucketsPath, bucket) + if xmlErr != nil && errors.Is(xmlErr, errMalformedLifecycleXML) { + glog.Errorf("s3_lifecycle: bucket %s: %v (skipping bucket)", bucket, xmlErr) + return result, xmlErr + } if xmlErr != nil { - glog.V(1).Infof("s3_lifecycle: bucket %s: failed to load lifecycle XML: %v, falling back to TTL", bucket, xmlErr) + glog.V(1).Infof("s3_lifecycle: bucket %s: transient error loading lifecycle XML: %v, falling back to TTL", bucket, xmlErr) } - useRuleEval := xmlErr == nil && len(lifecycleRules) > 0 - - if !useRuleEval { - // Fall back: check filer.conf TTL rules. + // lifecycleRules is non-nil when XML was present (even if empty/all disabled). + // Only fall back to TTL when XML was truly absent (nil). + xmlPresent := xmlErr == nil && lifecycleRules != nil + useRuleEval := xmlPresent && len(lifecycleRules) > 0 + + if !useRuleEval && !xmlPresent { + // Fall back to filer.conf TTL rules only when no lifecycle XML exists. + // When XML is present but has no effective rules, skip TTL fallback. fc, err := loadFilerConf(ctx, filerClient) if err != nil { return result, fmt.Errorf("load filer conf: %w", err) @@ -81,9 +93,11 @@ func (h *Handler) executeLifecycleForBucket( var err error if useRuleEval { expired, scanned, err = listExpiredObjectsByRules(ctx, filerClient, bucketsPath, bucket, lifecycleRules, remaining) - } else { + } else if !xmlPresent { + // TTL-only scan when no lifecycle XML exists. expired, scanned, err = listExpiredObjects(ctx, filerClient, bucketsPath, bucket, remaining) } + // When xmlPresent but no effective rules (all disabled), skip object scanning. result.objectsScanned = scanned if err != nil { return result, fmt.Errorf("list expired objects: %w", err) @@ -380,11 +394,26 @@ func listExpiredObjectsByRules( limitReached := false err := filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error { if entry.IsDirectory { - // Skip .uploads and .versions directories. - if entry.Name != s3_constants.MultipartUploadsFolder && - !strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { - dirsToProcess = append(dirsToProcess, path.Join(dir, entry.Name)) + if dir == bucketPath && entry.Name == s3_constants.MultipartUploadsFolder { + return nil // skip .uploads at bucket root only } + if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { + // Process versioned object. + versionsDir := path.Join(dir, entry.Name) + vExpired, vScanned, vErr := processVersionsDirectory(ctx, client, versionsDir, bucketPath, rules, now, needTags, limit-int64(len(expired))) + if vErr != nil { + glog.V(1).Infof("s3_lifecycle: %v", vErr) + return vErr + } + expired = append(expired, vExpired...) + scanned += vScanned + if limit > 0 && int64(len(expired)) >= limit { + limitReached = true + return errLimitReached + } + return nil + } + dirsToProcess = append(dirsToProcess, path.Join(dir, entry.Name)) return nil } scanned++ @@ -402,12 +431,14 @@ func listExpiredObjectsByRules( objInfo := s3lifecycle.ObjectInfo{ Key: relKey, IsLatest: true, // non-versioned objects are always "latest" - Size: int64(entry.Attributes.GetFileSize()), } - if entry.Attributes != nil && entry.Attributes.Mtime > 0 { - objInfo.ModTime = time.Unix(entry.Attributes.Mtime, 0) - } else if entry.Attributes != nil && entry.Attributes.Crtime > 0 { - objInfo.ModTime = time.Unix(entry.Attributes.Crtime, 0) + if entry.Attributes != nil { + objInfo.Size = int64(entry.Attributes.GetFileSize()) + if entry.Attributes.Mtime > 0 { + objInfo.ModTime = time.Unix(entry.Attributes.Mtime, 0) + } else if entry.Attributes.Crtime > 0 { + objInfo.ModTime = time.Unix(entry.Attributes.Crtime, 0) + } } if needTags { objInfo.Tags = s3lifecycle.ExtractTags(entry.Extended) @@ -436,3 +467,123 @@ func listExpiredObjectsByRules( return expired, scanned, nil } + +// processVersionsDirectory evaluates NoncurrentVersionExpiration rules +// against all versions in a .versions directory. +func processVersionsDirectory( + ctx context.Context, + client filer_pb.SeaweedFilerClient, + versionsDir, bucketPath string, + rules []s3lifecycle.Rule, + now time.Time, + needTags bool, + limit int64, +) ([]expiredObject, int64, error) { + var expired []expiredObject + var scanned int64 + + // Check if any rule has NoncurrentVersionExpiration. + hasNoncurrentRules := false + for _, r := range rules { + if r.Status == "Enabled" && r.NoncurrentVersionExpirationDays > 0 { + hasNoncurrentRules = true + break + } + } + if !hasNoncurrentRules { + return nil, 0, nil + } + + // List all versions in this directory. + var versions []*filer_pb.Entry + listErr := filer_pb.SeaweedList(ctx, client, versionsDir, "", func(entry *filer_pb.Entry, isLast bool) error { + if !entry.IsDirectory { + versions = append(versions, entry) + } + return nil + }, "", false, 10000) + if listErr != nil { + return nil, 0, fmt.Errorf("list versions in %s: %w", versionsDir, listErr) + } + if len(versions) <= 1 { + return nil, 0, nil // only one version (the latest), nothing to expire + } + + // Sort by version timestamp, newest first. + sortVersionsByVersionId(versions) + + // Derive the object key from the .versions directory path. + // e.g., /buckets/mybucket/path/to/key.versions -> path/to/key + relDir := strings.TrimPrefix(versionsDir, bucketPath+"/") + objKey := strings.TrimSuffix(relDir, s3_constants.VersionsFolder) + + // Walk versions: first is latest, rest are non-current. + noncurrentIndex := 0 + for i := 1; i < len(versions); i++ { + entry := versions[i] + scanned++ + + // Skip delete markers from expiration evaluation, but count + // them toward NewerNoncurrentVersions so data versions get + // the correct noncurrent index. + if isDeleteMarker(entry) { + noncurrentIndex++ + continue + } + + // Determine successor's timestamp (the version that replaced this one). + successorEntry := versions[i-1] + successorVersionId := strings.TrimPrefix(successorEntry.Name, "v_") + successorTime := s3lifecycle.GetVersionTimestamp(successorVersionId) + if successorTime.IsZero() && successorEntry.Attributes != nil && successorEntry.Attributes.Mtime > 0 { + successorTime = time.Unix(successorEntry.Attributes.Mtime, 0) + } + + objInfo := s3lifecycle.ObjectInfo{ + Key: objKey, + IsLatest: false, + SuccessorModTime: successorTime, + NumVersions: len(versions), + NoncurrentIndex: noncurrentIndex, + } + if entry.Attributes != nil { + objInfo.Size = int64(entry.Attributes.GetFileSize()) + if entry.Attributes.Mtime > 0 { + objInfo.ModTime = time.Unix(entry.Attributes.Mtime, 0) + } + } + if needTags { + objInfo.Tags = s3lifecycle.ExtractTags(entry.Extended) + } + + // Evaluate using the detailed ShouldExpireNoncurrentVersion which + // handles NewerNoncurrentVersions. + for _, rule := range rules { + if s3lifecycle.ShouldExpireNoncurrentVersion(rule, objInfo, noncurrentIndex, now) { + expired = append(expired, expiredObject{dir: versionsDir, name: entry.Name}) + break + } + } + + noncurrentIndex++ + + if limit > 0 && int64(len(expired)) >= limit { + break + } + } + + return expired, scanned, nil +} + +// sortVersionsByVersionId sorts version entries newest-first using full +// version ID comparison (matching compareVersionIds in s3api_version_id.go). +// This uses the complete version ID string, not just the decoded timestamp, +// so entries with the same timestamp prefix are correctly ordered by their +// random suffix. +func sortVersionsByVersionId(versions []*filer_pb.Entry) { + sort.Slice(versions, func(i, j int) bool { + vidI := strings.TrimPrefix(versions[i].Name, "v_") + vidJ := strings.TrimPrefix(versions[j].Name, "v_") + return s3lifecycle.CompareVersionIds(vidI, vidJ) < 0 + }) +} diff --git a/weed/plugin/worker/lifecycle/rules.go b/weed/plugin/worker/lifecycle/rules.go index d8ea1ceaf..c3855f22c 100644 --- a/weed/plugin/worker/lifecycle/rules.go +++ b/weed/plugin/worker/lifecycle/rules.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/xml" + "errors" "fmt" "time" @@ -65,8 +66,18 @@ type abortMPU struct { DaysAfterInitiation int `xml:"DaysAfterInitiation"` } +// errMalformedLifecycleXML indicates the lifecycle XML exists but could not be parsed. +// Callers should fail closed (not fall back to TTL) to avoid broader deletions. +var errMalformedLifecycleXML = errors.New("malformed lifecycle XML") + // loadLifecycleRulesFromBucket reads the lifecycle XML from a bucket's // metadata and converts it to evaluator-friendly rules. +// +// Returns: +// - (rules, nil) when lifecycle XML is configured and parseable +// - (nil, nil) when no lifecycle XML is configured (caller should use TTL fallback) +// - (nil, errMalformedLifecycleXML) when XML exists but is malformed (fail closed) +// - (nil, err) for transient filer errors (caller should use TTL fallback with warning) func loadLifecycleRulesFromBucket( ctx context.Context, client filer_pb.SeaweedFilerClient, @@ -78,6 +89,7 @@ func loadLifecycleRulesFromBucket( Name: bucket, }) if err != nil { + // Transient filer error — not the same as malformed XML. return nil, fmt.Errorf("lookup bucket %s: %w", bucket, err) } if resp.Entry == nil || resp.Entry.Extended == nil { @@ -87,7 +99,17 @@ func loadLifecycleRulesFromBucket( if len(xmlData) == 0 { return nil, nil } - return parseLifecycleXML(xmlData) + rules, parseErr := parseLifecycleXML(xmlData) + if parseErr != nil { + return nil, fmt.Errorf("%w: bucket %s: %v", errMalformedLifecycleXML, bucket, parseErr) + } + // Return non-nil empty slice when XML was present but yielded no rules + // (e.g., all rules disabled). This lets callers distinguish "no XML" (nil) + // from "XML present, no effective rules" (empty slice). + if rules == nil { + rules = []s3lifecycle.Rule{} + } + return rules, nil } // parseLifecycleXML parses lifecycle configuration XML and converts it diff --git a/weed/plugin/worker/lifecycle/version_test.go b/weed/plugin/worker/lifecycle/version_test.go new file mode 100644 index 000000000..43cc0d93b --- /dev/null +++ b/weed/plugin/worker/lifecycle/version_test.go @@ -0,0 +1,112 @@ +package lifecycle + +import ( + "fmt" + "math" + "strings" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle" +) + +// makeVersionId creates a new-format version ID from a timestamp. +func makeVersionId(t time.Time) string { + inverted := math.MaxInt64 - t.UnixNano() + return fmt.Sprintf("%016x", inverted) + "0000000000000000" +} + +func TestSortVersionsByVersionId(t *testing.T) { + t1 := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + t2 := time.Date(2026, 2, 1, 0, 0, 0, 0, time.UTC) + t3 := time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC) + + vid1 := makeVersionId(t1) + vid2 := makeVersionId(t2) + vid3 := makeVersionId(t3) + + entries := []*filer_pb.Entry{ + {Name: "v_" + vid1}, + {Name: "v_" + vid3}, + {Name: "v_" + vid2}, + } + + sortVersionsByVersionId(entries) + + // Should be sorted newest first: t3, t2, t1. + expected := []string{"v_" + vid3, "v_" + vid2, "v_" + vid1} + for i, want := range expected { + if entries[i].Name != want { + t.Errorf("entries[%d].Name = %s, want %s", i, entries[i].Name, want) + } + } +} + +func TestSortVersionsByVersionId_SameTimestampDifferentSuffix(t *testing.T) { + // Two versions with the same timestamp prefix but different random suffix. + // The sort must still produce a deterministic order. + base := makeVersionId(time.Date(2026, 6, 1, 0, 0, 0, 0, time.UTC)) + vid1 := base[:16] + "aaaaaaaaaaaaaaaa" + vid2 := base[:16] + "bbbbbbbbbbbbbbbb" + + entries := []*filer_pb.Entry{ + {Name: "v_" + vid2}, + {Name: "v_" + vid1}, + } + + sortVersionsByVersionId(entries) + + // New format: smaller hex = newer. vid1 ("aaa...") < vid2 ("bbb...") so vid1 is newer. + if strings.TrimPrefix(entries[0].Name, "v_") != vid1 { + t.Errorf("expected vid1 (newer) first, got %s", entries[0].Name) + } +} + +func TestCompareVersionIdsMixedFormats(t *testing.T) { + // Old format: raw nanosecond timestamp (below threshold ~0x17...). + // New format: inverted timestamp (above threshold ~0x68...). + oldTs := time.Date(2023, 6, 15, 12, 0, 0, 0, time.UTC) + newTs := time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC) + + oldFormatId := fmt.Sprintf("%016x", oldTs.UnixNano()) + "abcdef0123456789" + newFormatId := makeVersionId(newTs) // uses inverted timestamp + + // newTs is more recent, so newFormatId should sort as "newer". + cmp := s3lifecycle.CompareVersionIds(newFormatId, oldFormatId) + if cmp >= 0 { + t.Errorf("expected new-format ID (2026) to be newer than old-format ID (2023), got cmp=%d", cmp) + } + + // Reverse comparison. + cmp2 := s3lifecycle.CompareVersionIds(oldFormatId, newFormatId) + if cmp2 <= 0 { + t.Errorf("expected old-format ID (2023) to be older than new-format ID (2026), got cmp=%d", cmp2) + } + + // Sort a mixed slice: should be newest-first. + entries := []*filer_pb.Entry{ + {Name: "v_" + oldFormatId}, + {Name: "v_" + newFormatId}, + } + sortVersionsByVersionId(entries) + + if strings.TrimPrefix(entries[0].Name, "v_") != newFormatId { + t.Errorf("expected new-format (newer) entry first after sort") + } +} + +func TestVersionsDirectoryNaming(t *testing.T) { + if s3_constants.VersionsFolder != ".versions" { + t.Fatalf("unexpected VersionsFolder constant: %q", s3_constants.VersionsFolder) + } + + versionsDir := "/buckets/mybucket/path/to/key.versions" + bucketPath := "/buckets/mybucket" + relDir := strings.TrimPrefix(versionsDir, bucketPath+"/") + objKey := strings.TrimSuffix(relDir, s3_constants.VersionsFolder) + if objKey != "path/to/key" { + t.Errorf("expected 'path/to/key', got %q", objKey) + } +} diff --git a/weed/s3api/s3lifecycle/version_time.go b/weed/s3api/s3lifecycle/version_time.go index d4f4c5f94..fb6cfbbf5 100644 --- a/weed/s3api/s3lifecycle/version_time.go +++ b/weed/s3api/s3lifecycle/version_time.go @@ -40,3 +40,60 @@ func getVersionTimestampNanos(versionId string) int64 { } return int64(timestampPart) } + +// isNewFormatVersionId returns true if the version ID uses inverted timestamps. +func isNewFormatVersionId(versionId string) bool { + if len(versionId) < 16 || versionId == "null" { + return false + } + timestampPart, err := strconv.ParseUint(versionId[:16], 16, 64) + if err != nil { + return false + } + return timestampPart > versionIdFormatThreshold && timestampPart <= math.MaxInt64 +} + +// CompareVersionIds compares two version IDs for sorting (newest first). +// Returns negative if a is newer, positive if b is newer, 0 if equal. +// Handles both old and new format version IDs and uses full lexicographic +// comparison (not just timestamps) to break ties from the random suffix. +func CompareVersionIds(a, b string) int { + if a == b { + return 0 + } + if a == "null" { + return 1 + } + if b == "null" { + return -1 + } + + aIsNew := isNewFormatVersionId(a) + bIsNew := isNewFormatVersionId(b) + + if aIsNew == bIsNew { + if aIsNew { + // New format: smaller hex = newer (inverted timestamps). + if a < b { + return -1 + } + return 1 + } + // Old format: smaller hex = older. + if a < b { + return 1 + } + return -1 + } + + // Mixed formats: compare by actual timestamp. + aTime := getVersionTimestampNanos(a) + bTime := getVersionTimestampNanos(b) + if aTime > bTime { + return -1 + } + if aTime < bTime { + return 1 + } + return 0 +}