You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							2290 lines
						
					
					
						
							80 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							2290 lines
						
					
					
						
							80 KiB
						
					
					
				| package s3api | |
| 
 | |
| import ( | |
| 	"bytes" | |
| 	"context" | |
| 	"crypto/rand" | |
| 	"encoding/base64" | |
| 	"fmt" | |
| 	"io" | |
| 	"net/http" | |
| 	"net/url" | |
| 	"strconv" | |
| 	"strings" | |
| 	"time" | |
| 
 | |
| 	"modernc.org/strutil" | |
| 
 | |
| 	"github.com/seaweedfs/seaweedfs/weed/filer" | |
| 	"github.com/seaweedfs/seaweedfs/weed/glog" | |
| 	"github.com/seaweedfs/seaweedfs/weed/operation" | |
| 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" | |
| 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" | |
| 	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err" | |
| 	"github.com/seaweedfs/seaweedfs/weed/security" | |
| 	"github.com/seaweedfs/seaweedfs/weed/util" | |
| 	util_http "github.com/seaweedfs/seaweedfs/weed/util/http" | |
| ) | |
| 
 | |
| const ( | |
| 	DirectiveCopy    = "COPY" | |
| 	DirectiveReplace = "REPLACE" | |
| ) | |
| 
 | |
| func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { | |
| 
 | |
| 	dstBucket, dstObject := s3_constants.GetBucketAndObject(r) | |
| 
 | |
| 	// Copy source path. | |
| 	cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source")) | |
| 	if err != nil { | |
| 		// Save unescaped string as is. | |
| 		cpSrcPath = r.Header.Get("X-Amz-Copy-Source") | |
| 	} | |
| 
 | |
| 	srcBucket, srcObject, srcVersionId := pathToBucketObjectAndVersion(cpSrcPath) | |
| 
 | |
| 	glog.V(3).Infof("CopyObjectHandler %s %s (version: %s) => %s %s", srcBucket, srcObject, srcVersionId, dstBucket, dstObject) | |
| 
 | |
| 	// Validate copy source and destination | |
| 	if err := ValidateCopySource(cpSrcPath, srcBucket, srcObject); err != nil { | |
| 		glog.V(2).Infof("CopyObjectHandler validation error: %v", err) | |
| 		errCode := MapCopyValidationError(err) | |
| 		s3err.WriteErrorResponse(w, r, errCode) | |
| 		return | |
| 	} | |
| 
 | |
| 	if err := ValidateCopyDestination(dstBucket, dstObject); err != nil { | |
| 		glog.V(2).Infof("CopyObjectHandler validation error: %v", err) | |
| 		errCode := MapCopyValidationError(err) | |
| 		s3err.WriteErrorResponse(w, r, errCode) | |
| 		return | |
| 	} | |
| 
 | |
| 	replaceMeta, replaceTagging := replaceDirective(r.Header) | |
| 
 | |
| 	if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && (replaceMeta || replaceTagging) { | |
| 		fullPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)) | |
| 		dir, name := fullPath.DirAndName() | |
| 		entry, err := s3a.getEntry(dir, name) | |
| 		if err != nil || entry.IsDirectory { | |
| 			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) | |
| 			return | |
| 		} | |
| 		entry.Extended, err = processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging) | |
| 		entry.Attributes.Mtime = time.Now().Unix() | |
| 		if err != nil { | |
| 			glog.Errorf("CopyObjectHandler ValidateTags error %s: %v", r.URL, err) | |
| 			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag) | |
| 			return | |
| 		} | |
| 		err = s3a.touch(dir, name, entry) | |
| 		if err != nil { | |
| 			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) | |
| 			return | |
| 		} | |
| 		writeSuccessResponseXML(w, r, CopyObjectResult{ | |
| 			ETag:         fmt.Sprintf("%x", entry.Attributes.Md5), | |
| 			LastModified: time.Now().UTC(), | |
| 		}) | |
| 		return | |
| 	} | |
| 
 | |
| 	// If source object is empty or bucket is empty, reply back invalid copy source. | |
| 	if srcObject == "" || srcBucket == "" { | |
| 		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Get detailed versioning state for source bucket | |
| 	srcVersioningState, err := s3a.getVersioningState(srcBucket) | |
| 	if err != nil { | |
| 		glog.Errorf("Error checking versioning state for source bucket %s: %v", srcBucket, err) | |
| 		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Get the source entry with version awareness based on versioning state | |
| 	var entry *filer_pb.Entry | |
| 	if srcVersionId != "" { | |
| 		// Specific version requested - always use version-aware retrieval | |
| 		entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId) | |
| 	} else if srcVersioningState == s3_constants.VersioningEnabled { | |
| 		// Versioning enabled - get latest version from .versions directory | |
| 		entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) | |
| 	} else if srcVersioningState == s3_constants.VersioningSuspended { | |
| 		// Versioning suspended - current object is stored as regular file ("null" version) | |
| 		// Try regular file first, fall back to latest version if needed | |
| 		srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) | |
| 		dir, name := srcPath.DirAndName() | |
| 		entry, err = s3a.getEntry(dir, name) | |
| 		if err != nil { | |
| 			// If regular file doesn't exist, try latest version as fallback | |
| 			glog.V(2).Infof("CopyObject: regular file not found for suspended versioning, trying latest version") | |
| 			entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) | |
| 		} | |
| 	} else { | |
| 		// No versioning configured - use regular retrieval | |
| 		srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) | |
| 		dir, name := srcPath.DirAndName() | |
| 		entry, err = s3a.getEntry(dir, name) | |
| 	} | |
| 
 | |
| 	if err != nil || entry.IsDirectory { | |
| 		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) | |
| 		return | |
| 	} | |
| 
 | |
| 	if srcBucket == dstBucket && srcObject == dstObject { | |
| 		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopyDest) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Validate conditional copy headers | |
| 	if err := s3a.validateConditionalCopyHeaders(r, entry); err != s3err.ErrNone { | |
| 		s3err.WriteErrorResponse(w, r, err) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Validate encryption parameters | |
| 	if err := ValidateCopyEncryption(entry.Extended, r.Header); err != nil { | |
| 		glog.V(2).Infof("CopyObjectHandler encryption validation error: %v", err) | |
| 		errCode := MapCopyValidationError(err) | |
| 		s3err.WriteErrorResponse(w, r, errCode) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Create new entry for destination | |
| 	dstEntry := &filer_pb.Entry{ | |
| 		Attributes: &filer_pb.FuseAttributes{ | |
| 			FileSize: entry.Attributes.FileSize, | |
| 			Mtime:    time.Now().Unix(), | |
| 			Crtime:   entry.Attributes.Crtime, | |
| 			Mime:     entry.Attributes.Mime, | |
| 		}, | |
| 		Extended: make(map[string][]byte), | |
| 	} | |
| 
 | |
| 	// Copy extended attributes from source, filtering out conflicting encryption metadata | |
| 	for k, v := range entry.Extended { | |
| 		// Skip encryption-specific headers that might conflict with destination encryption type | |
| 		skipHeader := false | |
| 
 | |
| 		// If we're doing cross-encryption, skip conflicting headers | |
| 		if len(entry.GetChunks()) > 0 { | |
| 			// Detect source and destination encryption types | |
| 			srcHasSSEC := IsSSECEncrypted(entry.Extended) | |
| 			srcHasSSEKMS := IsSSEKMSEncrypted(entry.Extended) | |
| 			srcHasSSES3 := IsSSES3EncryptedInternal(entry.Extended) | |
| 			dstWantsSSEC := IsSSECRequest(r) | |
| 			dstWantsSSEKMS := IsSSEKMSRequest(r) | |
| 			dstWantsSSES3 := IsSSES3RequestInternal(r) | |
| 
 | |
| 			// Use helper function to determine if header should be skipped | |
| 			skipHeader = shouldSkipEncryptionHeader(k, | |
| 				srcHasSSEC, srcHasSSEKMS, srcHasSSES3, | |
| 				dstWantsSSEC, dstWantsSSEKMS, dstWantsSSES3) | |
| 		} | |
| 
 | |
| 		if !skipHeader { | |
| 			dstEntry.Extended[k] = v | |
| 		} | |
| 	} | |
| 
 | |
| 	// Process metadata and tags and apply to destination | |
| 	processedMetadata, tagErr := processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging) | |
| 	if tagErr != nil { | |
| 		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Apply processed metadata to destination entry | |
| 	for k, v := range processedMetadata { | |
| 		dstEntry.Extended[k] = v | |
| 	} | |
| 
 | |
| 	// For zero-size files or files without chunks, use the original approach | |
| 	if entry.Attributes.FileSize == 0 || len(entry.GetChunks()) == 0 { | |
| 		// Just copy the entry structure without chunks for zero-size files | |
| 		dstEntry.Chunks = nil | |
| 	} else { | |
| 		// Use unified copy strategy approach | |
| 		dstChunks, dstMetadata, copyErr := s3a.executeUnifiedCopyStrategy(entry, r, dstBucket, srcObject, dstObject) | |
| 		if copyErr != nil { | |
| 			glog.Errorf("CopyObjectHandler unified copy error: %v", copyErr) | |
| 			// Map errors to appropriate S3 errors | |
| 			errCode := s3a.mapCopyErrorToS3Error(copyErr) | |
| 			s3err.WriteErrorResponse(w, r, errCode) | |
| 			return | |
| 		} | |
| 
 | |
| 		dstEntry.Chunks = dstChunks | |
| 
 | |
| 		// Apply destination-specific metadata (e.g., SSE-C IV and headers) | |
| 		if dstMetadata != nil { | |
| 			for k, v := range dstMetadata { | |
| 				dstEntry.Extended[k] = v | |
| 			} | |
| 			glog.V(2).Infof("Applied %d destination metadata entries for copy: %s", len(dstMetadata), r.URL.Path) | |
| 		} | |
| 	} | |
| 
 | |
| 	// Check if destination bucket has versioning configured | |
| 	dstVersioningConfigured, err := s3a.isVersioningConfigured(dstBucket) | |
| 	if err != nil { | |
| 		glog.Errorf("Error checking versioning status for destination bucket %s: %v", dstBucket, err) | |
| 		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) | |
| 		return | |
| 	} | |
| 
 | |
| 	var dstVersionId string | |
| 	var etag string | |
| 
 | |
| 	if dstVersioningConfigured { | |
| 		// For versioned destination, create a new version | |
| 		dstVersionId = generateVersionId() | |
| 		glog.V(2).Infof("CopyObjectHandler: creating version %s for destination %s/%s", dstVersionId, dstBucket, dstObject) | |
| 
 | |
| 		// Add version metadata to the entry | |
| 		if dstEntry.Extended == nil { | |
| 			dstEntry.Extended = make(map[string][]byte) | |
| 		} | |
| 		dstEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(dstVersionId) | |
| 
 | |
| 		// Calculate ETag for versioning | |
| 		filerEntry := &filer.Entry{ | |
| 			FullPath: util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)), | |
| 			Attr: filer.Attr{ | |
| 				FileSize: dstEntry.Attributes.FileSize, | |
| 				Mtime:    time.Unix(dstEntry.Attributes.Mtime, 0), | |
| 				Crtime:   time.Unix(dstEntry.Attributes.Crtime, 0), | |
| 				Mime:     dstEntry.Attributes.Mime, | |
| 			}, | |
| 			Chunks: dstEntry.Chunks, | |
| 		} | |
| 		etag = filer.ETagEntry(filerEntry) | |
| 		if !strings.HasPrefix(etag, "\"") { | |
| 			etag = "\"" + etag + "\"" | |
| 		} | |
| 		dstEntry.Extended[s3_constants.ExtETagKey] = []byte(etag) | |
| 
 | |
| 		// Create version file | |
| 		versionFileName := s3a.getVersionFileName(dstVersionId) | |
| 		versionObjectPath := dstObject + ".versions/" + versionFileName | |
| 		bucketDir := s3a.option.BucketsPath + "/" + dstBucket | |
| 
 | |
| 		if err := s3a.mkFile(bucketDir, versionObjectPath, dstEntry.Chunks, func(entry *filer_pb.Entry) { | |
| 			entry.Attributes = dstEntry.Attributes | |
| 			entry.Extended = dstEntry.Extended | |
| 		}); err != nil { | |
| 			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) | |
| 			return | |
| 		} | |
| 
 | |
| 		// Update the .versions directory metadata | |
| 		err = s3a.updateLatestVersionInDirectory(dstBucket, dstObject, dstVersionId, versionFileName) | |
| 		if err != nil { | |
| 			glog.Errorf("CopyObjectHandler: failed to update latest version in directory: %v", err) | |
| 			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) | |
| 			return | |
| 		} | |
| 
 | |
| 		// Set version ID in response header | |
| 		w.Header().Set("x-amz-version-id", dstVersionId) | |
| 	} else { | |
| 		// For non-versioned destination, use regular copy | |
| 		dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)) | |
| 		dstDir, dstName := dstPath.DirAndName() | |
| 
 | |
| 		// Check if destination exists and remove it first (S3 copy overwrites) | |
| 		if exists, _ := s3a.exists(dstDir, dstName, false); exists { | |
| 			if err := s3a.rm(dstDir, dstName, false, false); err != nil { | |
| 				s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) | |
| 				return | |
| 			} | |
| 		} | |
| 
 | |
| 		// Create the new file | |
| 		if err := s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) { | |
| 			entry.Attributes = dstEntry.Attributes | |
| 			entry.Extended = dstEntry.Extended | |
| 		}); err != nil { | |
| 			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) | |
| 			return | |
| 		} | |
| 
 | |
| 		// Calculate ETag | |
| 		filerEntry := &filer.Entry{ | |
| 			FullPath: dstPath, | |
| 			Attr: filer.Attr{ | |
| 				FileSize: dstEntry.Attributes.FileSize, | |
| 				Mtime:    time.Unix(dstEntry.Attributes.Mtime, 0), | |
| 				Crtime:   time.Unix(dstEntry.Attributes.Crtime, 0), | |
| 				Mime:     dstEntry.Attributes.Mime, | |
| 			}, | |
| 			Chunks: dstEntry.Chunks, | |
| 		} | |
| 		etag = filer.ETagEntry(filerEntry) | |
| 	} | |
| 
 | |
