Browse Source

cache and use bucket object lock config

pull/6996/head
chrislu 5 months ago
parent
commit
7bec2ef94b
  1. 42
      weed/s3api/s3api_bucket_config.go
  2. 16
      weed/s3api/s3api_bucket_handlers.go
  3. 108
      weed/s3api/s3api_object_handlers_put.go
  4. 56
      weed/s3api/s3api_object_handlers_retention.go

42
weed/s3api/s3api_bucket_config.go

@ -2,6 +2,7 @@ package s3api
import ( import (
"encoding/json" "encoding/json"
"encoding/xml"
"fmt" "fmt"
"path/filepath" "path/filepath"
"strings" "strings"
@ -17,14 +18,15 @@ import (
// BucketConfig represents cached bucket configuration // BucketConfig represents cached bucket configuration
type BucketConfig struct { type BucketConfig struct {
Name string
Versioning string // "Enabled", "Suspended", or ""
Ownership string
ACL []byte
Owner string
CORS *cors.CORSConfiguration
LastModified time.Time
Entry *filer_pb.Entry
Name string
Versioning string // "Enabled", "Suspended", or ""
Ownership string
ACL []byte
Owner string
CORS *cors.CORSConfiguration
ObjectLockConfig *ObjectLockConfiguration // Cached parsed Object Lock configuration
LastModified time.Time
Entry *filer_pb.Entry
} }
// BucketConfigCache provides caching for bucket configurations // BucketConfigCache provides caching for bucket configurations
@ -121,6 +123,16 @@ func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.Err
if owner, exists := bucketEntry.Extended[s3_constants.ExtAmzOwnerKey]; exists { if owner, exists := bucketEntry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
config.Owner = string(owner) config.Owner = string(owner)
} }
// Parse Object Lock configuration if present
if objectLockConfigXML, exists := bucketEntry.Extended[s3_constants.ExtObjectLockConfigKey]; exists {
var objectLockConfig ObjectLockConfiguration
if err := xml.Unmarshal(objectLockConfigXML, &objectLockConfig); err != nil {
glog.Errorf("getBucketConfig: failed to parse Object Lock configuration for bucket %s: %v", bucket, err)
} else {
config.ObjectLockConfig = &objectLockConfig
glog.V(2).Infof("getBucketConfig: cached Object Lock configuration for bucket %s", bucket)
}
}
} }
// Load CORS configuration from .s3metadata // Load CORS configuration from .s3metadata
@ -173,6 +185,20 @@ func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketC
if config.Owner != "" { if config.Owner != "" {
config.Entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(config.Owner) config.Entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(config.Owner)
} }
// Update Object Lock configuration in extended attributes
if config.ObjectLockConfig != nil {
configXML, err := xml.Marshal(config.ObjectLockConfig)
if err != nil {
glog.Errorf("updateBucketConfig: failed to marshal Object Lock configuration for bucket %s: %v", bucket, err)
return s3err.ErrInternalError
}
config.Entry.Extended[s3_constants.ExtObjectLockConfigKey] = configXML
// Also set the boolean flag for backward compatibility
if config.ObjectLockConfig.ObjectLockEnabled != "" {
config.Entry.Extended[s3_constants.ExtObjectLockEnabledKey] = []byte(config.ObjectLockConfig.ObjectLockEnabled)
}
}
// Save to filer // Save to filer
err := s3a.updateEntry(s3a.option.BucketsPath, config.Entry) err := s3a.updateEntry(s3a.option.BucketsPath, config.Entry)

16
weed/s3api/s3api_bucket_handlers.go

@ -147,25 +147,13 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
// Enable versioning (required for Object Lock) // Enable versioning (required for Object Lock)
bucketConfig.Versioning = s3_constants.VersioningEnabled bucketConfig.Versioning = s3_constants.VersioningEnabled
// Enable Object Lock configuration
if bucketConfig.Entry.Extended == nil {
bucketConfig.Entry.Extended = make(map[string][]byte)
}
// Create basic Object Lock configuration (enabled without default retention) // Create basic Object Lock configuration (enabled without default retention)
// The ObjectLockConfiguration struct is defined below in this file.
objectLockConfig := &ObjectLockConfiguration{ objectLockConfig := &ObjectLockConfiguration{
ObjectLockEnabled: s3_constants.ObjectLockEnabled, ObjectLockEnabled: s3_constants.ObjectLockEnabled,
} }
// Store the configuration as XML in extended attributes
configXML, err := xml.Marshal(objectLockConfig)
if err != nil {
return fmt.Errorf("failed to marshal Object Lock configuration to XML: %w", err)
}
bucketConfig.Entry.Extended[s3_constants.ExtObjectLockConfigKey] = configXML
bucketConfig.Entry.Extended[s3_constants.ExtObjectLockEnabledKey] = []byte(s3_constants.ObjectLockEnabled)
// Set the cached Object Lock configuration
bucketConfig.ObjectLockConfig = objectLockConfig
return nil return nil
}) })

108
weed/s3api/s3api_object_handlers_put.go

