From 4b4dc6d10b1dd5cebbd56a714cfe2d4dc699ad25 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 14 Nov 2025 00:32:47 -0800 Subject: [PATCH] implement sse --- weed/s3api/s3api_object_handlers.go | 184 ++++++++++++++++++++++------ 1 file changed, 148 insertions(+), 36 deletions(-) diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index b25045ed1..8e2cf9f7c 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -582,54 +582,166 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r if sseType == "" || sseType == "None" { return s3a.streamFromVolumeServers(w, r, entry, sseType) } - - glog.V(2).Infof("streamFromVolumeServersWithSSE: Handling %s encrypted object", sseType) - - // Add SSE response headers before streaming + + glog.V(2).Infof("streamFromVolumeServersWithSSE: Handling %s encrypted object with inline decryption", sseType) + + // Validate SSE keys BEFORE streaming + var decryptionKey interface{} + switch sseType { + case s3_constants.SSETypeC: + customerKey, err := ParseSSECHeaders(r) + if err != nil { + s3err.WriteErrorResponse(w, r, MapSSECErrorToS3Error(err)) + return err + } + if customerKey == nil { + s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) + return fmt.Errorf("SSE-C key required") + } + // Validate key MD5 + if entry.Extended != nil { + storedKeyMD5 := string(entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]) + if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 { + s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) + return fmt.Errorf("SSE-C key mismatch") + } + } + decryptionKey = customerKey + case s3_constants.SSETypeKMS: + // Extract KMS key from metadata + if entry.Extended == nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return fmt.Errorf("no SSE-KMS metadata") + } + kmsMetadataB64 := entry.Extended[s3_constants.SeaweedFSSSEKMSKeyHeader] + kmsMetadataBytes, _ := base64.StdEncoding.DecodeString(string(kmsMetadataB64)) + sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return err + } + decryptionKey = sseKMSKey + case s3_constants.SSETypeS3: + // Extract S3 key from metadata + if entry.Extended == nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return fmt.Errorf("no SSE-S3 metadata") + } + keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key] + keyManager := GetSSES3KeyManager() + sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return err + } + decryptionKey = sseS3Key + } + + // Set response headers + totalSize := int64(filer.FileSize(entry)) + s3a.setResponseHeaders(w, entry, totalSize) s3a.addSSEResponseHeadersFromEntry(w, r, entry, sseType) - - // For encrypted objects, we need to stream encrypted data through a decryption wrapper - // Create a pipe: encrypted data goes into pipe writer, decrypted data comes from pipe reader - pipeReader, pipeWriter := io.Pipe() - - // Start goroutine to stream encrypted data from volume servers to pipe - encryptedStreamErr := make(chan error, 1) - go func() { - defer pipeWriter.Close() - err := s3a.streamFromVolumeServers(&pipeWriterWrapper{pipeWriter}, r, entry, sseType) - encryptedStreamErr <- err - }() - - // Create decrypted reader based on SSE type + + // Get encrypted data stream (without headers) + encryptedReader, err := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) + if err != nil { + return err + } + + // Wrap with decryption var decryptedReader io.Reader - var decryptErr error - switch sseType { case s3_constants.SSETypeC: - decryptedReader, decryptErr = s3a.createSSECDecryptedReaderFromEntry(r, pipeReader, entry) + customerKey := decryptionKey.(*SSECustomerKey) + ivBase64 := string(entry.Extended[s3_constants.SeaweedFSSSEIVHeader]) + iv, _ := base64.StdEncoding.DecodeString(ivBase64) + decryptedReader, err = CreateSSECDecryptedReader(encryptedReader, customerKey, iv) case s3_constants.SSETypeKMS: - decryptedReader, decryptErr = s3a.createSSEKMSDecryptedReaderFromEntry(r, pipeReader, entry) + sseKMSKey := decryptionKey.(*SSEKMSKey) + decryptedReader, err = CreateSSEKMSDecryptedReader(encryptedReader, sseKMSKey) case s3_constants.SSETypeS3: - decryptedReader, decryptErr = s3a.createSSES3DecryptedReaderFromEntry(r, pipeReader, entry) - default: - decryptErr = fmt.Errorf("unsupported SSE type: %s", sseType) + sseS3Key := decryptionKey.(*SSES3Key) + keyManager := GetSSES3KeyManager() + iv, _ := GetSSES3IV(entry, sseS3Key, keyManager) + decryptedReader, err = CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv) } - - if decryptErr != nil { - pipeReader.Close() - return fmt.Errorf("failed to create decrypted reader for %s: %v", sseType, decryptErr) + + if err != nil { + return fmt.Errorf("failed to create decrypted reader: %w", err) } - - // Stream decrypted data to response + + // Stream decrypted data to client buf := make([]byte, 128*1024) _, copyErr := io.CopyBuffer(w, decryptedReader, buf) + return copyErr +} - // Check if encrypted streaming had errors - if streamErr := <-encryptedStreamErr; streamErr != nil { - return fmt.Errorf("encrypted stream error: %v", streamErr) +// getEncryptedStreamFromVolumes gets raw encrypted data stream from volume servers +func (s3a *S3ApiServer) getEncryptedStreamFromVolumes(ctx context.Context, entry *filer_pb.Entry) (io.ReadCloser, error) { + // Handle inline content + if len(entry.Content) > 0 { + return io.NopCloser(bytes.NewReader(entry.Content)), nil } - - return copyErr + + // Handle empty files + chunks := entry.GetChunks() + if len(chunks) == 0 { + return io.NopCloser(bytes.NewReader([]byte{})), nil + } + + // Create lookup function + lookupFileIdFn := func(ctx context.Context, fileId string) ([]string, error) { + var urls []string + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + vid := filer.VolumeId(fileId) + resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + VolumeIds: []string{vid}, + }) + if err != nil { + return err + } + if locs, found := resp.LocationsMap[vid]; found { + for _, loc := range locs.Locations { + urls = append(urls, "http://"+loc.Url+"/"+fileId) + } + } + return nil + }) + return urls, err + } + + // Resolve chunks + totalSize := int64(filer.FileSize(entry)) + resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, 0, totalSize) + if err != nil { + return nil, err + } + + // Create streaming reader + masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn} + streamFn, err := filer.PrepareStreamContentWithThrottler( + ctx, + masterClient, + func(fileId string) string { + return string(security.GenJwtForFilerServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec)) + }, + resolvedChunks, + 0, + totalSize, + 0, + ) + if err != nil { + return nil, err + } + + // Create a pipe to get io.ReadCloser + pipeReader, pipeWriter := io.Pipe() + go func() { + defer pipeWriter.Close() + streamFn(pipeWriter) + }() + + return pipeReader, nil } // addSSEResponseHeadersFromEntry adds appropriate SSE response headers based on entry metadata