Browse Source

s3api: return entry from enforceObjectLockProtections for reuse

Change enforceObjectLockProtections() to return the fetched entry along
with any error. This allows callers to reuse the entry and avoid
duplicate filer lookups.

The entry is returned as (*filer_pb.Entry, error):
- Returns (entry, nil) on success
- Returns (nil, nil) if object doesn't exist (not an error for deletes)
- Returns (entry, err) if object is locked (entry still returned)
- Returns (nil, err) on lookup failure
optimize-delete-lookups
chrislu 5 days ago
parent
commit
eb6a56b6c6
  1. 16
      weed/s3api/s3api_object_handlers_delete.go
  2. 5
      weed/s3api/s3api_object_handlers_put.go
  3. 18
      weed/s3api/s3api_object_retention.go

16
weed/s3api/s3api_object_handlers_delete.go

@ -55,8 +55,8 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
// Delete specific version (same for both enabled and suspended)
// Check object lock permissions before deleting specific version
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
if err := s3a.enforceObjectLockProtections(r, bucket, object, versionId, governanceBypassAllowed); err != nil {
glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err)
if _, lockErr := s3a.enforceObjectLockProtections(r, bucket, object, versionId, governanceBypassAllowed); lockErr != nil {
glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, lockErr)
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return
}
@ -93,8 +93,8 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
// Check object lock permissions before deleting "null" version
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
if err := s3a.enforceObjectLockProtections(r, bucket, object, "null", governanceBypassAllowed); err != nil {
glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err)
if _, lockErr := s3a.enforceObjectLockProtections(r, bucket, object, "null", governanceBypassAllowed); lockErr != nil {
glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, lockErr)
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return
}
@ -115,8 +115,8 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
// Handle regular delete (non-versioned)
// Check object lock permissions before deleting object
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
if err := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); err != nil {
glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err)
if _, lockErr := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); lockErr != nil {
glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, lockErr)
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return
}
@ -238,8 +238,8 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
if versioningConfigured {
// Validate governance bypass for this specific object
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object.Key)
if err := s3a.enforceObjectLockProtections(r, bucket, object.Key, object.VersionId, governanceBypassAllowed); err != nil {
glog.V(2).Infof("DeleteMultipleObjectsHandler: object lock check failed for %s/%s (version: %s): %v", bucket, object.Key, object.VersionId, err)
if _, lockErr := s3a.enforceObjectLockProtections(r, bucket, object.Key, object.VersionId, governanceBypassAllowed); lockErr != nil {
glog.V(2).Infof("DeleteMultipleObjectsHandler: object lock check failed for %s/%s (version: %s): %v", bucket, object.Key, object.VersionId, lockErr)
deleteErrors = append(deleteErrors, DeleteError{
Code: s3err.GetAPIError(s3err.ErrAccessDenied).Code,
Message: s3err.GetAPIError(s3err.ErrAccessDenied).Description,

5
weed/s3api/s3api_object_handlers_put.go

@ -170,8 +170,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
// that would prevent overwrite (PUT operations overwrite existing objects in non-versioned buckets)
if !versioningConfigured {
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
if err := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); err != nil {
glog.V(2).Infof("PutObjectHandler: object lock permissions check failed for %s/%s: %v", bucket, object, err)
// Note: We ignore the returned entry since PUT creates a new entry anyway
if _, lockErr := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); lockErr != nil {
glog.V(2).Infof("PutObjectHandler: object lock permissions check failed for %s/%s: %v", bucket, object, lockErr)
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return
}

18
weed/s3api/s3api_object_retention.go

@ -584,8 +584,10 @@ func (s3a *S3ApiServer) evaluateGovernanceBypassRequest(r *http.Request, bucket,
return true
}
// enforceObjectLockProtections enforces object lock protections for operations
func (s3a *S3ApiServer) enforceObjectLockProtections(request *http.Request, bucket, object, versionId string, governanceBypassAllowed bool) error {
// enforceObjectLockProtections enforces object lock protections for operations.
// Returns the fetched entry (if found) to allow callers to reuse it, avoiding duplicate lookups.
// Returns (nil, nil) if object doesn't exist (not an error for delete operations).
func (s3a *S3ApiServer) enforceObjectLockProtections(request *http.Request, bucket, object, versionId string, governanceBypassAllowed bool) (*filer_pb.Entry, error) {
// Get the object entry to check both retention and legal hold
// For delete operations without versionId, we need to check the latest version
var entry *filer_pb.Entry
@ -604,10 +606,10 @@ func (s3a *S3ApiServer) enforceObjectLockProtections(request *http.Request, buck
if errors.Is(err, filer_pb.ErrNotFound) || errors.Is(err, ErrObjectNotFound) || errors.Is(err, ErrVersionNotFound) || errors.Is(err, ErrLatestVersionNotFound) {
// Object doesn't exist, so it can't be under retention or legal hold - this is normal
glog.V(4).Infof("Object %s/%s (versionId: %s) not found during object lock check (expected during delete operations)", bucket, object, versionId)
return nil
return nil, nil
}
glog.Warningf("Error retrieving object %s/%s (versionId: %s) for lock check: %v", bucket, object, versionId, err)
return err
return nil, err
}
// Extract retention information from the entry
@ -626,25 +628,25 @@ func (s3a *S3ApiServer) enforceObjectLockProtections(request *http.Request, buck
// If object is under legal hold, it cannot be deleted or modified (including delete marker creation)
if legalHoldActive {
return ErrObjectUnderLegalHold
return entry, ErrObjectUnderLegalHold
}
// If object is under retention, check the mode
if retentionActive && retention != nil {
if retention.Mode == s3_constants.RetentionModeCompliance {
return ErrComplianceModeActive
return entry, ErrComplianceModeActive
}
if retention.Mode == s3_constants.RetentionModeGovernance {
if !governanceBypassAllowed {
return ErrGovernanceModeActive
return entry, ErrGovernanceModeActive
}
// Note: governanceBypassAllowed parameter is already validated by evaluateGovernanceBypassRequest()
// which checks both header presence and IAM permissions, so we trust it here
}
}
return nil
return entry, nil
}
// ====================================================================

Loading…
Cancel
Save