You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
535 lines
17 KiB
535 lines
17 KiB
package s3api
|
|
|
|
import (
|
|
"encoding/xml"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
)
|
|
|
|
// ObjectRetention is the XML payload describing the retention settings of a
// single object: the lock mode and the instant until which the object must be
// retained. It is encoded as a <Retention> element in the S3 2006-03-01
// namespace.
type ObjectRetention struct {
	// XMLName pins the element name and namespace.
	XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Retention"`
	// Mode is the retention mode string; omitted from output when empty.
	Mode string `xml:"Mode,omitempty"`
	// RetainUntilDate is the end of the retention period; omitted when nil.
	RetainUntilDate *time.Time `xml:"RetainUntilDate,omitempty"`
}
|
|
|
|
// ObjectLegalHold is the XML payload carrying the legal hold status of a
// single object. It is encoded as a <LegalHold> element in the S3 2006-03-01
// namespace.
type ObjectLegalHold struct {
	// XMLName pins the element name and namespace.
	XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ LegalHold"`
	// Status is the legal hold status string; omitted from output when empty.
	Status string `xml:"Status,omitempty"`
}
|
|
|
|
// ObjectLockConfiguration represents S3 Object Lock Configuration
|
|
type ObjectLockConfiguration struct {
|
|
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ObjectLockConfiguration"`
|
|
ObjectLockEnabled string `xml:"ObjectLockEnabled,omitempty"`
|
|
Rule *ObjectLockRule `xml:"Rule,omitempty"`
|
|
}
|
|
|
|
// ObjectLockRule represents an Object Lock Rule
|
|
type ObjectLockRule struct {
|
|
XMLName xml.Name `xml:"Rule"`
|
|
DefaultRetention *DefaultRetention `xml:"DefaultRetention,omitempty"`
|
|
}
|
|
|
|
// DefaultRetention is the <DefaultRetention> element of an Object Lock rule:
// a retention mode plus a period expressed in days or in years.
//
// Days and Years are plain ints tagged omitempty, so a zero value is not
// written on output and is indistinguishable from an absent element on input.
type DefaultRetention struct {
	// XMLName pins the element name.
	XMLName xml.Name `xml:"DefaultRetention"`
	// Mode is the retention mode string; omitted from output when empty.
	Mode string `xml:"Mode,omitempty"`
	// Days is the retention period in days; omitted from output when zero.
	Days int `xml:"Days,omitempty"`
	// Years is the retention period in years; omitted from output when zero.
	Years int `xml:"Years,omitempty"`
}
|
|
|
|
// Custom time unmarshalling for AWS S3 ISO8601 format
|
|
func (or *ObjectRetention) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
|
|
type Alias ObjectRetention
|
|
aux := &struct {
|
|
*Alias
|
|
RetainUntilDate *string `xml:"RetainUntilDate,omitempty"`
|
|
}{
|
|
Alias: (*Alias)(or),
|
|
}
|
|
|
|
if err := d.DecodeElement(aux, &start); err != nil {
|
|
return err
|
|
}
|
|
|
|
if aux.RetainUntilDate != nil {
|
|
t, err := time.Parse(time.RFC3339, *aux.RetainUntilDate)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
or.RetainUntilDate = &t
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// parseXML reads the whole request body and unmarshals it as XML into result.
//
// It returns an error when the request has no body, when reading the body
// fails, or when the payload is not valid XML for T. Read and parse failures
// are wrapped with %w so callers can still inspect the underlying cause with
// errors.Is / errors.As; the error text is unchanged.
func parseXML[T any](r *http.Request, result *T) error {
	if r.Body == nil {
		return fmt.Errorf("empty request body")
	}

	body, err := io.ReadAll(r.Body)
	if err != nil {
		return fmt.Errorf("error reading request body: %w", err)
	}

	if err := xml.Unmarshal(body, result); err != nil {
		return fmt.Errorf("error parsing XML: %w", err)
	}

	return nil
}
|
|
|
|
// parseObjectRetention parses XML retention configuration from request body
|
|
func parseObjectRetention(r *http.Request) (*ObjectRetention, error) {
|
|
var retention ObjectRetention
|
|
if err := parseXML(r, &retention); err != nil {
|
|
return nil, err
|
|
}
|
|
return &retention, nil
|
|
}
|
|
|
|
// parseObjectLegalHold parses XML legal hold configuration from request body
|
|
func parseObjectLegalHold(r *http.Request) (*ObjectLegalHold, error) {
|
|
var legalHold ObjectLegalHold
|
|
if err := parseXML(r, &legalHold); err != nil {
|
|
return nil, err
|
|
}
|
|
return &legalHold, nil
|
|
}
|
|
|
|
// parseObjectLockConfiguration parses XML object lock configuration from request body
|
|
func parseObjectLockConfiguration(r *http.Request) (*ObjectLockConfiguration, error) {
|
|
var config ObjectLockConfiguration
|
|
if err := parseXML(r, &config); err != nil {
|
|
return nil, err
|
|
}
|
|
return &config, nil
|
|
}
|
|
|
|
// validateRetention validates retention configuration
|
|
func validateRetention(retention *ObjectRetention) error {
|
|
// AWS requires both Mode and RetainUntilDate for PutObjectRetention
|
|
if retention.Mode == "" {
|
|
return fmt.Errorf("retention configuration must specify Mode")
|
|
}
|
|
|
|
if retention.RetainUntilDate == nil {
|
|
return fmt.Errorf("retention configuration must specify RetainUntilDate")
|
|
}
|
|
|
|
if retention.Mode != s3_constants.RetentionModeGovernance && retention.Mode != s3_constants.RetentionModeCompliance {
|
|
return fmt.Errorf("invalid retention mode: %s", retention.Mode)
|
|
}
|
|
|
|
if retention.RetainUntilDate.Before(time.Now()) {
|
|
return fmt.Errorf("retain until date must be in the future")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// validateLegalHold validates legal hold configuration
|
|
func validateLegalHold(legalHold *ObjectLegalHold) error {
|
|
if legalHold.Status != s3_constants.LegalHoldOn && legalHold.Status != s3_constants.LegalHoldOff {
|
|
return fmt.Errorf("invalid legal hold status: %s", legalHold.Status)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// validateObjectLockConfiguration validates object lock configuration
|
|
func validateObjectLockConfiguration(config *ObjectLockConfiguration) error {
|
|
// Validate ObjectLockEnabled if present
|
|
if config.ObjectLockEnabled != "" && config.ObjectLockEnabled != s3_constants.ObjectLockEnabled {
|
|
return fmt.Errorf("invalid object lock enabled value: %s", config.ObjectLockEnabled)
|
|
}
|
|
|
|
// Validate Rule if present
|
|
if config.Rule != nil {
|
|
if config.Rule.DefaultRetention == nil {
|
|
return fmt.Errorf("rule configuration must specify DefaultRetention")
|
|
}
|
|
return validateDefaultRetention(config.Rule.DefaultRetention)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// validateDefaultRetention validates default retention configuration
|
|
func validateDefaultRetention(retention *DefaultRetention) error {
|
|
// Mode is required
|
|
if retention.Mode == "" {
|
|
return fmt.Errorf("default retention must specify Mode")
|
|
}
|
|
|
|
// Mode must be valid
|
|
if retention.Mode != s3_constants.RetentionModeGovernance && retention.Mode != s3_constants.RetentionModeCompliance {
|
|
return fmt.Errorf("invalid default retention mode: %s", retention.Mode)
|
|
}
|
|
|
|
// Exactly one of Days or Years must be specified
|
|
if retention.Days == 0 && retention.Years == 0 {
|
|
return fmt.Errorf("default retention must specify either Days or Years")
|
|
}
|
|
|
|
if retention.Days > 0 && retention.Years > 0 {
|
|
return fmt.Errorf("default retention cannot specify both Days and Years")
|
|
}
|
|
|
|
// Validate ranges
|
|
if retention.Days < 0 || retention.Days > 36500 {
|
|
return fmt.Errorf("default retention days must be between 0 and 36500")
|
|
}
|
|
|
|
if retention.Years < 0 || retention.Years > 100 {
|
|
return fmt.Errorf("default retention years must be between 0 and 100")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// getObjectRetention retrieves retention configuration from object metadata
|
|
func (s3a *S3ApiServer) getObjectRetention(bucket, object, versionId string) (*ObjectRetention, error) {
|
|
var entry *filer_pb.Entry
|
|
var err error
|
|
|
|
if versionId != "" {
|
|
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
|
|
} else {
|
|
// Check if versioning is enabled
|
|
versioningEnabled, vErr := s3a.isVersioningEnabled(bucket)
|
|
if vErr != nil {
|
|
return nil, fmt.Errorf("error checking versioning: %v", vErr)
|
|
}
|
|
|
|
if versioningEnabled {
|
|
entry, err = s3a.getLatestObjectVersion(bucket, object)
|
|
} else {
|
|
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
|
entry, err = s3a.getEntry(bucketDir, object)
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("object not found: %v", err)
|
|
}
|
|
|
|
if entry.Extended == nil {
|
|
return nil, fmt.Errorf("no retention configuration found")
|
|
}
|
|
|
|
retention := &ObjectRetention{}
|
|
|
|
if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
|
|
retention.Mode = string(modeBytes)
|
|
}
|
|
|
|
if dateBytes, exists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; exists {
|
|
if timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64); err == nil {
|
|
t := time.Unix(timestamp, 0)
|
|
retention.RetainUntilDate = &t
|
|
}
|
|
}
|
|
|
|
if retention.Mode == "" || retention.RetainUntilDate == nil {
|
|
return nil, fmt.Errorf("no retention configuration found")
|
|
}
|
|
|
|
return retention, nil
|
|
}
|
|
|
|
// setObjectRetention sets retention configuration on object metadata
|
|
func (s3a *S3ApiServer) setObjectRetention(bucket, object, versionId string, retention *ObjectRetention, bypassGovernance bool) error {
|
|
var entry *filer_pb.Entry
|
|
var err error
|
|
var entryPath string
|
|
|
|
if versionId != "" {
|
|
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
|
|
if err != nil {
|
|
return fmt.Errorf("version not found: %v", err)
|
|
}
|
|
// For versioned objects, we need to update the version file
|
|
entryPath = object + ".versions/" + s3a.getVersionFileName(versionId)
|
|
} else {
|
|
// Check if versioning is enabled
|
|
versioningEnabled, vErr := s3a.isVersioningEnabled(bucket)
|
|
if vErr != nil {
|
|
return fmt.Errorf("error checking versioning: %v", vErr)
|
|
}
|
|
|
|
if versioningEnabled {
|
|
entry, err = s3a.getLatestObjectVersion(bucket, object)
|
|
if err != nil {
|
|
return fmt.Errorf("latest version not found: %v", err)
|
|
}
|
|
// Extract version ID from entry metadata
|
|
if entry.Extended != nil {
|
|
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
|
|
versionId = string(versionIdBytes)
|
|
entryPath = object + ".versions/" + s3a.getVersionFileName(versionId)
|
|
}
|
|
}
|
|
} else {
|
|
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
|
entry, err = s3a.getEntry(bucketDir, object)
|
|
if err != nil {
|
|
return fmt.Errorf("object not found: %v", err)
|
|
}
|
|
entryPath = object
|
|
}
|
|
}
|
|
|
|
// Check if object is already under retention
|
|
if entry.Extended != nil {
|
|
if existingMode, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
|
|
if string(existingMode) == s3_constants.RetentionModeCompliance && !bypassGovernance {
|
|
return fmt.Errorf("cannot modify retention on object under COMPLIANCE mode")
|
|
}
|
|
|
|
if existingDateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists {
|
|
if timestamp, err := strconv.ParseInt(string(existingDateBytes), 10, 64); err == nil {
|
|
existingDate := time.Unix(timestamp, 0)
|
|
if existingDate.After(time.Now()) && string(existingMode) == s3_constants.RetentionModeGovernance && !bypassGovernance {
|
|
return fmt.Errorf("cannot modify retention on object under GOVERNANCE mode without bypass")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update retention metadata
|
|
if entry.Extended == nil {
|
|
entry.Extended = make(map[string][]byte)
|
|
}
|
|
|
|
if retention.Mode != "" {
|
|
entry.Extended[s3_constants.ExtObjectLockModeKey] = []byte(retention.Mode)
|
|
}
|
|
|
|
if retention.RetainUntilDate != nil {
|
|
entry.Extended[s3_constants.ExtRetentionUntilDateKey] = []byte(strconv.FormatInt(retention.RetainUntilDate.Unix(), 10))
|
|
|
|
// Also update the existing WORM fields for compatibility
|
|
entry.WormEnforcedAtTsNs = time.Now().UnixNano()
|
|
}
|
|
|
|
// Update the entry
|
|
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
|
return s3a.mkFile(bucketDir, entryPath, entry.Chunks, func(updatedEntry *filer_pb.Entry) {
|
|
updatedEntry.Extended = entry.Extended
|
|
updatedEntry.WormEnforcedAtTsNs = entry.WormEnforcedAtTsNs
|
|
})
|
|
}
|
|
|
|
// getObjectLegalHold retrieves legal hold configuration from object metadata
|
|
func (s3a *S3ApiServer) getObjectLegalHold(bucket, object, versionId string) (*ObjectLegalHold, error) {
|
|
var entry *filer_pb.Entry
|
|
var err error
|
|
|
|
if versionId != "" {
|
|
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
|
|
} else {
|
|
// Check if versioning is enabled
|
|
versioningEnabled, vErr := s3a.isVersioningEnabled(bucket)
|
|
if vErr != nil {
|
|
return nil, fmt.Errorf("error checking versioning: %v", vErr)
|
|
}
|
|
|
|
if versioningEnabled {
|
|
entry, err = s3a.getLatestObjectVersion(bucket, object)
|
|
} else {
|
|
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
|
entry, err = s3a.getEntry(bucketDir, object)
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("object not found: %v", err)
|
|
}
|
|
|
|
if entry.Extended == nil {
|
|
return nil, fmt.Errorf("no legal hold configuration found")
|
|
}
|
|
|
|
legalHold := &ObjectLegalHold{}
|
|
|
|
if statusBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
|
|
legalHold.Status = string(statusBytes)
|
|
} else {
|
|
return nil, fmt.Errorf("no legal hold configuration found")
|
|
}
|
|
|
|
return legalHold, nil
|
|
}
|
|
|
|
// setObjectLegalHold sets legal hold configuration on object metadata
|
|
func (s3a *S3ApiServer) setObjectLegalHold(bucket, object, versionId string, legalHold *ObjectLegalHold) error {
|
|
var entry *filer_pb.Entry
|
|
var err error
|
|
var entryPath string
|
|
|
|
if versionId != "" {
|
|
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
|
|
if err != nil {
|
|
return fmt.Errorf("version not found: %v", err)
|
|
}
|
|
entryPath = object + ".versions/" + s3a.getVersionFileName(versionId)
|
|
} else {
|
|
// Check if versioning is enabled
|
|
versioningEnabled, vErr := s3a.isVersioningEnabled(bucket)
|
|
if vErr != nil {
|
|
return fmt.Errorf("error checking versioning: %v", vErr)
|
|
}
|
|
|
|
if versioningEnabled {
|
|
entry, err = s3a.getLatestObjectVersion(bucket, object)
|
|
if err != nil {
|
|
return fmt.Errorf("latest version not found: %v", err)
|
|
}
|
|
// Extract version ID from entry metadata
|
|
if entry.Extended != nil {
|
|
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
|
|
versionId = string(versionIdBytes)
|
|
entryPath = object + ".versions/" + s3a.getVersionFileName(versionId)
|
|
}
|
|
}
|
|
} else {
|
|
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
|
entry, err = s3a.getEntry(bucketDir, object)
|
|
if err != nil {
|
|
return fmt.Errorf("object not found: %v", err)
|
|
}
|
|
entryPath = object
|
|
}
|
|
}
|
|
|
|
// Update legal hold metadata
|
|
if entry.Extended == nil {
|
|
entry.Extended = make(map[string][]byte)
|
|
}
|
|
|
|
entry.Extended[s3_constants.ExtLegalHoldKey] = []byte(legalHold.Status)
|
|
|
|
// Update the entry
|
|
bucketDir := s3a.option.BucketsPath + "/" + bucket
|
|
return s3a.mkFile(bucketDir, entryPath, entry.Chunks, func(updatedEntry *filer_pb.Entry) {
|
|
updatedEntry.Extended = entry.Extended
|
|
})
|
|
}
|
|
|
|
// isObjectRetentionActive checks if an object is currently under retention
|
|
func (s3a *S3ApiServer) isObjectRetentionActive(bucket, object, versionId string) (bool, error) {
|
|
retention, err := s3a.getObjectRetention(bucket, object, versionId)
|
|
if err != nil {
|
|
// If no retention found, object is not under retention
|
|
if strings.Contains(err.Error(), "no retention configuration found") {
|
|
return false, nil
|
|
}
|
|
return false, err
|
|
}
|
|
|
|
if retention.RetainUntilDate != nil && retention.RetainUntilDate.After(time.Now()) {
|
|
return true, nil
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
// isObjectLegalHoldActive checks if an object is currently under legal hold
|
|
func (s3a *S3ApiServer) isObjectLegalHoldActive(bucket, object, versionId string) (bool, error) {
|
|
legalHold, err := s3a.getObjectLegalHold(bucket, object, versionId)
|
|
if err != nil {
|
|
// If no legal hold found, object is not under legal hold
|
|
if strings.Contains(err.Error(), "no legal hold configuration found") {
|
|
return false, nil
|
|
}
|
|
return false, err
|
|
}
|
|
|
|
return legalHold.Status == s3_constants.LegalHoldOn, nil
|
|
}
|
|
|
|
// checkObjectLockPermissions checks if an object can be deleted or modified
|
|
func (s3a *S3ApiServer) checkObjectLockPermissions(bucket, object, versionId string, bypassGovernance bool) error {
|
|
// Check if object is under retention
|
|
retentionActive, err := s3a.isObjectRetentionActive(bucket, object, versionId)
|
|
if err != nil {
|
|
glog.Warningf("Error checking retention for %s/%s: %v", bucket, object, err)
|
|
}
|
|
|
|
// Check if object is under legal hold
|
|
legalHoldActive, err := s3a.isObjectLegalHoldActive(bucket, object, versionId)
|
|
if err != nil {
|
|
glog.Warningf("Error checking legal hold for %s/%s: %v", bucket, object, err)
|
|
}
|
|
|
|
// If object is under legal hold, it cannot be deleted or modified
|
|
if legalHoldActive {
|
|
return fmt.Errorf("object is under legal hold and cannot be deleted or modified")
|
|
}
|
|
|
|
// If object is under retention, check the mode
|
|
if retentionActive {
|
|
retention, err := s3a.getObjectRetention(bucket, object, versionId)
|
|
if err != nil {
|
|
return fmt.Errorf("error getting retention configuration: %v", err)
|
|
}
|
|
|
|
if retention.Mode == s3_constants.RetentionModeCompliance {
|
|
return fmt.Errorf("object is under COMPLIANCE mode retention and cannot be deleted or modified")
|
|
}
|
|
|
|
if retention.Mode == s3_constants.RetentionModeGovernance && !bypassGovernance {
|
|
return fmt.Errorf("object is under GOVERNANCE mode retention and cannot be deleted or modified without bypass")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// isObjectLockAvailable checks if Object Lock features are available for the bucket
|
|
// Object Lock requires versioning to be enabled (AWS S3 requirement)
|
|
func (s3a *S3ApiServer) isObjectLockAvailable(bucket string) error {
|
|
versioningEnabled, err := s3a.isVersioningEnabled(bucket)
|
|
if err != nil {
|
|
if err == filer_pb.ErrNotFound {
|
|
return fmt.Errorf("bucket not found")
|
|
}
|
|
return fmt.Errorf("error checking versioning status: %v", err)
|
|
}
|
|
|
|
if !versioningEnabled {
|
|
return fmt.Errorf("object lock requires versioning to be enabled")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// checkObjectLockPermissionsForPut checks object lock permissions for PUT operations
|
|
// This is a shared helper to avoid code duplication in PUT handlers
|
|
func (s3a *S3ApiServer) checkObjectLockPermissionsForPut(bucket, object string, bypassGovernance bool, versioningEnabled bool) error {
|
|
// Object Lock only applies to versioned buckets (AWS S3 requirement)
|
|
if !versioningEnabled {
|
|
return nil
|
|
}
|
|
|
|
// For PUT operations, we check permissions on the current object (empty versionId)
|
|
if err := s3a.checkObjectLockPermissions(bucket, object, "", bypassGovernance); err != nil {
|
|
glog.V(2).Infof("checkObjectLockPermissionsForPut: object lock check failed for %s/%s: %v", bucket, object, err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|