diff --git a/weed/plugin/worker/lifecycle/execution.go b/weed/plugin/worker/lifecycle/execution.go index 114392fc1..be47b6add 100644 --- a/weed/plugin/worker/lifecycle/execution.go +++ b/weed/plugin/worker/lifecycle/execution.go @@ -200,7 +200,23 @@ func (h *Handler) executeLifecycleForBucket( } // Abort incomplete multipart uploads. - if config.AbortMPUDays > 0 && remaining > 0 { + // When lifecycle XML exists, evaluate each upload against the rules + // (respecting per-rule prefix filters and DaysAfterInitiation). + // Fall back to worker config abort_mpu_days only when no lifecycle + // XML is configured for the bucket. + if xmlPresent && remaining > 0 { + _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ + JobId: jobID, JobType: jobType, + State: plugin_pb.JobState_JOB_STATE_RUNNING, + Stage: "aborting_mpus", Message: "evaluating MPU abort rules", + }) + aborted, abortErrs, abortCtxErr := abortMPUsByRules(ctx, filerClient, bucketsPath, bucket, lifecycleRules, remaining) + result.mpuAborted = int64(aborted) + result.errors += int64(abortErrs) + if abortCtxErr != nil { + return result, abortCtxErr + } + } else if !xmlPresent && config.AbortMPUDays > 0 && remaining > 0 { _ = sender.SendProgress(&plugin_pb.JobProgressUpdate{ JobId: jobID, JobType: jobType, State: plugin_pb.JobState_JOB_STATE_RUNNING, @@ -364,6 +380,54 @@ func matchesDeleteMarkerRule(rules []s3lifecycle.Rule, objKey string) bool { return false } +// abortMPUsByRules scans the .uploads directory and evaluates each upload +// against lifecycle rules using EvaluateMPUAbort, which respects per-rule +// prefix filters and DaysAfterInitiation thresholds. +func abortMPUsByRules( + ctx context.Context, + client filer_pb.SeaweedFilerClient, + bucketsPath, bucket string, + rules []s3lifecycle.Rule, + limit int64, +) (aborted, errs int, ctxErr error) { + uploadsDir := path.Join(bucketsPath, bucket, ".uploads") + now := time.Now() + + listErr := filer_pb.SeaweedList(ctx, client, uploadsDir, "", func(entry *filer_pb.Entry, isLast bool) error { + if ctx.Err() != nil { + return ctx.Err() + } + if !entry.IsDirectory { + return nil + } + if entry.Attributes == nil || entry.Attributes.Crtime <= 0 { + return nil + } + + createdAt := time.Unix(entry.Attributes.Crtime, 0) + result := s3lifecycle.EvaluateMPUAbort(rules, entry.Name, createdAt, now) + if result.Action == s3lifecycle.ActionAbortMultipartUpload { + uploadPath := path.Join(uploadsDir, entry.Name) + if err := filer_pb.DoRemove(ctx, client, uploadsDir, entry.Name, true, true, true, false, nil); err != nil { + glog.V(1).Infof("s3_lifecycle: failed to abort MPU %s: %v", uploadPath, err) + errs++ + } else { + aborted++ + } + } + + if limit > 0 && int64(aborted+errs) >= limit { + return errLimitReached + } + return nil + }, "", false, 10000) + + if listErr != nil && !errors.Is(listErr, errLimitReached) { + return aborted, errs, fmt.Errorf("list uploads in %s: %w", uploadsDir, listErr) + } + return aborted, errs, nil +} + // abortIncompleteMPUs scans the .uploads directory under a bucket and // removes multipart upload entries older than the specified number of days. func abortIncompleteMPUs( diff --git a/weed/s3api/s3lifecycle/evaluator.go b/weed/s3api/s3lifecycle/evaluator.go index 233580501..181b08e44 100644 --- a/weed/s3api/s3lifecycle/evaluator.go +++ b/weed/s3api/s3lifecycle/evaluator.go @@ -107,7 +107,7 @@ func EvaluateMPUAbort(rules []Rule, uploadKey string, createdAt time.Time, now t if !matchesPrefix(rule.Prefix, uploadKey) { continue } - cutoff := createdAt.Add(time.Duration(rule.AbortMPUDaysAfterInitiation) * 24 * time.Hour) + cutoff := expectedExpiryTime(createdAt, rule.AbortMPUDaysAfterInitiation) if !now.Before(cutoff) { return EvalResult{Action: ActionAbortMultipartUpload, RuleID: rule.ID} }