diff --git a/test/s3/normal/s3_integration_test.go b/test/s3/normal/s3_integration_test.go index 925a954f8..fe40babd1 100644 --- a/test/s3/normal/s3_integration_test.go +++ b/test/s3/normal/s3_integration_test.go @@ -3,6 +3,8 @@ package example import ( "bytes" "context" + "crypto/md5" + "encoding/base64" "fmt" "math/rand" "net" @@ -15,6 +17,7 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" @@ -68,6 +71,22 @@ func TestS3Integration(t *testing.T) { testPutObject(t, cluster) }) + t.Run("UploadPart", func(t *testing.T) { + testPutPartWithChecksum(t, cluster) + }) + + t.Run("PutObjectWithChecksum", func(t *testing.T) { + testPutObjectWithChecksum(t, cluster) + }) + + t.Run("UploadPartWithChecksum", func(t *testing.T) { + testUploadPartWithChecksum(t, cluster) + }) + + t.Run("PutObjectWithChecksumAndSSEC", func(t *testing.T) { + testPutObjectWithChecksumAndSSEC(t, cluster) + }) + t.Run("GetObject", func(t *testing.T) { testGetObject(t, cluster) }) @@ -344,6 +363,282 @@ func testPutObject(t *testing.T, cluster *TestCluster) { t.Logf("✓ Put object: %s/%s (%d bytes)", bucketName, objectKey, len(objectData)) } +func createTestBucket(t *testing.T, cluster *TestCluster, prefix string) string { + bucketName := prefix + randomString(8) + _, err := cluster.s3Client.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + return bucketName +} + +// generateSSECKey returns a 32-byte key as a raw string (what the SDK expects +// for SSECustomerKey) and its base64-encoded MD5 (for SSECustomerKeyMD5). +func generateSSECKey() (keyRaw, keyMD5B64 string) { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + key := make([]byte, 32) + for i := range key { + key[i] = byte(rng.Intn(256)) + } + keyRaw = string(key) + keyHash := md5.Sum(key) + keyMD5B64 = base64.StdEncoding.EncodeToString(keyHash[:]) + return +} + +func testPutObjectWithChecksum(t *testing.T, cluster *TestCluster) { + bucketName := createTestBucket(t, cluster, "test-put-checksum-") + objectKey := "test-checksummed-object.txt" + objectData := "Hello, SeaweedFS S3!" 
+ + correctMD5 := calculateMd5(objectData) + incorrectMD5 := calculateMd5(objectData + "incorrect") + + // Put object with incorrect MD5 should be rejected + _, err := cluster.s3Client.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader([]byte(objectData)), + ContentMD5: aws.String(incorrectMD5), + }) + assertBadDigestError(t, err, "PutObject should fail with incorrect MD5") + + t.Logf("✓ Put object with incorrect MD5 rejected: %s/%s", bucketName, objectKey) + + // Put object with correct MD5 should succeed + _, err = cluster.s3Client.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader([]byte(objectData)), + ContentMD5: aws.String(correctMD5), + }) + require.NoError(t, err, "Failed to put object") + + // Verify object exists + headResp, err := cluster.s3Client.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + assert.NotNil(t, headResp.ContentLength) + assert.Equal(t, int64(len(objectData)), aws.Int64Value(headResp.ContentLength)) + + t.Logf("✓ Put object with correct MD5: %s/%s (%d bytes)", bucketName, objectKey, len(objectData)) +} + +// putObjectSSEC sends a PutObject request with SSE-C headers over HTTP. +// The AWS SDK v1 refuses to send SSE-C keys over plain HTTP, so we use the +// low-level Request API and clear the Validate handlers to bypass that check. +// We use Clear() because the specific validator is internal and not easily removable by name. +func putObjectSSEC(client *s3.S3, input *s3.PutObjectInput) (*s3.PutObjectOutput, error) { + req, output := client.PutObjectRequest(input) + req.Handlers.Validate.Clear() + err := req.Send() + return output, err +} + +func headObjectSSEC(client *s3.S3, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { + req, output := client.HeadObjectRequest(input) + req.Handlers.Validate.Clear() + err := req.Send() + return output, err +} + +func testPutObjectWithChecksumAndSSEC(t *testing.T, cluster *TestCluster) { + bucketName := createTestBucket(t, cluster, "test-put-checksum-ssec-") + objectKey := "test-checksummed-ssec-object.txt" + objectData := "Hello, SeaweedFS S3 with SSE-C!" 
+ + correctMD5 := calculateMd5(objectData) + incorrectMD5 := calculateMd5(objectData + "incorrect") + keyRaw, keyMD5B64 := generateSSECKey() + + // Put object with SSE-C and incorrect MD5 should be rejected + _, err := putObjectSSEC(cluster.s3Client, &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader([]byte(objectData)), + ContentMD5: aws.String(incorrectMD5), + SSECustomerAlgorithm: aws.String("AES256"), + SSECustomerKey: aws.String(keyRaw), + SSECustomerKeyMD5: aws.String(keyMD5B64), + }) + assertBadDigestError(t, err, "PutObject with SSE-C should fail with incorrect MD5") + + t.Logf("Put object with SSE-C and incorrect MD5 rejected: %s/%s", bucketName, objectKey) + + // Put object with SSE-C and correct MD5 should succeed + _, err = putObjectSSEC(cluster.s3Client, &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader([]byte(objectData)), + ContentMD5: aws.String(correctMD5), + SSECustomerAlgorithm: aws.String("AES256"), + SSECustomerKey: aws.String(keyRaw), + SSECustomerKeyMD5: aws.String(keyMD5B64), + }) + require.NoError(t, err, "Failed to put object with SSE-C and correct MD5") + + // Verify object exists (SSE-C requires the key for HeadObject too) + headResp, err := headObjectSSEC(cluster.s3Client, &s3.HeadObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + SSECustomerAlgorithm: aws.String("AES256"), + SSECustomerKey: aws.String(keyRaw), + SSECustomerKeyMD5: aws.String(keyMD5B64), + }) + require.NoError(t, err) + assert.NotNil(t, headResp.ContentLength) + assert.Equal(t, int64(len(objectData)), aws.Int64Value(headResp.ContentLength)) + + t.Logf("Put object with SSE-C and correct MD5: %s/%s (%d bytes)", bucketName, objectKey, len(objectData)) +} + +func testUploadPartWithChecksum(t *testing.T, cluster *TestCluster) { + bucketName := createTestBucket(t, cluster, "test-upload-part-checksum-") + objectKey := "test-multipart-checksum.txt" + objectData := "Hello, SeaweedFS S3 Multipart!" 
+ + // Initiate multipart upload + initResp, err := cluster.s3Client.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + uploadID := initResp.UploadId + + correctMD5 := calculateMd5(objectData) + incorrectMD5 := calculateMd5(objectData + "incorrect") + + // Upload part with incorrect MD5 + _, err = cluster.s3Client.UploadPart(&s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + PartNumber: aws.Int64(1), + UploadId: uploadID, + Body: bytes.NewReader([]byte(objectData)), + ContentMD5: aws.String(incorrectMD5), + }) + assertBadDigestError(t, err, "UploadPart should fail with incorrect MD5") + + // Upload part with correct MD5 + partResp, err := cluster.s3Client.UploadPart(&s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + PartNumber: aws.Int64(1), + UploadId: uploadID, + Body: bytes.NewReader([]byte(objectData)), + ContentMD5: aws.String(correctMD5), + }) + require.NoError(t, err, "Failed to upload part with correct MD5") + + // Complete multipart upload + _, err = cluster.s3Client.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: uploadID, + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: []*s3.CompletedPart{ + { + ETag: partResp.ETag, + PartNumber: aws.Int64(1), + }, + }, + }, + }) + require.NoError(t, err, "Failed to complete multipart upload") + + // Verify object exists + headResp, err := cluster.s3Client.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + assert.Equal(t, int64(len(objectData)), aws.Int64Value(headResp.ContentLength)) + + t.Logf("✓ Multipart upload with checksum successful: %s/%s", bucketName, objectKey) +} + +func testPutPartWithChecksum(t *testing.T, cluster *TestCluster) { + bucketName := createTestBucket(t, cluster, "test-put-checksum-") + objectKey := "test-checksummed-part.txt" + + partData := "Hello, SeaweedFS S3!" 
+ + correctMD5 := calculateMd5(partData) + incorrectMD5 := calculateMd5(partData + "incorrect") + + createResp, err := cluster.s3Client.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + + uploadID := createResp.UploadId + + partBody := []byte(partData) + + _, err = cluster.s3Client.UploadPart(&s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: uploadID, + PartNumber: aws.Int64(1), + Body: bytes.NewReader(partBody), + ContentMD5: aws.String(incorrectMD5), + }) + assertBadDigestError(t, err, "UploadPart should fail with incorrect MD5") + + uploadResp, err := cluster.s3Client.UploadPart(&s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: uploadID, + PartNumber: aws.Int64(1), + Body: bytes.NewReader(partBody), + ContentMD5: aws.String(correctMD5), + }) + require.NoError(t, err) + + _, err = cluster.s3Client.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: uploadID, + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: []*s3.CompletedPart{ + { + ETag: uploadResp.ETag, + PartNumber: aws.Int64(1), + }, + }, + }, + }) + require.NoError(t, err, "Failed to complete multipart upload") + + // Verify object exists + headResp, err := cluster.s3Client.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + assert.Equal(t, int64(len(partData)), aws.Int64Value(headResp.ContentLength)) + + t.Logf("✓ UploadPart with MD5 validation: %s/%s", bucketName, objectKey) +} + +func calculateMd5(objectData string) string { + dataBytes := []byte(objectData) + hash := md5.Sum(dataBytes) + return base64.StdEncoding.EncodeToString(hash[:]) +} + +func assertBadDigestError(t *testing.T, err error, description string) { + require.Error(t, err, description) + + var awsErr awserr.Error + require.ErrorAs(t, err, &awsErr) + assert.Equal(t, "BadDigest", awsErr.Code()) +} + func testGetObject(t *testing.T, cluster *TestCluster) { bucketName := "test-get-" + randomString(8) objectKey := "test-data.txt" diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go index b1190a1b4..52dc00817 100644 --- a/weed/s3api/s3api_object_handlers_multipart.go +++ b/weed/s3api/s3api_object_handlers_multipart.go @@ -316,7 +316,11 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re // PutObjectPartHandler - Put an object part in a multipart upload. func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) { bucket, object := s3_constants.GetBucketAndObject(r) - + _, err := validateContentMd5(r.Header) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest) + return + } // Check if bucket exists before putting object part if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { s3err.WriteErrorResponse(w, r, err) @@ -326,7 +330,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ uploadID := r.URL.Query().Get("uploadId") // validateTableBucketObjectPath is enforced at multipart initiation. checkUploadId // cryptographically binds uploadID to object path, so parts cannot switch paths. 
- err := s3a.checkUploadId(object, uploadID) + err = s3a.checkUploadId(object, uploadID) if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload) return diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 7903bd6a1..7ac8b04e2 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -1,7 +1,9 @@ package s3api import ( + "bytes" "context" + "crypto/md5" "encoding/base64" "encoding/json" "errors" @@ -72,21 +74,19 @@ type SSEResponseMetadata struct { } func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) { - // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html bucket, object := s3_constants.GetBucketAndObject(r) - glog.V(2).Infof("PutObjectHandler bucket=%s object=%s size=%d", bucket, object, r.ContentLength) - if err := s3a.validateTableBucketObjectPath(bucket, object); err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) - return - } - _, err := validateContentMd5(r.Header) if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest) return } + glog.V(2).Infof("PutObjectHandler bucket=%s object=%s size=%d", bucket, object, r.ContentLength) + if err := s3a.validateTableBucketObjectPath(bucket, object); err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) + return + } // Check conditional headers if errCode := s3a.checkConditionalHeaders(r, bucket, object); errCode != s3err.ErrNone { @@ -288,11 +288,13 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader // NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy // This eliminates the filer proxy overhead for PUT operations // Note: filePath is now passed directly instead of URL (no parsing needed) - // For SSE, encrypt with offset=0 for all parts // Each part is encrypted independently, then decrypted using metadata during GET partOffset := int64(0) + plaintextHash := md5.New() + dataReader = io.TeeReader(dataReader, plaintextHash) + // Handle all SSE encryption types in a unified manner sseResult, sseErrorCode := s3a.handleAllSSEEncryption(r, dataReader, partOffset) if sseErrorCode != s3err.ErrNone { @@ -426,8 +428,21 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader } // Step 3: Calculate MD5 hash and add SSE metadata to chunks - md5Sum := chunkResult.Md5Hash.Sum(nil) - + md5Sum := plaintextHash.Sum(nil) + contentMd5 := r.Header.Get("Content-Md5") + if contentMd5 != "" { + expectedMd5, err := base64.StdEncoding.DecodeString(contentMd5) + if err != nil { + glog.Errorf("putToFiler: Invalid Content-Md5 header: %v, attempting to cleanup %d orphaned chunks", err, len(chunkResult.FileChunks)) + s3a.deleteOrphanedChunks(chunkResult.FileChunks) + return "", s3err.ErrInvalidDigest, SSEResponseMetadata{} + } + if !bytes.Equal(md5Sum, expectedMd5) { + glog.Warningf("putToFiler: Checksum verification failed, attempting to cleanup %d orphaned chunks", len(chunkResult.FileChunks)) + s3a.deleteOrphanedChunks(chunkResult.FileChunks) + return "", s3err.ErrBadDigest, SSEResponseMetadata{} + } + } glog.V(4).Infof("putToFiler: Chunked upload SUCCESS - path=%s, chunks=%d, size=%d", filePath, len(chunkResult.FileChunks), chunkResult.TotalSize)
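
For reference, the sketch below shows the Content-MD5 handshake these changes exercise on PutObject and UploadPart, written as a standalone program outside any SeaweedFS types: the client sends base64(md5(body)) in the Content-MD5 header, the server hashes the plaintext as it streams through an io.TeeReader (the same pattern added to putToFiler), base64-decodes the header, and compares digests, answering InvalidDigest for a malformed header and BadDigest for a mismatch. The helpers contentMD5Header and verifyContentMD5 are illustrative names only and do not exist in the SeaweedFS codebase.

```go
package main

import (
	"bytes"
	"crypto/md5"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"strings"
)

// contentMD5Header computes the value a client sends in the Content-MD5
// header: the base64 encoding of the raw MD5 digest of the request body.
func contentMD5Header(body []byte) string {
	sum := md5.Sum(body)
	return base64.StdEncoding.EncodeToString(sum[:])
}

// errBadDigest stands in for s3err.ErrBadDigest; the name is illustrative.
var errBadDigest = errors.New("BadDigest: Content-MD5 does not match body")

// verifyContentMD5 mirrors the putToFiler pattern: every byte the downstream
// consumer reads is also fed to an MD5 hasher via io.TeeReader, so the digest
// covers exactly the plaintext that was stored. If a Content-MD5 header was
// supplied, the computed digest is compared against the decoded header value.
func verifyContentMD5(body io.Reader, headerValue string, sink io.Writer) error {
	hasher := md5.New()
	tee := io.TeeReader(body, hasher)
	if _, err := io.Copy(sink, tee); err != nil {
		return err
	}
	if headerValue == "" {
		return nil // Content-MD5 is optional; nothing to verify
	}
	expected, err := base64.StdEncoding.DecodeString(headerValue)
	if err != nil {
		return fmt.Errorf("InvalidDigest: Content-MD5 is not valid base64: %w", err)
	}
	if !bytes.Equal(hasher.Sum(nil), expected) {
		return errBadDigest
	}
	return nil
}

func main() {
	data := "Hello, SeaweedFS S3!"
	good := contentMD5Header([]byte(data))
	bad := contentMD5Header([]byte(data + "incorrect"))

	var stored bytes.Buffer
	fmt.Println(verifyContentMD5(strings.NewReader(data), good, &stored)) // <nil>

	stored.Reset()
	fmt.Println(verifyContentMD5(strings.NewReader(data), bad, &stored)) // BadDigest: ...
}
```

The integration tests above drive exactly this behavior through the AWS SDK: calculateMd5 plays the role of contentMD5Header on the client side, and assertBadDigestError checks that a mismatched header is rejected with the BadDigest error code before the object or part becomes visible.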