From 53048ffffbde3ac60eb9094541e30da8167c8476 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Szynkiewicz?= Date: Thu, 19 Feb 2026 00:40:08 +0100 Subject: [PATCH] Add md5 checksum validation support on PutObject and UploadPart (#8367) * Add md5 checksum validation support on PutObject and UploadPart Per the S3 specification, when a client sends a Content-MD5 header, the server must compare it against the MD5 of the received body and return BadDigest (HTTP 400) if they don't match. SeaweedFS was silently accepting objects with incorrect Content-MD5 headers, which breaks data integrity verification for clients that rely on this feature (e.g. boto3). The error infrastructure (ErrBadDigest, ErrMsgBadDigest) already existed from PR #7306 but was never wired to an actual check. This commit adds MD5 verification in putToFiler after the body is streamed and the MD5 is computed, and adds Content-MD5 header validation to PutObjectPartHandler (matching PutObjectHandler). Orphaned chunks are cleaned up on mismatch. Refs: https://github.com/seaweedfs/seaweedfs/discussions/3908 * handle SSE, add uploadpart test * s3 integration test: fix typo and add multipart upload checksum test * s3api: move validateContentMd5 after GetBucketAndObject in PutObjectPartHandler * s3api: move validateContentMd5 after GetBucketAndObject in PutObjectHandler * s3api: fix MD5 validation for SSE uploads and logging in putToFiler * add SSE test with checksum validation - mostly ai-generated * Update s3_integration_test.go * Address S3 integration test feedback: fix typos, rename variables, add verification steps, and clean up comments. --------- Co-authored-by: Chris Lu --- test/s3/normal/s3_integration_test.go | 295 ++++++++++++++++++ weed/s3api/s3api_object_handlers_multipart.go | 8 +- weed/s3api/s3api_object_handlers_put.go | 35 ++- 3 files changed, 326 insertions(+), 12 deletions(-) diff --git a/test/s3/normal/s3_integration_test.go b/test/s3/normal/s3_integration_test.go index 925a954f8..fe40babd1 100644 --- a/test/s3/normal/s3_integration_test.go +++ b/test/s3/normal/s3_integration_test.go @@ -3,6 +3,8 @@ package example import ( "bytes" "context" + "crypto/md5" + "encoding/base64" "fmt" "math/rand" "net" @@ -15,6 +17,7 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" @@ -68,6 +71,22 @@ func TestS3Integration(t *testing.T) { testPutObject(t, cluster) }) + t.Run("UploadPart", func(t *testing.T) { + testPutPartWithChecksum(t, cluster) + }) + + t.Run("PutObjectWithChecksum", func(t *testing.T) { + testPutObjectWithChecksum(t, cluster) + }) + + t.Run("UploadPartWithChecksum", func(t *testing.T) { + testUploadPartWithChecksum(t, cluster) + }) + + t.Run("PutObjectWithChecksumAndSSEC", func(t *testing.T) { + testPutObjectWithChecksumAndSSEC(t, cluster) + }) + t.Run("GetObject", func(t *testing.T) { testGetObject(t, cluster) }) @@ -344,6 +363,282 @@ func testPutObject(t *testing.T, cluster *TestCluster) { t.Logf("✓ Put object: %s/%s (%d bytes)", bucketName, objectKey, len(objectData)) } +func createTestBucket(t *testing.T, cluster *TestCluster, prefix string) string { + bucketName := prefix + randomString(8) + _, err := cluster.s3Client.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + return bucketName +} + +// generateSSECKey returns a 32-byte key as a raw string (what 
the SDK expects +// for SSECustomerKey) and its base64-encoded MD5 (for SSECustomerKeyMD5). +func generateSSECKey() (keyRaw, keyMD5B64 string) { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + key := make([]byte, 32) + for i := range key { + key[i] = byte(rng.Intn(256)) + } + keyRaw = string(key) + keyHash := md5.Sum(key) + keyMD5B64 = base64.StdEncoding.EncodeToString(keyHash[:]) + return +} + +func testPutObjectWithChecksum(t *testing.T, cluster *TestCluster) { + bucketName := createTestBucket(t, cluster, "test-put-checksum-") + objectKey := "test-checksummed-object.txt" + objectData := "Hello, SeaweedFS S3!" + + correctMD5 := calculateMd5(objectData) + incorrectMD5 := calculateMd5(objectData + "incorrect") + + // Put object with incorrect MD5 should be rejected + _, err := cluster.s3Client.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader([]byte(objectData)), + ContentMD5: aws.String(incorrectMD5), + }) + assertBadDigestError(t, err, "PutObject should fail with incorrect MD5") + + t.Logf("✓ Put object with incorrect MD5 rejected: %s/%s", bucketName, objectKey) + + // Put object with correct MD5 should succeed + _, err = cluster.s3Client.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader([]byte(objectData)), + ContentMD5: aws.String(correctMD5), + }) + require.NoError(t, err, "Failed to put object") + + // Verify object exists + headResp, err := cluster.s3Client.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + assert.NotNil(t, headResp.ContentLength) + assert.Equal(t, int64(len(objectData)), aws.Int64Value(headResp.ContentLength)) + + t.Logf("✓ Put object with correct MD5: %s/%s (%d bytes)", bucketName, objectKey, len(objectData)) +} + +// putObjectSSEC sends a PutObject request with SSE-C headers over HTTP. +// The AWS SDK v1 refuses to send SSE-C keys over plain HTTP, so we use the +// low-level Request API and clear the Validate handlers to bypass that check. +// We use Clear() because the specific validator is internal and not easily removable by name. +func putObjectSSEC(client *s3.S3, input *s3.PutObjectInput) (*s3.PutObjectOutput, error) { + req, output := client.PutObjectRequest(input) + req.Handlers.Validate.Clear() + err := req.Send() + return output, err +} + +func headObjectSSEC(client *s3.S3, input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { + req, output := client.HeadObjectRequest(input) + req.Handlers.Validate.Clear() + err := req.Send() + return output, err +} + +func testPutObjectWithChecksumAndSSEC(t *testing.T, cluster *TestCluster) { + bucketName := createTestBucket(t, cluster, "test-put-checksum-ssec-") + objectKey := "test-checksummed-ssec-object.txt" + objectData := "Hello, SeaweedFS S3 with SSE-C!" 
+ + correctMD5 := calculateMd5(objectData) + incorrectMD5 := calculateMd5(objectData + "incorrect") + keyRaw, keyMD5B64 := generateSSECKey() + + // Put object with SSE-C and incorrect MD5 should be rejected + _, err := putObjectSSEC(cluster.s3Client, &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader([]byte(objectData)), + ContentMD5: aws.String(incorrectMD5), + SSECustomerAlgorithm: aws.String("AES256"), + SSECustomerKey: aws.String(keyRaw), + SSECustomerKeyMD5: aws.String(keyMD5B64), + }) + assertBadDigestError(t, err, "PutObject with SSE-C should fail with incorrect MD5") + + t.Logf("Put object with SSE-C and incorrect MD5 rejected: %s/%s", bucketName, objectKey) + + // Put object with SSE-C and correct MD5 should succeed + _, err = putObjectSSEC(cluster.s3Client, &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + Body: bytes.NewReader([]byte(objectData)), + ContentMD5: aws.String(correctMD5), + SSECustomerAlgorithm: aws.String("AES256"), + SSECustomerKey: aws.String(keyRaw), + SSECustomerKeyMD5: aws.String(keyMD5B64), + }) + require.NoError(t, err, "Failed to put object with SSE-C and correct MD5") + + // Verify object exists (SSE-C requires the key for HeadObject too) + headResp, err := headObjectSSEC(cluster.s3Client, &s3.HeadObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + SSECustomerAlgorithm: aws.String("AES256"), + SSECustomerKey: aws.String(keyRaw), + SSECustomerKeyMD5: aws.String(keyMD5B64), + }) + require.NoError(t, err) + assert.NotNil(t, headResp.ContentLength) + assert.Equal(t, int64(len(objectData)), aws.Int64Value(headResp.ContentLength)) + + t.Logf("Put object with SSE-C and correct MD5: %s/%s (%d bytes)", bucketName, objectKey, len(objectData)) +} + +func testUploadPartWithChecksum(t *testing.T, cluster *TestCluster) { + bucketName := createTestBucket(t, cluster, "test-upload-part-checksum-") + objectKey := "test-multipart-checksum.txt" + objectData := "Hello, SeaweedFS S3 Multipart!" 
+ + // Initiate multipart upload + initResp, err := cluster.s3Client.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + uploadID := initResp.UploadId + + correctMD5 := calculateMd5(objectData) + incorrectMD5 := calculateMd5(objectData + "incorrect") + + // Upload part with incorrect MD5 + _, err = cluster.s3Client.UploadPart(&s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + PartNumber: aws.Int64(1), + UploadId: uploadID, + Body: bytes.NewReader([]byte(objectData)), + ContentMD5: aws.String(incorrectMD5), + }) + assertBadDigestError(t, err, "UploadPart should fail with incorrect MD5") + + // Upload part with correct MD5 + partResp, err := cluster.s3Client.UploadPart(&s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + PartNumber: aws.Int64(1), + UploadId: uploadID, + Body: bytes.NewReader([]byte(objectData)), + ContentMD5: aws.String(correctMD5), + }) + require.NoError(t, err, "Failed to upload part with correct MD5") + + // Complete multipart upload + _, err = cluster.s3Client.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: uploadID, + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: []*s3.CompletedPart{ + { + ETag: partResp.ETag, + PartNumber: aws.Int64(1), + }, + }, + }, + }) + require.NoError(t, err, "Failed to complete multipart upload") + + // Verify object exists + headResp, err := cluster.s3Client.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + assert.Equal(t, int64(len(objectData)), aws.Int64Value(headResp.ContentLength)) + + t.Logf("✓ Multipart upload with checksum successful: %s/%s", bucketName, objectKey) +} + +func testPutPartWithChecksum(t *testing.T, cluster *TestCluster) { + bucketName := createTestBucket(t, cluster, "test-put-checksum-") + objectKey := "test-checksummed-part.txt" + + partData := "Hello, SeaweedFS S3!" 
+ + correctMD5 := calculateMd5(partData) + incorrectMD5 := calculateMd5(partData + "incorrect") + + createResp, err := cluster.s3Client.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + + uploadID := createResp.UploadId + + partBody := []byte(partData) + + _, err = cluster.s3Client.UploadPart(&s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: uploadID, + PartNumber: aws.Int64(1), + Body: bytes.NewReader(partBody), + ContentMD5: aws.String(incorrectMD5), + }) + assertBadDigestError(t, err, "UploadPart should fail with incorrect MD5") + + uploadResp, err := cluster.s3Client.UploadPart(&s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: uploadID, + PartNumber: aws.Int64(1), + Body: bytes.NewReader(partBody), + ContentMD5: aws.String(correctMD5), + }) + require.NoError(t, err) + + _, err = cluster.s3Client.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + UploadId: uploadID, + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: []*s3.CompletedPart{ + { + ETag: uploadResp.ETag, + PartNumber: aws.Int64(1), + }, + }, + }, + }) + require.NoError(t, err, "Failed to complete multipart upload") + + // Verify object exists + headResp, err := cluster.s3Client.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(objectKey), + }) + require.NoError(t, err) + assert.Equal(t, int64(len(partData)), aws.Int64Value(headResp.ContentLength)) + + t.Logf("✓ UploadPart with MD5 validation: %s/%s", bucketName, objectKey) +} + +func calculateMd5(objectData string) string { + dataBytes := []byte(objectData) + hash := md5.Sum(dataBytes) + return base64.StdEncoding.EncodeToString(hash[:]) +} + +func assertBadDigestError(t *testing.T, err error, description string) { + require.Error(t, err, description) + + var awsErr awserr.Error + require.ErrorAs(t, err, &awsErr) + assert.Equal(t, "BadDigest", awsErr.Code()) +} + func testGetObject(t *testing.T, cluster *TestCluster) { bucketName := "test-get-" + randomString(8) objectKey := "test-data.txt" diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go index b1190a1b4..52dc00817 100644 --- a/weed/s3api/s3api_object_handlers_multipart.go +++ b/weed/s3api/s3api_object_handlers_multipart.go @@ -316,7 +316,11 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re // PutObjectPartHandler - Put an object part in a multipart upload. func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) { bucket, object := s3_constants.GetBucketAndObject(r) - + _, err := validateContentMd5(r.Header) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest) + return + } // Check if bucket exists before putting object part if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone { s3err.WriteErrorResponse(w, r, err) @@ -326,7 +330,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ uploadID := r.URL.Query().Get("uploadId") // validateTableBucketObjectPath is enforced at multipart initiation. checkUploadId // cryptographically binds uploadID to object path, so parts cannot switch paths. 
- err := s3a.checkUploadId(object, uploadID) + err = s3a.checkUploadId(object, uploadID) if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload) return diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 7903bd6a1..7ac8b04e2 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -1,7 +1,9 @@ package s3api import ( + "bytes" "context" + "crypto/md5" "encoding/base64" "encoding/json" "errors" @@ -72,21 +74,19 @@ type SSEResponseMetadata struct { } func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) { - // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html bucket, object := s3_constants.GetBucketAndObject(r) - glog.V(2).Infof("PutObjectHandler bucket=%s object=%s size=%d", bucket, object, r.ContentLength) - if err := s3a.validateTableBucketObjectPath(bucket, object); err != nil { - s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) - return - } - _, err := validateContentMd5(r.Header) if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest) return } + glog.V(2).Infof("PutObjectHandler bucket=%s object=%s size=%d", bucket, object, r.ContentLength) + if err := s3a.validateTableBucketObjectPath(bucket, object); err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) + return + } // Check conditional headers if errCode := s3a.checkConditionalHeaders(r, bucket, object); errCode != s3err.ErrNone { @@ -288,11 +288,13 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader // NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy // This eliminates the filer proxy overhead for PUT operations // Note: filePath is now passed directly instead of URL (no parsing needed) - // For SSE, encrypt with offset=0 for all parts // Each part is encrypted independently, then decrypted using metadata during GET partOffset := int64(0) + plaintextHash := md5.New() + dataReader = io.TeeReader(dataReader, plaintextHash) + // Handle all SSE encryption types in a unified manner sseResult, sseErrorCode := s3a.handleAllSSEEncryption(r, dataReader, partOffset) if sseErrorCode != s3err.ErrNone { @@ -426,8 +428,21 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader } // Step 3: Calculate MD5 hash and add SSE metadata to chunks - md5Sum := chunkResult.Md5Hash.Sum(nil) - + md5Sum := plaintextHash.Sum(nil) + contentMd5 := r.Header.Get("Content-Md5") + if contentMd5 != "" { + expectedMd5, err := base64.StdEncoding.DecodeString(contentMd5) + if err != nil { + glog.Errorf("putToFiler: Invalid Content-Md5 header: %v, attempting to cleanup %d orphaned chunks", err, len(chunkResult.FileChunks)) + s3a.deleteOrphanedChunks(chunkResult.FileChunks) + return "", s3err.ErrInvalidDigest, SSEResponseMetadata{} + } + if !bytes.Equal(md5Sum, expectedMd5) { + glog.Warningf("putToFiler: Checksum verification failed, attempting to cleanup %d orphaned chunks", len(chunkResult.FileChunks)) + s3a.deleteOrphanedChunks(chunkResult.FileChunks) + return "", s3err.ErrBadDigest, SSEResponseMetadata{} + } + } glog.V(4).Infof("putToFiler: Chunked upload SUCCESS - path=%s, chunks=%d, size=%d", filePath, len(chunkResult.FileChunks), chunkResult.TotalSize)
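
Reviewer note: below is a minimal standalone sketch (in Go, with hypothetical helper names hashWhileUploading/checkContentMD5 and plain errors standing in for s3err.ErrInvalidDigest and s3err.ErrBadDigest) of the two pieces this patch wires together in putToFiler: teeing the request body through an MD5 hasher before any SSE encryption, and mapping a missing / malformed / mismatching Content-MD5 header to no check / InvalidDigest / BadDigest. It is illustrative only, not the actual s3api code.

package main

import (
	"bytes"
	"crypto/md5"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"strings"
)

// Illustrative stand-ins for s3err.ErrInvalidDigest and s3err.ErrBadDigest.
var (
	errInvalidDigest = errors.New("InvalidDigest")
	errBadDigest     = errors.New("BadDigest")
)

// hashWhileUploading mirrors the putToFiler change: tee the body through an MD5
// hasher so the digest is computed over the plaintext even when the bytes that
// are actually stored get SSE-encrypted downstream.
func hashWhileUploading(body io.Reader, store io.Writer) ([]byte, error) {
	h := md5.New()
	if _, err := io.Copy(store, io.TeeReader(body, h)); err != nil {
		return nil, err
	}
	return h.Sum(nil), nil
}

// checkContentMD5 mirrors the new validation: Content-MD5 is the base64 encoding
// of the raw 16-byte digest (RFC 1864); a malformed header yields InvalidDigest,
// a well-formed header that does not match the received body yields BadDigest.
func checkContentMD5(header string, computed []byte) error {
	if header == "" {
		return nil // header is optional; nothing to verify
	}
	expected, err := base64.StdEncoding.DecodeString(header)
	if err != nil {
		return errInvalidDigest
	}
	if !bytes.Equal(computed, expected) {
		return errBadDigest
	}
	return nil
}

func main() {
	body := "Hello, SeaweedFS S3!"
	// io.Discard stands in for the (possibly encrypting) chunk upload path.
	computed, _ := hashWhileUploading(strings.NewReader(body), io.Discard)

	goodSum := md5.Sum([]byte(body))
	wrongSum := md5.Sum([]byte(body + "incorrect"))
	good := base64.StdEncoding.EncodeToString(goodSum[:])
	wrong := base64.StdEncoding.EncodeToString(wrongSum[:])

	fmt.Println(checkContentMD5(good, computed))  // <nil>
	fmt.Println(checkContentMD5(wrong, computed)) // BadDigest
	fmt.Println(checkContentMD5("%%", computed))  // InvalidDigest
}

Hashing the plaintext stream (rather than reusing the MD5 of the stored chunk bytes) is what lets the check hold for SSE-C/SSE-KMS uploads as well, since the client's Content-MD5 describes the bytes it sent, not the ciphertext.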