| 	setEtag(w, etag) | |
| 
 | |
| 	response := CopyObjectResult{ | |
| 		ETag:         etag, | |
| 		LastModified: time.Now().UTC(), | |
| 	} | |
| 
 | |
| 	writeSuccessResponseXML(w, r, response) | |
| 
 | |
| } | |
| 
 | |
| func pathToBucketAndObject(path string) (bucket, object string) { | |
| 	path = strings.TrimPrefix(path, "/") | |
| 	parts := strings.SplitN(path, "/", 2) | |
| 	if len(parts) == 2 { | |
| 		return parts[0], "/" + parts[1] | |
| 	} | |
| 	return parts[0], "/" | |
| } | |
| 
 | |
| func pathToBucketObjectAndVersion(path string) (bucket, object, versionId string) { | |
| 	// Parse versionId from query string if present | |
| 	// Format: /bucket/object?versionId=version-id | |
| 	if idx := strings.Index(path, "?versionId="); idx != -1 { | |
| 		versionId = path[idx+len("?versionId="):] // dynamically calculate length | |
| 		path = path[:idx] | |
| 	} | |
| 
 | |
| 	bucket, object = pathToBucketAndObject(path) | |
| 	return bucket, object, versionId | |
| } | |
| 
 | |
| type CopyPartResult struct { | |
| 	LastModified time.Time `xml:"LastModified"` | |
| 	ETag         string    `xml:"ETag"` | |
| } | |
| 
 | |
| func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) { | |
| 	// https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjctsUsingRESTMPUapi.html | |
| 	// https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html | |
| 	dstBucket, dstObject := s3_constants.GetBucketAndObject(r) | |
| 
 | |
| 	// Copy source path. | |
| 	cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source")) | |
| 	if err != nil { | |
| 		// Save unescaped string as is. | |
| 		cpSrcPath = r.Header.Get("X-Amz-Copy-Source") | |
| 	} | |
| 
 | |
| 	srcBucket, srcObject, srcVersionId := pathToBucketObjectAndVersion(cpSrcPath) | |
| 	// If source object is empty or bucket is empty, reply back invalid copy source. | |
| 	if srcObject == "" || srcBucket == "" { | |
| 		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) | |
| 		return | |
| 	} | |
| 
 | |
| 	partIDString := r.URL.Query().Get("partNumber") | |
| 	uploadID := r.URL.Query().Get("uploadId") | |
| 
 | |
| 	partID, err := strconv.Atoi(partIDString) | |
| 	if err != nil { | |
| 		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Check if the upload ID is valid | |
| 	err = s3a.checkUploadId(dstObject, uploadID) | |
| 	if err != nil { | |
| 		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload) | |
| 		return | |
| 	} | |
| 
 | |
| 	glog.V(3).Infof("CopyObjectPartHandler %s %s => %s part %d upload %s", srcBucket, srcObject, dstBucket, partID, uploadID) | |
| 
 | |
| 	// check partID with maximum part ID for multipart objects | |
| 	if partID > s3_constants.MaxS3MultipartParts { | |
| 		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Get detailed versioning state for source bucket | |
| 	srcVersioningState, err := s3a.getVersioningState(srcBucket) | |
| 	if err != nil { | |
| 		glog.Errorf("Error checking versioning state for source bucket %s: %v", srcBucket, err) | |
| 		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Get the source entry with version awareness based on versioning state | |
| 	var entry *filer_pb.Entry | |
| 	if srcVersionId != "" { | |
| 		// Specific version requested - always use version-aware retrieval | |
| 		entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId) | |
| 	} else if srcVersioningState == s3_constants.VersioningEnabled { | |
| 		// Versioning enabled - get latest version from .versions directory | |
| 		entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) | |
| 	} else if srcVersioningState == s3_constants.VersioningSuspended { | |
| 		// Versioning suspended - current object is stored as regular file ("null" version) | |
| 		// Try regular file first, fall back to latest version if needed | |
| 		srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) | |
| 		dir, name := srcPath.DirAndName() | |
| 		entry, err = s3a.getEntry(dir, name) | |
| 		if err != nil { | |
| 			// If regular file doesn't exist, try latest version as fallback | |
| 			glog.V(2).Infof("CopyObjectPart: regular file not found for suspended versioning, trying latest version") | |
| 			entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject) | |
| 		} | |
| 	} else { | |
| 		// No versioning configured - use regular retrieval | |
| 		srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject)) | |
| 		dir, name := srcPath.DirAndName() | |
| 		entry, err = s3a.getEntry(dir, name) | |
| 	} | |
| 
 | |
| 	if err != nil || entry.IsDirectory { | |
| 		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Validate conditional copy headers | |
| 	if err := s3a.validateConditionalCopyHeaders(r, entry); err != s3err.ErrNone { | |
| 		s3err.WriteErrorResponse(w, r, err) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Handle range header if present | |
| 	rangeHeader := r.Header.Get("x-amz-copy-source-range") | |
| 	var startOffset, endOffset int64 | |
| 	if rangeHeader != "" { | |
| 		startOffset, endOffset, err = parseRangeHeader(rangeHeader) | |
| 		if err != nil { | |
| 			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) | |
| 			return | |
| 		} | |
| 	} else { | |
| 		startOffset = 0 | |
| 		if entry.Attributes.FileSize == 0 { | |
| 			endOffset = -1 // For zero-size files, use -1 as endOffset | |
| 		} else { | |
| 			endOffset = int64(entry.Attributes.FileSize) - 1 | |
| 		} | |
| 	} | |
| 
 | |
| 	// Create new entry for the part | |
| 	dstEntry := &filer_pb.Entry{ | |
| 		Attributes: &filer_pb.FuseAttributes{ | |
| 			FileSize: uint64(endOffset - startOffset + 1), | |
| 			Mtime:    time.Now().Unix(), | |
| 			Crtime:   time.Now().Unix(), | |
| 			Mime:     entry.Attributes.Mime, | |
| 		}, | |
| 		Extended: make(map[string][]byte), | |
| 	} | |
| 
 | |
| 	// Handle zero-size files or empty ranges | |
| 	if entry.Attributes.FileSize == 0 || endOffset < startOffset { | |
| 		// For zero-size files or invalid ranges, create an empty part | |
| 		dstEntry.Chunks = nil | |
| 	} else { | |
| 		// Copy chunks that overlap with the range | |
| 		dstChunks, err := s3a.copyChunksForRange(entry, startOffset, endOffset, r.URL.Path) | |
| 		if err != nil { | |
| 			glog.Errorf("CopyObjectPartHandler copy chunks error: %v", err) | |
| 			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) | |
| 			return | |
| 		} | |
| 		dstEntry.Chunks = dstChunks | |
| 	} | |
| 
 | |
| 	// Save the part entry to the multipart uploads folder | |
| 	uploadDir := s3a.genUploadsFolder(dstBucket) + "/" + uploadID | |
| 	partName := fmt.Sprintf("%04d_%s.part", partID, "copy") | |
| 
 | |
| 	// Check if part exists and remove it first (allow re-copying same part) | |
| 	if exists, _ := s3a.exists(uploadDir, partName, false); exists { | |
| 		if err := s3a.rm(uploadDir, partName, false, false); err != nil { | |
| 			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) | |
| 			return | |
| 		} | |
| 	} | |
| 
 | |
| 	if err := s3a.mkFile(uploadDir, partName, dstEntry.Chunks, func(entry *filer_pb.Entry) { | |
| 		entry.Attributes = dstEntry.Attributes | |
| 		entry.Extended = dstEntry.Extended | |
| 	}); err != nil { | |
| 		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) | |
| 		return | |
| 	} | |
| 
 | |
| 	// Calculate ETag for the part | |
| 	partPath := util.FullPath(uploadDir + "/" + partName) | |
| 	filerEntry := &filer.Entry{ | |
| 		FullPath: partPath, | |
| 		Attr: filer.Attr{ | |
| 			FileSize: dstEntry.Attributes.FileSize, | |
| 			Mtime:    time.Unix(dstEntry.Attributes.Mtime, 0), | |
| 			Crtime:   time.Unix(dstEntry.Attributes.Crtime, 0), | |
| 			Mime:     dstEntry.Attributes.Mime, | |
| 		}, | |
| 		Chunks: dstEntry.Chunks, | |
| 	} | |
| 
 | |
| 	etag := filer.ETagEntry(filerEntry) | |
| 	setEtag(w, etag) | |
| 
 | |
| 	response := CopyPartResult{ | |
| 		ETag:         etag, | |
| 		LastModified: time.Now().UTC(), | |
| 	} | |
| 
 | |
| 	writeSuccessResponseXML(w, r, response) | |
| } | |
| 
 | |
| func replaceDirective(reqHeader http.Header) (replaceMeta, replaceTagging bool) { | |
| 	return reqHeader.Get(s3_constants.AmzUserMetaDirective) == DirectiveReplace, reqHeader.Get(s3_constants.AmzObjectTaggingDirective) == DirectiveReplace | |
| } | |
| 
 | |
| func processMetadata(reqHeader, existing http.Header, replaceMeta, replaceTagging bool, getTags func(parentDirectoryPath string, entryName string) (tags map[string]string, err error), dir, name string) (err error) { | |
| 	if sc := reqHeader.Get(s3_constants.AmzStorageClass); len(sc) == 0 { | |
| 		if sc := existing.Get(s3_constants.AmzStorageClass); len(sc) > 0 { | |
| 			reqHeader.Set(s3_constants.AmzStorageClass, sc) | |
| 		} | |
| 	} | |
| 
 | |
| 	if !replaceMeta { | |
| 		for header := range reqHeader { | |
| 			if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) { | |
| 				delete(reqHeader, header) | |
| 			} | |
| 		} | |
| 		for k, v := range existing { | |
| 			if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) { | |
| 				reqHeader[k] = v | |
| 			} | |
| 		} | |
| 	} | |
| 
 | |
| 	if !replaceTagging { | |
| 		for header, _ := range reqHeader { | |
| 			if strings.HasPrefix(header, s3_constants.AmzObjectTagging) { | |
| 				delete(reqHeader, header) | |
| 			} | |
| 		} | |
| 
 | |
| 		found := false | |
| 		for k, _ := range existing { | |
| 			if strings.HasPrefix(k, s3_constants.AmzObjectTaggingPrefix) { | |
| 				found = true | |
| 				break | |
| 			} | |
| 		} | |
| 
 | |
| 		if found { | |
| 			tags, err := getTags(dir, name) | |
| 			if err != nil { | |
| 				return err | |
| 			} | |
| 
 | |
| 			var tagArr []string | |
| 			for k, v := range tags { | |
| 				tagArr = append(tagArr, fmt.Sprintf("%s=%s", k, v)) | |
| 			} | |
| 			tagStr := strutil.JoinFields(tagArr, "&") | |
| 			reqHeader.Set(s3_constants.AmzObjectTagging, tagStr) | |
| 		} | |
| 	} | |
| 	return | |
| } | |
| 
 | |
| func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, replaceMeta, replaceTagging bool) (metadata map[string][]byte, err error) { | |
| 	metadata = make(map[string][]byte) | |
| 
 | |
| 	if sc := existing[s3_constants.AmzStorageClass]; len(sc) > 0 { | |
| 		metadata[s3_constants.AmzStorageClass] = sc | |
| 	} | |
| 	if sc := reqHeader.Get(s3_constants.AmzStorageClass); len(sc) > 0 { | |
| 		metadata[s3_constants.AmzStorageClass] = []byte(sc) | |
| 	} | |
| 
 | |
| 	// Handle SSE-KMS headers - these are always processed from request headers if present | |
| 	if sseAlgorithm := reqHeader.Get(s3_constants.AmzServerSideEncryption); sseAlgorithm == "aws:kms" { | |
| 		metadata[s3_constants.AmzServerSideEncryption] = []byte(sseAlgorithm) | |
| 
 | |
| 		// KMS Key ID (optional - can use default key) | |
| 		if kmsKeyID := reqHeader.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); kmsKeyID != "" { | |
| 			metadata[s3_constants.AmzServerSideEncryptionAwsKmsKeyId] = []byte(kmsKeyID) | |
| 		} | |
| 
 | |
| 		// Encryption Context (optional) | |
| 		if encryptionContext := reqHeader.Get(s3_constants.AmzServerSideEncryptionContext); encryptionContext != "" { | |
| 			metadata[s3_constants.AmzServerSideEncryptionContext] = []byte(encryptionContext) | |
| 		} | |
| 
 | |
| 		// Bucket Key Enabled (optional) | |
| 		if bucketKeyEnabled := reqHeader.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled != "" { | |
| 			metadata[s3_constants.AmzServerSideEncryptionBucketKeyEnabled] = []byte(bucketKeyEnabled) | |
| 		} | |
| 	} else { | |
| 		// If not explicitly setting SSE-KMS, preserve existing SSE headers from source | |
| 		for _, sseHeader := range []string{ | |
| 			s3_constants.AmzServerSideEncryption, | |
| 			s3_constants.AmzServerSideEncryptionAwsKmsKeyId, | |
| 			s3_constants.AmzServerSideEncryptionContext, | |
| 			s3_constants.AmzServerSideEncryptionBucketKeyEnabled, | |
| 		} { | |
| 			if existingValue, exists := existing[sseHeader]; exists { | |
| 				metadata[sseHeader] = existingValue | |
| 			} | |
| 		} | |
| 	} | |
| 
 | |
