
Add S3 volume encryption support with -s3.encryptVolumeData flag (#7890)

* Add S3 volume encryption support with -s3.encryptVolumeData flag

This change adds volume-level encryption support for S3 uploads, similar
to the existing -filer.encryptVolumeData option. Each chunk is encrypted
with its own auto-generated CipherKey when the flag is enabled.
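For orientation, here is a minimal, self-contained sketch of that per-chunk idea, written against the Go standard library. It is an illustration only, not the SeaweedFS code path: the helper name encryptChunk and the cipher choice (AES-256-GCM) are assumptions, while in the actual change the uploader generates the key during chunk upload and returns it as uploadResult.CipherKey, which is then stored on the chunk (see the upload_chunked.go and upload_content.go diffs below).

  // Illustrative sketch only (assumed names and cipher; not the SeaweedFS implementation).
  // Each chunk gets its own freshly generated key; the ciphertext goes to storage and the
  // key is kept in the chunk metadata so reads can decrypt chunk by chunk.
  package main

  import (
  	"crypto/aes"
  	"crypto/cipher"
  	"crypto/rand"
  	"fmt"
  )

  // encryptChunk encrypts one chunk with a new AES-256-GCM key and returns
  // (ciphertext, cipherKey). The caller would record cipherKey alongside the chunk.
  func encryptChunk(plain []byte) (cipherText, cipherKey []byte, err error) {
  	cipherKey = make([]byte, 32) // new random key per chunk
  	if _, err = rand.Read(cipherKey); err != nil {
  		return nil, nil, err
  	}
  	block, err := aes.NewCipher(cipherKey)
  	if err != nil {
  		return nil, nil, err
  	}
  	gcm, err := cipher.NewGCM(block)
  	if err != nil {
  		return nil, nil, err
  	}
  	nonce := make([]byte, gcm.NonceSize())
  	if _, err = rand.Read(nonce); err != nil {
  		return nil, nil, err
  	}
  	// Prepend the nonce so a reader can decrypt with just (data, cipherKey).
  	cipherText = gcm.Seal(nonce, nonce, plain, nil)
  	return cipherText, cipherKey, nil
  }

  func main() {
  	ct, key, err := encryptChunk([]byte("chunk payload"))
  	if err != nil {
  		panic(err)
  	}
  	fmt.Printf("encrypted %d bytes with a %d-byte per-chunk key\n", len(ct), len(key))
  }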

Changes:
- Add -s3.encryptVolumeData flag to weed s3, weed server, and weed mini
- Wire Cipher option through S3ApiServer and ChunkedUploadOption
- Add integration tests for multi-chunk range reads with encryption
- Tests verify encryption works across chunk boundaries

Usage:
  weed s3 -encryptVolumeData
  weed server -s3 -s3.encryptVolumeData
  weed mini -s3.encryptVolumeData

Integration tests:
  go test -v -tags=integration -timeout 5m ./test/s3/sse/...

* Add GitHub Actions CI for S3 volume encryption tests

- Add test-volume-encryption target to Makefile that starts server with -s3.encryptVolumeData
- Add s3-volume-encryption job to GitHub Actions workflow
- Tests run with integration build tag and 10m timeout
- Server logs uploaded on failure for debugging

* Fix S3 client credentials to use environment variables

The test was using hardcoded credentials "any"/"any" but the Makefile
sets AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY to "some_access_key1"/
"some_secret_key1". Updated getS3Client() to read from environment
variables with fallback to "any"/"any" for manual testing.
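A minimal sketch of the lookup described here, assuming a small helper; the names envOr and testCredentials are illustrative, not the merged code, and a later commit in this PR hardcodes the Makefile credentials instead:

  package sse

  import (
  	"os"

  	"github.com/aws/aws-sdk-go/aws/credentials"
  )

  // envOr returns the environment variable value if set, otherwise the fallback.
  // Hypothetical helper, used only to illustrate the change described above.
  func envOr(key, fallback string) string {
  	if v := os.Getenv(key); v != "" {
  		return v
  	}
  	return fallback
  }

  // testCredentials builds static credentials from the environment, falling back
  // to "any"/"any" so the test can still be run by hand against a local server.
  func testCredentials() *credentials.Credentials {
  	return credentials.NewStaticCredentials(
  		envOr("AWS_ACCESS_KEY_ID", "any"),
  		envOr("AWS_SECRET_ACCESS_KEY", "any"),
  		"",
  	)
  }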

* Change bucket creation errors from skip to fatal

Tests should fail, not skip, when bucket creation fails. This ensures
that credential mismatches and other configuration issues are caught
rather than silently skipped.

* Make copy and multipart test jobs fail instead of succeed

Changed exit 0 to exit 1 for s3-sse-copy-operations and s3-sse-multipart
jobs. These jobs document known limitations but should fail to ensure
the issues are tracked and addressed, not silently ignored.

* Hardcode S3 credentials to match Makefile

Changed from environment variables to hardcoded credentials
"some_access_key1"/"some_secret_key1" to match the Makefile
configuration. This ensures tests work reliably.

* Fix double encryption: skip bucket default encryption when volume encryption is enabled

* Fix chunk size mismatch (set uploadResult.Size from the uploaded data length instead of the clear data length)

* Record IsCompressed on uploaded chunks

* Only keep gzipped data when compression actually shrinks it

* Fix copying of encrypted chunks by passing each chunk's CipherKey

* Only perform the HEAD request for the physical chunk size when len(cipherKey) > 0

* Revert "Make copy and multipart test jobs fail instead of succeed"

This reverts commit bc34a7eb3c.

* fix security vulnerability

* fix security

* Update s3api_object_handlers_copy.go

* Update s3api_object_handlers_copy.go

* Send the volume-server JWT on the HEAD request used to get the content length

Chris Lu committed 2 days ago (via GitHub), commit 8d6bcddf60

13 changed files (lines changed in parentheses):

1. .github/workflows/s3-sse-tests.yml (50)
2. test/s3/sse/Makefile (33)
3. test/s3/sse/s3_volume_encryption_test.go (448)
4. weed/command/mini.go (1)
5. weed/command/s3.go (3)
6. weed/command/server.go (1)
7. weed/operation/upload_chunked.go (5)
8. weed/operation/upload_content.go (8)
9. weed/s3api/s3api_key_rotation.go (4)
10. weed/s3api/s3api_object_handlers.go (2)
11. weed/s3api/s3api_object_handlers_copy.go (48)
12. weed/s3api/s3api_object_handlers_put.go (3)
13. weed/s3api/s3api_server.go (5)

.github/workflows/s3-sse-tests.yml (50 lines changed)

@@ -345,3 +345,53 @@ jobs:
          name: s3-sse-performance-logs
          path: test/s3/sse/weed-test*.log
          retention-days: 7

  s3-volume-encryption:
    name: S3 Volume Encryption Test
    runs-on: ubuntu-22.04
    timeout-minutes: 20

    steps:
      - name: Check out code
        uses: actions/checkout@v6

      - name: Set up Go
        uses: actions/setup-go@v6
        with:
          go-version-file: 'go.mod'
        id: go

      - name: Install SeaweedFS
        run: |
          go install -buildvcs=false

      - name: Run S3 Volume Encryption Integration Tests
        timeout-minutes: 15
        working-directory: test/s3/sse
        run: |
          set -x
          echo "=== System Information ==="
          uname -a
          free -h

          # Run volume encryption tests with -s3.encryptVolumeData flag
          echo "🚀 Running S3 volume encryption integration tests..."
          make test-volume-encryption || {
            echo "❌ Volume encryption tests failed, checking logs..."
            if [ -f /tmp/seaweedfs-sse-mini.log ]; then
              echo "=== Server logs ==="
              tail -100 /tmp/seaweedfs-sse-mini.log
            fi
            echo "=== Process information ==="
            ps aux | grep -E "(weed|test)" || true
            exit 1
          }

      - name: Upload server logs on failure
        if: failure()
        uses: actions/upload-artifact@v6
        with:
          name: s3-volume-encryption-logs
          path: /tmp/seaweedfs-sse-*.log
          retention-days: 3

test/s3/sse/Makefile (33 lines changed)

@@ -470,3 +470,36 @@ dev-kms: setup-openbao
	@echo "OpenBao: $(OPENBAO_ADDR)"
	@echo "Token: $(OPENBAO_TOKEN)"
	@echo "Use 'make test-ssekms-integration' to run tests"

# Volume encryption integration tests
test-volume-encryption: build-weed
	@echo "🚀 Starting S3 volume encryption integration tests..."
	@echo "Starting SeaweedFS cluster with volume encryption enabled..."
	@# Start server with -s3.encryptVolumeData flag
	@mkdir -p /tmp/seaweedfs-test-sse
	@rm -f /tmp/seaweedfs-sse-*.log || true
	@sed -e 's/ACCESS_KEY_PLACEHOLDER/$(ACCESS_KEY)/g' \
		-e 's/SECRET_KEY_PLACEHOLDER/$(SECRET_KEY)/g' \
		s3-config-template.json > /tmp/seaweedfs-s3.json
	@echo "Starting weed mini with S3 volume encryption..."
	@AWS_ACCESS_KEY_ID=$(ACCESS_KEY) AWS_SECRET_ACCESS_KEY=$(SECRET_KEY) GLOG_v=4 $(SEAWEEDFS_BINARY) mini \
		-dir=/tmp/seaweedfs-test-sse \
		-s3.port=$(S3_PORT) \
		-s3.config=/tmp/seaweedfs-s3.json \
		-s3.encryptVolumeData \
		-ip=127.0.0.1 \
		> /tmp/seaweedfs-sse-mini.log 2>&1 & echo $$! > /tmp/weed-mini.pid
	@echo "Checking S3 service is ready..."
	@for i in $$(seq 1 30); do \
		if curl -s http://127.0.0.1:$(S3_PORT) > /dev/null 2>&1; then \
			echo "✅ S3 service is ready"; \
			break; \
		fi; \
		sleep 1; \
	done
	@echo "Running volume encryption integration tests..."
	@trap '$(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true' EXIT; \
		cd $(SEAWEEDFS_ROOT) && go test -v -tags=integration -timeout=10m -run "TestS3VolumeEncryption" ./test/s3/sse || exit 1; \
		echo "✅ Volume encryption tests completed successfully"; \
		$(MAKE) -C $(TEST_DIR) stop-seaweedfs-safe || true

test/s3/sse/s3_volume_encryption_test.go (448 lines, new file)

@@ -0,0 +1,448 @@
//go:build integration
// +build integration

package sse

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"strings"
	"testing"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

// TestS3VolumeEncryptionRoundtrip tests that data uploaded with encryptVolumeData
// enabled can be read back correctly. This requires the S3 server to be started with
// -encryptVolumeData=true for the test to verify encryption is working.
//
// To run this test:
// 1. Start SeaweedFS: weed server -s3 -s3.encryptVolumeData=true
// 2. Run: go test -v -run TestS3VolumeEncryptionRoundtrip
func TestS3VolumeEncryptionRoundtrip(t *testing.T) {
svc := getS3Client(t)
bucket := fmt.Sprintf("volume-encryption-test-%d", time.Now().Unix())
// Create bucket
_, err := svc.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucket),
})
if err != nil {
t.Fatalf("Failed to create bucket: %v", err)
}
defer cleanupBucket(t, svc, bucket)
testCases := []struct {
name string
key string
content string
rangeReq string // Optional range request
}{
{
name: "small file",
key: "small.txt",
content: "Hello, encrypted world!",
},
{
name: "medium file",
key: "medium.txt",
content: strings.Repeat("SeaweedFS volume encryption test content. ", 1000),
},
{
name: "binary content",
key: "binary.bin",
content: string([]byte{0x00, 0x01, 0x02, 0xFF, 0xFE, 0xFD, 0x00, 0x80}),
},
{
name: "range request",
key: "range-test.txt",
content: "0123456789ABCDEFGHIJ",
rangeReq: "bytes=5-10",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Upload
_, err := svc.PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(tc.key),
Body: strings.NewReader(tc.content),
})
if err != nil {
t.Fatalf("PutObject failed: %v", err)
}
// Download
getInput := &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(tc.key),
}
if tc.rangeReq != "" {
getInput.Range = aws.String(tc.rangeReq)
}
result, err := svc.GetObject(getInput)
if err != nil {
t.Fatalf("GetObject failed: %v", err)
}
defer result.Body.Close()
data, err := io.ReadAll(result.Body)
if err != nil {
t.Fatalf("Failed to read response body: %v", err)
}
// Verify content
expected := tc.content
if tc.rangeReq != "" {
// For "bytes=5-10", we expect characters at positions 5-10 (inclusive)
expected = tc.content[5:11]
}
if string(data) != expected {
t.Errorf("Content mismatch:\n expected: %q\n got: %q", expected, string(data))
} else {
t.Logf("Successfully uploaded and downloaded %s (%d bytes)", tc.key, len(data))
}
})
}
}
// TestS3VolumeEncryptionMultiChunk tests large files that span multiple chunks
// to ensure encryption works correctly across chunk boundaries.
func TestS3VolumeEncryptionMultiChunk(t *testing.T) {
svc := getS3Client(t)
bucket := fmt.Sprintf("volume-encryption-multichunk-%d", time.Now().Unix())
// Create bucket
_, err := svc.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucket),
})
if err != nil {
t.Fatalf("Failed to create bucket: %v", err)
}
defer cleanupBucket(t, svc, bucket)
// Create a file larger than default chunk size (8MB)
// Use 10MB to ensure multiple chunks
largeContent := make([]byte, 10*1024*1024)
for i := range largeContent {
largeContent[i] = byte(i % 256)
}
key := "large-file.bin"
// Upload
_, err = svc.PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: bytes.NewReader(largeContent),
})
if err != nil {
t.Fatalf("PutObject failed for large file: %v", err)
}
// Download full file
result, err := svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
t.Fatalf("GetObject failed: %v", err)
}
defer result.Body.Close()
downloadedData, err := io.ReadAll(result.Body)
if err != nil {
t.Fatalf("Failed to read response body: %v", err)
}
if len(downloadedData) != len(largeContent) {
t.Errorf("Size mismatch: expected %d, got %d", len(largeContent), len(downloadedData))
}
if !bytes.Equal(downloadedData, largeContent) {
t.Errorf("Content mismatch in multi-chunk file")
// Find first mismatch
for i := 0; i < len(downloadedData) && i < len(largeContent); i++ {
if downloadedData[i] != largeContent[i] {
t.Errorf("First mismatch at byte %d: expected %02x, got %02x", i, largeContent[i], downloadedData[i])
break
}
}
} else {
t.Logf("Successfully uploaded and downloaded %d byte multi-chunk file", len(downloadedData))
}
// Test range request spanning chunk boundary (around 8MB)
rangeStart := int64(8*1024*1024 - 1000)
rangeEnd := int64(8*1024*1024 + 1000)
rangeResult, err := svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Range: aws.String(fmt.Sprintf("bytes=%d-%d", rangeStart, rangeEnd)),
})
if err != nil {
t.Fatalf("Range GetObject failed: %v", err)
}
defer rangeResult.Body.Close()
rangeData, err := io.ReadAll(rangeResult.Body)
if err != nil {
t.Fatalf("Failed to read range response: %v", err)
}
expectedRange := largeContent[rangeStart : rangeEnd+1]
if !bytes.Equal(rangeData, expectedRange) {
t.Errorf("Range request content mismatch at chunk boundary")
} else {
t.Logf("Successfully retrieved range spanning chunk boundary (%d bytes)", len(rangeData))
}
}
// TestS3VolumeEncryptionMultiChunkRangeRead tests range reads that span multiple chunks:
// - Part of chunk 1
// - Whole chunk 2
// - Part of chunk 3
// This is critical for verifying that cipher decryption works correctly when
// reading across chunk boundaries with the cipherKey from each chunk.
func TestS3VolumeEncryptionMultiChunkRangeRead(t *testing.T) {
svc := getS3Client(t)
bucket := fmt.Sprintf("volume-encryption-multirange-%d", time.Now().Unix())
// Create bucket
_, err := svc.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucket),
})
if err != nil {
t.Fatalf("Failed to create bucket: %v", err)
}
defer cleanupBucket(t, svc, bucket)
// Default chunk size is 8MB. Create a file with 3+ chunks (25MB)
// to ensure we have multiple complete chunks
const chunkSize = 8 * 1024 * 1024 // 8MB
const fileSize = 25 * 1024 * 1024 // 25MB = 3 chunks + partial 4th
largeContent := make([]byte, fileSize)
for i := range largeContent {
// Use recognizable pattern: each byte encodes its position
largeContent[i] = byte(i % 256)
}
key := "multi-chunk-range-test.bin"
// Upload
_, err = svc.PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: bytes.NewReader(largeContent),
})
if err != nil {
t.Fatalf("PutObject failed: %v", err)
}
t.Logf("Uploaded %d byte file (%d chunks of %d bytes)", fileSize, (fileSize+chunkSize-1)/chunkSize, chunkSize)
// Test cases for range reads spanning multiple chunks
testCases := []struct {
name string
rangeStart int64
rangeEnd int64
desc string
}{
{
name: "part_chunk1_whole_chunk2_part_chunk3",
rangeStart: chunkSize - 1000, // 1000 bytes before end of chunk 1
rangeEnd: 2*chunkSize + 1000, // 1000 bytes into chunk 3
desc: "Spans from end of chunk 1 through entire chunk 2 into beginning of chunk 3",
},
{
name: "last_byte_chunk1_through_first_byte_chunk3",
rangeStart: chunkSize - 1, // Last byte of chunk 1
rangeEnd: 2 * chunkSize, // First byte of chunk 3
desc: "Minimal span: last byte of chunk 1, entire chunk 2, first byte of chunk 3",
},
{
name: "middle_chunk1_to_middle_chunk3",
rangeStart: chunkSize / 2, // Middle of chunk 1
rangeEnd: 2*chunkSize + chunkSize/2, // Middle of chunk 3
desc: "From middle of chunk 1 to middle of chunk 3",
},
{
name: "half_chunk1_whole_chunk2_half_chunk3",
rangeStart: chunkSize / 2, // Half of chunk 1
rangeEnd: 2*chunkSize + chunkSize/2 - 1, // Half of chunk 3
desc: "Half of each boundary chunk, whole middle chunk",
},
{
name: "single_byte_each_boundary",
rangeStart: chunkSize - 1, // 1 byte from chunk 1
rangeEnd: 2 * chunkSize, // 1 byte from chunk 3
desc: "1 byte from chunk 1, entire chunk 2, 1 byte from chunk 3",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
t.Logf("Test: %s", tc.desc)
t.Logf("Range: bytes=%d-%d (spanning bytes at offsets across chunk boundaries)", tc.rangeStart, tc.rangeEnd)
result, err := svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Range: aws.String(fmt.Sprintf("bytes=%d-%d", tc.rangeStart, tc.rangeEnd)),
})
if err != nil {
t.Fatalf("Range GetObject failed: %v", err)
}
defer result.Body.Close()
rangeData, err := io.ReadAll(result.Body)
if err != nil {
t.Fatalf("Failed to read range response: %v", err)
}
expectedLen := tc.rangeEnd - tc.rangeStart + 1
if int64(len(rangeData)) != expectedLen {
t.Errorf("Size mismatch: expected %d bytes, got %d", expectedLen, len(rangeData))
}
expectedRange := largeContent[tc.rangeStart : tc.rangeEnd+1]
if !bytes.Equal(rangeData, expectedRange) {
t.Errorf("Content mismatch in multi-chunk range read")
// Find first mismatch for debugging
for i := 0; i < len(rangeData) && i < len(expectedRange); i++ {
if rangeData[i] != expectedRange[i] {
globalOffset := tc.rangeStart + int64(i)
chunkNum := globalOffset / chunkSize
offsetInChunk := globalOffset % chunkSize
t.Errorf("First mismatch at byte %d (chunk %d, offset %d in chunk): expected %02x, got %02x",
i, chunkNum, offsetInChunk, expectedRange[i], rangeData[i])
break
}
}
} else {
t.Logf("✓ Successfully read %d bytes spanning chunks (offsets %d-%d)", len(rangeData), tc.rangeStart, tc.rangeEnd)
}
})
}
}
// TestS3VolumeEncryptionCopy tests that copying encrypted objects works correctly.
func TestS3VolumeEncryptionCopy(t *testing.T) {
svc := getS3Client(t)
bucket := fmt.Sprintf("volume-encryption-copy-%d", time.Now().Unix())
// Create bucket
_, err := svc.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucket),
})
if err != nil {
t.Fatalf("Failed to create bucket: %v", err)
}
defer cleanupBucket(t, svc, bucket)
srcKey := "source-object.txt"
dstKey := "copied-object.txt"
content := "Content to be copied with volume-level encryption"
// Upload source
_, err = svc.PutObject(&s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(srcKey),
Body: strings.NewReader(content),
})
if err != nil {
t.Fatalf("PutObject failed: %v", err)
}
// Copy object
_, err = svc.CopyObject(&s3.CopyObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(dstKey),
CopySource: aws.String(fmt.Sprintf("%s/%s", bucket, srcKey)),
})
if err != nil {
t.Fatalf("CopyObject failed: %v", err)
}
// Read copied object
result, err := svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(dstKey),
})
if err != nil {
t.Fatalf("GetObject failed for copied object: %v", err)
}
defer result.Body.Close()
data, err := io.ReadAll(result.Body)
if err != nil {
t.Fatalf("Failed to read copied object: %v", err)
}
if string(data) != content {
t.Errorf("Copied content mismatch:\n expected: %q\n got: %q", content, string(data))
} else {
t.Logf("Successfully copied encrypted object")
}
}
// Helper functions
func getS3Client(t *testing.T) *s3.S3 {
// Use credentials that match the Makefile configuration
// ACCESS_KEY ?= some_access_key1
// SECRET_KEY ?= some_secret_key1
sess, err := session.NewSession(&aws.Config{
Region: aws.String("us-east-1"),
Endpoint: aws.String("http://localhost:8333"),
DisableSSL: aws.Bool(true),
S3ForcePathStyle: aws.Bool(true),
Credentials: credentials.NewStaticCredentials("some_access_key1", "some_secret_key1", ""),
})
if err != nil {
t.Fatalf("Failed to create session: %v", err)
}
return s3.New(sess)
}
func cleanupBucket(t *testing.T, svc *s3.S3, bucket string) {
ctx := context.Background()
_ = ctx
// List and delete all objects
listResult, err := svc.ListObjectsV2(&s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
})
if err != nil {
t.Logf("Warning: failed to list objects for cleanup: %v", err)
return
}
for _, obj := range listResult.Contents {
_, err := svc.DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(bucket),
Key: obj.Key,
})
if err != nil {
t.Logf("Warning: failed to delete object %s: %v", *obj.Key, err)
}
}
// Delete bucket
_, err = svc.DeleteBucket(&s3.DeleteBucketInput{
Bucket: aws.String(bucket),
})
if err != nil {
t.Logf("Warning: failed to delete bucket %s: %v", bucket, err)
}
}

weed/command/mini.go (1 line changed)

@@ -229,6 +229,7 @@ func initMiniS3Flags() {
	miniS3Options.concurrentFileUploadLimit = cmdMini.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads")
	miniS3Options.enableIam = cmdMini.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same port")
	miniS3Options.dataCenter = cmdMini.Flag.String("s3.dataCenter", "", "prefer to read and write to volumes in this data center")
+	miniS3Options.cipher = cmdMini.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads")
	miniS3Options.config = miniS3Config
	miniS3Options.iamConfig = miniIamConfig
	miniS3Options.auditLogConfig = cmdMini.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")

weed/command/s3.go (3 lines changed)

@@ -62,6 +62,7 @@ type S3Options struct {
	enableIam *bool
	debug *bool
	debugPort *int
+	cipher *bool
}

func init() {
@@ -93,6 +94,7 @@ func init() {
	s3StandaloneOptions.enableIam = cmdS3.Flag.Bool("iam", true, "enable embedded IAM API on the same port")
	s3StandaloneOptions.debug = cmdS3.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port")
	s3StandaloneOptions.debugPort = cmdS3.Flag.Int("debug.port", 6060, "http port for debugging")
+	s3StandaloneOptions.cipher = cmdS3.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
}

var cmdS3 = &Command{
@@ -290,6 +292,7 @@ func (s3opt *S3Options) startS3Server() bool {
		ConcurrentUploadLimit: int64(*s3opt.concurrentUploadLimitMB) * 1024 * 1024,
		ConcurrentFileUploadLimit: int64(*s3opt.concurrentFileUploadLimit),
		EnableIam: *s3opt.enableIam, // Embedded IAM API (enabled by default)
+		Cipher: *s3opt.cipher, // encrypt data on volume servers
	})
	if s3ApiServer_err != nil {
		glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)

weed/command/server.go (1 line changed)

@@ -175,6 +175,7 @@ func init() {
	s3Options.concurrentUploadLimitMB = cmdServer.Flag.Int("s3.concurrentUploadLimitMB", 0, "limit total concurrent upload size for S3, 0 means unlimited")
	s3Options.concurrentFileUploadLimit = cmdServer.Flag.Int("s3.concurrentFileUploadLimit", 0, "limit number of concurrent file uploads for S3, 0 means unlimited")
	s3Options.enableIam = cmdServer.Flag.Bool("s3.iam", true, "enable embedded IAM API on the same S3 port")
+	s3Options.cipher = cmdServer.Flag.Bool("s3.encryptVolumeData", false, "encrypt data on volume servers for S3 uploads")

	sftpOptions.port = cmdServer.Flag.Int("sftp.port", 2022, "SFTP server listen port")
	sftpOptions.sshPrivateKey = cmdServer.Flag.String("sftp.sshPrivateKey", "", "path to the SSH private key file for host authentication")

weed/operation/upload_chunked.go (5 lines changed)

@@ -34,6 +34,7 @@ type ChunkedUploadOption struct {
	SaveSmallInline bool
	Jwt security.EncodedJwt
	MimeType string
+	Cipher bool // encrypt data on volume servers
	AssignFunc func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error)
	UploadFunc func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) // Optional: for testing
}
@@ -172,7 +173,7 @@ uploadLoop:
			uploadOption := &UploadOption{
				UploadUrl: uploadUrl,
-				Cipher: false,
+				Cipher: opt.Cipher,
				IsInputCompressed: false,
				MimeType: opt.MimeType,
				PairMap: nil,
@@ -220,8 +221,8 @@ uploadLoop:
				ETag: uploadResult.ContentMd5,
				Fid: fid,
				CipherKey: uploadResult.CipherKey,
+				IsCompressed: uploadResult.Gzip > 0,
			}
			fileChunksLock.Lock()
			fileChunks = append(fileChunks, chunk)
			glog.V(4).Infof("uploaded chunk %d to %s [%d,%d)", len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size))

weed/operation/upload_content.go (8 lines changed)

@@ -249,8 +249,10 @@ func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option
		compressed, compressErr := util.GzipData(data)
		// fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed))
		if compressErr == nil {
-			data = compressed
-			contentIsGzipped = true
+			if len(compressed) < len(data) {
+				data = compressed
+				contentIsGzipped = true
+			}
		}
	} else if option.IsInputCompressed {
		// just to get the clear data length
@@ -290,7 +292,7 @@ func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option
	uploadResult.Name = option.Filename
	uploadResult.Mime = option.MimeType
	uploadResult.CipherKey = cipherKey
-	uploadResult.Size = uint32(clearDataLen)
+	uploadResult.Size = uint32(len(data))
	if contentIsGzipped {
		uploadResult.Gzip = 1
	}

weed/s3api/s3api_key_rotation.go (4 lines changed)

@@ -182,7 +182,7 @@ func (s3a *S3ApiServer) rotateSSECChunk(chunk *filer_pb.FileChunk, sourceKey, de
	}

	// Download encrypted data
-	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
	if err != nil {
		return nil, fmt.Errorf("download chunk data: %w", err)
	}
@@ -251,7 +251,7 @@ func (s3a *S3ApiServer) rotateSSEKMSChunk(chunk *filer_pb.FileChunk, srcKeyID, d
	}

	// Download data (this would be encrypted with the old KMS key)
-	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
	if err != nil {
		return nil, fmt.Errorf("download chunk data: %w", err)
	}

weed/s3api/s3api_object_handlers.go (2 lines changed)

@@ -1019,6 +1019,8 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
	if isRangeRequest {
		w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize))
		w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
+	} else {
+		w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
	}

	headerSetTime = time.Since(tHeaderSet)

weed/s3api/s3api_object_handlers_copy.go (48 lines changed)

@@ -846,7 +846,7 @@ func (s3a *S3ApiServer) copySingleChunk(chunk *filer_pb.FileChunk, dstPath strin
	}

	// Download and upload the chunk
-	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
	if err != nil {
		return nil, fmt.Errorf("download chunk data: %w", err)
	}
@@ -881,7 +881,7 @@ func (s3a *S3ApiServer) copySingleChunkForRange(originalChunk, rangeChunk *filer
	offsetInChunk := overlapStart - chunkStart

	// Download and upload the chunk portion
-	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, offsetInChunk, int64(rangeChunk.Size))
+	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, offsetInChunk, int64(rangeChunk.Size), originalChunk.CipherKey)
	if err != nil {
		return nil, fmt.Errorf("download chunk range data: %w", err)
	}
@@ -1199,10 +1199,40 @@ func (s3a *S3ApiServer) uploadChunkData(chunkData []byte, assignResult *filer_pb
}

// downloadChunkData downloads chunk data from the source URL
-func (s3a *S3ApiServer) downloadChunkData(srcUrl, fileId string, offset, size int64) ([]byte, error) {
+func (s3a *S3ApiServer) downloadChunkData(srcUrl, fileId string, offset, size int64, cipherKey []byte) ([]byte, error) {
	jwt := filer.JwtForVolumeServer(fileId)
+	// Only perform HEAD request for encrypted chunks to get physical size
+	if offset == 0 && len(cipherKey) > 0 {
+		req, err := http.NewRequest(http.MethodHead, srcUrl, nil)
+		if err == nil {
+			if jwt != "" {
+				req.Header.Set("Authorization", "BEARER "+string(jwt))
+			}
+			resp, err := util_http.GetGlobalHttpClient().Do(req)
+			if err == nil {
+				defer util_http.CloseResponse(resp)
+				if resp.StatusCode == http.StatusOK {
+					contentLengthStr := resp.Header.Get("Content-Length")
+					if contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64); err == nil {
+						// Validate contentLength fits in int32 range before comparison
+						if contentLength > int64(2147483647) { // math.MaxInt32
+							return nil, fmt.Errorf("content length %d exceeds maximum int32 size", contentLength)
+						}
+						if contentLength > size {
+							size = contentLength
+						}
+					}
+				}
+			}
+		}
+	}
+	// Validate size fits in int32 range before conversion to int
+	if size > int64(2147483647) { // math.MaxInt32
+		return nil, fmt.Errorf("chunk size %d exceeds maximum int32 size", size)
+	}
+	sizeInt := int(size)
	var chunkData []byte
-	shouldRetry, err := util_http.ReadUrlAsStream(context.Background(), srcUrl, jwt, nil, false, false, offset, int(size), func(data []byte) {
+	shouldRetry, err := util_http.ReadUrlAsStream(context.Background(), srcUrl, jwt, nil, false, false, offset, sizeInt, func(data []byte) {
		chunkData = append(chunkData, data...)
	})
	if err != nil {
@@ -1334,7 +1364,7 @@ func (s3a *S3ApiServer) copyMultipartSSEKMSChunk(chunk *filer_pb.FileChunk, dest
	}

	// Download encrypted chunk data
-	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
	if err != nil {
		return nil, fmt.Errorf("download encrypted chunk data: %w", err)
	}
@@ -1433,7 +1463,7 @@ func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySo
	}

	// Download encrypted chunk data
-	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
	if err != nil {
		return nil, nil, fmt.Errorf("download encrypted chunk data: %w", err)
	}
@@ -1714,7 +1744,7 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour
	}

	// Download encrypted chunk data
-	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
	if err != nil {
		return nil, fmt.Errorf("download encrypted chunk data: %w", err)
	}
@@ -2076,7 +2106,7 @@ func (s3a *S3ApiServer) copyChunkWithReencryption(chunk *filer_pb.FileChunk, cop
	}

	// Download encrypted chunk data
-	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	encryptedData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
	if err != nil {
		return nil, fmt.Errorf("download encrypted chunk data: %w", err)
	}
@@ -2295,7 +2325,7 @@ func (s3a *S3ApiServer) copyChunkWithSSEKMSReencryption(chunk *filer_pb.FileChun
	}

	// Download chunk data
-	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size))
+	chunkData, err := s3a.downloadChunkData(srcUrl, fileId, 0, int64(chunk.Size), chunk.CipherKey)
	if err != nil {
		return nil, fmt.Errorf("download chunk data: %w", err)
	}

weed/s3api/s3api_object_handlers_put.go (3 lines changed)

@@ -299,7 +299,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
	// Apply bucket default encryption if no explicit encryption was provided
	// This implements AWS S3 behavior where bucket default encryption automatically applies
-	if !hasExplicitEncryption(customerKey, sseKMSKey, sseS3Key) {
+	if !hasExplicitEncryption(customerKey, sseKMSKey, sseS3Key) && !s3a.cipher {
		glog.V(4).Infof("putToFiler: no explicit encryption detected, checking for bucket default encryption")

		// Apply bucket default encryption and get the result
@@ -392,6 +392,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
		DataCenter: s3a.option.DataCenter,
		SaveSmallInline: false, // S3 API always creates chunks, never stores inline
		MimeType: r.Header.Get("Content-Type"),
+		Cipher: s3a.cipher, // encrypt data on volume servers
		AssignFunc: assignFunc,
	})
	if err != nil {

weed/s3api/s3api_server.go (5 lines changed)

@@ -51,6 +51,7 @@ type S3ApiServerOption struct {
	ConcurrentUploadLimit int64
	ConcurrentFileUploadLimit int64
	EnableIam bool // Enable embedded IAM API on the same port
+	Cipher bool // encrypt data on volume servers
}

type S3ApiServer struct {
@@ -70,7 +71,8 @@ type S3ApiServer struct {
	inFlightDataSize int64
	inFlightUploads int64
	inFlightDataLimitCond *sync.Cond
	embeddedIam *EmbeddedIamApi // Embedded IAM API server (when enabled)
+	cipher bool // encrypt data on volume servers
}

func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
@@ -154,6 +156,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
		bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven
		policyEngine: policyEngine, // Initialize bucket policy engine
		inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
+		cipher: option.Cipher,
	}

	// Set s3a reference in circuit breaker for upload limiting