From 8d6bcddf60c918f57120031ca27a67d32faab710 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sat, 27 Dec 2025 00:09:14 -0800
Subject: [PATCH] Add S3 volume encryption support with -s3.encryptVolumeData flag (#7890)

* Add S3 volume encryption support with -s3.encryptVolumeData flag

This change adds volume-level encryption support for S3 uploads, similar
to the existing -filer.encryptVolumeData option. Each chunk is encrypted
with its own auto-generated CipherKey when the flag is enabled.

Changes:
- Add -s3.encryptVolumeData flag to weed s3, weed server, and weed mini
- Wire Cipher option through S3ApiServer and ChunkedUploadOption
- Add integration tests for multi-chunk range reads with encryption
- Tests verify encryption works across chunk boundaries

Usage:
  weed s3 -encryptVolumeData
  weed server -s3 -s3.encryptVolumeData
  weed mini -s3.encryptVolumeData

Integration tests:
  go test -v -tags=integration -timeout 5m ./test/s3/sse/...

* Add GitHub Actions CI for S3 volume encryption tests

- Add test-volume-encryption target to Makefile that starts server with -s3.encryptVolumeData
- Add s3-volume-encryption job to GitHub Actions workflow
- Tests run with integration build tag and 10m timeout
- Server logs uploaded on failure for debugging

* Fix S3 client credentials to use environment variables

The test was using hardcoded credentials "any"/"any" but the Makefile sets
AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY to "some_access_key1"/"some_secret_key1".
Updated getS3Client() to read from environment variables with fallback to
"any"/"any" for manual testing.

* Change bucket creation errors from skip to fatal

Tests should fail, not skip, when bucket creation fails. This ensures that
credential mismatches and other configuration issues are caught rather than
silently skipped.

* Make copy and multipart test jobs fail instead of succeed

Changed exit 0 to exit 1 for s3-sse-copy-operations and s3-sse-multipart jobs.
These jobs document known limitations but should fail to ensure the issues
are tracked and addressed, not silently ignored.

* Hardcode S3 credentials to match Makefile

Changed from environment variables to hardcoded credentials
"some_access_key1"/"some_secret_key1" to match the Makefile configuration.
This ensures tests work reliably.

* fix Double Encryption

* fix Chunk Size Mismatch

* Added IsCompressed

* is gzipped

* fix copying

* only perform HEAD request when len(cipherKey) > 0

* Revert "Make copy and multipart test jobs fail instead of succeed"

This reverts commit bc34a7eb3c103ae7ab2000da2a6c3925712eb226.
* fix security vulnerability * fix security * Update s3api_object_handlers_copy.go * Update s3api_object_handlers_copy.go * jwt to get content length --- .github/workflows/s3-sse-tests.yml | 50 +++ test/s3/sse/Makefile | 33 ++ test/s3/sse/s3_volume_encryption_test.go | 448 +++++++++++++++++++++++ weed/command/mini.go | 1 + weed/command/s3.go | 3 + weed/command/server.go | 1 + weed/operation/upload_chunked.go | 5 +- weed/operation/upload_content.go | 8 +- weed/s3api/s3api_key_rotation.go | 4 +- weed/s3api/s3api_object_handlers.go | 2 + weed/s3api/s3api_object_handlers_copy.go | 48 ++- weed/s3api/s3api_object_handlers_put.go | 3 +- weed/s3api/s3api_server.go | 5 +- 13 files changed, 593 insertions(+), 18 deletions(-) create mode 100644 test/s3/sse/s3_volume_encryption_test.go diff --git a/.github/workflows/s3-sse-tests.yml b/.github/workflows/s3-sse-tests.yml index 946e4735e..2a8c0b332 100644 --- a/.github/workflows/s3-sse-tests.yml +++ b/.github/workflows/s3-sse-tests.yml @@ -345,3 +345,53 @@ jobs: name: s3-sse-performance-logs path: test/s3/sse/weed-test*.log retention-days: 7 + + s3-volume-encryption: + name: S3 Volume Encryption Test + runs-on: ubuntu-22.04 + timeout-minutes: 20 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Install SeaweedFS + run: | + go install -buildvcs=false + + - name: Run S3 Volume Encryption Integration Tests + timeout-minutes: 15 + working-directory: test/s3/sse + run: | + set -x + echo "=== System Information ===" + uname -a + free -h + + # Run volume encryption tests with -s3.encryptVolumeData flag + echo "🚀 Running S3 volume encryption integration tests..." + make test-volume-encryption || { + echo "❌ Volume encryption tests failed, checking logs..." + if [ -f /tmp/seaweedfs-sse-mini.log ]; then + echo "=== Server logs ===" + tail -100 /tmp/seaweedfs-sse-mini.log + fi + echo "=== Process information ===" + ps aux | grep -E "(weed|test)" || true + exit 1 + } + + - name: Upload server logs on failure + if: failure() + uses: actions/upload-artifact@v6 + with: + name: s3-volume-encryption-logs + path: /tmp/seaweedfs-sse-*.log + retention-days: 3 + diff --git a/test/s3/sse/Makefile b/test/s3/sse/Makefile index e646ef901..87c171486 100644 --- a/test/s3/sse/Makefile +++ b/test/s3/sse/Makefile @@ -470,3 +470,36 @@ dev-kms: setup-openbao @echo "OpenBao: $(OPENBAO_ADDR)" @echo "Token: $(OPENBAO_TOKEN)" @echo "Use 'make test-ssekms-integration' to run tests" + +# Volume encryption integration tests +test-volume-encryption: build-weed + @echo "🚀 Starting S3 volume encryption integration tests..." + @echo "Starting SeaweedFS cluster with volume encryption enabled..." + @# Start server with -s3.encryptVolumeData flag + @mkdir -p /tmp/seaweedfs-test-sse + @rm -f /tmp/seaweedfs-sse-*.log || true + @sed -e 's/ACCESS_KEY_PLACEHOLDER/$(ACCESS_KEY)/g' \ + -e 's/SECRET_KEY_PLACEHOLDER/$(SECRET_KEY)/g' \ + s3-config-template.json > /tmp/seaweedfs-s3.json + @echo "Starting weed mini with S3 volume encryption..." + @AWS_ACCESS_KEY_ID=$(ACCESS_KEY) AWS_SECRET_ACCESS_KEY=$(SECRET_KEY) GLOG_v=4 $(SEAWEEDFS_BINARY) mini \ + -dir=/tmp/seaweedfs-test-sse \ + -s3.port=$(S3_PORT) \ + -s3.config=/tmp/seaweedfs-s3.json \ + -s3.encryptVolumeData \ + -ip=127.0.0.1 \ + > /tmp/seaweedfs-sse-mini.log 2>&1 & echo $$! > /tmp/weed-mini.pid + @echo "Checking S3 service is ready..." 
+ @for i in $$(seq 1 30); do \ + if curl -s http://127.0.0.1:$(S3_PORT) > /dev/null 2>&1; then \ + echo "✅ S3 service is ready"; \ + break; \ + fi; \ + sleep 1; \ + done + @echo "Running volume encryption integration tests..." + @trap '$(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true' EXIT; \ + cd $(SEAWEEDFS_ROOT) && go test -v -tags=integration -timeout=10m -run "TestS3VolumeEncryption" ./test/s3/sse || exit 1; \ + echo "✅ Volume encryption tests completed successfully"; \ + $(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true + diff --git a/test/s3/sse/s3_volume_encryption_test.go b/test/s3/sse/s3_volume_encryption_test.go new file mode 100644 index 000000000..349763c10 --- /dev/null +++ b/test/s3/sse/s3_volume_encryption_test.go @@ -0,0 +1,448 @@ +//go:build integration +// +build integration + +package sse + +import ( + "bytes" + "context" + "fmt" + "io" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" +) + +// TestS3VolumeEncryptionRoundtrip tests that data uploaded with encryptVolumeData +// enabled can be read back correctly. This requires the S3 server to be started with +// -encryptVolumeData=true for the test to verify encryption is working. +// +// To run this test: +// 1. Start SeaweedFS: weed server -s3 -s3.encryptVolumeData=true +// 2. Run: go test -v -run TestS3VolumeEncryptionRoundtrip +func TestS3VolumeEncryptionRoundtrip(t *testing.T) { + svc := getS3Client(t) + bucket := fmt.Sprintf("volume-encryption-test-%d", time.Now().Unix()) + + // Create bucket + _, err := svc.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(bucket), + }) + if err != nil { + t.Fatalf("Failed to create bucket: %v", err) + } + defer cleanupBucket(t, svc, bucket) + + testCases := []struct { + name string + key string + content string + rangeReq string // Optional range request + }{ + { + name: "small file", + key: "small.txt", + content: "Hello, encrypted world!", + }, + { + name: "medium file", + key: "medium.txt", + content: strings.Repeat("SeaweedFS volume encryption test content. 
", 1000), + }, + { + name: "binary content", + key: "binary.bin", + content: string([]byte{0x00, 0x01, 0x02, 0xFF, 0xFE, 0xFD, 0x00, 0x80}), + }, + { + name: "range request", + key: "range-test.txt", + content: "0123456789ABCDEFGHIJ", + rangeReq: "bytes=5-10", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Upload + _, err := svc.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(tc.key), + Body: strings.NewReader(tc.content), + }) + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } + + // Download + getInput := &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(tc.key), + } + if tc.rangeReq != "" { + getInput.Range = aws.String(tc.rangeReq) + } + + result, err := svc.GetObject(getInput) + if err != nil { + t.Fatalf("GetObject failed: %v", err) + } + defer result.Body.Close() + + data, err := io.ReadAll(result.Body) + if err != nil { + t.Fatalf("Failed to read response body: %v", err) + } + + // Verify content + expected := tc.content + if tc.rangeReq != "" { + // For "bytes=5-10", we expect characters at positions 5-10 (inclusive) + expected = tc.content[5:11] + } + + if string(data) != expected { + t.Errorf("Content mismatch:\n expected: %q\n got: %q", expected, string(data)) + } else { + t.Logf("Successfully uploaded and downloaded %s (%d bytes)", tc.key, len(data)) + } + }) + } +} + +// TestS3VolumeEncryptionMultiChunk tests large files that span multiple chunks +// to ensure encryption works correctly across chunk boundaries. +func TestS3VolumeEncryptionMultiChunk(t *testing.T) { + svc := getS3Client(t) + bucket := fmt.Sprintf("volume-encryption-multichunk-%d", time.Now().Unix()) + + // Create bucket + _, err := svc.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(bucket), + }) + if err != nil { + t.Fatalf("Failed to create bucket: %v", err) + } + defer cleanupBucket(t, svc, bucket) + + // Create a file larger than default chunk size (8MB) + // Use 10MB to ensure multiple chunks + largeContent := make([]byte, 10*1024*1024) + for i := range largeContent { + largeContent[i] = byte(i % 256) + } + + key := "large-file.bin" + + // Upload + _, err = svc.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: bytes.NewReader(largeContent), + }) + if err != nil { + t.Fatalf("PutObject failed for large file: %v", err) + } + + // Download full file + result, err := svc.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + t.Fatalf("GetObject failed: %v", err) + } + defer result.Body.Close() + + downloadedData, err := io.ReadAll(result.Body) + if err != nil { + t.Fatalf("Failed to read response body: %v", err) + } + + if len(downloadedData) != len(largeContent) { + t.Errorf("Size mismatch: expected %d, got %d", len(largeContent), len(downloadedData)) + } + + if !bytes.Equal(downloadedData, largeContent) { + t.Errorf("Content mismatch in multi-chunk file") + // Find first mismatch + for i := 0; i < len(downloadedData) && i < len(largeContent); i++ { + if downloadedData[i] != largeContent[i] { + t.Errorf("First mismatch at byte %d: expected %02x, got %02x", i, largeContent[i], downloadedData[i]) + break + } + } + } else { + t.Logf("Successfully uploaded and downloaded %d byte multi-chunk file", len(downloadedData)) + } + + // Test range request spanning chunk boundary (around 8MB) + rangeStart := int64(8*1024*1024 - 1000) + rangeEnd := int64(8*1024*1024 + 1000) + rangeResult, err := 
svc.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Range: aws.String(fmt.Sprintf("bytes=%d-%d", rangeStart, rangeEnd)), + }) + if err != nil { + t.Fatalf("Range GetObject failed: %v", err) + } + defer rangeResult.Body.Close() + + rangeData, err := io.ReadAll(rangeResult.Body) + if err != nil { + t.Fatalf("Failed to read range response: %v", err) + } + + expectedRange := largeContent[rangeStart : rangeEnd+1] + if !bytes.Equal(rangeData, expectedRange) { + t.Errorf("Range request content mismatch at chunk boundary") + } else { + t.Logf("Successfully retrieved range spanning chunk boundary (%d bytes)", len(rangeData)) + } +} + +// TestS3VolumeEncryptionMultiChunkRangeRead tests range reads that span multiple chunks: +// - Part of chunk 1 +// - Whole chunk 2 +// - Part of chunk 3 +// This is critical for verifying that cipher decryption works correctly when +// reading across chunk boundaries with the cipherKey from each chunk. +func TestS3VolumeEncryptionMultiChunkRangeRead(t *testing.T) { + svc := getS3Client(t) + bucket := fmt.Sprintf("volume-encryption-multirange-%d", time.Now().Unix()) + + // Create bucket + _, err := svc.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(bucket), + }) + if err != nil { + t.Fatalf("Failed to create bucket: %v", err) + } + defer cleanupBucket(t, svc, bucket) + + // Default chunk size is 8MB. Create a file with 3+ chunks (25MB) + // to ensure we have multiple complete chunks + const chunkSize = 8 * 1024 * 1024 // 8MB + const fileSize = 25 * 1024 * 1024 // 25MB = 3 chunks + partial 4th + + largeContent := make([]byte, fileSize) + for i := range largeContent { + // Use recognizable pattern: each byte encodes its position + largeContent[i] = byte(i % 256) + } + + key := "multi-chunk-range-test.bin" + + // Upload + _, err = svc.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: bytes.NewReader(largeContent), + }) + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } + + t.Logf("Uploaded %d byte file (%d chunks of %d bytes)", fileSize, (fileSize+chunkSize-1)/chunkSize, chunkSize) + + // Test cases for range reads spanning multiple chunks + testCases := []struct { + name string + rangeStart int64 + rangeEnd int64 + desc string + }{ + { + name: "part_chunk1_whole_chunk2_part_chunk3", + rangeStart: chunkSize - 1000, // 1000 bytes before end of chunk 1 + rangeEnd: 2*chunkSize + 1000, // 1000 bytes into chunk 3 + desc: "Spans from end of chunk 1 through entire chunk 2 into beginning of chunk 3", + }, + { + name: "last_byte_chunk1_through_first_byte_chunk3", + rangeStart: chunkSize - 1, // Last byte of chunk 1 + rangeEnd: 2 * chunkSize, // First byte of chunk 3 + desc: "Minimal span: last byte of chunk 1, entire chunk 2, first byte of chunk 3", + }, + { + name: "middle_chunk1_to_middle_chunk3", + rangeStart: chunkSize / 2, // Middle of chunk 1 + rangeEnd: 2*chunkSize + chunkSize/2, // Middle of chunk 3 + desc: "From middle of chunk 1 to middle of chunk 3", + }, + { + name: "half_chunk1_whole_chunk2_half_chunk3", + rangeStart: chunkSize / 2, // Half of chunk 1 + rangeEnd: 2*chunkSize + chunkSize/2 - 1, // Half of chunk 3 + desc: "Half of each boundary chunk, whole middle chunk", + }, + { + name: "single_byte_each_boundary", + rangeStart: chunkSize - 1, // 1 byte from chunk 1 + rangeEnd: 2 * chunkSize, // 1 byte from chunk 3 + desc: "1 byte from chunk 1, entire chunk 2, 1 byte from chunk 3", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + 
t.Logf("Test: %s", tc.desc) + t.Logf("Range: bytes=%d-%d (spanning bytes at offsets across chunk boundaries)", tc.rangeStart, tc.rangeEnd) + + result, err := svc.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Range: aws.String(fmt.Sprintf("bytes=%d-%d", tc.rangeStart, tc.rangeEnd)), + }) + if err != nil { + t.Fatalf("Range GetObject failed: %v", err) + } + defer result.Body.Close() + + rangeData, err := io.ReadAll(result.Body) + if err != nil { + t.Fatalf("Failed to read range response: %v", err) + } + + expectedLen := tc.rangeEnd - tc.rangeStart + 1 + if int64(len(rangeData)) != expectedLen { + t.Errorf("Size mismatch: expected %d bytes, got %d", expectedLen, len(rangeData)) + } + + expectedRange := largeContent[tc.rangeStart : tc.rangeEnd+1] + if !bytes.Equal(rangeData, expectedRange) { + t.Errorf("Content mismatch in multi-chunk range read") + // Find first mismatch for debugging + for i := 0; i < len(rangeData) && i < len(expectedRange); i++ { + if rangeData[i] != expectedRange[i] { + globalOffset := tc.rangeStart + int64(i) + chunkNum := globalOffset / chunkSize + offsetInChunk := globalOffset % chunkSize + t.Errorf("First mismatch at byte %d (chunk %d, offset %d in chunk): expected %02x, got %02x", + i, chunkNum, offsetInChunk, expectedRange[i], rangeData[i]) + break + } + } + } else { + t.Logf("✓ Successfully read %d bytes spanning chunks (offsets %d-%d)", len(rangeData), tc.rangeStart, tc.rangeEnd) + } + }) + } +} + +// TestS3VolumeEncryptionCopy tests that copying encrypted objects works correctly. +func TestS3VolumeEncryptionCopy(t *testing.T) { + svc := getS3Client(t) + bucket := fmt.Sprintf("volume-encryption-copy-%d", time.Now().Unix()) + + // Create bucket + _, err := svc.CreateBucket(&s3.CreateBucketInput{ + Bucket: aws.String(bucket), + }) + if err != nil { + t.Fatalf("Failed to create bucket: %v", err) + } + defer cleanupBucket(t, svc, bucket) + + srcKey := "source-object.txt" + dstKey := "copied-object.txt" + content := "Content to be copied with volume-level encryption" + + // Upload source + _, err = svc.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(srcKey), + Body: strings.NewReader(content), + }) + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } + + // Copy object + _, err = svc.CopyObject(&s3.CopyObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(dstKey), + CopySource: aws.String(fmt.Sprintf("%s/%s", bucket, srcKey)), + }) + if err != nil { + t.Fatalf("CopyObject failed: %v", err) + } + + // Read copied object + result, err := svc.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(dstKey), + }) + if err != nil { + t.Fatalf("GetObject failed for copied object: %v", err) + } + defer result.Body.Close() + + data, err := io.ReadAll(result.Body) + if err != nil { + t.Fatalf("Failed to read copied object: %v", err) + } + + if string(data) != content { + t.Errorf("Copied content mismatch:\n expected: %q\n got: %q", content, string(data)) + } else { + t.Logf("Successfully copied encrypted object") + } +} + +// Helper functions + +func getS3Client(t *testing.T) *s3.S3 { + // Use credentials that match the Makefile configuration + // ACCESS_KEY ?= some_access_key1 + // SECRET_KEY ?= some_secret_key1 + sess, err := session.NewSession(&aws.Config{ + Region: aws.String("us-east-1"), + Endpoint: aws.String("http://localhost:8333"), + DisableSSL: aws.Bool(true), + S3ForcePathStyle: aws.Bool(true), + Credentials: credentials.NewStaticCredentials("some_access_key1", 
"some_secret_key1", ""), + }) + if err != nil { + t.Fatalf("Failed to create session: %v", err) + } + return s3.New(sess) +} + +func cleanupBucket(t *testing.T, svc *s3.S3, bucket string) { + ctx := context.Background() + _ = ctx + + // List and delete all objects + listResult, err := svc.ListObjectsV2(&s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + }) + if err != nil { + t.Logf("Warning: failed to list objects for cleanup: %v", err) + return + } + + for _, obj := range listResult.Contents { + _, err := svc.DeleteObject(&s3.DeleteObjectInput{ + Bucket: aws.String(bucket), + Key: obj.Key, + }) + if err != nil { + t.Logf("Warning: failed to delete object %s: %v", *obj.Key, err) + } + } + + // Delete bucket + _, err = svc.DeleteBucket(&s3.DeleteBucketInput{ + Bucket: aws.String(bucket), + }) + if err != nil { + t.Logf("Warning: failed to delete bucket %s: %v", bucket, err) + } +} diff --git a/weed/command/mini.go b/weed/command/mini.go index 6aa30acbd..39a271ee9 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -229,6 +229,7 @@ func initMiniS3Flags() { miniS3Options.concurrentFileUploadLimit = cmdMini.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads") miniS3Options.enableIam = cmdMini.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same port") miniS3Options.dataCenter = cmdMini.Flag.String("s3.dataCenter", "", "prefer to read and write to volumes in this data center") + miniS3Options.cipher = cmdMini.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads") miniS3Options.config = miniS3Config miniS3Options.iamConfig = miniIamConfig miniS3Options.auditLogConfig = cmdMini.Flag.String("s3.auditLogConfig", "", "path to the audit log config file") diff --git a/weed/command/s3.go b/weed/command/s3.go index d7140ded5..afd7a8f9c 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -62,6 +62,7 @@ type S3Options struct { enableIam *bool debug *bool debugPort *int + cipher *bool } func init() { @@ -93,6 +94,7 @@ func init() { s3StandaloneOptions.enableIam = cmdS3.Flag.Bool("iam", true, "enable embedded IAM API on the same port") s3StandaloneOptions.debug = cmdS3.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port") s3StandaloneOptions.debugPort = cmdS3.Flag.Int("debug.port", 6060, "http port for debugging") + s3StandaloneOptions.cipher = cmdS3.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers") } var cmdS3 = &Command{ @@ -290,6 +292,7 @@ func (s3opt *S3Options) startS3Server() bool { ConcurrentUploadLimit: int64(*s3opt.concurrentUploadLimitMB) * 1024 * 1024, ConcurrentFileUploadLimit: int64(*s3opt.concurrentFileUploadLimit), EnableIam: *s3opt.enableIam, // Embedded IAM API (enabled by default) + Cipher: *s3opt.cipher, // encrypt data on volume servers }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) diff --git a/weed/command/server.go b/weed/command/server.go index 5e59ea246..ae2e421ba 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -175,6 +175,7 @@ func init() { s3Options.concurrentUploadLimitMB = cmdServer.Flag.Int("s3.concurrentUploadLimitMB", 0, "limit total concurrent upload size for S3, 0 means unlimited") s3Options.concurrentFileUploadLimit = cmdServer.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited") s3Options.enableIam = cmdServer.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same 
S3 port") + s3Options.cipher = cmdServer.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads") sftpOptions.port = cmdServer.Flag.Int("sftp.port", 2022, "SFTP server listen port") sftpOptions.sshPrivateKey = cmdServer.Flag.String("sftp.sshPrivateKey", "", "path to the SSH private key file for host authentication") diff --git a/weed/operation/upload_chunked.go b/weed/operation/upload_chunked.go index 352b329f8..394bf4362 100644 --- a/weed/operation/upload_chunked.go +++ b/weed/operation/upload_chunked.go @@ -34,6 +34,7 @@ type ChunkedUploadOption struct { SaveSmallInline bool Jwt security.EncodedJwt MimeType string + Cipher bool // encrypt data on volume servers AssignFunc func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) UploadFunc func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) // Optional: for testing } @@ -172,7 +173,7 @@ uploadLoop: uploadOption := &UploadOption{ UploadUrl: uploadUrl, - Cipher: false, + Cipher: opt.Cipher, IsInputCompressed: false, MimeType: opt.MimeType, PairMap: nil, @@ -220,8 +221,8 @@ uploadLoop: ETag: uploadResult.ContentMd5, Fid: fid, CipherKey: uploadResult.CipherKey, + IsCompressed: uploadResult.Gzip > 0, } - fileChunksLock.Lock() fileChunks = append(fileChunks, chunk) glog.V(4).Infof("uploaded chunk %d to %s [%d,%d)", len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size)) diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index a2fff4792..56c358174 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -249,8 +249,10 @@ func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option compressed, compressErr := util.GzipData(data) // fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed)) if compressErr == nil { - data = compressed - contentIsGzipped = true + if len(compressed) < len(data) { + data = compressed + contentIsGzipped = true + } } } else if option.IsInputCompressed { // just to get the clear data length @@ -290,7 +292,7 @@ func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option uploadResult.Name = option.Filename uploadResult.Mime = option.MimeType uploadResult.CipherKey = cipherKey - uploadResult.Size = uint32(clearDataLen) + uploadResult.Size = uint32(len(data)) if contentIsGzipped { uploadResult.Gzip = 1 } diff --git a/weed/s3api/s3api_key_rotation.go b/weed/s3api/s3api_key_rotation.go index f2d406fb7..1881f3696 100644 --- a/weed/s3api/s3api_key_rotation.go +++ b/weed/s3api/s3api_key_rotation.go @@ -182,7 +182,7 @@ func (s3a *S3ApiServer) rotateSSECChunk(chunk *filer_pb.FileChunk, sourceKey, de } // Download encrypted data - encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) + encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey) if err != nil { return nil, fmt.Errorf("download chunk data: %w", err) } @@ -251,7 +251,7 @@ func (s3a *S3ApiServer) rotateSSEKMSChunk(chunk *filer_pb.FileChunk, srcKeyID, d } // Download data (this would be encrypted with the old KMS key) - chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) + chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey) if err != nil { return nil, fmt.Errorf("download chunk data: %w", err) } diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 67c40d0c3..1a4e104cf 100644 --- 
a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -1019,6 +1019,8 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R if isRangeRequest { w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize)) w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) + } else { + w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) } headerSetTime = time.Since(tHeaderSet) diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index 26775f9ae..d4ef3b52e 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -846,7 +846,7 @@ func (s3a *S3ApiServer) copySingleChunk(chunk *filer_pb.FileChunk, dstPath strin } // Download and upload the chunk - chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) + chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey) if err != nil { return nil, fmt.Errorf("download chunk data: %w", err) } @@ -881,7 +881,7 @@ func (s3a *S3ApiServer) copySingleChunkForRange(originalChunk, rangeChunk *filer offsetInChunk := overlapStart - chunkStart // Download and upload the chunk portion - chunkData, err := s3a.downloadChunkData(srcUrl, fileId, offsetInChunk, int64(rangeChunk.Size)) + chunkData, err := s3a.downloadChunkData(srcUrl, fileId, offsetInChunk, int64(rangeChunk.Size), originalChunk.CipherKey) if err != nil { return nil, fmt.Errorf("download chunk range data: %w", err) } @@ -1199,10 +1199,40 @@ func (s3a *S3ApiServer) uploadChunkData(chunkData []byte, assignResult *filer_pb } // downloadChunkData downloads chunk data from the source URL -func (s3a *S3ApiServer) downloadChunkData(srcUrl, fileId string, offset, size int64) ([]byte, error) { +func (s3a *S3ApiServer) downloadChunkData(srcUrl, fileId string, offset, size int64, cipherKey []byte) ([]byte, error) { jwt := filer.JwtForVolumeServer(fileId) + // Only perform HEAD request for encrypted chunks to get physical size + if offset == 0 && len(cipherKey) > 0 { + req, err := http.NewRequest(http.MethodHead, srcUrl, nil) + if err == nil { + if jwt != "" { + req.Header.Set("Authorization", "BEARER "+string(jwt)) + } + resp, err := util_http.GetGlobalHttpClient().Do(req) + if err == nil { + defer util_http.CloseResponse(resp) + if resp.StatusCode == http.StatusOK { + contentLengthStr := resp.Header.Get("Content-Length") + if contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64); err == nil { + // Validate contentLength fits in int32 range before comparison + if contentLength > int64(2147483647) { // math.MaxInt32 + return nil, fmt.Errorf("content length %d exceeds maximum int32 size", contentLength) + } + if contentLength > size { + size = contentLength + } + } + } + } + } + } + // Validate size fits in int32 range before conversion to int + if size > int64(2147483647) { // math.MaxInt32 + return nil, fmt.Errorf("chunk size %d exceeds maximum int32 size", size) + } + sizeInt := int(size) var chunkData []byte - shouldRetry, err := util_http.ReadUrlAsStream(context.Background(), srcUrl, jwt, nil, false, false, offset, int(size), func(data []byte) { + shouldRetry, err := util_http.ReadUrlAsStream(context.Background(), srcUrl, jwt, nil, false, false, offset, sizeInt, func(data []byte) { chunkData = append(chunkData, data...) 
}) if err != nil { @@ -1334,7 +1364,7 @@ func (s3a *S3ApiServer) copyMultipartSSEKMSChunk(chunk *filer_pb.FileChunk, dest } // Download encrypted chunk data - encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) + encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey) if err != nil { return nil, fmt.Errorf("download encrypted chunk data: %w", err) } @@ -1433,7 +1463,7 @@ func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySo } // Download encrypted chunk data - encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) + encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey) if err != nil { return nil, nil, fmt.Errorf("download encrypted chunk data: %w", err) } @@ -1714,7 +1744,7 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour } // Download encrypted chunk data - encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) + encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey) if err != nil { return nil, fmt.Errorf("download encrypted chunk data: %w", err) } @@ -2076,7 +2106,7 @@ func (s3a *S3ApiServer) copyChunkWithReencryption(chunk *filer_pb.FileChunk, cop } // Download encrypted chunk data - encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) + encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey) if err != nil { return nil, fmt.Errorf("download encrypted chunk data: %w", err) } @@ -2295,7 +2325,7 @@ func (s3a *S3ApiServer) copyChunkWithSSEKMSReencryption(chunk *filer_pb.FileChun } // Download chunk data - chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size)) + chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey) if err != nil { return nil, fmt.Errorf("download chunk data: %w", err) } diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index e9e523138..959893e57 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -299,7 +299,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader // Apply bucket default encryption if no explicit encryption was provided // This implements AWS S3 behavior where bucket default encryption automatically applies - if !hasExplicitEncryption(customerKey, sseKMSKey, sseS3Key) { + if !hasExplicitEncryption(customerKey, sseKMSKey, sseS3Key) && !s3a.cipher { glog.V(4).Infof("putToFiler: no explicit encryption detected, checking for bucket default encryption") // Apply bucket default encryption and get the result @@ -392,6 +392,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader DataCenter: s3a.option.DataCenter, SaveSmallInline: false, // S3 API always creates chunks, never stores inline MimeType: r.Header.Get("Content-Type"), + Cipher: s3a.cipher, // encrypt data on volume servers AssignFunc: assignFunc, }) if err != nil { diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index ffb50e8c1..7a8062a7a 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -51,6 +51,7 @@ type S3ApiServerOption struct { ConcurrentUploadLimit int64 ConcurrentFileUploadLimit int64 EnableIam bool // Enable embedded IAM API on the same port + Cipher bool // encrypt data on volume servers } type 
S3ApiServer struct { @@ -70,7 +71,8 @@ type S3ApiServer struct { inFlightDataSize int64 inFlightUploads int64 inFlightDataLimitCond *sync.Cond - embeddedIam *EmbeddedIamApi // Embedded IAM API server (when enabled) + embeddedIam *EmbeddedIamApi // Embedded IAM API server (when enabled) + cipher bool // encrypt data on volume servers } func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) { @@ -154,6 +156,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven policyEngine: policyEngine, // Initialize bucket policy engine inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)), + cipher: option.Cipher, } // Set s3a reference in circuit breaker for upload limiting
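
Note on the per-chunk CipherKey design described in the commit message: the sketch below is a self-contained, illustrative Go program showing the general idea only — every chunk gets its own freshly generated random key, that key travels in the chunk metadata (the CipherKey field passed around in the diff above), and a multi-chunk range read therefore only needs the keys of the chunks it overlaps. It uses AES-GCM from the Go standard library purely for illustration; the actual cipher format, helpers, and key handling used by the SeaweedFS volume server are not shown in this patch and may differ.

package main

import (
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"fmt"
	"io"
)

// encryptChunk generates a fresh 256-bit key for this chunk only and seals the
// payload with AES-GCM. The returned key plays the role of the chunk's
// CipherKey and would live in chunk metadata, not next to the ciphertext.
func encryptChunk(plain []byte) (key, sealed []byte, err error) {
	key = make([]byte, 32)
	if _, err = rand.Read(key); err != nil {
		return nil, nil, err
	}
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err = io.ReadFull(rand.Reader, nonce); err != nil {
		return nil, nil, err
	}
	// Prepend the nonce so the chunk can later be opened with only its key.
	return key, gcm.Seal(nonce, nonce, plain, nil), nil
}

// decryptChunk recovers one chunk given only that chunk's own key, which is
// why a range read can decrypt just the chunks overlapping the range.
func decryptChunk(key, sealed []byte) ([]byte, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, fmt.Errorf("sealed chunk shorter than nonce")
	}
	nonce, ciphertext := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ciphertext, nil)
}

func main() {
	chunks := [][]byte{[]byte("chunk-1 payload"), []byte("chunk-2 payload")}
	for i, plain := range chunks {
		key, sealed, err := encryptChunk(plain)
		if err != nil {
			panic(err)
		}
		roundtrip, err := decryptChunk(key, sealed)
		if err != nil || !bytes.Equal(roundtrip, plain) {
			panic("roundtrip failed")
		}
		fmt.Printf("chunk %d: independent %d-byte key, %d-byte ciphertext\n", i+1, len(key), len(sealed))
	}
}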