| 	// Handle SSE-C headers - these are always processed from request headers if present | |
| 	if sseCustomerAlgorithm := reqHeader.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm); sseCustomerAlgorithm != "" { | |
| 		metadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte(sseCustomerAlgorithm) | |
| 
 | |
| 		if sseCustomerKeyMD5 := reqHeader.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5); sseCustomerKeyMD5 != "" { | |
| 			metadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(sseCustomerKeyMD5) | |
| 		} | |
| 	} else { | |
| 		// If not explicitly setting SSE-C, preserve existing SSE-C headers from source | |
| 		for _, ssecHeader := range []string{ | |
| 			s3_constants.AmzServerSideEncryptionCustomerAlgorithm, | |
| 			s3_constants.AmzServerSideEncryptionCustomerKeyMD5, | |
| 		} { | |
| 			if existingValue, exists := existing[ssecHeader]; exists { | |
| 				metadata[ssecHeader] = existingValue | |
| 			} | |
| 		} | |
| 	} | |
| 
 | |
| 	if replaceMeta { | |
| 		for header, values := range reqHeader { | |
| 			if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) { | |
| 				for _, value := range values { | |
| 					metadata[header] = []byte(value) | |
| 				} | |
| 			} | |
| 		} | |
| 	} else { | |
| 		for k, v := range existing { | |
| 			if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) { | |
| 				metadata[k] = v | |
| 			} | |
| 		} | |
| 	} | |
| 	if replaceTagging { | |
| 		if tags := reqHeader.Get(s3_constants.AmzObjectTagging); tags != "" { | |
| 			parsedTags, err := parseTagsHeader(tags) | |
| 			if err != nil { | |
| 				return nil, err | |
| 			} | |
| 			err = ValidateTags(parsedTags) | |
| 			if err != nil { | |
| 				return nil, err | |
| 			} | |
| 			for k, v := range parsedTags { | |
| 				metadata[s3_constants.AmzObjectTagging+"-"+k] = []byte(v) | |
| 			} | |
| 		} | |
| 	} else { | |
| 		for k, v := range existing { | |
| 			if strings.HasPrefix(k, s3_constants.AmzObjectTagging) { | |
| 				metadata[k] = v | |
| 			} | |
| 		} | |
| 		delete(metadata, s3_constants.AmzTagCount) | |
| 	} | |
| 
 | |
| 	return | |
| } | |
| 
 | |
| // copyChunks replicates chunks from source entry to destination entry | |
| func (s3a *S3ApiServer) copyChunks(entry *filer_pb.Entry, dstPath string) ([]*filer_pb.FileChunk, error) { | |
| 	dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks())) | |
| 	const defaultChunkCopyConcurrency = 4 | |
| 	executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency) // Limit to configurable concurrent operations | |
| 	errChan := make(chan error, len(entry.GetChunks())) | |
| 
 | |
| 	for i, chunk := range entry.GetChunks() { | |
| 		chunkIndex := i | |
| 		executor.Execute(func() { | |
| 			dstChunk, err := s3a.copySingleChunk(chunk, dstPath) | |
| 			if err != nil { | |
| 				errChan <- fmt.Errorf("chunk %d: %v", chunkIndex, err) | |
| 				return | |
| 			} | |
| 			dstChunks[chunkIndex] = dstChunk | |
| 			errChan <- nil | |
| 		}) | |
| 	} | |
| 
 | |
| 	// Wait for all operations to complete and check for errors | |
| 	for i := 0; i < len(entry.GetChunks()); i++ { | |
| 		if err := <-errChan; err != nil { | |
| 			return nil, err | |
| 		} | |
| 	} | |
| 
 | |
| 	return dstChunks, nil | |
| } | |
| 
 | |
| // copySingleChunk copies a single chunk from source to destination | |
| func (s3a *S3ApiServer) copySingleChunk(chunk *filer_pb.FileChunk, dstPath string) (*filer_pb.FileChunk, error) { | |
| 	// Create destination chunk | |
| 	dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size) | |
| 
 | |
| 	// Prepare chunk copy (assign new volume and get source URL) | |
| 	assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath) | |
| 	if err != nil { | |
| 		return nil, err | |
| 	} | |
| 
 | |
| 	// Set file ID on destination chunk | |
| 	if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil { | |
| 		return nil, err | |
| 	} | |
| 
 | |
| 	// Download and upload the chunk | |
| 	chunkData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) | |
| 	if err != nil { | |
| 		return nil, fmt.Errorf("download chunk data: %w", err) | |
| 	} | |
| 
 | |
| 	if err := s3a.uploadChunkData(chunkData, assignResult); err != nil { | |
| 		return nil, fmt.Errorf("upload chunk data: %w", err) | |
| 	} | |
| 
 | |
| 	return dstChunk, nil | |
| } | |
| 
 | |
| // copySingleChunkForRange copies a portion of a chunk for range operations | |
| func (s3a *S3ApiServer) copySingleChunkForRange(originalChunk, rangeChunk *filer_pb.FileChunk, rangeStart, rangeEnd int64, dstPath string) (*filer_pb.FileChunk, error) { | |
| 	// Create destination chunk | |
| 	dstChunk := s3a.createDestinationChunk(rangeChunk, rangeChunk.Offset, rangeChunk.Size) | |
| 
 | |
| 	// Prepare chunk copy (assign new volume and get source URL) | |
| 	assignResult, srcUrl, err := s3a.prepareChunkCopy(originalChunk.GetFileIdString(), dstPath) | |
| 	if err != nil { | |
| 		return nil, err | |
| 	} | |
| 
 | |
| 	// Set file ID on destination chunk | |
| 	if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil { | |
| 		return nil, err | |
| 	} | |
| 
 | |
| 	// Calculate the portion of the original chunk that we need to copy | |
| 	chunkStart := originalChunk.Offset | |
| 	overlapStart := max(rangeStart, chunkStart) | |
| 	offsetInChunk := overlapStart - chunkStart | |
| 
 | |
| 	// Download and upload the chunk portion | |
| 	chunkData, err := s3a.downloadChunkData(srcUrl, offsetInChunk, int64(rangeChunk.Size)) | |
| 	if err != nil { | |
| 		return nil, fmt.Errorf("download chunk range data: %w", err) | |
| 	} | |
| 
 | |
| 	if err := s3a.uploadChunkData(chunkData, assignResult); err != nil { | |
| 		return nil, fmt.Errorf("upload chunk range data: %w", err) | |
| 	} | |
| 
 | |
| 	return dstChunk, nil | |
| } | |
| 
 | |
| // assignNewVolume assigns a new volume for the chunk | |
| func (s3a *S3ApiServer) assignNewVolume(dstPath string) (*filer_pb.AssignVolumeResponse, error) { | |
| 	var assignResult *filer_pb.AssignVolumeResponse | |
| 	err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { | |
| 		resp, err := client.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{ | |
| 			Count:       1, | |
| 			Replication: "", | |
| 			Collection:  "", | |
| 			DiskType:    "", | |
| 			DataCenter:  s3a.option.DataCenter, | |
| 			Path:        dstPath, | |
| 		}) | |
| 		if err != nil { | |
| 			return fmt.Errorf("assign volume: %w", err) | |
| 		} | |
| 		if resp.Error != "" { | |
| 			return fmt.Errorf("assign volume: %v", resp.Error) | |
| 		} | |
| 		assignResult = resp | |
| 		return nil | |
| 	}) | |
| 	if err != nil { | |
| 		return nil, err | |
| 	} | |
| 	return assignResult, nil | |
| } | |
| 
 | |
| // min returns the minimum of two int64 values | |
| func min(a, b int64) int64 { | |
| 	if a < b { | |
| 		return a | |
| 	} | |
| 	return b | |
| } | |
| 
 | |
| // max returns the maximum of two int64 values | |
| func max(a, b int64) int64 { | |
| 	if a > b { | |
| 		return a | |
| 	} | |
| 	return b | |
| } | |
| 
 | |
| // parseRangeHeader parses the x-amz-copy-source-range header | |
| func parseRangeHeader(rangeHeader string) (startOffset, endOffset int64, err error) { | |
| 	// Remove "bytes=" prefix if present | |
| 	rangeStr := strings.TrimPrefix(rangeHeader, "bytes=") | |
| 	parts := strings.Split(rangeStr, "-") | |
| 	if len(parts) != 2 { | |
| 		return 0, 0, fmt.Errorf("invalid range format") | |
| 	} | |
| 
 | |
| 	startOffset, err = strconv.ParseInt(parts[0], 10, 64) | |
| 	if err != nil { | |
| 		return 0, 0, fmt.Errorf("invalid start offset: %w", err) | |
| 	} | |
| 
 | |
| 	endOffset, err = strconv.ParseInt(parts[1], 10, 64) | |
| 	if err != nil { | |
| 		return 0, 0, fmt.Errorf("invalid end offset: %w", err) | |
| 	} | |
| 
 | |
| 	return startOffset, endOffset, nil | |
| } | |
| 
 | |
| // copyChunksForRange copies chunks that overlap with the specified range | |
| func (s3a *S3ApiServer) copyChunksForRange(entry *filer_pb.Entry, startOffset, endOffset int64, dstPath string) ([]*filer_pb.FileChunk, error) { | |
| 	var relevantChunks []*filer_pb.FileChunk | |
| 
 | |
| 	// Find chunks that overlap with the range | |
| 	for _, chunk := range entry.GetChunks() { | |
| 		chunkStart := chunk.Offset | |
| 		chunkEnd := chunk.Offset + int64(chunk.Size) | |
| 
 | |
| 		// Check if chunk overlaps with the range | |
| 		if chunkStart < endOffset+1 && chunkEnd > startOffset { | |
| 			// Calculate the overlap | |
| 			overlapStart := max(startOffset, chunkStart) | |
| 			overlapEnd := min(endOffset+1, chunkEnd) | |
| 
 | |
| 			// Create a new chunk with adjusted offset and size relative to the range | |
| 			newChunk := &filer_pb.FileChunk{ | |
| 				FileId:       chunk.FileId, | |
| 				Offset:       overlapStart - startOffset, // Offset relative to the range start | |
| 				Size:         uint64(overlapEnd - overlapStart), | |
| 				ModifiedTsNs: time.Now().UnixNano(), | |
| 				ETag:         chunk.ETag, | |
| 				IsCompressed: chunk.IsCompressed, | |
| 				CipherKey:    chunk.CipherKey, | |
| 				Fid:          chunk.Fid, | |
| 			} | |
| 			relevantChunks = append(relevantChunks, newChunk) | |
| 		} | |
| 	} | |
| 
 | |
| 	// Copy the relevant chunks using a specialized method for range copies | |
| 	dstChunks := make([]*filer_pb.FileChunk, len(relevantChunks)) | |
| 	const defaultChunkCopyConcurrency = 4 | |
| 	executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency) | |
| 	errChan := make(chan error, len(relevantChunks)) | |
| 
 | |
| 	// Create a map to track original chunks for each relevant chunk | |
| 	originalChunks := make([]*filer_pb.FileChunk, len(relevantChunks)) | |
| 	relevantIndex := 0 | |
| 	for _, chunk := range entry.GetChunks() { | |
| 		chunkStart := chunk.Offset | |
| 		chunkEnd := chunk.Offset + int64(chunk.Size) | |
| 
 | |
| 		// Check if chunk overlaps with the range | |
| 		if chunkStart < endOffset+1 && chunkEnd > startOffset { | |
| 			originalChunks[relevantIndex] = chunk | |
| 			relevantIndex++ | |
| 		} | |
| 	} | |
| 
 | |
| 	for i, chunk := range relevantChunks { | |
| 		chunkIndex := i | |
| 		originalChunk := originalChunks[i] // Get the corresponding original chunk | |
| 		executor.Execute(func() { | |
| 			dstChunk, err := s3a.copySingleChunkForRange(originalChunk, chunk, startOffset, endOffset, dstPath) | |
| 			if err != nil { | |
| 				errChan <- fmt.Errorf("chunk %d: %v", chunkIndex, err) | |
| 				return | |
| 			} | |
| 			dstChunks[chunkIndex] = dstChunk | |
| 			errChan <- nil | |
| 		}) | |
| 	} | |
| 
 | |
| 	// Wait for all operations to complete and check for errors | |
| 	for i := 0; i < len(relevantChunks); i++ { | |
| 		if err := <-errChan; err != nil { | |
| 			return nil, err | |
| 		} | |
| 	} | |
| 
 | |
| 	return dstChunks, nil | |
| } | |
| 
 | |
