From efd66bb45cd26fbcdb2b52551e09842038fa13cd Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 16 Nov 2025 16:24:11 -0800 Subject: [PATCH] range sse --- weed/s3api/s3api_object_handlers.go | 373 +++++++++++++++++++++++----- 1 file changed, 311 insertions(+), 62 deletions(-) diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index a77e39322..3ed003b1c 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -693,27 +693,9 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R return nil } - // Create lookup function via filer client + // Create lookup function via filer client (reuse shared helper) ctx := r.Context() - lookupFileIdFn := func(ctx context.Context, fileId string) ([]string, error) { - var urls []string - err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - vid := filer.VolumeId(fileId) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ - VolumeIds: []string{vid}, - }) - if err != nil { - return err - } - if locs, found := resp.LocationsMap[vid]; found { - for _, loc := range locs.Locations { - urls = append(urls, "http://"+loc.Url+"/"+fileId) - } - } - return nil - }) - return urls, err - } + lookupFileIdFn := s3a.createLookupFileIdFunction() // Resolve chunk manifests with the requested range tChunkResolve := time.Now() @@ -756,6 +738,39 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R return err } +// Shared HTTP client for volume server requests (connection pooling) +var volumeServerHTTPClient = &http.Client{ + Timeout: 5 * time.Minute, + Transport: &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 10, + IdleConnTimeout: 90 * time.Second, + }, +} + +// createLookupFileIdFunction creates a reusable lookup function for resolving volume URLs +func (s3a *S3ApiServer) createLookupFileIdFunction() func(context.Context, string) ([]string, error) { + return func(ctx context.Context, fileId string) ([]string, error) { + var urls []string + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + vid := filer.VolumeId(fileId) + resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + VolumeIds: []string{vid}, + }) + if err != nil { + return err + } + if locs, found := resp.LocationsMap[vid]; found { + for _, loc := range locs.Locations { + urls = append(urls, loc.Url) + } + } + return nil + }) + return urls, err + } +} + // streamFromVolumeServersWithSSE handles streaming with inline SSE decryption func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error { // If not encrypted, use fast path without decryption @@ -926,8 +941,20 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r } headerSetTime = time.Since(tHeaderSet) - // Optimization: Check if multipart before creating stream to avoid wasteful fetch + // Full Range Optimization: Use ViewFromChunks to only fetch/decrypt needed chunks tDecryptSetup := time.Now() + + // Use range-aware chunk resolution (like filer does) + if isRangeRequest { + glog.V(2).Infof("Using range-aware SSE decryption for offset=%d size=%d", offset, size) + streamFetchTime = 0 // No full stream fetch in range-aware path + err := s3a.streamDecryptedRangeFromChunks(r.Context(), w, entry, offset, size, sseType, decryptionKey) + decryptSetupTime = time.Since(tDecryptSetup) + copyTime = decryptSetupTime // Streaming is included in decrypt setup for range-aware path + return err + } + + // Full object path: Optimize multipart vs single-part var decryptedReader io.Reader var err error @@ -1028,45 +1055,276 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r return fmt.Errorf("failed to create decrypted reader: %w", err) } - // Stream decrypted data to client + // Stream full decrypted object to client tCopy := time.Now() buf := make([]byte, 128*1024) + copied, copyErr := io.CopyBuffer(w, decryptedReader, buf) + copyTime = time.Since(tCopy) + if copyErr != nil { + glog.Errorf("Failed to copy full object: copied %d bytes: %v", copied, copyErr) + return copyErr + } + glog.V(3).Infof("Full object request: copied %d bytes", copied) + return nil +} - if isRangeRequest { - // Range-aware optimization: For large offsets, skip unnecessary decryption - // by using LimitReader instead of io.Discard when offset is significant - // Note: A full range-aware implementation would reconstruct the decrypted reader - // from only the necessary chunks using CTR IV offset calculation. - // Current approach: skip efficiently with io.Discard, copy only needed bytes - if offset > 0 { - // Use io.Discard to skip to the offset without allocating buffer - discarded, err := io.CopyN(io.Discard, decryptedReader, offset) - if err != nil { - glog.Errorf("Failed to seek to range offset %d (discarded %d bytes): %v", offset, discarded, err) - return fmt.Errorf("failed to seek to range offset: %w", err) +// streamDecryptedRangeFromChunks streams a range of decrypted data by only fetching needed chunks +// This implements the filer's ViewFromChunks approach for optimal range performance +func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io.Writer, entry *filer_pb.Entry, offset int64, size int64, sseType string, decryptionKey interface{}) error { + // Use filer's ViewFromChunks to resolve only needed chunks for the range + lookupFileIdFn := s3a.createLookupFileIdFunction() + chunkViews := filer.ViewFromChunks(ctx, lookupFileIdFn, entry.GetChunks(), offset, size) + + totalWritten := int64(0) + targetOffset := offset + + // Stream each chunk view + for x := chunkViews.Front(); x != nil; x = x.Next { + chunkView := x.Value + + // Handle gaps between chunks (write zeros) + if targetOffset < chunkView.ViewOffset { + gap := chunkView.ViewOffset - targetOffset + glog.V(4).Infof("Writing %d zero bytes for gap [%d,%d)", gap, targetOffset, chunkView.ViewOffset) + if err := writeZeroBytes(w, gap); err != nil { + return fmt.Errorf("failed to write zero padding: %w", err) + } + totalWritten += gap + targetOffset = chunkView.ViewOffset + } + + // Find the corresponding FileChunk for this chunkView + var fileChunk *filer_pb.FileChunk + for _, chunk := range entry.GetChunks() { + if chunk.GetFileIdString() == chunkView.FileId { + fileChunk = chunk + break } - glog.V(3).Infof("Range request: skipped %d bytes to reach offset %d", discarded, offset) } - // Copy only the requested size to the client - copied, copyErr := io.CopyN(w, decryptedReader, size) - copyTime = time.Since(tCopy) - if copyErr != nil && copyErr != io.EOF { - glog.Errorf("Failed to copy range data: copied %d/%d bytes: %v", copied, size, copyErr) - return copyErr + if fileChunk == nil { + return fmt.Errorf("chunk %s not found in entry", chunkView.FileId) + } + + // Fetch and decrypt this chunk view + var decryptedChunkReader io.Reader + var err error + + switch sseType { + case s3_constants.SSETypeC: + decryptedChunkReader, err = s3a.decryptSSECChunkView(ctx, fileChunk, chunkView, decryptionKey.(*SSECustomerKey)) + case s3_constants.SSETypeKMS: + decryptedChunkReader, err = s3a.decryptSSEKMSChunkView(ctx, fileChunk, chunkView) + case s3_constants.SSETypeS3: + decryptedChunkReader, err = s3a.decryptSSES3ChunkView(ctx, fileChunk, chunkView, entry) + default: + // Non-encrypted chunk + decryptedChunkReader, err = s3a.fetchChunkViewData(ctx, chunkView) + } + + if err != nil { + return fmt.Errorf("failed to decrypt chunk view %s: %w", chunkView.FileId, err) + } + + // Copy the decrypted chunk data + written, copyErr := io.Copy(w, decryptedChunkReader) + if closer, ok := decryptedChunkReader.(io.Closer); ok { + closer.Close() } - glog.V(3).Infof("Range request: copied %d bytes [%d-%d]", copied, offset, offset+size-1) - return nil - } else { - // Full object request - copied, copyErr := io.CopyBuffer(w, decryptedReader, buf) - copyTime = time.Since(tCopy) if copyErr != nil { - glog.Errorf("Failed to copy full object: copied %d bytes: %v", copied, copyErr) - return copyErr + return fmt.Errorf("failed to copy decrypted chunk data: %w", copyErr) } - glog.V(3).Infof("Full object request: copied %d bytes", copied) - return nil + + totalWritten += written + targetOffset += written + glog.V(4).Infof("Wrote %d bytes from chunk %s [%d,%d)", written, chunkView.FileId, chunkView.ViewOffset, chunkView.ViewOffset+int64(chunkView.ViewSize)) } + + // Handle trailing zeros if needed + remaining := size - totalWritten + if remaining > 0 { + glog.V(4).Infof("Writing %d trailing zero bytes", remaining) + if err := writeZeroBytes(w, remaining); err != nil { + return fmt.Errorf("failed to write trailing zeros: %w", err) + } + } + + glog.V(3).Infof("Completed range-aware SSE decryption: wrote %d bytes for range [%d,%d)", totalWritten, offset, offset+size) + return nil +} + +// writeZeroBytes writes n zero bytes to writer +func writeZeroBytes(w io.Writer, n int64) error { + zeroBuf := make([]byte, min(n, 32*1024)) + for n > 0 { + toWrite := min(n, int64(len(zeroBuf))) + written, err := w.Write(zeroBuf[:toWrite]) + if err != nil { + return err + } + n -= int64(written) + } + return nil +} + +// decryptSSECChunkView decrypts a specific chunk view with SSE-C +func (s3a *S3ApiServer) decryptSSECChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView, customerKey *SSECustomerKey) (io.Reader, error) { + // For multipart SSE-C, each chunk has its own IV in chunk.SseMetadata + if fileChunk.GetSseType() == filer_pb.SSEType_SSE_C && len(fileChunk.GetSseMetadata()) > 0 { + ssecMetadata, err := DeserializeSSECMetadata(fileChunk.GetSseMetadata()) + if err != nil { + return nil, fmt.Errorf("failed to deserialize SSE-C metadata: %w", err) + } + chunkIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV) + if err != nil { + return nil, fmt.Errorf("failed to decode IV: %w", err) + } + + // Fetch encrypted chunk data with range + encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView) + if err != nil { + return nil, fmt.Errorf("failed to fetch chunk data: %w", err) + } + + // Decrypt with CTR IV offset adjustment for the view offset within the chunk + // CTR mode: IV for block N = base_IV + (N / 16) + adjustedIV := adjustCTRIV(chunkIV, chunkView.OffsetInChunk) + return CreateSSECDecryptedReader(encryptedReader, customerKey, adjustedIV) + } + + // Single-part SSE-C: use object-level IV (should not hit this in range path, but handle it) + encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView) + if err != nil { + return nil, err + } + // For single-part, the IV is stored at object level, already handled in non-range path + return encryptedReader, nil +} + +// decryptSSEKMSChunkView decrypts a specific chunk view with SSE-KMS +func (s3a *S3ApiServer) decryptSSEKMSChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView) (io.Reader, error) { + if fileChunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(fileChunk.GetSseMetadata()) > 0 { + sseKMSKey, err := DeserializeSSEKMSMetadata(fileChunk.GetSseMetadata()) + if err != nil { + return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err) + } + + // Fetch encrypted chunk data + encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView) + if err != nil { + return nil, fmt.Errorf("failed to fetch chunk data: %w", err) + } + + // Decrypt with CTR IV offset adjustment + adjustedIV := adjustCTRIV(sseKMSKey.IV, chunkView.OffsetInChunk) + adjustedKey := &SSEKMSKey{ + KeyID: sseKMSKey.KeyID, + EncryptedDataKey: sseKMSKey.EncryptedDataKey, + EncryptionContext: sseKMSKey.EncryptionContext, + BucketKeyEnabled: sseKMSKey.BucketKeyEnabled, + IV: adjustedIV, + ChunkOffset: chunkView.OffsetInChunk, + } + return CreateSSEKMSDecryptedReader(encryptedReader, adjustedKey) + } + + // Non-KMS encrypted chunk + return s3a.fetchChunkViewData(ctx, chunkView) +} + +// decryptSSES3ChunkView decrypts a specific chunk view with SSE-S3 +func (s3a *S3ApiServer) decryptSSES3ChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView, entry *filer_pb.Entry) (io.Reader, error) { + // SSE-S3 typically uses object-level encryption, not per-chunk + // Fetch encrypted chunk data + encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView) + if err != nil { + return nil, err + } + + // Get SSE-S3 key from object metadata + keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key] + keyManager := GetSSES3KeyManager() + sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) + if err != nil { + return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err) + } + + // Decrypt with CTR IV offset adjustment for the range + iv, err := GetSSES3IV(entry, sseS3Key, keyManager) + if err != nil { + return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err) + } + adjustedIV := adjustCTRIV(iv, chunkView.OffsetInChunk) + return CreateSSES3DecryptedReader(encryptedReader, sseS3Key, adjustedIV) +} + +// adjustCTRIV adjusts the IV for CTR mode based on byte offset +// CTR mode increments the counter for each 16-byte block +func adjustCTRIV(baseIV []byte, offset int64) []byte { + if offset == 0 { + return baseIV + } + + adjustedIV := make([]byte, len(baseIV)) + copy(adjustedIV, baseIV) + + // Calculate block offset (CTR increments per 16-byte block) + blockOffset := uint64(offset / 16) + + // Add block offset to the IV (treating IV as big-endian counter) + for i := len(adjustedIV) - 1; i >= 0 && blockOffset > 0; i-- { + sum := uint64(adjustedIV[i]) + (blockOffset & 0xFF) + adjustedIV[i] = byte(sum & 0xFF) + blockOffset = (blockOffset >> 8) + (sum >> 8) + } + + return adjustedIV +} + +// fetchChunkViewData fetches encrypted data for a chunk view (with range) +func (s3a *S3ApiServer) fetchChunkViewData(ctx context.Context, chunkView *filer.ChunkView) (io.ReadCloser, error) { + // Lookup the volume server URLs for this chunk + lookupFileIdFn := s3a.createLookupFileIdFunction() + urlStrings, err := lookupFileIdFn(ctx, chunkView.FileId) + if err != nil || len(urlStrings) == 0 { + return nil, fmt.Errorf("failed to lookup chunk %s: %w", chunkView.FileId, err) + } + + // Use the first URL + chunkUrl := urlStrings[0] + + // Generate JWT for volume server authentication + jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, chunkView.FileId) + + // Create request with Range header for the chunk view + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s/%s", chunkUrl, chunkView.FileId), nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + // Set Range header to fetch only the needed portion of the chunk + if !chunkView.IsFullChunk() { + rangeEnd := chunkView.OffsetInChunk + int64(chunkView.ViewSize) - 1 + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", chunkView.OffsetInChunk, rangeEnd)) + glog.V(4).Infof("Fetching chunk %s with range bytes=%d-%d", chunkView.FileId, chunkView.OffsetInChunk, rangeEnd) + } + + // Set JWT for authentication + if jwt != "" { + req.Header.Set("Authorization", "BEARER "+string(jwt)) + } + + // Use shared HTTP client with connection pooling + resp, err := volumeServerHTTPClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to fetch chunk: %w", err) + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { + resp.Body.Close() + return nil, fmt.Errorf("unexpected status code %d for chunk %s", resp.StatusCode, chunkView.FileId) + } + + return resp.Body, nil } // getEncryptedStreamFromVolumes gets raw encrypted data stream from volume servers @@ -2586,17 +2844,8 @@ func (s3a *S3ApiServer) createEncryptedChunkReader(chunk *filer_pb.FileChunk) (i req.Header.Set("Authorization", "BEARER "+string(jwt)) } - // Use HTTP client with reasonable timeouts - httpClient := &http.Client{ - Timeout: 5 * time.Minute, - Transport: &http.Transport{ - MaxIdleConns: 100, - MaxIdleConnsPerHost: 10, - IdleConnTimeout: 90 * time.Second, - }, - } - - resp, err := httpClient.Do(req) + // Use shared HTTP client with connection pooling + resp, err := volumeServerHTTPClient.Do(req) if err != nil { return nil, fmt.Errorf("execute HTTP request for chunk: %v", err) }