@ -12,12 +12,11 @@ import (
"time" "time"
"github.com/pquerna/cachecontrol/cacheobject" "github.com/pquerna/cachecontrol/cacheobject"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
weed_server "github.com/seaweedfs/seaweedfs/weed/server" weed_server "github.com/seaweedfs/seaweedfs/weed/server"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
) )
@ -374,28 +373,30 @@ func (s3a *S3ApiServer) updateLatestVersionInDirectory(bucket, object, versionId
} }
// extractObjectLockMetadataFromRequest extracts object lock headers from PUT requests // extractObjectLockMetadataFromRequest extracts object lock headers from PUT requests
// and stores them in the entry's Extended attributes
// and applies bucket default retention if no explicit retention is provided
func (s3a *S3ApiServer) extractObjectLockMetadataFromRequest(r *http.Request, entry *filer_pb.Entry) error { func (s3a *S3ApiServer) extractObjectLockMetadataFromRequest(r *http.Request, entry *filer_pb.Entry) error {
if entry.Extended == nil { if entry.Extended == nil {
entry.Extended = make(map[string][]byte) entry.Extended = make(map[string][]byte)
} }
// Extract object lock mode (GOVERNANCE or COMPLIANCE)
if mode := r.Header.Get(s3_constants.AmzObjectLockMode); mode != "" {
entry.Extended[s3_constants.ExtObjectLockModeKey] = []byte(mode)
glog.V(2).Infof("extractObjectLockMetadataFromRequest: storing object lock mode: %s", mode)
// Extract explicit object lock mode (GOVERNANCE or COMPLIANCE)
explicitMode := r.Header.Get(s3_constants.AmzObjectLockMode)
if explicitMode != "" {
entry.Extended[s3_constants.ExtObjectLockModeKey] = []byte(explicitMode)
glog.V(2).Infof("extractObjectLockMetadataFromRequest: storing explicit object lock mode: %s", explicitMode)
} }
// Extract retention until date
if retainUntilDate := r.Header.Get(s3_constants.AmzObjectLockRetainUntilDate); retainUntilDate != "" {
// Extract explicit retention until date
explicitRetainUntilDate := r.Header.Get(s3_constants.AmzObjectLockRetainUntilDate)
if explicitRetainUntilDate != "" {
// Parse the ISO8601 date and convert to Unix timestamp for storage // Parse the ISO8601 date and convert to Unix timestamp for storage
parsedTime, err := time.Parse(time.RFC3339, retainUntilDate)
parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate)
if err != nil { if err != nil {
glog.Errorf("extractObjectLockMetadataFromRequest: failed to parse retention until date, expected format: %s, error: %v", time.RFC3339, err) glog.Errorf("extractObjectLockMetadataFromRequest: failed to parse retention until date, expected format: %s, error: %v", time.RFC3339, err)
return ErrInvalidRetentionDateFormat return ErrInvalidRetentionDateFormat
} }
entry.Extended[s3_constants.ExtRetentionUntilDateKey] = []byte(strconv.FormatInt(parsedTime.Unix(), 10)) entry.Extended[s3_constants.ExtRetentionUntilDateKey] = []byte(strconv.FormatInt(parsedTime.Unix(), 10))
glog.V(2).Infof("extractObjectLockMetadataFromRequest: storing retention until date (timestamp: %d)", parsedTime.Unix())
glog.V(2).Infof("extractObjectLockMetadataFromRequest: storing explicit retention until date (timestamp: %d)", parsedTime.Unix())
} }
// Extract legal hold status // Extract legal hold status
@ -410,6 +411,89 @@ func (s3a *S3ApiServer) extractObjectLockMetadataFromRequest(r *http.Request, en
} }
} }
// Apply bucket default retention if no explicit retention was provided
// This implements AWS S3 behavior where bucket default retention automatically applies to new objects
if explicitMode == "" && explicitRetainUntilDate == "" {
bucket, _ := s3_constants.GetBucketAndObject(r)
if err := s3a.applyBucketDefaultRetention(bucket, entry); err != nil {
glog.V(2).Infof("extractObjectLockMetadataFromRequest: could not apply bucket default retention for %s: %v", bucket, err)
// Don't fail the upload if default retention can't be applied - this matches AWS behavior
}
}
return nil
}
// applyBucketDefaultRetention applies bucket default retention settings to a new object
// This implements AWS S3 behavior where bucket default retention automatically applies to new objects
// when no explicit retention headers are provided in the upload request
func (s3a *S3ApiServer) applyBucketDefaultRetention(bucket string, entry *filer_pb.Entry) error {
// Safety check - if bucket config cache is not available, skip default retention
if s3a.bucketConfigCache == nil {
return nil
}
// Try to get bucket configuration from cache first, then from filer
var bucketConfig *BucketConfig
var errCode s3err.ErrorCode
// Check cache first for performance
var found bool
bucketConfig, found = s3a.bucketConfigCache.Get(bucket)
if found {
errCode = s3err.ErrNone
} else {
// Fall back to filer if not in cache
bucketConfig, errCode = s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
return fmt.Errorf("failed to get bucket config: %v", errCode)
}
}
// Check if bucket has cached Object Lock configuration
if bucketConfig.ObjectLockConfig == nil {
return nil // No Object Lock configuration
}
objectLockConfig := bucketConfig.ObjectLockConfig
// Check if there's a default retention rule
if objectLockConfig.Rule == nil || objectLockConfig.Rule.DefaultRetention == nil {
return nil // No default retention configured
}
defaultRetention := objectLockConfig.Rule.DefaultRetention
// Validate default retention has required fields
if defaultRetention.Mode == "" {
return fmt.Errorf("default retention missing mode")
}
if defaultRetention.Days == 0 && defaultRetention.Years == 0 {
return fmt.Errorf("default retention missing period")
}
// Calculate retention until date based on default retention period
var retainUntilDate time.Time
now := time.Now()
if defaultRetention.Days > 0 {
retainUntilDate = now.AddDate(0, 0, defaultRetention.Days)
} else if defaultRetention.Years > 0 {
retainUntilDate = now.AddDate(defaultRetention.Years, 0, 0)
}
// Apply default retention to the object
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.ExtObjectLockModeKey] = []byte(defaultRetention.Mode)
entry.Extended[s3_constants.ExtRetentionUntilDateKey] = []byte(strconv.FormatInt(retainUntilDate.Unix(), 10))
glog.V(2).Infof("applyBucketDefaultRetention: applied default retention %s until %s for bucket %s",
defaultRetention.Mode, retainUntilDate.Format(time.RFC3339), bucket)
return nil return nil
} }

