Browse Source

sse kms

pull/7481/head
chrislu 3 weeks ago
parent
commit
8c96d1246f
  1. 6
      weed/s3api/s3api_object_handlers_multipart.go
  2. 4
      weed/s3api/s3api_object_handlers_postpolicy.go
  3. 120
      weed/s3api/s3api_object_handlers_put.go

6
weed/s3api/s3api_object_handlers_multipart.go

@ -401,7 +401,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
glog.V(2).Infof("PutObjectPart: bucket=%s, object=%s, uploadId=%s, partNumber=%d, size=%d",
bucket, object, uploadID, partID, r.ContentLength)
etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, bucket, partID)
etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, partID)
if errCode != s3err.ErrNone {
glog.Errorf("PutObjectPart: putToFiler failed with error code %v for bucket=%s, object=%s, partNumber=%d",
errCode, bucket, object, partID)
@ -410,12 +410,12 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
}
glog.V(2).Infof("PutObjectPart: SUCCESS - bucket=%s, object=%s, partNumber=%d, etag=%s, sseType=%s",
bucket, object, partID, etag, sseType)
bucket, object, partID, etag, sseMetadata.SSEType)
setEtag(w, etag)
// Set SSE response headers for multipart uploads
s3a.setSSEResponseHeaders(w, r, sseType)
s3a.setSSEResponseHeaders(w, r, sseMetadata)
writeSuccessResponseEmpty(w, r)

4
weed/s3api/s3api_object_handlers_postpolicy.go

