package s3api

import (
	"context"
	"crypto/md5"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"net/http"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// StreamingCopySpec defines the specification for streaming copy operations
type StreamingCopySpec struct {
	SourceReader    io.Reader
	TargetSize      int64
	EncryptionSpec  *EncryptionSpec
	CompressionSpec *CompressionSpec
	HashCalculation bool
	BufferSize      int
}

// EncryptionSpec defines encryption parameters for streaming
type EncryptionSpec struct {
	NeedsDecryption bool
	NeedsEncryption bool
	SourceKey       interface{} // *SSECustomerKey, *SSEKMSKey, or *SSES3Key
	DestinationKey  interface{} // *SSECustomerKey, *SSEKMSKey, or *SSES3Key
	SourceType      EncryptionType
	DestinationType EncryptionType
	SourceMetadata  map[string][]byte // Source metadata for IV extraction
	DestinationIV   []byte            // Generated IV for destination
}

// CompressionSpec defines compression parameters for streaming
type CompressionSpec struct {
	IsCompressed       bool
	CompressionType    string
	NeedsDecompression bool
	NeedsCompression   bool
}

// StreamingCopyManager handles streaming copy operations
type StreamingCopyManager struct {
	s3a        *S3ApiServer
	bufferSize int
}

// NewStreamingCopyManager creates a new streaming copy manager
func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager {
	return &StreamingCopyManager{
		s3a:        s3a,
		bufferSize: 64 * 1024, // 64KB default buffer
	}
}
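
// Usage sketch (hypothetical wiring; in practice ExecuteStreamingCopy is
// driven by the object-copy handler, which supplies the source entry, the
// incoming request, the destination path, and the resolved encryption state):
//
//	scm := NewStreamingCopyManager(s3a)
//	chunks, err := scm.ExecuteStreamingCopy(ctx, srcEntry, r, dstPath, encState)
//	if err != nil {
//		return fmt.Errorf("streaming copy: %w", err)
//	}
//	// The caller is responsible for attaching chunks to the destination entry.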

// ExecuteStreamingCopy performs a streaming copy operation
func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) {
	// Create streaming copy specification
	spec, err := scm.createStreamingSpec(entry, r, state)
	if err != nil {
		return nil, fmt.Errorf("create streaming spec: %w", err)
	}

	// Create source reader from entry
	sourceReader, err := scm.createSourceReader(entry)
	if err != nil {
		return nil, fmt.Errorf("create source reader: %w", err)
	}
	defer sourceReader.Close()

	spec.SourceReader = sourceReader

	// Create processing pipeline
	processedReader, err := scm.createProcessingPipeline(spec)
	if err != nil {
		return nil, fmt.Errorf("create processing pipeline: %w", err)
	}

	// Stream to destination
	return scm.streamToDestination(ctx, processedReader, spec, dstPath)
}

// createStreamingSpec creates a streaming specification based on copy parameters
func (scm *StreamingCopyManager) createStreamingSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*StreamingCopySpec, error) {
	spec := &StreamingCopySpec{
		BufferSize:      scm.bufferSize,
		HashCalculation: true,
	}

	// Calculate target size
	sizeCalc := NewCopySizeCalculator(entry, r)
	spec.TargetSize = sizeCalc.CalculateTargetSize()

	// Create encryption specification
	encSpec, err := scm.createEncryptionSpec(entry, r, state)
	if err != nil {
		return nil, err
	}
	spec.EncryptionSpec = encSpec

	// Create compression specification
	spec.CompressionSpec = scm.createCompressionSpec(entry, r)

	return spec, nil
}

// createEncryptionSpec creates encryption specification for streaming
func (scm *StreamingCopyManager) createEncryptionSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*EncryptionSpec, error) {
	spec := &EncryptionSpec{
		NeedsDecryption: state.IsSourceEncrypted(),
		NeedsEncryption: state.IsTargetEncrypted(),
		SourceMetadata:  entry.Extended, // Pass source metadata for IV extraction
	}

	// Set source encryption details
	if state.SrcSSEC {
		spec.SourceType = EncryptionTypeSSEC
		sourceKey, err := ParseSSECCopySourceHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-C copy source headers: %w", err)
		}
		spec.SourceKey = sourceKey
	} else if state.SrcSSEKMS {
		spec.SourceType = EncryptionTypeSSEKMS
		// Extract SSE-KMS key from metadata
		if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
			sseKey, err := DeserializeSSEKMSMetadata(keyData)
			if err != nil {
				return nil, fmt.Errorf("deserialize SSE-KMS metadata: %w", err)
			}
			spec.SourceKey = sseKey
		}
	} else if state.SrcSSES3 {
		spec.SourceType = EncryptionTypeSSES3
		// Extract SSE-S3 key from metadata
		if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]; exists {
			// TODO: This should use a proper SSE-S3 key manager from S3ApiServer.
			// For now, create a temporary key manager to handle deserialization.
			tempKeyManager := NewSSES3KeyManager()
			sseKey, err := DeserializeSSES3Metadata(keyData, tempKeyManager)
			if err != nil {
				return nil, fmt.Errorf("deserialize SSE-S3 metadata: %w", err)
			}
			spec.SourceKey = sseKey
		}
	}

	// Set destination encryption details
	if state.DstSSEC {
		spec.DestinationType = EncryptionTypeSSEC
		destKey, err := ParseSSECHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-C headers: %w", err)
		}
		spec.DestinationKey = destKey
	} else if state.DstSSEKMS {
		spec.DestinationType = EncryptionTypeSSEKMS
		// Parse KMS parameters
		keyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-KMS copy headers: %w", err)
		}
		// Create SSE-KMS key for destination
		sseKey := &SSEKMSKey{
			KeyID:             keyID,
			EncryptionContext: encryptionContext,
			BucketKeyEnabled:  bucketKeyEnabled,
		}
		spec.DestinationKey = sseKey
	} else if state.DstSSES3 {
		spec.DestinationType = EncryptionTypeSSES3
		// Generate or retrieve SSE-S3 key
		keyManager := GetSSES3KeyManager()
		sseKey, err := keyManager.GetOrCreateKey("")
		if err != nil {
			return nil, fmt.Errorf("get SSE-S3 key: %w", err)
		}
		spec.DestinationKey = sseKey
	}

	return spec, nil
}
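
// For reference, a sketch of how the branches above relate to the standard
// AWS S3 headers (the EncryptionState flags are assumed to be derived from
// these by the copy handler; source-side SSE-KMS/SSE-S3 flags come from the
// source object's stored metadata rather than from headers):
//
//	x-amz-copy-source-server-side-encryption-customer-algorithm -> state.SrcSSEC
//	x-amz-server-side-encryption-customer-algorithm             -> state.DstSSEC
//	x-amz-server-side-encryption: aws:kms                       -> state.DstSSEKMS
//	x-amz-server-side-encryption: AES256                        -> state.DstSSES3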

// createCompressionSpec creates compression specification for streaming
func (scm *StreamingCopyManager) createCompressionSpec(entry *filer_pb.Entry, r *http.Request) *CompressionSpec {
	return &CompressionSpec{
		IsCompressed: isCompressedEntry(entry),
		// For now, we don't change compression during copy
		NeedsDecompression: false,
		NeedsCompression:   false,
	}
}

// createSourceReader creates a reader for the source entry
func (scm *StreamingCopyManager) createSourceReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
	// Create a multi-chunk reader that streams from all chunks
	return scm.s3a.createMultiChunkReader(entry)
}

// createProcessingPipeline creates a processing pipeline for the copy operation
func (scm *StreamingCopyManager) createProcessingPipeline(spec *StreamingCopySpec) (io.Reader, error) {
	reader := spec.SourceReader

	// Add decryption if needed
	if spec.EncryptionSpec.NeedsDecryption {
		decryptedReader, err := scm.createDecryptionReader(reader, spec.EncryptionSpec)
		if err != nil {
			return nil, fmt.Errorf("create decryption reader: %w", err)
		}
		reader = decryptedReader
	}

	// Add decompression if needed
	if spec.CompressionSpec.NeedsDecompression {
		decompressedReader, err := scm.createDecompressionReader(reader, spec.CompressionSpec)
		if err != nil {
			return nil, fmt.Errorf("create decompression reader: %w", err)
		}
		reader = decompressedReader
	}

	// Add compression if needed
	if spec.CompressionSpec.NeedsCompression {
		compressedReader, err := scm.createCompressionReader(reader, spec.CompressionSpec)
		if err != nil {
			return nil, fmt.Errorf("create compression reader: %w", err)
		}
		reader = compressedReader
	}

	// Add encryption if needed
	if spec.EncryptionSpec.NeedsEncryption {
		encryptedReader, err := scm.createEncryptionReader(reader, spec.EncryptionSpec)
		if err != nil {
			return nil, fmt.Errorf("create encryption reader: %w", err)
		}
		reader = encryptedReader
	}

	// Add hash calculation if needed
	if spec.HashCalculation {
		reader = scm.createHashReader(reader)
	}

	return reader, nil
}
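
// The pipeline composes plain io.Readers in the order decrypt -> decompress
// -> compress -> encrypt -> hash, each stage wrapping the previous reader and
// applied lazily as the destination pulls data. For example, with an SSE-C
// source and an SSE-KMS destination (compression stages currently disabled by
// createCompressionSpec), the effective stack is:
//
//	source chunks -> SSE-C decrypt -> SSE-KMS encrypt -> hash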

// createDecryptionReader creates a decryption reader based on encryption type
func (scm *StreamingCopyManager) createDecryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
	switch encSpec.SourceType {
	case EncryptionTypeSSEC:
		if sourceKey, ok := encSpec.SourceKey.(*SSECustomerKey); ok {
			// Get IV from metadata
			iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
			if err != nil {
				return nil, fmt.Errorf("get IV from metadata: %w", err)
			}
			return CreateSSECDecryptedReader(reader, sourceKey, iv)
		}
		return nil, fmt.Errorf("invalid SSE-C source key type")
	case EncryptionTypeSSEKMS:
		if sseKey, ok := encSpec.SourceKey.(*SSEKMSKey); ok {
			return CreateSSEKMSDecryptedReader(reader, sseKey)
		}
		return nil, fmt.Errorf("invalid SSE-KMS source key type")
	case EncryptionTypeSSES3:
		if sseKey, ok := encSpec.SourceKey.(*SSES3Key); ok {
			// Get IV from metadata
			iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
			if err != nil {
				return nil, fmt.Errorf("get IV from metadata: %w", err)
			}
			return CreateSSES3DecryptedReader(reader, sseKey, iv)
		}
		return nil, fmt.Errorf("invalid SSE-S3 source key type")
	default:
		return reader, nil
	}
}

// createEncryptionReader creates an encryption reader based on encryption type
func (scm *StreamingCopyManager) createEncryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
	switch encSpec.DestinationType {
	case EncryptionTypeSSEC:
		if destKey, ok := encSpec.DestinationKey.(*SSECustomerKey); ok {
			encryptedReader, iv, err := CreateSSECEncryptedReader(reader, destKey)
			if err != nil {
				return nil, err
			}
			// Store IV in destination metadata (this would need to be handled by caller)
			encSpec.DestinationIV = iv
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-C destination key type")
	case EncryptionTypeSSEKMS:
		if sseKey, ok := encSpec.DestinationKey.(*SSEKMSKey); ok {
			encryptedReader, updatedKey, err := CreateSSEKMSEncryptedReaderWithBucketKey(reader, sseKey.KeyID, sseKey.EncryptionContext, sseKey.BucketKeyEnabled)
			if err != nil {
				return nil, err
			}
			// Store IV from the updated key
			encSpec.DestinationIV = updatedKey.IV
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-KMS destination key type")
	case EncryptionTypeSSES3:
		if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok {
			encryptedReader, iv, err := CreateSSES3EncryptedReader(reader, sseKey)
			if err != nil {
				return nil, err
			}
			// Store IV for metadata
			encSpec.DestinationIV = iv
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-S3 destination key type")
	default:
		return reader, nil
	}
}

// createDecompressionReader creates a decompression reader
func (scm *StreamingCopyManager) createDecompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
	if !compSpec.NeedsDecompression {
		return reader, nil
	}

	switch compSpec.CompressionType {
	case "gzip":
		// Use SeaweedFS's streaming gzip decompression. util.GunzipStream
		// pushes into a writer, so bridge it to a pull-style reader with
		// io.Pipe; the goroutine exits when the stream ends or errors.
		pr, pw := io.Pipe()
		go func() {
			defer pw.Close()
			if _, err := util.GunzipStream(pw, reader); err != nil {
				pw.CloseWithError(fmt.Errorf("gzip decompression failed: %w", err))
			}
		}()
		return pr, nil
	default:
		// Unknown compression type, return as-is
		return reader, nil
	}
}

// createCompressionReader creates a compression reader
func (scm *StreamingCopyManager) createCompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
	if !compSpec.NeedsCompression {
		return reader, nil
	}

	switch compSpec.CompressionType {
	case "gzip":
		// Use SeaweedFS's streaming gzip compression, bridged through io.Pipe
		// in the same way as decompression above.
		pr, pw := io.Pipe()
		go func() {
			defer pw.Close()
			if _, err := util.GzipStream(pw, reader); err != nil {
				pw.CloseWithError(fmt.Errorf("gzip compression failed: %w", err))
			}
		}()
		return pr, nil
	default:
		// Unknown compression type, return as-is
		return reader, nil
	}
}
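
// A minimal consumer of these pipe-backed readers (a sketch; src and dst are
// hypothetical io.Reader/io.Writer values supplied by the caller):
//
//	gz, err := scm.createCompressionReader(src, &CompressionSpec{
//		NeedsCompression: true,
//		CompressionType:  "gzip",
//	})
//	if err != nil {
//		return err
//	}
//	if _, err := io.Copy(dst, gz); err != nil { // pulls lazily; the goroutine feeds pw
//		return err
//	}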

// HashReader wraps an io.Reader to calculate MD5 and SHA256 hashes
type HashReader struct {
	reader     io.Reader
	md5Hash    hash.Hash
	sha256Hash hash.Hash
}

// NewHashReader creates a new hash calculating reader
func NewHashReader(reader io.Reader) *HashReader {
	return &HashReader{
		reader:     reader,
		md5Hash:    md5.New(),
		sha256Hash: sha256.New(),
	}
}

// Read implements io.Reader and calculates hashes as data flows through
func (hr *HashReader) Read(p []byte) (n int, err error) {
	n, err = hr.reader.Read(p)
	if n > 0 {
		// Update both hashes with the data read
		hr.md5Hash.Write(p[:n])
		hr.sha256Hash.Write(p[:n])
	}
	return n, err
}

// MD5Sum returns the current MD5 hash
func (hr *HashReader) MD5Sum() []byte {
	return hr.md5Hash.Sum(nil)
}

// SHA256Sum returns the current SHA256 hash
func (hr *HashReader) SHA256Sum() []byte {
	return hr.sha256Hash.Sum(nil)
}

// MD5Hex returns the MD5 hash as a hex string
func (hr *HashReader) MD5Hex() string {
	return hex.EncodeToString(hr.MD5Sum())
}

// SHA256Hex returns the SHA256 hash as a hex string
func (hr *HashReader) SHA256Hex() string {
	return hex.EncodeToString(hr.SHA256Sum())
}

// createHashReader creates a hash calculation reader
func (scm *StreamingCopyManager) createHashReader(reader io.Reader) io.Reader {
	return NewHashReader(reader)
}
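
// Example (a sketch): drain a stream once and obtain both digests, e.g. to
// populate an ETag and a checksum header.
//
//	hr := NewHashReader(src)
//	if _, err := io.Copy(io.Discard, hr); err != nil {
//		return err
//	}
//	etag := hr.MD5Hex()
//	sum := hr.SHA256Hex()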

// streamToDestination streams the processed data to the destination
func (scm *StreamingCopyManager) streamToDestination(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
	// For now, fall back to the existing chunk-based approach. A full
	// streaming implementation would write directly to the destination
	// without materializing intermediate chunks.
	return scm.streamToChunks(ctx, reader, spec, dstPath)
}

// streamToChunks converts streaming data back to chunks (temporary implementation)
func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
	// This is a simplified implementation that creates one chunk per
	// buffer-sized read; a full implementation would batch reads into
	// larger chunks.
	var chunks []*filer_pb.FileChunk
	buffer := make([]byte, spec.BufferSize)
	offset := int64(0)

	for {
		n, err := reader.Read(buffer)
		if n > 0 {
			// Create chunk for this data
			chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath)
			if chunkErr != nil {
				return nil, fmt.Errorf("create chunk from data: %w", chunkErr)
			}
			chunks = append(chunks, chunk)
			offset += int64(n)
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("read stream: %w", err)
		}
	}

	return chunks, nil
}

// createChunkFromData creates a chunk from streaming data
func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string) (*filer_pb.FileChunk, error) {
	// Assign new volume
	assignResult, err := scm.s3a.assignNewVolume(dstPath)
	if err != nil {
		return nil, fmt.Errorf("assign volume: %w", err)
	}

	// Create chunk
	chunk := &filer_pb.FileChunk{
		Offset: offset,
		Size:   uint64(len(data)),
	}

	// Set file ID
	if err := scm.s3a.setChunkFileId(chunk, assignResult); err != nil {
		return nil, err
	}

	// Upload data
	if err := scm.s3a.uploadChunkData(data, assignResult); err != nil {
		return nil, fmt.Errorf("upload chunk data: %w", err)
	}

	return chunk, nil
}

// createMultiChunkReader creates a reader that streams from multiple chunks.
// Note that it opens all chunk requests eagerly before any data is read.
func (s3a *S3ApiServer) createMultiChunkReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
	// Create a multi-reader that combines all chunks
	var readers []io.Reader
	var closers []io.Closer
	for _, chunk := range entry.GetChunks() {
		chunkReader, err := s3a.createChunkReader(chunk)
		if err != nil {
			// Release any chunk readers opened so far
			for _, c := range closers {
				c.Close()
			}
			return nil, fmt.Errorf("create chunk reader: %w", err)
		}
		readers = append(readers, chunkReader)
		// Track underlying closers (e.g. HTTP response bodies) so Close can
		// release them.
		if closer, ok := chunkReader.(io.Closer); ok {
			closers = append(closers, closer)
		}
	}

	return &multiReadCloser{reader: io.MultiReader(readers...), closers: closers}, nil
}

// createChunkReader creates a reader for a single chunk
func (s3a *S3ApiServer) createChunkReader(chunk *filer_pb.FileChunk) (io.Reader, error) {
	// Get chunk URL
	srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
	if err != nil {
		return nil, fmt.Errorf("lookup volume URL: %w", err)
	}

	// Create HTTP request for chunk data
	req, err := http.NewRequest(http.MethodGet, srcUrl, nil)
	if err != nil {
		return nil, fmt.Errorf("create HTTP request: %w", err)
	}

	// Execute request
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("execute HTTP request: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("HTTP request failed: %d", resp.StatusCode)
	}

	// The response body is an io.ReadCloser; the multiReadCloser returned by
	// createMultiChunkReader is responsible for closing it.
	return resp.Body, nil
}

// multiReadCloser wraps a multi-reader and closes all underlying chunk
// readers when done
type multiReadCloser struct {
	reader  io.Reader
	closers []io.Closer
}

func (mrc *multiReadCloser) Read(p []byte) (int, error) {
	return mrc.reader.Read(p)
}

func (mrc *multiReadCloser) Close() error {
	var firstErr error
	for _, c := range mrc.closers {
		if err := c.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
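
// Example (a sketch): stream a whole entry and release the underlying HTTP
// response bodies when finished.
//
//	rc, err := s3a.createMultiChunkReader(entry)
//	if err != nil {
//		return err
//	}
//	defer rc.Close()
//	if _, err := io.Copy(dst, rc); err != nil {
//		return err
//	}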