| // Helper methods for copy operations to avoid code duplication | |
|  | |
| // validateConditionalCopyHeaders validates the conditional copy headers against the source entry | |
| func (s3a *S3ApiServer) validateConditionalCopyHeaders(r *http.Request, entry *filer_pb.Entry) s3err.ErrorCode { | |
| 	// Calculate ETag for the source entry | |
| 	srcPath := util.FullPath(fmt.Sprintf("%s/%s", r.URL.Path, entry.Name)) | |
| 	filerEntry := &filer.Entry{ | |
| 		FullPath: srcPath, | |
| 		Attr: filer.Attr{ | |
| 			FileSize: entry.Attributes.FileSize, | |
| 			Mtime:    time.Unix(entry.Attributes.Mtime, 0), | |
| 			Crtime:   time.Unix(entry.Attributes.Crtime, 0), | |
| 			Mime:     entry.Attributes.Mime, | |
| 		}, | |
| 		Chunks: entry.Chunks, | |
| 	} | |
| 	sourceETag := filer.ETagEntry(filerEntry) | |
| 
 | |
| 	// Check X-Amz-Copy-Source-If-Match | |
| 	if ifMatch := r.Header.Get(s3_constants.AmzCopySourceIfMatch); ifMatch != "" { | |
| 		// Remove quotes if present | |
| 		ifMatch = strings.Trim(ifMatch, `"`) | |
| 		sourceETag = strings.Trim(sourceETag, `"`) | |
| 		glog.V(3).Infof("CopyObjectHandler: If-Match check - expected %s, got %s", ifMatch, sourceETag) | |
| 		if ifMatch != sourceETag { | |
| 			glog.V(3).Infof("CopyObjectHandler: If-Match failed - expected %s, got %s", ifMatch, sourceETag) | |
| 			return s3err.ErrPreconditionFailed | |
| 		} | |
| 	} | |
| 
 | |
| 	// Check X-Amz-Copy-Source-If-None-Match | |
| 	if ifNoneMatch := r.Header.Get(s3_constants.AmzCopySourceIfNoneMatch); ifNoneMatch != "" { | |
| 		// Remove quotes if present | |
| 		ifNoneMatch = strings.Trim(ifNoneMatch, `"`) | |
| 		sourceETag = strings.Trim(sourceETag, `"`) | |
| 		glog.V(3).Infof("CopyObjectHandler: If-None-Match check - comparing %s with %s", ifNoneMatch, sourceETag) | |
| 		if ifNoneMatch == sourceETag { | |
| 			glog.V(3).Infof("CopyObjectHandler: If-None-Match failed - matched %s", sourceETag) | |
| 			return s3err.ErrPreconditionFailed | |
| 		} | |
| 	} | |
| 
 | |
| 	// Check X-Amz-Copy-Source-If-Modified-Since | |
| 	if ifModifiedSince := r.Header.Get(s3_constants.AmzCopySourceIfModifiedSince); ifModifiedSince != "" { | |
| 		t, err := time.Parse(time.RFC1123, ifModifiedSince) | |
| 		if err != nil { | |
| 			glog.V(3).Infof("CopyObjectHandler: Invalid If-Modified-Since header: %v", err) | |
| 			return s3err.ErrInvalidRequest | |
| 		} | |
| 		if !time.Unix(entry.Attributes.Mtime, 0).After(t) { | |
| 			glog.V(3).Infof("CopyObjectHandler: If-Modified-Since failed") | |
| 			return s3err.ErrPreconditionFailed | |
| 		} | |
| 	} | |
| 
 | |
| 	// Check X-Amz-Copy-Source-If-Unmodified-Since | |
| 	if ifUnmodifiedSince := r.Header.Get(s3_constants.AmzCopySourceIfUnmodifiedSince); ifUnmodifiedSince != "" { | |
| 		t, err := time.Parse(time.RFC1123, ifUnmodifiedSince) | |
| 		if err != nil { | |
| 			glog.V(3).Infof("CopyObjectHandler: Invalid If-Unmodified-Since header: %v", err) | |
| 			return s3err.ErrInvalidRequest | |
| 		} | |
| 		if time.Unix(entry.Attributes.Mtime, 0).After(t) { | |
| 			glog.V(3).Infof("CopyObjectHandler: If-Unmodified-Since failed") | |
| 			return s3err.ErrPreconditionFailed | |
| 		} | |
| 	} | |
| 
 | |
| 	return s3err.ErrNone | |
| } | |
| 
 | |
| // createDestinationChunk creates a new chunk based on the source chunk with modified properties | |
| func (s3a *S3ApiServer) createDestinationChunk(sourceChunk *filer_pb.FileChunk, offset int64, size uint64) *filer_pb.FileChunk { | |
| 	return &filer_pb.FileChunk{ | |
| 		Offset:       offset, | |
| 		Size:         size, | |
| 		ModifiedTsNs: time.Now().UnixNano(), | |
| 		ETag:         sourceChunk.ETag, | |
| 		IsCompressed: sourceChunk.IsCompressed, | |
| 		CipherKey:    sourceChunk.CipherKey, | |
| 	} | |
| } | |
| 
 | |
| // lookupVolumeUrl looks up the volume URL for a given file ID using the filer's LookupVolume method | |
| func (s3a *S3ApiServer) lookupVolumeUrl(fileId string) (string, error) { | |
| 	var srcUrl string | |
| 	err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { | |
| 		vid, _, err := operation.ParseFileId(fileId) | |
| 		if err != nil { | |
| 			return fmt.Errorf("parse file ID: %w", err) | |
| 		} | |
| 
 | |
| 		resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ | |
| 			VolumeIds: []string{vid}, | |
| 		}) | |
| 		if err != nil { | |
| 			return fmt.Errorf("lookup volume: %w", err) | |
| 		} | |
| 
 | |
| 		if locations, found := resp.LocationsMap[vid]; found && len(locations.Locations) > 0 { | |
| 			srcUrl = "http://" + locations.Locations[0].Url + "/" + fileId | |
| 		} else { | |
| 			return fmt.Errorf("no location found for volume %s", vid) | |
| 		} | |
| 
 | |
| 		return nil | |
| 	}) | |
| 	if err != nil { | |
| 		return "", fmt.Errorf("lookup volume URL: %w", err) | |
| 	} | |
| 	return srcUrl, nil | |
| } | |
| 
 | |
| // setChunkFileId sets the file ID on the destination chunk | |
| func (s3a *S3ApiServer) setChunkFileId(chunk *filer_pb.FileChunk, assignResult *filer_pb.AssignVolumeResponse) error { | |
| 	chunk.FileId = assignResult.FileId | |
| 	fid, err := filer_pb.ToFileIdObject(assignResult.FileId) | |
| 	if err != nil { | |
| 		return fmt.Errorf("parse file ID: %w", err) | |
| 	} | |
| 	chunk.Fid = fid | |
| 	return nil | |
| } | |
| 
 | |
| // prepareChunkCopy prepares a chunk for copying by assigning a new volume and looking up the source URL | |
| func (s3a *S3ApiServer) prepareChunkCopy(sourceFileId, dstPath string) (*filer_pb.AssignVolumeResponse, string, error) { | |
| 	// Assign new volume | |
| 	assignResult, err := s3a.assignNewVolume(dstPath) | |
| 	if err != nil { | |
| 		return nil, "", fmt.Errorf("assign volume: %w", err) | |
| 	} | |
| 
 | |
| 	// Look up source URL | |
| 	srcUrl, err := s3a.lookupVolumeUrl(sourceFileId) | |
| 	if err != nil { | |
| 		return nil, "", fmt.Errorf("lookup source URL: %w", err) | |
| 	} | |
| 
 | |
| 	return assignResult, srcUrl, nil | |
| } | |
| 
 | |
| // uploadChunkData uploads chunk data to the destination using common upload logic | |
| func (s3a *S3ApiServer) uploadChunkData(chunkData []byte, assignResult *filer_pb.AssignVolumeResponse) error { | |
| 	dstUrl := fmt.Sprintf("http://%s/%s", assignResult.Location.Url, assignResult.FileId) | |
| 
 | |
| 	uploadOption := &operation.UploadOption{ | |
| 		UploadUrl:         dstUrl, | |
| 		Cipher:            false, | |
| 		IsInputCompressed: false, | |
| 		MimeType:          "", | |
| 		PairMap:           nil, | |
| 		Jwt:               security.EncodedJwt(assignResult.Auth), | |
| 	} | |
| 	uploader, err := operation.NewUploader() | |
| 	if err != nil { | |
| 		return fmt.Errorf("create uploader: %w", err) | |
| 	} | |
| 	_, err = uploader.UploadData(context.Background(), chunkData, uploadOption) | |
| 	if err != nil { | |
| 		return fmt.Errorf("upload chunk: %w", err) | |
| 	} | |
| 
 | |
| 	return nil | |
| } | |
| 
 | |
| // downloadChunkData downloads chunk data from the source URL | |
| func (s3a *S3ApiServer) downloadChunkData(srcUrl string, offset, size int64) ([]byte, error) { | |
| 	var chunkData []byte | |
| 	shouldRetry, err := util_http.ReadUrlAsStream(context.Background(), srcUrl, nil, false, false, offset, int(size), func(data []byte) { | |
| 		chunkData = append(chunkData, data...) | |
| 	}) | |
| 	if err != nil { | |
| 		return nil, fmt.Errorf("download chunk: %w", err) | |
| 	} | |
| 	if shouldRetry { | |
| 		return nil, fmt.Errorf("download chunk: retry needed") | |
| 	} | |
| 	return chunkData, nil | |
| } | |
| 
 | |
| // copyMultipartSSECChunks handles copying multipart SSE-C objects | |
| // Returns chunks and destination metadata that should be applied to the destination entry | |
| func (s3a *S3ApiServer) copyMultipartSSECChunks(entry *filer_pb.Entry, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) { | |
| 	glog.Infof("copyMultipartSSECChunks called: copySourceKey=%v, destKey=%v, path=%s", copySourceKey != nil, destKey != nil, dstPath) | |
| 
 | |
| 	var sourceKeyMD5, destKeyMD5 string | |
| 	if copySourceKey != nil { | |
| 		sourceKeyMD5 = copySourceKey.KeyMD5 | |
| 	} | |
| 	if destKey != nil { | |
| 		destKeyMD5 = destKey.KeyMD5 | |
| 	} | |
| 	glog.Infof("Key MD5 comparison: source=%s, dest=%s, equal=%t", sourceKeyMD5, destKeyMD5, sourceKeyMD5 == destKeyMD5) | |
| 
 | |
| 	// For multipart SSE-C, always use decrypt/reencrypt path to ensure proper metadata handling | |
| 	// The standard copyChunks() doesn't preserve SSE metadata, so we need per-chunk processing | |
| 	glog.Infof("Taking multipart SSE-C reencrypt path to preserve metadata: %s", dstPath) | |
| 
 | |
| 	// Different keys or key changes: decrypt and re-encrypt each chunk individually | |
| 	glog.V(2).Infof("Multipart SSE-C reencrypt copy (different keys): %s", dstPath) | |
| 
 | |
| 	var dstChunks []*filer_pb.FileChunk | |
| 	var destIV []byte | |
| 
 | |
| 	for _, chunk := range entry.GetChunks() { | |
| 		if chunk.GetSseType() != filer_pb.SSEType_SSE_C { | |
| 			// Non-SSE-C chunk, copy directly | |
| 			copiedChunk, err := s3a.copySingleChunk(chunk, dstPath) | |
| 			if err != nil { | |
| 				return nil, nil, fmt.Errorf("failed to copy non-SSE-C chunk: %w", err) | |
| 			} | |
| 			dstChunks = append(dstChunks, copiedChunk) | |
| 			continue | |
| 		} | |
| 
 | |
| 		// SSE-C chunk: decrypt with stored per-chunk metadata, re-encrypt with dest key | |
| 		copiedChunk, chunkDestIV, err := s3a.copyMultipartSSECChunk(chunk, copySourceKey, destKey, dstPath) | |
| 		if err != nil { | |
| 			return nil, nil, fmt.Errorf("failed to copy SSE-C chunk %s: %w", chunk.GetFileIdString(), err) | |
| 		} | |
| 
 | |
| 		dstChunks = append(dstChunks, copiedChunk) | |
| 
 | |
| 		// Store the first chunk's IV as the object's IV (for single-part compatibility) | |
| 		if len(destIV) == 0 { | |
| 			destIV = chunkDestIV | |
| 		} | |
| 	} | |
| 
 | |
| 	// Create destination metadata | |
| 	dstMetadata := make(map[string][]byte) | |
| 	if destKey != nil && len(destIV) > 0 { | |
| 		// Store the IV and SSE-C headers for single-part compatibility | |
| 		StoreIVInMetadata(dstMetadata, destIV) | |
| 		dstMetadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256") | |
| 		dstMetadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(destKey.KeyMD5) | |
| 		glog.V(2).Infof("Prepared multipart SSE-C destination metadata: %s", dstPath) | |
| 	} | |
| 
 | |
| 	return dstChunks, dstMetadata, nil | |
| } | |
| 
 | |
| // copyMultipartSSEKMSChunks handles copying multipart SSE-KMS objects (unified with SSE-C approach) | |
| // Returns chunks and destination metadata that should be applied to the destination entry | |
| func (s3a *S3ApiServer) copyMultipartSSEKMSChunks(entry *filer_pb.Entry, destKeyID string, encryptionContext map[string]string, bucketKeyEnabled bool, dstPath, bucket string) ([]*filer_pb.FileChunk, map[string][]byte, error) { | |
| 	glog.Infof("copyMultipartSSEKMSChunks called: destKeyID=%s, path=%s", destKeyID, dstPath) | |
| 
 | |
| 	// For multipart SSE-KMS, always use decrypt/reencrypt path to ensure proper metadata handling | |
| 	// The standard copyChunks() doesn't preserve SSE metadata, so we need per-chunk processing | |
| 	glog.Infof("Taking multipart SSE-KMS reencrypt path to preserve metadata: %s", dstPath) | |
| 
 | |
| 	var dstChunks []*filer_pb.FileChunk | |
| 
 | |
| 	for _, chunk := range entry.GetChunks() { | |
| 		if chunk.GetSseType() != filer_pb.SSEType_SSE_KMS { | |
| 			// Non-SSE-KMS chunk, copy directly | |
| 			copiedChunk, err := s3a.copySingleChunk(chunk, dstPath) | |
| 			if err != nil { | |
| 				return nil, nil, fmt.Errorf("failed to copy non-SSE-KMS chunk: %w", err) | |
| 			} | |
| 			dstChunks = append(dstChunks, copiedChunk) | |
| 			continue | |
| 		} | |
| 
 | |
| 		// SSE-KMS chunk: decrypt with stored per-chunk metadata, re-encrypt with dest key | |
| 		copiedChunk, err := s3a.copyMultipartSSEKMSChunk(chunk, destKeyID, encryptionContext, bucketKeyEnabled, dstPath, bucket) | |
| 		if err != nil { | |
| 			return nil, nil, fmt.Errorf("failed to copy SSE-KMS chunk %s: %w", chunk.GetFileIdString(), err) | |
| 		} | |
| 
 | |
| 		dstChunks = append(dstChunks, copiedChunk) | |
| 	} | |
| 
 | |
| 	// Create destination metadata for SSE-KMS | |
| 	dstMetadata := make(map[string][]byte) | |
| 	if destKeyID != "" { | |
| 		// Store SSE-KMS metadata for single-part compatibility | |
| 		if encryptionContext == nil { | |
| 			encryptionContext = BuildEncryptionContext(bucket, dstPath, bucketKeyEnabled) | |
| 		} | |
| 		sseKey := &SSEKMSKey{ | |
| 			KeyID:             destKeyID, | |
| 			EncryptionContext: encryptionContext, | |
| 			BucketKeyEnabled:  bucketKeyEnabled, | |
| 		} | |
| 		if kmsMetadata, serErr := SerializeSSEKMSMetadata(sseKey); serErr == nil { | |
| 			dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata | |
| 			glog.Infof("Created object-level KMS metadata for GET compatibility") | |
| 		} else { | |
| 			glog.Errorf("Failed to serialize SSE-KMS metadata: %v", serErr) | |
| 		} | |
| 	} | |
| 
 | |
| 	return dstChunks, dstMetadata, nil | |
| } | |
| 
 | |
