diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go
index eeeccf981..c62b9da07 100644
--- a/weed/admin/dash/admin_server.go
+++ b/weed/admin/dash/admin_server.go
@@ -30,6 +30,11 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
 )
 
+const (
+	// DefaultBucketsPath is the default path for S3 buckets in the filer
+	DefaultBucketsPath = "/buckets"
+)
+
 type AdminServer struct {
 	masterClient *wdclient.MasterClient
 	templateFS   http.FileSystem
@@ -271,7 +276,7 @@ func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
 	err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 		// List buckets by looking at the /buckets directory
 		stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
-			Directory:          "/buckets",
+			Directory:          DefaultBucketsPath,
 			Prefix:             "",
 			StartFromFileName:  "",
 			InclusiveStartFrom: false,
@@ -381,7 +386,7 @@ func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error
 	err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 		// Get bucket info
 		bucketResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
-			Directory: "/buckets",
+			Directory: DefaultBucketsPath,
 			Name:      bucketName,
 		})
 		if err != nil {
@@ -506,14 +511,36 @@ func (s *AdminServer) CreateS3Bucket(bucketName string) error {
 
 // DeleteS3Bucket deletes an S3 bucket and all its contents
 func (s *AdminServer) DeleteS3Bucket(bucketName string) error {
+	// First, check if bucket has Object Lock enabled and if there are locked objects
+	ctx := context.Background()
+	err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+		return s3api.CheckBucketForLockedObjects(ctx, client, DefaultBucketsPath, bucketName)
+	})
+	if err != nil {
+		return err
+	}
+
+	// Delete the collection first (same as s3.bucket.delete shell command)
+	// This ensures volume data is cleaned up properly
+	err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
+		_, err := client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
+			Name: bucketName,
+		})
+		return err
+	})
+	if err != nil {
+		return fmt.Errorf("failed to delete collection: %w", err)
+	}
+
+	// Then delete bucket directory recursively from filer
+	// Use same parameters as s3.bucket.delete shell command and S3 API
 	return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-		// Delete bucket directory recursively
-		_, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
-			Directory:            "/buckets",
+		_, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
+			Directory:            DefaultBucketsPath,
 			Name:                 bucketName,
-			IsDeleteData:         true,
+			IsDeleteData:         false, // Collection already deleted, just remove metadata
 			IsRecursive:          true,
-			IgnoreRecursiveError: false,
+			IgnoreRecursiveError: true, // Same as S3 API and shell command
 		})
 		if err != nil {
 			return fmt.Errorf("failed to delete bucket: %w", err)
diff --git a/weed/s3api/object_lock_utils.go b/weed/s3api/object_lock_utils.go
index 39496e14f..6b00d8595 100644
--- a/weed/s3api/object_lock_utils.go
+++ b/weed/s3api/object_lock_utils.go
@@ -1,6 +1,7 @@
 package s3api
 
 import (
+	"context"
 	"encoding/xml"
 	"fmt"
 	"strconv"
@@ -9,6 +10,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_objectlock"
 )
 
 // ObjectLockUtils provides shared utilities for Object Lock configuration
@@ -361,3 +363,28 @@ func validateDefaultRetention(retention *DefaultRetention) error {
 
 	return nil
 }
+
+// ====================================================================
+// SHARED OBJECT LOCK CHECKING FUNCTIONS
+// ====================================================================
+// These functions delegate to s3_objectlock package to avoid code duplication.
+// They are kept here for backward compatibility with existing callers.
+
+// EntryHasActiveLock checks if an entry has an active retention or legal hold
+// Delegates to s3_objectlock.EntryHasActiveLock
+func EntryHasActiveLock(entry *filer_pb.Entry, currentTime time.Time) bool {
+	return s3_objectlock.EntryHasActiveLock(entry, currentTime)
+}
+
+// HasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold
+// Delegates to s3_objectlock.HasObjectsWithActiveLocks
+func HasObjectsWithActiveLocks(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketPath string) (bool, error) {
+	return s3_objectlock.HasObjectsWithActiveLocks(ctx, client, bucketPath)
+}
+
+// CheckBucketForLockedObjects is a unified function that checks if a bucket has Object Lock enabled
+// and if so, scans for objects with active locks.
+// Delegates to s3_objectlock.CheckBucketForLockedObjects
+func CheckBucketForLockedObjects(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketsPath, bucketName string) error {
+	return s3_objectlock.CheckBucketForLockedObjects(ctx, client, bucketsPath, bucketName)
+}
diff --git a/weed/s3api/s3_objectlock/object_lock_check.go b/weed/s3api/s3_objectlock/object_lock_check.go
new file mode 100644
index 000000000..a66e587c5
--- /dev/null
+++ b/weed/s3api/s3_objectlock/object_lock_check.go
@@ -0,0 +1,232 @@
+package s3_objectlock
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+)
+
+// ====================================================================
+// SHARED OBJECT LOCK CHECKING FUNCTIONS
+// ====================================================================
+// These functions are used by S3 API, Admin UI, and shell commands for
+// checking Object Lock status before bucket deletion.
+
+// EntryHasActiveLock checks if an entry has an active retention or legal hold
+// This is a standalone function that can be used by any component
+func EntryHasActiveLock(entry *filer_pb.Entry, currentTime time.Time) bool {
+	if entry == nil || entry.Extended == nil {
+		return false
+	}
+
+	// Check for active legal hold (case-insensitive, trimmed for defensive parsing)
+	if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
+		legalHold := strings.TrimSpace(strings.ToUpper(string(legalHoldBytes)))
+		if legalHold == s3_constants.LegalHoldOn {
+			return true
+		}
+	}
+
+	// Check for active retention (case-insensitive, trimmed for defensive parsing)
+	if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
+		mode := strings.TrimSpace(strings.ToUpper(string(modeBytes)))
+		if mode == s3_constants.RetentionModeCompliance || mode == s3_constants.RetentionModeGovernance {
+			// Check if retention is still active
+			if dateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists {
+				dateStr := strings.TrimSpace(string(dateBytes))
+				timestamp, err := strconv.ParseInt(dateStr, 10, 64)
+				if err != nil {
+					// Fail-safe: if we can't parse the retention date, assume the object is locked
+					// to prevent accidental data loss
+					glog.Warningf("Failed to parse retention date '%s' for entry, assuming locked: %v", dateStr, err)
+					return true
+				}
+				retainUntil := time.Unix(timestamp, 0)
+				if retainUntil.After(currentTime) {
+					return true
+				}
+			}
+		}
+	}
+
+	return false
+}
+
+// HasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold
+// This function uses the filer gRPC client to scan the bucket directory
+func HasObjectsWithActiveLocks(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketPath string) (bool, error) {
+	hasLocks := false
+	currentTime := time.Now()
+
+	err := recursivelyCheckLocksWithClient(ctx, client, bucketPath, &hasLocks, currentTime)
+	if err != nil {
+		return false, fmt.Errorf("error checking for locked objects: %w", err)
+	}
+
+	return hasLocks, nil
+}
+
+// paginateEntries is a generic helper that handles pagination logic for listing directory entries.
+// The processEntry callback is called for each entry; returning stop=true stops iteration early.
+func paginateEntries(ctx context.Context, client filer_pb.SeaweedFilerClient, dir string,
+	processEntry func(entry *filer_pb.Entry) (stop bool, err error)) error {
+	lastFileName := ""
+	for {
+		resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
+			Directory:          dir,
+			StartFromFileName:  lastFileName,
+			InclusiveStartFrom: false,
+			Limit:              10000,
+		})
+		if err != nil {
+			return fmt.Errorf("failed to list directory %s: %w", dir, err)
+		}
+
+		entriesReceived := false
+		for {
+			entryResp, recvErr := resp.Recv()
+			if recvErr != nil {
+				if errors.Is(recvErr, io.EOF) {
+					break // Normal end of stream
+				}
+				return fmt.Errorf("failed to receive entry from %s: %w", dir, recvErr)
+			}
+			entriesReceived = true
+			entry := entryResp.Entry
+			lastFileName = entry.Name
+
+			// Skip invalid entry names to prevent path traversal
+			if entry.Name == "" || entry.Name == "." || entry.Name == ".." ||
+				strings.ContainsAny(entry.Name, "/\\") {
+				glog.V(2).Infof("Skipping invalid entry name: %q in %s", entry.Name, dir)
+				continue
+			}
+
+			stop, err := processEntry(entry)
+			if err != nil {
+				return err
+			}
+			if stop {
+				return nil
+			}
+		}
+
+		if !entriesReceived {
+			break
+		}
+	}
+	return nil
+}
+
+// recursivelyCheckLocksWithClient recursively checks all objects and versions for active locks
+func recursivelyCheckLocksWithClient(ctx context.Context, client filer_pb.SeaweedFilerClient, dir string, hasLocks *bool, currentTime time.Time) error {
+	if *hasLocks {
+		return nil // Early exit if already found a locked object
+	}
+
+	return paginateEntries(ctx, client, dir, func(entry *filer_pb.Entry) (bool, error) {
+		if *hasLocks {
+			return true, nil // Stop iteration
+		}
+
+		// Skip special directories
+		if entry.Name == s3_constants.MultipartUploadsFolder {
+			return false, nil // Continue
+		}
+
+		if entry.IsDirectory {
+			subDir := dir + "/" + entry.Name
+			if entry.Name == s3_constants.VersionsFolder {
+				// Check all version files (exact match for .versions folder)
+				if err := checkVersionsForLocksWithClient(ctx, client, subDir, hasLocks, currentTime); err != nil {
+					return false, err
+				}
+			} else {
+				// Recursively check subdirectories
+				if err := recursivelyCheckLocksWithClient(ctx, client, subDir, hasLocks, currentTime); err != nil {
+					return false, err
+				}
+			}
+		} else {
+			// Check if this object has an active lock
+			if EntryHasActiveLock(entry, currentTime) {
+				*hasLocks = true
+				glog.V(2).Infof("Found object with active lock: %s/%s", dir, entry.Name)
+				return true, nil // Stop iteration
+			}
+		}
+		return false, nil // Continue
+	})
+}
+
+// checkVersionsForLocksWithClient checks all versions in a .versions directory for active locks
+func checkVersionsForLocksWithClient(ctx context.Context, client filer_pb.SeaweedFilerClient, versionsDir string, hasLocks *bool, currentTime time.Time) error {
+	return paginateEntries(ctx, client, versionsDir, func(entry *filer_pb.Entry) (bool, error) {
+		if *hasLocks {
+			return true, nil // Stop iteration
+		}
+
+		if EntryHasActiveLock(entry, currentTime) {
+			*hasLocks = true
+			glog.V(2).Infof("Found version with active lock: %s/%s", versionsDir, entry.Name)
+			return true, nil // Stop iteration
+		}
+		return false, nil // Continue
+	})
+}
+
+// IsObjectLockEnabled checks if Object Lock is enabled on a bucket entry
+func IsObjectLockEnabled(entry *filer_pb.Entry) bool {
+	if entry == nil || entry.Extended == nil {
+		return false
+	}
+
+	enabledBytes, exists := entry.Extended[s3_constants.ExtObjectLockEnabledKey]
+	if !exists {
+		return false
+	}
+
+	enabled := string(enabledBytes)
+	return enabled == s3_constants.ObjectLockEnabled || enabled == "true"
+}
+
+// CheckBucketForLockedObjects is a unified function that checks if a bucket has Object Lock enabled
+// and if so, scans for objects with active locks. This combines the bucket lookup and lock check
+// into a single operation used by S3 API, Admin UI, and shell commands.
+// Returns an error if the bucket has locked objects or if the check fails.
+func CheckBucketForLockedObjects(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketsPath, bucketName string) error {
+	// Look up the bucket entry
+	lookupResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
+		Directory: bucketsPath,
+		Name:      bucketName,
+	})
+	if err != nil {
+		return fmt.Errorf("bucket not found: %w", err)
+	}
+
+	// Check if Object Lock is enabled
+	if !IsObjectLockEnabled(lookupResp.Entry) {
+		return nil // No Object Lock, nothing to check
+	}
+
+	// Check for objects with active locks
+	bucketPath := bucketsPath + "/" + bucketName
+	hasLockedObjects, checkErr := HasObjectsWithActiveLocks(ctx, client, bucketPath)
+	if checkErr != nil {
+		return fmt.Errorf("failed to check for locked objects: %w", checkErr)
+	}
+	if hasLockedObjects {
+		return fmt.Errorf("bucket has objects with active Object Lock retention or legal hold")
+	}
+
+	return nil
+}
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index 2d67aa551..5ff155890 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -9,9 +9,7 @@ import (
 	"fmt"
 	"math"
 	"net/http"
-	"path"
 	"sort"
-	"strconv"
 	"strings"
 	"time"
 
@@ -336,7 +334,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
 
 	// If object lock is enabled, check for objects with active locks
 	if bucketConfig.ObjectLockConfig != nil {
-		hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(bucket)
+		hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(r.Context(), bucket)
 		if checkErr != nil {
 			glog.Errorf("DeleteBucketHandler: failed to check for locked objects in bucket %s: %v", bucket, checkErr)
 			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@@ -400,158 +398,22 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
 }
 
 // hasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold
-func (s3a *S3ApiServer) hasObjectsWithActiveLocks(bucket string) (bool, error) {
+// Delegates to the shared HasObjectsWithActiveLocks function in object_lock_utils.go
+func (s3a *S3ApiServer) hasObjectsWithActiveLocks(ctx context.Context, bucket string) (bool, error) {
 	bucketPath := s3a.option.BucketsPath + "/" + bucket
+	var hasLocks bool
+	var checkErr error
 
-	// Check all objects including versions for active locks
-	// Establish current time once at the start for consistency across the entire scan
-	hasLocks := false
-	currentTime := time.Now()
-	err := s3a.recursivelyCheckLocks(bucketPath, "", &hasLocks, currentTime)
+	err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		hasLocks, checkErr = HasObjectsWithActiveLocks(ctx, client, bucketPath)
+		return checkErr
+	})
 	if err != nil {
-		return false, fmt.Errorf("error checking for locked objects: %w", err)
+		return false, err
 	}
-
 	return hasLocks, nil
 }
 
-const (
-	// lockCheckPaginationSize is the page size for listing directories during lock checks
-	lockCheckPaginationSize = 10000
-)
-
-// errStopPagination is a sentinel error to signal early termination of pagination
-var errStopPagination = errors.New("stop pagination")
-
-// paginateEntries iterates through directory entries with pagination
-// Calls fn for each page of entries. If fn returns errStopPagination, iteration stops successfully.
-func (s3a *S3ApiServer) paginateEntries(dir string, fn func(entries []*filer_pb.Entry) error) error {
-	startFrom := ""
-	for {
-		entries, isLast, err := s3a.list(dir, "", startFrom, false, lockCheckPaginationSize)
-		if err != nil {
-			// Fail-safe: propagate error to prevent incorrect bucket deletion
-			return fmt.Errorf("failed to list directory %s: %w", dir, err)
-		}
-
-		if err := fn(entries); err != nil {
-			if errors.Is(err, errStopPagination) {
-				return nil
-			}
-			return err
-		}
-
-		if isLast || len(entries) == 0 {
-			break
-		}
-		// Use the last entry name as the start point for next page
-		startFrom = entries[len(entries)-1].Name
-	}
-	return nil
-}
-
-// recursivelyCheckLocks recursively checks all objects and versions for active locks
-// Uses pagination to handle directories with more than 10,000 entries
-func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, hasLocks *bool, currentTime time.Time) error {
-	if *hasLocks {
-		// Early exit if we've already found a locked object
-		return nil
-	}
-
-	// Process entries in the current directory with pagination
-	err := s3a.paginateEntries(dir, func(entries []*filer_pb.Entry) error {
-		for _, entry := range entries {
-			if *hasLocks {
-				// Early exit if we've already found a locked object
-				return errStopPagination
-			}
-
-			// Skip special directories (multipart uploads, etc)
-			if entry.Name == s3_constants.MultipartUploadsFolder {
-				continue
-			}
-
-			if entry.IsDirectory {
-				subDir := path.Join(dir, entry.Name)
-				if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
-					// If it's a .versions directory, check all version files with pagination
-					err := s3a.paginateEntries(subDir, func(versionEntries []*filer_pb.Entry) error {
-						for _, versionEntry := range versionEntries {
-							if s3a.entryHasActiveLock(versionEntry, currentTime) {
-								*hasLocks = true
-								glog.V(2).Infof("Found object with active lock in versions: %s/%s", subDir, versionEntry.Name)
-								return errStopPagination
-							}
-						}
-						return nil
-					})
-					if err != nil {
-						return err
-					}
-				} else {
-					// Recursively check other subdirectories
-					subRelativePath := path.Join(relativePath, entry.Name)
-					if err := s3a.recursivelyCheckLocks(subDir, subRelativePath, hasLocks, currentTime); err != nil {
-						return err
-					}
-					// Early exit if a locked object was found in the subdirectory
-					if *hasLocks {
-						return errStopPagination
-					}
-				}
-			} else {
-				// Check regular files for locks
-				if s3a.entryHasActiveLock(entry, currentTime) {
-					*hasLocks = true
-					objectPath := path.Join(relativePath, entry.Name)
-					glog.V(2).Infof("Found object with active lock: %s", objectPath)
-					return errStopPagination
-				}
-			}
-		}
-		return nil
-	})
-
-	return err
-}
-
-// entryHasActiveLock checks if an entry has an active retention or legal hold
-func (s3a *S3ApiServer) entryHasActiveLock(entry *filer_pb.Entry, currentTime time.Time) bool {
-	if entry.Extended == nil {
-		return false
-	}
-
-	// Check for active legal hold
-	if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
-		if string(legalHoldBytes) == s3_constants.LegalHoldOn {
-			return true
-		}
-	}
-
-	// Check for active retention
-	if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
-		mode := string(modeBytes)
-		if mode == s3_constants.RetentionModeCompliance || mode == s3_constants.RetentionModeGovernance {
-			// Check if retention is still active
-			if dateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists {
-				timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64)
-				if err != nil {
-					// Fail-safe: if we can't parse the retention date, assume the object is locked
-					// to prevent accidental data loss
-					glog.Warningf("Failed to parse retention date '%s' for entry, assuming locked: %v", string(dateBytes), err)
-					return true
-				}
-				retainUntil := time.Unix(timestamp, 0)
-				if retainUntil.After(currentTime) {
-					return true
-				}
-			}
-		}
-	}
-
-	return false
-}
-
 func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
 
 	bucket, _ := s3_constants.GetBucketAndObject(r)
diff --git a/weed/shell/command_s3_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go
index 0227151fe..ddd3201e9 100644
--- a/weed/shell/command_s3_bucket_delete.go
+++ b/weed/shell/command_s3_bucket_delete.go
@@ -4,10 +4,11 @@ import (
 	"context"
 	"flag"
 	"fmt"
-	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
 	"io"
 
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_objectlock"
 )
 
 func init() {
@@ -55,9 +56,18 @@ func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer
 		return fmt.Errorf("read buckets: %w", err)
 	}
 
+	// Check if bucket has Object Lock enabled and if there are locked objects
+	ctx := context.Background()
+	err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		return s3_objectlock.CheckBucketForLockedObjects(ctx, client, filerBucketsPath, *bucketName)
+	})
+	if err != nil {
+		return err
+	}
+
 	// delete the collection directly first
 	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
-		_, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
+		_, err = client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
 			Name: getCollectionName(commandEnv, *bucketName),
 		})
 		return err
@@ -66,6 +76,6 @@ func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer
 		return
 	}
 
-	return filer_pb.Remove(context.Background(), commandEnv, filerBucketsPath, *bucketName, false, true, true, false, nil)
+	return filer_pb.Remove(ctx, commandEnv, filerBucketsPath, *bucketName, false, true, true, false, nil)
 }
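Illustrative sketch, outside the diff above: how the shared s3_objectlock.EntryHasActiveLock helper introduced in object_lock_check.go is expected to behave. Everything it references (filer_pb.Entry, the s3_constants extended-attribute keys, the retention-mode constants, the package path) appears in the diff; the package-main harness and the object names are hypothetical. It also shows the retention-date encoding the fail-safe parsing expects: Unix seconds rendered as a decimal string.

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_objectlock"
)

func main() {
	now := time.Now()

	// GOVERNANCE retention that expires an hour from now: reported as locked.
	retained := &filer_pb.Entry{
		Name: "report.pdf", // hypothetical object name
		Extended: map[string][]byte{
			s3_constants.ExtObjectLockModeKey:     []byte(s3_constants.RetentionModeGovernance),
			s3_constants.ExtRetentionUntilDateKey: []byte(strconv.FormatInt(now.Add(time.Hour).Unix(), 10)),
		},
	}
	fmt.Println(s3_objectlock.EntryHasActiveLock(retained, now)) // true

	// COMPLIANCE retention that expired a day ago, no legal hold: not locked.
	expired := &filer_pb.Entry{
		Name: "old.pdf", // hypothetical object name
		Extended: map[string][]byte{
			s3_constants.ExtObjectLockModeKey:     []byte(s3_constants.RetentionModeCompliance),
			s3_constants.ExtRetentionUntilDateKey: []byte(strconv.FormatInt(now.Add(-24*time.Hour).Unix(), 10)),
		},
	}
	fmt.Println(s3_objectlock.EntryHasActiveLock(expired, now)) // false
}

With Object Lock enabled on a bucket, CheckBucketForLockedObjects refuses deletion as long as any object (or any version under a .versions folder) reports true here, which is the behavior the Admin UI, S3 API, and s3.bucket.delete shell command now share.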