Browse Source

fix sse

pull/7481/head
chrislu 1 month ago
parent
commit
eae129e818
  1. 228
      weed/s3api/s3api_object_handlers.go

228
weed/s3api/s3api_object_handlers.go

@ -380,8 +380,6 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
// Fetch the correct entry for SSE processing (respects versionId)
// This consolidates entry lookups to avoid multiple filer calls
var objectEntryForSSE *filer_pb.Entry
originalRangeHeader := r.Header.Get("Range")
var sseObject = false
// Optimization: Reuse already-fetched entry to avoid redundant metadata fetches
if versioningConfigured {
@ -416,35 +414,13 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
// NEW OPTIMIZATION: Stream directly from volume servers, bypassing filer proxy
// This eliminates the 19ms filer proxy overhead
// SSE decryption is handled inline during streaming
// Check if this is an SSE object for Range request handling
// Detect SSE encryption type
primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE)
if originalRangeHeader != "" && (primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS) {
sseObject = true
// Temporarily remove Range header to get full encrypted data
r.Header.Del("Range")
}
// Add SSE response headers before streaming
if objectEntryForSSE != nil {
// Create a fake response to get SSE headers
fakeResp := &http.Response{Header: make(http.Header)}
s3a.addSSEHeadersToResponse(fakeResp, objectEntryForSSE)
// Copy SSE headers to actual response
for k, v := range fakeResp.Header {
if strings.HasPrefix(k, "X-Amz-Server-Side-Encryption") {
w.Header()[k] = v
}
}
}
// Restore the original Range header for SSE processing
if sseObject && originalRangeHeader != "" {
r.Header.Set("Range", originalRangeHeader)
}
// Stream directly from volume servers
err = s3a.streamFromVolumeServers(w, r, objectEntryForSSE, primarySSEType)
// Stream directly from volume servers with SSE support
err = s3a.streamFromVolumeServersWithSSE(w, r, objectEntryForSSE, primarySSEType)
if err != nil {
glog.Errorf("GetObjectHandler: failed to stream from volume servers: %v", err)
// Don't write error response - headers already sent
@ -600,6 +576,200 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
return streamFn(w)
}
// streamFromVolumeServersWithSSE handles streaming with inline SSE decryption
func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error {
// If not encrypted, use fast path without decryption
if sseType == "" || sseType == "None" {
return s3a.streamFromVolumeServers(w, r, entry, sseType)
}
glog.V(2).Infof("streamFromVolumeServersWithSSE: Handling %s encrypted object", sseType)
// Add SSE response headers before streaming
s3a.addSSEResponseHeadersFromEntry(w, r, entry, sseType)
// For encrypted objects, we need to stream encrypted data through a decryption wrapper
// Create a pipe: encrypted data goes into pipe writer, decrypted data comes from pipe reader
pipeReader, pipeWriter := io.Pipe()
// Start goroutine to stream encrypted data from volume servers to pipe
encryptedStreamErr := make(chan error, 1)
go func() {
defer pipeWriter.Close()
err := s3a.streamFromVolumeServers(&pipeWriterWrapper{pipeWriter}, r, entry, sseType)
encryptedStreamErr <- err
}()
// Create decrypted reader based on SSE type
var decryptedReader io.Reader
var decryptErr error
switch sseType {
case s3_constants.SSETypeC:
decryptedReader, decryptErr = s3a.createSSECDecryptedReaderFromEntry(r, pipeReader, entry)
case s3_constants.SSETypeKMS:
decryptedReader, decryptErr = s3a.createSSEKMSDecryptedReaderFromEntry(r, pipeReader, entry)
case s3_constants.SSETypeS3:
decryptedReader, decryptErr = s3a.createSSES3DecryptedReaderFromEntry(r, pipeReader, entry)
default:
decryptErr = fmt.Errorf("unsupported SSE type: %s", sseType)
}
if decryptErr != nil {
pipeReader.Close()
return fmt.Errorf("failed to create decrypted reader for %s: %v", sseType, decryptErr)
}
// Stream decrypted data to response
buf := make([]byte, 128*1024)
_, copyErr := io.CopyBuffer(w, decryptedReader, buf)
// Check if encrypted streaming had errors
if streamErr := <-encryptedStreamErr; streamErr != nil {
return fmt.Errorf("encrypted stream error: %v", streamErr)
}
return copyErr
}
// addSSEResponseHeadersFromEntry adds appropriate SSE response headers based on entry metadata
func (s3a *S3ApiServer) addSSEResponseHeadersFromEntry(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) {
if entry == nil || entry.Extended == nil {
return
}
switch sseType {
case s3_constants.SSETypeC:
// SSE-C: Echo back algorithm and key MD5
if algo, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; exists {
w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, string(algo))
}
if keyMD5, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; exists {
w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, string(keyMD5))
}
case s3_constants.SSETypeKMS:
// SSE-KMS: Return algorithm and key ID
w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms")
if kmsMetadataB64, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKeyHeader]; exists {
kmsMetadataBytes, err := base64.StdEncoding.DecodeString(string(kmsMetadataB64))
if err == nil {
sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
if err == nil {
AddSSEKMSResponseHeaders(w, sseKMSKey)
}
}
}
case s3_constants.SSETypeS3:
// SSE-S3: Return algorithm
w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
}
}
// pipeWriterWrapper wraps io.PipeWriter to implement http.ResponseWriter interface
type pipeWriterWrapper struct {
*io.PipeWriter
}
func (pw *pipeWriterWrapper) Header() http.Header {
// Headers are already set on the real ResponseWriter, ignore here
return make(http.Header)
}
func (pw *pipeWriterWrapper) WriteHeader(statusCode int) {
// Status is already set on the real ResponseWriter, ignore here
}
// createSSECDecryptedReaderFromEntry creates an SSE-C decrypted reader from entry metadata
func (s3a *S3ApiServer) createSSECDecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) {
// Parse SSE-C headers from request
customerKey, err := ParseSSECHeaders(r)
if err != nil {
return nil, fmt.Errorf("failed to parse SSE-C headers: %w", err)
}
if customerKey == nil {
return nil, fmt.Errorf("SSE-C key required but not provided")
}
// Validate key MD5 from entry metadata
if entry.Extended != nil {
storedKeyMD5 := string(entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5])
if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 {
return nil, fmt.Errorf("SSE-C key mismatch")
}
}
// Get IV from entry metadata
ivBase64 := string(entry.Extended[s3_constants.SeaweedFSSSEIVHeader])
if ivBase64 == "" {
return nil, fmt.Errorf("SSE-C IV not found in metadata")
}
iv, err := base64.StdEncoding.DecodeString(ivBase64)
if err != nil {
return nil, fmt.Errorf("failed to decode IV: %w", err)
}
// Create decrypted reader
return CreateSSECDecryptedReader(encryptedReader, customerKey, iv)
}
// createSSEKMSDecryptedReaderFromEntry creates an SSE-KMS decrypted reader from entry metadata
func (s3a *S3ApiServer) createSSEKMSDecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) {
// Extract SSE-KMS metadata from entry
if entry.Extended == nil {
return nil, fmt.Errorf("no extended metadata found")
}
kmsMetadataB64, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKeyHeader]
if !exists {
return nil, fmt.Errorf("SSE-KMS metadata not found")
}
kmsMetadataBytes, err := base64.StdEncoding.DecodeString(string(kmsMetadataB64))
if err != nil {
return nil, fmt.Errorf("failed to decode SSE-KMS metadata: %w", err)
}
sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
if err != nil {
return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err)
}
// Create decrypted reader
return CreateSSEKMSDecryptedReader(encryptedReader, sseKMSKey)
}
// createSSES3DecryptedReaderFromEntry creates an SSE-S3 decrypted reader from entry metadata
func (s3a *S3ApiServer) createSSES3DecryptedReaderFromEntry(r *http.Request, encryptedReader io.Reader, entry *filer_pb.Entry) (io.Reader, error) {
// Extract SSE-S3 metadata from entry
if entry.Extended == nil {
return nil, fmt.Errorf("no extended metadata found")
}
keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]
if !exists {
return nil, fmt.Errorf("SSE-S3 metadata not found")
}
keyManager := GetSSES3KeyManager()
sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
if err != nil {
return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err)
}
// Get IV
iv, err := GetSSES3IV(entry, sseS3Key, keyManager)
if err != nil {
return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err)
}
// Create decrypted reader
return CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv)
}
// setResponseHeaders sets all standard HTTP response headers from entry metadata
func (s3a *S3ApiServer) setResponseHeaders(w http.ResponseWriter, entry *filer_pb.Entry, totalSize int64) {
// Set content length and accept ranges

Loading…
Cancel
Save