Eliminated Unnecessary Stream Prefetch for Multipart SSE

pull/7481/head
chrislu 3 weeks ago
parent
commit e87b48a6c7
weed/s3api/s3api_object_handlers.go (90 changed lines)

@@ -926,18 +926,11 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
 	}
 	headerSetTime = time.Since(tHeaderSet)
-	// Get encrypted data stream (without headers)
-	tStreamFetch := time.Now()
-	encryptedReader, err := s3a.getEncryptedStreamFromVolumes(r.Context(), entry)
-	streamFetchTime = time.Since(tStreamFetch)
-	if err != nil {
-		return err
-	}
-	defer encryptedReader.Close()
-	// Wrap with decryption
+	// Optimization: Check if multipart before creating stream to avoid wasteful fetch
 	tDecryptSetup := time.Now()
 	var decryptedReader io.Reader
+	var err error
 	switch sseType {
 	case s3_constants.SSETypeC:
 		customerKey := decryptionKey.(*SSECustomerKey)
@@ -955,11 +948,20 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
 			customerKey.KeyMD5, len(entry.GetChunks()), isMultipartSSEC, ssecChunks)
 		if isMultipartSSEC {
 			// Handle multipart SSE-C objects - each chunk needs independent decryption with its own IV
-			decryptedReader, err = s3a.createMultipartSSECDecryptedReaderDirect(encryptedReader, customerKey, entry)
-			glog.V(2).Infof("Using multipart SSE-C decryption for object with %d chunks", len(entry.GetChunks()))
+			// For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly
+			// This saves one filer lookup/pipe creation
+			decryptedReader, err = s3a.createMultipartSSECDecryptedReaderDirect(nil, customerKey, entry)
+			glog.V(2).Infof("Using multipart SSE-C decryption for object with %d chunks (no prefetch)", len(entry.GetChunks()))
 		} else {
 			// Handle single-part SSE-C objects - use object-level IV
+			// For single-part, get encrypted stream and decrypt
+			tStreamFetch := time.Now()
+			encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry)
+			streamFetchTime = time.Since(tStreamFetch)
+			if streamErr != nil {
+				return streamErr
+			}
+			defer encryptedReader.Close()
 			iv := entry.Extended[s3_constants.SeaweedFSSSEIV]
 			if len(iv) == 0 {
 				return fmt.Errorf("SSE-C IV not found in entry metadata")
@@ -967,6 +969,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
 			glog.V(2).Infof("SSE-C decryption: IV length=%d, KeyMD5=%s", len(iv), customerKey.KeyMD5)
 			decryptedReader, err = CreateSSECDecryptedReader(encryptedReader, customerKey, iv)
 		}
 	case s3_constants.SSETypeKMS:
 		sseKMSKey := decryptionKey.(*SSEKMSKey)
@@ -982,15 +985,33 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
 		glog.V(3).Infof("SSE-KMS decryption: isMultipart=%v, ssekmsChunks=%d", isMultipartSSEKMS, ssekmsChunks)
 		if isMultipartSSEKMS {
 			// Handle multipart SSE-KMS objects - each chunk needs independent decryption
-			decryptedReader, err = s3a.createMultipartSSEKMSDecryptedReaderDirect(encryptedReader, entry)
-			glog.V(2).Infof("Using multipart SSE-KMS decryption for object with %d chunks", len(entry.GetChunks()))
+			// For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly
+			decryptedReader, err = s3a.createMultipartSSEKMSDecryptedReaderDirect(nil, entry)
+			glog.V(2).Infof("Using multipart SSE-KMS decryption for object with %d chunks (no prefetch)", len(entry.GetChunks()))
 		} else {
 			// Handle single-part SSE-KMS objects
+			// For single-part, get encrypted stream and decrypt
+			tStreamFetch := time.Now()
+			encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry)
+			streamFetchTime = time.Since(tStreamFetch)
+			if streamErr != nil {
+				return streamErr
+			}
+			defer encryptedReader.Close()
 			glog.V(2).Infof("SSE-KMS decryption: KeyID=%s, IV length=%d", sseKMSKey.KeyID, len(sseKMSKey.IV))
 			decryptedReader, err = CreateSSEKMSDecryptedReader(encryptedReader, sseKMSKey)
 		}
 	case s3_constants.SSETypeS3:
+		// SSE-S3 typically doesn't use multipart per-chunk encryption, always use stream
+		tStreamFetch := time.Now()
+		encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry)
+		streamFetchTime = time.Since(tStreamFetch)
+		if streamErr != nil {
+			return streamErr
+		}
+		defer encryptedReader.Close()
 		sseS3Key := decryptionKey.(*SSES3Key)
 		keyManager := GetSSES3KeyManager()
 		iv, ivErr := GetSSES3IV(entry, sseS3Key, keyManager)
@@ -1012,24 +1033,39 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
 	buf := make([]byte, 128*1024)
 	if isRangeRequest {
 		// For range requests, skip to offset and copy only requested size
-		// Note: This currently decrypts the full object then seeks - future optimization
-		// would decrypt only the requested range using CTR mode offset calculation
+		// Note: A fully range-aware implementation would reconstruct the decrypted reader
+		// from only the necessary chunks using CTR IV offset calculation.
+		// Current approach: decrypt sequentially, skip to the offset with io.Discard,
+		// and copy only the requested bytes.
 		if offset > 0 {
-			_, err := io.CopyN(io.Discard, decryptedReader, offset)
+			// Use io.Discard to skip to the offset without allocating buffer
+			discarded, err := io.CopyN(io.Discard, decryptedReader, offset)
 			if err != nil {
-				glog.Errorf("Failed to seek to range offset %d: %v", offset, err)
+				glog.Errorf("Failed to seek to range offset %d (discarded %d bytes): %v", offset, discarded, err)
 				return fmt.Errorf("failed to seek to range offset: %w", err)
 			}
+			glog.V(3).Infof("Range request: skipped %d bytes to reach offset %d", discarded, offset)
 		}
-		_, copyErr := io.CopyN(w, decryptedReader, size)
+		// Copy only the requested size to the client
+		copied, copyErr := io.CopyN(w, decryptedReader, size)
 		copyTime = time.Since(tCopy)
-		return copyErr
+		if copyErr != nil && copyErr != io.EOF {
+			glog.Errorf("Failed to copy range data: copied %d/%d bytes: %v", copied, size, copyErr)
+			return copyErr
+		}
+		glog.V(3).Infof("Range request: copied %d bytes [%d-%d]", copied, offset, offset+size-1)
+		return nil
 	} else {
 		// Full object request
-		_, copyErr := io.CopyBuffer(w, decryptedReader, buf)
+		copied, copyErr := io.CopyBuffer(w, decryptedReader, buf)
 		copyTime = time.Since(tCopy)
-		return copyErr
+		if copyErr != nil {
+			glog.Errorf("Failed to copy full object: copied %d bytes: %v", copied, copyErr)
+			return copyErr
+		}
+		glog.V(3).Infof("Full object request: copied %d bytes", copied)
+		return nil
 	}
 }
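
The net effect of the diff can be summarized in a short sketch. This is not SeaweedFS code: fetchStream, wrapSinglePart and wrapMultipart are hypothetical stand-ins for getEncryptedStreamFromVolumes and the SSE decrypted-reader constructors; only the control flow mirrors the commit. The whole-object encrypted stream is now opened only on the single-part paths, while multipart objects let the per-chunk reader fetch data directly, saving one filer lookup and pipe creation per GET.

package sse

import (
	"context"
	"io"
)

// openDecrypted mirrors the control flow after this commit: the whole-object
// encrypted stream is opened only for single-part objects, while multipart
// objects use a per-chunk reader that fetches each chunk on demand.
// fetchStream, wrapSinglePart and wrapMultipart are hypothetical stand-ins.
func openDecrypted(
	ctx context.Context,
	isMultipart bool,
	fetchStream func(context.Context) (io.ReadCloser, error),
	wrapSinglePart func(io.Reader) (io.Reader, error),
	wrapMultipart func() (io.Reader, error),
) (r io.Reader, closeFn func() error, err error) {
	if isMultipart {
		// No prefetch: the multipart reader looks up and streams chunks itself,
		// so there is nothing extra to close here.
		r, err = wrapMultipart()
		return r, func() error { return nil }, err
	}
	// Single-part: fetch the encrypted stream once, then wrap it for decryption.
	enc, err := fetchStream(ctx)
	if err != nil {
		return nil, nil, err
	}
	r, err = wrapSinglePart(enc)
	if err != nil {
		enc.Close()
		return nil, nil, err
	}
	return r, enc.Close, nil
}

In this sketch the caller invokes closeFn once streaming completes, which plays the role of the defer encryptedReader.Close() on the single-part branches of the real handler.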

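The range-request comment in the last hunk mentions CTR IV offset calculation as the fully range-aware alternative to decrypting and discarding everything before the offset. Below is a minimal sketch of that calculation using Go's crypto/aes and crypto/cipher. It assumes AES-CTR with a 16-byte big-endian counter IV; ctrReaderAt is a hypothetical helper, not an existing SeaweedFS function, and the ciphertext reader must already be positioned at the block-aligned offset (offset - offset%aes.BlockSize).

package sse

import (
	"crypto/aes"
	"crypto/cipher"
	"io"
)

// ctrReaderAt sketches the CTR IV offset calculation: advance the big-endian
// counter in the IV by offset/16 blocks, then discard the partial-block
// remainder so the first byte read is exactly the plaintext byte at offset.
// ciphertext must be positioned at offset - offset%aes.BlockSize.
func ctrReaderAt(key, baseIV []byte, ciphertext io.Reader, offset int64) (io.Reader, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	iv := make([]byte, aes.BlockSize)
	copy(iv, baseIV)

	// Add offset/16 to the 128-bit big-endian counter stored in iv.
	carry := uint64(offset) / aes.BlockSize
	for i := aes.BlockSize - 1; i >= 0 && carry > 0; i-- {
		sum := uint64(iv[i]) + (carry & 0xff)
		iv[i] = byte(sum)
		carry = (carry >> 8) + (sum >> 8)
	}

	r := &cipher.StreamReader{S: cipher.NewCTR(block, iv), R: ciphertext}
	// Skip the bytes that precede offset within the first block.
	if _, err := io.CopyN(io.Discard, r, offset%aes.BlockSize); err != nil && err != io.EOF {
		return nil, err
	}
	return r, nil
}

With a helper like this, a range GET could start decrypting at the first needed block of the first needed chunk instead of streaming and discarding the entire prefix of the object.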