56
weed/s3api/s3api_object_handlers_retention.go

@ -275,22 +275,8 @@ func (s3a *S3ApiServer) PutObjectLockConfigurationHandler(w http.ResponseWriter,
// Set object lock configuration on the bucket // Set object lock configuration on the bucket
errCode := s3a.updateBucketConfig(bucket, func(bucketConfig *BucketConfig) error { errCode := s3a.updateBucketConfig(bucket, func(bucketConfig *BucketConfig) error {
if bucketConfig.Entry.Extended == nil {
bucketConfig.Entry.Extended = make(map[string][]byte)
}
// Store the configuration as JSON in extended attributes
configXML, err := xml.Marshal(config)
if err != nil {
return err
}
bucketConfig.Entry.Extended[s3_constants.ExtObjectLockConfigKey] = configXML
if config.ObjectLockEnabled != "" {
bucketConfig.Entry.Extended[s3_constants.ExtObjectLockEnabledKey] = []byte(config.ObjectLockEnabled)
}
// Set the cached Object Lock configuration
bucketConfig.ObjectLockConfig = config
return nil return nil
}) })
@ -324,19 +310,33 @@ func (s3a *S3ApiServer) GetObjectLockConfigurationHandler(w http.ResponseWriter,
var configXML []byte var configXML []byte
// Check if we have cached Object Lock configuration
if bucketConfig.ObjectLockConfig != nil {
// Use cached configuration and marshal it to XML for response
configXML, err := xml.Marshal(bucketConfig.ObjectLockConfig)
if err != nil {
glog.Errorf("GetObjectLockConfigurationHandler: failed to marshal cached Object Lock config: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Write XML response
w.Header().Set("Content-Type", "application/xml")
w.WriteHeader(http.StatusOK)
w.Write(configXML)
glog.V(3).Infof("GetObjectLockConfigurationHandler: successfully retrieved cached object lock config for %s", bucket)
return
}
// Fallback: check for legacy storage in extended attributes
if bucketConfig.Entry.Extended != nil { if bucketConfig.Entry.Extended != nil {
// First check if we have stored XML configuration (includes retention policies)
if storedConfigXML, exists := bucketConfig.Entry.Extended[s3_constants.ExtObjectLockConfigKey]; exists {
configXML = storedConfigXML
} else {
// Check if Object Lock is enabled via boolean flag
if enabledBytes, exists := bucketConfig.Entry.Extended[s3_constants.ExtObjectLockEnabledKey]; exists {
enabled := string(enabledBytes)
if enabled == s3_constants.ObjectLockEnabled || enabled == "true" {
// Generate minimal XML configuration for enabled Object Lock without retention policies
minimalConfig := `<ObjectLockConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><ObjectLockEnabled>Enabled</ObjectLockEnabled></ObjectLockConfiguration>`
configXML = []byte(minimalConfig)
}
// Check if Object Lock is enabled via boolean flag
if enabledBytes, exists := bucketConfig.Entry.Extended[s3_constants.ExtObjectLockEnabledKey]; exists {
enabled := string(enabledBytes)
if enabled == s3_constants.ObjectLockEnabled || enabled == "true" {
// Generate minimal XML configuration for enabled Object Lock without retention policies
minimalConfig := `<ObjectLockConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><ObjectLockEnabled>Enabled</ObjectLockEnabled></ObjectLockConfiguration>`
configXML = []byte(minimalConfig)
} }
} }
} }

Loading…
Cancel
Save