| // copyMultipartSSEKMSChunk copies a single SSE-KMS chunk from a multipart object (unified with SSE-C approach) | |
| func (s3a *S3ApiServer) copyMultipartSSEKMSChunk(chunk *filer_pb.FileChunk, destKeyID string, encryptionContext map[string]string, bucketKeyEnabled bool, dstPath, bucket string) (*filer_pb.FileChunk, error) { | |
| 	// Create destination chunk | |
| 	dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size) | |
| 
 | |
| 	// Prepare chunk copy (assign new volume and get source URL) | |
| 	assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath) | |
| 	if err != nil { | |
| 		return nil, err | |
| 	} | |
| 
 | |
| 	// Set file ID on destination chunk | |
| 	if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil { | |
| 		return nil, err | |
| 	} | |
| 
 | |
| 	// Download encrypted chunk data | |
| 	encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) | |
| 	if err != nil { | |
| 		return nil, fmt.Errorf("download encrypted chunk data: %w", err) | |
| 	} | |
| 
 | |
| 	var finalData []byte | |
| 
 | |
| 	// Decrypt source data using stored SSE-KMS metadata (same pattern as SSE-C) | |
| 	if len(chunk.GetSseMetadata()) == 0 { | |
| 		return nil, fmt.Errorf("SSE-KMS chunk missing per-chunk metadata") | |
| 	} | |
| 
 | |
| 	// Deserialize the SSE-KMS metadata (reusing unified metadata structure) | |
| 	sourceSSEKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata()) | |
| 	if err != nil { | |
| 		return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err) | |
| 	} | |
| 
 | |
| 	// Decrypt the chunk data using the source metadata | |
| 	decryptedReader, decErr := CreateSSEKMSDecryptedReader(bytes.NewReader(encryptedData), sourceSSEKey) | |
| 	if decErr != nil { | |
| 		return nil, fmt.Errorf("create SSE-KMS decrypted reader: %w", decErr) | |
| 	} | |
| 
 | |
| 	decryptedData, readErr := io.ReadAll(decryptedReader) | |
| 	if readErr != nil { | |
| 		return nil, fmt.Errorf("decrypt chunk data: %w", readErr) | |
| 	} | |
| 	finalData = decryptedData | |
| 	glog.V(4).Infof("Decrypted multipart SSE-KMS chunk: %d bytes → %d bytes", len(encryptedData), len(finalData)) | |
| 
 | |
| 	// Re-encrypt with destination key if specified | |
| 	if destKeyID != "" { | |
| 		// Build encryption context if not provided | |
| 		if encryptionContext == nil { | |
| 			encryptionContext = BuildEncryptionContext(bucket, dstPath, bucketKeyEnabled) | |
| 		} | |
| 
 | |
| 		// Encrypt with destination key | |
| 		encryptedReader, destSSEKey, encErr := CreateSSEKMSEncryptedReaderWithBucketKey(bytes.NewReader(finalData), destKeyID, encryptionContext, bucketKeyEnabled) | |
| 		if encErr != nil { | |
| 			return nil, fmt.Errorf("create SSE-KMS encrypted reader: %w", encErr) | |
| 		} | |
| 
 | |
| 		reencryptedData, readErr := io.ReadAll(encryptedReader) | |
| 		if readErr != nil { | |
| 			return nil, fmt.Errorf("re-encrypt chunk data: %w", readErr) | |
| 		} | |
| 		finalData = reencryptedData | |
| 
 | |
| 		// Create per-chunk SSE-KMS metadata for the destination chunk | |
| 		// For copy operations, reset chunk offset to 0 (similar to SSE-C approach) | |
| 		// The copied chunks form a new object structure independent of original part boundaries | |
| 		destSSEKey.ChunkOffset = 0 | |
| 		kmsMetadata, err := SerializeSSEKMSMetadata(destSSEKey) | |
| 		if err != nil { | |
| 			return nil, fmt.Errorf("serialize SSE-KMS metadata: %w", err) | |
| 		} | |
| 
 | |
| 		// Set the SSE type and metadata on destination chunk (unified approach) | |
| 		dstChunk.SseType = filer_pb.SSEType_SSE_KMS | |
| 		dstChunk.SseMetadata = kmsMetadata | |
| 
 | |
| 		glog.V(4).Infof("Re-encrypted multipart SSE-KMS chunk: %d bytes → %d bytes", len(finalData)-len(reencryptedData)+len(finalData), len(finalData)) | |
| 	} | |
| 
 | |
| 	// Upload the final data | |
| 	if err := s3a.uploadChunkData(finalData, assignResult); err != nil { | |
| 		return nil, fmt.Errorf("upload chunk data: %w", err) | |
| 	} | |
| 
 | |
| 	// Update chunk size | |
| 	dstChunk.Size = uint64(len(finalData)) | |
| 
 | |
| 	glog.V(3).Infof("Successfully copied multipart SSE-KMS chunk %s → %s", | |
| 		chunk.GetFileIdString(), dstChunk.GetFileIdString()) | |
| 
 | |
| 	return dstChunk, nil | |
| } | |
| 
 | |
| // copyMultipartSSECChunk copies a single SSE-C chunk from a multipart object | |
| func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) (*filer_pb.FileChunk, []byte, error) { | |
| 	// Create destination chunk | |
| 	dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size) | |
| 
 | |
| 	// Prepare chunk copy (assign new volume and get source URL) | |
| 	assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath) | |
| 	if err != nil { | |
| 		return nil, nil, err | |
| 	} | |
| 
 | |
| 	// Set file ID on destination chunk | |
| 	if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil { | |
| 		return nil, nil, err | |
| 	} | |
| 
 | |
| 	// Download encrypted chunk data | |
| 	encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) | |
| 	if err != nil { | |
| 		return nil, nil, fmt.Errorf("download encrypted chunk data: %w", err) | |
| 	} | |
| 
 | |
| 	var finalData []byte | |
| 	var destIV []byte | |
| 
 | |
| 	// Decrypt if source is encrypted | |
| 	if copySourceKey != nil { | |
| 		// Get the per-chunk SSE-C metadata | |
| 		if len(chunk.GetSseMetadata()) == 0 { | |
| 			return nil, nil, fmt.Errorf("SSE-C chunk missing per-chunk metadata") | |
| 		} | |
| 
 | |
| 		// Deserialize the SSE-C metadata | |
| 		ssecMetadata, err := DeserializeSSECMetadata(chunk.GetSseMetadata()) | |
| 		if err != nil { | |
| 			return nil, nil, fmt.Errorf("failed to deserialize SSE-C metadata: %w", err) | |
| 		} | |
| 
 | |
| 		// Decode the IV from the metadata | |
| 		chunkBaseIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV) | |
| 		if err != nil { | |
| 			return nil, nil, fmt.Errorf("failed to decode chunk IV: %w", err) | |
| 		} | |
| 
 | |
| 		// Calculate the correct IV for this chunk using within-part offset | |
| 		var chunkIV []byte | |
| 		if ssecMetadata.PartOffset > 0 { | |
| 			chunkIV = calculateIVWithOffset(chunkBaseIV, ssecMetadata.PartOffset) | |
| 		} else { | |
| 			chunkIV = chunkBaseIV | |
| 		} | |
| 
 | |
| 		// Decrypt the chunk data | |
| 		decryptedReader, decErr := CreateSSECDecryptedReader(bytes.NewReader(encryptedData), copySourceKey, chunkIV) | |
| 		if decErr != nil { | |
| 			return nil, nil, fmt.Errorf("create decrypted reader: %w", decErr) | |
| 		} | |
| 
 | |
| 		decryptedData, readErr := io.ReadAll(decryptedReader) | |
| 		if readErr != nil { | |
| 			return nil, nil, fmt.Errorf("decrypt chunk data: %w", readErr) | |
| 		} | |
| 		finalData = decryptedData | |
| 		glog.V(4).Infof("Decrypted multipart SSE-C chunk: %d bytes → %d bytes", len(encryptedData), len(finalData)) | |
| 	} else { | |
| 		// Source is unencrypted | |
| 		finalData = encryptedData | |
| 	} | |
| 
 | |
| 	// Re-encrypt if destination should be encrypted | |
| 	if destKey != nil { | |
| 		// Generate new IV for this chunk | |
| 		newIV := make([]byte, s3_constants.AESBlockSize) | |
| 		if _, err := rand.Read(newIV); err != nil { | |
| 			return nil, nil, fmt.Errorf("generate IV: %w", err) | |
| 		} | |
| 		destIV = newIV | |
| 
 | |
| 		// Encrypt with new key and IV | |
| 		encryptedReader, iv, encErr := CreateSSECEncryptedReader(bytes.NewReader(finalData), destKey) | |
| 		if encErr != nil { | |
| 			return nil, nil, fmt.Errorf("create encrypted reader: %w", encErr) | |
| 		} | |
| 		destIV = iv | |
| 
 | |
| 		reencryptedData, readErr := io.ReadAll(encryptedReader) | |
| 		if readErr != nil { | |
| 			return nil, nil, fmt.Errorf("re-encrypt chunk data: %w", readErr) | |
| 		} | |
| 		finalData = reencryptedData | |
| 
 | |
| 		// Create per-chunk SSE-C metadata for the destination chunk | |
| 		ssecMetadata, err := SerializeSSECMetadata(destIV, destKey.KeyMD5, 0) // partOffset=0 for copied chunks | |
| 		if err != nil { | |
| 			return nil, nil, fmt.Errorf("serialize SSE-C metadata: %w", err) | |
| 		} | |
| 
 | |
| 		// Set the SSE type and metadata on destination chunk | |
| 		dstChunk.SseType = filer_pb.SSEType_SSE_C | |
| 		dstChunk.SseMetadata = ssecMetadata // Use unified metadata field | |
|  | |
| 		glog.V(4).Infof("Re-encrypted multipart SSE-C chunk: %d bytes → %d bytes", len(finalData)-len(reencryptedData)+len(finalData), len(finalData)) | |
| 	} | |
| 
 | |
| 	// Upload the final data | |
| 	if err := s3a.uploadChunkData(finalData, assignResult); err != nil { | |
| 		return nil, nil, fmt.Errorf("upload chunk data: %w", err) | |
| 	} | |
| 
 | |
| 	// Update chunk size | |
| 	dstChunk.Size = uint64(len(finalData)) | |
| 
 | |
| 	glog.V(3).Infof("Successfully copied multipart SSE-C chunk %s → %s", | |
| 		chunk.GetFileIdString(), dstChunk.GetFileIdString()) | |
| 
 | |
| 	return dstChunk, destIV, nil | |
| } | |
| 
 | |
| // copyMultipartCrossEncryption handles all cross-encryption and decrypt-only copy scenarios | |
| // This unified function supports: SSE-C↔SSE-KMS, SSE-C→Plain, SSE-KMS→Plain | |
| func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) { | |
| 	glog.Infof("copyMultipartCrossEncryption called: %s→%s, path=%s", | |
| 		s3a.getEncryptionTypeString(state.SrcSSEC, state.SrcSSEKMS, false), | |
| 		s3a.getEncryptionTypeString(state.DstSSEC, state.DstSSEKMS, false), dstPath) | |
| 
 | |
| 	var dstChunks []*filer_pb.FileChunk | |
| 
 | |
| 	// Parse destination encryption parameters | |
| 	var destSSECKey *SSECustomerKey | |
| 	var destKMSKeyID string | |
| 	var destKMSEncryptionContext map[string]string | |
| 	var destKMSBucketKeyEnabled bool | |
| 
 | |
| 	if state.DstSSEC { | |
| 		var err error | |
| 		destSSECKey, err = ParseSSECHeaders(r) | |
| 		if err != nil { | |
| 			return nil, nil, fmt.Errorf("failed to parse destination SSE-C headers: %w", err) | |
| 		} | |
| 		glog.Infof("Destination SSE-C: keyMD5=%s", destSSECKey.KeyMD5) | |
| 	} else if state.DstSSEKMS { | |
| 		var err error | |
| 		destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, err = ParseSSEKMSCopyHeaders(r) | |
| 		if err != nil { | |
| 			return nil, nil, fmt.Errorf("failed to parse destination SSE-KMS headers: %w", err) | |
| 		} | |
| 		glog.Infof("Destination SSE-KMS: keyID=%s, bucketKey=%t", destKMSKeyID, destKMSBucketKeyEnabled) | |
| 	} else { | |
| 		glog.Infof("Destination: Unencrypted") | |
| 	} | |
| 
 | |
| 	// Parse source encryption parameters | |
| 	var sourceSSECKey *SSECustomerKey | |
| 	if state.SrcSSEC { | |
| 		var err error | |
| 		sourceSSECKey, err = ParseSSECCopySourceHeaders(r) | |
| 		if err != nil { | |
| 			return nil, nil, fmt.Errorf("failed to parse source SSE-C headers: %w", err) | |
| 		} | |
| 		glog.Infof("Source SSE-C: keyMD5=%s", sourceSSECKey.KeyMD5) | |
| 	} | |
| 
 | |
| 	// Process each chunk with unified cross-encryption logic | |
| 	for _, chunk := range entry.GetChunks() { | |
| 		var copiedChunk *filer_pb.FileChunk | |
| 		var err error | |
| 
 | |
| 		if chunk.GetSseType() == filer_pb.SSEType_SSE_C { | |
| 			copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state) | |
| 		} else if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS { | |
| 			copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state) | |
| 		} else { | |
| 			// Unencrypted chunk, copy directly | |
| 			copiedChunk, err = s3a.copySingleChunk(chunk, dstPath) | |
| 		} | |
| 
 | |
