Browse Source

add basic object ACL (#7004)

* add back tests

* get put object acl

* check permission to put object acl

* rename file

* object list versions now contains owners

* set object owner

* refactoring

* Revert "add back tests"

This reverts commit 9adc507c45.
pull/7009/head
Chris Lu 3 months ago
committed by GitHub
parent
commit
377f1f24c7
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 30
      weed/s3api/filer_multipart.go
  2. 21
      weed/s3api/s3api_bucket_config.go
  3. 6
      weed/s3api/s3api_bucket_handlers.go
  4. 236
      weed/s3api/s3api_object_handlers_acl.go
  5. 5
      weed/s3api/s3api_object_handlers_multipart.go
  6. 30
      weed/s3api/s3api_object_handlers_put.go
  7. 21
      weed/s3api/s3api_object_handlers_skip.go
  8. 58
      weed/s3api/s3api_object_versioning.go

30
weed/s3api/filer_multipart.go

@ -51,6 +51,13 @@ func (s3a *S3ApiServer) createMultipartUpload(r *http.Request, input *s3.CreateM
entry.Extended = make(map[string][]byte) entry.Extended = make(map[string][]byte)
} }
entry.Extended["key"] = []byte(*input.Key) entry.Extended["key"] = []byte(*input.Key)
// Set object owner for multipart upload
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
for k, v := range input.Metadata { for k, v := range input.Metadata {
entry.Extended[k] = []byte(*v) entry.Extended[k] = []byte(*v)
} }
@ -92,7 +99,7 @@ type CompleteMultipartUploadResult struct {
VersionId *string `xml:"-"` VersionId *string `xml:"-"`
} }
func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
glog.V(2).Infof("completeMultipartUpload input %v", input) glog.V(2).Infof("completeMultipartUpload input %v", input)
if len(parts.Parts) == 0 { if len(parts.Parts) == 0 {
@ -254,6 +261,13 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
} }
versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId) versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
// Set object owner for versioned multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
versionEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
for k, v := range pentry.Extended { for k, v := range pentry.Extended {
if k != "key" { if k != "key" {
versionEntry.Extended[k] = v versionEntry.Extended[k] = v
@ -296,6 +310,13 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
entry.Extended = make(map[string][]byte) entry.Extended = make(map[string][]byte)
} }
entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null") entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
// Set object owner for suspended versioning multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
for k, v := range pentry.Extended { for k, v := range pentry.Extended {
if k != "key" { if k != "key" {
entry.Extended[k] = v entry.Extended[k] = v
@ -329,6 +350,13 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
entry.Extended = make(map[string][]byte) entry.Extended = make(map[string][]byte)
} }
entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId) entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
// Set object owner for non-versioned multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
for k, v := range pentry.Extended { for k, v := range pentry.Extended {
if k != "key" { if k != "key" {
entry.Extended[k] = v entry.Extended[k] = v

21
weed/s3api/s3api_bucket_config.go

@ -98,10 +98,11 @@ func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.Err
return config, s3err.ErrNone return config, s3err.ErrNone
} }
// Load from filer
bucketEntry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
// Try to get from filer
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
if err != nil { if err != nil {
if err == filer_pb.ErrNotFound {
if errors.Is(err, filer_pb.ErrNotFound) {
// Bucket doesn't exist
return nil, s3err.ErrNoSuchBucket return nil, s3err.ErrNoSuchBucket
} }
glog.Errorf("getBucketConfig: failed to get bucket entry for %s: %v", bucket, err) glog.Errorf("getBucketConfig: failed to get bucket entry for %s: %v", bucket, err)
@ -110,25 +111,25 @@ func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.Err
config := &BucketConfig{ config := &BucketConfig{
Name: bucket, Name: bucket,
Entry: bucketEntry,
Entry: entry,
} }
// Extract configuration from extended attributes // Extract configuration from extended attributes
if bucketEntry.Extended != nil {
if versioning, exists := bucketEntry.Extended[s3_constants.ExtVersioningKey]; exists {
if entry.Extended != nil {
if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists {
config.Versioning = string(versioning) config.Versioning = string(versioning)
} }
if ownership, exists := bucketEntry.Extended[s3_constants.ExtOwnershipKey]; exists {
if ownership, exists := entry.Extended[s3_constants.ExtOwnershipKey]; exists {
config.Ownership = string(ownership) config.Ownership = string(ownership)
} }
if acl, exists := bucketEntry.Extended[s3_constants.ExtAmzAclKey]; exists {
if acl, exists := entry.Extended[s3_constants.ExtAmzAclKey]; exists {
config.ACL = acl config.ACL = acl
} }
if owner, exists := bucketEntry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
if owner, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
config.Owner = string(owner) config.Owner = string(owner)
} }
// Parse Object Lock configuration if present // Parse Object Lock configuration if present
if objectLockConfig, found := LoadObjectLockConfigurationFromExtended(bucketEntry); found {
if objectLockConfig, found := LoadObjectLockConfigurationFromExtended(entry); found {
config.ObjectLockConfig = objectLockConfig config.ObjectLockConfig = objectLockConfig
glog.V(2).Infof("getBucketConfig: cached Object Lock configuration for bucket %s", bucket) glog.V(2).Infof("getBucketConfig: cached Object Lock configuration for bucket %s", bucket)
} }

6
weed/s3api/s3api_bucket_handlers.go

@ -230,7 +230,7 @@ func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request
bucket, _ := s3_constants.GetBucketAndObject(r) bucket, _ := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("HeadBucketHandler %s", bucket) glog.V(3).Infof("HeadBucketHandler %s", bucket)
if entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket); entry == nil || err == filer_pb.ErrNotFound {
if entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket); entry == nil || errors.Is(err, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return return
} }
@ -240,7 +240,7 @@ func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request
func (s3a *S3ApiServer) checkBucket(r *http.Request, bucket string) s3err.ErrorCode { func (s3a *S3ApiServer) checkBucket(r *http.Request, bucket string) s3err.ErrorCode {
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket) entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
if entry == nil || err == filer_pb.ErrNotFound {
if entry == nil || errors.Is(err, filer_pb.ErrNotFound) {
return s3err.ErrNoSuchBucket return s3err.ErrNoSuchBucket
} }
@ -669,7 +669,7 @@ func (s3a *S3ApiServer) DeleteBucketOwnershipControls(w http.ResponseWriter, r *
bucketEntry, err := s3a.getEntry(s3a.option.BucketsPath, bucket) bucketEntry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
if err != nil { if err != nil {
if err == filer_pb.ErrNotFound {
if errors.Is(err, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return return
} }

236
weed/s3api/s3api_object_handlers_acl.go

@ -0,0 +1,236 @@
package s3api
import (
"context"
"errors"
"fmt"
"net/http"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
// GetObjectAclHandler Get object ACL
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectAcl.html
func (s3a *S3ApiServer) GetObjectAclHandler(w http.ResponseWriter, r *http.Request) {
// collect parameters
bucket, object := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("GetObjectAclHandler %s %s", bucket, object)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
// Check if object exists and get its metadata
bucketDir := s3a.option.BucketsPath + "/" + bucket
entry, err := s3a.getEntry(bucketDir, object)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
glog.Errorf("GetObjectAclHandler: error checking object %s/%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if entry == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
// Get object owner from metadata, fallback to request account
var objectOwner string
var objectOwnerDisplayName string
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if entry.Extended != nil {
if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
objectOwner = string(ownerBytes)
}
}
// Fallback to current account if no owner stored
if objectOwner == "" {
objectOwner = amzAccountId
}
objectOwnerDisplayName = s3a.iam.GetAccountNameById(objectOwner)
// Build ACL response
response := AccessControlPolicy{
Owner: CanonicalUser{
ID: objectOwner,
DisplayName: objectOwnerDisplayName,
},
}
// Get grants from stored ACL metadata
grants := GetAcpGrants(entry.Extended)
if len(grants) > 0 {
// Convert AWS SDK grants to local Grant format
for _, grant := range grants {
localGrant := Grant{
Permission: Permission(*grant.Permission),
}
if grant.Grantee != nil {
localGrant.Grantee = Grantee{
Type: *grant.Grantee.Type,
XMLXSI: "CanonicalUser",
XMLNS: "http://www.w3.org/2001/XMLSchema-instance",
}
if grant.Grantee.ID != nil {
localGrant.Grantee.ID = *grant.Grantee.ID
localGrant.Grantee.DisplayName = s3a.iam.GetAccountNameById(*grant.Grantee.ID)
}
if grant.Grantee.URI != nil {
localGrant.Grantee.URI = *grant.Grantee.URI
}
}
response.AccessControlList.Grant = append(response.AccessControlList.Grant, localGrant)
}
} else {
// Fallback to default full control for object owner
response.AccessControlList.Grant = append(response.AccessControlList.Grant, Grant{
Grantee: Grantee{
ID: objectOwner,
DisplayName: objectOwnerDisplayName,
Type: "CanonicalUser",
XMLXSI: "CanonicalUser",
XMLNS: "http://www.w3.org/2001/XMLSchema-instance"},
Permission: Permission(s3_constants.PermissionFullControl),
})
}
writeSuccessResponseXML(w, r, response)
}
// PutObjectAclHandler Put object ACL
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectAcl.html
func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Request) {
// collect parameters
bucket, object := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("PutObjectAclHandler %s %s", bucket, object)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, err)
return
}
// Check if object exists and get its metadata
bucketDir := s3a.option.BucketsPath + "/" + bucket
entry, err := s3a.getEntry(bucketDir, object)
if err != nil {
if errors.Is(err, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
glog.Errorf("PutObjectAclHandler: error checking object %s/%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if entry == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
// Get current object owner from metadata
var objectOwner string
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if entry.Extended != nil {
if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
objectOwner = string(ownerBytes)
}
}
// Fallback to current account if no owner stored
if objectOwner == "" {
objectOwner = amzAccountId
}
// **PERMISSION CHECKS**
// 1. Check if user is admin (admins can modify any ACL)
if !s3a.isUserAdmin(r) {
// 2. Check object ownership - only object owner can modify ACL (unless admin)
if objectOwner != amzAccountId {
glog.V(3).Infof("PutObjectAclHandler: Access denied - user %s is not owner of object %s/%s (owner: %s)",
amzAccountId, bucket, object, objectOwner)
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return
}
// 3. Check object-level WRITE_ACP permission
// Create the specific action for this object
writeAcpAction := Action(fmt.Sprintf("WriteAcp:%s/%s", bucket, object))
identity, errCode := s3a.iam.authRequest(r, writeAcpAction)
if errCode != s3err.ErrNone {
glog.V(3).Infof("PutObjectAclHandler: Auth failed for WriteAcp action on %s/%s: %v", bucket, object, errCode)
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return
}
// 4. Verify the authenticated identity can perform WriteAcp on this specific object
if identity == nil || !identity.canDo(writeAcpAction, bucket, object) {
glog.V(3).Infof("PutObjectAclHandler: Identity %v cannot perform WriteAcp on %s/%s", identity, bucket, object)
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return
}
} else {
glog.V(3).Infof("PutObjectAclHandler: Admin user %s granted ACL modification permission for %s/%s", amzAccountId, bucket, object)
}
// Get bucket config for ownership settings
bucketConfig, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
bucketOwnership := bucketConfig.Ownership
bucketOwnerId := bucketConfig.Owner
// Extract ACL from request (either canned ACL or XML body)
// This function also validates that the owner in the request matches the object owner
grants, errCode := ExtractAcl(r, s3a.iam, bucketOwnership, bucketOwnerId, objectOwner, amzAccountId)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
// Store ACL in object metadata
if errCode := AssembleEntryWithAcp(entry, objectOwner, grants); errCode != s3err.ErrNone {
glog.Errorf("PutObjectAclHandler: failed to assemble entry with ACP: %v", errCode)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
// Update the object with new ACL metadata
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: bucketDir,
Entry: entry,
}
if _, err := client.UpdateEntry(context.Background(), request); err != nil {
return err
}
return nil
})
if err != nil {
glog.Errorf("PutObjectAclHandler: failed to update entry: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
glog.V(3).Infof("PutObjectAclHandler: Successfully updated ACL for %s/%s by user %s", bucket, object, amzAccountId)
writeSuccessResponseEmpty(w, r)
}

5
weed/s3api/s3api_object_handlers_multipart.go

@ -3,6 +3,7 @@ package s3api
import ( import (
"crypto/sha1" "crypto/sha1"
"encoding/xml" "encoding/xml"
"errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -41,7 +42,7 @@ func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http
// Check if versioning is enabled for the bucket (needed for object lock) // Check if versioning is enabled for the bucket (needed for object lock)
versioningEnabled, err := s3a.isVersioningEnabled(bucket) versioningEnabled, err := s3a.isVersioningEnabled(bucket)
if err != nil { if err != nil {
if err == filer_pb.ErrNotFound {
if errors.Is(err, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return return
} }
@ -111,7 +112,7 @@ func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r
return return
} }
response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
response, errCode := s3a.completeMultipartUpload(r, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(bucket), Bucket: aws.String(bucket),
Key: objectKey(aws.String(object)), Key: objectKey(aws.String(object)),
UploadId: aws.String(uploadID), UploadId: aws.String(uploadID),

30
weed/s3api/s3api_object_handlers_put.go

@ -98,7 +98,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
// Get detailed versioning state for the bucket // Get detailed versioning state for the bucket
versioningState, err := s3a.getVersioningState(bucket) versioningState, err := s3a.getVersioningState(bucket)
if err != nil { if err != nil {
if err == filer_pb.ErrNotFound {
if errors.Is(err, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return return
} }
@ -213,6 +213,14 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
proxyReq.Header.Add(header, value) proxyReq.Header.Add(header, value)
} }
} }
// Set object owner header for filer to extract
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
proxyReq.Header.Set(s3_constants.ExtAmzOwnerKey, amzAccountId)
glog.V(2).Infof("putToFiler: setting owner header %s for object %s", amzAccountId, uploadUrl)
}
// ensure that the Authorization header is overriding any previous // ensure that the Authorization header is overriding any previous
// Authorization header which might be already present in proxyReq // Authorization header which might be already present in proxyReq
s3a.maybeAddFilerJwtAuthorization(proxyReq, true) s3a.maybeAddFilerJwtAuthorization(proxyReq, true)
@ -244,8 +252,8 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
glog.Errorf("upload to filer error: %v", ret.Error) glog.Errorf("upload to filer error: %v", ret.Error)
return "", filerErrorToS3Error(ret.Error) return "", filerErrorToS3Error(ret.Error)
} }
stats_collect.RecordBucketActiveTime(bucket) stats_collect.RecordBucketActiveTime(bucket)
stats_collect.S3BucketTrafficReceivedBytesCounter.WithLabelValues(bucket).Add(float64(ret.Size))
return etag, s3err.ErrNone return etag, s3err.ErrNone
} }
@ -290,6 +298,18 @@ func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string
return string(encodedJwt) return string(encodedJwt)
} }
// setObjectOwnerFromRequest sets the object owner metadata based on the authenticated user
func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_pb.Entry) {
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
glog.V(2).Infof("setObjectOwnerFromRequest: set object owner to %s", amzAccountId)
}
}
// putVersionedObject handles PUT operations for versioned buckets using the new layout // putVersionedObject handles PUT operations for versioned buckets using the new layout
// where all versions (including latest) are stored in the .versions directory // where all versions (including latest) are stored in the .versions directory
func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) { func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) {
@ -321,6 +341,9 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
} }
entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null") entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
// Set object owner for suspended versioning objects
s3a.setObjectOwnerFromRequest(r, entry)
// Extract and store object lock metadata from request headers (if any) // Extract and store object lock metadata from request headers (if any)
if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil { if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil {
glog.Errorf("putSuspendedVersioningObject: failed to extract object lock metadata: %v", err) glog.Errorf("putSuspendedVersioningObject: failed to extract object lock metadata: %v", err)
@ -466,6 +489,9 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
} }
versionEntry.Extended[s3_constants.ExtETagKey] = []byte(etag) versionEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
// Set object owner for versioned objects
s3a.setObjectOwnerFromRequest(r, versionEntry)
// Extract and store object lock metadata from request headers // Extract and store object lock metadata from request headers
if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil { if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil {
glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err) glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err)

