You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							561 lines
						
					
					
						
							17 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							561 lines
						
					
					
						
							17 KiB
						
					
					
				
								package s3api
							 | 
						|
								
							 | 
						|
								import (
							 | 
						|
									"context"
							 | 
						|
									"crypto/md5"
							 | 
						|
									"crypto/sha256"
							 | 
						|
									"encoding/hex"
							 | 
						|
									"fmt"
							 | 
						|
									"hash"
							 | 
						|
									"io"
							 | 
						|
									"net/http"
							 | 
						|
								
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
							 | 
						|
									"github.com/seaweedfs/seaweedfs/weed/util"
							 | 
						|
								)
							 | 
						|
								
							 | 
						|
								// StreamingCopySpec defines the specification for streaming copy operations
							 | 
						|
								type StreamingCopySpec struct {
							 | 
						|
									SourceReader    io.Reader
							 | 
						|
									TargetSize      int64
							 | 
						|
									EncryptionSpec  *EncryptionSpec
							 | 
						|
									CompressionSpec *CompressionSpec
							 | 
						|
									HashCalculation bool
							 | 
						|
									BufferSize      int
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// EncryptionSpec defines encryption parameters for streaming
							 | 
						|
								type EncryptionSpec struct {
							 | 
						|
									NeedsDecryption bool
							 | 
						|
									NeedsEncryption bool
							 | 
						|
									SourceKey       interface{} // SSECustomerKey or SSEKMSKey
							 | 
						|
									DestinationKey  interface{} // SSECustomerKey or SSEKMSKey
							 | 
						|
									SourceType      EncryptionType
							 | 
						|
									DestinationType EncryptionType
							 | 
						|
									SourceMetadata  map[string][]byte // Source metadata for IV extraction
							 | 
						|
									DestinationIV   []byte            // Generated IV for destination
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// CompressionSpec defines compression parameters for streaming
							 | 
						|
								type CompressionSpec struct {
							 | 
						|
									IsCompressed       bool
							 | 
						|
									CompressionType    string
							 | 
						|
									NeedsDecompression bool
							 | 
						|
									NeedsCompression   bool
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// StreamingCopyManager handles streaming copy operations
							 | 
						|
								type StreamingCopyManager struct {
							 | 
						|
									s3a        *S3ApiServer
							 | 
						|
									bufferSize int
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// NewStreamingCopyManager creates a new streaming copy manager
							 | 
						|
								func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager {
							 | 
						|
									return &StreamingCopyManager{
							 | 
						|
										s3a:        s3a,
							 | 
						|
										bufferSize: 64 * 1024, // 64KB default buffer
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// ExecuteStreamingCopy performs a streaming copy operation
							 | 
						|
								func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) {
							 | 
						|
									// Create streaming copy specification
							 | 
						|
									spec, err := scm.createStreamingSpec(entry, r, state)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("create streaming spec: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Create source reader from entry
							 | 
						|
									sourceReader, err := scm.createSourceReader(entry)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("create source reader: %w", err)
							 | 
						|
									}
							 | 
						|
									defer sourceReader.Close()
							 | 
						|
								
							 | 
						|
									spec.SourceReader = sourceReader
							 | 
						|
								
							 | 
						|
									// Create processing pipeline
							 | 
						|
									processedReader, err := scm.createProcessingPipeline(spec)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("create processing pipeline: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Stream to destination
							 | 
						|
									return scm.streamToDestination(ctx, processedReader, spec, dstPath)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createStreamingSpec creates a streaming specification based on copy parameters
							 | 
						|
								func (scm *StreamingCopyManager) createStreamingSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*StreamingCopySpec, error) {
							 | 
						|
									spec := &StreamingCopySpec{
							 | 
						|
										BufferSize:      scm.bufferSize,
							 | 
						|
										HashCalculation: true,
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Calculate target size
							 | 
						|
									sizeCalc := NewCopySizeCalculator(entry, r)
							 | 
						|
									spec.TargetSize = sizeCalc.CalculateTargetSize()
							 | 
						|
								
							 | 
						|
									// Create encryption specification
							 | 
						|
									encSpec, err := scm.createEncryptionSpec(entry, r, state)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
									spec.EncryptionSpec = encSpec
							 | 
						|
								
							 | 
						|
									// Create compression specification
							 | 
						|
									spec.CompressionSpec = scm.createCompressionSpec(entry, r)
							 | 
						|
								
							 | 
						|
									return spec, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createEncryptionSpec creates encryption specification for streaming
							 | 
						|
								func (scm *StreamingCopyManager) createEncryptionSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*EncryptionSpec, error) {
							 | 
						|
									spec := &EncryptionSpec{
							 | 
						|
										NeedsDecryption: state.IsSourceEncrypted(),
							 | 
						|
										NeedsEncryption: state.IsTargetEncrypted(),
							 | 
						|
										SourceMetadata:  entry.Extended, // Pass source metadata for IV extraction
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Set source encryption details
							 | 
						|
									if state.SrcSSEC {
							 | 
						|
										spec.SourceType = EncryptionTypeSSEC
							 | 
						|
										sourceKey, err := ParseSSECCopySourceHeaders(r)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("parse SSE-C copy source headers: %w", err)
							 | 
						|
										}
							 | 
						|
										spec.SourceKey = sourceKey
							 | 
						|
									} else if state.SrcSSEKMS {
							 | 
						|
										spec.SourceType = EncryptionTypeSSEKMS
							 | 
						|
										// Extract SSE-KMS key from metadata
							 | 
						|
										if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
							 | 
						|
											sseKey, err := DeserializeSSEKMSMetadata(keyData)
							 | 
						|
											if err != nil {
							 | 
						|
												return nil, fmt.Errorf("deserialize SSE-KMS metadata: %w", err)
							 | 
						|
											}
							 | 
						|
											spec.SourceKey = sseKey
							 | 
						|
										}
							 | 
						|
									} else if state.SrcSSES3 {
							 | 
						|
										spec.SourceType = EncryptionTypeSSES3
							 | 
						|
										// Extract SSE-S3 key from metadata
							 | 
						|
										if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]; exists {
							 | 
						|
											// TODO: This should use a proper SSE-S3 key manager from S3ApiServer
							 | 
						|
											// For now, create a temporary key manager to handle deserialization
							 | 
						|
											tempKeyManager := NewSSES3KeyManager()
							 | 
						|
											sseKey, err := DeserializeSSES3Metadata(keyData, tempKeyManager)
							 | 
						|
											if err != nil {
							 | 
						|
												return nil, fmt.Errorf("deserialize SSE-S3 metadata: %w", err)
							 | 
						|
											}
							 | 
						|
											spec.SourceKey = sseKey
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Set destination encryption details
							 | 
						|
									if state.DstSSEC {
							 | 
						|
										spec.DestinationType = EncryptionTypeSSEC
							 | 
						|
										destKey, err := ParseSSECHeaders(r)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("parse SSE-C headers: %w", err)
							 | 
						|
										}
							 | 
						|
										spec.DestinationKey = destKey
							 | 
						|
									} else if state.DstSSEKMS {
							 | 
						|
										spec.DestinationType = EncryptionTypeSSEKMS
							 | 
						|
										// Parse KMS parameters
							 | 
						|
										keyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("parse SSE-KMS copy headers: %w", err)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										// Create SSE-KMS key for destination
							 | 
						|
										sseKey := &SSEKMSKey{
							 | 
						|
											KeyID:             keyID,
							 | 
						|
											EncryptionContext: encryptionContext,
							 | 
						|
											BucketKeyEnabled:  bucketKeyEnabled,
							 | 
						|
										}
							 | 
						|
										spec.DestinationKey = sseKey
							 | 
						|
									} else if state.DstSSES3 {
							 | 
						|
										spec.DestinationType = EncryptionTypeSSES3
							 | 
						|
										// Generate or retrieve SSE-S3 key
							 | 
						|
										keyManager := GetSSES3KeyManager()
							 | 
						|
										sseKey, err := keyManager.GetOrCreateKey("")
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("get SSE-S3 key: %w", err)
							 | 
						|
										}
							 | 
						|
										spec.DestinationKey = sseKey
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return spec, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createCompressionSpec creates compression specification for streaming
							 | 
						|
								func (scm *StreamingCopyManager) createCompressionSpec(entry *filer_pb.Entry, r *http.Request) *CompressionSpec {
							 | 
						|
									return &CompressionSpec{
							 | 
						|
										IsCompressed: isCompressedEntry(entry),
							 | 
						|
										// For now, we don't change compression during copy
							 | 
						|
										NeedsDecompression: false,
							 | 
						|
										NeedsCompression:   false,
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createSourceReader creates a reader for the source entry
							 | 
						|
								func (scm *StreamingCopyManager) createSourceReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
							 | 
						|
									// Create a multi-chunk reader that streams from all chunks
							 | 
						|
									return scm.s3a.createMultiChunkReader(entry)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createProcessingPipeline creates a processing pipeline for the copy operation
							 | 
						|
								func (scm *StreamingCopyManager) createProcessingPipeline(spec *StreamingCopySpec) (io.Reader, error) {
							 | 
						|
									reader := spec.SourceReader
							 | 
						|
								
							 | 
						|
									// Add decryption if needed
							 | 
						|
									if spec.EncryptionSpec.NeedsDecryption {
							 | 
						|
										decryptedReader, err := scm.createDecryptionReader(reader, spec.EncryptionSpec)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("create decryption reader: %w", err)
							 | 
						|
										}
							 | 
						|
										reader = decryptedReader
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Add decompression if needed
							 | 
						|
									if spec.CompressionSpec.NeedsDecompression {
							 | 
						|
										decompressedReader, err := scm.createDecompressionReader(reader, spec.CompressionSpec)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("create decompression reader: %w", err)
							 | 
						|
										}
							 | 
						|
										reader = decompressedReader
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Add compression if needed
							 | 
						|
									if spec.CompressionSpec.NeedsCompression {
							 | 
						|
										compressedReader, err := scm.createCompressionReader(reader, spec.CompressionSpec)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("create compression reader: %w", err)
							 | 
						|
										}
							 | 
						|
										reader = compressedReader
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Add encryption if needed
							 | 
						|
									if spec.EncryptionSpec.NeedsEncryption {
							 | 
						|
										encryptedReader, err := scm.createEncryptionReader(reader, spec.EncryptionSpec)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("create encryption reader: %w", err)
							 | 
						|
										}
							 | 
						|
										reader = encryptedReader
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Add hash calculation if needed
							 | 
						|
									if spec.HashCalculation {
							 | 
						|
										reader = scm.createHashReader(reader)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return reader, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createDecryptionReader creates a decryption reader based on encryption type
							 | 
						|
								func (scm *StreamingCopyManager) createDecryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
							 | 
						|
									switch encSpec.SourceType {
							 | 
						|
									case EncryptionTypeSSEC:
							 | 
						|
										if sourceKey, ok := encSpec.SourceKey.(*SSECustomerKey); ok {
							 | 
						|
											// Get IV from metadata
							 | 
						|
											iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
							 | 
						|
											if err != nil {
							 | 
						|
												return nil, fmt.Errorf("get IV from metadata: %w", err)
							 | 
						|
											}
							 | 
						|
											return CreateSSECDecryptedReader(reader, sourceKey, iv)
							 | 
						|
										}
							 | 
						|
										return nil, fmt.Errorf("invalid SSE-C source key type")
							 | 
						|
								
							 | 
						|
									case EncryptionTypeSSEKMS:
							 | 
						|
										if sseKey, ok := encSpec.SourceKey.(*SSEKMSKey); ok {
							 | 
						|
											return CreateSSEKMSDecryptedReader(reader, sseKey)
							 | 
						|
										}
							 | 
						|
										return nil, fmt.Errorf("invalid SSE-KMS source key type")
							 | 
						|
								
							 | 
						|
									case EncryptionTypeSSES3:
							 | 
						|
										if sseKey, ok := encSpec.SourceKey.(*SSES3Key); ok {
							 | 
						|
											// Get IV from metadata
							 | 
						|
											iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
							 | 
						|
											if err != nil {
							 | 
						|
												return nil, fmt.Errorf("get IV from metadata: %w", err)
							 | 
						|
											}
							 | 
						|
											return CreateSSES3DecryptedReader(reader, sseKey, iv)
							 | 
						|
										}
							 | 
						|
										return nil, fmt.Errorf("invalid SSE-S3 source key type")
							 | 
						|
								
							 | 
						|
									default:
							 | 
						|
										return reader, nil
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createEncryptionReader creates an encryption reader based on encryption type
							 | 
						|
								func (scm *StreamingCopyManager) createEncryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
							 | 
						|
									switch encSpec.DestinationType {
							 | 
						|
									case EncryptionTypeSSEC:
							 | 
						|
										if destKey, ok := encSpec.DestinationKey.(*SSECustomerKey); ok {
							 | 
						|
											encryptedReader, iv, err := CreateSSECEncryptedReader(reader, destKey)
							 | 
						|
											if err != nil {
							 | 
						|
												return nil, err
							 | 
						|
											}
							 | 
						|
											// Store IV in destination metadata (this would need to be handled by caller)
							 | 
						|
											encSpec.DestinationIV = iv
							 | 
						|
											return encryptedReader, nil
							 | 
						|
										}
							 | 
						|
										return nil, fmt.Errorf("invalid SSE-C destination key type")
							 | 
						|
								
							 | 
						|
									case EncryptionTypeSSEKMS:
							 | 
						|
										if sseKey, ok := encSpec.DestinationKey.(*SSEKMSKey); ok {
							 | 
						|
											encryptedReader, updatedKey, err := CreateSSEKMSEncryptedReaderWithBucketKey(reader, sseKey.KeyID, sseKey.EncryptionContext, sseKey.BucketKeyEnabled)
							 | 
						|
											if err != nil {
							 | 
						|
												return nil, err
							 | 
						|
											}
							 | 
						|
											// Store IV from the updated key
							 | 
						|
											encSpec.DestinationIV = updatedKey.IV
							 | 
						|
											return encryptedReader, nil
							 | 
						|
										}
							 | 
						|
										return nil, fmt.Errorf("invalid SSE-KMS destination key type")
							 | 
						|
								
							 | 
						|
									case EncryptionTypeSSES3:
							 | 
						|
										if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok {
							 | 
						|
											encryptedReader, iv, err := CreateSSES3EncryptedReader(reader, sseKey)
							 | 
						|
											if err != nil {
							 | 
						|
												return nil, err
							 | 
						|
											}
							 | 
						|
											// Store IV for metadata
							 | 
						|
											encSpec.DestinationIV = iv
							 | 
						|
											return encryptedReader, nil
							 | 
						|
										}
							 | 
						|
										return nil, fmt.Errorf("invalid SSE-S3 destination key type")
							 | 
						|
								
							 | 
						|
									default:
							 | 
						|
										return reader, nil
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createDecompressionReader creates a decompression reader
							 | 
						|
								func (scm *StreamingCopyManager) createDecompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
							 | 
						|
									if !compSpec.NeedsDecompression {
							 | 
						|
										return reader, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									switch compSpec.CompressionType {
							 | 
						|
									case "gzip":
							 | 
						|
										// Use SeaweedFS's streaming gzip decompression
							 | 
						|
										pr, pw := io.Pipe()
							 | 
						|
										go func() {
							 | 
						|
											defer pw.Close()
							 | 
						|
											_, err := util.GunzipStream(pw, reader)
							 | 
						|
											if err != nil {
							 | 
						|
												pw.CloseWithError(fmt.Errorf("gzip decompression failed: %v", err))
							 | 
						|
											}
							 | 
						|
										}()
							 | 
						|
										return pr, nil
							 | 
						|
									default:
							 | 
						|
										// Unknown compression type, return as-is
							 | 
						|
										return reader, nil
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createCompressionReader creates a compression reader
							 | 
						|
								func (scm *StreamingCopyManager) createCompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
							 | 
						|
									if !compSpec.NeedsCompression {
							 | 
						|
										return reader, nil
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									switch compSpec.CompressionType {
							 | 
						|
									case "gzip":
							 | 
						|
										// Use SeaweedFS's streaming gzip compression
							 | 
						|
										pr, pw := io.Pipe()
							 | 
						|
										go func() {
							 | 
						|
											defer pw.Close()
							 | 
						|
											_, err := util.GzipStream(pw, reader)
							 | 
						|
											if err != nil {
							 | 
						|
												pw.CloseWithError(fmt.Errorf("gzip compression failed: %v", err))
							 | 
						|
											}
							 | 
						|
										}()
							 | 
						|
										return pr, nil
							 | 
						|
									default:
							 | 
						|
										// Unknown compression type, return as-is
							 | 
						|
										return reader, nil
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// HashReader wraps an io.Reader to calculate MD5 and SHA256 hashes
							 | 
						|
								type HashReader struct {
							 | 
						|
									reader     io.Reader
							 | 
						|
									md5Hash    hash.Hash
							 | 
						|
									sha256Hash hash.Hash
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// NewHashReader creates a new hash calculating reader
							 | 
						|
								func NewHashReader(reader io.Reader) *HashReader {
							 | 
						|
									return &HashReader{
							 | 
						|
										reader:     reader,
							 | 
						|
										md5Hash:    md5.New(),
							 | 
						|
										sha256Hash: sha256.New(),
							 | 
						|
									}
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// Read implements io.Reader and calculates hashes as data flows through
							 | 
						|
								func (hr *HashReader) Read(p []byte) (n int, err error) {
							 | 
						|
									n, err = hr.reader.Read(p)
							 | 
						|
									if n > 0 {
							 | 
						|
										// Update both hashes with the data read
							 | 
						|
										hr.md5Hash.Write(p[:n])
							 | 
						|
										hr.sha256Hash.Write(p[:n])
							 | 
						|
									}
							 | 
						|
									return n, err
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// MD5Sum returns the current MD5 hash
							 | 
						|
								func (hr *HashReader) MD5Sum() []byte {
							 | 
						|
									return hr.md5Hash.Sum(nil)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SHA256Sum returns the current SHA256 hash
							 | 
						|
								func (hr *HashReader) SHA256Sum() []byte {
							 | 
						|
									return hr.sha256Hash.Sum(nil)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// MD5Hex returns the MD5 hash as a hex string
							 | 
						|
								func (hr *HashReader) MD5Hex() string {
							 | 
						|
									return hex.EncodeToString(hr.MD5Sum())
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// SHA256Hex returns the SHA256 hash as a hex string
							 | 
						|
								func (hr *HashReader) SHA256Hex() string {
							 | 
						|
									return hex.EncodeToString(hr.SHA256Sum())
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createHashReader creates a hash calculation reader
							 | 
						|
								func (scm *StreamingCopyManager) createHashReader(reader io.Reader) io.Reader {
							 | 
						|
									return NewHashReader(reader)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// streamToDestination streams the processed data to the destination
							 | 
						|
								func (scm *StreamingCopyManager) streamToDestination(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
							 | 
						|
									// For now, we'll use the existing chunk-based approach
							 | 
						|
									// In a full implementation, this would stream directly to the destination
							 | 
						|
									// without creating intermediate chunks
							 | 
						|
								
							 | 
						|
									// This is a placeholder that converts back to chunk-based approach
							 | 
						|
									// A full streaming implementation would write directly to the destination
							 | 
						|
									return scm.streamToChunks(ctx, reader, spec, dstPath)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// streamToChunks converts streaming data back to chunks (temporary implementation)
							 | 
						|
								func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
							 | 
						|
									// This is a simplified implementation that reads the stream and creates chunks
							 | 
						|
									// A full implementation would be more sophisticated
							 | 
						|
								
							 | 
						|
									var chunks []*filer_pb.FileChunk
							 | 
						|
									buffer := make([]byte, spec.BufferSize)
							 | 
						|
									offset := int64(0)
							 | 
						|
								
							 | 
						|
									for {
							 | 
						|
										n, err := reader.Read(buffer)
							 | 
						|
										if n > 0 {
							 | 
						|
											// Create chunk for this data
							 | 
						|
											chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath)
							 | 
						|
											if chunkErr != nil {
							 | 
						|
												return nil, fmt.Errorf("create chunk from data: %w", chunkErr)
							 | 
						|
											}
							 | 
						|
											chunks = append(chunks, chunk)
							 | 
						|
											offset += int64(n)
							 | 
						|
										}
							 | 
						|
								
							 | 
						|
										if err == io.EOF {
							 | 
						|
											break
							 | 
						|
										}
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("read stream: %w", err)
							 | 
						|
										}
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return chunks, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createChunkFromData creates a chunk from streaming data
							 | 
						|
								func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string) (*filer_pb.FileChunk, error) {
							 | 
						|
									// Assign new volume
							 | 
						|
									assignResult, err := scm.s3a.assignNewVolume(dstPath)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("assign volume: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Create chunk
							 | 
						|
									chunk := &filer_pb.FileChunk{
							 | 
						|
										Offset: offset,
							 | 
						|
										Size:   uint64(len(data)),
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Set file ID
							 | 
						|
									if err := scm.s3a.setChunkFileId(chunk, assignResult); err != nil {
							 | 
						|
										return nil, err
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Upload data
							 | 
						|
									if err := scm.s3a.uploadChunkData(data, assignResult); err != nil {
							 | 
						|
										return nil, fmt.Errorf("upload chunk data: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return chunk, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createMultiChunkReader creates a reader that streams from multiple chunks
							 | 
						|
								func (s3a *S3ApiServer) createMultiChunkReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
							 | 
						|
									// Create a multi-reader that combines all chunks
							 | 
						|
									var readers []io.Reader
							 | 
						|
								
							 | 
						|
									for _, chunk := range entry.GetChunks() {
							 | 
						|
										chunkReader, err := s3a.createChunkReader(chunk)
							 | 
						|
										if err != nil {
							 | 
						|
											return nil, fmt.Errorf("create chunk reader: %w", err)
							 | 
						|
										}
							 | 
						|
										readers = append(readers, chunkReader)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									multiReader := io.MultiReader(readers...)
							 | 
						|
									return &multiReadCloser{reader: multiReader}, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// createChunkReader creates a reader for a single chunk
							 | 
						|
								func (s3a *S3ApiServer) createChunkReader(chunk *filer_pb.FileChunk) (io.Reader, error) {
							 | 
						|
									// Get chunk URL
							 | 
						|
									srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("lookup volume URL: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Create HTTP request for chunk data
							 | 
						|
									req, err := http.NewRequest("GET", srcUrl, nil)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("create HTTP request: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									// Execute request
							 | 
						|
									resp, err := http.DefaultClient.Do(req)
							 | 
						|
									if err != nil {
							 | 
						|
										return nil, fmt.Errorf("execute HTTP request: %w", err)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									if resp.StatusCode != http.StatusOK {
							 | 
						|
										resp.Body.Close()
							 | 
						|
										return nil, fmt.Errorf("HTTP request failed: %d", resp.StatusCode)
							 | 
						|
									}
							 | 
						|
								
							 | 
						|
									return resp.Body, nil
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								// multiReadCloser wraps a multi-reader with a close method
							 | 
						|
								type multiReadCloser struct {
							 | 
						|
									reader io.Reader
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (mrc *multiReadCloser) Read(p []byte) (int, error) {
							 | 
						|
									return mrc.reader.Read(p)
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								func (mrc *multiReadCloser) Close() error {
							 | 
						|
									return nil
							 | 
						|
								}
							 |