Browse Source

read entry once

pull/6997/head
chrislu 5 months ago
parent
commit
9f8e0257b0
  1. 84
      weed/s3api/s3api_object_retention.go

84
weed/s3api/s3api_object_retention.go

@ -490,35 +490,52 @@ func (s3a *S3ApiServer) isObjectRetentionActive(bucket, object, versionId string
return false, nil
}
// getObjectRetentionWithStatus retrieves retention configuration and returns both the data and active status
// This is an optimization to avoid duplicate fetches when both retention data and status are needed
func (s3a *S3ApiServer) getObjectRetentionWithStatus(bucket, object, versionId string) (*ObjectRetention, bool, error) {
retention, err := s3a.getObjectRetention(bucket, object, versionId)
if err != nil {
// If no retention found, object is not under retention
if errors.Is(err, ErrNoRetentionConfiguration) {
return nil, false, nil
// getRetentionFromEntry extracts retention configuration from an existing entry
func (s3a *S3ApiServer) getRetentionFromEntry(entry *filer_pb.Entry) (*ObjectRetention, bool, error) {
if entry.Extended == nil {
return nil, false, nil
}
retention := &ObjectRetention{}
if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
retention.Mode = string(modeBytes)
}
if dateBytes, exists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; exists {
if timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64); err == nil {
t := time.Unix(timestamp, 0)
retention.RetainUntilDate = &t
} else {
return nil, false, fmt.Errorf("failed to parse retention timestamp: corrupted timestamp data")
}
return nil, false, err
}
if retention.Mode == "" || retention.RetainUntilDate == nil {
return nil, false, nil
}
// Check if retention is currently active
isActive := retention.RetainUntilDate != nil && retention.RetainUntilDate.After(time.Now())
isActive := retention.RetainUntilDate.After(time.Now())
return retention, isActive, nil
}
// isObjectLegalHoldActive checks if an object is currently under legal hold
func (s3a *S3ApiServer) isObjectLegalHoldActive(bucket, object, versionId string) (bool, error) {
legalHold, err := s3a.getObjectLegalHold(bucket, object, versionId)
if err != nil {
// If no legal hold found, object is not under legal hold
if errors.Is(err, ErrNoLegalHoldConfiguration) {
return false, nil
}
return false, err
// getLegalHoldFromEntry extracts legal hold configuration from an existing entry
func (s3a *S3ApiServer) getLegalHoldFromEntry(entry *filer_pb.Entry) (*ObjectLegalHold, bool, error) {
if entry.Extended == nil {
return nil, false, nil
}
return legalHold.Status == s3_constants.LegalHoldOn, nil
legalHold := &ObjectLegalHold{}
if statusBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
legalHold.Status = string(statusBytes)
} else {
return nil, false, nil
}
isActive := legalHold.Status == s3_constants.LegalHoldOn
return legalHold, isActive, nil
}
// checkGovernanceBypassPermission checks if the user has permission to bypass governance retention
@ -554,16 +571,31 @@ func (s3a *S3ApiServer) checkGovernanceBypassPermission(request *http.Request, b
// checkObjectLockPermissions checks if an object can be deleted or modified
func (s3a *S3ApiServer) checkObjectLockPermissions(request *http.Request, bucket, object, versionId string, bypassGovernance bool) error {
// Get retention configuration and status in a single call to avoid duplicate fetches
retention, retentionActive, err := s3a.getObjectRetentionWithStatus(bucket, object, versionId)
// Get the object entry once to check both retention and legal hold
entry, err := s3a.getObjectEntry(bucket, object, versionId)
if err != nil {
// If object doesn't exist, it's not under retention or legal hold - this is expected during delete operations
if errors.Is(err, ErrObjectNotFound) || errors.Is(err, ErrVersionNotFound) || errors.Is(err, ErrLatestVersionNotFound) {
// Object doesn't exist, so it can't be under retention or legal hold - this is normal
glog.V(4).Infof("Object %s/%s (versionId: %s) not found during object lock check (expected during delete operations)", bucket, object, versionId)
return nil
}
glog.Warningf("Error retrieving object %s/%s (versionId: %s) for lock check: %v", bucket, object, versionId, err)
return err
}
// Extract retention information from the entry
retention, retentionActive, err := s3a.getRetentionFromEntry(entry)
if err != nil {
glog.Warningf("Error checking retention for %s/%s: %v", bucket, object, err)
glog.Warningf("Error parsing retention for %s/%s (versionId: %s): %v", bucket, object, versionId, err)
// Continue with legal hold check even if retention parsing fails
}
// Check if object is under legal hold
legalHoldActive, err := s3a.isObjectLegalHoldActive(bucket, object, versionId)
// Extract legal hold information from the entry
_, legalHoldActive, err := s3a.getLegalHoldFromEntry(entry)
if err != nil {
glog.Warningf("Error checking legal hold for %s/%s: %v", bucket, object, err)
glog.Warningf("Error parsing legal hold for %s/%s (versionId: %s): %v", bucket, object, versionId, err)
// Continue with retention check even if legal hold parsing fails
}
// If object is under legal hold, it cannot be deleted or modified

Loading…
Cancel
Save