package s3api

import (
	"context"
	"crypto/md5"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"net/http"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// StreamingCopySpec defines the specification for streaming copy operations
type StreamingCopySpec struct {
	SourceReader    io.Reader
	TargetSize      int64
	EncryptionSpec  *EncryptionSpec
	CompressionSpec *CompressionSpec
	HashCalculation bool
	BufferSize      int
}

// EncryptionSpec defines encryption parameters for streaming
type EncryptionSpec struct {
	NeedsDecryption bool
	NeedsEncryption bool
	SourceKey       interface{} // *SSECustomerKey, *SSEKMSKey, or *SSES3Key
	DestinationKey  interface{} // *SSECustomerKey, *SSEKMSKey, or *SSES3Key
	SourceType      EncryptionType
	DestinationType EncryptionType
	SourceMetadata  map[string][]byte // Source metadata for IV extraction
	DestinationIV   []byte            // Generated IV for destination
}
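
// SourceKey and DestinationKey are deliberately loose interface{} fields; the
// concrete type is implied by SourceType/DestinationType, and a type
// assertion guarded by that enum is the access pattern used throughout this
// file. Illustrative sketch:
//
//	switch encSpec.SourceType {
//	case EncryptionTypeSSEC:
//		key, ok := encSpec.SourceKey.(*SSECustomerKey)
//		if !ok {
//			// mismatched type and enum: treat as an error
//		}
//		_ = key
//	}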

// CompressionSpec defines compression parameters for streaming
type CompressionSpec struct {
	IsCompressed       bool
	CompressionType    string
	NeedsDecompression bool
	NeedsCompression   bool
}

// StreamingCopyManager handles streaming copy operations
type StreamingCopyManager struct {
	s3a        *S3ApiServer
	bufferSize int
}

// NewStreamingCopyManager creates a new streaming copy manager
func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager {
	return &StreamingCopyManager{
		s3a:        s3a,
		bufferSize: 64 * 1024, // 64KB default buffer
	}
}
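
// Illustrative usage (a sketch, assuming a copy-object handler that already
// has the source entry, the incoming request, and a computed EncryptionState;
// the variable names here are assumptions, not part of this file):
//
//	scm := NewStreamingCopyManager(s3a)
//	chunks, err := scm.ExecuteStreamingCopy(ctx, srcEntry, r, dstPath, encState)
//	if err != nil {
//		// map to an S3 error response
//	}
//	// attach chunks to the destination entry before persisting it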

// ExecuteStreamingCopy performs a streaming copy operation
func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) {
	// Create streaming copy specification
	spec, err := scm.createStreamingSpec(entry, r, state)
	if err != nil {
		return nil, fmt.Errorf("create streaming spec: %w", err)
	}

	// Create source reader from entry
	sourceReader, err := scm.createSourceReader(entry)
	if err != nil {
		return nil, fmt.Errorf("create source reader: %w", err)
	}
	defer sourceReader.Close()

	spec.SourceReader = sourceReader

	// Create processing pipeline
	processedReader, err := scm.createProcessingPipeline(spec)
	if err != nil {
		return nil, fmt.Errorf("create processing pipeline: %w", err)
	}

	// Stream to destination
	return scm.streamToDestination(ctx, processedReader, spec, dstPath)
}

// createStreamingSpec creates a streaming specification based on copy parameters
func (scm *StreamingCopyManager) createStreamingSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*StreamingCopySpec, error) {
	spec := &StreamingCopySpec{
		BufferSize:      scm.bufferSize,
		HashCalculation: true,
	}

	// Calculate target size
	sizeCalc := NewCopySizeCalculator(entry, r)
	spec.TargetSize = sizeCalc.CalculateTargetSize()

	// Create encryption specification
	encSpec, err := scm.createEncryptionSpec(entry, r, state)
	if err != nil {
		return nil, err
	}
	spec.EncryptionSpec = encSpec

	// Create compression specification
	spec.CompressionSpec = scm.createCompressionSpec(entry, r)

	return spec, nil
}

// createEncryptionSpec creates encryption specification for streaming
func (scm *StreamingCopyManager) createEncryptionSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*EncryptionSpec, error) {
	spec := &EncryptionSpec{
		NeedsDecryption: state.IsSourceEncrypted(),
		NeedsEncryption: state.IsTargetEncrypted(),
		SourceMetadata:  entry.Extended, // Pass source metadata for IV extraction
	}

	// Set source encryption details
	if state.SrcSSEC {
		spec.SourceType = EncryptionTypeSSEC
		sourceKey, err := ParseSSECCopySourceHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-C copy source headers: %w", err)
		}
		spec.SourceKey = sourceKey
	} else if state.SrcSSEKMS {
		spec.SourceType = EncryptionTypeSSEKMS
		// Extract SSE-KMS key from metadata
		if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
			sseKey, err := DeserializeSSEKMSMetadata(keyData)
			if err != nil {
				return nil, fmt.Errorf("deserialize SSE-KMS metadata: %w", err)
			}
			spec.SourceKey = sseKey
		}
	} else if state.SrcSSES3 {
		spec.SourceType = EncryptionTypeSSES3
		// Extract SSE-S3 key from metadata
		if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]; exists {
			// TODO: This should use a proper SSE-S3 key manager from S3ApiServer
			// For now, create a temporary key manager to handle deserialization
			tempKeyManager := NewSSES3KeyManager()
			sseKey, err := DeserializeSSES3Metadata(keyData, tempKeyManager)
			if err != nil {
				return nil, fmt.Errorf("deserialize SSE-S3 metadata: %w", err)
			}
			spec.SourceKey = sseKey
		}
	}

	// Set destination encryption details
	if state.DstSSEC {
		spec.DestinationType = EncryptionTypeSSEC
		destKey, err := ParseSSECHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-C headers: %w", err)
		}
		spec.DestinationKey = destKey
	} else if state.DstSSEKMS {
		spec.DestinationType = EncryptionTypeSSEKMS
		// Parse KMS parameters
		keyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-KMS copy headers: %w", err)
		}

		// Create SSE-KMS key for destination
		sseKey := &SSEKMSKey{
			KeyID:             keyID,
			EncryptionContext: encryptionContext,
			BucketKeyEnabled:  bucketKeyEnabled,
		}
		spec.DestinationKey = sseKey
	} else if state.DstSSES3 {
		spec.DestinationType = EncryptionTypeSSES3
		// Generate or retrieve SSE-S3 key
		keyManager := GetSSES3KeyManager()
		sseKey, err := keyManager.GetOrCreateKey("")
		if err != nil {
			return nil, fmt.Errorf("get SSE-S3 key: %w", err)
		}
		spec.DestinationKey = sseKey
	}

	return spec, nil
}
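
// Informal summary of where each key above comes from (derived from the code
// paths in this function rather than from separate documentation; the header
// names are the standard AWS ones these Parse* helpers typically read):
//
//	source SSE-C   -> x-amz-copy-source-server-side-encryption-customer-* request headers
//	source SSE-KMS -> serialized key in entry.Extended[s3_constants.SeaweedFSSSEKMSKey]
//	source SSE-S3  -> serialized key in entry.Extended[s3_constants.SeaweedFSSSES3Key]
//	dest SSE-C     -> x-amz-server-side-encryption-customer-* request headers
//	dest SSE-KMS   -> x-amz-server-side-encryption-aws-kms-key-id and related headers
//	dest SSE-S3    -> key generated or fetched via GetSSES3KeyManager()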

// createCompressionSpec creates compression specification for streaming
func (scm *StreamingCopyManager) createCompressionSpec(entry *filer_pb.Entry, r *http.Request) *CompressionSpec {
	return &CompressionSpec{
		IsCompressed: isCompressedEntry(entry),
		// For now, we don't change compression during copy
		NeedsDecompression: false,
		NeedsCompression:   false,
	}
}

// createSourceReader creates a reader for the source entry
func (scm *StreamingCopyManager) createSourceReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
	// Create a multi-chunk reader that streams from all chunks
	return scm.s3a.createMultiChunkReader(entry)
}

// createProcessingPipeline creates a processing pipeline for the copy operation
func (scm *StreamingCopyManager) createProcessingPipeline(spec *StreamingCopySpec) (io.Reader, error) {
	reader := spec.SourceReader

	// Add decryption if needed
	if spec.EncryptionSpec.NeedsDecryption {
		decryptedReader, err := scm.createDecryptionReader(reader, spec.EncryptionSpec)
		if err != nil {
			return nil, fmt.Errorf("create decryption reader: %w", err)
		}
		reader = decryptedReader
	}

	// Add decompression if needed
	if spec.CompressionSpec.NeedsDecompression {
		decompressedReader, err := scm.createDecompressionReader(reader, spec.CompressionSpec)
		if err != nil {
			return nil, fmt.Errorf("create decompression reader: %w", err)
		}
		reader = decompressedReader
	}

	// Add compression if needed
	if spec.CompressionSpec.NeedsCompression {
		compressedReader, err := scm.createCompressionReader(reader, spec.CompressionSpec)
		if err != nil {
			return nil, fmt.Errorf("create compression reader: %w", err)
		}
		reader = compressedReader
	}

	// Add encryption if needed
	if spec.EncryptionSpec.NeedsEncryption {
		encryptedReader, err := scm.createEncryptionReader(reader, spec.EncryptionSpec)
		if err != nil {
			return nil, fmt.Errorf("create encryption reader: %w", err)
		}
		reader = encryptedReader
	}

	// Add hash calculation if needed
	if spec.HashCalculation {
		reader = scm.createHashReader(reader)
	}

	return reader, nil
}
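
// Pipeline order (as wired above): each stage wraps the previous reader, so
// bytes flow lazily through the stages as the destination pulls them:
//
//	source -> decrypt -> decompress -> compress -> encrypt -> hash -> destination
//
// Decryption precedes decompression because stored payloads are compressed
// before they are encrypted (ciphertext does not compress well), so the copy
// path unwinds the layers in reverse. The hash stage comes last so the
// digests cover exactly the bytes written to the destination.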

// createDecryptionReader creates a decryption reader based on encryption type
func (scm *StreamingCopyManager) createDecryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
	switch encSpec.SourceType {
	case EncryptionTypeSSEC:
		if sourceKey, ok := encSpec.SourceKey.(*SSECustomerKey); ok {
			// Get IV from metadata
			iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
			if err != nil {
				return nil, fmt.Errorf("get IV from metadata: %w", err)
			}
			return CreateSSECDecryptedReader(reader, sourceKey, iv)
		}
		return nil, fmt.Errorf("invalid SSE-C source key type")

	case EncryptionTypeSSEKMS:
		if sseKey, ok := encSpec.SourceKey.(*SSEKMSKey); ok {
			return CreateSSEKMSDecryptedReader(reader, sseKey)
		}
		return nil, fmt.Errorf("invalid SSE-KMS source key type")

	case EncryptionTypeSSES3:
		if sseKey, ok := encSpec.SourceKey.(*SSES3Key); ok {
			// Get IV from metadata
			iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
			if err != nil {
				return nil, fmt.Errorf("get IV from metadata: %w", err)
			}
			return CreateSSES3DecryptedReader(reader, sseKey, iv)
		}
		return nil, fmt.Errorf("invalid SSE-S3 source key type")

	default:
		return reader, nil
	}
}

// createEncryptionReader creates an encryption reader based on encryption type
func (scm *StreamingCopyManager) createEncryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
	switch encSpec.DestinationType {
	case EncryptionTypeSSEC:
		if destKey, ok := encSpec.DestinationKey.(*SSECustomerKey); ok {
			encryptedReader, iv, err := CreateSSECEncryptedReader(reader, destKey)
			if err != nil {
				return nil, err
			}
			// Store IV in destination metadata (this would need to be handled by caller)
			encSpec.DestinationIV = iv
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-C destination key type")

	case EncryptionTypeSSEKMS:
		if sseKey, ok := encSpec.DestinationKey.(*SSEKMSKey); ok {
			encryptedReader, updatedKey, err := CreateSSEKMSEncryptedReaderWithBucketKey(reader, sseKey.KeyID, sseKey.EncryptionContext, sseKey.BucketKeyEnabled)
			if err != nil {
				return nil, err
			}
			// Store IV from the updated key
			encSpec.DestinationIV = updatedKey.IV
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-KMS destination key type")

	case EncryptionTypeSSES3:
		if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok {
			encryptedReader, iv, err := CreateSSES3EncryptedReader(reader, sseKey)
			if err != nil {
				return nil, err
			}
			// Store IV for metadata
			encSpec.DestinationIV = iv
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-S3 destination key type")

	default:
		return reader, nil
	}
}

// createDecompressionReader creates a decompression reader
func (scm *StreamingCopyManager) createDecompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
	if !compSpec.NeedsDecompression {
		return reader, nil
	}

	switch compSpec.CompressionType {
	case "gzip":
		// Use SeaweedFS's streaming gzip decompression
		pr, pw := io.Pipe()
		go func() {
			defer pw.Close()
			_, err := util.GunzipStream(pw, reader)
			if err != nil {
				pw.CloseWithError(fmt.Errorf("gzip decompression failed: %v", err))
			}
		}()
		return pr, nil
	default:
		// Unknown compression type, return as-is
		return reader, nil
	}
}
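
// The io.Pipe pattern above adapts the writer-oriented util.GunzipStream /
// util.GzipStream helpers into a pull-based io.Reader: a goroutine pushes the
// transformed bytes into the write half while the pipeline consumes the read
// half. A minimal standalone sketch of the same idiom using only the standard
// library (compress/gzip), for illustration:
//
//	pr, pw := io.Pipe()
//	go func() {
//		defer pw.Close()
//		gz := gzip.NewWriter(pw)
//		if _, err := io.Copy(gz, src); err != nil {
//			pw.CloseWithError(err)
//			return
//		}
//		pw.CloseWithError(gz.Close()) // flush and propagate any close error
//	}()
//	// pr now yields the gzip-compressed form of src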

// createCompressionReader creates a compression reader
func (scm *StreamingCopyManager) createCompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
	if !compSpec.NeedsCompression {
		return reader, nil
	}

	switch compSpec.CompressionType {
	case "gzip":
		// Use SeaweedFS's streaming gzip compression
		pr, pw := io.Pipe()
		go func() {
			defer pw.Close()
			_, err := util.GzipStream(pw, reader)
			if err != nil {
				pw.CloseWithError(fmt.Errorf("gzip compression failed: %v", err))
			}
		}()
		return pr, nil
	default:
		// Unknown compression type, return as-is
		return reader, nil
	}
}

// HashReader wraps an io.Reader to calculate MD5 and SHA256 hashes
type HashReader struct {
	reader     io.Reader
	md5Hash    hash.Hash
	sha256Hash hash.Hash
}

// NewHashReader creates a new hash calculating reader
func NewHashReader(reader io.Reader) *HashReader {
	return &HashReader{
		reader:     reader,
		md5Hash:    md5.New(),
		sha256Hash: sha256.New(),
	}
}

// Read implements io.Reader and calculates hashes as data flows through
func (hr *HashReader) Read(p []byte) (n int, err error) {
	n, err = hr.reader.Read(p)
	if n > 0 {
		// Update both hashes with the data read
		hr.md5Hash.Write(p[:n])
		hr.sha256Hash.Write(p[:n])
	}
	return n, err
}

// MD5Sum returns the current MD5 hash
func (hr *HashReader) MD5Sum() []byte {
	return hr.md5Hash.Sum(nil)
}

// SHA256Sum returns the current SHA256 hash
func (hr *HashReader) SHA256Sum() []byte {
	return hr.sha256Hash.Sum(nil)
}

// MD5Hex returns the MD5 hash as a hex string
func (hr *HashReader) MD5Hex() string {
	return hex.EncodeToString(hr.MD5Sum())
}

// SHA256Hex returns the SHA256 hash as a hex string
func (hr *HashReader) SHA256Hex() string {
	return hex.EncodeToString(hr.SHA256Sum())
}

// createHashReader creates a hash calculation reader
func (scm *StreamingCopyManager) createHashReader(reader io.Reader) io.Reader {
	return NewHashReader(reader)
}
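
// Note that createHashReader returns the HashReader as a plain io.Reader, so
// the pipeline as wired never reads the digests back. A caller that wants the
// ETag-style MD5 must keep a reference to the concrete type, e.g. (sketch):
//
//	hr := NewHashReader(src)
//	if _, err := io.Copy(dst, hr); err != nil {
//		return err
//	}
//	etag := hr.MD5Hex() // digest of everything copied so far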

// streamToDestination streams the processed data to the destination
func (scm *StreamingCopyManager) streamToDestination(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
	// For now, use the existing chunk-based approach. This is a placeholder:
	// a full streaming implementation would write directly to the destination
	// without creating intermediate chunks.
	return scm.streamToChunks(ctx, reader, spec, dstPath)
}

// streamToChunks converts streaming data back to chunks (temporary implementation).
// This is a simplified implementation that reads the stream and creates one
// chunk per Read call; a full implementation would be more sophisticated.
func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
	var chunks []*filer_pb.FileChunk
	buffer := make([]byte, spec.BufferSize)
	offset := int64(0)

	for {
		// Honor cancellation between reads so an abandoned copy stops early
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}

		n, err := reader.Read(buffer)
		if n > 0 {
			// Create chunk for this data
			chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath)
			if chunkErr != nil {
				return nil, fmt.Errorf("create chunk from data: %w", chunkErr)
			}
			chunks = append(chunks, chunk)
			offset += int64(n)
		}

		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("read stream: %w", err)
		}
	}

	return chunks, nil
}
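
// Because each successful Read yields its own chunk, chunk sizes are capped
// by spec.BufferSize and may be smaller still (Read is allowed to return
// short counts). With the default 64KB buffer, a 1 MiB object can therefore
// produce sixteen or more destination chunks; this is a known cost of the
// placeholder approach rather than a property of the final design.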

// createChunkFromData creates a chunk from streaming data
func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string) (*filer_pb.FileChunk, error) {
	// Assign new volume
	assignResult, err := scm.s3a.assignNewVolume(dstPath)
	if err != nil {
		return nil, fmt.Errorf("assign volume: %w", err)
	}

	// Create chunk
	chunk := &filer_pb.FileChunk{
		Offset: offset,
		Size:   uint64(len(data)),
	}

	// Set file ID
	if err := scm.s3a.setChunkFileId(chunk, assignResult); err != nil {
		return nil, err
	}

	// Upload data
	if err := scm.s3a.uploadChunkData(data, assignResult); err != nil {
		return nil, fmt.Errorf("upload chunk data: %w", err)
	}

	return chunk, nil
}

// createMultiChunkReader creates a reader that streams from multiple chunks
func (s3a *S3ApiServer) createMultiChunkReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
	// Create a multi-reader that combines all chunks in entry order
	var readers []io.Reader
	var closers []io.Closer

	for _, chunk := range entry.GetChunks() {
		chunkReader, err := s3a.createChunkReader(chunk)
		if err != nil {
			// Close any readers already opened to avoid leaking connections
			for _, c := range closers {
				c.Close()
			}
			return nil, fmt.Errorf("create chunk reader: %w", err)
		}
		readers = append(readers, chunkReader)
		if closer, ok := chunkReader.(io.Closer); ok {
			closers = append(closers, closer)
		}
	}

	multiReader := io.MultiReader(readers...)
	return &multiReadCloser{reader: multiReader, closers: closers}, nil
}

// createChunkReader creates a reader for a single chunk. The returned reader
// is the HTTP response body, so it is also an io.Closer that the caller must
// eventually close (createMultiChunkReader does this via multiReadCloser).
func (s3a *S3ApiServer) createChunkReader(chunk *filer_pb.FileChunk) (io.Reader, error) {
	// Get chunk URL
	srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
	if err != nil {
		return nil, fmt.Errorf("lookup volume URL: %w", err)
	}

	// Create HTTP request for chunk data
	req, err := http.NewRequest("GET", srcUrl, nil)
	if err != nil {
		return nil, fmt.Errorf("create HTTP request: %w", err)
	}

	// Execute request
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("execute HTTP request: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("HTTP request failed: %d", resp.StatusCode)
	}

	return resp.Body, nil
}

// multiReadCloser wraps a multi-reader and closes every underlying chunk
// reader when it is closed
type multiReadCloser struct {
	reader  io.Reader
	closers []io.Closer
}

func (mrc *multiReadCloser) Read(p []byte) (int, error) {
	return mrc.reader.Read(p)
}

func (mrc *multiReadCloser) Close() error {
	// Close all underlying readers, returning the first error encountered
	var firstErr error
	for _, c := range mrc.closers {
		if err := c.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}