diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 3ed003b1c..5abf48eb3 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -977,7 +977,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r if isMultipartSSEC { // For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly // This saves one filer lookup/pipe creation - decryptedReader, err = s3a.createMultipartSSECDecryptedReaderDirect(nil, customerKey, entry) + decryptedReader, err = s3a.createMultipartSSECDecryptedReaderDirect(r.Context(), nil, customerKey, entry) glog.V(2).Infof("Using multipart SSE-C decryption for object with %d chunks (no prefetch)", len(entry.GetChunks())) } else { // For single-part, get encrypted stream and decrypt @@ -1013,7 +1013,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r if isMultipartSSEKMS { // For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly - decryptedReader, err = s3a.createMultipartSSEKMSDecryptedReaderDirect(nil, entry) + decryptedReader, err = s3a.createMultipartSSEKMSDecryptedReaderDirect(r.Context(), nil, entry) glog.V(2).Infof("Using multipart SSE-KMS decryption for object with %d chunks (no prefetch)", len(entry.GetChunks())) } else { // For single-part, get encrypted stream and decrypt @@ -1843,9 +1843,9 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request } // Detect and handle SSE - glog.V(3).Infof("GetObjectHandler: Retrieved entry for %s%s - %d chunks", bucket, object, len(objectEntryForSSE.Chunks)) + glog.V(3).Infof("HeadObjectHandler: Retrieved entry for %s%s - %d chunks", bucket, object, len(objectEntryForSSE.Chunks)) sseType := s3a.detectPrimarySSEType(objectEntryForSSE) - glog.V(0).Infof("GetObjectHandler: Detected SSE type: %s", sseType) + glog.V(0).Infof("HeadObjectHandler: Detected SSE type: %s", sseType) if sseType != "" && sseType != "None" { // Validate SSE headers for encrypted objects switch sseType { @@ -2538,7 +2538,7 @@ func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string { } // createMultipartSSECDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-C objects (direct volume path) -func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(encryptedStream io.ReadCloser, customerKey *SSECustomerKey, entry *filer_pb.Entry) (io.Reader, error) { +func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, customerKey *SSECustomerKey, entry *filer_pb.Entry) (io.Reader, error) { // Sort chunks by offset to ensure correct order chunks := entry.GetChunks() sort.Slice(chunks, func(i, j int) bool { @@ -2550,7 +2550,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(encryptedStream for _, chunk := range chunks { // Get this chunk's encrypted data - chunkReader, err := s3a.createEncryptedChunkReader(chunk) + chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) if err != nil { return nil, fmt.Errorf("failed to create chunk reader: %v", err) } @@ -2613,7 +2613,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(encryptedStream } // createMultipartSSEKMSDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-KMS objects (direct volume path) -func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) { +func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) { // Sort chunks by offset to ensure correct order chunks := entry.GetChunks() sort.Slice(chunks, func(i, j int) bool { @@ -2625,7 +2625,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(encryptedStre for _, chunk := range chunks { // Get this chunk's encrypted data - chunkReader, err := s3a.createEncryptedChunkReader(chunk) + chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) if err != nil { return nil, fmt.Errorf("failed to create chunk reader: %v", err) } @@ -2682,6 +2682,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(encryptedStre // createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) { // Entry is passed from caller to avoid redundant filer lookup + ctx := r.Context() // Sort chunks by offset to ensure correct order chunks := entry.GetChunks() @@ -2694,7 +2695,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, pr for _, chunk := range chunks { // Get this chunk's encrypted data - chunkReader, err := s3a.createEncryptedChunkReader(chunk) + chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) if err != nil { return nil, fmt.Errorf("failed to create chunk reader: %v", err) } @@ -2743,6 +2744,8 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, pr // createMultipartSSES3DecryptedReader creates a reader for multipart SSE-S3 objects func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, entry *filer_pb.Entry) (io.Reader, error) { + ctx := r.Context() + // Sort chunks by offset to ensure correct order chunks := entry.GetChunks() sort.Slice(chunks, func(i, j int) bool { @@ -2755,7 +2758,7 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, ent for _, chunk := range chunks { // Get this chunk's encrypted data - chunkReader, err := s3a.createEncryptedChunkReader(chunk) + chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) if err != nil { return nil, fmt.Errorf("failed to create chunk reader: %v", err) } @@ -2822,17 +2825,15 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, ent } // createEncryptedChunkReader creates a reader for a single encrypted chunk -func (s3a *S3ApiServer) createEncryptedChunkReader(chunk *filer_pb.FileChunk) (io.ReadCloser, error) { +// Context propagation ensures cancellation if the S3 client disconnects +func (s3a *S3ApiServer) createEncryptedChunkReader(ctx context.Context, chunk *filer_pb.FileChunk) (io.ReadCloser, error) { // Get chunk URL srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString()) if err != nil { return nil, fmt.Errorf("lookup volume URL for chunk %s: %v", chunk.GetFileIdString(), err) } - // Create HTTP request with context for timeout control - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - defer cancel() - + // Create HTTP request with context for cancellation propagation req, err := http.NewRequestWithContext(ctx, "GET", srcUrl, nil) if err != nil { return nil, fmt.Errorf("create HTTP request for chunk: %v", err) @@ -2942,6 +2943,8 @@ func (r *SSERangeReader) Read(p []byte) (n int, err error) { // createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects // Each chunk has its own IV and encryption key from the original multipart parts func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) { + ctx := r.Context() + // Parse SSE-C headers from the request for decryption key customerKey, err := ParseSSECHeaders(r) if err != nil { @@ -3001,7 +3004,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox for _, chunk := range neededChunks { // Get this chunk's encrypted data - chunkReader, err := s3a.createEncryptedChunkReader(chunk) + chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) if err != nil { return nil, fmt.Errorf("failed to create chunk reader: %v", err) }