Browse Source

range sse

pull/7481/head
chrislu 3 weeks ago
parent
commit
efd66bb45c
  1. 373
      weed/s3api/s3api_object_handlers.go

373
weed/s3api/s3api_object_handlers.go

@ -693,27 +693,9 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
return nil return nil
} }
// Create lookup function via filer client
// Create lookup function via filer client (reuse shared helper)
ctx := r.Context() ctx := r.Context()
lookupFileIdFn := func(ctx context.Context, fileId string) ([]string, error) {
var urls []string
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
vid := filer.VolumeId(fileId)
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
return err
}
if locs, found := resp.LocationsMap[vid]; found {
for _, loc := range locs.Locations {
urls = append(urls, "http://"+loc.Url+"/"+fileId)
}
}
return nil
})
return urls, err
}
lookupFileIdFn := s3a.createLookupFileIdFunction()
// Resolve chunk manifests with the requested range // Resolve chunk manifests with the requested range
tChunkResolve := time.Now() tChunkResolve := time.Now()
@ -756,6 +738,39 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
return err return err
} }
// Shared HTTP client for volume server requests (connection pooling)
var volumeServerHTTPClient = &http.Client{
Timeout: 5 * time.Minute,
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
},
}
// createLookupFileIdFunction creates a reusable lookup function for resolving volume URLs
func (s3a *S3ApiServer) createLookupFileIdFunction() func(context.Context, string) ([]string, error) {
return func(ctx context.Context, fileId string) ([]string, error) {
var urls []string
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
vid := filer.VolumeId(fileId)
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
return err
}
if locs, found := resp.LocationsMap[vid]; found {
for _, loc := range locs.Locations {
urls = append(urls, loc.Url)
}
}
return nil
})
return urls, err
}
}
// streamFromVolumeServersWithSSE handles streaming with inline SSE decryption // streamFromVolumeServersWithSSE handles streaming with inline SSE decryption
func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error { func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error {
// If not encrypted, use fast path without decryption // If not encrypted, use fast path without decryption
@ -926,8 +941,20 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
} }
headerSetTime = time.Since(tHeaderSet) headerSetTime = time.Since(tHeaderSet)
// Optimization: Check if multipart before creating stream to avoid wasteful fetch
// Full Range Optimization: Use ViewFromChunks to only fetch/decrypt needed chunks
tDecryptSetup := time.Now() tDecryptSetup := time.Now()
// Use range-aware chunk resolution (like filer does)
if isRangeRequest {
glog.V(2).Infof("Using range-aware SSE decryption for offset=%d size=%d", offset, size)
streamFetchTime = 0 // No full stream fetch in range-aware path
err := s3a.streamDecryptedRangeFromChunks(r.Context(), w, entry, offset, size, sseType, decryptionKey)
decryptSetupTime = time.Since(tDecryptSetup)
copyTime = decryptSetupTime // Streaming is included in decrypt setup for range-aware path
return err
}
// Full object path: Optimize multipart vs single-part
var decryptedReader io.Reader var decryptedReader io.Reader
var err error var err error
@ -1028,45 +1055,276 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
return fmt.Errorf("failed to create decrypted reader: %w", err) return fmt.Errorf("failed to create decrypted reader: %w", err)
} }
// Stream decrypted data to client
// Stream full decrypted object to client
tCopy := time.Now() tCopy := time.Now()
buf := make([]byte, 128*1024) buf := make([]byte, 128*1024)
copied, copyErr := io.CopyBuffer(w, decryptedReader, buf)
copyTime = time.Since(tCopy)
if copyErr != nil {
glog.Errorf("Failed to copy full object: copied %d bytes: %v", copied, copyErr)
return copyErr
}
glog.V(3).Infof("Full object request: copied %d bytes", copied)
return nil
}
if isRangeRequest {
// Range-aware optimization: For large offsets, skip unnecessary decryption
// by using LimitReader instead of io.Discard when offset is significant
// Note: A full range-aware implementation would reconstruct the decrypted reader
// from only the necessary chunks using CTR IV offset calculation.
// Current approach: skip efficiently with io.Discard, copy only needed bytes
if offset > 0 {
// Use io.Discard to skip to the offset without allocating buffer
discarded, err := io.CopyN(io.Discard, decryptedReader, offset)
if err != nil {
glog.Errorf("Failed to seek to range offset %d (discarded %d bytes): %v", offset, discarded, err)
return fmt.Errorf("failed to seek to range offset: %w", err)
// streamDecryptedRangeFromChunks streams a range of decrypted data by only fetching needed chunks
// This implements the filer's ViewFromChunks approach for optimal range performance
func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io.Writer, entry *filer_pb.Entry, offset int64, size int64, sseType string, decryptionKey interface{}) error {
// Use filer's ViewFromChunks to resolve only needed chunks for the range
lookupFileIdFn := s3a.createLookupFileIdFunction()
chunkViews := filer.ViewFromChunks(ctx, lookupFileIdFn, entry.GetChunks(), offset, size)
totalWritten := int64(0)
targetOffset := offset
// Stream each chunk view
for x := chunkViews.Front(); x != nil; x = x.Next {
chunkView := x.Value
// Handle gaps between chunks (write zeros)
if targetOffset < chunkView.ViewOffset {
gap := chunkView.ViewOffset - targetOffset
glog.V(4).Infof("Writing %d zero bytes for gap [%d,%d)", gap, targetOffset, chunkView.ViewOffset)
if err := writeZeroBytes(w, gap); err != nil {
return fmt.Errorf("failed to write zero padding: %w", err)
}
totalWritten += gap
targetOffset = chunkView.ViewOffset
}
// Find the corresponding FileChunk for this chunkView
var fileChunk *filer_pb.FileChunk
for _, chunk := range entry.GetChunks() {
if chunk.GetFileIdString() == chunkView.FileId {
fileChunk = chunk
break
} }
glog.V(3).Infof("Range request: skipped %d bytes to reach offset %d", discarded, offset)
} }
// Copy only the requested size to the client
copied, copyErr := io.CopyN(w, decryptedReader, size)
copyTime = time.Since(tCopy)
if copyErr != nil && copyErr != io.EOF {
glog.Errorf("Failed to copy range data: copied %d/%d bytes: %v", copied, size, copyErr)
return copyErr
if fileChunk == nil {
return fmt.Errorf("chunk %s not found in entry", chunkView.FileId)
}
// Fetch and decrypt this chunk view
var decryptedChunkReader io.Reader
var err error
switch sseType {
case s3_constants.SSETypeC:
decryptedChunkReader, err = s3a.decryptSSECChunkView(ctx, fileChunk, chunkView, decryptionKey.(*SSECustomerKey))
case s3_constants.SSETypeKMS:
decryptedChunkReader, err = s3a.decryptSSEKMSChunkView(ctx, fileChunk, chunkView)
case s3_constants.SSETypeS3:
decryptedChunkReader, err = s3a.decryptSSES3ChunkView(ctx, fileChunk, chunkView, entry)
default:
// Non-encrypted chunk
decryptedChunkReader, err = s3a.fetchChunkViewData(ctx, chunkView)
}
if err != nil {
return fmt.Errorf("failed to decrypt chunk view %s: %w", chunkView.FileId, err)
}
// Copy the decrypted chunk data
written, copyErr := io.Copy(w, decryptedChunkReader)
if closer, ok := decryptedChunkReader.(io.Closer); ok {
closer.Close()
} }
glog.V(3).Infof("Range request: copied %d bytes [%d-%d]", copied, offset, offset+size-1)
return nil
} else {
// Full object request
copied, copyErr := io.CopyBuffer(w, decryptedReader, buf)
copyTime = time.Since(tCopy)
if copyErr != nil { if copyErr != nil {
glog.Errorf("Failed to copy full object: copied %d bytes: %v", copied, copyErr)
return copyErr
return fmt.Errorf("failed to copy decrypted chunk data: %w", copyErr)
} }
glog.V(3).Infof("Full object request: copied %d bytes", copied)
return nil
totalWritten += written
targetOffset += written
glog.V(4).Infof("Wrote %d bytes from chunk %s [%d,%d)", written, chunkView.FileId, chunkView.ViewOffset, chunkView.ViewOffset+int64(chunkView.ViewSize))
} }
// Handle trailing zeros if needed
remaining := size - totalWritten
if remaining > 0 {
glog.V(4).Infof("Writing %d trailing zero bytes", remaining)
if err := writeZeroBytes(w, remaining); err != nil {
return fmt.Errorf("failed to write trailing zeros: %w", err)
}
}
glog.V(3).Infof("Completed range-aware SSE decryption: wrote %d bytes for range [%d,%d)", totalWritten, offset, offset+size)
return nil
}
// writeZeroBytes writes n zero bytes to writer
func writeZeroBytes(w io.Writer, n int64) error {
zeroBuf := make([]byte, min(n, 32*1024))
for n > 0 {
toWrite := min(n, int64(len(zeroBuf)))
written, err := w.Write(zeroBuf[:toWrite])
if err != nil {
return err
}
n -= int64(written)
}
return nil
}
// decryptSSECChunkView decrypts a specific chunk view with SSE-C
func (s3a *S3ApiServer) decryptSSECChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView, customerKey *SSECustomerKey) (io.Reader, error) {
// For multipart SSE-C, each chunk has its own IV in chunk.SseMetadata
if fileChunk.GetSseType() == filer_pb.SSEType_SSE_C && len(fileChunk.GetSseMetadata()) > 0 {
ssecMetadata, err := DeserializeSSECMetadata(fileChunk.GetSseMetadata())
if err != nil {
return nil, fmt.Errorf("failed to deserialize SSE-C metadata: %w", err)
}
chunkIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV)
if err != nil {
return nil, fmt.Errorf("failed to decode IV: %w", err)
}
// Fetch encrypted chunk data with range
encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView)
if err != nil {
return nil, fmt.Errorf("failed to fetch chunk data: %w", err)
}
// Decrypt with CTR IV offset adjustment for the view offset within the chunk
// CTR mode: IV for block N = base_IV + (N / 16)
adjustedIV := adjustCTRIV(chunkIV, chunkView.OffsetInChunk)
return CreateSSECDecryptedReader(encryptedReader, customerKey, adjustedIV)
}
// Single-part SSE-C: use object-level IV (should not hit this in range path, but handle it)
encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView)
if err != nil {
return nil, err
}
// For single-part, the IV is stored at object level, already handled in non-range path
return encryptedReader, nil
}
// decryptSSEKMSChunkView decrypts a specific chunk view with SSE-KMS
func (s3a *S3ApiServer) decryptSSEKMSChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView) (io.Reader, error) {
if fileChunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(fileChunk.GetSseMetadata()) > 0 {
sseKMSKey, err := DeserializeSSEKMSMetadata(fileChunk.GetSseMetadata())
if err != nil {
return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err)
}
// Fetch encrypted chunk data
encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView)
if err != nil {
return nil, fmt.Errorf("failed to fetch chunk data: %w", err)
}
// Decrypt with CTR IV offset adjustment
adjustedIV := adjustCTRIV(sseKMSKey.IV, chunkView.OffsetInChunk)
adjustedKey := &SSEKMSKey{
KeyID: sseKMSKey.KeyID,
EncryptedDataKey: sseKMSKey.EncryptedDataKey,
EncryptionContext: sseKMSKey.EncryptionContext,
BucketKeyEnabled: sseKMSKey.BucketKeyEnabled,
IV: adjustedIV,
ChunkOffset: chunkView.OffsetInChunk,
}
return CreateSSEKMSDecryptedReader(encryptedReader, adjustedKey)
}
// Non-KMS encrypted chunk
return s3a.fetchChunkViewData(ctx, chunkView)
}
// decryptSSES3ChunkView decrypts a specific chunk view with SSE-S3
func (s3a *S3ApiServer) decryptSSES3ChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView, entry *filer_pb.Entry) (io.Reader, error) {
// SSE-S3 typically uses object-level encryption, not per-chunk
// Fetch encrypted chunk data
encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView)
if err != nil {
return nil, err
}
// Get SSE-S3 key from object metadata
keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key]
keyManager := GetSSES3KeyManager()
sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
if err != nil {
return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err)
}
// Decrypt with CTR IV offset adjustment for the range
iv, err := GetSSES3IV(entry, sseS3Key, keyManager)
if err != nil {
return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err)
}
adjustedIV := adjustCTRIV(iv, chunkView.OffsetInChunk)
return CreateSSES3DecryptedReader(encryptedReader, sseS3Key, adjustedIV)
}
// adjustCTRIV adjusts the IV for CTR mode based on byte offset
// CTR mode increments the counter for each 16-byte block
func adjustCTRIV(baseIV []byte, offset int64) []byte {
if offset == 0 {
return baseIV
}
adjustedIV := make([]byte, len(baseIV))
copy(adjustedIV, baseIV)
// Calculate block offset (CTR increments per 16-byte block)
blockOffset := uint64(offset / 16)
// Add block offset to the IV (treating IV as big-endian counter)
for i := len(adjustedIV) - 1; i >= 0 && blockOffset > 0; i-- {
sum := uint64(adjustedIV[i]) + (blockOffset & 0xFF)
adjustedIV[i] = byte(sum & 0xFF)
blockOffset = (blockOffset >> 8) + (sum >> 8)
}
return adjustedIV
}
// fetchChunkViewData fetches encrypted data for a chunk view (with range)
func (s3a *S3ApiServer) fetchChunkViewData(ctx context.Context, chunkView *filer.ChunkView) (io.ReadCloser, error) {
// Lookup the volume server URLs for this chunk
lookupFileIdFn := s3a.createLookupFileIdFunction()
urlStrings, err := lookupFileIdFn(ctx, chunkView.FileId)
if err != nil || len(urlStrings) == 0 {
return nil, fmt.Errorf("failed to lookup chunk %s: %w", chunkView.FileId, err)
}
// Use the first URL
chunkUrl := urlStrings[0]
// Generate JWT for volume server authentication
jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, chunkView.FileId)
// Create request with Range header for the chunk view
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s/%s", chunkUrl, chunkView.FileId), nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
// Set Range header to fetch only the needed portion of the chunk
if !chunkView.IsFullChunk() {
rangeEnd := chunkView.OffsetInChunk + int64(chunkView.ViewSize) - 1
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", chunkView.OffsetInChunk, rangeEnd))
glog.V(4).Infof("Fetching chunk %s with range bytes=%d-%d", chunkView.FileId, chunkView.OffsetInChunk, rangeEnd)
}
// Set JWT for authentication
if jwt != "" {
req.Header.Set("Authorization", "BEARER "+string(jwt))
}
// Use shared HTTP client with connection pooling
resp, err := volumeServerHTTPClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to fetch chunk: %w", err)
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
resp.Body.Close()
return nil, fmt.Errorf("unexpected status code %d for chunk %s", resp.StatusCode, chunkView.FileId)
}
return resp.Body, nil
} }
// getEncryptedStreamFromVolumes gets raw encrypted data stream from volume servers // getEncryptedStreamFromVolumes gets raw encrypted data stream from volume servers
@ -2586,17 +2844,8 @@ func (s3a *S3ApiServer) createEncryptedChunkReader(chunk *filer_pb.FileChunk) (i
req.Header.Set("Authorization", "BEARER "+string(jwt)) req.Header.Set("Authorization", "BEARER "+string(jwt))
} }
// Use HTTP client with reasonable timeouts
httpClient := &http.Client{
Timeout: 5 * time.Minute,
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
},
}
resp, err := httpClient.Do(req)
// Use shared HTTP client with connection pooling
resp, err := volumeServerHTTPClient.Do(req)
if err != nil { if err != nil {
return nil, fmt.Errorf("execute HTTP request for chunk: %v", err) return nil, fmt.Errorf("execute HTTP request for chunk: %v", err)
} }

Loading…
Cancel
Save