| 		if err != nil { | |
| 			return nil, nil, fmt.Errorf("failed to copy chunk %s: %w", chunk.GetFileIdString(), err) | |
| 		} | |
| 
 | |
| 		dstChunks = append(dstChunks, copiedChunk) | |
| 	} | |
| 
 | |
| 	// Create destination metadata based on destination encryption type | |
| 	dstMetadata := make(map[string][]byte) | |
| 
 | |
| 	// Clear any previous encryption metadata to avoid routing conflicts | |
| 	if state.SrcSSEKMS && state.DstSSEC { | |
| 		// SSE-KMS → SSE-C: Remove SSE-KMS headers | |
| 		// These will be excluded from dstMetadata, effectively removing them | |
| 	} else if state.SrcSSEC && state.DstSSEKMS { | |
| 		// SSE-C → SSE-KMS: Remove SSE-C headers | |
| 		// These will be excluded from dstMetadata, effectively removing them | |
| 	} else if !state.DstSSEC && !state.DstSSEKMS { | |
| 		// Encrypted → Unencrypted: Remove all encryption metadata | |
| 		// These will be excluded from dstMetadata, effectively removing them | |
| 	} | |
| 
 | |
| 	if state.DstSSEC && destSSECKey != nil { | |
| 		// For SSE-C destination, use first chunk's IV for compatibility | |
| 		if len(dstChunks) > 0 && dstChunks[0].GetSseType() == filer_pb.SSEType_SSE_C && len(dstChunks[0].GetSseMetadata()) > 0 { | |
| 			if ssecMetadata, err := DeserializeSSECMetadata(dstChunks[0].GetSseMetadata()); err == nil { | |
| 				if iv, ivErr := base64.StdEncoding.DecodeString(ssecMetadata.IV); ivErr == nil { | |
| 					StoreIVInMetadata(dstMetadata, iv) | |
| 					dstMetadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256") | |
| 					dstMetadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(destSSECKey.KeyMD5) | |
| 					glog.Infof("Created SSE-C object-level metadata from first chunk") | |
| 				} | |
| 			} | |
| 		} | |
| 	} else if state.DstSSEKMS && destKMSKeyID != "" { | |
| 		// For SSE-KMS destination, create object-level metadata | |
| 		if destKMSEncryptionContext == nil { | |
| 			destKMSEncryptionContext = BuildEncryptionContext(dstBucket, dstPath, destKMSBucketKeyEnabled) | |
| 		} | |
| 		sseKey := &SSEKMSKey{ | |
| 			KeyID:             destKMSKeyID, | |
| 			EncryptionContext: destKMSEncryptionContext, | |
| 			BucketKeyEnabled:  destKMSBucketKeyEnabled, | |
| 		} | |
| 		if kmsMetadata, serErr := SerializeSSEKMSMetadata(sseKey); serErr == nil { | |
| 			dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata | |
| 			glog.Infof("Created SSE-KMS object-level metadata") | |
| 		} else { | |
| 			glog.Errorf("Failed to serialize SSE-KMS metadata: %v", serErr) | |
| 		} | |
| 	} | |
| 	// For unencrypted destination, no metadata needed (dstMetadata remains empty) | |
|  | |
| 	return dstChunks, dstMetadata, nil | |
| } | |
| 
 | |
| // copyCrossEncryptionChunk handles copying a single chunk with cross-encryption support | |
| func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) { | |
| 	// Create destination chunk | |
| 	dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size) | |
| 
 | |
| 	// Prepare chunk copy (assign new volume and get source URL) | |
| 	assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath) | |
| 	if err != nil { | |
| 		return nil, err | |
| 	} | |
| 
 | |
| 	// Set file ID on destination chunk | |
| 	if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil { | |
| 		return nil, err | |
| 	} | |
| 
 | |
| 	// Download encrypted chunk data | |
| 	encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) | |
| 	if err != nil { | |
| 		return nil, fmt.Errorf("download encrypted chunk data: %w", err) | |
| 	} | |
| 
 | |
| 	var finalData []byte | |
| 
 | |
| 	// Step 1: Decrypt source data | |
| 	if chunk.GetSseType() == filer_pb.SSEType_SSE_C { | |
| 		// Decrypt SSE-C source | |
| 		if len(chunk.GetSseMetadata()) == 0 { | |
| 			return nil, fmt.Errorf("SSE-C chunk missing per-chunk metadata") | |
| 		} | |
| 
 | |
| 		ssecMetadata, err := DeserializeSSECMetadata(chunk.GetSseMetadata()) | |
| 		if err != nil { | |
| 			return nil, fmt.Errorf("failed to deserialize SSE-C metadata: %w", err) | |
| 		} | |
| 
 | |
| 		chunkBaseIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV) | |
| 		if err != nil { | |
| 			return nil, fmt.Errorf("failed to decode chunk IV: %w", err) | |
| 		} | |
| 
 | |
| 		// Calculate the correct IV for this chunk using within-part offset | |
| 		var chunkIV []byte | |
| 		if ssecMetadata.PartOffset > 0 { | |
| 			chunkIV = calculateIVWithOffset(chunkBaseIV, ssecMetadata.PartOffset) | |
| 		} else { | |
| 			chunkIV = chunkBaseIV | |
| 		} | |
| 
 | |
| 		decryptedReader, decErr := CreateSSECDecryptedReader(bytes.NewReader(encryptedData), sourceSSECKey, chunkIV) | |
| 		if decErr != nil { | |
| 			return nil, fmt.Errorf("create SSE-C decrypted reader: %w", decErr) | |
| 		} | |
| 
 | |
| 		decryptedData, readErr := io.ReadAll(decryptedReader) | |
| 		if readErr != nil { | |
| 			return nil, fmt.Errorf("decrypt SSE-C chunk data: %w", readErr) | |
| 		} | |
| 		finalData = decryptedData | |
| 		previewLen := 16 | |
| 		if len(finalData) < previewLen { | |
| 			previewLen = len(finalData) | |
| 		} | |
| 
 | |
| 	} else if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS { | |
| 		// Decrypt SSE-KMS source | |
| 		if len(chunk.GetSseMetadata()) == 0 { | |
| 			return nil, fmt.Errorf("SSE-KMS chunk missing per-chunk metadata") | |
| 		} | |
| 
 | |
| 		sourceSSEKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata()) | |
| 		if err != nil { | |
| 			return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err) | |
| 		} | |
| 
 | |
| 		decryptedReader, decErr := CreateSSEKMSDecryptedReader(bytes.NewReader(encryptedData), sourceSSEKey) | |
| 		if decErr != nil { | |
| 			return nil, fmt.Errorf("create SSE-KMS decrypted reader: %w", decErr) | |
| 		} | |
| 
 | |
| 		decryptedData, readErr := io.ReadAll(decryptedReader) | |
| 		if readErr != nil { | |
| 			return nil, fmt.Errorf("decrypt SSE-KMS chunk data: %w", readErr) | |
| 		} | |
| 		finalData = decryptedData | |
| 		previewLen := 16 | |
| 		if len(finalData) < previewLen { | |
| 			previewLen = len(finalData) | |
| 		} | |
| 
 | |
| 	} else { | |
| 		// Source is unencrypted | |
| 		finalData = encryptedData | |
| 	} | |
| 
 | |
| 	// Step 2: Re-encrypt with destination encryption (if any) | |
| 	if state.DstSSEC && destSSECKey != nil { | |
| 		// Encrypt with SSE-C | |
| 		encryptedReader, iv, encErr := CreateSSECEncryptedReader(bytes.NewReader(finalData), destSSECKey) | |
| 		if encErr != nil { | |
| 			return nil, fmt.Errorf("create SSE-C encrypted reader: %w", encErr) | |
| 		} | |
| 
 | |
| 		reencryptedData, readErr := io.ReadAll(encryptedReader) | |
| 		if readErr != nil { | |
| 			return nil, fmt.Errorf("re-encrypt with SSE-C: %w", readErr) | |
| 		} | |
| 		finalData = reencryptedData | |
| 
 | |
| 		// Create per-chunk SSE-C metadata (offset=0 for cross-encryption copies) | |
| 		ssecMetadata, err := SerializeSSECMetadata(iv, destSSECKey.KeyMD5, 0) | |
| 		if err != nil { | |
| 			return nil, fmt.Errorf("serialize SSE-C metadata: %w", err) | |
| 		} | |
| 
 | |
| 		dstChunk.SseType = filer_pb.SSEType_SSE_C | |
| 		dstChunk.SseMetadata = ssecMetadata | |
| 
 | |
| 		previewLen := 16 | |
| 		if len(finalData) < previewLen { | |
| 			previewLen = len(finalData) | |
| 		} | |
| 
 | |
| 	} else if state.DstSSEKMS && destKMSKeyID != "" { | |
| 		// Encrypt with SSE-KMS | |
| 		if destKMSEncryptionContext == nil { | |
| 			destKMSEncryptionContext = BuildEncryptionContext(dstBucket, dstPath, destKMSBucketKeyEnabled) | |
| 		} | |
| 
 | |
| 		encryptedReader, destSSEKey, encErr := CreateSSEKMSEncryptedReaderWithBucketKey(bytes.NewReader(finalData), destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled) | |
| 		if encErr != nil { | |
| 			return nil, fmt.Errorf("create SSE-KMS encrypted reader: %w", encErr) | |
| 		} | |
| 
 | |
| 		reencryptedData, readErr := io.ReadAll(encryptedReader) | |
| 		if readErr != nil { | |
| 			return nil, fmt.Errorf("re-encrypt with SSE-KMS: %w", readErr) | |
| 		} | |
| 		finalData = reencryptedData | |
| 
 | |
| 		// Create per-chunk SSE-KMS metadata (offset=0 for cross-encryption copies) | |
| 		destSSEKey.ChunkOffset = 0 | |
| 		kmsMetadata, err := SerializeSSEKMSMetadata(destSSEKey) | |
| 		if err != nil { | |
| 			return nil, fmt.Errorf("serialize SSE-KMS metadata: %w", err) | |
| 		} | |
| 
 | |
| 		dstChunk.SseType = filer_pb.SSEType_SSE_KMS | |
| 		dstChunk.SseMetadata = kmsMetadata | |
| 
 | |
| 		glog.V(4).Infof("Re-encrypted chunk with SSE-KMS") | |
| 	} | |
| 	// For unencrypted destination, finalData remains as decrypted plaintext | |
|  | |
| 	// Upload the final data | |
| 	if err := s3a.uploadChunkData(finalData, assignResult); err != nil { | |
| 		return nil, fmt.Errorf("upload chunk data: %w", err) | |
| 	} | |
| 
 | |
| 	// Update chunk size | |
| 	dstChunk.Size = uint64(len(finalData)) | |
| 
 | |
| 	glog.V(3).Infof("Successfully copied cross-encryption chunk %s → %s", | |
| 		chunk.GetFileIdString(), dstChunk.GetFileIdString()) | |
| 
 | |
| 	return dstChunk, nil | |
| } | |
| 
 | |
| // getEncryptionTypeString returns a string representation of encryption type for logging | |
| func (s3a *S3ApiServer) getEncryptionTypeString(isSSEC, isSSEKMS, isSSES3 bool) string { | |
| 	if isSSEC { | |
| 		return s3_constants.SSETypeC | |
| 	} else if isSSEKMS { | |
| 		return s3_constants.SSETypeKMS | |
| 	} else if isSSES3 { | |
| 		return s3_constants.SSETypeS3 | |
| 	} | |
| 	return "Plain" | |
| } | |
| 
 | |
| // copyChunksWithSSEC handles SSE-C aware copying with smart fast/slow path selection | |
| // Returns chunks and destination metadata that should be applied to the destination entry | |
| func (s3a *S3ApiServer) copyChunksWithSSEC(entry *filer_pb.Entry, r *http.Request) ([]*filer_pb.FileChunk, map[string][]byte, error) { | |
| 	glog.Infof("copyChunksWithSSEC called for %s with %d chunks", r.URL.Path, len(entry.GetChunks())) | |
| 
 | |
| 	// Parse SSE-C headers | |
| 	copySourceKey, err := ParseSSECCopySourceHeaders(r) | |
| 	if err != nil { | |
| 		glog.Errorf("Failed to parse SSE-C copy source headers: %v", err) | |
| 		return nil, nil, err | |
| 	} | |
| 
 | |
| 	destKey, err := ParseSSECHeaders(r) | |
| 	if err != nil { | |
| 		glog.Errorf("Failed to parse SSE-C headers: %v", err) | |
| 		return nil, nil, err | |
| 	} | |
| 
 | |
| 	// Check if this is a multipart SSE-C object | |
| 	isMultipartSSEC := false | |
| 	sseCChunks := 0 | |
| 	for i, chunk := range entry.GetChunks() { | |
| 		glog.V(4).Infof("Chunk %d: sseType=%d, hasMetadata=%t", i, chunk.GetSseType(), len(chunk.GetSseMetadata()) > 0) | |
| 		if chunk.GetSseType() == filer_pb.SSEType_SSE_C { | |
| 			sseCChunks++ | |
| 		} | |
| 	} | |
| 	isMultipartSSEC = sseCChunks > 1 | |
| 
 | |
| 	glog.Infof("SSE-C copy analysis: total chunks=%d, sseC chunks=%d, isMultipart=%t", len(entry.GetChunks()), sseCChunks, isMultipartSSEC) | |
| 
 | |
| 	if isMultipartSSEC { | |
| 		glog.V(2).Infof("Detected multipart SSE-C object with %d encrypted chunks for copy", sseCChunks) | |
| 		return s3a.copyMultipartSSECChunks(entry, copySourceKey, destKey, r.URL.Path) | |
| 	} | |
| 
 | |
| 	// Single-part SSE-C object: use original logic | |
| 	// Determine copy strategy | |
| 	strategy, err := DetermineSSECCopyStrategy(entry.Extended, copySourceKey, destKey) | |
| 	if err != nil { | |
| 		return nil, nil, err | |
| 	} | |
| 
 | |
| 	glog.V(2).Infof("SSE-C copy strategy for single-part %s: %v", r.URL.Path, strategy) | |
| 
 | |
| 	switch strategy { | |
| 	case SSECCopyStrategyDirect: | |
| 		// FAST PATH: Direct chunk copy | |
| 		glog.V(2).Infof("Using fast path: direct chunk copy for %s", r.URL.Path) | |
| 		chunks, err := s3a.copyChunks(entry, r.URL.Path) | |
| 		return chunks, nil, err | |
| 
 | |
| 	case SSECCopyStrategyDecryptEncrypt: | |
| 		// SLOW PATH: Decrypt and re-encrypt | |
| 		glog.V(2).Infof("Using slow path: decrypt/re-encrypt for %s", r.URL.Path) | |
| 		chunks, destIV, err := s3a.copyChunksWithReencryption(entry, copySourceKey, destKey, r.URL.Path) | |
| 		if err != nil { | |
| 			return nil, nil, err | |
| 		} | |
| 
 | |
| 		// Create destination metadata with IV and SSE-C headers | |
| 		dstMetadata := make(map[string][]byte) | |
| 		if destKey != nil && len(destIV) > 0 { | |
| 			// Store the IV | |
| 			StoreIVInMetadata(dstMetadata, destIV) | |
| 
 | |
| 			// Store SSE-C algorithm and key MD5 for proper metadata | |
| 			dstMetadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256") | |
| 			dstMetadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(destKey.KeyMD5) | |
| 
 | |
| 			glog.V(2).Infof("Prepared IV and SSE-C metadata for destination copy: %s", r.URL.Path) | |
| 		} | |
| 
 | |
| 		return chunks, dstMetadata, nil | |
| 
 | |
| 	default: | |
| 		return nil, nil, fmt.Errorf("unknown SSE-C copy strategy: %v", strategy) | |
| 	} | |
| } | |
| 
 | |