21
weed/s3api/s3api_object_handlers_skip.go

@ -1,21 +0,0 @@
package s3api
import (
"net/http"
)
// GetObjectAclHandler Get object ACL
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectAcl.html
func (s3a *S3ApiServer) GetObjectAclHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
// PutObjectAclHandler Put object ACL
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectAcl.html
func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

58
weed/s3api/s3api_object_versioning.go

@ -278,7 +278,7 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string
VersionId: version.VersionId, VersionId: version.VersionId,
IsLatest: version.IsLatest, IsLatest: version.IsLatest,
LastModified: version.LastModified, LastModified: version.LastModified,
Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
Owner: s3a.getObjectOwnerFromVersion(version, bucket, objectKey),
} }
*allVersions = append(*allVersions, deleteMarker) *allVersions = append(*allVersions, deleteMarker)
} else { } else {
@ -289,7 +289,7 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string
LastModified: version.LastModified, LastModified: version.LastModified,
ETag: version.ETag, ETag: version.ETag,
Size: version.Size, Size: version.Size,
Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
Owner: s3a.getObjectOwnerFromVersion(version, bucket, objectKey),
StorageClass: "STANDARD", StorageClass: "STANDARD",
} }
*allVersions = append(*allVersions, versionEntry) *allVersions = append(*allVersions, versionEntry)
@ -339,7 +339,7 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string
LastModified: time.Unix(entry.Attributes.Mtime, 0), LastModified: time.Unix(entry.Attributes.Mtime, 0),
ETag: etag, ETag: etag,
Size: int64(entry.Attributes.FileSize), Size: int64(entry.Attributes.FileSize),
Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
Owner: s3a.getObjectOwnerFromEntry(entry),
StorageClass: "STANDARD", StorageClass: "STANDARD",
} }
*allVersions = append(*allVersions, versionEntry) *allVersions = append(*allVersions, versionEntry)
@ -761,3 +761,55 @@ func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb
return latestVersionEntry, nil return latestVersionEntry, nil
} }
// getObjectOwnerFromVersion extracts object owner information from version entry metadata
func (s3a *S3ApiServer) getObjectOwnerFromVersion(version *ObjectVersion, bucket, objectKey string) CanonicalUser {
// First try to get owner from the version entry itself
if version.Entry != nil && version.Entry.Extended != nil {
if ownerBytes, exists := version.Entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
ownerId := string(ownerBytes)
ownerDisplayName := s3a.iam.GetAccountNameById(ownerId)
return CanonicalUser{ID: ownerId, DisplayName: ownerDisplayName}
}
}
// Fallback: try to get owner from the current version of the object
// This handles cases where older versions might not have owner metadata
if version.VersionId == "null" {
// For null version, check the regular object file
bucketDir := s3a.option.BucketsPath + "/" + bucket
if entry, err := s3a.getEntry(bucketDir, objectKey); err == nil && entry.Extended != nil {
if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
ownerId := string(ownerBytes)
ownerDisplayName := s3a.iam.GetAccountNameById(ownerId)
return CanonicalUser{ID: ownerId, DisplayName: ownerDisplayName}
}
}
} else {
// For versioned objects, try to get from latest version metadata
if latestVersion, err := s3a.getLatestObjectVersion(bucket, objectKey); err == nil && latestVersion.Extended != nil {
if ownerBytes, exists := latestVersion.Extended[s3_constants.ExtAmzOwnerKey]; exists {
ownerId := string(ownerBytes)
ownerDisplayName := s3a.iam.GetAccountNameById(ownerId)
return CanonicalUser{ID: ownerId, DisplayName: ownerDisplayName}
}
}
}
// Ultimate fallback: return anonymous if no owner found
return CanonicalUser{ID: s3_constants.AccountAnonymousId, DisplayName: "anonymous"}
}
// getObjectOwnerFromEntry extracts object owner information from a file entry
func (s3a *S3ApiServer) getObjectOwnerFromEntry(entry *filer_pb.Entry) CanonicalUser {
if entry != nil && entry.Extended != nil {
if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
ownerId := string(ownerBytes)
ownerDisplayName := s3a.iam.GetAccountNameById(ownerId)
return CanonicalUser{ID: ownerId, DisplayName: ownerDisplayName}
}
}
// Fallback: return anonymous if no owner found
return CanonicalUser{ID: s3_constants.AccountAnonymousId, DisplayName: "anonymous"}
}
Loading…
Cancel
Save