From 8b12c4f301a70fdc41adef2a2543d57d13bc99c1 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Fri, 26 Dec 2025 17:49:30 -0800
Subject: [PATCH] Add S3 volume encryption support with -s3.encryptVolumeData flag

This change adds volume-level encryption support for S3 uploads, similar
to the existing -filer.encryptVolumeData option. Each chunk is encrypted
with its own auto-generated CipherKey when the flag is enabled.

Changes:
- Add -s3.encryptVolumeData flag to weed s3, weed server, and weed mini
- Wire Cipher option through S3ApiServer and ChunkedUploadOption
- Add integration tests for multi-chunk range reads with encryption
- Tests verify encryption works across chunk boundaries

Usage:
  weed s3 -encryptVolumeData
  weed server -s3 -s3.encryptVolumeData
  weed mini -s3.encryptVolumeData

Integration tests:
  go test -v -tags=integration -timeout 5m ./test/s3/sse/...
---
 test/s3/sse/s3_volume_encryption_test.go | 445 +++++++++++++++++++++++
 weed/command/mini.go                     |   1 +
 weed/command/s3.go                       |   3 +
 weed/command/server.go                   |   1 +
 weed/operation/upload_chunked.go         |   3 +-
 weed/s3api/s3api_object_handlers_put.go  |   1 +
 weed/s3api/s3api_server.go               |   5 +-
 7 files changed, 457 insertions(+), 2 deletions(-)
 create mode 100644 test/s3/sse/s3_volume_encryption_test.go