| // copyChunksWithReencryption handles the slow path: decrypt source and re-encrypt for destination | |
| // Returns the destination chunks and the IV used for encryption (if any) | |
| func (s3a *S3ApiServer) copyChunksWithReencryption(entry *filer_pb.Entry, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) ([]*filer_pb.FileChunk, []byte, error) { | |
| 	dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks())) | |
| 	const defaultChunkCopyConcurrency = 4 | |
| 	executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency) // Limit to configurable concurrent operations | |
| 	errChan := make(chan error, len(entry.GetChunks())) | |
| 
 | |
| 	// Generate a single IV for the destination object (if destination is encrypted) | |
| 	var destIV []byte | |
| 	if destKey != nil { | |
| 		destIV = make([]byte, s3_constants.AESBlockSize) | |
| 		if _, err := io.ReadFull(rand.Reader, destIV); err != nil { | |
| 			return nil, nil, fmt.Errorf("failed to generate destination IV: %w", err) | |
| 		} | |
| 	} | |
| 
 | |
| 	for i, chunk := range entry.GetChunks() { | |
| 		chunkIndex := i | |
| 		executor.Execute(func() { | |
| 			dstChunk, err := s3a.copyChunkWithReencryption(chunk, copySourceKey, destKey, dstPath, entry.Extended, destIV) | |
| 			if err != nil { | |
| 				errChan <- fmt.Errorf("chunk %d: %v", chunkIndex, err) | |
| 				return | |
| 			} | |
| 			dstChunks[chunkIndex] = dstChunk | |
| 			errChan <- nil | |
| 		}) | |
| 	} | |
| 
 | |
| 	// Wait for all operations to complete and check for errors | |
| 	for i := 0; i < len(entry.GetChunks()); i++ { | |
| 		if err := <-errChan; err != nil { | |
| 			return nil, nil, err | |
| 		} | |
| 	} | |
| 
 | |
| 	return dstChunks, destIV, nil | |
| } | |
| 
 | |
| // copyChunkWithReencryption copies a single chunk with decrypt/re-encrypt | |
| func (s3a *S3ApiServer) copyChunkWithReencryption(chunk *filer_pb.FileChunk, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string, srcMetadata map[string][]byte, destIV []byte) (*filer_pb.FileChunk, error) { | |
| 	// Create destination chunk | |
| 	dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size) | |
| 
 | |
| 	// Prepare chunk copy (assign new volume and get source URL) | |
| 	assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath) | |
| 	if err != nil { | |
| 		return nil, err | |
| 	} | |
| 
 | |
| 	// Set file ID on destination chunk | |
| 	if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil { | |
| 		return nil, err | |
| 	} | |
| 
 | |
| 	// Download encrypted chunk data | |
| 	encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) | |
| 	if err != nil { | |
| 		return nil, fmt.Errorf("download encrypted chunk data: %w", err) | |
| 	} | |
| 
 | |
| 	var finalData []byte | |
| 
 | |
| 	// Decrypt if source is encrypted | |
| 	if copySourceKey != nil { | |
| 		// Get IV from source metadata | |
| 		srcIV, err := GetIVFromMetadata(srcMetadata) | |
| 		if err != nil { | |
| 			return nil, fmt.Errorf("failed to get IV from metadata: %w", err) | |
| 		} | |
| 
 | |
| 		// Use counter offset based on chunk position in the original object | |
| 		decryptedReader, decErr := CreateSSECDecryptedReaderWithOffset(bytes.NewReader(encryptedData), copySourceKey, srcIV, uint64(chunk.Offset)) | |
| 		if decErr != nil { | |
| 			return nil, fmt.Errorf("create decrypted reader: %w", decErr) | |
| 		} | |
| 
 | |
| 		decryptedData, readErr := io.ReadAll(decryptedReader) | |
| 		if readErr != nil { | |
| 			return nil, fmt.Errorf("decrypt chunk data: %w", readErr) | |
| 		} | |
| 		finalData = decryptedData | |
| 	} else { | |
| 		// Source is unencrypted | |
| 		finalData = encryptedData | |
| 	} | |
| 
 | |
| 	// Re-encrypt if destination should be encrypted | |
| 	if destKey != nil { | |
| 		// Use the provided destination IV with counter offset based on chunk position | |
| 		// This ensures all chunks of the same object use the same IV with different counters | |
| 		encryptedReader, encErr := CreateSSECEncryptedReaderWithOffset(bytes.NewReader(finalData), destKey, destIV, uint64(chunk.Offset)) | |
| 		if encErr != nil { | |
| 			return nil, fmt.Errorf("create encrypted reader: %w", encErr) | |
| 		} | |
| 
 | |
| 		reencryptedData, readErr := io.ReadAll(encryptedReader) | |
| 		if readErr != nil { | |
| 			return nil, fmt.Errorf("re-encrypt chunk data: %w", readErr) | |
| 		} | |
| 		finalData = reencryptedData | |
| 
 | |
| 		// Update chunk size to include IV | |
| 		dstChunk.Size = uint64(len(finalData)) | |
| 	} | |
| 
 | |
| 	// Upload the processed data | |
| 	if err := s3a.uploadChunkData(finalData, assignResult); err != nil { | |
| 		return nil, fmt.Errorf("upload processed chunk data: %w", err) | |
| 	} | |
| 
 | |
| 	return dstChunk, nil | |
| } | |
| 
 | |
| // copyChunksWithSSEKMS handles SSE-KMS aware copying with smart fast/slow path selection | |
| // Returns chunks and destination metadata like SSE-C for consistency | |
| func (s3a *S3ApiServer) copyChunksWithSSEKMS(entry *filer_pb.Entry, r *http.Request, bucket string) ([]*filer_pb.FileChunk, map[string][]byte, error) { | |
| 	glog.Infof("copyChunksWithSSEKMS called for %s with %d chunks", r.URL.Path, len(entry.GetChunks())) | |
| 
 | |
| 	// Parse SSE-KMS headers from copy request | |
| 	destKeyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r) | |
| 	if err != nil { | |
| 		return nil, nil, err | |
| 	} | |
| 
 | |
| 	// Check if this is a multipart SSE-KMS object | |
| 	isMultipartSSEKMS := false | |
| 	sseKMSChunks := 0 | |
| 	for i, chunk := range entry.GetChunks() { | |
| 		glog.V(4).Infof("Chunk %d: sseType=%d, hasKMSMetadata=%t", i, chunk.GetSseType(), len(chunk.GetSseMetadata()) > 0) | |
| 		if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS { | |
| 			sseKMSChunks++ | |
| 		} | |
| 	} | |
| 	isMultipartSSEKMS = sseKMSChunks > 1 | |
| 
 | |
| 	glog.Infof("SSE-KMS copy analysis: total chunks=%d, sseKMS chunks=%d, isMultipart=%t", len(entry.GetChunks()), sseKMSChunks, isMultipartSSEKMS) | |
| 
 | |
| 	if isMultipartSSEKMS { | |
| 		glog.V(2).Infof("Detected multipart SSE-KMS object with %d encrypted chunks for copy", sseKMSChunks) | |
| 		return s3a.copyMultipartSSEKMSChunks(entry, destKeyID, encryptionContext, bucketKeyEnabled, r.URL.Path, bucket) | |
| 	} | |
| 
 | |
| 	// Single-part SSE-KMS object: use existing logic | |
| 	// If no SSE-KMS headers and source is not SSE-KMS encrypted, use regular copy | |
| 	if destKeyID == "" && !IsSSEKMSEncrypted(entry.Extended) { | |
| 		chunks, err := s3a.copyChunks(entry, r.URL.Path) | |
| 		return chunks, nil, err | |
| 	} | |
| 
 | |
| 	// Apply bucket default encryption if no explicit key specified | |
| 	if destKeyID == "" { | |
| 		bucketMetadata, err := s3a.getBucketMetadata(bucket) | |
| 		if err != nil { | |
| 			glog.V(2).Infof("Could not get bucket metadata for default encryption: %v", err) | |
| 		} else if bucketMetadata != nil && bucketMetadata.Encryption != nil && bucketMetadata.Encryption.SseAlgorithm == "aws:kms" { | |
| 			destKeyID = bucketMetadata.Encryption.KmsKeyId | |
| 			bucketKeyEnabled = bucketMetadata.Encryption.BucketKeyEnabled | |
| 		} | |
| 	} | |
| 
 | |
| 	// Determine copy strategy | |
| 	strategy, err := DetermineSSEKMSCopyStrategy(entry.Extended, destKeyID) | |
| 	if err != nil { | |
| 		return nil, nil, err | |
| 	} | |
| 
 | |
| 	glog.V(2).Infof("SSE-KMS copy strategy for %s: %v", r.URL.Path, strategy) | |
| 
 | |
| 	switch strategy { | |
| 	case SSEKMSCopyStrategyDirect: | |
| 		// FAST PATH: Direct chunk copy (same key or both unencrypted) | |
| 		glog.V(2).Infof("Using fast path: direct chunk copy for %s", r.URL.Path) | |
| 		chunks, err := s3a.copyChunks(entry, r.URL.Path) | |
| 		// For direct copy, generate destination metadata if we're encrypting to SSE-KMS | |
| 		var dstMetadata map[string][]byte | |
| 		if destKeyID != "" { | |
| 			dstMetadata = make(map[string][]byte) | |
| 			if encryptionContext == nil { | |
| 				encryptionContext = BuildEncryptionContext(bucket, r.URL.Path, bucketKeyEnabled) | |
| 			} | |
| 			sseKey := &SSEKMSKey{ | |
| 				KeyID:             destKeyID, | |
| 				EncryptionContext: encryptionContext, | |
| 				BucketKeyEnabled:  bucketKeyEnabled, | |
| 			} | |
| 			if kmsMetadata, serializeErr := SerializeSSEKMSMetadata(sseKey); serializeErr == nil { | |
| 				dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata | |
| 				glog.V(3).Infof("Generated SSE-KMS metadata for direct copy: keyID=%s", destKeyID) | |
| 			} else { | |
| 				glog.Errorf("Failed to serialize SSE-KMS metadata for direct copy: %v", serializeErr) | |
| 			} | |
| 		} | |
| 		return chunks, dstMetadata, err | |
| 
 | |
| 	case SSEKMSCopyStrategyDecryptEncrypt: | |
| 		// SLOW PATH: Decrypt source and re-encrypt for destination | |
| 		glog.V(2).Infof("Using slow path: decrypt/re-encrypt for %s", r.URL.Path) | |
| 		return s3a.copyChunksWithSSEKMSReencryption(entry, destKeyID, encryptionContext, bucketKeyEnabled, r.URL.Path, bucket) | |
| 
 | |
| 	default: | |
| 		return nil, nil, fmt.Errorf("unknown SSE-KMS copy strategy: %v", strategy) | |
| 	} | |
| } | |
| 
 | |
