Browse Source

implement sse

pull/7481/head
chrislu 3 weeks ago
parent
commit
4b4dc6d10b
  1. 170
      weed/s3api/s3api_object_handlers.go

170
weed/s3api/s3api_object_handlers.go

@ -583,53 +583,165 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
return s3a.streamFromVolumeServers(w, r, entry, sseType)
}
glog.V(2).Infof("streamFromVolumeServersWithSSE: Handling %s encrypted object", sseType)
glog.V(2).Infof("streamFromVolumeServersWithSSE: Handling %s encrypted object with inline decryption", sseType)
// Add SSE response headers before streaming
s3a.addSSEResponseHeadersFromEntry(w, r, entry, sseType)
// Validate SSE keys BEFORE streaming
var decryptionKey interface{}
switch sseType {
case s3_constants.SSETypeC:
customerKey, err := ParseSSECHeaders(r)
if err != nil {
s3err.WriteErrorResponse(w, r, MapSSECErrorToS3Error(err))
return err
}
if customerKey == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
return fmt.Errorf("SSE-C key required")
}
// Validate key MD5
if entry.Extended != nil {
storedKeyMD5 := string(entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5])
if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 {
s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
return fmt.Errorf("SSE-C key mismatch")
}
}
decryptionKey = customerKey
case s3_constants.SSETypeKMS:
// Extract KMS key from metadata
if entry.Extended == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return fmt.Errorf("no SSE-KMS metadata")
}
kmsMetadataB64 := entry.Extended[s3_constants.SeaweedFSSSEKMSKeyHeader]
kmsMetadataBytes, _ := base64.StdEncoding.DecodeString(string(kmsMetadataB64))
sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return err
}
decryptionKey = sseKMSKey
case s3_constants.SSETypeS3:
// Extract S3 key from metadata
if entry.Extended == nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return fmt.Errorf("no SSE-S3 metadata")
}
keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key]
keyManager := GetSSES3KeyManager()
sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return err
}
decryptionKey = sseS3Key
}
// For encrypted objects, we need to stream encrypted data through a decryption wrapper
// Create a pipe: encrypted data goes into pipe writer, decrypted data comes from pipe reader
pipeReader, pipeWriter := io.Pipe()
// Set response headers
totalSize := int64(filer.FileSize(entry))
s3a.setResponseHeaders(w, entry, totalSize)
s3a.addSSEResponseHeadersFromEntry(w, r, entry, sseType)
// Start goroutine to stream encrypted data from volume servers to pipe
encryptedStreamErr := make(chan error, 1)
go func() {
defer pipeWriter.Close()
err := s3a.streamFromVolumeServers(&pipeWriterWrapper{pipeWriter}, r, entry, sseType)
encryptedStreamErr <- err
}()
// Get encrypted data stream (without headers)
encryptedReader, err := s3a.getEncryptedStreamFromVolumes(r.Context(), entry)
if err != nil {
return err
}
// Create decrypted reader based on SSE type
// Wrap with decryption
var decryptedReader io.Reader
var decryptErr error
switch sseType {
case s3_constants.SSETypeC:
decryptedReader, decryptErr = s3a.createSSECDecryptedReaderFromEntry(r, pipeReader, entry)
customerKey := decryptionKey.(*SSECustomerKey)
ivBase64 := string(entry.Extended[s3_constants.SeaweedFSSSEIVHeader])
iv, _ := base64.StdEncoding.DecodeString(ivBase64)
decryptedReader, err = CreateSSECDecryptedReader(encryptedReader, customerKey, iv)
case s3_constants.SSETypeKMS:
decryptedReader, decryptErr = s3a.createSSEKMSDecryptedReaderFromEntry(r, pipeReader, entry)
sseKMSKey := decryptionKey.(*SSEKMSKey)
decryptedReader, err = CreateSSEKMSDecryptedReader(encryptedReader, sseKMSKey)
case s3_constants.SSETypeS3:
decryptedReader, decryptErr = s3a.createSSES3DecryptedReaderFromEntry(r, pipeReader, entry)
default:
decryptErr = fmt.Errorf("unsupported SSE type: %s", sseType)
sseS3Key := decryptionKey.(*SSES3Key)
keyManager := GetSSES3KeyManager()
iv, _ := GetSSES3IV(entry, sseS3Key, keyManager)
decryptedReader, err = CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv)
}
if decryptErr != nil {
pipeReader.Close()
return fmt.Errorf("failed to create decrypted reader for %s: %v", sseType, decryptErr)
if err != nil {
return fmt.Errorf("failed to create decrypted reader: %w", err)
}
// Stream decrypted data to response
// Stream decrypted data to client
buf := make([]byte, 128*1024)
_, copyErr := io.CopyBuffer(w, decryptedReader, buf)
return copyErr
}
// Check if encrypted streaming had errors
if streamErr := <-encryptedStreamErr; streamErr != nil {
return fmt.Errorf("encrypted stream error: %v", streamErr)
// getEncryptedStreamFromVolumes gets raw encrypted data stream from volume servers
func (s3a *S3ApiServer) getEncryptedStreamFromVolumes(ctx context.Context, entry *filer_pb.Entry) (io.ReadCloser, error) {
// Handle inline content
if len(entry.Content) > 0 {
return io.NopCloser(bytes.NewReader(entry.Content)), nil
}
return copyErr
// Handle empty files
chunks := entry.GetChunks()
if len(chunks) == 0 {
return io.NopCloser(bytes.NewReader([]byte{})), nil
}
// Create lookup function
lookupFileIdFn := func(ctx context.Context, fileId string) ([]string, error) {
var urls []string
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
vid := filer.VolumeId(fileId)
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
return err
}
if locs, found := resp.LocationsMap[vid]; found {
for _, loc := range locs.Locations {
urls = append(urls, "http://"+loc.Url+"/"+fileId)
}
}
return nil
})
return urls, err
}
// Resolve chunks
totalSize := int64(filer.FileSize(entry))
resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, 0, totalSize)
if err != nil {
return nil, err
}
// Create streaming reader
masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn}
streamFn, err := filer.PrepareStreamContentWithThrottler(
ctx,
masterClient,
func(fileId string) string {
return string(security.GenJwtForFilerServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec))
},
resolvedChunks,
0,
totalSize,
0,
)
if err != nil {
return nil, err
}
// Create a pipe to get io.ReadCloser
pipeReader, pipeWriter := io.Pipe()
go func() {
defer pipeWriter.Close()
streamFn(pipeWriter)
}()
return pipeReader, nil
}
// addSSEResponseHeadersFromEntry adds appropriate SSE response headers based on entry metadata

Loading…
Cancel
Save