Browse Source

lifecycle worker: drive MPU abort from lifecycle rules (#8812)

* lifecycle worker: drive MPU abort from lifecycle rules

Update the multipart upload abort phase to read
AbortIncompleteMultipartUpload.DaysAfterInitiation from the parsed
lifecycle rules. It falls back to the worker config abort_mpu_days when
no lifecycle XML rule specifies the value.

This means per-bucket MPU abort thresholds are now respected when
set via PutBucketLifecycleConfiguration, instead of using a single
global worker config value for all buckets.

* lifecycle worker: only use config AbortMPUDays when no lifecycle XML exists

When a bucket has lifecycle XML (useRuleEval=true) but no
AbortIncompleteMultipartUpload rule, mpuAbortDays should be 0
(no abort), not the worker config default. The config fallback
should only apply to buckets without lifecycle XML.

* lifecycle worker: only skip .uploads at bucket root

* lifecycle worker: use per-upload rule evaluation for MPU abort

Replace the single bucket-wide mpuAbortDays with per-upload evaluation
using s3lifecycle.EvaluateMPUAbort, which respects each rule's prefix
filter and DaysAfterInitiation threshold.

Previously the code took the first enabled abort rule's days value
and applied it to all uploads, ignoring prefix scoping and multiple
rules with different thresholds.

Config fallback (abort_mpu_days) now only applies when lifecycle XML
is truly absent (xmlPresent=false), not when XML exists but has no
abort rules.

Also fix EvaluateMPUAbort to use expectedExpiryTime for midnight-UTC
semantics matching other lifecycle cutoffs.

---------

Co-authored-by: Copilot <copilot@github.com>
pull/4306/merge
Chris Lu 1 day ago
committed by GitHub
parent
commit
782ab84f95
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 66
      weed/plugin/worker/lifecycle/execution.go
  2. 2
      weed/s3api/s3lifecycle/evaluator.go

66
weed/plugin/worker/lifecycle/execution.go

@ -200,7 +200,23 @@ func (h *Handler) executeLifecycleForBucket(
}
// Abort incomplete multipart uploads.
if config.AbortMPUDays > 0 && remaining > 0 {
// When lifecycle XML exists, evaluate each upload against the rules
// (respecting per-rule prefix filters and DaysAfterInitiation).
// Fall back to worker config abort_mpu_days only when no lifecycle
// XML is configured for the bucket.
if xmlPresent && remaining > 0 {
_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
JobId: jobID, JobType: jobType,
State: plugin_pb.JobState_JOB_STATE_RUNNING,
Stage: "aborting_mpus", Message: "evaluating MPU abort rules",
})
aborted, abortErrs, abortCtxErr := abortMPUsByRules(ctx, filerClient, bucketsPath, bucket, lifecycleRules, remaining)
result.mpuAborted = int64(aborted)
result.errors += int64(abortErrs)
if abortCtxErr != nil {
return result, abortCtxErr
}
} else if !xmlPresent && config.AbortMPUDays > 0 && remaining > 0 {
_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
JobId: jobID, JobType: jobType,
State: plugin_pb.JobState_JOB_STATE_RUNNING,
@ -364,6 +380,54 @@ func matchesDeleteMarkerRule(rules []s3lifecycle.Rule, objKey string) bool {
return false
}
// abortMPUsByRules walks a bucket's .uploads directory and decides, per
// upload, whether to abort it by running the lifecycle rules through
// EvaluateMPUAbort (which honors each rule's prefix filter and its
// DaysAfterInitiation threshold).
//
// It returns how many uploads were aborted, how many removals failed, and a
// non-nil error only when the directory listing itself failed (including
// context cancellation). The scan stops early once aborted+errs reaches
// limit (when limit > 0).
func abortMPUsByRules(
	ctx context.Context,
	client filer_pb.SeaweedFilerClient,
	bucketsPath, bucket string,
	rules []s3lifecycle.Rule,
	limit int64,
) (aborted, errs int, ctxErr error) {
	uploadsDir := path.Join(bucketsPath, bucket, ".uploads")
	// Evaluate every upload against a single snapshot of "now" so the whole
	// scan uses one consistent cutoff.
	now := time.Now()
	scanErr := filer_pb.SeaweedList(ctx, client, uploadsDir, "", func(entry *filer_pb.Entry, isLast bool) error {
		if cerr := ctx.Err(); cerr != nil {
			return cerr
		}
		// Each multipart upload is a subdirectory; skip anything else, and
		// skip entries with no usable creation time.
		attrs := entry.Attributes
		if !entry.IsDirectory || attrs == nil || attrs.Crtime <= 0 {
			return nil
		}
		initiated := time.Unix(attrs.Crtime, 0)
		eval := s3lifecycle.EvaluateMPUAbort(rules, entry.Name, initiated, now)
		if eval.Action == s3lifecycle.ActionAbortMultipartUpload {
			if rmErr := filer_pb.DoRemove(ctx, client, uploadsDir, entry.Name, true, true, true, false, nil); rmErr != nil {
				glog.V(1).Infof("s3_lifecycle: failed to abort MPU %s: %v", path.Join(uploadsDir, entry.Name), rmErr)
				errs++
			} else {
				aborted++
			}
		}
		// Failed removals count against the limit too, so a bucket full of
		// errors cannot make the scan run unbounded.
		if limit > 0 && int64(aborted+errs) >= limit {
			return errLimitReached
		}
		return nil
	}, "", false, 10000)
	// errLimitReached is the sentinel for a deliberate early stop, not a
	// real listing failure.
	if scanErr != nil && !errors.Is(scanErr, errLimitReached) {
		return aborted, errs, fmt.Errorf("list uploads in %s: %w", uploadsDir, scanErr)
	}
	return aborted, errs, nil
}
// abortIncompleteMPUs scans the .uploads directory under a bucket and
// removes multipart upload entries older than the specified number of days.
func abortIncompleteMPUs(

2
weed/s3api/s3lifecycle/evaluator.go

@ -107,7 +107,7 @@ func EvaluateMPUAbort(rules []Rule, uploadKey string, createdAt time.Time, now t
if !matchesPrefix(rule.Prefix, uploadKey) {
continue
}
cutoff := createdAt.Add(time.Duration(rule.AbortMPUDaysAfterInitiation) * 24 * time.Hour)
cutoff := expectedExpiryTime(createdAt, rule.AbortMPUDaysAfterInitiation)
if !now.Before(cutoff) {
return EvalResult{Action: ActionAbortMultipartUpload, RuleID: rule.ID}
}

Loading…
Cancel
Save