You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
231 lines
7.7 KiB
231 lines
7.7 KiB
package s3_objectlock
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
)
|
|
|
|
// ====================================================================
|
|
// SHARED OBJECT LOCK CHECKING FUNCTIONS
|
|
// ====================================================================
|
|
// These functions are used by S3 API, Admin UI, and shell commands for
|
|
// checking Object Lock status before bucket deletion.
|
|
|
|
// EntryHasActiveLock checks if an entry has an active retention or legal hold
|
|
// This is a standalone function that can be used by any component
|
|
func EntryHasActiveLock(entry *filer_pb.Entry, currentTime time.Time) bool {
|
|
if entry == nil || entry.Extended == nil {
|
|
return false
|
|
}
|
|
|
|
// Check for active legal hold (case-insensitive, trimmed for defensive parsing)
|
|
if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
|
|
legalHold := strings.TrimSpace(strings.ToUpper(string(legalHoldBytes)))
|
|
if legalHold == s3_constants.LegalHoldOn {
|
|
return true
|
|
}
|
|
}
|
|
|
|
// Check for active retention (case-insensitive, trimmed for defensive parsing)
|
|
if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
|
|
mode := strings.TrimSpace(strings.ToUpper(string(modeBytes)))
|
|
if mode == s3_constants.RetentionModeCompliance || mode == s3_constants.RetentionModeGovernance {
|
|
// Check if retention is still active
|
|
if dateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists {
|
|
dateStr := strings.TrimSpace(string(dateBytes))
|
|
timestamp, err := strconv.ParseInt(dateStr, 10, 64)
|
|
if err != nil {
|
|
// Fail-safe: if we can't parse the retention date, assume the object is locked
|
|
// to prevent accidental data loss
|
|
glog.Warningf("Failed to parse retention date '%s' for entry, assuming locked: %v", dateStr, err)
|
|
return true
|
|
}
|
|
retainUntil := time.Unix(timestamp, 0)
|
|
if retainUntil.After(currentTime) {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// HasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold
|
|
// This function uses the filer gRPC client to scan the bucket directory
|
|
func HasObjectsWithActiveLocks(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketPath string) (bool, error) {
|
|
hasLocks := false
|
|
currentTime := time.Now()
|
|
|
|
err := recursivelyCheckLocksWithClient(ctx, client, bucketPath, &hasLocks, currentTime)
|
|
if err != nil {
|
|
return false, fmt.Errorf("error checking for locked objects: %w", err)
|
|
}
|
|
|
|
return hasLocks, nil
|
|
}
|
|
|
|
// paginateEntries is a generic helper that handles pagination logic for listing directory entries.
|
|
// The processEntry callback is called for each entry; returning stop=true stops iteration early.
|
|
func paginateEntries(ctx context.Context, client filer_pb.SeaweedFilerClient, dir string,
|
|
processEntry func(entry *filer_pb.Entry) (stop bool, err error)) error {
|
|
lastFileName := ""
|
|
for {
|
|
resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
|
|
Directory: dir,
|
|
StartFromFileName: lastFileName,
|
|
InclusiveStartFrom: false,
|
|
Limit: 10000,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list directory %s: %w", dir, err)
|
|
}
|
|
|
|
entriesReceived := false
|
|
for {
|
|
entryResp, recvErr := resp.Recv()
|
|
if recvErr != nil {
|
|
if errors.Is(recvErr, io.EOF) {
|
|
break // Normal end of stream
|
|
}
|
|
return fmt.Errorf("failed to receive entry from %s: %w", dir, recvErr)
|
|
}
|
|
entriesReceived = true
|
|
entry := entryResp.Entry
|
|
lastFileName = entry.Name
|
|
|
|
// Skip invalid entry names to prevent path traversal
|
|
if entry.Name == "" || entry.Name == "." || entry.Name == ".." ||
|
|
strings.ContainsAny(entry.Name, "/\\") {
|
|
glog.V(2).Infof("Skipping invalid entry name: %q in %s", entry.Name, dir)
|
|
continue
|
|
}
|
|
|
|
stop, err := processEntry(entry)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if stop {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
if !entriesReceived {
|
|
break
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// recursivelyCheckLocksWithClient recursively checks all objects and versions for active locks
|
|
func recursivelyCheckLocksWithClient(ctx context.Context, client filer_pb.SeaweedFilerClient, dir string, hasLocks *bool, currentTime time.Time) error {
|
|
if *hasLocks {
|
|
return nil // Early exit if already found a locked object
|
|
}
|
|
|
|
return paginateEntries(ctx, client, dir, func(entry *filer_pb.Entry) (bool, error) {
|
|
if *hasLocks {
|
|
return true, nil // Stop iteration
|
|
}
|
|
|
|
// Skip special directories
|
|
if entry.Name == s3_constants.MultipartUploadsFolder {
|
|
return false, nil // Continue
|
|
}
|
|
|
|
if entry.IsDirectory {
|
|
subDir := dir + "/" + entry.Name
|
|
if entry.Name == s3_constants.VersionsFolder {
|
|
// Check all version files (exact match for .versions folder)
|
|
if err := checkVersionsForLocksWithClient(ctx, client, subDir, hasLocks, currentTime); err != nil {
|
|
return false, err
|
|
}
|
|
} else {
|
|
// Recursively check subdirectories
|
|
if err := recursivelyCheckLocksWithClient(ctx, client, subDir, hasLocks, currentTime); err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
} else {
|
|
// Check if this object has an active lock
|
|
if EntryHasActiveLock(entry, currentTime) {
|
|
*hasLocks = true
|
|
glog.V(2).Infof("Found object with active lock: %s/%s", dir, entry.Name)
|
|
return true, nil // Stop iteration
|
|
}
|
|
}
|
|
return false, nil // Continue
|
|
})
|
|
}
|
|
|
|
// checkVersionsForLocksWithClient checks all versions in a .versions directory for active locks
|
|
func checkVersionsForLocksWithClient(ctx context.Context, client filer_pb.SeaweedFilerClient, versionsDir string, hasLocks *bool, currentTime time.Time) error {
|
|
return paginateEntries(ctx, client, versionsDir, func(entry *filer_pb.Entry) (bool, error) {
|
|
if *hasLocks {
|
|
return true, nil // Stop iteration
|
|
}
|
|
|
|
if EntryHasActiveLock(entry, currentTime) {
|
|
*hasLocks = true
|
|
glog.V(2).Infof("Found version with active lock: %s/%s", versionsDir, entry.Name)
|
|
return true, nil // Stop iteration
|
|
}
|
|
return false, nil // Continue
|
|
})
|
|
}
|
|
|
|
// IsObjectLockEnabled checks if Object Lock is enabled on a bucket entry
|
|
func IsObjectLockEnabled(entry *filer_pb.Entry) bool {
|
|
if entry == nil || entry.Extended == nil {
|
|
return false
|
|
}
|
|
|
|
enabledBytes, exists := entry.Extended[s3_constants.ExtObjectLockEnabledKey]
|
|
if !exists {
|
|
return false
|
|
}
|
|
|
|
enabled := string(enabledBytes)
|
|
return enabled == s3_constants.ObjectLockEnabled || enabled == "true"
|
|
}
|
|
|
|
// CheckBucketForLockedObjects is a unified function that checks if a bucket has Object Lock enabled
|
|
// and if so, scans for objects with active locks. This combines the bucket lookup and lock check
|
|
// into a single operation used by S3 API, Admin UI, and shell commands.
|
|
// Returns an error if the bucket has locked objects or if the check fails.
|
|
func CheckBucketForLockedObjects(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketsPath, bucketName string) error {
|
|
// Look up the bucket entry
|
|
lookupResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
|
|
Directory: bucketsPath,
|
|
Name: bucketName,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("bucket not found: %w", err)
|
|
}
|
|
|
|
// Check if Object Lock is enabled
|
|
if !IsObjectLockEnabled(lookupResp.Entry) {
|
|
return nil // No Object Lock, nothing to check
|
|
}
|
|
|
|
// Check for objects with active locks
|
|
bucketPath := bucketsPath + "/" + bucketName
|
|
hasLockedObjects, checkErr := HasObjectsWithActiveLocks(ctx, client, bucketPath)
|
|
if checkErr != nil {
|
|
return fmt.Errorf("failed to check for locked objects: %w", checkErr)
|
|
}
|
|
if hasLockedObjects {
|
|
return fmt.Errorf("bucket has objects with active Object Lock retention or legal hold")
|
|
}
|
|
|
|
return nil
|
|
}
|