@ -136,7 +136,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
}
}
etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, fileBody, bucket, 1)
etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, fileBody, bucket, 1)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
@ -153,7 +153,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
setEtag(w, etag)
// Include SSE response headers (important for bucket-default encryption)
s3a.setSSEResponseHeaders(w, r, sseType)
s3a.setSSEResponseHeaders(w, r, sseMetadata)
// Decide what http response to send depending on success_action_status parameter
switch successStatus {

120
weed/s3api/s3api_object_handlers_put.go

@ -63,6 +63,13 @@ type BucketDefaultEncryptionResult struct {
SSEKMSKey *SSEKMSKey
}
// SSEResponseMetadata holds encryption metadata needed for HTTP response headers
type SSEResponseMetadata struct {
SSEType string
KMSKeyID string
BucketKeyEnabled bool
}
func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
@ -162,7 +169,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
case s3_constants.VersioningEnabled:
// Handle enabled versioning - create new versions with real version IDs
glog.V(3).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object)
versionId, etag, errCode, sseType := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
versionId, etag, errCode, sseMetadata := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
if errCode != s3err.ErrNone {
glog.Errorf("PutObjectHandler: putVersionedObject failed with errCode=%v for %s/%s", errCode, bucket, object)
s3err.WriteErrorResponse(w, r, errCode)
@ -183,12 +190,12 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
setEtag(w, etag)
// Set SSE response headers for versioned objects
s3a.setSSEResponseHeaders(w, r, sseType)
s3a.setSSEResponseHeaders(w, r, sseMetadata)
case s3_constants.VersioningSuspended:
// Handle suspended versioning - overwrite with "null" version ID but preserve existing versions
glog.V(3).Infof("PutObjectHandler: SUSPENDED versioning detected for %s/%s, calling putSuspendedVersioningObject", bucket, object)
etag, errCode, sseType := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
etag, errCode, sseMetadata := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
@ -201,7 +208,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
setEtag(w, etag)
// Set SSE response headers for suspended versioning
s3a.setSSEResponseHeaders(w, r, sseType)
s3a.setSSEResponseHeaders(w, r, sseMetadata)
default:
// Handle regular PUT (never configured versioning)
uploadUrl := s3a.toFilerUrl(bucket, object)
@ -209,7 +216,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
dataReader = mimeDetect(r, dataReader)
}
etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, bucket, 1)
etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, 1)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
@ -220,7 +227,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
setEtag(w, etag)
// Set SSE response headers based on encryption type used
s3a.setSSEResponseHeaders(w, r, sseType)
s3a.setSSEResponseHeaders(w, r, sseMetadata)
}
}
stats_collect.RecordBucketActiveTime(bucket)
@ -229,7 +236,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
writeSuccessResponseEmpty(w, r)
}
func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseType string) {
func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
// NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy
// This eliminates the filer proxy overhead for PUT operations
@ -240,7 +247,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
// Handle all SSE encryption types in a unified manner
sseResult, sseErrorCode := s3a.handleAllSSEEncryption(r, dataReader, partOffset)
if sseErrorCode != s3err.ErrNone {
return "", sseErrorCode, ""
return "", sseErrorCode, SSEResponseMetadata{}
}
// Extract results from unified SSE handling
@ -261,7 +268,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
encryptionResult, applyErr := s3a.applyBucketDefaultEncryption(bucket, r, dataReader)
if applyErr != nil {
glog.Errorf("Failed to apply bucket default encryption: %v", applyErr)
return "", s3err.ErrInternalError, ""
return "", s3err.ErrInternalError, SSEResponseMetadata{}
}
// Update variables based on the result
@ -275,7 +282,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
sseS3Metadata, metaErr = SerializeSSES3Metadata(sseS3Key)
if metaErr != nil {
glog.Errorf("Failed to serialize SSE-S3 metadata for bucket default encryption: %v", metaErr)
return "", s3err.ErrInternalError, ""
return "", s3err.ErrInternalError, SSEResponseMetadata{}
}
}
} else {
@ -288,28 +295,28 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
parsedUrl, parseErr := url.Parse(uploadUrl)
if parseErr != nil {
glog.Errorf("putToFiler: failed to parse uploadUrl %q: %v", uploadUrl, parseErr)
return "", s3err.ErrInternalError, ""
return "", s3err.ErrInternalError, SSEResponseMetadata{}
}
// URL-decode the path to get the actual file path
// This is critical because toFilerUrl() encodes special characters like (, ), etc.
decodedPath, decodeErr := url.PathUnescape(parsedUrl.Path)
if decodeErr != nil {
glog.Errorf("putToFiler: failed to decode path %q: %v", parsedUrl.Path, decodeErr)
return "", s3err.ErrInternalError, ""
return "", s3err.ErrInternalError, SSEResponseMetadata{}
}
filePath := decodedPath
// Step 1 & 2: Use auto-chunking to handle large files without OOM
// This splits large uploads into 8MB chunks, preventing memory issues on both S3 API and volume servers
const chunkSize = 8 * 1024 * 1024 // 8MB chunks (S3 standard)
const smallFileLimit = 256 * 1024 // 256KB - store inline in filer
const chunkSize = 8 * 1024 * 1024 // 8MB chunks (S3 standard)
const smallFileLimit = 256 * 1024 // 256KB - store inline in filer
collection := ""
if s3a.option.FilerGroup != "" {
collection = s3a.getCollectionName(bucket)
}
// Create assign function for chunked upload
assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
var assignResult *filer_pb.AssignVolumeResponse
@ -334,7 +341,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
if err != nil {
return nil, nil, err
}
// Convert filer_pb.AssignVolumeResponse to operation.AssignResult
return nil, &operation.AssignResult{
Fid: assignResult.FileId,
@ -344,7 +351,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
Auth: security.EncodedJwt(assignResult.Auth),
}, nil
}
// Upload with auto-chunking
chunkResult, err := operation.UploadReaderInChunks(r.Context(), dataReader, &operation.ChunkedUploadOption{
ChunkSize: chunkSize,
@ -358,18 +365,18 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
if err != nil {
glog.Errorf("putToFiler: chunked upload failed: %v", err)
if strings.Contains(err.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
return "", s3err.ErrInvalidDigest, ""
return "", s3err.ErrInvalidDigest, SSEResponseMetadata{}
}
return "", s3err.ErrInternalError, ""
return "", s3err.ErrInternalError, SSEResponseMetadata{}
}
// Step 3: Calculate ETag and add SSE metadata to chunks
md5Sum := chunkResult.Md5Hash.Sum(nil)
etag = fmt.Sprintf("%x", md5Sum)
glog.V(3).Infof("putToFiler: Chunked upload SUCCESS - path=%s, chunks=%d, size=%d, etag=%s",
filePath, len(chunkResult.FileChunks), chunkResult.TotalSize, etag)
// Add SSE metadata to all chunks if present
if customerKey != nil {
// SSE-C: Create per-chunk metadata (matches filer logic)
@ -508,7 +515,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
})
if createErr != nil {
glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr)
return "", filerErrorToS3Error(createErr.Error()), ""
return "", filerErrorToS3Error(createErr.Error()), SSEResponseMetadata{}
}
glog.V(3).Infof("putToFiler: CreateEntry SUCCESS for %s", filePath)
@ -517,8 +524,20 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
BucketTrafficReceived(chunkResult.TotalSize, r)
// Return the SSE type determined by the unified handler
return etag, s3err.ErrNone, sseResult.SSEType
// Build SSE response metadata with encryption details
responseMetadata := SSEResponseMetadata{
SSEType: sseResult.SSEType,
}
// For SSE-KMS, include key ID and bucket-key-enabled flag from stored metadata
if sseKMSKey != nil {
responseMetadata.KMSKeyID = sseKMSKey.KeyID
responseMetadata.BucketKeyEnabled = sseKMSKey.BucketKeyEnabled
glog.V(4).Infof("putToFiler: returning SSE-KMS metadata - keyID=%s, bucketKeyEnabled=%v",
sseKMSKey.KeyID, sseKMSKey.BucketKeyEnabled)
}
return etag, s3err.ErrNone, responseMetadata
}
func setEtag(w http.ResponseWriter, etag string) {
@ -532,8 +551,8 @@ func setEtag(w http.ResponseWriter, etag string) {
}
// setSSEResponseHeaders sets appropriate SSE response headers based on encryption type
func (s3a *S3ApiServer) setSSEResponseHeaders(w http.ResponseWriter, r *http.Request, sseType string) {
switch sseType {
func (s3a *S3ApiServer) setSSEResponseHeaders(w http.ResponseWriter, r *http.Request, sseMetadata SSEResponseMetadata) {
switch sseMetadata.SSEType {
case s3_constants.SSETypeS3:
// SSE-S3: Return the encryption algorithm
w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
@ -550,10 +569,19 @@ func (s3a *S3ApiServer) setSSEResponseHeaders(w http.ResponseWriter, r *http.Req
case s3_constants.SSETypeKMS:
// SSE-KMS: Return the KMS key ID and algorithm
w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms")
if keyID := r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); keyID != "" {
// Use metadata from stored encryption config (for bucket-default encryption)
// or fall back to request headers (for explicit encryption)
if sseMetadata.KMSKeyID != "" {
w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, sseMetadata.KMSKeyID)
} else if keyID := r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); keyID != "" {
w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID)
}
if bucketKeyEnabled := r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled == "true" {
// Set bucket-key-enabled header if it was enabled
if sseMetadata.BucketKeyEnabled {
w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
} else if bucketKeyEnabled := r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled == "true" {
w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
}
}
@ -621,7 +649,7 @@ func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_
//
// For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory,
// while existing versions from when versioning was enabled remain preserved in the .versions subdirectory.
func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode, sseType string) {
func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
// Normalize object path to ensure consistency with toFilerUrl behavior
normalizedObject := removeDuplicateSlashes(object)
@ -703,7 +731,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate)
if err != nil {
glog.Errorf("putSuspendedVersioningObject: failed to parse retention until date: %v", err)
return "", s3err.ErrInvalidRequest, ""
return "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
}
r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10))
glog.V(2).Infof("putSuspendedVersioningObject: setting retention until date header (timestamp: %d)", parsedTime.Unix())
@ -715,7 +743,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
glog.V(2).Infof("putSuspendedVersioningObject: setting legal hold header: %s", legalHold)
} else {
glog.Errorf("putSuspendedVersioningObject: invalid legal hold value: %s", legalHold)
return "", s3err.ErrInvalidRequest, ""
return "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
}
}
@ -740,10 +768,10 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
if isTestObj {
glog.V(3).Infof("=== TESTOBJBAR: calling putToFiler ===")
}
etag, errCode, sseType = s3a.putToFiler(r, uploadUrl, body, bucket, 1)
etag, errCode, sseMetadata = s3a.putToFiler(r, uploadUrl, body, bucket, 1)
if errCode != s3err.ErrNone {
glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
return "", errCode, ""
return "", errCode, SSEResponseMetadata{}
}
if isTestObj {
glog.V(3).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag)
@ -787,7 +815,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
if isTestObj {
glog.V(3).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===")
}
return etag, s3err.ErrNone, sseType
return etag, s3err.ErrNone, sseMetadata
}
// updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers
@ -859,7 +887,7 @@ func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object
return nil
}
func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode, sseType string) {
func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
// Generate version ID
versionId = generateVersionId()
@ -884,7 +912,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
})
if err != nil {
glog.Errorf("putVersionedObject: failed to create .versions directory: %v", err)
return "", "", s3err.ErrInternalError, ""
return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
}
hash := md5.New()
@ -895,10 +923,10 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl)
etag, errCode, sseType = s3a.putToFiler(r, versionUploadUrl, body, bucket, 1)
etag, errCode, sseMetadata = s3a.putToFiler(r, versionUploadUrl, body, bucket, 1)
if errCode != s3err.ErrNone {
glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
return "", "", errCode, ""
return "", "", errCode, SSEResponseMetadata{}
}
// Get the uploaded entry to add versioning metadata
@ -920,7 +948,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
if err != nil {
glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err)
return "", "", s3err.ErrInternalError, ""
return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
}
// Add versioning metadata to this version
@ -941,7 +969,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
// Extract and store object lock metadata from request headers
if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil {
glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err)
return "", "", s3err.ErrInvalidRequest, ""
return "", "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
}
// Update the version entry with metadata
@ -952,17 +980,17 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
})
if err != nil {
glog.Errorf("putVersionedObject: failed to update version metadata: %v", err)
return "", "", s3err.ErrInternalError, ""
return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
}
// Update the .versions directory metadata to indicate this is the latest version
err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName)
if err != nil {
glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
return "", "", s3err.ErrInternalError, ""
return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
}
glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject)
return versionId, etag, s3err.ErrNone, sseType
return versionId, etag, s3err.ErrNone, sseMetadata
}
// updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version

Loading…
Cancel
Save