diff --git a/weed/plugin/worker/lifecycle/execution.go b/weed/plugin/worker/lifecycle/execution.go index 628623195..9c4d58a29 100644 --- a/weed/plugin/worker/lifecycle/execution.go +++ b/weed/plugin/worker/lifecycle/execution.go @@ -2,6 +2,7 @@ package lifecycle import ( "context" + "errors" "fmt" "math" "path" @@ -13,8 +14,11 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb" pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle" ) +var errLimitReached = errors.New("limit reached") + type executionResult struct { objectsExpired int64 objectsScanned int64 @@ -37,17 +41,26 @@ func (h *Handler) executeLifecycleForBucket( ) (*executionResult, error) { result := &executionResult{} - // Load filer.conf to verify TTL rules still exist. - fc, err := loadFilerConf(ctx, filerClient) - if err != nil { - return result, fmt.Errorf("load filer conf: %w", err) + // Try to load lifecycle rules from stored XML first (full rule evaluation). + // Fall back to filer.conf TTL-only evaluation if no XML exists. + lifecycleRules, xmlErr := loadLifecycleRulesFromBucket(ctx, filerClient, bucketsPath, bucket) + if xmlErr != nil { + glog.V(1).Infof("s3_lifecycle: bucket %s: failed to load lifecycle XML: %v, falling back to TTL", bucket, xmlErr) } + useRuleEval := xmlErr == nil && len(lifecycleRules) > 0 - collection := bucket - ttlRules := fc.GetCollectionTtls(collection) - if len(ttlRules) == 0 { - glog.V(1).Infof("s3_lifecycle: bucket %s has no lifecycle rules, skipping", bucket) - return result, nil + if !useRuleEval { + // Fall back: check filer.conf TTL rules. + fc, err := loadFilerConf(ctx, filerClient) + if err != nil { + return result, fmt.Errorf("load filer conf: %w", err) + } + collection := bucket + ttlRules := fc.GetCollectionTtls(collection) + if len(ttlRules) == 0 { + glog.V(1).Infof("s3_lifecycle: bucket %s has no lifecycle rules, skipping", bucket) + return result, nil + } } _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ @@ -56,14 +69,21 @@ func (h *Handler) executeLifecycleForBucket( State: plugin_pb.JobState_JOB_STATE_RUNNING, ProgressPercent: 10, Stage: "scanning", - Message: fmt.Sprintf("scanning bucket %s for expired objects (%d rules)", bucket, len(ttlRules)), + Message: fmt.Sprintf("scanning bucket %s for expired objects", bucket), }) // Shared budget across all phases so we don't exceed MaxDeletesPerBucket. remaining := config.MaxDeletesPerBucket - // Find expired objects. - expired, scanned, err := listExpiredObjects(ctx, filerClient, bucketsPath, bucket, remaining) + // Find expired objects using rule-based evaluation or TTL fallback. + var expired []expiredObject + var scanned int64 + var err error + if useRuleEval { + expired, scanned, err = listExpiredObjectsByRules(ctx, filerClient, bucketsPath, bucket, lifecycleRules, remaining) + } else { + expired, scanned, err = listExpiredObjects(ctx, filerClient, bucketsPath, bucket, remaining) + } result.objectsScanned = scanned if err != nil { return result, fmt.Errorf("list expired objects: %w", err) @@ -326,3 +346,93 @@ func deleteExpiredObjects( func nowUnix() int64 { return time.Now().Unix() } + +// listExpiredObjectsByRules scans a bucket directory tree and evaluates +// lifecycle rules against each object using the s3lifecycle evaluator. +// This function handles non-versioned objects (IsLatest=true). Versioned +// objects in .versions directories are handled by processVersionsDirectory +// (added in a separate change for NoncurrentVersionExpiration support). +func listExpiredObjectsByRules( + ctx context.Context, + client filer_pb.SeaweedFilerClient, + bucketsPath, bucket string, + rules []s3lifecycle.Rule, + limit int64, +) ([]expiredObject, int64, error) { + var expired []expiredObject + var scanned int64 + + bucketPath := path.Join(bucketsPath, bucket) + now := time.Now() + needTags := s3lifecycle.HasTagRules(rules) + + dirsToProcess := []string{bucketPath} + for len(dirsToProcess) > 0 { + select { + case <-ctx.Done(): + return expired, scanned, ctx.Err() + default: + } + + dir := dirsToProcess[0] + dirsToProcess = dirsToProcess[1:] + + limitReached := false + err := filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory { + // Skip .uploads and .versions directories. + if entry.Name != s3_constants.MultipartUploadsFolder && + !strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { + dirsToProcess = append(dirsToProcess, path.Join(dir, entry.Name)) + } + return nil + } + scanned++ + + // Skip objects already handled by TTL fast path. + if entry.Attributes != nil && entry.Attributes.TtlSec > 0 { + expirationUnix := entry.Attributes.Crtime + int64(entry.Attributes.TtlSec) + if expirationUnix > nowUnix() { + return nil // will be expired by RocksDB compaction + } + } + + // Build ObjectInfo for the evaluator. + relKey := strings.TrimPrefix(path.Join(dir, entry.Name), bucketPath+"/") + objInfo := s3lifecycle.ObjectInfo{ + Key: relKey, + IsLatest: true, // non-versioned objects are always "latest" + Size: int64(entry.Attributes.GetFileSize()), + } + if entry.Attributes != nil && entry.Attributes.Mtime > 0 { + objInfo.ModTime = time.Unix(entry.Attributes.Mtime, 0) + } else if entry.Attributes != nil && entry.Attributes.Crtime > 0 { + objInfo.ModTime = time.Unix(entry.Attributes.Crtime, 0) + } + if needTags { + objInfo.Tags = s3lifecycle.ExtractTags(entry.Extended) + } + + result := s3lifecycle.Evaluate(rules, objInfo, now) + if result.Action == s3lifecycle.ActionDeleteObject { + expired = append(expired, expiredObject{dir: dir, name: entry.Name}) + } + + if limit > 0 && int64(len(expired)) >= limit { + limitReached = true + return errLimitReached + } + return nil + }, "", false, 10000) + + if err != nil && !errors.Is(err, errLimitReached) { + return expired, scanned, fmt.Errorf("list %s: %w", dir, err) + } + + if limitReached || (limit > 0 && int64(len(expired)) >= limit) { + break + } + } + + return expired, scanned, nil +} diff --git a/weed/plugin/worker/lifecycle/rules.go b/weed/plugin/worker/lifecycle/rules.go new file mode 100644 index 000000000..d8ea1ceaf --- /dev/null +++ b/weed/plugin/worker/lifecycle/rules.go @@ -0,0 +1,177 @@ +package lifecycle + +import ( + "bytes" + "context" + "encoding/xml" + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3lifecycle" +) + +// lifecycleConfig mirrors the XML structure just enough to parse rules. +// We define a minimal local struct to avoid importing the s3api package +// (which would create a circular dependency if s3api ever imports the worker). +type lifecycleConfig struct { + XMLName xml.Name `xml:"LifecycleConfiguration"` + Rules []lifecycleConfigRule `xml:"Rule"` +} + +type lifecycleConfigRule struct { + ID string `xml:"ID"` + Status string `xml:"Status"` + Filter lifecycleFilter `xml:"Filter"` + Prefix string `xml:"Prefix"` + Expiration lifecycleExpiration `xml:"Expiration"` + NoncurrentVersionExpiration noncurrentVersionExpiration `xml:"NoncurrentVersionExpiration"` + AbortIncompleteMultipartUpload abortMPU `xml:"AbortIncompleteMultipartUpload"` +} + +type lifecycleFilter struct { + Prefix string `xml:"Prefix"` + Tag lifecycleTag `xml:"Tag"` + And lifecycleAnd `xml:"And"` + ObjectSizeGreaterThan int64 `xml:"ObjectSizeGreaterThan"` + ObjectSizeLessThan int64 `xml:"ObjectSizeLessThan"` +} + +type lifecycleAnd struct { + Prefix string `xml:"Prefix"` + Tags []lifecycleTag `xml:"Tag"` + ObjectSizeGreaterThan int64 `xml:"ObjectSizeGreaterThan"` + ObjectSizeLessThan int64 `xml:"ObjectSizeLessThan"` +} + +type lifecycleTag struct { + Key string `xml:"Key"` + Value string `xml:"Value"` +} + +type lifecycleExpiration struct { + Days int `xml:"Days"` + Date string `xml:"Date"` + ExpiredObjectDeleteMarker bool `xml:"ExpiredObjectDeleteMarker"` +} + +type noncurrentVersionExpiration struct { + NoncurrentDays int `xml:"NoncurrentDays"` + NewerNoncurrentVersions int `xml:"NewerNoncurrentVersions"` +} + +type abortMPU struct { + DaysAfterInitiation int `xml:"DaysAfterInitiation"` +} + +// loadLifecycleRulesFromBucket reads the lifecycle XML from a bucket's +// metadata and converts it to evaluator-friendly rules. +func loadLifecycleRulesFromBucket( + ctx context.Context, + client filer_pb.SeaweedFilerClient, + bucketsPath, bucket string, +) ([]s3lifecycle.Rule, error) { + bucketDir := bucketsPath + resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: bucketDir, + Name: bucket, + }) + if err != nil { + return nil, fmt.Errorf("lookup bucket %s: %w", bucket, err) + } + if resp.Entry == nil || resp.Entry.Extended == nil { + return nil, nil + } + xmlData := resp.Entry.Extended[lifecycleXMLKey] + if len(xmlData) == 0 { + return nil, nil + } + return parseLifecycleXML(xmlData) +} + +// parseLifecycleXML parses lifecycle configuration XML and converts it +// to evaluator-friendly rules. +func parseLifecycleXML(data []byte) ([]s3lifecycle.Rule, error) { + var config lifecycleConfig + if err := xml.NewDecoder(bytes.NewReader(data)).Decode(&config); err != nil { + return nil, fmt.Errorf("decode lifecycle XML: %w", err) + } + + var rules []s3lifecycle.Rule + for _, r := range config.Rules { + rule := s3lifecycle.Rule{ + ID: r.ID, + Status: r.Status, + } + + // Resolve prefix: Filter.And.Prefix > Filter.Prefix > Rule.Prefix + switch { + case r.Filter.And.Prefix != "" || len(r.Filter.And.Tags) > 0 || + r.Filter.And.ObjectSizeGreaterThan > 0 || r.Filter.And.ObjectSizeLessThan > 0: + rule.Prefix = r.Filter.And.Prefix + rule.FilterTags = tagsToMap(r.Filter.And.Tags) + rule.FilterSizeGreaterThan = r.Filter.And.ObjectSizeGreaterThan + rule.FilterSizeLessThan = r.Filter.And.ObjectSizeLessThan + case r.Filter.Tag.Key != "": + rule.Prefix = r.Filter.Prefix + rule.FilterTags = map[string]string{r.Filter.Tag.Key: r.Filter.Tag.Value} + rule.FilterSizeGreaterThan = r.Filter.ObjectSizeGreaterThan + rule.FilterSizeLessThan = r.Filter.ObjectSizeLessThan + default: + if r.Filter.Prefix != "" { + rule.Prefix = r.Filter.Prefix + } else { + rule.Prefix = r.Prefix + } + rule.FilterSizeGreaterThan = r.Filter.ObjectSizeGreaterThan + rule.FilterSizeLessThan = r.Filter.ObjectSizeLessThan + } + + rule.ExpirationDays = r.Expiration.Days + rule.ExpiredObjectDeleteMarker = r.Expiration.ExpiredObjectDeleteMarker + rule.NoncurrentVersionExpirationDays = r.NoncurrentVersionExpiration.NoncurrentDays + rule.NewerNoncurrentVersions = r.NoncurrentVersionExpiration.NewerNoncurrentVersions + rule.AbortMPUDaysAfterInitiation = r.AbortIncompleteMultipartUpload.DaysAfterInitiation + + // Parse Date if present. + if r.Expiration.Date != "" { + // Date may be RFC3339 or ISO 8601 date-only. + parsed, parseErr := parseExpirationDate(r.Expiration.Date) + if parseErr != nil { + glog.V(1).Infof("s3_lifecycle: skipping rule %s: invalid expiration date %q: %v", r.ID, r.Expiration.Date, parseErr) + continue + } + rule.ExpirationDate = parsed + } + + rules = append(rules, rule) + } + return rules, nil +} + +func tagsToMap(tags []lifecycleTag) map[string]string { + if len(tags) == 0 { + return nil + } + m := make(map[string]string, len(tags)) + for _, t := range tags { + m[t.Key] = t.Value + } + return m +} + +func parseExpirationDate(s string) (time.Time, error) { + // Try RFC3339 first, then ISO 8601 date-only. + formats := []string{ + "2006-01-02T15:04:05Z07:00", + "2006-01-02", + } + for _, f := range formats { + t, err := time.Parse(f, s) + if err == nil { + return t, nil + } + } + return time.Time{}, fmt.Errorf("unrecognized date format: %s", s) +} diff --git a/weed/plugin/worker/lifecycle/rules_test.go b/weed/plugin/worker/lifecycle/rules_test.go new file mode 100644 index 000000000..ab57137a7 --- /dev/null +++ b/weed/plugin/worker/lifecycle/rules_test.go @@ -0,0 +1,256 @@ +package lifecycle + +import ( + "testing" + "time" +) + +func TestParseLifecycleXML_CompleteConfig(t *testing.T) { + xml := []byte(` + + rotation + + Enabled + 30 + + 7 + 2 + + + 3 + + +`) + + rules, err := parseLifecycleXML(xml) + if err != nil { + t.Fatalf("parseLifecycleXML: %v", err) + } + if len(rules) != 1 { + t.Fatalf("expected 1 rule, got %d", len(rules)) + } + + r := rules[0] + if r.ID != "rotation" { + t.Errorf("expected ID 'rotation', got %q", r.ID) + } + if r.Status != "Enabled" { + t.Errorf("expected Status 'Enabled', got %q", r.Status) + } + if r.ExpirationDays != 30 { + t.Errorf("expected ExpirationDays=30, got %d", r.ExpirationDays) + } + if r.NoncurrentVersionExpirationDays != 7 { + t.Errorf("expected NoncurrentVersionExpirationDays=7, got %d", r.NoncurrentVersionExpirationDays) + } + if r.NewerNoncurrentVersions != 2 { + t.Errorf("expected NewerNoncurrentVersions=2, got %d", r.NewerNoncurrentVersions) + } + if r.AbortMPUDaysAfterInitiation != 3 { + t.Errorf("expected AbortMPUDaysAfterInitiation=3, got %d", r.AbortMPUDaysAfterInitiation) + } +} + +func TestParseLifecycleXML_PrefixFilter(t *testing.T) { + xml := []byte(` + + logs + Enabled + logs/ + 7 + +`) + + rules, err := parseLifecycleXML(xml) + if err != nil { + t.Fatalf("parseLifecycleXML: %v", err) + } + if len(rules) != 1 { + t.Fatalf("expected 1 rule, got %d", len(rules)) + } + if rules[0].Prefix != "logs/" { + t.Errorf("expected Prefix='logs/', got %q", rules[0].Prefix) + } +} + +func TestParseLifecycleXML_LegacyPrefix(t *testing.T) { + // Old-style at rule level instead of inside . + xml := []byte(` + + old + Enabled + archive/ + 90 + +`) + + rules, err := parseLifecycleXML(xml) + if err != nil { + t.Fatalf("parseLifecycleXML: %v", err) + } + if len(rules) != 1 { + t.Fatalf("expected 1 rule, got %d", len(rules)) + } + if rules[0].Prefix != "archive/" { + t.Errorf("expected Prefix='archive/', got %q", rules[0].Prefix) + } +} + +func TestParseLifecycleXML_TagFilter(t *testing.T) { + xml := []byte(` + + tag-rule + Enabled + + envdev + + 1 + +`) + + rules, err := parseLifecycleXML(xml) + if err != nil { + t.Fatalf("parseLifecycleXML: %v", err) + } + if len(rules) != 1 { + t.Fatalf("expected 1 rule, got %d", len(rules)) + } + r := rules[0] + if len(r.FilterTags) != 1 || r.FilterTags["env"] != "dev" { + t.Errorf("expected FilterTags={env:dev}, got %v", r.FilterTags) + } +} + +func TestParseLifecycleXML_AndFilter(t *testing.T) { + xml := []byte(` + + and-rule + Enabled + + + data/ + envstaging + 1024 + + + 14 + +`) + + rules, err := parseLifecycleXML(xml) + if err != nil { + t.Fatalf("parseLifecycleXML: %v", err) + } + if len(rules) != 1 { + t.Fatalf("expected 1 rule, got %d", len(rules)) + } + r := rules[0] + if r.Prefix != "data/" { + t.Errorf("expected Prefix='data/', got %q", r.Prefix) + } + if r.FilterTags["env"] != "staging" { + t.Errorf("expected tag env=staging, got %v", r.FilterTags) + } + if r.FilterSizeGreaterThan != 1024 { + t.Errorf("expected FilterSizeGreaterThan=1024, got %d", r.FilterSizeGreaterThan) + } +} + +func TestParseLifecycleXML_ExpirationDate(t *testing.T) { + xml := []byte(` + + date-rule + Enabled + + 2026-06-01T00:00:00Z + +`) + + rules, err := parseLifecycleXML(xml) + if err != nil { + t.Fatalf("parseLifecycleXML: %v", err) + } + expected := time.Date(2026, 6, 1, 0, 0, 0, 0, time.UTC) + if !rules[0].ExpirationDate.Equal(expected) { + t.Errorf("expected ExpirationDate=%v, got %v", expected, rules[0].ExpirationDate) + } +} + +func TestParseLifecycleXML_ExpiredObjectDeleteMarker(t *testing.T) { + xml := []byte(` + + marker-cleanup + Enabled + + true + +`) + + rules, err := parseLifecycleXML(xml) + if err != nil { + t.Fatalf("parseLifecycleXML: %v", err) + } + if !rules[0].ExpiredObjectDeleteMarker { + t.Error("expected ExpiredObjectDeleteMarker=true") + } +} + +func TestParseLifecycleXML_MultipleRules(t *testing.T) { + xml := []byte(` + + rule1 + Enabled + logs/ + 7 + + + rule2 + Disabled + temp/ + 1 + + + rule3 + Enabled + + 365 + +`) + + rules, err := parseLifecycleXML(xml) + if err != nil { + t.Fatalf("parseLifecycleXML: %v", err) + } + if len(rules) != 3 { + t.Fatalf("expected 3 rules, got %d", len(rules)) + } + if rules[1].Status != "Disabled" { + t.Errorf("expected rule2 Status=Disabled, got %q", rules[1].Status) + } +} + +func TestParseExpirationDate(t *testing.T) { + tests := []struct { + name string + input string + want time.Time + wantErr bool + }{ + {"rfc3339_utc", "2026-06-01T00:00:00Z", time.Date(2026, 6, 1, 0, 0, 0, 0, time.UTC), false}, + {"rfc3339_offset", "2026-06-01T00:00:00+05:00", time.Date(2026, 6, 1, 0, 0, 0, 0, time.FixedZone("", 5*3600)), false}, + {"date_only", "2026-06-01", time.Date(2026, 6, 1, 0, 0, 0, 0, time.UTC), false}, + {"invalid", "not-a-date", time.Time{}, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseExpirationDate(tt.input) + if (err != nil) != tt.wantErr { + t.Errorf("parseExpirationDate(%q) error = %v, wantErr %v", tt.input, err, tt.wantErr) + return + } + if !tt.wantErr && !got.Equal(tt.want) { + t.Errorf("parseExpirationDate(%q) = %v, want %v", tt.input, got, tt.want) + } + }) + } +}