You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
2290 lines
80 KiB
2290 lines
80 KiB
package s3api
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"modernc.org/strutil"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/filer"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/operation"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
|
|
)
|
|
|
|
const (
|
|
DirectiveCopy = "COPY"
|
|
DirectiveReplace = "REPLACE"
|
|
)
|
|
|
|
func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|
|
|
dstBucket, dstObject := s3_constants.GetBucketAndObject(r)
|
|
|
|
// Copy source path.
|
|
cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source"))
|
|
if err != nil {
|
|
// Save unescaped string as is.
|
|
cpSrcPath = r.Header.Get("X-Amz-Copy-Source")
|
|
}
|
|
|
|
srcBucket, srcObject, srcVersionId := pathToBucketObjectAndVersion(cpSrcPath)
|
|
|
|
glog.V(3).Infof("CopyObjectHandler %s %s (version: %s) => %s %s", srcBucket, srcObject, srcVersionId, dstBucket, dstObject)
|
|
|
|
// Validate copy source and destination
|
|
if err := ValidateCopySource(cpSrcPath, srcBucket, srcObject); err != nil {
|
|
glog.V(2).Infof("CopyObjectHandler validation error: %v", err)
|
|
errCode := MapCopyValidationError(err)
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
if err := ValidateCopyDestination(dstBucket, dstObject); err != nil {
|
|
glog.V(2).Infof("CopyObjectHandler validation error: %v", err)
|
|
errCode := MapCopyValidationError(err)
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
replaceMeta, replaceTagging := replaceDirective(r.Header)
|
|
|
|
if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && (replaceMeta || replaceTagging) {
|
|
fullPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
|
|
dir, name := fullPath.DirAndName()
|
|
entry, err := s3a.getEntry(dir, name)
|
|
if err != nil || entry.IsDirectory {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
|
|
return
|
|
}
|
|
entry.Extended, err = processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
|
|
entry.Attributes.Mtime = time.Now().Unix()
|
|
if err != nil {
|
|
glog.Errorf("CopyObjectHandler ValidateTags error %s: %v", r.URL, err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag)
|
|
return
|
|
}
|
|
err = s3a.touch(dir, name, entry)
|
|
if err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
|
|
return
|
|
}
|
|
writeSuccessResponseXML(w, r, CopyObjectResult{
|
|
ETag: fmt.Sprintf("%x", entry.Attributes.Md5),
|
|
LastModified: time.Now().UTC(),
|
|
})
|
|
return
|
|
}
|
|
|
|
// If source object is empty or bucket is empty, reply back invalid copy source.
|
|
if srcObject == "" || srcBucket == "" {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
|
|
return
|
|
}
|
|
|
|
// Get detailed versioning state for source bucket
|
|
srcVersioningState, err := s3a.getVersioningState(srcBucket)
|
|
if err != nil {
|
|
glog.Errorf("Error checking versioning state for source bucket %s: %v", srcBucket, err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
|
|
return
|
|
}
|
|
|
|
// Get the source entry with version awareness based on versioning state
|
|
var entry *filer_pb.Entry
|
|
if srcVersionId != "" {
|
|
// Specific version requested - always use version-aware retrieval
|
|
entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId)
|
|
} else if srcVersioningState == s3_constants.VersioningEnabled {
|
|
// Versioning enabled - get latest version from .versions directory
|
|
entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
|
|
} else if srcVersioningState == s3_constants.VersioningSuspended {
|
|
// Versioning suspended - current object is stored as regular file ("null" version)
|
|
// Try regular file first, fall back to latest version if needed
|
|
srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
|
|
dir, name := srcPath.DirAndName()
|
|
entry, err = s3a.getEntry(dir, name)
|
|
if err != nil {
|
|
// If regular file doesn't exist, try latest version as fallback
|
|
glog.V(2).Infof("CopyObject: regular file not found for suspended versioning, trying latest version")
|
|
entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
|
|
}
|
|
} else {
|
|
// No versioning configured - use regular retrieval
|
|
srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
|
|
dir, name := srcPath.DirAndName()
|
|
entry, err = s3a.getEntry(dir, name)
|
|
}
|
|
|
|
if err != nil || entry.IsDirectory {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
|
|
return
|
|
}
|
|
|
|
if srcBucket == dstBucket && srcObject == dstObject {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopyDest)
|
|
return
|
|
}
|
|
|
|
// Validate conditional copy headers
|
|
if err := s3a.validateConditionalCopyHeaders(r, entry); err != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, err)
|
|
return
|
|
}
|
|
|
|
// Validate encryption parameters
|
|
if err := ValidateCopyEncryption(entry.Extended, r.Header); err != nil {
|
|
glog.V(2).Infof("CopyObjectHandler encryption validation error: %v", err)
|
|
errCode := MapCopyValidationError(err)
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
// Create new entry for destination
|
|
dstEntry := &filer_pb.Entry{
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: entry.Attributes.FileSize,
|
|
Mtime: time.Now().Unix(),
|
|
Crtime: entry.Attributes.Crtime,
|
|
Mime: entry.Attributes.Mime,
|
|
},
|
|
Extended: make(map[string][]byte),
|
|
}
|
|
|
|
// Copy extended attributes from source, filtering out conflicting encryption metadata
|
|
for k, v := range entry.Extended {
|
|
// Skip encryption-specific headers that might conflict with destination encryption type
|
|
skipHeader := false
|
|
|
|
// If we're doing cross-encryption, skip conflicting headers
|
|
if len(entry.GetChunks()) > 0 {
|
|
// Detect source and destination encryption types
|
|
srcHasSSEC := IsSSECEncrypted(entry.Extended)
|
|
srcHasSSEKMS := IsSSEKMSEncrypted(entry.Extended)
|
|
srcHasSSES3 := IsSSES3EncryptedInternal(entry.Extended)
|
|
dstWantsSSEC := IsSSECRequest(r)
|
|
dstWantsSSEKMS := IsSSEKMSRequest(r)
|
|
dstWantsSSES3 := IsSSES3RequestInternal(r)
|
|
|
|
// Use helper function to determine if header should be skipped
|
|
skipHeader = shouldSkipEncryptionHeader(k,
|
|
srcHasSSEC, srcHasSSEKMS, srcHasSSES3,
|
|
dstWantsSSEC, dstWantsSSEKMS, dstWantsSSES3)
|
|
}
|
|
|
|
if !skipHeader {
|
|
dstEntry.Extended[k] = v
|
|
}
|
|
}
|
|
|
|
// Process metadata and tags and apply to destination
|
|
processedMetadata, tagErr := processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
|
|
if tagErr != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
|
|
return
|
|
}
|
|
|
|
// Apply processed metadata to destination entry
|
|
for k, v := range processedMetadata {
|
|
dstEntry.Extended[k] = v
|
|
}
|
|
|
|
// For zero-size files or files without chunks, use the original approach
|
|
if entry.Attributes.FileSize == 0 || len(entry.GetChunks()) == 0 {
|
|
// Just copy the entry structure without chunks for zero-size files
|
|
dstEntry.Chunks = nil
|
|
} else {
|
|
// Use unified copy strategy approach
|
|
dstChunks, dstMetadata, copyErr := s3a.executeUnifiedCopyStrategy(entry, r, dstBucket, srcObject, dstObject)
|
|
if copyErr != nil {
|
|
glog.Errorf("CopyObjectHandler unified copy error: %v", copyErr)
|
|
// Map errors to appropriate S3 errors
|
|
errCode := s3a.mapCopyErrorToS3Error(copyErr)
|
|
s3err.WriteErrorResponse(w, r, errCode)
|
|
return
|
|
}
|
|
|
|
dstEntry.Chunks = dstChunks
|
|
|
|
// Apply destination-specific metadata (e.g., SSE-C IV and headers)
|
|
if dstMetadata != nil {
|
|
for k, v := range dstMetadata {
|
|
dstEntry.Extended[k] = v
|
|
}
|
|
glog.V(2).Infof("Applied %d destination metadata entries for copy: %s", len(dstMetadata), r.URL.Path)
|
|
}
|
|
}
|
|
|
|
// Check if destination bucket has versioning configured
|
|
dstVersioningConfigured, err := s3a.isVersioningConfigured(dstBucket)
|
|
if err != nil {
|
|
glog.Errorf("Error checking versioning status for destination bucket %s: %v", dstBucket, err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
|
|
var dstVersionId string
|
|
var etag string
|
|
|
|
if dstVersioningConfigured {
|
|
// For versioned destination, create a new version
|
|
dstVersionId = generateVersionId()
|
|
glog.V(2).Infof("CopyObjectHandler: creating version %s for destination %s/%s", dstVersionId, dstBucket, dstObject)
|
|
|
|
// Add version metadata to the entry
|
|
if dstEntry.Extended == nil {
|
|
dstEntry.Extended = make(map[string][]byte)
|
|
}
|
|
dstEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(dstVersionId)
|
|
|
|
// Calculate ETag for versioning
|
|
filerEntry := &filer.Entry{
|
|
FullPath: util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)),
|
|
Attr: filer.Attr{
|
|
FileSize: dstEntry.Attributes.FileSize,
|
|
Mtime: time.Unix(dstEntry.Attributes.Mtime, 0),
|
|
Crtime: time.Unix(dstEntry.Attributes.Crtime, 0),
|
|
Mime: dstEntry.Attributes.Mime,
|
|
},
|
|
Chunks: dstEntry.Chunks,
|
|
}
|
|
etag = filer.ETagEntry(filerEntry)
|
|
if !strings.HasPrefix(etag, "\"") {
|
|
etag = "\"" + etag + "\""
|
|
}
|
|
dstEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
|
|
|
|
// Create version file
|
|
versionFileName := s3a.getVersionFileName(dstVersionId)
|
|
versionObjectPath := dstObject + ".versions/" + versionFileName
|
|
bucketDir := s3a.option.BucketsPath + "/" + dstBucket
|
|
|
|
if err := s3a.mkFile(bucketDir, versionObjectPath, dstEntry.Chunks, func(entry *filer_pb.Entry) {
|
|
entry.Attributes = dstEntry.Attributes
|
|
entry.Extended = dstEntry.Extended
|
|
}); err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
|
|
// Update the .versions directory metadata
|
|
err = s3a.updateLatestVersionInDirectory(dstBucket, dstObject, dstVersionId, versionFileName)
|
|
if err != nil {
|
|
glog.Errorf("CopyObjectHandler: failed to update latest version in directory: %v", err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
|
|
// Set version ID in response header
|
|
w.Header().Set("x-amz-version-id", dstVersionId)
|
|
} else {
|
|
// For non-versioned destination, use regular copy
|
|
dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
|
|
dstDir, dstName := dstPath.DirAndName()
|
|
|
|
// Check if destination exists and remove it first (S3 copy overwrites)
|
|
if exists, _ := s3a.exists(dstDir, dstName, false); exists {
|
|
if err := s3a.rm(dstDir, dstName, false, false); err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Create the new file
|
|
if err := s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) {
|
|
entry.Attributes = dstEntry.Attributes
|
|
entry.Extended = dstEntry.Extended
|
|
}); err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
|
|
// Calculate ETag
|
|
filerEntry := &filer.Entry{
|
|
FullPath: dstPath,
|
|
Attr: filer.Attr{
|
|
FileSize: dstEntry.Attributes.FileSize,
|
|
Mtime: time.Unix(dstEntry.Attributes.Mtime, 0),
|
|
Crtime: time.Unix(dstEntry.Attributes.Crtime, 0),
|
|
Mime: dstEntry.Attributes.Mime,
|
|
},
|
|
Chunks: dstEntry.Chunks,
|
|
}
|
|
etag = filer.ETagEntry(filerEntry)
|
|
}
|
|
|
|
setEtag(w, etag)
|
|
|
|
response := CopyObjectResult{
|
|
ETag: etag,
|
|
LastModified: time.Now().UTC(),
|
|
}
|
|
|
|
writeSuccessResponseXML(w, r, response)
|
|
|
|
}
|
|
|
|
func pathToBucketAndObject(path string) (bucket, object string) {
|
|
path = strings.TrimPrefix(path, "/")
|
|
parts := strings.SplitN(path, "/", 2)
|
|
if len(parts) == 2 {
|
|
return parts[0], "/" + parts[1]
|
|
}
|
|
return parts[0], "/"
|
|
}
|
|
|
|
func pathToBucketObjectAndVersion(path string) (bucket, object, versionId string) {
|
|
// Parse versionId from query string if present
|
|
// Format: /bucket/object?versionId=version-id
|
|
if idx := strings.Index(path, "?versionId="); idx != -1 {
|
|
versionId = path[idx+len("?versionId="):] // dynamically calculate length
|
|
path = path[:idx]
|
|
}
|
|
|
|
bucket, object = pathToBucketAndObject(path)
|
|
return bucket, object, versionId
|
|
}
|
|
|
|
type CopyPartResult struct {
|
|
LastModified time.Time `xml:"LastModified"`
|
|
ETag string `xml:"ETag"`
|
|
}
|
|
|
|
func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) {
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjctsUsingRESTMPUapi.html
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
|
|
dstBucket, dstObject := s3_constants.GetBucketAndObject(r)
|
|
|
|
// Copy source path.
|
|
cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source"))
|
|
if err != nil {
|
|
// Save unescaped string as is.
|
|
cpSrcPath = r.Header.Get("X-Amz-Copy-Source")
|
|
}
|
|
|
|
srcBucket, srcObject, srcVersionId := pathToBucketObjectAndVersion(cpSrcPath)
|
|
// If source object is empty or bucket is empty, reply back invalid copy source.
|
|
if srcObject == "" || srcBucket == "" {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
|
|
return
|
|
}
|
|
|
|
partIDString := r.URL.Query().Get("partNumber")
|
|
uploadID := r.URL.Query().Get("uploadId")
|
|
|
|
partID, err := strconv.Atoi(partIDString)
|
|
if err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
|
|
return
|
|
}
|
|
|
|
// Check if the upload ID is valid
|
|
err = s3a.checkUploadId(dstObject, uploadID)
|
|
if err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
|
|
return
|
|
}
|
|
|
|
glog.V(3).Infof("CopyObjectPartHandler %s %s => %s part %d upload %s", srcBucket, srcObject, dstBucket, partID, uploadID)
|
|
|
|
// check partID with maximum part ID for multipart objects
|
|
if partID > s3_constants.MaxS3MultipartParts {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
|
|
return
|
|
}
|
|
|
|
// Get detailed versioning state for source bucket
|
|
srcVersioningState, err := s3a.getVersioningState(srcBucket)
|
|
if err != nil {
|
|
glog.Errorf("Error checking versioning state for source bucket %s: %v", srcBucket, err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
|
|
return
|
|
}
|
|
|
|
// Get the source entry with version awareness based on versioning state
|
|
var entry *filer_pb.Entry
|
|
if srcVersionId != "" {
|
|
// Specific version requested - always use version-aware retrieval
|
|
entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId)
|
|
} else if srcVersioningState == s3_constants.VersioningEnabled {
|
|
// Versioning enabled - get latest version from .versions directory
|
|
entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
|
|
} else if srcVersioningState == s3_constants.VersioningSuspended {
|
|
// Versioning suspended - current object is stored as regular file ("null" version)
|
|
// Try regular file first, fall back to latest version if needed
|
|
srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
|
|
dir, name := srcPath.DirAndName()
|
|
entry, err = s3a.getEntry(dir, name)
|
|
if err != nil {
|
|
// If regular file doesn't exist, try latest version as fallback
|
|
glog.V(2).Infof("CopyObjectPart: regular file not found for suspended versioning, trying latest version")
|
|
entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
|
|
}
|
|
} else {
|
|
// No versioning configured - use regular retrieval
|
|
srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
|
|
dir, name := srcPath.DirAndName()
|
|
entry, err = s3a.getEntry(dir, name)
|
|
}
|
|
|
|
if err != nil || entry.IsDirectory {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
|
|
return
|
|
}
|
|
|
|
// Validate conditional copy headers
|
|
if err := s3a.validateConditionalCopyHeaders(r, entry); err != s3err.ErrNone {
|
|
s3err.WriteErrorResponse(w, r, err)
|
|
return
|
|
}
|
|
|
|
// Handle range header if present
|
|
rangeHeader := r.Header.Get("x-amz-copy-source-range")
|
|
var startOffset, endOffset int64
|
|
if rangeHeader != "" {
|
|
startOffset, endOffset, err = parseRangeHeader(rangeHeader)
|
|
if err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
|
|
return
|
|
}
|
|
} else {
|
|
startOffset = 0
|
|
if entry.Attributes.FileSize == 0 {
|
|
endOffset = -1 // For zero-size files, use -1 as endOffset
|
|
} else {
|
|
endOffset = int64(entry.Attributes.FileSize) - 1
|
|
}
|
|
}
|
|
|
|
// Create new entry for the part
|
|
dstEntry := &filer_pb.Entry{
|
|
Attributes: &filer_pb.FuseAttributes{
|
|
FileSize: uint64(endOffset - startOffset + 1),
|
|
Mtime: time.Now().Unix(),
|
|
Crtime: time.Now().Unix(),
|
|
Mime: entry.Attributes.Mime,
|
|
},
|
|
Extended: make(map[string][]byte),
|
|
}
|
|
|
|
// Handle zero-size files or empty ranges
|
|
if entry.Attributes.FileSize == 0 || endOffset < startOffset {
|
|
// For zero-size files or invalid ranges, create an empty part
|
|
dstEntry.Chunks = nil
|
|
} else {
|
|
// Copy chunks that overlap with the range
|
|
dstChunks, err := s3a.copyChunksForRange(entry, startOffset, endOffset, r.URL.Path)
|
|
if err != nil {
|
|
glog.Errorf("CopyObjectPartHandler copy chunks error: %v", err)
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
dstEntry.Chunks = dstChunks
|
|
}
|
|
|
|
// Save the part entry to the multipart uploads folder
|
|
uploadDir := s3a.genUploadsFolder(dstBucket) + "/" + uploadID
|
|
partName := fmt.Sprintf("%04d_%s.part", partID, "copy")
|
|
|
|
// Check if part exists and remove it first (allow re-copying same part)
|
|
if exists, _ := s3a.exists(uploadDir, partName, false); exists {
|
|
if err := s3a.rm(uploadDir, partName, false, false); err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
}
|
|
|
|
if err := s3a.mkFile(uploadDir, partName, dstEntry.Chunks, func(entry *filer_pb.Entry) {
|
|
entry.Attributes = dstEntry.Attributes
|
|
entry.Extended = dstEntry.Extended
|
|
}); err != nil {
|
|
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
|
return
|
|
}
|
|
|
|
// Calculate ETag for the part
|
|
partPath := util.FullPath(uploadDir + "/" + partName)
|
|
filerEntry := &filer.Entry{
|
|
FullPath: partPath,
|
|
Attr: filer.Attr{
|
|
FileSize: dstEntry.Attributes.FileSize,
|
|
Mtime: time.Unix(dstEntry.Attributes.Mtime, 0),
|
|
Crtime: time.Unix(dstEntry.Attributes.Crtime, 0),
|
|
Mime: dstEntry.Attributes.Mime,
|
|
},
|
|
Chunks: dstEntry.Chunks,
|
|
}
|
|
|
|
etag := filer.ETagEntry(filerEntry)
|
|
setEtag(w, etag)
|
|
|
|
response := CopyPartResult{
|
|
ETag: etag,
|
|
LastModified: time.Now().UTC(),
|
|
}
|
|
|
|
writeSuccessResponseXML(w, r, response)
|
|
}
|
|
|
|
func replaceDirective(reqHeader http.Header) (replaceMeta, replaceTagging bool) {
|
|
return reqHeader.Get(s3_constants.AmzUserMetaDirective) == DirectiveReplace, reqHeader.Get(s3_constants.AmzObjectTaggingDirective) == DirectiveReplace
|
|
}
|
|
|
|
func processMetadata(reqHeader, existing http.Header, replaceMeta, replaceTagging bool, getTags func(parentDirectoryPath string, entryName string) (tags map[string]string, err error), dir, name string) (err error) {
|
|
if sc := reqHeader.Get(s3_constants.AmzStorageClass); len(sc) == 0 {
|
|
if sc := existing.Get(s3_constants.AmzStorageClass); len(sc) > 0 {
|
|
reqHeader.Set(s3_constants.AmzStorageClass, sc)
|
|
}
|
|
}
|
|
|
|
if !replaceMeta {
|
|
for header := range reqHeader {
|
|
if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) {
|
|
delete(reqHeader, header)
|
|
}
|
|
}
|
|
for k, v := range existing {
|
|
if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
|
|
reqHeader[k] = v
|
|
}
|
|
}
|
|
}
|
|
|
|
if !replaceTagging {
|
|
for header, _ := range reqHeader {
|
|
if strings.HasPrefix(header, s3_constants.AmzObjectTagging) {
|
|
delete(reqHeader, header)
|
|
}
|
|
}
|
|
|
|
found := false
|
|
for k, _ := range existing {
|
|
if strings.HasPrefix(k, s3_constants.AmzObjectTaggingPrefix) {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if found {
|
|
tags, err := getTags(dir, name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var tagArr []string
|
|
for k, v := range tags {
|
|
tagArr = append(tagArr, fmt.Sprintf("%s=%s", k, v))
|
|
}
|
|
tagStr := strutil.JoinFields(tagArr, "&")
|
|
reqHeader.Set(s3_constants.AmzObjectTagging, tagStr)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, replaceMeta, replaceTagging bool) (metadata map[string][]byte, err error) {
|
|
metadata = make(map[string][]byte)
|
|
|
|
if sc := existing[s3_constants.AmzStorageClass]; len(sc) > 0 {
|
|
metadata[s3_constants.AmzStorageClass] = sc
|
|
}
|
|
if sc := reqHeader.Get(s3_constants.AmzStorageClass); len(sc) > 0 {
|
|
metadata[s3_constants.AmzStorageClass] = []byte(sc)
|
|
}
|
|
|
|
// Handle SSE-KMS headers - these are always processed from request headers if present
|
|
if sseAlgorithm := reqHeader.Get(s3_constants.AmzServerSideEncryption); sseAlgorithm == "aws:kms" {
|
|
metadata[s3_constants.AmzServerSideEncryption] = []byte(sseAlgorithm)
|
|
|
|
// KMS Key ID (optional - can use default key)
|
|
if kmsKeyID := reqHeader.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); kmsKeyID != "" {
|
|
metadata[s3_constants.AmzServerSideEncryptionAwsKmsKeyId] = []byte(kmsKeyID)
|
|
}
|
|
|
|
// Encryption Context (optional)
|
|
if encryptionContext := reqHeader.Get(s3_constants.AmzServerSideEncryptionContext); encryptionContext != "" {
|
|
metadata[s3_constants.AmzServerSideEncryptionContext] = []byte(encryptionContext)
|
|
}
|
|
|
|
// Bucket Key Enabled (optional)
|
|
if bucketKeyEnabled := reqHeader.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled != "" {
|
|
metadata[s3_constants.AmzServerSideEncryptionBucketKeyEnabled] = []byte(bucketKeyEnabled)
|
|
}
|
|
} else {
|
|
// If not explicitly setting SSE-KMS, preserve existing SSE headers from source
|
|
for _, sseHeader := range []string{
|
|
s3_constants.AmzServerSideEncryption,
|
|
s3_constants.AmzServerSideEncryptionAwsKmsKeyId,
|
|
s3_constants.AmzServerSideEncryptionContext,
|
|
s3_constants.AmzServerSideEncryptionBucketKeyEnabled,
|
|
} {
|
|
if existingValue, exists := existing[sseHeader]; exists {
|
|
metadata[sseHeader] = existingValue
|
|
}
|
|
}
|
|
}
|
|
|
|
// Handle SSE-C headers - these are always processed from request headers if present
|
|
if sseCustomerAlgorithm := reqHeader.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm); sseCustomerAlgorithm != "" {
|
|
metadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte(sseCustomerAlgorithm)
|
|
|
|
if sseCustomerKeyMD5 := reqHeader.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5); sseCustomerKeyMD5 != "" {
|
|
metadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(sseCustomerKeyMD5)
|
|
}
|
|
} else {
|
|
// If not explicitly setting SSE-C, preserve existing SSE-C headers from source
|
|
for _, ssecHeader := range []string{
|
|
s3_constants.AmzServerSideEncryptionCustomerAlgorithm,
|
|
s3_constants.AmzServerSideEncryptionCustomerKeyMD5,
|
|
} {
|
|
if existingValue, exists := existing[ssecHeader]; exists {
|
|
metadata[ssecHeader] = existingValue
|
|
}
|
|
}
|
|
}
|
|
|
|
if replaceMeta {
|
|
for header, values := range reqHeader {
|
|
if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) {
|
|
for _, value := range values {
|
|
metadata[header] = []byte(value)
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
for k, v := range existing {
|
|
if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
|
|
metadata[k] = v
|
|
}
|
|
}
|
|
}
|
|
if replaceTagging {
|
|
if tags := reqHeader.Get(s3_constants.AmzObjectTagging); tags != "" {
|
|
parsedTags, err := parseTagsHeader(tags)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = ValidateTags(parsedTags)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for k, v := range parsedTags {
|
|
metadata[s3_constants.AmzObjectTagging+"-"+k] = []byte(v)
|
|
}
|
|
}
|
|
} else {
|
|
for k, v := range existing {
|
|
if strings.HasPrefix(k, s3_constants.AmzObjectTagging) {
|
|
metadata[k] = v
|
|
}
|
|
}
|
|
delete(metadata, s3_constants.AmzTagCount)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// copyChunks replicates chunks from source entry to destination entry
|
|
func (s3a *S3ApiServer) copyChunks(entry *filer_pb.Entry, dstPath string) ([]*filer_pb.FileChunk, error) {
|
|
dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks()))
|
|
const defaultChunkCopyConcurrency = 4
|
|
executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency) // Limit to configurable concurrent operations
|
|
errChan := make(chan error, len(entry.GetChunks()))
|
|
|
|
for i, chunk := range entry.GetChunks() {
|
|
chunkIndex := i
|
|
executor.Execute(func() {
|
|
dstChunk, err := s3a.copySingleChunk(chunk, dstPath)
|
|
if err != nil {
|
|
errChan <- fmt.Errorf("chunk %d: %v", chunkIndex, err)
|
|
return
|
|
}
|
|
dstChunks[chunkIndex] = dstChunk
|
|
errChan <- nil
|
|
})
|
|
}
|
|
|
|
// Wait for all operations to complete and check for errors
|
|
for i := 0; i < len(entry.GetChunks()); i++ {
|
|
if err := <-errChan; err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return dstChunks, nil
|
|
}
|
|
|
|
// copySingleChunk copies a single chunk from source to destination
|
|
func (s3a *S3ApiServer) copySingleChunk(chunk *filer_pb.FileChunk, dstPath string) (*filer_pb.FileChunk, error) {
|
|
// Create destination chunk
|
|
dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
|
|
|
|
// Prepare chunk copy (assign new volume and get source URL)
|
|
assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Set file ID on destination chunk
|
|
if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Download and upload the chunk
|
|
chunkData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("download chunk data: %w", err)
|
|
}
|
|
|
|
if err := s3a.uploadChunkData(chunkData, assignResult); err != nil {
|
|
return nil, fmt.Errorf("upload chunk data: %w", err)
|
|
}
|
|
|
|
return dstChunk, nil
|
|
}
|
|
|
|
// copySingleChunkForRange copies a portion of a chunk for range operations
|
|
func (s3a *S3ApiServer) copySingleChunkForRange(originalChunk, rangeChunk *filer_pb.FileChunk, rangeStart, rangeEnd int64, dstPath string) (*filer_pb.FileChunk, error) {
|
|
// Create destination chunk
|
|
dstChunk := s3a.createDestinationChunk(rangeChunk, rangeChunk.Offset, rangeChunk.Size)
|
|
|
|
// Prepare chunk copy (assign new volume and get source URL)
|
|
assignResult, srcUrl, err := s3a.prepareChunkCopy(originalChunk.GetFileIdString(), dstPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Set file ID on destination chunk
|
|
if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Calculate the portion of the original chunk that we need to copy
|
|
chunkStart := originalChunk.Offset
|
|
overlapStart := max(rangeStart, chunkStart)
|
|
offsetInChunk := overlapStart - chunkStart
|
|
|
|
// Download and upload the chunk portion
|
|
chunkData, err := s3a.downloadChunkData(srcUrl, offsetInChunk, int64(rangeChunk.Size))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("download chunk range data: %w", err)
|
|
}
|
|
|
|
if err := s3a.uploadChunkData(chunkData, assignResult); err != nil {
|
|
return nil, fmt.Errorf("upload chunk range data: %w", err)
|
|
}
|
|
|
|
return dstChunk, nil
|
|
}
|
|
|
|
// assignNewVolume assigns a new volume for the chunk
|
|
func (s3a *S3ApiServer) assignNewVolume(dstPath string) (*filer_pb.AssignVolumeResponse, error) {
|
|
var assignResult *filer_pb.AssignVolumeResponse
|
|
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
resp, err := client.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{
|
|
Count: 1,
|
|
Replication: "",
|
|
Collection: "",
|
|
DiskType: "",
|
|
DataCenter: s3a.option.DataCenter,
|
|
Path: dstPath,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("assign volume: %w", err)
|
|
}
|
|
if resp.Error != "" {
|
|
return fmt.Errorf("assign volume: %v", resp.Error)
|
|
}
|
|
assignResult = resp
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return assignResult, nil
|
|
}
|
|
|
|
// min returns the minimum of two int64 values
|
|
func min(a, b int64) int64 {
|
|
if a < b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
// max returns the maximum of two int64 values
|
|
func max(a, b int64) int64 {
|
|
if a > b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
// parseRangeHeader parses the x-amz-copy-source-range header
|
|
func parseRangeHeader(rangeHeader string) (startOffset, endOffset int64, err error) {
|
|
// Remove "bytes=" prefix if present
|
|
rangeStr := strings.TrimPrefix(rangeHeader, "bytes=")
|
|
parts := strings.Split(rangeStr, "-")
|
|
if len(parts) != 2 {
|
|
return 0, 0, fmt.Errorf("invalid range format")
|
|
}
|
|
|
|
startOffset, err = strconv.ParseInt(parts[0], 10, 64)
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("invalid start offset: %w", err)
|
|
}
|
|
|
|
endOffset, err = strconv.ParseInt(parts[1], 10, 64)
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("invalid end offset: %w", err)
|
|
}
|
|
|
|
return startOffset, endOffset, nil
|
|
}
|
|
|
|
// copyChunksForRange copies chunks that overlap with the specified range
|
|
func (s3a *S3ApiServer) copyChunksForRange(entry *filer_pb.Entry, startOffset, endOffset int64, dstPath string) ([]*filer_pb.FileChunk, error) {
|
|
var relevantChunks []*filer_pb.FileChunk
|
|
|
|
// Find chunks that overlap with the range
|
|
for _, chunk := range entry.GetChunks() {
|
|
chunkStart := chunk.Offset
|
|
chunkEnd := chunk.Offset + int64(chunk.Size)
|
|
|
|
// Check if chunk overlaps with the range
|
|
if chunkStart < endOffset+1 && chunkEnd > startOffset {
|
|
// Calculate the overlap
|
|
overlapStart := max(startOffset, chunkStart)
|
|
overlapEnd := min(endOffset+1, chunkEnd)
|
|
|
|
// Create a new chunk with adjusted offset and size relative to the range
|
|
newChunk := &filer_pb.FileChunk{
|
|
FileId: chunk.FileId,
|
|
Offset: overlapStart - startOffset, // Offset relative to the range start
|
|
Size: uint64(overlapEnd - overlapStart),
|
|
ModifiedTsNs: time.Now().UnixNano(),
|
|
ETag: chunk.ETag,
|
|
IsCompressed: chunk.IsCompressed,
|
|
CipherKey: chunk.CipherKey,
|
|
Fid: chunk.Fid,
|
|
}
|
|
relevantChunks = append(relevantChunks, newChunk)
|
|
}
|
|
}
|
|
|
|
// Copy the relevant chunks using a specialized method for range copies
|
|
dstChunks := make([]*filer_pb.FileChunk, len(relevantChunks))
|
|
const defaultChunkCopyConcurrency = 4
|
|
executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency)
|
|
errChan := make(chan error, len(relevantChunks))
|
|
|
|
// Create a map to track original chunks for each relevant chunk
|
|
originalChunks := make([]*filer_pb.FileChunk, len(relevantChunks))
|
|
relevantIndex := 0
|
|
for _, chunk := range entry.GetChunks() {
|
|
chunkStart := chunk.Offset
|
|
chunkEnd := chunk.Offset + int64(chunk.Size)
|
|
|
|
// Check if chunk overlaps with the range
|
|
if chunkStart < endOffset+1 && chunkEnd > startOffset {
|
|
originalChunks[relevantIndex] = chunk
|
|
relevantIndex++
|
|
}
|
|
}
|
|
|
|
for i, chunk := range relevantChunks {
|
|
chunkIndex := i
|
|
originalChunk := originalChunks[i] // Get the corresponding original chunk
|
|
executor.Execute(func() {
|
|
dstChunk, err := s3a.copySingleChunkForRange(originalChunk, chunk, startOffset, endOffset, dstPath)
|
|
if err != nil {
|
|
errChan <- fmt.Errorf("chunk %d: %v", chunkIndex, err)
|
|
return
|
|
}
|
|
dstChunks[chunkIndex] = dstChunk
|
|
errChan <- nil
|
|
})
|
|
}
|
|
|
|
// Wait for all operations to complete and check for errors
|
|
for i := 0; i < len(relevantChunks); i++ {
|
|
if err := <-errChan; err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return dstChunks, nil
|
|
}
|
|
|
|
// Helper methods for copy operations to avoid code duplication
|
|
|
|
// validateConditionalCopyHeaders validates the conditional copy headers against the source entry
|
|
func (s3a *S3ApiServer) validateConditionalCopyHeaders(r *http.Request, entry *filer_pb.Entry) s3err.ErrorCode {
|
|
// Calculate ETag for the source entry
|
|
srcPath := util.FullPath(fmt.Sprintf("%s/%s", r.URL.Path, entry.Name))
|
|
filerEntry := &filer.Entry{
|
|
FullPath: srcPath,
|
|
Attr: filer.Attr{
|
|
FileSize: entry.Attributes.FileSize,
|
|
Mtime: time.Unix(entry.Attributes.Mtime, 0),
|
|
Crtime: time.Unix(entry.Attributes.Crtime, 0),
|
|
Mime: entry.Attributes.Mime,
|
|
},
|
|
Chunks: entry.Chunks,
|
|
}
|
|
sourceETag := filer.ETagEntry(filerEntry)
|
|
|
|
// Check X-Amz-Copy-Source-If-Match
|
|
if ifMatch := r.Header.Get(s3_constants.AmzCopySourceIfMatch); ifMatch != "" {
|
|
// Remove quotes if present
|
|
ifMatch = strings.Trim(ifMatch, `"`)
|
|
sourceETag = strings.Trim(sourceETag, `"`)
|
|
glog.V(3).Infof("CopyObjectHandler: If-Match check - expected %s, got %s", ifMatch, sourceETag)
|
|
if ifMatch != sourceETag {
|
|
glog.V(3).Infof("CopyObjectHandler: If-Match failed - expected %s, got %s", ifMatch, sourceETag)
|
|
return s3err.ErrPreconditionFailed
|
|
}
|
|
}
|
|
|
|
// Check X-Amz-Copy-Source-If-None-Match
|
|
if ifNoneMatch := r.Header.Get(s3_constants.AmzCopySourceIfNoneMatch); ifNoneMatch != "" {
|
|
// Remove quotes if present
|
|
ifNoneMatch = strings.Trim(ifNoneMatch, `"`)
|
|
sourceETag = strings.Trim(sourceETag, `"`)
|
|
glog.V(3).Infof("CopyObjectHandler: If-None-Match check - comparing %s with %s", ifNoneMatch, sourceETag)
|
|
if ifNoneMatch == sourceETag {
|
|
glog.V(3).Infof("CopyObjectHandler: If-None-Match failed - matched %s", sourceETag)
|
|
return s3err.ErrPreconditionFailed
|
|
}
|
|
}
|
|
|
|
// Check X-Amz-Copy-Source-If-Modified-Since
|
|
if ifModifiedSince := r.Header.Get(s3_constants.AmzCopySourceIfModifiedSince); ifModifiedSince != "" {
|
|
t, err := time.Parse(time.RFC1123, ifModifiedSince)
|
|
if err != nil {
|
|
glog.V(3).Infof("CopyObjectHandler: Invalid If-Modified-Since header: %v", err)
|
|
return s3err.ErrInvalidRequest
|
|
}
|
|
if !time.Unix(entry.Attributes.Mtime, 0).After(t) {
|
|
glog.V(3).Infof("CopyObjectHandler: If-Modified-Since failed")
|
|
return s3err.ErrPreconditionFailed
|
|
}
|
|
}
|
|
|
|
// Check X-Amz-Copy-Source-If-Unmodified-Since
|
|
if ifUnmodifiedSince := r.Header.Get(s3_constants.AmzCopySourceIfUnmodifiedSince); ifUnmodifiedSince != "" {
|
|
t, err := time.Parse(time.RFC1123, ifUnmodifiedSince)
|
|
if err != nil {
|
|
glog.V(3).Infof("CopyObjectHandler: Invalid If-Unmodified-Since header: %v", err)
|
|
return s3err.ErrInvalidRequest
|
|
}
|
|
if time.Unix(entry.Attributes.Mtime, 0).After(t) {
|
|
glog.V(3).Infof("CopyObjectHandler: If-Unmodified-Since failed")
|
|
return s3err.ErrPreconditionFailed
|
|
}
|
|
}
|
|
|
|
return s3err.ErrNone
|
|
}
|
|
|
|
// createDestinationChunk creates a new chunk based on the source chunk with modified properties
|
|
func (s3a *S3ApiServer) createDestinationChunk(sourceChunk *filer_pb.FileChunk, offset int64, size uint64) *filer_pb.FileChunk {
|
|
return &filer_pb.FileChunk{
|
|
Offset: offset,
|
|
Size: size,
|
|
ModifiedTsNs: time.Now().UnixNano(),
|
|
ETag: sourceChunk.ETag,
|
|
IsCompressed: sourceChunk.IsCompressed,
|
|
CipherKey: sourceChunk.CipherKey,
|
|
}
|
|
}
|
|
|
|
// lookupVolumeUrl looks up the volume URL for a given file ID using the filer's LookupVolume method
|
|
func (s3a *S3ApiServer) lookupVolumeUrl(fileId string) (string, error) {
|
|
var srcUrl string
|
|
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
vid, _, err := operation.ParseFileId(fileId)
|
|
if err != nil {
|
|
return fmt.Errorf("parse file ID: %w", err)
|
|
}
|
|
|
|
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
|
|
VolumeIds: []string{vid},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("lookup volume: %w", err)
|
|
}
|
|
|
|
if locations, found := resp.LocationsMap[vid]; found && len(locations.Locations) > 0 {
|
|
srcUrl = "http://" + locations.Locations[0].Url + "/" + fileId
|
|
} else {
|
|
return fmt.Errorf("no location found for volume %s", vid)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return "", fmt.Errorf("lookup volume URL: %w", err)
|
|
}
|
|
return srcUrl, nil
|
|
}
|
|
|
|
// setChunkFileId sets the file ID on the destination chunk
|
|
func (s3a *S3ApiServer) setChunkFileId(chunk *filer_pb.FileChunk, assignResult *filer_pb.AssignVolumeResponse) error {
|
|
chunk.FileId = assignResult.FileId
|
|
fid, err := filer_pb.ToFileIdObject(assignResult.FileId)
|
|
if err != nil {
|
|
return fmt.Errorf("parse file ID: %w", err)
|
|
}
|
|
chunk.Fid = fid
|
|
return nil
|
|
}
|
|
|
|
// prepareChunkCopy prepares a chunk for copying by assigning a new volume and looking up the source URL
|
|
func (s3a *S3ApiServer) prepareChunkCopy(sourceFileId, dstPath string) (*filer_pb.AssignVolumeResponse, string, error) {
|
|
// Assign new volume
|
|
assignResult, err := s3a.assignNewVolume(dstPath)
|
|
if err != nil {
|
|
return nil, "", fmt.Errorf("assign volume: %w", err)
|
|
}
|
|
|
|
// Look up source URL
|
|
srcUrl, err := s3a.lookupVolumeUrl(sourceFileId)
|
|
if err != nil {
|
|
return nil, "", fmt.Errorf("lookup source URL: %w", err)
|
|
}
|
|
|
|
return assignResult, srcUrl, nil
|
|
}
|
|
|
|
// uploadChunkData uploads chunk data to the destination using common upload logic
|
|
func (s3a *S3ApiServer) uploadChunkData(chunkData []byte, assignResult *filer_pb.AssignVolumeResponse) error {
|
|
dstUrl := fmt.Sprintf("http://%s/%s", assignResult.Location.Url, assignResult.FileId)
|
|
|
|
uploadOption := &operation.UploadOption{
|
|
UploadUrl: dstUrl,
|
|
Cipher: false,
|
|
IsInputCompressed: false,
|
|
MimeType: "",
|
|
PairMap: nil,
|
|
Jwt: security.EncodedJwt(assignResult.Auth),
|
|
}
|
|
uploader, err := operation.NewUploader()
|
|
if err != nil {
|
|
return fmt.Errorf("create uploader: %w", err)
|
|
}
|
|
_, err = uploader.UploadData(context.Background(), chunkData, uploadOption)
|
|
if err != nil {
|
|
return fmt.Errorf("upload chunk: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// downloadChunkData downloads chunk data from the source URL
|
|
func (s3a *S3ApiServer) downloadChunkData(srcUrl string, offset, size int64) ([]byte, error) {
|
|
var chunkData []byte
|
|
shouldRetry, err := util_http.ReadUrlAsStream(context.Background(), srcUrl, nil, false, false, offset, int(size), func(data []byte) {
|
|
chunkData = append(chunkData, data...)
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("download chunk: %w", err)
|
|
}
|
|
if shouldRetry {
|
|
return nil, fmt.Errorf("download chunk: retry needed")
|
|
}
|
|
return chunkData, nil
|
|
}
|
|
|
|
// copyMultipartSSECChunks handles copying multipart SSE-C objects
|
|
// Returns chunks and destination metadata that should be applied to the destination entry
|
|
func (s3a *S3ApiServer) copyMultipartSSECChunks(entry *filer_pb.Entry, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
|
|
glog.Infof("copyMultipartSSECChunks called: copySourceKey=%v, destKey=%v, path=%s", copySourceKey != nil, destKey != nil, dstPath)
|
|
|
|
var sourceKeyMD5, destKeyMD5 string
|
|
if copySourceKey != nil {
|
|
sourceKeyMD5 = copySourceKey.KeyMD5
|
|
}
|
|
if destKey != nil {
|
|
destKeyMD5 = destKey.KeyMD5
|
|
}
|
|
glog.Infof("Key MD5 comparison: source=%s, dest=%s, equal=%t", sourceKeyMD5, destKeyMD5, sourceKeyMD5 == destKeyMD5)
|
|
|
|
// For multipart SSE-C, always use decrypt/reencrypt path to ensure proper metadata handling
|
|
// The standard copyChunks() doesn't preserve SSE metadata, so we need per-chunk processing
|
|
glog.Infof("✅ Taking multipart SSE-C reencrypt path to preserve metadata: %s", dstPath)
|
|
|
|
// Different keys or key changes: decrypt and re-encrypt each chunk individually
|
|
glog.V(2).Infof("Multipart SSE-C reencrypt copy (different keys): %s", dstPath)
|
|
|
|
var dstChunks []*filer_pb.FileChunk
|
|
var destIV []byte
|
|
|
|
for _, chunk := range entry.GetChunks() {
|
|
if chunk.GetSseType() != filer_pb.SSEType_SSE_C {
|
|
// Non-SSE-C chunk, copy directly
|
|
copiedChunk, err := s3a.copySingleChunk(chunk, dstPath)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to copy non-SSE-C chunk: %w", err)
|
|
}
|
|
dstChunks = append(dstChunks, copiedChunk)
|
|
continue
|
|
}
|
|
|
|
// SSE-C chunk: decrypt with stored per-chunk metadata, re-encrypt with dest key
|
|
copiedChunk, chunkDestIV, err := s3a.copyMultipartSSECChunk(chunk, copySourceKey, destKey, dstPath)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to copy SSE-C chunk %s: %w", chunk.GetFileIdString(), err)
|
|
}
|
|
|
|
dstChunks = append(dstChunks, copiedChunk)
|
|
|
|
// Store the first chunk's IV as the object's IV (for single-part compatibility)
|
|
if len(destIV) == 0 {
|
|
destIV = chunkDestIV
|
|
}
|
|
}
|
|
|
|
// Create destination metadata
|
|
dstMetadata := make(map[string][]byte)
|
|
if destKey != nil && len(destIV) > 0 {
|
|
// Store the IV and SSE-C headers for single-part compatibility
|
|
StoreIVInMetadata(dstMetadata, destIV)
|
|
dstMetadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
|
|
dstMetadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(destKey.KeyMD5)
|
|
glog.V(2).Infof("Prepared multipart SSE-C destination metadata: %s", dstPath)
|
|
}
|
|
|
|
return dstChunks, dstMetadata, nil
|
|
}
|
|
|
|
// copyMultipartSSEKMSChunks handles copying multipart SSE-KMS objects (unified with SSE-C approach)
|
|
// Returns chunks and destination metadata that should be applied to the destination entry
|
|
func (s3a *S3ApiServer) copyMultipartSSEKMSChunks(entry *filer_pb.Entry, destKeyID string, encryptionContext map[string]string, bucketKeyEnabled bool, dstPath, bucket string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
|
|
glog.Infof("copyMultipartSSEKMSChunks called: destKeyID=%s, path=%s", destKeyID, dstPath)
|
|
|
|
// For multipart SSE-KMS, always use decrypt/reencrypt path to ensure proper metadata handling
|
|
// The standard copyChunks() doesn't preserve SSE metadata, so we need per-chunk processing
|
|
glog.Infof("✅ Taking multipart SSE-KMS reencrypt path to preserve metadata: %s", dstPath)
|
|
|
|
var dstChunks []*filer_pb.FileChunk
|
|
|
|
for _, chunk := range entry.GetChunks() {
|
|
if chunk.GetSseType() != filer_pb.SSEType_SSE_KMS {
|
|
// Non-SSE-KMS chunk, copy directly
|
|
copiedChunk, err := s3a.copySingleChunk(chunk, dstPath)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to copy non-SSE-KMS chunk: %w", err)
|
|
}
|
|
dstChunks = append(dstChunks, copiedChunk)
|
|
continue
|
|
}
|
|
|
|
// SSE-KMS chunk: decrypt with stored per-chunk metadata, re-encrypt with dest key
|
|
copiedChunk, err := s3a.copyMultipartSSEKMSChunk(chunk, destKeyID, encryptionContext, bucketKeyEnabled, dstPath, bucket)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to copy SSE-KMS chunk %s: %w", chunk.GetFileIdString(), err)
|
|
}
|
|
|
|
dstChunks = append(dstChunks, copiedChunk)
|
|
}
|
|
|
|
// Create destination metadata for SSE-KMS
|
|
dstMetadata := make(map[string][]byte)
|
|
if destKeyID != "" {
|
|
// Store SSE-KMS metadata for single-part compatibility
|
|
if encryptionContext == nil {
|
|
encryptionContext = BuildEncryptionContext(bucket, dstPath, bucketKeyEnabled)
|
|
}
|
|
sseKey := &SSEKMSKey{
|
|
KeyID: destKeyID,
|
|
EncryptionContext: encryptionContext,
|
|
BucketKeyEnabled: bucketKeyEnabled,
|
|
}
|
|
if kmsMetadata, serErr := SerializeSSEKMSMetadata(sseKey); serErr == nil {
|
|
dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata
|
|
glog.Infof("✅ Created object-level KMS metadata for GET compatibility")
|
|
} else {
|
|
glog.Errorf("❌ Failed to serialize SSE-KMS metadata: %v", serErr)
|
|
}
|
|
}
|
|
|
|
return dstChunks, dstMetadata, nil
|
|
}
|
|
|
|
// copyMultipartSSEKMSChunk copies a single SSE-KMS chunk from a multipart object (unified with SSE-C approach)
|
|
func (s3a *S3ApiServer) copyMultipartSSEKMSChunk(chunk *filer_pb.FileChunk, destKeyID string, encryptionContext map[string]string, bucketKeyEnabled bool, dstPath, bucket string) (*filer_pb.FileChunk, error) {
|
|
// Create destination chunk
|
|
dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
|
|
|
|
// Prepare chunk copy (assign new volume and get source URL)
|
|
assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Set file ID on destination chunk
|
|
if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Download encrypted chunk data
|
|
encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("download encrypted chunk data: %w", err)
|
|
}
|
|
|
|
var finalData []byte
|
|
|
|
// Decrypt source data using stored SSE-KMS metadata (same pattern as SSE-C)
|
|
if len(chunk.GetSseMetadata()) == 0 {
|
|
return nil, fmt.Errorf("SSE-KMS chunk missing per-chunk metadata")
|
|
}
|
|
|
|
// Deserialize the SSE-KMS metadata (reusing unified metadata structure)
|
|
sourceSSEKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err)
|
|
}
|
|
|
|
// Decrypt the chunk data using the source metadata
|
|
decryptedReader, decErr := CreateSSEKMSDecryptedReader(bytes.NewReader(encryptedData), sourceSSEKey)
|
|
if decErr != nil {
|
|
return nil, fmt.Errorf("create SSE-KMS decrypted reader: %w", decErr)
|
|
}
|
|
|
|
decryptedData, readErr := io.ReadAll(decryptedReader)
|
|
if readErr != nil {
|
|
return nil, fmt.Errorf("decrypt chunk data: %w", readErr)
|
|
}
|
|
finalData = decryptedData
|
|
glog.V(4).Infof("Decrypted multipart SSE-KMS chunk: %d bytes → %d bytes", len(encryptedData), len(finalData))
|
|
|
|
// Re-encrypt with destination key if specified
|
|
if destKeyID != "" {
|
|
// Build encryption context if not provided
|
|
if encryptionContext == nil {
|
|
encryptionContext = BuildEncryptionContext(bucket, dstPath, bucketKeyEnabled)
|
|
}
|
|
|
|
// Encrypt with destination key
|
|
encryptedReader, destSSEKey, encErr := CreateSSEKMSEncryptedReaderWithBucketKey(bytes.NewReader(finalData), destKeyID, encryptionContext, bucketKeyEnabled)
|
|
if encErr != nil {
|
|
return nil, fmt.Errorf("create SSE-KMS encrypted reader: %w", encErr)
|
|
}
|
|
|
|
reencryptedData, readErr := io.ReadAll(encryptedReader)
|
|
if readErr != nil {
|
|
return nil, fmt.Errorf("re-encrypt chunk data: %w", readErr)
|
|
}
|
|
finalData = reencryptedData
|
|
|
|
// Create per-chunk SSE-KMS metadata for the destination chunk
|
|
// For copy operations, reset chunk offset to 0 (similar to SSE-C approach)
|
|
// The copied chunks form a new object structure independent of original part boundaries
|
|
destSSEKey.ChunkOffset = 0
|
|
kmsMetadata, err := SerializeSSEKMSMetadata(destSSEKey)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("serialize SSE-KMS metadata: %w", err)
|
|
}
|
|
|
|
// Set the SSE type and metadata on destination chunk (unified approach)
|
|
dstChunk.SseType = filer_pb.SSEType_SSE_KMS
|
|
dstChunk.SseMetadata = kmsMetadata
|
|
|
|
glog.V(4).Infof("Re-encrypted multipart SSE-KMS chunk: %d bytes → %d bytes", len(finalData)-len(reencryptedData)+len(finalData), len(finalData))
|
|
}
|
|
|
|
// Upload the final data
|
|
if err := s3a.uploadChunkData(finalData, assignResult); err != nil {
|
|
return nil, fmt.Errorf("upload chunk data: %w", err)
|
|
}
|
|
|
|
// Update chunk size
|
|
dstChunk.Size = uint64(len(finalData))
|
|
|
|
glog.V(3).Infof("Successfully copied multipart SSE-KMS chunk %s → %s",
|
|
chunk.GetFileIdString(), dstChunk.GetFileIdString())
|
|
|
|
return dstChunk, nil
|
|
}
|
|
|
|
// copyMultipartSSECChunk copies a single SSE-C chunk from a multipart object
|
|
func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) (*filer_pb.FileChunk, []byte, error) {
|
|
// Create destination chunk
|
|
dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
|
|
|
|
// Prepare chunk copy (assign new volume and get source URL)
|
|
assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Set file ID on destination chunk
|
|
if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Download encrypted chunk data
|
|
encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size))
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("download encrypted chunk data: %w", err)
|
|
}
|
|
|
|
var finalData []byte
|
|
var destIV []byte
|
|
|
|
// Decrypt if source is encrypted
|
|
if copySourceKey != nil {
|
|
// Get the per-chunk SSE-C metadata
|
|
if len(chunk.GetSseMetadata()) == 0 {
|
|
return nil, nil, fmt.Errorf("SSE-C chunk missing per-chunk metadata")
|
|
}
|
|
|
|
// Deserialize the SSE-C metadata
|
|
ssecMetadata, err := DeserializeSSECMetadata(chunk.GetSseMetadata())
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to deserialize SSE-C metadata: %w", err)
|
|
}
|
|
|
|
// Decode the IV from the metadata
|
|
chunkBaseIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to decode chunk IV: %w", err)
|
|
}
|
|
|
|
// Calculate the correct IV for this chunk using within-part offset
|
|
var chunkIV []byte
|
|
if ssecMetadata.PartOffset > 0 {
|
|
chunkIV = calculateIVWithOffset(chunkBaseIV, ssecMetadata.PartOffset)
|
|
} else {
|
|
chunkIV = chunkBaseIV
|
|
}
|
|
|
|
// Decrypt the chunk data
|
|
decryptedReader, decErr := CreateSSECDecryptedReader(bytes.NewReader(encryptedData), copySourceKey, chunkIV)
|
|
if decErr != nil {
|
|
return nil, nil, fmt.Errorf("create decrypted reader: %w", decErr)
|
|
}
|
|
|
|
decryptedData, readErr := io.ReadAll(decryptedReader)
|
|
if readErr != nil {
|
|
return nil, nil, fmt.Errorf("decrypt chunk data: %w", readErr)
|
|
}
|
|
finalData = decryptedData
|
|
glog.V(4).Infof("Decrypted multipart SSE-C chunk: %d bytes → %d bytes", len(encryptedData), len(finalData))
|
|
} else {
|
|
// Source is unencrypted
|
|
finalData = encryptedData
|
|
}
|
|
|
|
// Re-encrypt if destination should be encrypted
|
|
if destKey != nil {
|
|
// Generate new IV for this chunk
|
|
newIV := make([]byte, s3_constants.AESBlockSize)
|
|
if _, err := rand.Read(newIV); err != nil {
|
|
return nil, nil, fmt.Errorf("generate IV: %w", err)
|
|
}
|
|
destIV = newIV
|
|
|
|
// Encrypt with new key and IV
|
|
encryptedReader, iv, encErr := CreateSSECEncryptedReader(bytes.NewReader(finalData), destKey)
|
|
if encErr != nil {
|
|
return nil, nil, fmt.Errorf("create encrypted reader: %w", encErr)
|
|
}
|
|
destIV = iv
|
|
|
|
reencryptedData, readErr := io.ReadAll(encryptedReader)
|
|
if readErr != nil {
|
|
return nil, nil, fmt.Errorf("re-encrypt chunk data: %w", readErr)
|
|
}
|
|
finalData = reencryptedData
|
|
|
|
// Create per-chunk SSE-C metadata for the destination chunk
|
|
ssecMetadata, err := SerializeSSECMetadata(destIV, destKey.KeyMD5, 0) // partOffset=0 for copied chunks
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("serialize SSE-C metadata: %w", err)
|
|
}
|
|
|
|
// Set the SSE type and metadata on destination chunk
|
|
dstChunk.SseType = filer_pb.SSEType_SSE_C
|
|
dstChunk.SseMetadata = ssecMetadata // Use unified metadata field
|
|
|
|
glog.V(4).Infof("Re-encrypted multipart SSE-C chunk: %d bytes → %d bytes", len(finalData)-len(reencryptedData)+len(finalData), len(finalData))
|
|
}
|
|
|
|
// Upload the final data
|
|
if err := s3a.uploadChunkData(finalData, assignResult); err != nil {
|
|
return nil, nil, fmt.Errorf("upload chunk data: %w", err)
|
|
}
|
|
|
|
// Update chunk size
|
|
dstChunk.Size = uint64(len(finalData))
|
|
|
|
glog.V(3).Infof("Successfully copied multipart SSE-C chunk %s → %s",
|
|
chunk.GetFileIdString(), dstChunk.GetFileIdString())
|
|
|
|
return dstChunk, destIV, nil
|
|
}
|
|
|
|
// copyMultipartCrossEncryption handles all cross-encryption and decrypt-only copy scenarios
|
|
// This unified function supports: SSE-C↔SSE-KMS, SSE-C→Plain, SSE-KMS→Plain
|
|
func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
|
|
glog.Infof("copyMultipartCrossEncryption called: %s→%s, path=%s",
|
|
s3a.getEncryptionTypeString(state.SrcSSEC, state.SrcSSEKMS, false),
|
|
s3a.getEncryptionTypeString(state.DstSSEC, state.DstSSEKMS, false), dstPath)
|
|
|
|
var dstChunks []*filer_pb.FileChunk
|
|
|
|
// Parse destination encryption parameters
|
|
var destSSECKey *SSECustomerKey
|
|
var destKMSKeyID string
|
|
var destKMSEncryptionContext map[string]string
|
|
var destKMSBucketKeyEnabled bool
|
|
|
|
if state.DstSSEC {
|
|
var err error
|
|
destSSECKey, err = ParseSSECHeaders(r)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to parse destination SSE-C headers: %w", err)
|
|
}
|
|
glog.Infof("Destination SSE-C: keyMD5=%s", destSSECKey.KeyMD5)
|
|
} else if state.DstSSEKMS {
|
|
var err error
|
|
destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, err = ParseSSEKMSCopyHeaders(r)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to parse destination SSE-KMS headers: %w", err)
|
|
}
|
|
glog.Infof("Destination SSE-KMS: keyID=%s, bucketKey=%t", destKMSKeyID, destKMSBucketKeyEnabled)
|
|
} else {
|
|
glog.Infof("Destination: Unencrypted")
|
|
}
|
|
|
|
// Parse source encryption parameters
|
|
var sourceSSECKey *SSECustomerKey
|
|
if state.SrcSSEC {
|
|
var err error
|
|
sourceSSECKey, err = ParseSSECCopySourceHeaders(r)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to parse source SSE-C headers: %w", err)
|
|
}
|
|
glog.Infof("Source SSE-C: keyMD5=%s", sourceSSECKey.KeyMD5)
|
|
}
|
|
|
|
// Process each chunk with unified cross-encryption logic
|
|
for _, chunk := range entry.GetChunks() {
|
|
var copiedChunk *filer_pb.FileChunk
|
|
var err error
|
|
|
|
if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
|
|
copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state)
|
|
} else if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS {
|
|
copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state)
|
|
} else {
|
|
// Unencrypted chunk, copy directly
|
|
copiedChunk, err = s3a.copySingleChunk(chunk, dstPath)
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to copy chunk %s: %w", chunk.GetFileIdString(), err)
|
|
}
|
|
|
|
dstChunks = append(dstChunks, copiedChunk)
|
|
}
|
|
|
|
// Create destination metadata based on destination encryption type
|
|
dstMetadata := make(map[string][]byte)
|
|
|
|
// Clear any previous encryption metadata to avoid routing conflicts
|
|
if state.SrcSSEKMS && state.DstSSEC {
|
|
// SSE-KMS → SSE-C: Remove SSE-KMS headers
|
|
// These will be excluded from dstMetadata, effectively removing them
|
|
} else if state.SrcSSEC && state.DstSSEKMS {
|
|
// SSE-C → SSE-KMS: Remove SSE-C headers
|
|
// These will be excluded from dstMetadata, effectively removing them
|
|
} else if !state.DstSSEC && !state.DstSSEKMS {
|
|
// Encrypted → Unencrypted: Remove all encryption metadata
|
|
// These will be excluded from dstMetadata, effectively removing them
|
|
}
|
|
|
|
if state.DstSSEC && destSSECKey != nil {
|
|
// For SSE-C destination, use first chunk's IV for compatibility
|
|
if len(dstChunks) > 0 && dstChunks[0].GetSseType() == filer_pb.SSEType_SSE_C && len(dstChunks[0].GetSseMetadata()) > 0 {
|
|
if ssecMetadata, err := DeserializeSSECMetadata(dstChunks[0].GetSseMetadata()); err == nil {
|
|
if iv, ivErr := base64.StdEncoding.DecodeString(ssecMetadata.IV); ivErr == nil {
|
|
StoreIVInMetadata(dstMetadata, iv)
|
|
dstMetadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
|
|
dstMetadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(destSSECKey.KeyMD5)
|
|
glog.Infof("✅ Created SSE-C object-level metadata from first chunk")
|
|
}
|
|
}
|
|
}
|
|
} else if state.DstSSEKMS && destKMSKeyID != "" {
|
|
// For SSE-KMS destination, create object-level metadata
|
|
if destKMSEncryptionContext == nil {
|
|
destKMSEncryptionContext = BuildEncryptionContext(dstBucket, dstPath, destKMSBucketKeyEnabled)
|
|
}
|
|
sseKey := &SSEKMSKey{
|
|
KeyID: destKMSKeyID,
|
|
EncryptionContext: destKMSEncryptionContext,
|
|
BucketKeyEnabled: destKMSBucketKeyEnabled,
|
|
}
|
|
if kmsMetadata, serErr := SerializeSSEKMSMetadata(sseKey); serErr == nil {
|
|
dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata
|
|
glog.Infof("✅ Created SSE-KMS object-level metadata")
|
|
} else {
|
|
glog.Errorf("❌ Failed to serialize SSE-KMS metadata: %v", serErr)
|
|
}
|
|
}
|
|
// For unencrypted destination, no metadata needed (dstMetadata remains empty)
|
|
|
|
return dstChunks, dstMetadata, nil
|
|
}
|
|
|
|
// copyCrossEncryptionChunk handles copying a single chunk with cross-encryption support
|
|
func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) {
|
|
// Create destination chunk
|
|
dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
|
|
|
|
// Prepare chunk copy (assign new volume and get source URL)
|
|
assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Set file ID on destination chunk
|
|
if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Download encrypted chunk data
|
|
encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("download encrypted chunk data: %w", err)
|
|
}
|
|
|
|
var finalData []byte
|
|
|
|
// Step 1: Decrypt source data
|
|
if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
|
|
// Decrypt SSE-C source
|
|
if len(chunk.GetSseMetadata()) == 0 {
|
|
return nil, fmt.Errorf("SSE-C chunk missing per-chunk metadata")
|
|
}
|
|
|
|
ssecMetadata, err := DeserializeSSECMetadata(chunk.GetSseMetadata())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to deserialize SSE-C metadata: %w", err)
|
|
}
|
|
|
|
chunkBaseIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to decode chunk IV: %w", err)
|
|
}
|
|
|
|
// Calculate the correct IV for this chunk using within-part offset
|
|
var chunkIV []byte
|
|
if ssecMetadata.PartOffset > 0 {
|
|
chunkIV = calculateIVWithOffset(chunkBaseIV, ssecMetadata.PartOffset)
|
|
} else {
|
|
chunkIV = chunkBaseIV
|
|
}
|
|
|
|
decryptedReader, decErr := CreateSSECDecryptedReader(bytes.NewReader(encryptedData), sourceSSECKey, chunkIV)
|
|
if decErr != nil {
|
|
return nil, fmt.Errorf("create SSE-C decrypted reader: %w", decErr)
|
|
}
|
|
|
|
decryptedData, readErr := io.ReadAll(decryptedReader)
|
|
if readErr != nil {
|
|
return nil, fmt.Errorf("decrypt SSE-C chunk data: %w", readErr)
|
|
}
|
|
finalData = decryptedData
|
|
previewLen := 16
|
|
if len(finalData) < previewLen {
|
|
previewLen = len(finalData)
|
|
}
|
|
|
|
} else if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS {
|
|
// Decrypt SSE-KMS source
|
|
if len(chunk.GetSseMetadata()) == 0 {
|
|
return nil, fmt.Errorf("SSE-KMS chunk missing per-chunk metadata")
|
|
}
|
|
|
|
sourceSSEKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err)
|
|
}
|
|
|
|
decryptedReader, decErr := CreateSSEKMSDecryptedReader(bytes.NewReader(encryptedData), sourceSSEKey)
|
|
if decErr != nil {
|
|
return nil, fmt.Errorf("create SSE-KMS decrypted reader: %w", decErr)
|
|
}
|
|
|
|
decryptedData, readErr := io.ReadAll(decryptedReader)
|
|
if readErr != nil {
|
|
return nil, fmt.Errorf("decrypt SSE-KMS chunk data: %w", readErr)
|
|
}
|
|
finalData = decryptedData
|
|
previewLen := 16
|
|
if len(finalData) < previewLen {
|
|
previewLen = len(finalData)
|
|
}
|
|
|
|
} else {
|
|
// Source is unencrypted
|
|
finalData = encryptedData
|
|
}
|
|
|
|
// Step 2: Re-encrypt with destination encryption (if any)
|
|
if state.DstSSEC && destSSECKey != nil {
|
|
// Encrypt with SSE-C
|
|
encryptedReader, iv, encErr := CreateSSECEncryptedReader(bytes.NewReader(finalData), destSSECKey)
|
|
if encErr != nil {
|
|
return nil, fmt.Errorf("create SSE-C encrypted reader: %w", encErr)
|
|
}
|
|
|
|
reencryptedData, readErr := io.ReadAll(encryptedReader)
|
|
if readErr != nil {
|
|
return nil, fmt.Errorf("re-encrypt with SSE-C: %w", readErr)
|
|
}
|
|
finalData = reencryptedData
|
|
|
|
// Create per-chunk SSE-C metadata (offset=0 for cross-encryption copies)
|
|
ssecMetadata, err := SerializeSSECMetadata(iv, destSSECKey.KeyMD5, 0)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("serialize SSE-C metadata: %w", err)
|
|
}
|
|
|
|
dstChunk.SseType = filer_pb.SSEType_SSE_C
|
|
dstChunk.SseMetadata = ssecMetadata
|
|
|
|
previewLen := 16
|
|
if len(finalData) < previewLen {
|
|
previewLen = len(finalData)
|
|
}
|
|
|
|
} else if state.DstSSEKMS && destKMSKeyID != "" {
|
|
// Encrypt with SSE-KMS
|
|
if destKMSEncryptionContext == nil {
|
|
destKMSEncryptionContext = BuildEncryptionContext(dstBucket, dstPath, destKMSBucketKeyEnabled)
|
|
}
|
|
|
|
encryptedReader, destSSEKey, encErr := CreateSSEKMSEncryptedReaderWithBucketKey(bytes.NewReader(finalData), destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled)
|
|
if encErr != nil {
|
|
return nil, fmt.Errorf("create SSE-KMS encrypted reader: %w", encErr)
|
|
}
|
|
|
|
reencryptedData, readErr := io.ReadAll(encryptedReader)
|
|
if readErr != nil {
|
|
return nil, fmt.Errorf("re-encrypt with SSE-KMS: %w", readErr)
|
|
}
|
|
finalData = reencryptedData
|
|
|
|
// Create per-chunk SSE-KMS metadata (offset=0 for cross-encryption copies)
|
|
destSSEKey.ChunkOffset = 0
|
|
kmsMetadata, err := SerializeSSEKMSMetadata(destSSEKey)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("serialize SSE-KMS metadata: %w", err)
|
|
}
|
|
|
|
dstChunk.SseType = filer_pb.SSEType_SSE_KMS
|
|
dstChunk.SseMetadata = kmsMetadata
|
|
|
|
glog.V(4).Infof("Re-encrypted chunk with SSE-KMS")
|
|
}
|
|
// For unencrypted destination, finalData remains as decrypted plaintext
|
|
|
|
// Upload the final data
|
|
if err := s3a.uploadChunkData(finalData, assignResult); err != nil {
|
|
return nil, fmt.Errorf("upload chunk data: %w", err)
|
|
}
|
|
|
|
// Update chunk size
|
|
dstChunk.Size = uint64(len(finalData))
|
|
|
|
glog.V(3).Infof("Successfully copied cross-encryption chunk %s → %s",
|
|
chunk.GetFileIdString(), dstChunk.GetFileIdString())
|
|
|
|
return dstChunk, nil
|
|
}
|
|
|
|
// getEncryptionTypeString returns a string representation of encryption type for logging
|
|
func (s3a *S3ApiServer) getEncryptionTypeString(isSSEC, isSSEKMS, isSSES3 bool) string {
|
|
if isSSEC {
|
|
return s3_constants.SSETypeC
|
|
} else if isSSEKMS {
|
|
return s3_constants.SSETypeKMS
|
|
} else if isSSES3 {
|
|
return s3_constants.SSETypeS3
|
|
}
|
|
return "Plain"
|
|
}
|
|
|
|
// copyChunksWithSSEC handles SSE-C aware copying with smart fast/slow path selection
|
|
// Returns chunks and destination metadata that should be applied to the destination entry
|
|
func (s3a *S3ApiServer) copyChunksWithSSEC(entry *filer_pb.Entry, r *http.Request) ([]*filer_pb.FileChunk, map[string][]byte, error) {
|
|
glog.Infof("copyChunksWithSSEC called for %s with %d chunks", r.URL.Path, len(entry.GetChunks()))
|
|
|
|
// Parse SSE-C headers
|
|
copySourceKey, err := ParseSSECCopySourceHeaders(r)
|
|
if err != nil {
|
|
glog.Errorf("Failed to parse SSE-C copy source headers: %v", err)
|
|
return nil, nil, err
|
|
}
|
|
|
|
destKey, err := ParseSSECHeaders(r)
|
|
if err != nil {
|
|
glog.Errorf("Failed to parse SSE-C headers: %v", err)
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Check if this is a multipart SSE-C object
|
|
isMultipartSSEC := false
|
|
sseCChunks := 0
|
|
for i, chunk := range entry.GetChunks() {
|
|
glog.V(4).Infof("Chunk %d: sseType=%d, hasMetadata=%t", i, chunk.GetSseType(), len(chunk.GetSseMetadata()) > 0)
|
|
if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
|
|
sseCChunks++
|
|
}
|
|
}
|
|
isMultipartSSEC = sseCChunks > 1
|
|
|
|
glog.Infof("SSE-C copy analysis: total chunks=%d, sseC chunks=%d, isMultipart=%t", len(entry.GetChunks()), sseCChunks, isMultipartSSEC)
|
|
|
|
if isMultipartSSEC {
|
|
glog.V(2).Infof("Detected multipart SSE-C object with %d encrypted chunks for copy", sseCChunks)
|
|
return s3a.copyMultipartSSECChunks(entry, copySourceKey, destKey, r.URL.Path)
|
|
}
|
|
|
|
// Single-part SSE-C object: use original logic
|
|
// Determine copy strategy
|
|
strategy, err := DetermineSSECCopyStrategy(entry.Extended, copySourceKey, destKey)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
glog.V(2).Infof("SSE-C copy strategy for single-part %s: %v", r.URL.Path, strategy)
|
|
|
|
switch strategy {
|
|
case SSECCopyStrategyDirect:
|
|
// FAST PATH: Direct chunk copy
|
|
glog.V(2).Infof("Using fast path: direct chunk copy for %s", r.URL.Path)
|
|
chunks, err := s3a.copyChunks(entry, r.URL.Path)
|
|
return chunks, nil, err
|
|
|
|
case SSECCopyStrategyDecryptEncrypt:
|
|
// SLOW PATH: Decrypt and re-encrypt
|
|
glog.V(2).Infof("Using slow path: decrypt/re-encrypt for %s", r.URL.Path)
|
|
chunks, destIV, err := s3a.copyChunksWithReencryption(entry, copySourceKey, destKey, r.URL.Path)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Create destination metadata with IV and SSE-C headers
|
|
dstMetadata := make(map[string][]byte)
|
|
if destKey != nil && len(destIV) > 0 {
|
|
// Store the IV
|
|
StoreIVInMetadata(dstMetadata, destIV)
|
|
|
|
// Store SSE-C algorithm and key MD5 for proper metadata
|
|
dstMetadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
|
|
dstMetadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(destKey.KeyMD5)
|
|
|
|
glog.V(2).Infof("Prepared IV and SSE-C metadata for destination copy: %s", r.URL.Path)
|
|
}
|
|
|
|
return chunks, dstMetadata, nil
|
|
|
|
default:
|
|
return nil, nil, fmt.Errorf("unknown SSE-C copy strategy: %v", strategy)
|
|
}
|
|
}
|
|
|
|
// copyChunksWithReencryption handles the slow path: decrypt source and re-encrypt for destination
|
|
// Returns the destination chunks and the IV used for encryption (if any)
|
|
func (s3a *S3ApiServer) copyChunksWithReencryption(entry *filer_pb.Entry, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) ([]*filer_pb.FileChunk, []byte, error) {
|
|
dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks()))
|
|
const defaultChunkCopyConcurrency = 4
|
|
executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency) // Limit to configurable concurrent operations
|
|
errChan := make(chan error, len(entry.GetChunks()))
|
|
|
|
// Generate a single IV for the destination object (if destination is encrypted)
|
|
var destIV []byte
|
|
if destKey != nil {
|
|
destIV = make([]byte, s3_constants.AESBlockSize)
|
|
if _, err := io.ReadFull(rand.Reader, destIV); err != nil {
|
|
return nil, nil, fmt.Errorf("failed to generate destination IV: %w", err)
|
|
}
|
|
}
|
|
|
|
for i, chunk := range entry.GetChunks() {
|
|
chunkIndex := i
|
|
executor.Execute(func() {
|
|
dstChunk, err := s3a.copyChunkWithReencryption(chunk, copySourceKey, destKey, dstPath, entry.Extended, destIV)
|
|
if err != nil {
|
|
errChan <- fmt.Errorf("chunk %d: %v", chunkIndex, err)
|
|
return
|
|
}
|
|
dstChunks[chunkIndex] = dstChunk
|
|
errChan <- nil
|
|
})
|
|
}
|
|
|
|
// Wait for all operations to complete and check for errors
|
|
for i := 0; i < len(entry.GetChunks()); i++ {
|
|
if err := <-errChan; err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
return dstChunks, destIV, nil
|
|
}
|
|
|
|
// copyChunkWithReencryption copies a single chunk with decrypt/re-encrypt
|
|
func (s3a *S3ApiServer) copyChunkWithReencryption(chunk *filer_pb.FileChunk, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string, srcMetadata map[string][]byte, destIV []byte) (*filer_pb.FileChunk, error) {
|
|
// Create destination chunk
|
|
dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
|
|
|
|
// Prepare chunk copy (assign new volume and get source URL)
|
|
assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Set file ID on destination chunk
|
|
if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Download encrypted chunk data
|
|
encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("download encrypted chunk data: %w", err)
|
|
}
|
|
|
|
var finalData []byte
|
|
|
|
// Decrypt if source is encrypted
|
|
if copySourceKey != nil {
|
|
// Get IV from source metadata
|
|
srcIV, err := GetIVFromMetadata(srcMetadata)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get IV from metadata: %w", err)
|
|
}
|
|
|
|
// Use counter offset based on chunk position in the original object
|
|
decryptedReader, decErr := CreateSSECDecryptedReaderWithOffset(bytes.NewReader(encryptedData), copySourceKey, srcIV, uint64(chunk.Offset))
|
|
if decErr != nil {
|
|
return nil, fmt.Errorf("create decrypted reader: %w", decErr)
|
|
}
|
|
|
|
decryptedData, readErr := io.ReadAll(decryptedReader)
|
|
if readErr != nil {
|
|
return nil, fmt.Errorf("decrypt chunk data: %w", readErr)
|
|
}
|
|
finalData = decryptedData
|
|
} else {
|
|
// Source is unencrypted
|
|
finalData = encryptedData
|
|
}
|
|
|
|
// Re-encrypt if destination should be encrypted
|
|
if destKey != nil {
|
|
// Use the provided destination IV with counter offset based on chunk position
|
|
// This ensures all chunks of the same object use the same IV with different counters
|
|
encryptedReader, encErr := CreateSSECEncryptedReaderWithOffset(bytes.NewReader(finalData), destKey, destIV, uint64(chunk.Offset))
|
|
if encErr != nil {
|
|
return nil, fmt.Errorf("create encrypted reader: %w", encErr)
|
|
}
|
|
|
|
reencryptedData, readErr := io.ReadAll(encryptedReader)
|
|
if readErr != nil {
|
|
return nil, fmt.Errorf("re-encrypt chunk data: %w", readErr)
|
|
}
|
|
finalData = reencryptedData
|
|
|
|
// Update chunk size to include IV
|
|
dstChunk.Size = uint64(len(finalData))
|
|
}
|
|
|
|
// Upload the processed data
|
|
if err := s3a.uploadChunkData(finalData, assignResult); err != nil {
|
|
return nil, fmt.Errorf("upload processed chunk data: %w", err)
|
|
}
|
|
|
|
return dstChunk, nil
|
|
}
|
|
|
|
// copyChunksWithSSEKMS handles SSE-KMS aware copying with smart fast/slow path selection
|
|
// Returns chunks and destination metadata like SSE-C for consistency
|
|
func (s3a *S3ApiServer) copyChunksWithSSEKMS(entry *filer_pb.Entry, r *http.Request, bucket string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
|
|
glog.Infof("copyChunksWithSSEKMS called for %s with %d chunks", r.URL.Path, len(entry.GetChunks()))
|
|
|
|
// Parse SSE-KMS headers from copy request
|
|
destKeyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Check if this is a multipart SSE-KMS object
|
|
isMultipartSSEKMS := false
|
|
sseKMSChunks := 0
|
|
for i, chunk := range entry.GetChunks() {
|
|
glog.V(4).Infof("Chunk %d: sseType=%d, hasKMSMetadata=%t", i, chunk.GetSseType(), len(chunk.GetSseMetadata()) > 0)
|
|
if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS {
|
|
sseKMSChunks++
|
|
}
|
|
}
|
|
isMultipartSSEKMS = sseKMSChunks > 1
|
|
|
|
glog.Infof("SSE-KMS copy analysis: total chunks=%d, sseKMS chunks=%d, isMultipart=%t", len(entry.GetChunks()), sseKMSChunks, isMultipartSSEKMS)
|
|
|
|
if isMultipartSSEKMS {
|
|
glog.V(2).Infof("Detected multipart SSE-KMS object with %d encrypted chunks for copy", sseKMSChunks)
|
|
return s3a.copyMultipartSSEKMSChunks(entry, destKeyID, encryptionContext, bucketKeyEnabled, r.URL.Path, bucket)
|
|
}
|
|
|
|
// Single-part SSE-KMS object: use existing logic
|
|
// If no SSE-KMS headers and source is not SSE-KMS encrypted, use regular copy
|
|
if destKeyID == "" && !IsSSEKMSEncrypted(entry.Extended) {
|
|
chunks, err := s3a.copyChunks(entry, r.URL.Path)
|
|
return chunks, nil, err
|
|
}
|
|
|
|
// Apply bucket default encryption if no explicit key specified
|
|
if destKeyID == "" {
|
|
bucketMetadata, err := s3a.getBucketMetadata(bucket)
|
|
if err != nil {
|
|
glog.V(2).Infof("Could not get bucket metadata for default encryption: %v", err)
|
|
} else if bucketMetadata != nil && bucketMetadata.Encryption != nil && bucketMetadata.Encryption.SseAlgorithm == "aws:kms" {
|
|
destKeyID = bucketMetadata.Encryption.KmsKeyId
|
|
bucketKeyEnabled = bucketMetadata.Encryption.BucketKeyEnabled
|
|
}
|
|
}
|
|
|
|
// Determine copy strategy
|
|
strategy, err := DetermineSSEKMSCopyStrategy(entry.Extended, destKeyID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
glog.V(2).Infof("SSE-KMS copy strategy for %s: %v", r.URL.Path, strategy)
|
|
|
|
switch strategy {
|
|
case SSEKMSCopyStrategyDirect:
|
|
// FAST PATH: Direct chunk copy (same key or both unencrypted)
|
|
glog.V(2).Infof("Using fast path: direct chunk copy for %s", r.URL.Path)
|
|
chunks, err := s3a.copyChunks(entry, r.URL.Path)
|
|
// For direct copy, generate destination metadata if we're encrypting to SSE-KMS
|
|
var dstMetadata map[string][]byte
|
|
if destKeyID != "" {
|
|
dstMetadata = make(map[string][]byte)
|
|
if encryptionContext == nil {
|
|
encryptionContext = BuildEncryptionContext(bucket, r.URL.Path, bucketKeyEnabled)
|
|
}
|
|
sseKey := &SSEKMSKey{
|
|
KeyID: destKeyID,
|
|
EncryptionContext: encryptionContext,
|
|
BucketKeyEnabled: bucketKeyEnabled,
|
|
}
|
|
if kmsMetadata, serializeErr := SerializeSSEKMSMetadata(sseKey); serializeErr == nil {
|
|
dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata
|
|
glog.V(3).Infof("Generated SSE-KMS metadata for direct copy: keyID=%s", destKeyID)
|
|
} else {
|
|
glog.Errorf("Failed to serialize SSE-KMS metadata for direct copy: %v", serializeErr)
|
|
}
|
|
}
|
|
return chunks, dstMetadata, err
|
|
|
|
case SSEKMSCopyStrategyDecryptEncrypt:
|
|
// SLOW PATH: Decrypt source and re-encrypt for destination
|
|
glog.V(2).Infof("Using slow path: decrypt/re-encrypt for %s", r.URL.Path)
|
|
return s3a.copyChunksWithSSEKMSReencryption(entry, destKeyID, encryptionContext, bucketKeyEnabled, r.URL.Path, bucket)
|
|
|
|
default:
|
|
return nil, nil, fmt.Errorf("unknown SSE-KMS copy strategy: %v", strategy)
|
|
}
|
|
}
|
|
|
|
// copyChunksWithSSEKMSReencryption handles the slow path: decrypt source and re-encrypt for destination
|
|
// Returns chunks and destination metadata like SSE-C for consistency
|
|
func (s3a *S3ApiServer) copyChunksWithSSEKMSReencryption(entry *filer_pb.Entry, destKeyID string, encryptionContext map[string]string, bucketKeyEnabled bool, dstPath, bucket string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
|
|
var dstChunks []*filer_pb.FileChunk
|
|
|
|
// Extract and deserialize source SSE-KMS metadata
|
|
var sourceSSEKey *SSEKMSKey
|
|
if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
|
|
var err error
|
|
sourceSSEKey, err = DeserializeSSEKMSMetadata(keyData)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to deserialize source SSE-KMS metadata: %w", err)
|
|
}
|
|
glog.V(3).Infof("Extracted source SSE-KMS key: keyID=%s, bucketKey=%t", sourceSSEKey.KeyID, sourceSSEKey.BucketKeyEnabled)
|
|
}
|
|
|
|
// Process chunks
|
|
for _, chunk := range entry.GetChunks() {
|
|
dstChunk, err := s3a.copyChunkWithSSEKMSReencryption(chunk, sourceSSEKey, destKeyID, encryptionContext, bucketKeyEnabled, dstPath, bucket)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("copy chunk with SSE-KMS re-encryption: %w", err)
|
|
}
|
|
dstChunks = append(dstChunks, dstChunk)
|
|
}
|
|
|
|
// Generate destination metadata for SSE-KMS encryption (consistent with SSE-C pattern)
|
|
dstMetadata := make(map[string][]byte)
|
|
if destKeyID != "" {
|
|
// Build encryption context if not provided
|
|
if encryptionContext == nil {
|
|
encryptionContext = BuildEncryptionContext(bucket, dstPath, bucketKeyEnabled)
|
|
}
|
|
|
|
// Create SSE-KMS key structure for destination metadata
|
|
sseKey := &SSEKMSKey{
|
|
KeyID: destKeyID,
|
|
EncryptionContext: encryptionContext,
|
|
BucketKeyEnabled: bucketKeyEnabled,
|
|
// Note: EncryptedDataKey will be generated during actual encryption
|
|
// IV is also generated per chunk during encryption
|
|
}
|
|
|
|
// Serialize SSE-KMS metadata for storage
|
|
kmsMetadata, err := SerializeSSEKMSMetadata(sseKey)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("serialize destination SSE-KMS metadata: %w", err)
|
|
}
|
|
|
|
dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata
|
|
glog.V(3).Infof("Generated destination SSE-KMS metadata: keyID=%s, bucketKey=%t", destKeyID, bucketKeyEnabled)
|
|
}
|
|
|
|
return dstChunks, dstMetadata, nil
|
|
}
|
|
|
|
// copyChunkWithSSEKMSReencryption copies a single chunk with SSE-KMS decrypt/re-encrypt
|
|
func (s3a *S3ApiServer) copyChunkWithSSEKMSReencryption(chunk *filer_pb.FileChunk, sourceSSEKey *SSEKMSKey, destKeyID string, encryptionContext map[string]string, bucketKeyEnabled bool, dstPath, bucket string) (*filer_pb.FileChunk, error) {
|
|
// Create destination chunk
|
|
dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
|
|
|
|
// Prepare chunk copy (assign new volume and get source URL)
|
|
assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Set file ID on destination chunk
|
|
if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Download chunk data
|
|
chunkData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("download chunk data: %w", err)
|
|
}
|
|
|
|
var finalData []byte
|
|
|
|
// Decrypt source data if it's SSE-KMS encrypted
|
|
if sourceSSEKey != nil {
|
|
// For SSE-KMS, the encrypted chunk data contains IV + encrypted content
|
|
// Use the source SSE key to decrypt the chunk data
|
|
decryptedReader, err := CreateSSEKMSDecryptedReader(bytes.NewReader(chunkData), sourceSSEKey)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create SSE-KMS decrypted reader: %w", err)
|
|
}
|
|
|
|
decryptedData, err := io.ReadAll(decryptedReader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("decrypt chunk data: %w", err)
|
|
}
|
|
finalData = decryptedData
|
|
glog.V(4).Infof("Decrypted chunk data: %d bytes → %d bytes", len(chunkData), len(finalData))
|
|
} else {
|
|
// Source is not SSE-KMS encrypted, use data as-is
|
|
finalData = chunkData
|
|
}
|
|
|
|
// Re-encrypt if destination should be SSE-KMS encrypted
|
|
if destKeyID != "" {
|
|
// Encryption context should already be provided by the caller
|
|
// But ensure we have a fallback for robustness
|
|
if encryptionContext == nil {
|
|
encryptionContext = BuildEncryptionContext(bucket, dstPath, bucketKeyEnabled)
|
|
}
|
|
|
|
encryptedReader, _, err := CreateSSEKMSEncryptedReaderWithBucketKey(bytes.NewReader(finalData), destKeyID, encryptionContext, bucketKeyEnabled)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create SSE-KMS encrypted reader: %w", err)
|
|
}
|
|
|
|
reencryptedData, err := io.ReadAll(encryptedReader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("re-encrypt chunk data: %w", err)
|
|
}
|
|
|
|
// Store original decrypted data size for logging
|
|
originalSize := len(finalData)
|
|
finalData = reencryptedData
|
|
glog.V(4).Infof("Re-encrypted chunk data: %d bytes → %d bytes", originalSize, len(finalData))
|
|
|
|
// Update chunk size to include IV and encryption overhead
|
|
dstChunk.Size = uint64(len(finalData))
|
|
}
|
|
|
|
// Upload the processed data
|
|
if err := s3a.uploadChunkData(finalData, assignResult); err != nil {
|
|
return nil, fmt.Errorf("upload processed chunk data: %w", err)
|
|
}
|
|
|
|
glog.V(3).Infof("Successfully processed SSE-KMS chunk re-encryption: src_key=%s, dst_key=%s, size=%d→%d",
|
|
getKeyIDString(sourceSSEKey), destKeyID, len(chunkData), len(finalData))
|
|
|
|
return dstChunk, nil
|
|
}
|
|
|
|
// getKeyIDString safely gets the KeyID from an SSEKMSKey, handling nil cases
|
|
func getKeyIDString(key *SSEKMSKey) string {
|
|
if key == nil {
|
|
return "none"
|
|
}
|
|
if key.KeyID == "" {
|
|
return "default"
|
|
}
|
|
return key.KeyID
|
|
}
|
|
|
|
// EncryptionHeaderContext holds encryption type information and header classifications
|
|
type EncryptionHeaderContext struct {
|
|
SrcSSEC, SrcSSEKMS, SrcSSES3 bool
|
|
DstSSEC, DstSSEKMS, DstSSES3 bool
|
|
IsSSECHeader, IsSSEKMSHeader, IsSSES3Header bool
|
|
}
|
|
|
|
// newEncryptionHeaderContext creates a context for encryption header processing
|
|
func newEncryptionHeaderContext(headerKey string, srcSSEC, srcSSEKMS, srcSSES3, dstSSEC, dstSSEKMS, dstSSES3 bool) *EncryptionHeaderContext {
|
|
return &EncryptionHeaderContext{
|
|
SrcSSEC: srcSSEC, SrcSSEKMS: srcSSEKMS, SrcSSES3: srcSSES3,
|
|
DstSSEC: dstSSEC, DstSSEKMS: dstSSEKMS, DstSSES3: dstSSES3,
|
|
IsSSECHeader: isSSECHeader(headerKey),
|
|
IsSSEKMSHeader: isSSEKMSHeader(headerKey, srcSSEKMS, dstSSEKMS),
|
|
IsSSES3Header: isSSES3Header(headerKey, srcSSES3, dstSSES3),
|
|
}
|
|
}
|
|
|
|
// isSSECHeader checks if the header is SSE-C specific
|
|
func isSSECHeader(headerKey string) bool {
|
|
return headerKey == s3_constants.AmzServerSideEncryptionCustomerAlgorithm ||
|
|
headerKey == s3_constants.AmzServerSideEncryptionCustomerKeyMD5 ||
|
|
headerKey == s3_constants.SeaweedFSSSEIV
|
|
}
|
|
|
|
// isSSEKMSHeader checks if the header is SSE-KMS specific
|
|
func isSSEKMSHeader(headerKey string, srcSSEKMS, dstSSEKMS bool) bool {
|
|
return (headerKey == s3_constants.AmzServerSideEncryption && (srcSSEKMS || dstSSEKMS)) ||
|
|
headerKey == s3_constants.AmzServerSideEncryptionAwsKmsKeyId ||
|
|
headerKey == s3_constants.SeaweedFSSSEKMSKey ||
|
|
headerKey == s3_constants.SeaweedFSSSEKMSKeyID ||
|
|
headerKey == s3_constants.SeaweedFSSSEKMSEncryption ||
|
|
headerKey == s3_constants.SeaweedFSSSEKMSBucketKeyEnabled ||
|
|
headerKey == s3_constants.SeaweedFSSSEKMSEncryptionContext ||
|
|
headerKey == s3_constants.SeaweedFSSSEKMSBaseIV
|
|
}
|
|
|
|
// isSSES3Header checks if the header is SSE-S3 specific
|
|
func isSSES3Header(headerKey string, srcSSES3, dstSSES3 bool) bool {
|
|
return (headerKey == s3_constants.AmzServerSideEncryption && (srcSSES3 || dstSSES3)) ||
|
|
headerKey == s3_constants.SeaweedFSSSES3Key ||
|
|
headerKey == s3_constants.SeaweedFSSSES3Encryption ||
|
|
headerKey == s3_constants.SeaweedFSSSES3BaseIV ||
|
|
headerKey == s3_constants.SeaweedFSSSES3KeyData
|
|
}
|
|
|
|
// shouldSkipCrossEncryptionHeader handles cross-encryption copy scenarios
|
|
func (ctx *EncryptionHeaderContext) shouldSkipCrossEncryptionHeader() bool {
|
|
// SSE-C to SSE-KMS: skip SSE-C headers
|
|
if ctx.SrcSSEC && ctx.DstSSEKMS && ctx.IsSSECHeader {
|
|
return true
|
|
}
|
|
|
|
// SSE-KMS to SSE-C: skip SSE-KMS headers
|
|
if ctx.SrcSSEKMS && ctx.DstSSEC && ctx.IsSSEKMSHeader {
|
|
return true
|
|
}
|
|
|
|
// SSE-C to SSE-S3: skip SSE-C headers
|
|
if ctx.SrcSSEC && ctx.DstSSES3 && ctx.IsSSECHeader {
|
|
return true
|
|
}
|
|
|
|
// SSE-S3 to SSE-C: skip SSE-S3 headers
|
|
if ctx.SrcSSES3 && ctx.DstSSEC && ctx.IsSSES3Header {
|
|
return true
|
|
}
|
|
|
|
// SSE-KMS to SSE-S3: skip SSE-KMS headers
|
|
if ctx.SrcSSEKMS && ctx.DstSSES3 && ctx.IsSSEKMSHeader {
|
|
return true
|
|
}
|
|
|
|
// SSE-S3 to SSE-KMS: skip SSE-S3 headers
|
|
if ctx.SrcSSES3 && ctx.DstSSEKMS && ctx.IsSSES3Header {
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// shouldSkipEncryptedToUnencryptedHeader handles encrypted to unencrypted copy scenarios
|
|
func (ctx *EncryptionHeaderContext) shouldSkipEncryptedToUnencryptedHeader() bool {
|
|
// Skip all encryption headers when copying from encrypted to unencrypted
|
|
hasSourceEncryption := ctx.SrcSSEC || ctx.SrcSSEKMS || ctx.SrcSSES3
|
|
hasDestinationEncryption := ctx.DstSSEC || ctx.DstSSEKMS || ctx.DstSSES3
|
|
isAnyEncryptionHeader := ctx.IsSSECHeader || ctx.IsSSEKMSHeader || ctx.IsSSES3Header
|
|
|
|
return hasSourceEncryption && !hasDestinationEncryption && isAnyEncryptionHeader
|
|
}
|
|
|
|
// shouldSkipEncryptionHeader determines if a header should be skipped when copying extended attributes
|
|
// based on the source and destination encryption types. This consolidates the repetitive logic for
|
|
// filtering encryption-related headers during copy operations.
|
|
func shouldSkipEncryptionHeader(headerKey string,
|
|
srcSSEC, srcSSEKMS, srcSSES3 bool,
|
|
dstSSEC, dstSSEKMS, dstSSES3 bool) bool {
|
|
|
|
// Create context to reduce complexity and improve testability
|
|
ctx := newEncryptionHeaderContext(headerKey, srcSSEC, srcSSEKMS, srcSSES3, dstSSEC, dstSSEKMS, dstSSES3)
|
|
|
|
// If it's not an encryption header, don't skip it
|
|
if !ctx.IsSSECHeader && !ctx.IsSSEKMSHeader && !ctx.IsSSES3Header {
|
|
return false
|
|
}
|
|
|
|
// Handle cross-encryption scenarios (different encryption types)
|
|
if ctx.shouldSkipCrossEncryptionHeader() {
|
|
return true
|
|
}
|
|
|
|
// Handle encrypted to unencrypted scenarios
|
|
if ctx.shouldSkipEncryptedToUnencryptedHeader() {
|
|
return true
|
|
}
|
|
|
|
// Default: don't skip the header
|
|
return false
|
|
}
|