| // copyChunksWithSSEKMSReencryption handles the slow path: decrypt source and re-encrypt for destination | |
| // Returns chunks and destination metadata like SSE-C for consistency | |
| func (s3a *S3ApiServer) copyChunksWithSSEKMSReencryption(entry *filer_pb.Entry, destKeyID string, encryptionContext map[string]string, bucketKeyEnabled bool, dstPath, bucket string) ([]*filer_pb.FileChunk, map[string][]byte, error) { | |
| 	var dstChunks []*filer_pb.FileChunk | |
| 
 | |
| 	// Extract and deserialize source SSE-KMS metadata | |
| 	var sourceSSEKey *SSEKMSKey | |
| 	if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists { | |
| 		var err error | |
| 		sourceSSEKey, err = DeserializeSSEKMSMetadata(keyData) | |
| 		if err != nil { | |
| 			return nil, nil, fmt.Errorf("failed to deserialize source SSE-KMS metadata: %w", err) | |
| 		} | |
| 		glog.V(3).Infof("Extracted source SSE-KMS key: keyID=%s, bucketKey=%t", sourceSSEKey.KeyID, sourceSSEKey.BucketKeyEnabled) | |
| 	} | |
| 
 | |
| 	// Process chunks | |
| 	for _, chunk := range entry.GetChunks() { | |
| 		dstChunk, err := s3a.copyChunkWithSSEKMSReencryption(chunk, sourceSSEKey, destKeyID, encryptionContext, bucketKeyEnabled, dstPath, bucket) | |
| 		if err != nil { | |
| 			return nil, nil, fmt.Errorf("copy chunk with SSE-KMS re-encryption: %w", err) | |
| 		} | |
| 		dstChunks = append(dstChunks, dstChunk) | |
| 	} | |
| 
 | |
| 	// Generate destination metadata for SSE-KMS encryption (consistent with SSE-C pattern) | |
| 	dstMetadata := make(map[string][]byte) | |
| 	if destKeyID != "" { | |
| 		// Build encryption context if not provided | |
| 		if encryptionContext == nil { | |
| 			encryptionContext = BuildEncryptionContext(bucket, dstPath, bucketKeyEnabled) | |
| 		} | |
| 
 | |
| 		// Create SSE-KMS key structure for destination metadata | |
| 		sseKey := &SSEKMSKey{ | |
| 			KeyID:             destKeyID, | |
| 			EncryptionContext: encryptionContext, | |
| 			BucketKeyEnabled:  bucketKeyEnabled, | |
| 			// Note: EncryptedDataKey will be generated during actual encryption | |
| 			// IV is also generated per chunk during encryption | |
| 		} | |
| 
 | |
| 		// Serialize SSE-KMS metadata for storage | |
| 		kmsMetadata, err := SerializeSSEKMSMetadata(sseKey) | |
| 		if err != nil { | |
| 			return nil, nil, fmt.Errorf("serialize destination SSE-KMS metadata: %w", err) | |
| 		} | |
| 
 | |
| 		dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata | |
| 		glog.V(3).Infof("Generated destination SSE-KMS metadata: keyID=%s, bucketKey=%t", destKeyID, bucketKeyEnabled) | |
| 	} | |
| 
 | |
| 	return dstChunks, dstMetadata, nil | |
| } | |
| 
 | |
| // copyChunkWithSSEKMSReencryption copies a single chunk with SSE-KMS decrypt/re-encrypt | |
| func (s3a *S3ApiServer) copyChunkWithSSEKMSReencryption(chunk *filer_pb.FileChunk, sourceSSEKey *SSEKMSKey, destKeyID string, encryptionContext map[string]string, bucketKeyEnabled bool, dstPath, bucket string) (*filer_pb.FileChunk, error) { | |
| 	// Create destination chunk | |
| 	dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size) | |
| 
 | |
| 	// Prepare chunk copy (assign new volume and get source URL) | |
| 	assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath) | |
| 	if err != nil { | |
| 		return nil, err | |
| 	} | |
| 
 | |
| 	// Set file ID on destination chunk | |
| 	if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil { | |
| 		return nil, err | |
| 	} | |
| 
 | |
| 	// Download chunk data | |
| 	chunkData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) | |
| 	if err != nil { | |
| 		return nil, fmt.Errorf("download chunk data: %w", err) | |
| 	} | |
| 
 | |
| 	var finalData []byte | |
| 
 | |
| 	// Decrypt source data if it's SSE-KMS encrypted | |
| 	if sourceSSEKey != nil { | |
| 		// For SSE-KMS, the encrypted chunk data contains IV + encrypted content | |
| 		// Use the source SSE key to decrypt the chunk data | |
| 		decryptedReader, err := CreateSSEKMSDecryptedReader(bytes.NewReader(chunkData), sourceSSEKey) | |
| 		if err != nil { | |
| 			return nil, fmt.Errorf("create SSE-KMS decrypted reader: %w", err) | |
| 		} | |
| 
 | |
| 		decryptedData, err := io.ReadAll(decryptedReader) | |
| 		if err != nil { | |
| 			return nil, fmt.Errorf("decrypt chunk data: %w", err) | |
| 		} | |
| 		finalData = decryptedData | |
| 		glog.V(4).Infof("Decrypted chunk data: %d bytes → %d bytes", len(chunkData), len(finalData)) | |
| 	} else { | |
| 		// Source is not SSE-KMS encrypted, use data as-is | |
| 		finalData = chunkData | |
| 	} | |
| 
 | |
| 	// Re-encrypt if destination should be SSE-KMS encrypted | |
| 	if destKeyID != "" { | |
| 		// Encryption context should already be provided by the caller | |
| 		// But ensure we have a fallback for robustness | |
| 		if encryptionContext == nil { | |
| 			encryptionContext = BuildEncryptionContext(bucket, dstPath, bucketKeyEnabled) | |
| 		} | |
| 
 | |
| 		encryptedReader, _, err := CreateSSEKMSEncryptedReaderWithBucketKey(bytes.NewReader(finalData), destKeyID, encryptionContext, bucketKeyEnabled) | |
| 		if err != nil { | |
| 			return nil, fmt.Errorf("create SSE-KMS encrypted reader: %w", err) | |
| 		} | |
| 
 | |
| 		reencryptedData, err := io.ReadAll(encryptedReader) | |
| 		if err != nil { | |
| 			return nil, fmt.Errorf("re-encrypt chunk data: %w", err) | |
| 		} | |
| 
 | |
| 		// Store original decrypted data size for logging | |
| 		originalSize := len(finalData) | |
| 		finalData = reencryptedData | |
| 		glog.V(4).Infof("Re-encrypted chunk data: %d bytes → %d bytes", originalSize, len(finalData)) | |
| 
 | |
| 		// Update chunk size to include IV and encryption overhead | |
| 		dstChunk.Size = uint64(len(finalData)) | |
| 	} | |
| 
 | |
| 	// Upload the processed data | |
| 	if err := s3a.uploadChunkData(finalData, assignResult); err != nil { | |
| 		return nil, fmt.Errorf("upload processed chunk data: %w", err) | |
| 	} | |
| 
 | |
| 	glog.V(3).Infof("Successfully processed SSE-KMS chunk re-encryption: src_key=%s, dst_key=%s, size=%d→%d", | |
| 		getKeyIDString(sourceSSEKey), destKeyID, len(chunkData), len(finalData)) | |
| 
 | |
| 	return dstChunk, nil | |
| } | |
| 
 | |
| // getKeyIDString safely gets the KeyID from an SSEKMSKey, handling nil cases | |
| func getKeyIDString(key *SSEKMSKey) string { | |
| 	if key == nil { | |
| 		return "none" | |
| 	} | |
| 	if key.KeyID == "" { | |
| 		return "default" | |
| 	} | |
| 	return key.KeyID | |
| } | |
| 
 | |
| // EncryptionHeaderContext holds encryption type information and header classifications | |
| type EncryptionHeaderContext struct { | |
| 	SrcSSEC, SrcSSEKMS, SrcSSES3                bool | |
| 	DstSSEC, DstSSEKMS, DstSSES3                bool | |
| 	IsSSECHeader, IsSSEKMSHeader, IsSSES3Header bool | |
| } | |
| 
 | |
| // newEncryptionHeaderContext creates a context for encryption header processing | |
| func newEncryptionHeaderContext(headerKey string, srcSSEC, srcSSEKMS, srcSSES3, dstSSEC, dstSSEKMS, dstSSES3 bool) *EncryptionHeaderContext { | |
| 	return &EncryptionHeaderContext{ | |
| 		SrcSSEC: srcSSEC, SrcSSEKMS: srcSSEKMS, SrcSSES3: srcSSES3, | |
| 		DstSSEC: dstSSEC, DstSSEKMS: dstSSEKMS, DstSSES3: dstSSES3, | |
| 		IsSSECHeader:   isSSECHeader(headerKey), | |
| 		IsSSEKMSHeader: isSSEKMSHeader(headerKey, srcSSEKMS, dstSSEKMS), | |
| 		IsSSES3Header:  isSSES3Header(headerKey, srcSSES3, dstSSES3), | |
| 	} | |
| } | |
| 
 | |
| // isSSECHeader checks if the header is SSE-C specific | |
| func isSSECHeader(headerKey string) bool { | |
| 	return headerKey == s3_constants.AmzServerSideEncryptionCustomerAlgorithm || | |
| 		headerKey == s3_constants.AmzServerSideEncryptionCustomerKeyMD5 || | |
| 		headerKey == s3_constants.SeaweedFSSSEIV | |
| } | |
| 
 | |
| // isSSEKMSHeader checks if the header is SSE-KMS specific | |
| func isSSEKMSHeader(headerKey string, srcSSEKMS, dstSSEKMS bool) bool { | |
| 	return (headerKey == s3_constants.AmzServerSideEncryption && (srcSSEKMS || dstSSEKMS)) || | |
| 		headerKey == s3_constants.AmzServerSideEncryptionAwsKmsKeyId || | |
| 		headerKey == s3_constants.SeaweedFSSSEKMSKey || | |
| 		headerKey == s3_constants.SeaweedFSSSEKMSKeyID || | |
| 		headerKey == s3_constants.SeaweedFSSSEKMSEncryption || | |
| 		headerKey == s3_constants.SeaweedFSSSEKMSBucketKeyEnabled || | |
| 		headerKey == s3_constants.SeaweedFSSSEKMSEncryptionContext || | |
| 		headerKey == s3_constants.SeaweedFSSSEKMSBaseIV | |
| } | |
| 
 | |
| // isSSES3Header checks if the header is SSE-S3 specific | |
| func isSSES3Header(headerKey string, srcSSES3, dstSSES3 bool) bool { | |
| 	return (headerKey == s3_constants.AmzServerSideEncryption && (srcSSES3 || dstSSES3)) || | |
| 		headerKey == s3_constants.SeaweedFSSSES3Key || | |
| 		headerKey == s3_constants.SeaweedFSSSES3Encryption || | |
| 		headerKey == s3_constants.SeaweedFSSSES3BaseIV || | |
| 		headerKey == s3_constants.SeaweedFSSSES3KeyData | |
| } | |
| 
 | |
| // shouldSkipCrossEncryptionHeader handles cross-encryption copy scenarios | |
| func (ctx *EncryptionHeaderContext) shouldSkipCrossEncryptionHeader() bool { | |
| 	// SSE-C to SSE-KMS: skip SSE-C headers | |
| 	if ctx.SrcSSEC && ctx.DstSSEKMS && ctx.IsSSECHeader { | |
| 		return true | |
| 	} | |
| 
 | |
| 	// SSE-KMS to SSE-C: skip SSE-KMS headers | |
| 	if ctx.SrcSSEKMS && ctx.DstSSEC && ctx.IsSSEKMSHeader { | |
| 		return true | |
| 	} | |
| 
 | |
| 	// SSE-C to SSE-S3: skip SSE-C headers | |
| 	if ctx.SrcSSEC && ctx.DstSSES3 && ctx.IsSSECHeader { | |
| 		return true | |
| 	} | |
| 
 | |
| 	// SSE-S3 to SSE-C: skip SSE-S3 headers | |
| 	if ctx.SrcSSES3 && ctx.DstSSEC && ctx.IsSSES3Header { | |
| 		return true | |
| 	} | |
| 
 | |
| 	// SSE-KMS to SSE-S3: skip SSE-KMS headers | |
| 	if ctx.SrcSSEKMS && ctx.DstSSES3 && ctx.IsSSEKMSHeader { | |
| 		return true | |
| 	} | |
| 
 | |
| 	// SSE-S3 to SSE-KMS: skip SSE-S3 headers | |
| 	if ctx.SrcSSES3 && ctx.DstSSEKMS && ctx.IsSSES3Header { | |
| 		return true | |
| 	} | |
| 
 | |
| 	return false | |
| } | |
| 
 | |
| // shouldSkipEncryptedToUnencryptedHeader handles encrypted to unencrypted copy scenarios | |
| func (ctx *EncryptionHeaderContext) shouldSkipEncryptedToUnencryptedHeader() bool { | |
| 	// Skip all encryption headers when copying from encrypted to unencrypted | |
| 	hasSourceEncryption := ctx.SrcSSEC || ctx.SrcSSEKMS || ctx.SrcSSES3 | |
| 	hasDestinationEncryption := ctx.DstSSEC || ctx.DstSSEKMS || ctx.DstSSES3 | |
| 	isAnyEncryptionHeader := ctx.IsSSECHeader || ctx.IsSSEKMSHeader || ctx.IsSSES3Header | |
| 
 | |
| 	return hasSourceEncryption && !hasDestinationEncryption && isAnyEncryptionHeader | |
| } | |
| 
 | |
| // shouldSkipEncryptionHeader determines if a header should be skipped when copying extended attributes | |
| // based on the source and destination encryption types. This consolidates the repetitive logic for | |
| // filtering encryption-related headers during copy operations. | |
| func shouldSkipEncryptionHeader(headerKey string, | |
| 	srcSSEC, srcSSEKMS, srcSSES3 bool, | |
| 	dstSSEC, dstSSEKMS, dstSSES3 bool) bool { | |
| 
 | |
| 	// Create context to reduce complexity and improve testability | |
| 	ctx := newEncryptionHeaderContext(headerKey, srcSSEC, srcSSEKMS, srcSSES3, dstSSEC, dstSSEKMS, dstSSES3) | |
| 
 | |
| 	// If it's not an encryption header, don't skip it | |
| 	if !ctx.IsSSECHeader && !ctx.IsSSEKMSHeader && !ctx.IsSSES3Header { | |
| 		return false | |
| 	} | |
| 
 | |
| 	// Handle cross-encryption scenarios (different encryption types) | |
| 	if ctx.shouldSkipCrossEncryptionHeader() { | |
| 		return true | |
| 	} | |
| 
 | |
| 	// Handle encrypted to unencrypted scenarios | |
| 	if ctx.shouldSkipEncryptedToUnencryptedHeader() { | |
| 		return true | |
| 	} | |
| 
 | |
| 	// Default: don't skip the header | |
| 	return false | |
| }
 |