diff --git a/.github/workflows/s3-sse-tests.yml b/.github/workflows/s3-sse-tests.yml
index 946e4735e..2a8c0b332 100644
--- a/.github/workflows/s3-sse-tests.yml
+++ b/.github/workflows/s3-sse-tests.yml
@@ -345,3 +345,53 @@ jobs:
           name: s3-sse-performance-logs
           path: test/s3/sse/weed-test*.log
           retention-days: 7
+
+  s3-volume-encryption:
+    name: S3 Volume Encryption Test
+    runs-on: ubuntu-22.04
+    timeout-minutes: 20
+
+    steps:
+      - name: Check out code
+        uses: actions/checkout@v6
+
+      - name: Set up Go
+        uses: actions/setup-go@v6
+        with:
+          go-version-file: 'go.mod'
+        id: go
+
+      - name: Install SeaweedFS
+        run: |
+          go install -buildvcs=false
+
+      - name: Run S3 Volume Encryption Integration Tests
+        timeout-minutes: 15
+        working-directory: test/s3/sse
+        run: |
+          set -x
+          echo "=== System Information ==="
+          uname -a
+          free -h
+
+          # Run volume encryption tests with -s3.encryptVolumeData flag
+          echo "🚀 Running S3 volume encryption integration tests..."
+          make test-volume-encryption || {
+            echo "❌ Volume encryption tests failed, checking logs..."
+            if [ -f /tmp/seaweedfs-sse-mini.log ]; then
+              echo "=== Server logs ==="
+              tail -100 /tmp/seaweedfs-sse-mini.log
+            fi
+            echo "=== Process information ==="
+            ps aux | grep -E "(weed|test)" || true
+            exit 1
+          }
+
+      - name: Upload server logs on failure
+        if: failure()
+        uses: actions/upload-artifact@v6
+        with:
+          name: s3-volume-encryption-logs
+          path: /tmp/seaweedfs-sse-*.log
+          retention-days: 3
+
diff --git a/test/s3/sse/Makefile b/test/s3/sse/Makefile
index e646ef901..87c171486 100644
--- a/test/s3/sse/Makefile
+++ b/test/s3/sse/Makefile
@@ -470,3 +470,36 @@ dev-kms: setup-openbao
 	@echo "OpenBao: $(OPENBAO_ADDR)"
 	@echo "Token: $(OPENBAO_TOKEN)"
 	@echo "Use 'make test-ssekms-integration' to run tests"
+
+# Volume encryption integration tests
+test-volume-encryption: build-weed
+	@echo "🚀 Starting S3 volume encryption integration tests..."
+	@echo "Starting SeaweedFS cluster with volume encryption enabled..."
+	@# Start server with -s3.encryptVolumeData flag
+	@mkdir -p /tmp/seaweedfs-test-sse
+	@rm -f /tmp/seaweedfs-sse-*.log || true
+	@sed -e 's/ACCESS_KEY_PLACEHOLDER/$(ACCESS_KEY)/g' \
+		-e 's/SECRET_KEY_PLACEHOLDER/$(SECRET_KEY)/g' \
+		s3-config-template.json > /tmp/seaweedfs-s3.json
+	@echo "Starting weed mini with S3 volume encryption..."
+	@AWS_ACCESS_KEY_ID=$(ACCESS_KEY) AWS_SECRET_ACCESS_KEY=$(SECRET_KEY) GLOG_v=4 $(SEAWEEDFS_BINARY) mini \
+		-dir=/tmp/seaweedfs-test-sse \
+		-s3.port=$(S3_PORT) \
+		-s3.config=/tmp/seaweedfs-s3.json \
+		-s3.encryptVolumeData \
+		-ip=127.0.0.1 \
+		> /tmp/seaweedfs-sse-mini.log 2>&1 & echo $$! > /tmp/weed-mini.pid
+	@echo "Checking S3 service is ready..."
+	@for i in $$(seq 1 30); do \
+		if curl -s http://127.0.0.1:$(S3_PORT) > /dev/null 2>&1; then \
+			echo "✅ S3 service is ready"; \
+			break; \
+		fi; \
+		sleep 1; \
+	done
+	@echo "Running volume encryption integration tests..."
+	@trap '$(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true' EXIT; \
+	cd $(SEAWEEDFS_ROOT) && go test -v -tags=integration -timeout=10m -run "TestS3VolumeEncryption" ./test/s3/sse || exit 1; \
+	echo "✅ Volume encryption tests completed successfully"; \
+	$(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true
+
diff --git a/test/s3/sse/s3_volume_encryption_test.go b/test/s3/sse/s3_volume_encryption_test.go
new file mode 100644
index 000000000..349763c10
--- /dev/null
+++ b/test/s3/sse/s3_volume_encryption_test.go
@@ -0,0 +1,448 @@
+//go:build integration
+// +build integration
+
+package sse
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
+)
+
+// TestS3VolumeEncryptionRoundtrip tests that data uploaded with encryptVolumeData
+// enabled can be read back correctly. The S3 server must be started with volume
+// encryption enabled for this test to actually exercise encryption.
+//
+// To run this test:
+// 1. Start SeaweedFS: weed server -s3 -s3.encryptVolumeData=true
+// 2. Run: go test -v -run TestS3VolumeEncryptionRoundtrip
+func TestS3VolumeEncryptionRoundtrip(t *testing.T) {
+	svc := getS3Client(t)
+	bucket := fmt.Sprintf("volume-encryption-test-%d", time.Now().Unix())
+
+	// Create bucket
+	_, err := svc.CreateBucket(&s3.CreateBucketInput{
+		Bucket: aws.String(bucket),
+	})
+	if err != nil {
+		t.Fatalf("Failed to create bucket: %v", err)
+	}
+	defer cleanupBucket(t, svc, bucket)
+
+	testCases := []struct {
+		name     string
+		key      string
+		content  string
+		rangeReq string // Optional range request
+	}{
+		{
+			name:    "small file",
+			key:     "small.txt",
+			content: "Hello, encrypted world!",
+		},
+		{
+			name:    "medium file",
+			key:     "medium.txt",
+			content: strings.Repeat("SeaweedFS volume encryption test content. ", 1000),
+		},
+		{
+			name:    "binary content",
+			key:     "binary.bin",
+			content: string([]byte{0x00, 0x01, 0x02, 0xFF, 0xFE, 0xFD, 0x00, 0x80}),
+		},
+		{
+			name:     "range request",
+			key:      "range-test.txt",
+			content:  "0123456789ABCDEFGHIJ",
+			rangeReq: "bytes=5-10",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Upload
+			_, err := svc.PutObject(&s3.PutObjectInput{
+				Bucket: aws.String(bucket),
+				Key:    aws.String(tc.key),
+				Body:   strings.NewReader(tc.content),
+			})
+			if err != nil {
+				t.Fatalf("PutObject failed: %v", err)
+			}
+
+			// Download
+			getInput := &s3.GetObjectInput{
+				Bucket: aws.String(bucket),
+				Key:    aws.String(tc.key),
+			}
+			if tc.rangeReq != "" {
+				getInput.Range = aws.String(tc.rangeReq)
+			}
+
+			result, err := svc.GetObject(getInput)
+			if err != nil {
+				t.Fatalf("GetObject failed: %v", err)
+			}
+			defer result.Body.Close()
+
+			data, err := io.ReadAll(result.Body)
+			if err != nil {
+				t.Fatalf("Failed to read response body: %v", err)
+			}
+
+			// Verify content
+			expected := tc.content
+			if tc.rangeReq != "" {
+				// For "bytes=5-10", we expect characters at positions 5-10 (inclusive)
+				expected = tc.content[5:11]
+			}
+
+			if string(data) != expected {
+				t.Errorf("Content mismatch:\n expected: %q\n got: %q", expected, string(data))
+			} else {
+				t.Logf("Successfully uploaded and downloaded %s (%d bytes)", tc.key, len(data))
+			}
+		})
+	}
+}
+
+// TestS3VolumeEncryptionMultiChunk tests large files that span multiple chunks
+// to ensure encryption works correctly across chunk boundaries.
+func TestS3VolumeEncryptionMultiChunk(t *testing.T) {
+	svc := getS3Client(t)
+	bucket := fmt.Sprintf("volume-encryption-multichunk-%d", time.Now().Unix())
+
+	// Create bucket
+	_, err := svc.CreateBucket(&s3.CreateBucketInput{
+		Bucket: aws.String(bucket),
+	})
+	if err != nil {
+		t.Fatalf("Failed to create bucket: %v", err)
+	}
+	defer cleanupBucket(t, svc, bucket)
+
+	// Create a file larger than default chunk size (8MB)
+	// Use 10MB to ensure multiple chunks
+	largeContent := make([]byte, 10*1024*1024)
+	for i := range largeContent {
+		largeContent[i] = byte(i % 256)
+	}
+
+	key := "large-file.bin"
+
+	// Upload
+	_, err = svc.PutObject(&s3.PutObjectInput{
+		Bucket: aws.String(bucket),
+		Key:    aws.String(key),
+		Body:   bytes.NewReader(largeContent),
+	})
+	if err != nil {
+		t.Fatalf("PutObject failed for large file: %v", err)
+	}
+
+	// Download full file
+	result, err := svc.GetObject(&s3.GetObjectInput{
+		Bucket: aws.String(bucket),
+		Key:    aws.String(key),
+	})
+	if err != nil {
+		t.Fatalf("GetObject failed: %v", err)
+	}
+	defer result.Body.Close()
+
+	downloadedData, err := io.ReadAll(result.Body)
+	if err != nil {
+		t.Fatalf("Failed to read response body: %v", err)
+	}
+
+	if len(downloadedData) != len(largeContent) {
+		t.Errorf("Size mismatch: expected %d, got %d", len(largeContent), len(downloadedData))
+	}
+
+	if !bytes.Equal(downloadedData, largeContent) {
+		t.Errorf("Content mismatch in multi-chunk file")
+		// Find first mismatch
+		for i := 0; i < len(downloadedData) && i < len(largeContent); i++ {
+			if downloadedData[i] != largeContent[i] {
+				t.Errorf("First mismatch at byte %d: expected %02x, got %02x", i, largeContent[i], downloadedData[i])
+				break
+			}
+		}
+	} else {
+		t.Logf("Successfully uploaded and downloaded %d byte multi-chunk file", len(downloadedData))
+	}
+
+	// Test range request spanning chunk boundary (around 8MB)
+	rangeStart := int64(8*1024*1024 - 1000)
+	rangeEnd := int64(8*1024*1024 + 1000)
+	rangeResult, err := svc.GetObject(&s3.GetObjectInput{
+		Bucket: aws.String(bucket),
+		Key:    aws.String(key),
+		Range:  aws.String(fmt.Sprintf("bytes=%d-%d", rangeStart, rangeEnd)),
+	})
+	if err != nil {
+		t.Fatalf("Range GetObject failed: %v", err)
+	}
+	defer rangeResult.Body.Close()
+
+	rangeData, err := io.ReadAll(rangeResult.Body)
+	if err != nil {
+		t.Fatalf("Failed to read range response: %v", err)
+	}
+
+	expectedRange := largeContent[rangeStart : rangeEnd+1]
+	if !bytes.Equal(rangeData, expectedRange) {
+		t.Errorf("Range request content mismatch at chunk boundary")
+	} else {
+		t.Logf("Successfully retrieved range spanning chunk boundary (%d bytes)", len(rangeData))
+	}
+}
+
+// TestS3VolumeEncryptionMultiChunkRangeRead tests range reads that span multiple chunks:
+// - Part of chunk 1
+// - Whole chunk 2
+// - Part of chunk 3
+// This is critical for verifying that cipher decryption works correctly when
+// reading across chunk boundaries with the cipherKey from each chunk.
+func TestS3VolumeEncryptionMultiChunkRangeRead(t *testing.T) {
+	svc := getS3Client(t)
+	bucket := fmt.Sprintf("volume-encryption-multirange-%d", time.Now().Unix())
+
+	// Create bucket
+	_, err := svc.CreateBucket(&s3.CreateBucketInput{
+		Bucket: aws.String(bucket),
+	})
+	if err != nil {
+		t.Fatalf("Failed to create bucket: %v", err)
+	}
+	defer cleanupBucket(t, svc, bucket)
+
+	// Default chunk size is 8MB. Create a file with 3+ chunks (25MB)
+	// to ensure we have multiple complete chunks
+	const chunkSize = 8 * 1024 * 1024  // 8MB
+	const fileSize = 25 * 1024 * 1024 // 25MB = 3 chunks + partial 4th
+
+	largeContent := make([]byte, fileSize)
+	for i := range largeContent {
+		// Use recognizable pattern: each byte encodes its position
+		largeContent[i] = byte(i % 256)
+	}
+
+	key := "multi-chunk-range-test.bin"
+
+	// Upload
+	_, err = svc.PutObject(&s3.PutObjectInput{
+		Bucket: aws.String(bucket),
+		Key:    aws.String(key),
+		Body:   bytes.NewReader(largeContent),
+	})
+	if err != nil {
+		t.Fatalf("PutObject failed: %v", err)
+	}
+
+	t.Logf("Uploaded %d byte file (%d chunks of %d bytes)", fileSize, (fileSize+chunkSize-1)/chunkSize, chunkSize)
+
+	// Test cases for range reads spanning multiple chunks
+	testCases := []struct {
+		name       string
+		rangeStart int64
+		rangeEnd   int64
+		desc       string
+	}{
+		{
+			name:       "part_chunk1_whole_chunk2_part_chunk3",
+			rangeStart: chunkSize - 1000,   // 1000 bytes before end of chunk 1
+			rangeEnd:   2*chunkSize + 1000, // 1000 bytes into chunk 3
+			desc:       "Spans from end of chunk 1 through entire chunk 2 into beginning of chunk 3",
+		},
+		{
+			name:       "last_byte_chunk1_through_first_byte_chunk3",
+			rangeStart: chunkSize - 1, // Last byte of chunk 1
+			rangeEnd:   2 * chunkSize, // First byte of chunk 3
+			desc:       "Minimal span: last byte of chunk 1, entire chunk 2, first byte of chunk 3",
+		},
+		{
+			name:       "middle_chunk1_to_middle_chunk3",
+			rangeStart: chunkSize / 2,             // Middle of chunk 1
+			rangeEnd:   2*chunkSize + chunkSize/2, // Middle of chunk 3
+			desc:       "From middle of chunk 1 to middle of chunk 3",
+		},
+		{
+			name:       "half_chunk1_whole_chunk2_half_chunk3",
+			rangeStart: chunkSize / 2,                 // Half of chunk 1
+			rangeEnd:   2*chunkSize + chunkSize/2 - 1, // Half of chunk 3
+			desc:       "Half of each boundary chunk, whole middle chunk",
+		},
+		{
+			name:       "single_byte_each_boundary",
+			rangeStart: chunkSize - 1, // 1 byte from chunk 1
+			rangeEnd:   2 * chunkSize, // 1 byte from chunk 3
+			desc:       "1 byte from chunk 1, entire chunk 2, 1 byte from chunk 3",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Logf("Test: %s", tc.desc)
+			t.Logf("Range: bytes=%d-%d (spanning bytes at offsets across chunk boundaries)", tc.rangeStart, tc.rangeEnd)
+
+			result, err := svc.GetObject(&s3.GetObjectInput{
+				Bucket: aws.String(bucket),
+				Key:    aws.String(key),
+				Range:  aws.String(fmt.Sprintf("bytes=%d-%d", tc.rangeStart, tc.rangeEnd)),
+			})
+			if err != nil {
+				t.Fatalf("Range GetObject failed: %v", err)
+			}
+			defer result.Body.Close()
+
+			rangeData, err := io.ReadAll(result.Body)
+			if err != nil {
+				t.Fatalf("Failed to read range response: %v", err)
+			}
+
+			expectedLen := tc.rangeEnd - tc.rangeStart + 1
+			if int64(len(rangeData)) != expectedLen {
+				t.Errorf("Size mismatch: expected %d bytes, got %d", expectedLen, len(rangeData))
+			}
+
+			expectedRange := largeContent[tc.rangeStart : tc.rangeEnd+1]
+			if !bytes.Equal(rangeData, expectedRange) {
+				t.Errorf("Content mismatch in multi-chunk range read")
+				// Find first mismatch for debugging
+				for i := 0; i < len(rangeData) && i < len(expectedRange); i++ {
+					if rangeData[i] != expectedRange[i] {
+						globalOffset := tc.rangeStart + int64(i)
+						chunkNum := globalOffset / chunkSize
+						offsetInChunk := globalOffset % chunkSize
+						t.Errorf("First mismatch at byte %d (chunk %d, offset %d in chunk): expected %02x, got %02x",
+							i, chunkNum, offsetInChunk, expectedRange[i], rangeData[i])
+						break
+					}
+				}
+			} else {
+				t.Logf("✓ Successfully read %d bytes spanning chunks (offsets %d-%d)", len(rangeData), tc.rangeStart, tc.rangeEnd)
+			}
+		})
+	}
+}
+
+// TestS3VolumeEncryptionCopy tests that copying encrypted objects works correctly.
+func TestS3VolumeEncryptionCopy(t *testing.T) {
+	svc := getS3Client(t)
+	bucket := fmt.Sprintf("volume-encryption-copy-%d", time.Now().Unix())
+
+	// Create bucket
+	_, err := svc.CreateBucket(&s3.CreateBucketInput{
+		Bucket: aws.String(bucket),
+	})
+	if err != nil {
+		t.Fatalf("Failed to create bucket: %v", err)
+	}
+	defer cleanupBucket(t, svc, bucket)
+
+	srcKey := "source-object.txt"
+	dstKey := "copied-object.txt"
+	content := "Content to be copied with volume-level encryption"
+
+	// Upload source
+	_, err = svc.PutObject(&s3.PutObjectInput{
+		Bucket: aws.String(bucket),
+		Key:    aws.String(srcKey),
+		Body:   strings.NewReader(content),
+	})
+	if err != nil {
+		t.Fatalf("PutObject failed: %v", err)
+	}
+
+	// Copy object
+	_, err = svc.CopyObject(&s3.CopyObjectInput{
+		Bucket:     aws.String(bucket),
+		Key:        aws.String(dstKey),
+		CopySource: aws.String(fmt.Sprintf("%s/%s", bucket, srcKey)),
+	})
+	if err != nil {
+		t.Fatalf("CopyObject failed: %v", err)
+	}
+
+	// Read copied object
+	result, err := svc.GetObject(&s3.GetObjectInput{
+		Bucket: aws.String(bucket),
+		Key:    aws.String(dstKey),
+	})
+	if err != nil {
+		t.Fatalf("GetObject failed for copied object: %v", err)
+	}
+	defer result.Body.Close()
+
+	data, err := io.ReadAll(result.Body)
+	if err != nil {
+		t.Fatalf("Failed to read copied object: %v", err)
+	}
+
+	if string(data) != content {
+		t.Errorf("Copied content mismatch:\n expected: %q\n got: %q", content, string(data))
+	} else {
+		t.Logf("Successfully copied encrypted object")
+	}
+}
+
+// Helper functions
+
+func getS3Client(t *testing.T) *s3.S3 {
+	// Use credentials that match the Makefile configuration
+	// ACCESS_KEY ?= some_access_key1
+	// SECRET_KEY ?= some_secret_key1
+	sess, err := session.NewSession(&aws.Config{
+		Region:           aws.String("us-east-1"),
+		Endpoint:         aws.String("http://localhost:8333"),
+		DisableSSL:       aws.Bool(true),
+		S3ForcePathStyle: aws.Bool(true),
+		Credentials:      credentials.NewStaticCredentials("some_access_key1", "some_secret_key1", ""),
+	})
+	if err != nil {
+		t.Fatalf("Failed to create session: %v", err)
+	}
+	return s3.New(sess)
+}
+
+func cleanupBucket(t *testing.T, svc *s3.S3, bucket string) {
+	ctx := context.Background()
+	_ = ctx
+
+	// List and delete all objects
+	listResult, err := svc.ListObjectsV2(&s3.ListObjectsV2Input{
+		Bucket: aws.String(bucket),
+	})
+	if err != nil {
+		t.Logf("Warning: failed to list objects for cleanup: %v", err)
+		return
+	}
+
+	for _, obj := range listResult.Contents {
+		_, err := svc.DeleteObject(&s3.DeleteObjectInput{
+			Bucket: aws.String(bucket),
+			Key:    obj.Key,
+		})
+		if err != nil {
+			t.Logf("Warning: failed to delete object %s: %v", *obj.Key, err)
+		}
+	}
+
+	// Delete bucket
+	_, err = svc.DeleteBucket(&s3.DeleteBucketInput{
+		Bucket: aws.String(bucket),
+	})
+	if err != nil {
+		t.Logf("Warning: failed to delete bucket %s: %v", bucket, err)
+	}
+}
diff --git a/weed/command/mini.go b/weed/command/mini.go
index 6aa30acbd..39a271ee9 100644
--- a/weed/command/mini.go
+++ b/weed/command/mini.go
@@ -229,6 +229,7 @@ func initMiniS3Flags() {
 	miniS3Options.concurrentFileUploadLimit = cmdMini.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads")
 	miniS3Options.enableIam = cmdMini.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same port")
 	miniS3Options.dataCenter = cmdMini.Flag.String("s3.dataCenter", "", "prefer to read and write to volumes in this data center")
+	miniS3Options.cipher = cmdMini.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads")
 	miniS3Options.config = miniS3Config
 	miniS3Options.iamConfig = miniIamConfig
 	miniS3Options.auditLogConfig = cmdMini.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
diff --git a/weed/command/s3.go b/weed/command/s3.go
index d7140ded5..afd7a8f9c 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -62,6 +62,7 @@ type S3Options struct {
 	enableIam                 *bool
 	debug                     *bool
 	debugPort                 *int
+	cipher                    *bool
 }
 
 func init() {
@@ -93,6 +94,7 @@ func init() {
 	s3StandaloneOptions.enableIam = cmdS3.Flag.Bool("iam", true, "enable embedded IAM API on the same port")
 	s3StandaloneOptions.debug = cmdS3.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port")
 	s3StandaloneOptions.debugPort = cmdS3.Flag.Int("debug.port", 6060, "http port for debugging")
+	s3StandaloneOptions.cipher = cmdS3.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
 }
 
 var cmdS3 = &Command{
@@ -290,6 +292,7 @@ func (s3opt *S3Options) startS3Server() bool {
 		ConcurrentUploadLimit:     int64(*s3opt.concurrentUploadLimitMB) * 1024 * 1024,
 		ConcurrentFileUploadLimit: int64(*s3opt.concurrentFileUploadLimit),
 		EnableIam:                 *s3opt.enableIam, // Embedded IAM API (enabled by default)
+		Cipher:                    *s3opt.cipher,    // encrypt data on volume servers
 	})
 	if s3ApiServer_err != nil {
 		glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
diff --git a/weed/command/server.go b/weed/command/server.go
index 5e59ea246..ae2e421ba 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -175,6 +175,7 @@ func init() {
 	s3Options.concurrentUploadLimitMB = cmdServer.Flag.Int("s3.concurrentUploadLimitMB", 0, "limit total concurrent upload size for S3, 0 means unlimited")
 	s3Options.concurrentFileUploadLimit = cmdServer.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited")
 	s3Options.enableIam = cmdServer.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same S3 port")
+	s3Options.cipher = cmdServer.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads")
 
 	sftpOptions.port = cmdServer.Flag.Int("sftp.port", 2022, "SFTP server listen port")
 	sftpOptions.sshPrivateKey = cmdServer.Flag.String("sftp.sshPrivateKey", "", "path to the SSH private key file for host authentication")
diff --git a/weed/operation/upload_chunked.go b/weed/operation/upload_chunked.go
index 352b329f8..394bf4362 100644
--- a/weed/operation/upload_chunked.go
+++ b/weed/operation/upload_chunked.go
@@ -34,6 +34,7 @@ type ChunkedUploadOption struct {
 	SaveSmallInline bool
 	Jwt             security.EncodedJwt
 	MimeType        string
+	Cipher          bool // encrypt data on volume servers
 	AssignFunc      func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error)
 	UploadFunc      func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) // Optional: for testing
 }
@@ -172,7 +173,7 @@ uploadLoop:
 
 		uploadOption := &UploadOption{
 			UploadUrl:         uploadUrl,
-			Cipher:            false,
+			Cipher:            opt.Cipher,
 			IsInputCompressed: false,
 			MimeType:          opt.MimeType,
 			PairMap:           nil,
@@ -220,8 +221,8 @@ uploadLoop:
 			ETag:         uploadResult.ContentMd5,
 			Fid:          fid,
 			CipherKey:    uploadResult.CipherKey,
+			IsCompressed: uploadResult.Gzip > 0,
 		}
-
 		fileChunksLock.Lock()
 		fileChunks = append(fileChunks, chunk)
 		glog.V(4).Infof("uploaded chunk %d to %s [%d,%d)", len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size))
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index a2fff4792..56c358174 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -249,8 +249,10 @@ func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option
 		compressed, compressErr := util.GzipData(data)
 		// fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed))
 		if compressErr == nil {
-			data = compressed
-			contentIsGzipped = true
+			if len(compressed) < len(data) {
+				data = compressed
+				contentIsGzipped = true
+			}
 		}
 	} else if option.IsInputCompressed {
 		// just to get the clear data length
@@ -290,7 +292,7 @@ func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option
 		uploadResult.Name = option.Filename
 		uploadResult.Mime = option.MimeType
 		uploadResult.CipherKey = cipherKey
-		uploadResult.Size = uint32(clearDataLen)
+		uploadResult.Size = uint32(len(data))
 		if contentIsGzipped {
 			uploadResult.Gzip = 1
 		}
diff --git a/weed/s3api/s3api_key_rotation.go b/weed/s3api/s3api_key_rotation.go
index f2d406fb7..1881f3696 100644
--- a/weed/s3api/s3api_key_rotation.go
+++ b/weed/s3api/s3api_key_rotation.go
@@ -182,7 +182,7 @@ func (s3a *S3ApiServer) rotateSSECChunk(chunk *filer_pb.FileChunk, sourceKey, de
 	}
 
 	// Download encrypted data
-	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
 	if err != nil {
 		return nil, fmt.Errorf("download chunk data: %w", err)
 	}
@@ -251,7 +251,7 @@ func (s3a *S3ApiServer) rotateSSEKMSChunk(chunk *filer_pb.FileChunk, srcKeyID, d
 	}
 
 	// Download data (this would be encrypted with the old KMS key)
-	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
 	if err != nil {
 		return nil, fmt.Errorf("download chunk data: %w", err)
 	}
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 67c40d0c3..1a4e104cf 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -1019,6 +1019,8 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
 	if isRangeRequest {
 		w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize))
 		w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
+	} else {
+		w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
 	}
 
 	headerSetTime = time.Since(tHeaderSet)
diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go
index 26775f9ae..d4ef3b52e 100644
--- a/weed/s3api/s3api_object_handlers_copy.go
+++ b/weed/s3api/s3api_object_handlers_copy.go
@@ -846,7 +846,7 @@ func (s3a *S3ApiServer) copySingleChunk(chunk *filer_pb.FileChunk, dstPath strin
 	}
 
 	// Download and upload the chunk
-	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
 	if err != nil {
 		return nil, fmt.Errorf("download chunk data: %w", err)
 	}
@@ -881,7 +881,7 @@ func (s3a *S3ApiServer) copySingleChunkForRange(originalChunk, rangeChunk *filer
 	offsetInChunk := overlapStart - chunkStart
 
 	// Download and upload the chunk portion
-	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, offsetInChunk, int64(rangeChunk.Size))
+	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, offsetInChunk, int64(rangeChunk.Size), originalChunk.CipherKey)
 	if err != nil {
 		return nil, fmt.Errorf("download chunk range data: %w", err)
 	}
@@ -1199,10 +1199,40 @@ func (s3a *S3ApiServer) uploadChunkData(chunkData []byte, assignResult *filer_pb
 }
 
 // downloadChunkData downloads chunk data from the source URL
-func (s3a *S3ApiServer) downloadChunkData(srcUrl, fileId string, offset, size int64) ([]byte, error) {
+func (s3a *S3ApiServer) downloadChunkData(srcUrl, fileId string, offset, size int64, cipherKey []byte) ([]byte, error) {
 	jwt := filer.JwtForVolumeServer(fileId)
+	// Only perform HEAD request for encrypted chunks to get physical size
+	if offset == 0 && len(cipherKey) > 0 {
+		req, err := http.NewRequest(http.MethodHead, srcUrl, nil)
+		if err == nil {
+			if jwt != "" {
+				req.Header.Set("Authorization", "BEARER "+string(jwt))
+			}
+			resp, err := util_http.GetGlobalHttpClient().Do(req)
+			if err == nil {
+				defer util_http.CloseResponse(resp)
+				if resp.StatusCode == http.StatusOK {
+					contentLengthStr := resp.Header.Get("Content-Length")
+					if contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64); err == nil {
+						// Validate contentLength fits in int32 range before comparison
+						if contentLength > int64(2147483647) { // math.MaxInt32
+							return nil, fmt.Errorf("content length %d exceeds maximum int32 size", contentLength)
+						}
+						if contentLength > size {
+							size = contentLength
+						}
+					}
+				}
+			}
+		}
+	}
+	// Validate size fits in int32 range before conversion to int
+	if size > int64(2147483647) { // math.MaxInt32
+		return nil, fmt.Errorf("chunk size %d exceeds maximum int32 size", size)
+	}
+	sizeInt := int(size)
 	var chunkData []byte
-	shouldRetry, err := util_http.ReadUrlAsStream(context.Background(), srcUrl, jwt, nil, false, false, offset, int(size), func(data []byte) {
+	shouldRetry, err := util_http.ReadUrlAsStream(context.Background(), srcUrl, jwt, nil, false, false, offset, sizeInt, func(data []byte) {
 		chunkData = append(chunkData, data...)
 	})
 	if err != nil {
@@ -1334,7 +1364,7 @@ func (s3a *S3ApiServer) copyMultipartSSEKMSChunk(chunk *filer_pb.FileChunk, dest
 	}
 
 	// Download encrypted chunk data
-	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
 	if err != nil {
 		return nil, fmt.Errorf("download encrypted chunk data: %w", err)
 	}
@@ -1433,7 +1463,7 @@ func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySo
 	}
 
 	// Download encrypted chunk data
-	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
 	if err != nil {
 		return nil, nil, fmt.Errorf("download encrypted chunk data: %w", err)
 	}
@@ -1714,7 +1744,7 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour
 	}
 
 	// Download encrypted chunk data
-	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
 	if err != nil {
 		return nil, fmt.Errorf("download encrypted chunk data: %w", err)
 	}
@@ -2076,7 +2106,7 @@ func (s3a *S3ApiServer) copyChunkWithReencryption(chunk *filer_pb.FileChunk, cop
 	}
 
 	// Download encrypted chunk data
-	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
 	if err != nil {
 		return nil, fmt.Errorf("download encrypted chunk data: %w", err)
 	}
@@ -2295,7 +2325,7 @@ func (s3a *S3ApiServer) copyChunkWithSSEKMSReencryption(chunk *filer_pb.FileChun
 	}
 
 	// Download chunk data
-	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
 	if err != nil {
 		return nil, fmt.Errorf("download chunk data: %w", err)
 	}
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index e9e523138..959893e57 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -299,7 +299,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
 
 	// Apply bucket default encryption if no explicit encryption was provided
 	// This implements AWS S3 behavior where bucket default encryption automatically applies
-	if !hasExplicitEncryption(customerKey, sseKMSKey, sseS3Key) {
+	if !hasExplicitEncryption(customerKey, sseKMSKey, sseS3Key) && !s3a.cipher {
 		glog.V(4).Infof("putToFiler: no explicit encryption detected, checking for bucket default encryption")
 
 		// Apply bucket default encryption and get the result
@@ -392,6 +392,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
 		DataCenter:      s3a.option.DataCenter,
 		SaveSmallInline: false, // S3 API always creates chunks, never stores inline
 		MimeType:        r.Header.Get("Content-Type"),
+		Cipher:          s3a.cipher, // encrypt data on volume servers
 		AssignFunc:      assignFunc,
 	})
 	if err != nil {
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index ffb50e8c1..7a8062a7a 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -51,6 +51,7 @@ type S3ApiServerOption struct {
 	ConcurrentUploadLimit     int64
 	ConcurrentFileUploadLimit int64
 	EnableIam                 bool // Enable embedded IAM API on the same port
+	Cipher                    bool // encrypt data on volume servers
 }
 
 type S3ApiServer struct {
@@ -70,7 +71,8 @@ type S3ApiServer struct {
 	inFlightDataSize      int64
 	inFlightUploads       int64
 	inFlightDataLimitCond *sync.Cond
-	embeddedIam           *EmbeddedIamApi // Embedded IAM API server (when enabled)
+	embeddedIam           *EmbeddedIamApi // Embedded IAM API server (when enabled)
+	cipher                bool            // encrypt data on volume servers
 }
 
 func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
@@ -154,6 +156,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
 		bucketConfigCache:     NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven
 		policyEngine:          policyEngine,                           // Initialize bucket policy engine
 		inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
+		cipher:                option.Cipher,
 	}
 
 	// Set s3a reference in circuit breaker for upload limiting
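
Testing notes: a sketch based only on the flags and targets added in this diff (the endpoint, port, and credentials are the defaults assumed by getS3Client and the Makefile, and may differ in other deployments):

    # standalone S3 gateway: the flag is unprefixed (weed/command/s3.go)
    weed s3 -encryptVolumeData

    # combined server: the flag is namespaced under s3. (weed/command/server.go, mini.go)
    weed server -s3 -s3.encryptVolumeData

    # run the new integration tests end to end; this starts its own
    # weed mini with -s3.encryptVolumeData and tears it down on exit
    cd test/s3/sse && make test-volume-encryption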