
purge unused code

pull/7481/head
chrislu 2 weeks ago
parent
commit
def705fd51
  1. weed/s3api/s3api_object_handlers.go (576)
  2. weed/s3api/s3api_object_handlers_put.go (20)

weed/s3api/s3api_object_handlers.go (576)

@@ -113,18 +113,19 @@ func removeDuplicateSlashes(object string) string {
// - If a 0-byte object or directory has no children → it's empty → HEAD returns 200
//
// Examples:
// hasChildren("bucket", "dataset") where "dataset/file.txt" exists → true
// hasChildren("bucket", "empty-dir") where no children exist → false
//
// hasChildren("bucket", "dataset") where "dataset/file.txt" exists → true
// hasChildren("bucket", "empty-dir") where no children exist → false
//
// Performance: ~1-5ms per call (one gRPC LIST request with Limit=1)
func (s3a *S3ApiServer) hasChildren(bucket, prefix string) bool {
// Clean up prefix: remove leading slashes
cleanPrefix := strings.TrimPrefix(prefix, "/")
// The directory to list is bucketDir + cleanPrefix
bucketDir := s3a.option.BucketsPath + "/" + bucket
fullPath := bucketDir + "/" + cleanPrefix
// Try to list one child object in the directory
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.ListEntriesRequest{
@@ -132,12 +133,12 @@ func (s3a *S3ApiServer) hasChildren(bucket, prefix string) bool {
Limit: 1,
InclusiveStartFrom: true,
}
stream, err := client.ListEntries(context.Background(), request)
if err != nil {
return err
}
// Check if we got at least one entry
_, err = stream.Recv()
if err == io.EOF {
@@ -148,7 +149,7 @@ func (s3a *S3ApiServer) hasChildren(bucket, prefix string) bool {
}
return nil
})
// If we got an entry (not EOF), then it has children
return err == nil
}
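
For readers skimming the diff, here is a minimal caller-side sketch of the check described in the comment above. The helper name isImplicitDirectory is illustrative only (it is not part of this change); it simply packages the zero-byte-or-directory plus hasChildren test that HeadObjectHandler performs further down in this file.

// isImplicitDirectory is a hypothetical wrapper around the logic above:
// an entry is an implicit directory if it is a directory or a 0-byte file
// and at least one object exists under its prefix.
func (s3a *S3ApiServer) isImplicitDirectory(bucket, object string, entry *filer_pb.Entry) bool {
	if entry == nil || entry.Attributes == nil {
		return false
	}
	if entry.IsDirectory || entry.Attributes.FileSize == 0 {
		return s3a.hasChildren(bucket, object)
	}
	return false
}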
@@ -402,14 +403,14 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
if versionId != "" {
// Request for specific version - must look in .versions directory
glog.V(3).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object)
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
if err != nil {
glog.Errorf("Failed to get specific version %s: %v", versionId, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
targetVersionId = versionId
glog.V(3).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object)
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
if err != nil {
glog.Errorf("Failed to get specific version %s: %v", versionId, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
targetVersionId = versionId
} else {
// Request for latest version - OPTIMIZATION:
// Check if .versions/ directory exists quickly (no retries) to decide path
@@ -447,15 +448,15 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
}
} else {
// Transient error checking .versions/, fall back to getLatestObjectVersion with retries
glog.V(2).Infof("GetObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr)
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
glog.V(2).Infof("GetObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr)
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
}
}
// Extract version ID if not already set
if targetVersionId == "" {
if entry.Extended != nil {
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
@@ -1751,99 +1752,6 @@ func (s3a *S3ApiServer) addSSEResponseHeadersFromEntry(w http.ResponseWriter, r
}
}
// pipeWriterWrapper wraps io.PipeWriter to implement http.ResponseWriter interface
type pipeWriterWrapper struct {
*io.PipeWriter
}
func (pw *pipeWriterWrapper) Header() http.Header {
// Headers are already set on the real ResponseWriter, ignore here
return make(http.Header)
}
func (pw *pipeWriterWrapper) WriteHeader(statusCode int) {
// Status is already set on the real ResponseWriter, ignore here
}
// createSSECDecryptedReaderFromEntry creates an SSE-C decrypted reader from entry metadata
func (s3a *S3ApiServer) createSSECDecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) {
// Parse SSE-C headers from request
customerKey, err := ParseSSECHeaders(r)
if err != nil {
return nil, fmt.Errorf("failed to parse SSE-C headers: %w", err)
}
if customerKey == nil {
return nil, fmt.Errorf("SSE-C key required but not provided")
}
// Validate key MD5 from entry metadata
if entry.Extended != nil {
storedKeyMD5 := string(entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5])
if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 {
return nil, fmt.Errorf("SSE-C key mismatch")
}
}
// Get IV from entry metadata (stored as raw bytes, matching filer behavior)
iv := entry.Extended[s3_constants.SeaweedFSSSEIV]
if len(iv) == 0 {
return nil, fmt.Errorf("SSE-C IV not found in metadata")
}
// Create decrypted reader
return CreateSSECDecryptedReader(encryptedReader, customerKey, iv)
}
// createSSEKMSDecryptedReaderFromEntry creates an SSE-KMS decrypted reader from entry metadata
func (s3a *S3ApiServer) createSSEKMSDecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) {
// Extract SSE-KMS metadata from entry (stored as raw bytes, matching filer behavior)
if entry.Extended == nil {
return nil, fmt.Errorf("no extended metadata found")
}
kmsMetadataBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]
if !exists {
return nil, fmt.Errorf("SSE-KMS metadata not found")
}
sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
if err != nil {
return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err)
}
// Create decrypted reader
return CreateSSEKMSDecryptedReader(encryptedReader, sseKMSKey)
}
// createSSES3DecryptedReaderFromEntry creates an SSE-S3 decrypted reader from entry metadata
func (s3a *S3ApiServer) createSSES3DecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) {
// Extract SSE-S3 metadata from entry (stored as raw bytes, matching filer behavior)
if entry.Extended == nil {
return nil, fmt.Errorf("no extended metadata found")
}
keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]
if !exists {
return nil, fmt.Errorf("SSE-S3 metadata not found")
}
keyManager := GetSSES3KeyManager()
sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
if err != nil {
return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err)
}
// Get IV
iv, err := GetSSES3IV(entry, sseS3Key, keyManager)
if err != nil {
return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err)
}
// Create decrypted reader
return CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv)
}
// setResponseHeaders sets all standard HTTP response headers from entry metadata
func (s3a *S3ApiServer) setResponseHeaders(w http.ResponseWriter, entry *filer_pb.Entry, totalSize int64) {
// Safety check: entry must be valid
@@ -1954,10 +1862,11 @@ func (s *simpleMasterClient) GetLookupFileIdFunction() wdclient.LookupFileIdFunc
// which correctly identifies directories by checking for children
//
// Examples:
// HEAD /bucket/dataset (no trailing slash, has children) → 404 Not Found (implicit directory)
// HEAD /bucket/dataset/ (trailing slash) → 200 OK (explicit directory request)
// HEAD /bucket/empty.txt (0-byte file, no children) → 200 OK (legitimate empty file)
// HEAD /bucket/file.txt (regular file) → 200 OK (normal operation)
//
// This behavior only applies to:
// - Non-versioned buckets (versioned buckets use different semantics)
@@ -2009,14 +1918,14 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
if versionId != "" {
// Request for specific version
glog.V(2).Infof("HeadObject: requesting specific version %s for %s%s", versionId, bucket, object)
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
if err != nil {
glog.Errorf("Failed to get specific version %s: %v", versionId, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
targetVersionId = versionId
glog.V(2).Infof("HeadObject: requesting specific version %s for %s%s", versionId, bucket, object)
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
if err != nil {
glog.Errorf("Failed to get specific version %s: %v", versionId, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
targetVersionId = versionId
} else {
// Request for latest version - OPTIMIZATION:
// Check if .versions/ directory exists quickly (no retries) to decide path
@@ -2054,15 +1963,15 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
}
} else {
// Transient error checking .versions/, fall back to getLatestObjectVersion with retries
glog.V(2).Infof("HeadObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr)
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
glog.V(2).Infof("HeadObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr)
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
}
}
// Extract version ID if not already set
if targetVersionId == "" {
if entry.Extended != nil {
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
@@ -2176,7 +2085,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
if objectEntryForSSE.Attributes != nil {
isZeroByteFile := objectEntryForSSE.Attributes.FileSize == 0 && !objectEntryForSSE.IsDirectory
isActualDirectory := objectEntryForSSE.IsDirectory
if isZeroByteFile || isActualDirectory {
// Check if it has children (making it an implicit directory)
if s3a.hasChildren(bucket, object) {
@@ -2520,247 +2429,6 @@ func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http.
}
}
// handleSSEResponse handles both SSE-C and SSE-KMS decryption/validation and response processing
// The objectEntry parameter should be the correct entry for the requested version (if versioned)
func (s3a *S3ApiServer) handleSSEResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, objectEntry *filer_pb.Entry) (statusCode int, bytesTransferred int64) {
// Check what the client is expecting based on request headers
clientExpectsSSEC := IsSSECRequest(r)
// Check what the stored object has in headers (may be conflicting after copy)
kmsMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader)
// Detect actual object SSE type from the provided entry (respects versionId)
actualObjectType := "Unknown"
if objectEntry != nil {
actualObjectType = s3a.detectPrimarySSEType(objectEntry)
}
// If objectEntry is nil, we cannot determine SSE type from chunks
// This should only happen for 404s which will be handled by the proxy
if objectEntry == nil {
glog.V(4).Infof("Object entry not available for SSE routing, passing through")
return passThroughResponse(proxyResponse, w)
}
// Route based on ACTUAL object type (from chunks) rather than conflicting headers
if actualObjectType == s3_constants.SSETypeC && clientExpectsSSEC {
// Object is SSE-C and client expects SSE-C → SSE-C handler
return s3a.handleSSECResponse(r, proxyResponse, w, objectEntry)
} else if actualObjectType == s3_constants.SSETypeKMS && !clientExpectsSSEC {
// Object is SSE-KMS and client doesn't expect SSE-C → SSE-KMS handler
return s3a.handleSSEKMSResponse(r, proxyResponse, w, objectEntry, kmsMetadataHeader)
} else if actualObjectType == s3_constants.SSETypeS3 && !clientExpectsSSEC {
// Object is SSE-S3 and client doesn't expect SSE-C → SSE-S3 handler
return s3a.handleSSES3Response(r, proxyResponse, w, objectEntry)
} else if actualObjectType == "None" && !clientExpectsSSEC {
// Object is unencrypted and client doesn't expect SSE-C → pass through
return passThroughResponse(proxyResponse, w)
} else if actualObjectType == s3_constants.SSETypeC && !clientExpectsSSEC {
// Object is SSE-C but client doesn't provide SSE-C headers → Error
s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
return http.StatusBadRequest, 0
} else if actualObjectType == s3_constants.SSETypeKMS && clientExpectsSSEC {
// Object is SSE-KMS but client provides SSE-C headers → Error
s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
return http.StatusBadRequest, 0
} else if actualObjectType == s3_constants.SSETypeS3 && clientExpectsSSEC {
// Object is SSE-S3 but client provides SSE-C headers → Error (mismatched encryption)
s3err.WriteErrorResponse(w, r, s3err.ErrSSEEncryptionTypeMismatch)
return http.StatusBadRequest, 0
} else if actualObjectType == "None" && clientExpectsSSEC {
// Object is unencrypted but client provides SSE-C headers → Error
s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
return http.StatusBadRequest, 0
}
// Unknown state - pass through and let proxy handle it
glog.V(4).Infof("Unknown SSE state: objectType=%s, clientExpectsSSEC=%v", actualObjectType, clientExpectsSSEC)
return passThroughResponse(proxyResponse, w)
}
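
The branches above amount to a small routing matrix keyed on the stored encryption type and on whether the client sent SSE-C headers. A table-driven restatement is sketched below purely for readability; sseRoute and sseRoutingMatrix are illustrative names, not identifiers from this codebase.

// sseRoute restates one branch of the removed handleSSEResponse above.
type sseRoute struct {
	objectType      string // from detectPrimarySSEType: SSE-C, SSE-KMS, SSE-S3, "None", "Unknown"
	clientSendsSSEC bool   // result of IsSSECRequest(r)
	outcome         string // handler invoked or error returned by that branch
}

var sseRoutingMatrix = []sseRoute{
	{s3_constants.SSETypeC, true, "handleSSECResponse"},
	{s3_constants.SSETypeC, false, "ErrSSECustomerKeyMissing"},
	{s3_constants.SSETypeKMS, false, "handleSSEKMSResponse"},
	{s3_constants.SSETypeKMS, true, "ErrSSECustomerKeyMissing"},
	{s3_constants.SSETypeS3, false, "handleSSES3Response"},
	{s3_constants.SSETypeS3, true, "ErrSSEEncryptionTypeMismatch"},
	{"None", false, "passThroughResponse"},
	{"None", true, "ErrSSECustomerKeyMissing"},
	{"Unknown", false, "passThroughResponse"},
	{"Unknown", true, "passThroughResponse"},
}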
// handleSSEKMSResponse handles SSE-KMS decryption and response processing
func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, entry *filer_pb.Entry, kmsMetadataHeader string) (statusCode int, bytesTransferred int64) {
// Deserialize SSE-KMS metadata
kmsMetadataBytes, err := base64.StdEncoding.DecodeString(kmsMetadataHeader)
if err != nil {
glog.Errorf("Failed to decode SSE-KMS metadata: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
if err != nil {
glog.Errorf("Failed to deserialize SSE-KMS metadata: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
// For HEAD requests, we don't need to decrypt the body, just add response headers
if r.Method == "HEAD" {
// Capture existing CORS headers that may have been set by middleware
capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
// Copy headers from proxy response (excluding internal SeaweedFS headers)
copyResponseHeaders(w, proxyResponse, false)
// Add SSE-KMS response headers
AddSSEKMSResponseHeaders(w, sseKMSKey)
return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders)
}
// For GET requests, check if this is a multipart SSE-KMS object
// We need to check the object structure to determine if it's multipart encrypted
isMultipartSSEKMS := false
if sseKMSKey != nil && entry != nil {
// Use the entry parameter passed from the caller (avoids redundant lookup)
// Check for multipart SSE-KMS
sseKMSChunks := 0
for _, chunk := range entry.GetChunks() {
if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 {
sseKMSChunks++
}
}
isMultipartSSEKMS = sseKMSChunks > 1
}
var decryptedReader io.Reader
if isMultipartSSEKMS {
// Handle multipart SSE-KMS objects - each chunk needs independent decryption
multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse, entry)
if decErr != nil {
glog.Errorf("Failed to create multipart SSE-KMS decrypted reader: %v", decErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
decryptedReader = multipartReader
glog.V(3).Infof("Using multipart SSE-KMS decryption for object")
} else {
// Handle single-part SSE-KMS objects
singlePartReader, decErr := CreateSSEKMSDecryptedReader(proxyResponse.Body, sseKMSKey)
if decErr != nil {
glog.Errorf("Failed to create SSE-KMS decrypted reader: %v", decErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
decryptedReader = singlePartReader
glog.V(3).Infof("Using single-part SSE-KMS decryption for object")
}
// Capture existing CORS headers that may have been set by middleware
capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
// Copy headers from proxy response (excluding body-related headers that might change and internal SeaweedFS headers)
copyResponseHeaders(w, proxyResponse, true)
// Set correct Content-Length for SSE-KMS
if proxyResponse.Header.Get("Content-Range") == "" {
// For full object requests, encrypted length equals original length
if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" {
w.Header().Set("Content-Length", contentLengthStr)
}
}
// Add SSE-KMS response headers
AddSSEKMSResponseHeaders(w, sseKMSKey)
return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders)
}
// handleSSES3Response handles SSE-S3 decryption and response processing
func (s3a *S3ApiServer) handleSSES3Response(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, entry *filer_pb.Entry) (statusCode int, bytesTransferred int64) {
// For HEAD requests, we don't need to decrypt the body, just add response headers
if r.Method == "HEAD" {
// Capture existing CORS headers that may have been set by middleware
capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
// Copy headers from proxy response (excluding internal SeaweedFS headers)
copyResponseHeaders(w, proxyResponse, false)
// Add SSE-S3 response headers
w.Header().Set(s3_constants.AmzServerSideEncryption, SSES3Algorithm)
return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders)
}
// For GET requests, check if this is a multipart SSE-S3 object
isMultipartSSES3 := false
sses3Chunks := 0
for _, chunk := range entry.GetChunks() {
if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(chunk.GetSseMetadata()) > 0 {
sses3Chunks++
}
}
isMultipartSSES3 = sses3Chunks > 1
var decryptedReader io.Reader
if isMultipartSSES3 {
// Handle multipart SSE-S3 objects - each chunk needs independent decryption
multipartReader, decErr := s3a.createMultipartSSES3DecryptedReader(r, entry)
if decErr != nil {
glog.Errorf("Failed to create multipart SSE-S3 decrypted reader: %v", decErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
decryptedReader = multipartReader
glog.V(3).Infof("Using multipart SSE-S3 decryption for object")
} else {
// Handle single-part SSE-S3 objects
// Extract SSE-S3 key from metadata
keyManager := GetSSES3KeyManager()
if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]; !exists {
glog.Errorf("SSE-S3 key metadata not found in object entry")
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
} else {
sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
if err != nil {
glog.Errorf("Failed to deserialize SSE-S3 metadata: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
// Extract IV from metadata using helper function
iv, err := GetSSES3IV(entry, sseS3Key, keyManager)
if err != nil {
glog.Errorf("Failed to get SSE-S3 IV: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
singlePartReader, decErr := CreateSSES3DecryptedReader(proxyResponse.Body, sseS3Key, iv)
if decErr != nil {
glog.Errorf("Failed to create SSE-S3 decrypted reader: %v", decErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return http.StatusInternalServerError, 0
}
decryptedReader = singlePartReader
glog.V(3).Infof("Using single-part SSE-S3 decryption for object")
}
}
// Capture existing CORS headers that may have been set by middleware
capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
// Copy headers from proxy response (excluding body-related headers that might change and internal SeaweedFS headers)
copyResponseHeaders(w, proxyResponse, true)
// Set correct Content-Length for SSE-S3
if proxyResponse.Header.Get("Content-Range") == "" {
// For full object requests, encrypted length equals original length
if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" {
w.Header().Set("Content-Length", contentLengthStr)
}
}
// Add SSE-S3 response headers
w.Header().Set(s3_constants.AmzServerSideEncryption, SSES3Algorithm)
return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders)
}
// addObjectLockHeadersToResponse extracts object lock metadata from entry Extended attributes
// and adds the appropriate S3 headers to the response
func (s3a *S3ApiServer) addObjectLockHeadersToResponse(w http.ResponseWriter, entry *filer_pb.Entry) {
@@ -3170,158 +2838,6 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReaderDirect(ctx context.Co
return NewMultipartSSEReader(readers), nil
}
// createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects
func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
// Entry is passed from caller to avoid redundant filer lookup
ctx := r.Context()
// Sort chunks by offset to ensure correct order
chunks := entry.GetChunks()
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].GetOffset() < chunks[j].GetOffset()
})
// Create readers for each chunk, decrypting them independently
var readers []io.Reader
for _, chunk := range chunks {
// Get this chunk's encrypted data
chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
if err != nil {
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
}
// Get SSE-KMS metadata for this chunk
var chunkSSEKMSKey *SSEKMSKey
// Check if this chunk has per-chunk SSE-KMS metadata (new architecture)
if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 {
// Use the per-chunk SSE-KMS metadata
kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata())
if err != nil {
glog.Errorf("Failed to deserialize per-chunk SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err)
} else {
// ChunkOffset is already set from the stored metadata (PartOffset)
chunkSSEKMSKey = kmsKey
}
}
// Note: No fallback to object-level metadata for multipart objects
// Each chunk in a multipart SSE-KMS object must have its own unique IV
// Falling back to object-level metadata could lead to IV reuse or incorrect decryption
if chunkSSEKMSKey == nil {
return nil, fmt.Errorf("no SSE-KMS metadata found for chunk %s in multipart object", chunk.GetFileIdString())
}
// Create decrypted reader for this chunk
decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, chunkSSEKMSKey)
if decErr != nil {
chunkReader.Close() // Close the chunk reader if decryption fails
return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
}
// Wrap the decrypted reader with the underlying chunkReader to ensure HTTP body is closed
// This matches the SSE-S3 pattern and prevents resource leaks
readers = append(readers, struct {
io.Reader
io.Closer
}{
Reader: decryptedChunkReader,
Closer: chunkReader,
})
glog.V(4).Infof("Added streaming decrypted reader for chunk %s in multipart SSE-KMS object", chunk.GetFileIdString())
}
// Combine all decrypted chunk readers into a single stream with proper resource management
multiReader := NewMultipartSSEReader(readers)
glog.V(3).Infof("Created multipart SSE-KMS decrypted reader with %d chunks", len(readers))
return multiReader, nil
}
// createMultipartSSES3DecryptedReader creates a reader for multipart SSE-S3 objects
func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, entry *filer_pb.Entry) (io.Reader, error) {
ctx := r.Context()
// Sort chunks by offset to ensure correct order
chunks := entry.GetChunks()
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].GetOffset() < chunks[j].GetOffset()
})
// Create readers for each chunk, decrypting them independently
var readers []io.Reader
keyManager := GetSSES3KeyManager()
for _, chunk := range chunks {
// Get this chunk's encrypted data
chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
if err != nil {
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
}
// Handle based on chunk's encryption type
if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
var chunkSSES3Key *SSES3Key
// Check if this chunk has per-chunk SSE-S3 metadata
if len(chunk.GetSseMetadata()) > 0 {
// Use the per-chunk SSE-S3 metadata
sseKey, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager)
if err != nil {
glog.Errorf("Failed to deserialize per-chunk SSE-S3 metadata for chunk %s: %v", chunk.GetFileIdString(), err)
chunkReader.Close()
return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %v", err)
}
chunkSSES3Key = sseKey
}
// Note: No fallback to object-level metadata for multipart objects
// Each chunk in a multipart SSE-S3 object must have its own unique IV
// Falling back to object-level metadata could lead to IV reuse or incorrect decryption
if chunkSSES3Key == nil {
chunkReader.Close()
return nil, fmt.Errorf("no SSE-S3 metadata found for chunk %s in multipart object", chunk.GetFileIdString())
}
// Extract IV from chunk metadata
if len(chunkSSES3Key.IV) == 0 {
chunkReader.Close()
return nil, fmt.Errorf("no IV found in SSE-S3 metadata for chunk %s", chunk.GetFileIdString())
}
// Create decrypted reader for this chunk
decryptedChunkReader, decErr := CreateSSES3DecryptedReader(chunkReader, chunkSSES3Key, chunkSSES3Key.IV)
if decErr != nil {
chunkReader.Close()
return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
}
// Use the streaming decrypted reader directly, ensuring the underlying chunkReader can be closed
readers = append(readers, struct {
io.Reader
io.Closer
}{
Reader: decryptedChunkReader,
Closer: chunkReader,
})
glog.V(4).Infof("Added streaming decrypted reader for chunk %s in multipart SSE-S3 object", chunk.GetFileIdString())
} else {
// Non-SSE-S3 chunk (unencrypted or other encryption type), use as-is
readers = append(readers, chunkReader)
glog.V(4).Infof("Added passthrough reader for non-SSE-S3 chunk %s (type: %v)", chunk.GetFileIdString(), chunk.GetSseType())
}
}
// Combine all decrypted chunk readers into a single stream
multiReader := NewMultipartSSEReader(readers)
glog.V(3).Infof("Created multipart SSE-S3 decrypted reader with %d chunks", len(readers))
return multiReader, nil
}
// createEncryptedChunkReader creates a reader for a single encrypted chunk
// Context propagation ensures cancellation if the S3 client disconnects
func (s3a *S3ApiServer) createEncryptedChunkReader(ctx context.Context, chunk *filer_pb.FileChunk) (io.ReadCloser, error) {

weed/s3api/s3api_object_handlers_put.go (20)

@@ -641,26 +641,6 @@ func filerErrorToS3Error(errString string) s3err.ErrorCode {
}
}
func (s3a *S3ApiServer) maybeAddFilerJwtAuthorization(r *http.Request, isWrite bool) {
encodedJwt := s3a.maybeGetFilerJwtAuthorizationToken(isWrite)
if encodedJwt == "" {
return
}
r.Header.Set("Authorization", "BEARER "+string(encodedJwt))
}
func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string {
var encodedJwt security.EncodedJwt
if isWrite {
encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.SigningKey, s3a.filerGuard.ExpiresAfterSec)
} else {
encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec)
}
return string(encodedJwt)
}
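
For context, a sketch of the kind of call site these two removed helpers existed for; proxyReq and the boolean literals are illustrative, and nothing in this diff adds such a call.

// Hypothetical call-site sketch: sign a request that is about to be forwarded
// to the filer. Writes use filerGuard.SigningKey, reads use filerGuard.ReadSigningKey;
// when the generated token is empty the request is left untouched.
s3a.maybeAddFilerJwtAuthorization(proxyReq, true)  // isWrite=true for PUT/DELETE style calls
s3a.maybeAddFilerJwtAuthorization(proxyReq, false) // isWrite=false for GET/HEAD style calls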
// setObjectOwnerFromRequest sets the object owner metadata based on the authenticated user
func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_pb.Entry) {
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
