Browse Source

context

pull/7481/head
chrislu 1 month ago
parent
commit
e863d1e6d3
  1. 35
      weed/s3api/s3api_object_handlers.go

35
weed/s3api/s3api_object_handlers.go

@ -977,7 +977,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
if isMultipartSSEC {
// For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly
// This saves one filer lookup/pipe creation
decryptedReader, err = s3a.createMultipartSSECDecryptedReaderDirect(nil, customerKey, entry)
decryptedReader, err = s3a.createMultipartSSECDecryptedReaderDirect(r.Context(), nil, customerKey, entry)
glog.V(2).Infof("Using multipart SSE-C decryption for object with %d chunks (no prefetch)", len(entry.GetChunks()))
} else {
// For single-part, get encrypted stream and decrypt
@ -1013,7 +1013,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
if isMultipartSSEKMS {
// For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly
decryptedReader, err = s3a.createMultipartSSEKMSDecryptedReaderDirect(nil, entry)
decryptedReader, err = s3a.createMultipartSSEKMSDecryptedReaderDirect(r.Context(), nil, entry)
glog.V(2).Infof("Using multipart SSE-KMS decryption for object with %d chunks (no prefetch)", len(entry.GetChunks()))
} else {
// For single-part, get encrypted stream and decrypt
@ -1843,9 +1843,9 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
}
// Detect and handle SSE
glog.V(3).Infof("GetObjectHandler: Retrieved entry for %s%s - %d chunks", bucket, object, len(objectEntryForSSE.Chunks))
glog.V(3).Infof("HeadObjectHandler: Retrieved entry for %s%s - %d chunks", bucket, object, len(objectEntryForSSE.Chunks))
sseType := s3a.detectPrimarySSEType(objectEntryForSSE)
glog.V(0).Infof("GetObjectHandler: Detected SSE type: %s", sseType)
glog.V(0).Infof("HeadObjectHandler: Detected SSE type: %s", sseType)
if sseType != "" && sseType != "None" {
// Validate SSE headers for encrypted objects
switch sseType {
@ -2538,7 +2538,7 @@ func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string {
}
// createMultipartSSECDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-C objects (direct volume path)
func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(encryptedStream io.ReadCloser, customerKey *SSECustomerKey, entry *filer_pb.Entry) (io.Reader, error) {
func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, customerKey *SSECustomerKey, entry *filer_pb.Entry) (io.Reader, error) {
// Sort chunks by offset to ensure correct order
chunks := entry.GetChunks()
sort.Slice(chunks, func(i, j int) bool {
@ -2550,7 +2550,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(encryptedStream
for _, chunk := range chunks {
// Get this chunk's encrypted data
chunkReader, err := s3a.createEncryptedChunkReader(chunk)
chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
if err != nil {
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
}
@ -2613,7 +2613,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(encryptedStream
}
// createMultipartSSEKMSDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-KMS objects (direct volume path)
func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) {
func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) {
// Sort chunks by offset to ensure correct order
chunks := entry.GetChunks()
sort.Slice(chunks, func(i, j int) bool {
@ -2625,7 +2625,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(encryptedStre
for _, chunk := range chunks {
// Get this chunk's encrypted data
chunkReader, err := s3a.createEncryptedChunkReader(chunk)
chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
if err != nil {
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
}
@ -2682,6 +2682,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(encryptedStre
// createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects
func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
// Entry is passed from caller to avoid redundant filer lookup
ctx := r.Context()
// Sort chunks by offset to ensure correct order
chunks := entry.GetChunks()
@ -2694,7 +2695,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, pr
for _, chunk := range chunks {
// Get this chunk's encrypted data
chunkReader, err := s3a.createEncryptedChunkReader(chunk)
chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
if err != nil {
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
}
@ -2743,6 +2744,8 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, pr
// createMultipartSSES3DecryptedReader creates a reader for multipart SSE-S3 objects
func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, entry *filer_pb.Entry) (io.Reader, error) {
ctx := r.Context()
// Sort chunks by offset to ensure correct order
chunks := entry.GetChunks()
sort.Slice(chunks, func(i, j int) bool {
@ -2755,7 +2758,7 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, ent
for _, chunk := range chunks {
// Get this chunk's encrypted data
chunkReader, err := s3a.createEncryptedChunkReader(chunk)
chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
if err != nil {
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
}
@ -2822,17 +2825,15 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, ent
}
// createEncryptedChunkReader creates a reader for a single encrypted chunk
func (s3a *S3ApiServer) createEncryptedChunkReader(chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
// Context propagation ensures cancellation if the S3 client disconnects
func (s3a *S3ApiServer) createEncryptedChunkReader(ctx context.Context, chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
// Get chunk URL
srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
if err != nil {
return nil, fmt.Errorf("lookup volume URL for chunk %s: %v", chunk.GetFileIdString(), err)
}
// Create HTTP request with context for timeout control
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Create HTTP request with context for cancellation propagation
req, err := http.NewRequestWithContext(ctx, "GET", srcUrl, nil)
if err != nil {
return nil, fmt.Errorf("create HTTP request for chunk: %v", err)
@ -2942,6 +2943,8 @@ func (r *SSERangeReader) Read(p []byte) (n int, err error) {
// createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects
// Each chunk has its own IV and encryption key from the original multipart parts
func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
ctx := r.Context()
// Parse SSE-C headers from the request for decryption key
customerKey, err := ParseSSECHeaders(r)
if err != nil {
@ -3001,7 +3004,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox
for _, chunk := range neededChunks {
// Get this chunk's encrypted data
chunkReader, err := s3a.createEncryptedChunkReader(chunk)
chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
if err != nil {
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
}

Loading…
Cancel
Save