From 782ab84f9511e900795f3d77b5e8e9fc42e96e20 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 28 Mar 2026 13:50:33 -0700 Subject: [PATCH] lifecycle worker: drive MPU abort from lifecycle rules (#8812) * lifecycle worker: drive MPU abort from lifecycle rules Update the multipart upload abort phase to read AbortIncompleteMultipartUpload.DaysAfterInitiation from the parsed lifecycle rules. Falls back to the worker config abort_mpu_days when no lifecycle XML rule specifies the value. This means per-bucket MPU abort thresholds are now respected when set via PutBucketLifecycleConfiguration, instead of using a single global worker config value for all buckets. * lifecycle worker: only use config AbortMPUDays when no lifecycle XML exists When a bucket has lifecycle XML (useRuleEval=true) but no AbortIncompleteMultipartUpload rule, mpuAbortDays should be 0 (no abort), not the worker config default. The config fallback should only apply to buckets without lifecycle XML. * lifecycle worker: only skip .uploads at bucket root * lifecycle worker: use per-upload rule evaluation for MPU abort Replace the single bucket-wide mpuAbortDays with per-upload evaluation using s3lifecycle.EvaluateMPUAbort, which respects each rule's prefix filter and DaysAfterInitiation threshold. Previously the code took the first enabled abort rule's days value and applied it to all uploads, ignoring prefix scoping and multiple rules with different thresholds. Config fallback (abort_mpu_days) now only applies when lifecycle XML is truly absent (xmlPresent=false), not when XML exists but has no abort rules. Also fix EvaluateMPUAbort to use expectedExpiryTime for midnight-UTC semantics matching other lifecycle cutoffs. --------- Co-authored-by: Copilot --- weed/plugin/worker/lifecycle/execution.go | 66 ++++++++++++++++++++++- weed/s3api/s3lifecycle/evaluator.go | 2 +- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/weed/plugin/worker/lifecycle/execution.go b/weed/plugin/worker/lifecycle/execution.go index 114392fc1..be47b6add 100644 --- a/weed/plugin/worker/lifecycle/execution.go +++ b/weed/plugin/worker/lifecycle/execution.go @@ -200,7 +200,23 @@ func (h *Handler) executeLifecycleForBucket( } // Abort incomplete multipart uploads. - if config.AbortMPUDays > 0 && remaining > 0 { + // When lifecycle XML exists, evaluate each upload against the rules + // (respecting per-rule prefix filters and DaysAfterInitiation). + // Fall back to worker config abort_mpu_days only when no lifecycle + // XML is configured for the bucket. + if xmlPresent && remaining > 0 { + _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ + JobId: jobID, JobType: jobType, + State: plugin_pb.JobState_JOB_STATE_RUNNING, + Stage: "aborting_mpus", Message: "evaluating MPU abort rules", + }) + aborted, abortErrs, abortCtxErr := abortMPUsByRules(ctx, filerClient, bucketsPath, bucket, lifecycleRules, remaining) + result.mpuAborted = int64(aborted) + result.errors += int64(abortErrs) + if abortCtxErr != nil { + return result, abortCtxErr + } + } else if !xmlPresent && config.AbortMPUDays > 0 && remaining > 0 { _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ JobId: jobID, JobType: jobType, State: plugin_pb.JobState_JOB_STATE_RUNNING, @@ -364,6 +380,54 @@ func matchesDeleteMarkerRule(rules []s3lifecycle.Rule, objKey string) bool { return false } +// abortMPUsByRules scans the .uploads directory and evaluates each upload +// against lifecycle rules using EvaluateMPUAbort, which respects per-rule +// prefix filters and DaysAfterInitiation thresholds. +func abortMPUsByRules( + ctx context.Context, + client filer_pb.SeaweedFilerClient, + bucketsPath, bucket string, + rules []s3lifecycle.Rule, + limit int64, +) (aborted, errs int, ctxErr error) { + uploadsDir := path.Join(bucketsPath, bucket, ".uploads") + now := time.Now() + + listErr := filer_pb.SeaweedList(ctx, client, uploadsDir, "", func(entry *filer_pb.Entry, isLast bool) error { + if ctx.Err() != nil { + return ctx.Err() + } + if !entry.IsDirectory { + return nil + } + if entry.Attributes == nil || entry.Attributes.Crtime <= 0 { + return nil + } + + createdAt := time.Unix(entry.Attributes.Crtime, 0) + result := s3lifecycle.EvaluateMPUAbort(rules, entry.Name, createdAt, now) + if result.Action == s3lifecycle.ActionAbortMultipartUpload { + uploadPath := path.Join(uploadsDir, entry.Name) + if err := filer_pb.DoRemove(ctx, client, uploadsDir, entry.Name, true, true, true, false, nil); err != nil { + glog.V(1).Infof("s3_lifecycle: failed to abort MPU %s: %v", uploadPath, err) + errs++ + } else { + aborted++ + } + } + + if limit > 0 && int64(aborted+errs) >= limit { + return errLimitReached + } + return nil + }, "", false, 10000) + + if listErr != nil && !errors.Is(listErr, errLimitReached) { + return aborted, errs, fmt.Errorf("list uploads in %s: %w", uploadsDir, listErr) + } + return aborted, errs, nil +} + // abortIncompleteMPUs scans the .uploads directory under a bucket and // removes multipart upload entries older than the specified number of days. func abortIncompleteMPUs( diff --git a/weed/s3api/s3lifecycle/evaluator.go b/weed/s3api/s3lifecycle/evaluator.go index 233580501..181b08e44 100644 --- a/weed/s3api/s3lifecycle/evaluator.go +++ b/weed/s3api/s3lifecycle/evaluator.go @@ -107,7 +107,7 @@ func EvaluateMPUAbort(rules []Rule, uploadKey string, createdAt time.Time, now t if !matchesPrefix(rule.Prefix, uploadKey) { continue } - cutoff := createdAt.Add(time.Duration(rule.AbortMPUDaysAfterInitiation) * 24 * time.Hour) + cutoff := expectedExpiryTime(createdAt, rule.AbortMPUDaysAfterInitiation) if !now.Before(cutoff) { return EvalResult{Action: ActionAbortMultipartUpload, RuleID: rule.ID} }