diff --git a/test/s3/sse/s3_volume_encryption_test.go b/test/s3/sse/s3_volume_encryption_test.go
new file mode 100644
index 000000000..182783797
--- /dev/null
+++ b/test/s3/sse/s3_volume_encryption_test.go
@@ -0,0 +1,445 @@
+//go:build integration
+// +build integration
+
+package sse
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
+)
+
+// TestS3VolumeEncryptionRoundtrip tests that data uploaded with encryptVolumeData
+// enabled can be read back correctly. The S3 server must be started with volume
+// encryption enabled; otherwise these cases only exercise the unencrypted path.
+//
+// To run this test:
+// 1. Start SeaweedFS: weed server -s3 -s3.encryptVolumeData=true
+// 2. Run: go test -v -tags=integration -run TestS3VolumeEncryptionRoundtrip
+func TestS3VolumeEncryptionRoundtrip(t *testing.T) {
+	svc := getS3Client(t)
+	bucket := fmt.Sprintf("volume-encryption-test-%d", time.Now().Unix())
+
+	// Create bucket
+	_, err := svc.CreateBucket(&s3.CreateBucketInput{
+		Bucket: aws.String(bucket),
+	})
+	if err != nil {
+		t.Skipf("Skipping test - could not create bucket (server may not be running): %v", err)
+	}
+	defer cleanupBucket(t, svc, bucket)
+
+	testCases := []struct {
+		name     string
+		key      string
+		content  string
+		rangeReq string // Optional range request
+	}{
+		{
+			name:    "small file",
+			key:     "small.txt",
+			content: "Hello, encrypted world!",
+		},
+		{
+			name:    "medium file",
+			key:     "medium.txt",
+			content: strings.Repeat("SeaweedFS volume encryption test content. 
", 1000), + }, + { + name: "binary content", + key: "binary.bin", + content: string([]byte{0x00, 0x01, 0x02, 0xFF, 0xFE, 0xFD, 0x00, 0x80}), + }, + { + name: "range request", + key: "range-test.txt", + content: "0123456789ABCDEFGHIJ", + rangeReq: "bytes=5-10", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Upload + _, err := svc.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(tc.key), + Body: strings.NewReader(tc.content), + }) + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } + + // Download + getInput := &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(tc.key), + } + if tc.rangeReq != "" { + getInput.Range = aws.String(tc.rangeReq) + } + + result, err := svc.GetObject(getInput) + if err != nil { + t.Fatalf("GetObject failed: %v", err) + } + defer result.Body.Close() + + data, err := io.ReadAll(result.Body) + if err != nil { + t.Fatalf("Failed to read response body: %v", err) + } + + // Verify content + expected := tc.content + if tc.rangeReq != "" { + // For "bytes=5-10", we expect characters at positions 5-10 (inclusive) + expected = tc.content[5:11] + } + + if string(data) != expected { + t.Errorf("Content mismatch:\n expected: %q\n got: %q", expected, string(data)) + } else { + t.Logf("Successfully uploaded and downloaded %s (%d bytes)", tc.key, len(data)) + } + }) + } +} + +// TestS3VolumeEncryptionMultiChunk tests large files that span multiple chunks +// to ensure encryption works correctly across chunk boundaries. +func TestS3VolumeEncryptionMultiChunk(t *testing.T) { + svc := getS3Client(t) + bucket := fmt.Sprintf("volume-encryption-multichunk-%d", time.Now().Unix()) + + // Create bucket + _, err := svc.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(bucket), + }) + if err != nil { + t.Skipf("Skipping test - could not create bucket: %v", err) + } + defer cleanupBucket(t, svc, bucket) + + // Create a file larger than default chunk size (8MB) + // Use 10MB to ensure multiple chunks + largeContent := make([]byte, 10*1024*1024) + for i := range largeContent { + largeContent[i] = byte(i % 256) + } + + key := "large-file.bin" + + // Upload + _, err = svc.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: bytes.NewReader(largeContent), + }) + if err != nil { + t.Fatalf("PutObject failed for large file: %v", err) + } + + // Download full file + result, err := svc.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + t.Fatalf("GetObject failed: %v", err) + } + defer result.Body.Close() + + downloadedData, err := io.ReadAll(result.Body) + if err != nil { + t.Fatalf("Failed to read response body: %v", err) + } + + if len(downloadedData) != len(largeContent) { + t.Errorf("Size mismatch: expected %d, got %d", len(largeContent), len(downloadedData)) + } + + if !bytes.Equal(downloadedData, largeContent) { + t.Errorf("Content mismatch in multi-chunk file") + // Find first mismatch + for i := 0; i < len(downloadedData) && i < len(largeContent); i++ { + if downloadedData[i] != largeContent[i] { + t.Errorf("First mismatch at byte %d: expected %02x, got %02x", i, largeContent[i], downloadedData[i]) + break + } + } + } else { + t.Logf("Successfully uploaded and downloaded %d byte multi-chunk file", len(downloadedData)) + } + + // Test range request spanning chunk boundary (around 8MB) + rangeStart := int64(8*1024*1024 - 1000) + rangeEnd := int64(8*1024*1024 + 1000) + rangeResult, err := 
svc.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Range: aws.String(fmt.Sprintf("bytes=%d-%d", rangeStart, rangeEnd)), + }) + if err != nil { + t.Fatalf("Range GetObject failed: %v", err) + } + defer rangeResult.Body.Close() + + rangeData, err := io.ReadAll(rangeResult.Body) + if err != nil { + t.Fatalf("Failed to read range response: %v", err) + } + + expectedRange := largeContent[rangeStart : rangeEnd+1] + if !bytes.Equal(rangeData, expectedRange) { + t.Errorf("Range request content mismatch at chunk boundary") + } else { + t.Logf("Successfully retrieved range spanning chunk boundary (%d bytes)", len(rangeData)) + } +} + +// TestS3VolumeEncryptionMultiChunkRangeRead tests range reads that span multiple chunks: +// - Part of chunk 1 +// - Whole chunk 2 +// - Part of chunk 3 +// This is critical for verifying that cipher decryption works correctly when +// reading across chunk boundaries with the cipherKey from each chunk. +func TestS3VolumeEncryptionMultiChunkRangeRead(t *testing.T) { + svc := getS3Client(t) + bucket := fmt.Sprintf("volume-encryption-multirange-%d", time.Now().Unix()) + + // Create bucket + _, err := svc.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(bucket), + }) + if err != nil { + t.Skipf("Skipping test - could not create bucket: %v", err) + } + defer cleanupBucket(t, svc, bucket) + + // Default chunk size is 8MB. Create a file with 3+ chunks (25MB) + // to ensure we have multiple complete chunks + const chunkSize = 8 * 1024 * 1024 // 8MB + const fileSize = 25 * 1024 * 1024 // 25MB = 3 chunks + partial 4th + + largeContent := make([]byte, fileSize) + for i := range largeContent { + // Use recognizable pattern: each byte encodes its position + largeContent[i] = byte(i % 256) + } + + key := "multi-chunk-range-test.bin" + + // Upload + _, err = svc.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: bytes.NewReader(largeContent), + }) + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } + + t.Logf("Uploaded %d byte file (%d chunks of %d bytes)", fileSize, (fileSize+chunkSize-1)/chunkSize, chunkSize) + + // Test cases for range reads spanning multiple chunks + testCases := []struct { + name string + rangeStart int64 + rangeEnd int64 + desc string + }{ + { + name: "part_chunk1_whole_chunk2_part_chunk3", + rangeStart: chunkSize - 1000, // 1000 bytes before end of chunk 1 + rangeEnd: 2*chunkSize + 1000, // 1000 bytes into chunk 3 + desc: "Spans from end of chunk 1 through entire chunk 2 into beginning of chunk 3", + }, + { + name: "last_byte_chunk1_through_first_byte_chunk3", + rangeStart: chunkSize - 1, // Last byte of chunk 1 + rangeEnd: 2 * chunkSize, // First byte of chunk 3 + desc: "Minimal span: last byte of chunk 1, entire chunk 2, first byte of chunk 3", + }, + { + name: "middle_chunk1_to_middle_chunk3", + rangeStart: chunkSize / 2, // Middle of chunk 1 + rangeEnd: 2*chunkSize + chunkSize/2, // Middle of chunk 3 + desc: "From middle of chunk 1 to middle of chunk 3", + }, + { + name: "half_chunk1_whole_chunk2_half_chunk3", + rangeStart: chunkSize / 2, // Half of chunk 1 + rangeEnd: 2*chunkSize + chunkSize/2 - 1, // Half of chunk 3 + desc: "Half of each boundary chunk, whole middle chunk", + }, + { + name: "single_byte_each_boundary", + rangeStart: chunkSize - 1, // 1 byte from chunk 1 + rangeEnd: 2 * chunkSize, // 1 byte from chunk 3 + desc: "1 byte from chunk 1, entire chunk 2, 1 byte from chunk 3", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t 
*testing.T) { + t.Logf("Test: %s", tc.desc) + t.Logf("Range: bytes=%d-%d (spanning bytes at offsets across chunk boundaries)", tc.rangeStart, tc.rangeEnd) + + result, err := svc.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Range: aws.String(fmt.Sprintf("bytes=%d-%d", tc.rangeStart, tc.rangeEnd)), + }) + if err != nil { + t.Fatalf("Range GetObject failed: %v", err) + } + defer result.Body.Close() + + rangeData, err := io.ReadAll(result.Body) + if err != nil { + t.Fatalf("Failed to read range response: %v", err) + } + + expectedLen := tc.rangeEnd - tc.rangeStart + 1 + if int64(len(rangeData)) != expectedLen { + t.Errorf("Size mismatch: expected %d bytes, got %d", expectedLen, len(rangeData)) + } + + expectedRange := largeContent[tc.rangeStart : tc.rangeEnd+1] + if !bytes.Equal(rangeData, expectedRange) { + t.Errorf("Content mismatch in multi-chunk range read") + // Find first mismatch for debugging + for i := 0; i < len(rangeData) && i < len(expectedRange); i++ { + if rangeData[i] != expectedRange[i] { + globalOffset := tc.rangeStart + int64(i) + chunkNum := globalOffset / chunkSize + offsetInChunk := globalOffset % chunkSize + t.Errorf("First mismatch at byte %d (chunk %d, offset %d in chunk): expected %02x, got %02x", + i, chunkNum, offsetInChunk, expectedRange[i], rangeData[i]) + break + } + } + } else { + t.Logf("✓ Successfully read %d bytes spanning chunks (offsets %d-%d)", len(rangeData), tc.rangeStart, tc.rangeEnd) + } + }) + } +} + +// TestS3VolumeEncryptionCopy tests that copying encrypted objects works correctly. +func TestS3VolumeEncryptionCopy(t *testing.T) { + svc := getS3Client(t) + bucket := fmt.Sprintf("volume-encryption-copy-%d", time.Now().Unix()) + + // Create bucket + _, err := svc.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(bucket), + }) + if err != nil { + t.Skipf("Skipping test - could not create bucket: %v", err) + } + defer cleanupBucket(t, svc, bucket) + + srcKey := "source-object.txt" + dstKey := "copied-object.txt" + content := "Content to be copied with volume-level encryption" + + // Upload source + _, err = svc.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(srcKey), + Body: strings.NewReader(content), + }) + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } + + // Copy object + _, err = svc.CopyObject(&s3.CopyObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(dstKey), + CopySource: aws.String(fmt.Sprintf("%s/%s", bucket, srcKey)), + }) + if err != nil { + t.Fatalf("CopyObject failed: %v", err) + } + + // Read copied object + result, err := svc.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(dstKey), + }) + if err != nil { + t.Fatalf("GetObject failed for copied object: %v", err) + } + defer result.Body.Close() + + data, err := io.ReadAll(result.Body) + if err != nil { + t.Fatalf("Failed to read copied object: %v", err) + } + + if string(data) != content { + t.Errorf("Copied content mismatch:\n expected: %q\n got: %q", content, string(data)) + } else { + t.Logf("Successfully copied encrypted object") + } +} + +// Helper functions + +func getS3Client(t *testing.T) *s3.S3 { + sess, err := session.NewSession(&aws.Config{ + Region: aws.String("us-east-1"), + Endpoint: aws.String("http://localhost:8333"), + DisableSSL: aws.Bool(true), + S3ForcePathStyle: aws.Bool(true), + Credentials: credentials.NewStaticCredentials("any", "any", ""), + }) + if err != nil { + t.Fatalf("Failed to create session: %v", err) + } + return s3.New(sess) +} 
+
+func cleanupBucket(t *testing.T, svc *s3.S3, bucket string) {
+	ctx := context.Background()
+	_ = ctx
+
+	// List and delete all objects
+	listResult, err := svc.ListObjectsV2(&s3.ListObjectsV2Input{
+		Bucket: aws.String(bucket),
+	})
+	if err != nil {
+		t.Logf("Warning: failed to list objects for cleanup: %v", err)
+		return
+	}
+
+	for _, obj := range listResult.Contents {
+		_, err := svc.DeleteObject(&s3.DeleteObjectInput{
+			Bucket: aws.String(bucket),
+			Key:    obj.Key,
+		})
+		if err != nil {
+			t.Logf("Warning: failed to delete object %s: %v", *obj.Key, err)
+		}
+	}
+
+	// Delete bucket
+	_, err = svc.DeleteBucket(&s3.DeleteBucketInput{
+		Bucket: aws.String(bucket),
+	})
+	if err != nil {
+		t.Logf("Warning: failed to delete bucket %s: %v", bucket, err)
+	}
+}
diff --git a/weed/command/mini.go b/weed/command/mini.go
index 6aa30acbd..39a271ee9 100644
--- a/weed/command/mini.go
+++ b/weed/command/mini.go
@@ -229,6 +229,7 @@ func initMiniS3Flags() {
 	miniS3Options.concurrentFileUploadLimit = cmdMini.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads")
 	miniS3Options.enableIam = cmdMini.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same port")
 	miniS3Options.dataCenter = cmdMini.Flag.String("s3.dataCenter", "", "prefer to read and write to volumes in this data center")
+	miniS3Options.cipher = cmdMini.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads")
 	miniS3Options.config = miniS3Config
 	miniS3Options.iamConfig = miniIamConfig
 	miniS3Options.auditLogConfig = cmdMini.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
diff --git a/weed/command/s3.go b/weed/command/s3.go
index d7140ded5..afd7a8f9c 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -62,6 +62,7 @@ type S3Options struct {
 	enableIam                 *bool
 	debug                     *bool
 	debugPort                 *int
+	cipher                    *bool
 }
 
 func init() {
@@ -93,6 +94,7 @@ func init() {
 	s3StandaloneOptions.enableIam = cmdS3.Flag.Bool("iam", true, "enable embedded IAM API on the same port")
 	s3StandaloneOptions.debug = cmdS3.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port")
 	s3StandaloneOptions.debugPort = cmdS3.Flag.Int("debug.port", 6060, "http port for debugging")
+	s3StandaloneOptions.cipher = cmdS3.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
 }
 
 var cmdS3 = &Command{
@@ -290,6 +292,7 @@ func (s3opt *S3Options) startS3Server() bool {
 		ConcurrentUploadLimit:     int64(*s3opt.concurrentUploadLimitMB) * 1024 * 1024,
 		ConcurrentFileUploadLimit: int64(*s3opt.concurrentFileUploadLimit),
 		EnableIam:                 *s3opt.enableIam, // Embedded IAM API (enabled by default)
+		Cipher:                    *s3opt.cipher, // encrypt data on volume servers
 	})
 	if s3ApiServer_err != nil {
 		glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
diff --git a/weed/command/server.go b/weed/command/server.go
index 5e59ea246..ae2e421ba 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -175,6 +175,7 @@ func init() {
 	s3Options.concurrentUploadLimitMB = cmdServer.Flag.Int("s3.concurrentUploadLimitMB", 0, "limit total concurrent upload size for S3, 0 means unlimited")
 	s3Options.concurrentFileUploadLimit = cmdServer.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited")
 	s3Options.enableIam = cmdServer.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same S3 port")
+	s3Options.cipher = cmdServer.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads")
 
 	sftpOptions.port = cmdServer.Flag.Int("sftp.port", 2022, "SFTP server listen port")
 	sftpOptions.sshPrivateKey = cmdServer.Flag.String("sftp.sshPrivateKey", "", "path to the SSH private key file for host authentication")
diff --git a/weed/operation/upload_chunked.go b/weed/operation/upload_chunked.go
index 352b329f8..9a95e23ec 100644
--- a/weed/operation/upload_chunked.go
+++ b/weed/operation/upload_chunked.go
@@ -34,6 +34,7 @@ type ChunkedUploadOption struct {
 	SaveSmallInline bool
 	Jwt             security.EncodedJwt
 	MimeType        string
+	Cipher          bool // encrypt data on volume servers
 	AssignFunc      func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error)
 	UploadFunc      func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) // Optional: for testing
 }
@@ -172,7 +173,7 @@ uploadLoop:
 
 		uploadOption := &UploadOption{
 			UploadUrl:         uploadUrl,
-			Cipher:            false,
+			Cipher:            opt.Cipher,
 			IsInputCompressed: false,
 			MimeType:          opt.MimeType,
 			PairMap:           nil,
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index e9e523138..d17e14aed 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -392,6 +392,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
 		DataCenter:      s3a.option.DataCenter,
 		SaveSmallInline: false, // S3 API always creates chunks, never stores inline
 		MimeType:        r.Header.Get("Content-Type"),
+		Cipher:          s3a.cipher, // encrypt data on volume servers
 		AssignFunc:      assignFunc,
 	})
 	if err != nil {
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index ffb50e8c1..7a8062a7a 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -51,6 +51,7 @@ type S3ApiServerOption struct {
 	ConcurrentUploadLimit     int64
 	ConcurrentFileUploadLimit int64
 	EnableIam                 bool // Enable embedded IAM API on the same port
+	Cipher                    bool // encrypt data on volume servers
 }
 
 type S3ApiServer struct {
@@ -70,7 +71,8 @@ type S3ApiServer struct {
 	inFlightDataSize      int64
 	inFlightUploads       int64
 	inFlightDataLimitCond *sync.Cond
-	embeddedIam           *EmbeddedIamApi // Embedded IAM API server (when enabled)
+	embeddedIam           *EmbeddedIamApi // Embedded IAM API server (when enabled)
+	cipher                bool            // encrypt data on volume servers
 }
 
 func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
@@ -154,6 +156,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
 		bucketConfigCache:     NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven
 		policyEngine:          policyEngine,                           // Initialize bucket policy engine
 		inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
+		cipher:                option.Cipher,
 	}
 
 	// Set s3a reference in circuit breaker for upload limiting