diff --git a/.github/workflows/tus-tests.yml b/.github/workflows/tus-tests.yml new file mode 100644 index 000000000..7fa8818ce --- /dev/null +++ b/.github/workflows/tus-tests.yml @@ -0,0 +1,114 @@ +name: "TUS Protocol Tests" + +on: + pull_request: + paths: + - 'weed/server/filer_server_tus*.go' + - 'weed/server/filer_server.go' + - 'test/tus/**' + - '.github/workflows/tus-tests.yml' + push: + branches: [ master, main ] + paths: + - 'weed/server/filer_server_tus*.go' + - 'weed/server/filer_server.go' + - 'test/tus/**' + +concurrency: + group: ${{ github.head_ref || github.ref }}/tus-tests + cancel-in-progress: true + +permissions: + contents: read + +defaults: + run: + working-directory: weed + +jobs: + tus-integration-tests: + name: TUS Protocol Integration Tests + runs-on: ubuntu-22.04 + timeout-minutes: 20 + + steps: + - name: Check out code + uses: actions/checkout@v6 + + - name: Set up Go + uses: actions/setup-go@v6 + with: + go-version-file: 'go.mod' + id: go + + - name: Install SeaweedFS + run: | + go install -buildvcs=false + + - name: Run TUS Integration Tests + timeout-minutes: 15 + working-directory: test/tus + run: | + set -x + echo "=== System Information ===" + uname -a + free -h + df -h + echo "=== Starting TUS Tests ===" + + # Run tests with automatic server management + make test-with-server || { + echo "TUS integration tests failed, checking logs..." 
+ if [ -f /tmp/seaweedfs-tus-filer.log ]; then + echo "=== Filer logs ===" + tail -100 /tmp/seaweedfs-tus-filer.log + fi + if [ -f /tmp/seaweedfs-tus-master.log ]; then + echo "=== Master logs ===" + tail -50 /tmp/seaweedfs-tus-master.log + fi + if [ -f /tmp/seaweedfs-tus-volume.log ]; then + echo "=== Volume logs ===" + tail -50 /tmp/seaweedfs-tus-volume.log + fi + exit 1 + } + + - name: Show server logs on failure + if: failure() + working-directory: test/tus + run: | + echo "=== Filer Server Logs ===" + if [ -f /tmp/seaweedfs-tus-filer.log ]; then + echo "Last 100 lines of filer logs:" + tail -100 /tmp/seaweedfs-tus-filer.log + else + echo "No filer log file found" + fi + + echo "=== Master Server Logs ===" + if [ -f /tmp/seaweedfs-tus-master.log ]; then + tail -50 /tmp/seaweedfs-tus-master.log + else + echo "No master log file found" + fi + + echo "=== Volume Server Logs ===" + if [ -f /tmp/seaweedfs-tus-volume.log ]; then + tail -50 /tmp/seaweedfs-tus-volume.log + else + echo "No volume log file found" + fi + + echo "=== Test Environment ===" + ps aux | grep -E "(weed|test)" || true + netstat -tlnp 2>/dev/null | grep -E "(18888|19333|18080)" || true + + - name: Upload test logs on failure + if: failure() + uses: actions/upload-artifact@v5 + with: + name: tus-test-logs + path: | + /tmp/seaweedfs-tus-*.log + retention-days: 3 diff --git a/.gitignore b/.gitignore index cd240ab6d..81b4b107d 100644 --- a/.gitignore +++ b/.gitignore @@ -124,3 +124,4 @@ ADVANCED_IAM_DEVELOPMENT_PLAN.md *.log weed-iam test/kafka/kafka-client-loadtest/weed-linux-arm64 +/test/tus/filerldb2 diff --git a/test/s3/sse/github_7562_copy_test.go b/test/s3/sse/github_7562_copy_test.go new file mode 100644 index 000000000..5831c0b80 --- /dev/null +++ b/test/s3/sse/github_7562_copy_test.go @@ -0,0 +1,505 @@ +package sse_test + +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + 
"github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestGitHub7562CopyFromEncryptedToTempToEncrypted reproduces the exact scenario from +// GitHub issue #7562: copying from an encrypted bucket to a temp bucket, then to another +// encrypted bucket fails with InternalError. +// +// Reproduction steps: +// 1. Create source bucket with SSE-S3 encryption enabled +// 2. Upload object (automatically encrypted) +// 3. Create temp bucket (no encryption) +// 4. Copy object from source to temp (decrypts) +// 5. Delete source bucket +// 6. Create destination bucket with SSE-S3 encryption +// 7. Copy object from temp to dest (should re-encrypt) - THIS FAILS +func TestGitHub7562CopyFromEncryptedToTempToEncrypted(t *testing.T) { + ctx := context.Background() + client, err := createS3Client(ctx, defaultConfig) + require.NoError(t, err, "Failed to create S3 client") + + // Create three buckets + srcBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-src-") + require.NoError(t, err, "Failed to create source bucket") + + tempBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-temp-") + require.NoError(t, err, "Failed to create temp bucket") + + destBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-dest-") + require.NoError(t, err, "Failed to create destination bucket") + + // Cleanup at the end + defer func() { + // Clean up in reverse order of creation + cleanupTestBucket(ctx, client, destBucket) + cleanupTestBucket(ctx, client, tempBucket) + // Note: srcBucket is deleted during the test + }() + + testData := []byte("Test data for GitHub issue #7562 - copy from encrypted to temp to encrypted bucket") + objectKey := "demo-file.txt" + + t.Logf("[1] Creating source bucket with SSE-S3 default encryption: %s", srcBucket) + + // Step 1: Enable SSE-S3 default encryption on source bucket + _, err = 
client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ + Bucket: aws.String(srcBucket), + ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ + Rules: []types.ServerSideEncryptionRule{ + { + ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ + SSEAlgorithm: types.ServerSideEncryptionAes256, + }, + }, + }, + }, + }) + require.NoError(t, err, "Failed to set source bucket default encryption") + + t.Log("[2] Uploading demo object to source bucket") + + // Step 2: Upload object to source bucket (will be automatically encrypted) + _, err = client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(srcBucket), + Key: aws.String(objectKey), + Body: bytes.NewReader(testData), + // No encryption header - bucket default applies + }) + require.NoError(t, err, "Failed to upload to source bucket") + + // Verify source object is encrypted + srcHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(srcBucket), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Failed to HEAD source object") + assert.Equal(t, types.ServerSideEncryptionAes256, srcHead.ServerSideEncryption, + "Source object should be SSE-S3 encrypted") + t.Logf("Source object encryption: %v", srcHead.ServerSideEncryption) + + t.Logf("[3] Creating temp bucket (no encryption): %s", tempBucket) + // Temp bucket already created without encryption + + t.Log("[4] Copying object from source to temp (should decrypt)") + + // Step 4: Copy to temp bucket (no encryption = decrypts) + _, err = client.CopyObject(ctx, &s3.CopyObjectInput{ + Bucket: aws.String(tempBucket), + Key: aws.String(objectKey), + CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)), + // No encryption header - data stored unencrypted + }) + require.NoError(t, err, "Failed to copy to temp bucket") + + // Verify temp object is NOT encrypted + tempHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(tempBucket), + Key: 
aws.String(objectKey), + }) + require.NoError(t, err, "Failed to HEAD temp object") + assert.Empty(t, tempHead.ServerSideEncryption, "Temp object should NOT be encrypted") + t.Logf("Temp object encryption: %v (should be empty)", tempHead.ServerSideEncryption) + + // Verify temp object content + tempGet, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(tempBucket), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Failed to GET temp object") + tempData, err := io.ReadAll(tempGet.Body) + tempGet.Body.Close() + require.NoError(t, err, "Failed to read temp object") + assertDataEqual(t, testData, tempData, "Temp object data should match original") + + t.Log("[5] Deleting original source bucket") + + // Step 5: Delete source bucket + err = cleanupTestBucket(ctx, client, srcBucket) + require.NoError(t, err, "Failed to delete source bucket") + + t.Logf("[6] Creating destination bucket with SSE-S3 encryption: %s", destBucket) + + // Step 6: Enable SSE-S3 default encryption on destination bucket + _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ + Bucket: aws.String(destBucket), + ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ + Rules: []types.ServerSideEncryptionRule{ + { + ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ + SSEAlgorithm: types.ServerSideEncryptionAes256, + }, + }, + }, + }, + }) + require.NoError(t, err, "Failed to set destination bucket default encryption") + + t.Log("[7] Copying object from temp to dest (should re-encrypt) - THIS IS WHERE #7562 FAILS") + + // Step 7: Copy from temp to dest bucket (should re-encrypt with SSE-S3) + // THIS IS THE STEP THAT FAILS IN GITHUB ISSUE #7562 + _, err = client.CopyObject(ctx, &s3.CopyObjectInput{ + Bucket: aws.String(destBucket), + Key: aws.String(objectKey), + CopySource: aws.String(fmt.Sprintf("%s/%s", tempBucket, objectKey)), + // No encryption header - bucket default should apply + }) + 
require.NoError(t, err, "GitHub #7562: Failed to copy from temp to encrypted dest bucket") + + // Verify destination object is encrypted + destHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(destBucket), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Failed to HEAD destination object") + assert.Equal(t, types.ServerSideEncryptionAes256, destHead.ServerSideEncryption, + "Destination object should be SSE-S3 encrypted via bucket default") + t.Logf("Destination object encryption: %v", destHead.ServerSideEncryption) + + // Verify destination object content is correct + destGet, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(destBucket), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Failed to GET destination object") + destData, err := io.ReadAll(destGet.Body) + destGet.Body.Close() + require.NoError(t, err, "Failed to read destination object") + assertDataEqual(t, testData, destData, "GitHub #7562: Destination object data mismatch after re-encryption") + + t.Log("[done] GitHub #7562 reproduction test completed successfully!") +} + +// TestGitHub7562SimpleScenario tests the simpler variant: just copy unencrypted to encrypted bucket +func TestGitHub7562SimpleScenario(t *testing.T) { + ctx := context.Background() + client, err := createS3Client(ctx, defaultConfig) + require.NoError(t, err, "Failed to create S3 client") + + // Create two buckets + srcBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-simple-src-") + require.NoError(t, err, "Failed to create source bucket") + defer cleanupTestBucket(ctx, client, srcBucket) + + destBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-simple-dest-") + require.NoError(t, err, "Failed to create destination bucket") + defer cleanupTestBucket(ctx, client, destBucket) + + testData := []byte("Simple test for unencrypted to encrypted copy") + objectKey := "test-object.txt" + + t.Logf("Source bucket 
(no encryption): %s", srcBucket) + t.Logf("Dest bucket (SSE-S3 default): %s", destBucket) + + // Upload to unencrypted source bucket + _, err = client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(srcBucket), + Key: aws.String(objectKey), + Body: bytes.NewReader(testData), + }) + require.NoError(t, err, "Failed to upload to source bucket") + + // Enable SSE-S3 on destination bucket + _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ + Bucket: aws.String(destBucket), + ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ + Rules: []types.ServerSideEncryptionRule{ + { + ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ + SSEAlgorithm: types.ServerSideEncryptionAes256, + }, + }, + }, + }, + }) + require.NoError(t, err, "Failed to set dest bucket encryption") + + // Copy to encrypted bucket (should use bucket default encryption) + _, err = client.CopyObject(ctx, &s3.CopyObjectInput{ + Bucket: aws.String(destBucket), + Key: aws.String(objectKey), + CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)), + }) + require.NoError(t, err, "Failed to copy to encrypted bucket") + + // Verify destination is encrypted + destHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(destBucket), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Failed to HEAD dest object") + assert.Equal(t, types.ServerSideEncryptionAes256, destHead.ServerSideEncryption, + "Object should be encrypted via bucket default") + + // Verify content + destGet, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(destBucket), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Failed to GET dest object") + destData, err := io.ReadAll(destGet.Body) + destGet.Body.Close() + require.NoError(t, err, "Failed to read dest object") + assertDataEqual(t, testData, destData, "Data mismatch") +} + +// TestGitHub7562DebugMetadata helps debug what metadata is present on 
objects at each step +func TestGitHub7562DebugMetadata(t *testing.T) { + ctx := context.Background() + client, err := createS3Client(ctx, defaultConfig) + require.NoError(t, err, "Failed to create S3 client") + + // Create three buckets + srcBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-debug-src-") + require.NoError(t, err, "Failed to create source bucket") + + tempBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-debug-temp-") + require.NoError(t, err, "Failed to create temp bucket") + + destBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-debug-dest-") + require.NoError(t, err, "Failed to create destination bucket") + + defer func() { + cleanupTestBucket(ctx, client, destBucket) + cleanupTestBucket(ctx, client, tempBucket) + }() + + testData := []byte("Debug metadata test for GitHub #7562") + objectKey := "debug-file.txt" + + // Enable SSE-S3 on source + _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ + Bucket: aws.String(srcBucket), + ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ + Rules: []types.ServerSideEncryptionRule{ + { + ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ + SSEAlgorithm: types.ServerSideEncryptionAes256, + }, + }, + }, + }, + }) + require.NoError(t, err, "Failed to set source bucket encryption") + + // Upload + _, err = client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(srcBucket), + Key: aws.String(objectKey), + Body: bytes.NewReader(testData), + }) + require.NoError(t, err, "Failed to upload") + + // Log source object headers + srcHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(srcBucket), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Failed to HEAD source") + t.Logf("=== SOURCE OBJECT (encrypted) ===") + t.Logf("ServerSideEncryption: %v", srcHead.ServerSideEncryption) + t.Logf("Metadata: %v", srcHead.Metadata) + 
t.Logf("ContentLength: %d", aws.ToInt64(srcHead.ContentLength)) + + // Copy to temp + _, err = client.CopyObject(ctx, &s3.CopyObjectInput{ + Bucket: aws.String(tempBucket), + Key: aws.String(objectKey), + CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)), + }) + require.NoError(t, err, "Failed to copy to temp") + + // Log temp object headers + tempHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(tempBucket), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Failed to HEAD temp") + t.Logf("=== TEMP OBJECT (should be unencrypted) ===") + t.Logf("ServerSideEncryption: %v (should be empty)", tempHead.ServerSideEncryption) + t.Logf("Metadata: %v", tempHead.Metadata) + t.Logf("ContentLength: %d", aws.ToInt64(tempHead.ContentLength)) + + // Verify temp is NOT encrypted + if tempHead.ServerSideEncryption != "" { + t.Logf("WARNING: Temp object unexpectedly has encryption: %v", tempHead.ServerSideEncryption) + } + + // Delete source bucket + cleanupTestBucket(ctx, client, srcBucket) + + // Enable SSE-S3 on dest + _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ + Bucket: aws.String(destBucket), + ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ + Rules: []types.ServerSideEncryptionRule{ + { + ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ + SSEAlgorithm: types.ServerSideEncryptionAes256, + }, + }, + }, + }, + }) + require.NoError(t, err, "Failed to set dest bucket encryption") + + // Copy to dest - THIS IS WHERE #7562 FAILS + t.Log("=== COPYING TO ENCRYPTED DEST ===") + _, err = client.CopyObject(ctx, &s3.CopyObjectInput{ + Bucket: aws.String(destBucket), + Key: aws.String(objectKey), + CopySource: aws.String(fmt.Sprintf("%s/%s", tempBucket, objectKey)), + }) + if err != nil { + t.Logf("!!! 
COPY FAILED (GitHub #7562): %v", err) + t.FailNow() + } + + // Log dest object headers + destHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(destBucket), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Failed to HEAD dest") + t.Logf("=== DEST OBJECT (should be encrypted) ===") + t.Logf("ServerSideEncryption: %v", destHead.ServerSideEncryption) + t.Logf("Metadata: %v", destHead.Metadata) + t.Logf("ContentLength: %d", aws.ToInt64(destHead.ContentLength)) + + // Verify dest IS encrypted + assert.Equal(t, types.ServerSideEncryptionAes256, destHead.ServerSideEncryption, + "Dest object should be encrypted") + + // Verify content is readable + destGet, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(destBucket), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Failed to GET dest") + destData, err := io.ReadAll(destGet.Body) + destGet.Body.Close() + require.NoError(t, err, "Failed to read dest") + assertDataEqual(t, testData, destData, "Data mismatch") + + t.Log("=== DEBUG TEST PASSED ===") +} + +// TestGitHub7562LargeFile tests the issue with larger files that might trigger multipart handling +func TestGitHub7562LargeFile(t *testing.T) { + ctx := context.Background() + client, err := createS3Client(ctx, defaultConfig) + require.NoError(t, err, "Failed to create S3 client") + + srcBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-large-src-") + require.NoError(t, err, "Failed to create source bucket") + + tempBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-large-temp-") + require.NoError(t, err, "Failed to create temp bucket") + + destBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-large-dest-") + require.NoError(t, err, "Failed to create destination bucket") + + defer func() { + cleanupTestBucket(ctx, client, destBucket) + cleanupTestBucket(ctx, client, tempBucket) + }() + + // Use larger file to potentially 
trigger different code paths + testData := generateTestData(5 * 1024 * 1024) // 5MB + objectKey := "large-file.bin" + + t.Logf("Testing with %d byte file", len(testData)) + + // Enable SSE-S3 on source + _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ + Bucket: aws.String(srcBucket), + ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ + Rules: []types.ServerSideEncryptionRule{ + { + ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ + SSEAlgorithm: types.ServerSideEncryptionAes256, + }, + }, + }, + }, + }) + require.NoError(t, err, "Failed to set source bucket encryption") + + // Upload + _, err = client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(srcBucket), + Key: aws.String(objectKey), + Body: bytes.NewReader(testData), + }) + require.NoError(t, err, "Failed to upload") + + // Copy to temp (decrypt) + _, err = client.CopyObject(ctx, &s3.CopyObjectInput{ + Bucket: aws.String(tempBucket), + Key: aws.String(objectKey), + CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)), + }) + require.NoError(t, err, "Failed to copy to temp") + + // Delete source + cleanupTestBucket(ctx, client, srcBucket) + + // Enable SSE-S3 on dest + _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ + Bucket: aws.String(destBucket), + ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ + Rules: []types.ServerSideEncryptionRule{ + { + ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ + SSEAlgorithm: types.ServerSideEncryptionAes256, + }, + }, + }, + }, + }) + require.NoError(t, err, "Failed to set dest bucket encryption") + + // Copy to dest (re-encrypt) - GitHub #7562 + _, err = client.CopyObject(ctx, &s3.CopyObjectInput{ + Bucket: aws.String(destBucket), + Key: aws.String(objectKey), + CopySource: aws.String(fmt.Sprintf("%s/%s", tempBucket, objectKey)), + }) + require.NoError(t, err, "GitHub #7562: Large file copy to 
encrypted bucket failed") + + // Verify + destHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(destBucket), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Failed to HEAD dest") + assert.Equal(t, types.ServerSideEncryptionAes256, destHead.ServerSideEncryption) + assert.Equal(t, int64(len(testData)), aws.ToInt64(destHead.ContentLength)) + + // Verify content + destGet, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(destBucket), + Key: aws.String(objectKey), + }) + require.NoError(t, err, "Failed to GET dest") + destData, err := io.ReadAll(destGet.Body) + destGet.Body.Close() + require.NoError(t, err, "Failed to read dest") + assertDataEqual(t, testData, destData, "Large file data mismatch") + + t.Log("Large file test passed!") +} + diff --git a/test/tus/Makefile b/test/tus/Makefile new file mode 100644 index 000000000..71b05e8ab --- /dev/null +++ b/test/tus/Makefile @@ -0,0 +1,226 @@ +# Makefile for TUS Protocol Integration Tests +# This Makefile provides targets for running TUS (resumable upload) integration tests + +# Default values +SEAWEEDFS_BINARY ?= weed +FILER_PORT ?= 18888 +VOLUME_PORT ?= 18080 +MASTER_PORT ?= 19333 +TEST_TIMEOUT ?= 10m +VOLUME_MAX_SIZE_MB ?= 50 +VOLUME_MAX_COUNT ?= 100 + +# Test directory +TEST_DIR := $(shell pwd) +SEAWEEDFS_ROOT := $(shell cd ../.. && pwd) + +# Colors for output +RED := \033[0;31m +GREEN := \033[0;32m +YELLOW := \033[1;33m +NC := \033[0m # No Color + +.PHONY: all test clean start-seaweedfs stop-seaweedfs check-binary build-weed help test-basic test-chunked test-resume test-errors test-with-server + +all: test + +# Build SeaweedFS binary +build-weed: + @echo "Building SeaweedFS binary..." 
+ @cd $(SEAWEEDFS_ROOT)/weed && go build -o weed + @echo "$(GREEN)SeaweedFS binary built successfully$(NC)" + +help: + @echo "SeaweedFS TUS Protocol Integration Tests" + @echo "" + @echo "Available targets:" + @echo " test - Run all TUS integration tests" + @echo " test-basic - Run basic TUS upload tests" + @echo " test-chunked - Run chunked upload tests" + @echo " test-resume - Run upload resume tests" + @echo " test-errors - Run error handling tests" + @echo " test-with-server - Run tests with automatic server management" + @echo " start-seaweedfs - Start SeaweedFS server for testing" + @echo " stop-seaweedfs - Stop SeaweedFS server" + @echo " clean - Clean up test artifacts" + @echo " check-binary - Check if SeaweedFS binary exists" + @echo " build-weed - Build SeaweedFS binary" + @echo "" + @echo "Configuration:" + @echo " SEAWEEDFS_BINARY=$(SEAWEEDFS_BINARY)" + @echo " FILER_PORT=$(FILER_PORT)" + @echo " VOLUME_PORT=$(VOLUME_PORT)" + @echo " MASTER_PORT=$(MASTER_PORT)" + @echo " TEST_TIMEOUT=$(TEST_TIMEOUT)" + +check-binary: + @if ! command -v $(SEAWEEDFS_BINARY) > /dev/null 2>&1 && [ ! -f "$(SEAWEEDFS_ROOT)/weed/weed" ]; then \ + echo "$(RED)Error: SeaweedFS binary not found$(NC)"; \ + echo "Please build SeaweedFS first: make build-weed"; \ + exit 1; \ + fi + @echo "$(GREEN)SeaweedFS binary found$(NC)" + +start-seaweedfs: check-binary + @echo "$(YELLOW)Starting SeaweedFS server for TUS testing...$(NC)" + @# Clean up any existing processes on our test ports + @lsof -ti :$(MASTER_PORT) | xargs kill -TERM 2>/dev/null || true + @lsof -ti :$(VOLUME_PORT) | xargs kill -TERM 2>/dev/null || true + @lsof -ti :$(FILER_PORT) | xargs kill -TERM 2>/dev/null || true + @sleep 2 + + # Create necessary directories + @mkdir -p /tmp/seaweedfs-test-tus-master + @mkdir -p /tmp/seaweedfs-test-tus-volume + @mkdir -p /tmp/seaweedfs-test-tus-filer + + # Start master server (use freshly built binary) + @echo "Starting master server..." 
+ @nohup $(SEAWEEDFS_ROOT)/weed/weed master \ + -port=$(MASTER_PORT) \ + -mdir=/tmp/seaweedfs-test-tus-master \ + -volumeSizeLimitMB=$(VOLUME_MAX_SIZE_MB) \ + -ip=127.0.0.1 \ + > /tmp/seaweedfs-tus-master.log 2>&1 & + @sleep 3 + + # Start volume server + @echo "Starting volume server..." + @nohup $(SEAWEEDFS_ROOT)/weed/weed volume \ + -port=$(VOLUME_PORT) \ + -mserver=127.0.0.1:$(MASTER_PORT) \ + -dir=/tmp/seaweedfs-test-tus-volume \ + -max=$(VOLUME_MAX_COUNT) \ + -ip=127.0.0.1 \ + > /tmp/seaweedfs-tus-volume.log 2>&1 & + @sleep 3 + + # Start filer server with TUS enabled (default tusBasePath is .tus) + @echo "Starting filer server..." + @nohup $(SEAWEEDFS_ROOT)/weed/weed filer \ + -port=$(FILER_PORT) \ + -master=127.0.0.1:$(MASTER_PORT) \ + -ip=127.0.0.1 \ + > /tmp/seaweedfs-tus-filer.log 2>&1 & + @sleep 5 + + # Wait for filer to be ready + @echo "$(YELLOW)Waiting for filer to be ready...$(NC)" + @for i in $$(seq 1 30); do \ + if curl -s -f http://127.0.0.1:$(FILER_PORT)/ > /dev/null 2>&1; then \ + echo "$(GREEN)Filer is ready$(NC)"; \ + break; \ + fi; \ + if [ $$i -eq 30 ]; then \ + echo "$(RED)Filer failed to start within 30 seconds$(NC)"; \ + $(MAKE) debug-logs; \ + exit 1; \ + fi; \ + echo "Waiting for filer... 
($$i/30)"; \ + sleep 1; \ + done + + @echo "$(GREEN)SeaweedFS server started successfully for TUS testing$(NC)" + @echo "Master: http://localhost:$(MASTER_PORT)" + @echo "Volume: http://localhost:$(VOLUME_PORT)" + @echo "Filer: http://localhost:$(FILER_PORT)" + @echo "TUS Endpoint: http://localhost:$(FILER_PORT)/.tus/" + +stop-seaweedfs: + @echo "$(YELLOW)Stopping SeaweedFS server...$(NC)" + @lsof -ti :$(MASTER_PORT) | xargs -r kill -TERM 2>/dev/null || true + @lsof -ti :$(VOLUME_PORT) | xargs -r kill -TERM 2>/dev/null || true + @lsof -ti :$(FILER_PORT) | xargs -r kill -TERM 2>/dev/null || true + @sleep 2 + @echo "$(GREEN)SeaweedFS server stopped$(NC)" + +clean: + @echo "$(YELLOW)Cleaning up TUS test artifacts...$(NC)" + @rm -rf /tmp/seaweedfs-test-tus-* + @rm -f /tmp/seaweedfs-tus-*.log + @echo "$(GREEN)TUS test cleanup completed$(NC)" + +# Run all tests +test: check-binary + @echo "$(YELLOW)Running all TUS integration tests...$(NC)" + @cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) ./test/tus/... + @echo "$(GREEN)All TUS tests completed$(NC)" + +# Run basic upload tests +test-basic: check-binary + @echo "$(YELLOW)Running basic TUS upload tests...$(NC)" + @cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) -run "TestTusBasicUpload|TestTusOptionsHandler" ./test/tus/... + @echo "$(GREEN)Basic TUS tests completed$(NC)" + +# Run chunked upload tests +test-chunked: check-binary + @echo "$(YELLOW)Running chunked TUS upload tests...$(NC)" + @cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) -run "TestTusChunkedUpload" ./test/tus/... + @echo "$(GREEN)Chunked TUS tests completed$(NC)" + +# Run resume tests +test-resume: check-binary + @echo "$(YELLOW)Running TUS upload resume tests...$(NC)" + @cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) -run "TestTusResumeAfterInterruption|TestTusHeadRequest" ./test/tus/... 
+ @echo "$(GREEN)TUS resume tests completed$(NC)" + +# Run error handling tests +test-errors: check-binary + @echo "$(YELLOW)Running TUS error handling tests...$(NC)" + @cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) -run "TestTusInvalidOffset|TestTusUploadNotFound|TestTusDeleteUpload" ./test/tus/... + @echo "$(GREEN)TUS error tests completed$(NC)" + +# Run tests with automatic server management +test-with-server: build-weed + @echo "$(YELLOW)Running TUS tests with automatic server management...$(NC)" + @$(MAKE) -C $(TEST_DIR) start-seaweedfs && \ + sleep 3 && \ + cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) ./test/tus/...; \ + TEST_RESULT=$$?; \ + $(MAKE) -C $(TEST_DIR) stop-seaweedfs; \ + $(MAKE) -C $(TEST_DIR) clean; \ + if [ $$TEST_RESULT -eq 0 ]; then echo "$(GREEN)All TUS tests passed!$(NC)"; fi; \ + exit $$TEST_RESULT + +# Debug targets +debug-logs: + @echo "$(YELLOW)=== Master Log ===$(NC)" + @tail -n 50 /tmp/seaweedfs-tus-master.log 2>/dev/null || echo "No master log found" + @echo "$(YELLOW)=== Volume Log ===$(NC)" + @tail -n 50 /tmp/seaweedfs-tus-volume.log 2>/dev/null || echo "No volume log found" + @echo "$(YELLOW)=== Filer Log ===$(NC)" + @tail -n 50 /tmp/seaweedfs-tus-filer.log 2>/dev/null || echo "No filer log found" + +debug-status: + @echo "$(YELLOW)=== Process Status ===$(NC)" + @ps aux | grep -E "(weed|seaweedfs)" | grep -v grep || echo "No SeaweedFS processes found" + @echo "$(YELLOW)=== Port Status ===$(NC)" + @lsof -i :$(MASTER_PORT) -i :$(VOLUME_PORT) -i :$(FILER_PORT) 2>/dev/null || echo "No ports in use" + +# Manual testing targets +manual-start: start-seaweedfs + @echo "$(GREEN)SeaweedFS is now running for manual TUS testing$(NC)" + @echo "" + @echo "TUS Endpoints:" + @echo " OPTIONS /.tus/ - Capability discovery" + @echo " POST /.tus/{path} - Create upload" + @echo " HEAD /.tus/.uploads/{id} - Get offset" + @echo " PATCH /.tus/.uploads/{id} - Upload data" + @echo " DELETE /.tus/.uploads/{id} - Cancel upload" + 
@echo "" + @echo "Example curl commands:" + @echo " curl -X OPTIONS http://localhost:$(FILER_PORT)/.tus/ -H 'Tus-Resumable: 1.0.0'" + @echo "" + @echo "Run 'make manual-stop' when finished" + +manual-stop: stop-seaweedfs clean + +# CI targets +ci-test: test-with-server + +# Skip integration tests (short mode) +test-short: + @echo "$(YELLOW)Running TUS tests in short mode (skipping integration tests)...$(NC)" + @cd $(SEAWEEDFS_ROOT) && go test -v -short ./test/tus/... + @echo "$(GREEN)Short tests completed$(NC)" + diff --git a/test/tus/README.md b/test/tus/README.md new file mode 100644 index 000000000..03c980a3d --- /dev/null +++ b/test/tus/README.md @@ -0,0 +1,241 @@ +# TUS Protocol Integration Tests + +This directory contains integration tests for the TUS (resumable upload) protocol support in SeaweedFS Filer. + +## Overview + +TUS is an open protocol for resumable file uploads over HTTP. It allows clients to upload files in chunks and resume uploads after network failures or interruptions. + +### Why TUS? 
+
+- **Resumable uploads**: Resume interrupted uploads without re-sending data
+- **Chunked uploads**: Upload large files in smaller pieces
+- **Simple protocol**: Standard HTTP methods with custom headers
+- **Wide client support**: Libraries available for JavaScript, Python, Go, and more
+
+## TUS Protocol Endpoints
+
+| Method | Path | Description |
+|--------|------|-------------|
+| `OPTIONS` | `/.tus/` | Server capability discovery |
+| `POST` | `/.tus/{path}` | Create new upload session |
+| `HEAD` | `/.tus/.uploads/{id}` | Get current upload offset |
+| `PATCH` | `/.tus/.uploads/{id}` | Upload data at offset |
+| `DELETE` | `/.tus/.uploads/{id}` | Cancel upload |
+
+### TUS Headers
+
+**Request Headers:**
+- `Tus-Resumable: 1.0.0` - Protocol version (required)
+- `Upload-Length` - Total file size in bytes (required on POST)
+- `Upload-Offset` - Current byte offset (required on PATCH)
+- `Upload-Metadata` - Base64-encoded key-value pairs (optional)
+- `Content-Type: application/offset+octet-stream` (required on PATCH)
+
+**Response Headers:**
+- `Tus-Resumable` - Protocol version
+- `Tus-Version` - Supported versions
+- `Tus-Extension` - Supported extensions
+- `Tus-Max-Size` - Maximum upload size
+- `Upload-Offset` - Current byte offset
+- `Location` - Upload URL (on POST)
+
+## Enabling TUS
+
+TUS protocol support is enabled by default at the `/.tus` path. You can customize the path using the `-tusBasePath` flag:
+
+```bash
+# Start filer with default TUS path (/.tus)
+weed filer -master=localhost:9333
+
+# Use a custom path
+weed filer -master=localhost:9333 -tusBasePath=uploads/tus
+
+# Disable TUS by setting an empty path
+weed filer -master=localhost:9333 -tusBasePath=
+```
+
+## Test Structure
+
+### Integration Tests
+
+The tests cover:
+
+1. **Basic Functionality**
+   - `TestTusOptionsHandler` - Capability discovery
+   - `TestTusBasicUpload` - Simple complete upload
+   - `TestTusCreationWithUpload` - Creation-with-upload extension
+
+2. 
**Chunked Uploads** + - `TestTusChunkedUpload` - Upload in multiple chunks + +3. **Resumable Uploads** + - `TestTusHeadRequest` - Offset tracking + - `TestTusResumeAfterInterruption` - Resume after failure + +4. **Error Handling** + - `TestTusInvalidOffset` - Offset mismatch (409 Conflict) + - `TestTusUploadNotFound` - Missing upload (404 Not Found) + - `TestTusDeleteUpload` - Upload cancellation + +## Running Tests + +### Prerequisites + +1. **Build SeaweedFS**: +```bash +make build-weed +# or +cd ../../weed && go build -o weed +``` + +### Using Makefile + +```bash +# Show available targets +make help + +# Run all tests with automatic server management +make test-with-server + +# Run all tests (requires running server) +make test + +# Run specific test categories +make test-basic # Basic upload tests +make test-chunked # Chunked upload tests +make test-resume # Resume/HEAD tests +make test-errors # Error handling tests + +# Manual testing +make manual-start # Start SeaweedFS for manual testing +make manual-stop # Stop and cleanup +``` + +### Using Go Test Directly + +```bash +# Run all TUS tests +go test -v ./test/tus/... + +# Run specific test +go test -v ./test/tus -run TestTusBasicUpload + +# Skip integration tests (short mode) +go test -v -short ./test/tus/... +``` + +### Debug + +```bash +# View server logs +make debug-logs + +# Check process and port status +make debug-status +``` + +## Test Environment + +Each test run: +1. Starts a SeaweedFS cluster (master, volume, filer) +2. Creates uploads using TUS protocol +3. Verifies files are stored correctly +4. 
Cleans up test data + +### Default Ports + +| Service | Port | +|---------|------| +| Master | 19333 | +| Volume | 18080 | +| Filer | 18888 | + +### Configuration + +Override defaults via environment or Makefile variables: +```bash +FILER_PORT=8889 MASTER_PORT=9334 make test +``` + +## Example Usage + +### Create Upload + +```bash +curl -X POST http://localhost:18888/.tus/mydir/file.txt \ + -H "Tus-Resumable: 1.0.0" \ + -H "Upload-Length: 1000" \ + -H "Upload-Metadata: filename dGVzdC50eHQ=" +``` + +### Upload Data + +```bash +curl -X PATCH http://localhost:18888/.tus/.uploads/{upload-id} \ + -H "Tus-Resumable: 1.0.0" \ + -H "Upload-Offset: 0" \ + -H "Content-Type: application/offset+octet-stream" \ + --data-binary @file.txt +``` + +### Check Offset + +```bash +curl -I http://localhost:18888/.tus/.uploads/{upload-id} \ + -H "Tus-Resumable: 1.0.0" +``` + +### Cancel Upload + +```bash +curl -X DELETE http://localhost:18888/.tus/.uploads/{upload-id} \ + -H "Tus-Resumable: 1.0.0" +``` + +## TUS Extensions Supported + +- **creation**: Create new uploads with POST +- **creation-with-upload**: Send data in creation request +- **termination**: Cancel uploads with DELETE + +## Architecture + +```text +Client Filer Volume Servers + | | | + |-- POST /.tus/path/file.mp4 ->| | + | |-- Create session dir ------->| + |<-- 201 Location: /.../{id} --| | + | | | + |-- PATCH /.tus/.uploads/{id} >| | + | Upload-Offset: 0 |-- Assign volume ------------>| + | [chunk data] |-- Upload chunk ------------->| + |<-- 204 Upload-Offset: N -----| | + | | | + | (network failure) | | + | | | + |-- HEAD /.tus/.uploads/{id} ->| | + |<-- Upload-Offset: N ---------| | + | | | + |-- PATCH (resume) ----------->|-- Upload remaining -------->| + |<-- 204 (complete) -----------|-- Assemble final file ----->| +``` + +## Comparison with S3 Multipart + +| Feature | TUS | S3 Multipart | +|---------|-----|--------------| +| Protocol | Custom HTTP headers | S3 API | +| Session Init | POST with Upload-Length | 
CreateMultipartUpload | +| Upload Data | PATCH with offset | UploadPart with partNumber | +| Resume | HEAD to get offset | ListParts | +| Complete | Automatic at final offset | CompleteMultipartUpload | +| Ordering | Sequential (offset-based) | Parallel (part numbers) | + +## Related Resources + +- [TUS Protocol Specification](https://tus.io/protocols/resumable-upload) +- [tus-js-client](https://github.com/tus/tus-js-client) - JavaScript client +- [go-tus](https://github.com/eventials/go-tus) - Go client +- [SeaweedFS S3 API](../../weed/s3api) - Alternative multipart upload diff --git a/test/tus/tus_integration_test.go b/test/tus/tus_integration_test.go new file mode 100644 index 000000000..a03c21dab --- /dev/null +++ b/test/tus/tus_integration_test.go @@ -0,0 +1,772 @@ +package tus + +import ( + "bytes" + "context" + "encoding/base64" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + TusVersion = "1.0.0" + testFilerPort = "18888" + testMasterPort = "19333" + testVolumePort = "18080" +) + +// TestCluster represents a running SeaweedFS cluster for testing +type TestCluster struct { + masterCmd *exec.Cmd + volumeCmd *exec.Cmd + filerCmd *exec.Cmd + dataDir string +} + +func (c *TestCluster) Stop() { + if c.filerCmd != nil && c.filerCmd.Process != nil { + c.filerCmd.Process.Signal(os.Interrupt) + c.filerCmd.Wait() + } + if c.volumeCmd != nil && c.volumeCmd.Process != nil { + c.volumeCmd.Process.Signal(os.Interrupt) + c.volumeCmd.Wait() + } + if c.masterCmd != nil && c.masterCmd.Process != nil { + c.masterCmd.Process.Signal(os.Interrupt) + c.masterCmd.Wait() + } +} + +func (c *TestCluster) FilerURL() string { + return fmt.Sprintf("http://127.0.0.1:%s", testFilerPort) +} + +func (c *TestCluster) TusURL() string { + return fmt.Sprintf("%s/.tus", c.FilerURL()) +} + +// FullURL converts a relative path to a full URL 
+func (c *TestCluster) FullURL(path string) string { + if strings.HasPrefix(path, "http://") || strings.HasPrefix(path, "https://") { + return path + } + return fmt.Sprintf("http://127.0.0.1:%s%s", testFilerPort, path) +} + +// startTestCluster starts a SeaweedFS cluster for testing +func startTestCluster(t *testing.T, ctx context.Context) (*TestCluster, error) { + weedBinary := findWeedBinary() + if weedBinary == "" { + return nil, fmt.Errorf("weed binary not found - please build it first: cd weed && go build") + } + + dataDir, err := os.MkdirTemp("", "seaweedfs_tus_test_") + if err != nil { + return nil, err + } + + cluster := &TestCluster{dataDir: dataDir} + + // Create subdirectories + masterDir := filepath.Join(dataDir, "master") + volumeDir := filepath.Join(dataDir, "volume") + filerDir := filepath.Join(dataDir, "filer") + os.MkdirAll(masterDir, 0755) + os.MkdirAll(volumeDir, 0755) + os.MkdirAll(filerDir, 0755) + + // Start master + masterCmd := exec.CommandContext(ctx, weedBinary, "master", + "-port", testMasterPort, + "-mdir", masterDir, + "-ip", "127.0.0.1", + ) + masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) + if err != nil { + os.RemoveAll(dataDir) + return nil, fmt.Errorf("failed to create master log: %v", err) + } + masterCmd.Stdout = masterLogFile + masterCmd.Stderr = masterLogFile + if err := masterCmd.Start(); err != nil { + os.RemoveAll(dataDir) + return nil, fmt.Errorf("failed to start master: %v", err) + } + cluster.masterCmd = masterCmd + + // Wait for master to be ready + if err := waitForHTTPServer("http://127.0.0.1:"+testMasterPort+"/dir/status", 30*time.Second); err != nil { + cluster.Stop() + os.RemoveAll(dataDir) + return nil, fmt.Errorf("master not ready: %v", err) + } + + // Start volume server + volumeCmd := exec.CommandContext(ctx, weedBinary, "volume", + "-port", testVolumePort, + "-dir", volumeDir, + "-mserver", "127.0.0.1:"+testMasterPort, + "-ip", "127.0.0.1", + ) + volumeLogFile, err := 
os.Create(filepath.Join(volumeDir, "volume.log")) + if err != nil { + cluster.Stop() + os.RemoveAll(dataDir) + return nil, fmt.Errorf("failed to create volume log: %v", err) + } + volumeCmd.Stdout = volumeLogFile + volumeCmd.Stderr = volumeLogFile + if err := volumeCmd.Start(); err != nil { + cluster.Stop() + os.RemoveAll(dataDir) + return nil, fmt.Errorf("failed to start volume server: %v", err) + } + cluster.volumeCmd = volumeCmd + + // Wait for volume server to register with master + if err := waitForHTTPServer("http://127.0.0.1:"+testVolumePort+"/status", 30*time.Second); err != nil { + cluster.Stop() + os.RemoveAll(dataDir) + return nil, fmt.Errorf("volume server not ready: %v", err) + } + + // Start filer with TUS enabled + filerCmd := exec.CommandContext(ctx, weedBinary, "filer", + "-port", testFilerPort, + "-master", "127.0.0.1:"+testMasterPort, + "-ip", "127.0.0.1", + "-defaultStoreDir", filerDir, + ) + filerLogFile, err := os.Create(filepath.Join(filerDir, "filer.log")) + if err != nil { + cluster.Stop() + os.RemoveAll(dataDir) + return nil, fmt.Errorf("failed to create filer log: %v", err) + } + filerCmd.Stdout = filerLogFile + filerCmd.Stderr = filerLogFile + if err := filerCmd.Start(); err != nil { + cluster.Stop() + os.RemoveAll(dataDir) + return nil, fmt.Errorf("failed to start filer: %v", err) + } + cluster.filerCmd = filerCmd + + // Wait for filer + if err := waitForHTTPServer("http://127.0.0.1:"+testFilerPort+"/", 30*time.Second); err != nil { + cluster.Stop() + os.RemoveAll(dataDir) + return nil, fmt.Errorf("filer not ready: %v", err) + } + + // Wait a bit more for the cluster to fully stabilize + // Volumes are created lazily, and we need to ensure the master topology is ready + time.Sleep(5 * time.Second) + + return cluster, nil +} + +func findWeedBinary() string { + candidates := []string{ + "../../weed/weed", + "../weed/weed", + "./weed/weed", + "weed", + } + for _, candidate := range candidates { + if _, err := os.Stat(candidate); err == nil 
{ + return candidate + } + } + if path, err := exec.LookPath("weed"); err == nil { + return path + } + return "" +} + +func waitForHTTPServer(url string, timeout time.Duration) error { + start := time.Now() + client := &http.Client{Timeout: 1 * time.Second} + for time.Since(start) < timeout { + resp, err := client.Get(url) + if err == nil { + resp.Body.Close() + return nil + } + time.Sleep(500 * time.Millisecond) + } + return fmt.Errorf("timeout waiting for %s", url) +} + +// encodeTusMetadata encodes key-value pairs for Upload-Metadata header +func encodeTusMetadata(metadata map[string]string) string { + var parts []string + for k, v := range metadata { + encoded := base64.StdEncoding.EncodeToString([]byte(v)) + parts = append(parts, fmt.Sprintf("%s %s", k, encoded)) + } + return strings.Join(parts, ",") +} + +// TestTusOptionsHandler tests the OPTIONS endpoint for capability discovery +func TestTusOptionsHandler(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + cluster, err := startTestCluster(t, ctx) + require.NoError(t, err) + defer func() { + cluster.Stop() + os.RemoveAll(cluster.dataDir) + }() + + // Test OPTIONS request + req, err := http.NewRequest(http.MethodOptions, cluster.TusURL()+"/", nil) + require.NoError(t, err) + req.Header.Set("Tus-Resumable", TusVersion) + + client := &http.Client{} + resp, err := client.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + // Verify TUS headers + assert.Equal(t, http.StatusOK, resp.StatusCode, "OPTIONS should return 200 OK") + assert.Equal(t, TusVersion, resp.Header.Get("Tus-Resumable"), "Should return Tus-Resumable header") + assert.NotEmpty(t, resp.Header.Get("Tus-Version"), "Should return Tus-Version header") + assert.NotEmpty(t, resp.Header.Get("Tus-Extension"), "Should return Tus-Extension header") + assert.NotEmpty(t, resp.Header.Get("Tus-Max-Size"), 
"Should return Tus-Max-Size header") +} + +// TestTusBasicUpload tests a simple complete upload +func TestTusBasicUpload(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + cluster, err := startTestCluster(t, ctx) + require.NoError(t, err) + defer func() { + cluster.Stop() + os.RemoveAll(cluster.dataDir) + }() + + testData := []byte("Hello, TUS Protocol! This is a test file.") + targetPath := "/testdir/testfile.txt" + + // Step 1: Create upload (POST) + createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil) + require.NoError(t, err) + createReq.Header.Set("Tus-Resumable", TusVersion) + createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData))) + createReq.Header.Set("Upload-Metadata", encodeTusMetadata(map[string]string{ + "filename": "testfile.txt", + "content-type": "text/plain", + })) + + client := &http.Client{} + createResp, err := client.Do(createReq) + require.NoError(t, err) + defer createResp.Body.Close() + + assert.Equal(t, http.StatusCreated, createResp.StatusCode, "POST should return 201 Created") + uploadLocation := createResp.Header.Get("Location") + assert.NotEmpty(t, uploadLocation, "Should return Location header with upload URL") + t.Logf("Upload location: %s", uploadLocation) + + // Step 2: Upload data (PATCH) + patchReq, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData)) + require.NoError(t, err) + patchReq.Header.Set("Tus-Resumable", TusVersion) + patchReq.Header.Set("Upload-Offset", "0") + patchReq.Header.Set("Content-Type", "application/offset+octet-stream") + patchReq.Header.Set("Content-Length", strconv.Itoa(len(testData))) + + patchResp, err := client.Do(patchReq) + require.NoError(t, err) + defer patchResp.Body.Close() + + assert.Equal(t, http.StatusNoContent, patchResp.StatusCode, "PATCH should return 204 No 
Content") + newOffset := patchResp.Header.Get("Upload-Offset") + assert.Equal(t, strconv.Itoa(len(testData)), newOffset, "Upload-Offset should equal total file size") + + // Step 3: Verify the file was created + getResp, err := client.Get(cluster.FilerURL() + targetPath) + require.NoError(t, err) + defer getResp.Body.Close() + + assert.Equal(t, http.StatusOK, getResp.StatusCode, "GET should return 200 OK") + body, err := io.ReadAll(getResp.Body) + require.NoError(t, err) + assert.Equal(t, testData, body, "File content should match uploaded data") +} + +// TestTusChunkedUpload tests uploading a file in multiple chunks +func TestTusChunkedUpload(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + cluster, err := startTestCluster(t, ctx) + require.NoError(t, err) + defer func() { + cluster.Stop() + os.RemoveAll(cluster.dataDir) + }() + + // Create test data (100KB) + testData := make([]byte, 100*1024) + for i := range testData { + testData[i] = byte(i % 256) + } + chunkSize := 32 * 1024 // 32KB chunks + targetPath := "/chunked/largefile.bin" + + client := &http.Client{} + + // Step 1: Create upload + createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil) + require.NoError(t, err) + createReq.Header.Set("Tus-Resumable", TusVersion) + createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData))) + + createResp, err := client.Do(createReq) + require.NoError(t, err) + defer createResp.Body.Close() + + require.Equal(t, http.StatusCreated, createResp.StatusCode) + uploadLocation := createResp.Header.Get("Location") + require.NotEmpty(t, uploadLocation) + t.Logf("Upload location: %s", uploadLocation) + + // Step 2: Upload in chunks + offset := 0 + for offset < len(testData) { + end := offset + chunkSize + if end > len(testData) { + end = len(testData) + } + chunk := testData[offset:end] + + patchReq, 
err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(chunk)) + require.NoError(t, err) + patchReq.Header.Set("Tus-Resumable", TusVersion) + patchReq.Header.Set("Upload-Offset", strconv.Itoa(offset)) + patchReq.Header.Set("Content-Type", "application/offset+octet-stream") + patchReq.Header.Set("Content-Length", strconv.Itoa(len(chunk))) + + patchResp, err := client.Do(patchReq) + require.NoError(t, err) + patchResp.Body.Close() + + require.Equal(t, http.StatusNoContent, patchResp.StatusCode, + "PATCH chunk at offset %d should return 204", offset) + newOffset, _ := strconv.Atoi(patchResp.Header.Get("Upload-Offset")) + require.Equal(t, end, newOffset, "New offset should be %d", end) + + t.Logf("Uploaded chunk: offset=%d, size=%d, newOffset=%d", offset, len(chunk), newOffset) + offset = end + } + + // Step 3: Verify the complete file + getResp, err := client.Get(cluster.FilerURL() + targetPath) + require.NoError(t, err) + defer getResp.Body.Close() + + assert.Equal(t, http.StatusOK, getResp.StatusCode) + body, err := io.ReadAll(getResp.Body) + require.NoError(t, err) + assert.Equal(t, testData, body, "File content should match uploaded data") +} + +// TestTusHeadRequest tests the HEAD endpoint to get upload offset +func TestTusHeadRequest(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + cluster, err := startTestCluster(t, ctx) + require.NoError(t, err) + defer func() { + cluster.Stop() + os.RemoveAll(cluster.dataDir) + }() + + testData := []byte("Test data for HEAD request verification") + targetPath := "/headtest/file.txt" + client := &http.Client{} + + // Create upload + createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil) + require.NoError(t, err) + createReq.Header.Set("Tus-Resumable", TusVersion) + createReq.Header.Set("Upload-Length", 
strconv.Itoa(len(testData))) + + createResp, err := client.Do(createReq) + require.NoError(t, err) + defer createResp.Body.Close() + require.Equal(t, http.StatusCreated, createResp.StatusCode) + uploadLocation := createResp.Header.Get("Location") + + // HEAD before any data uploaded - offset should be 0 + headReq1, err := http.NewRequest(http.MethodHead, cluster.FullURL(uploadLocation), nil) + require.NoError(t, err) + headReq1.Header.Set("Tus-Resumable", TusVersion) + + headResp1, err := client.Do(headReq1) + require.NoError(t, err) + defer headResp1.Body.Close() + + assert.Equal(t, http.StatusOK, headResp1.StatusCode) + assert.Equal(t, "0", headResp1.Header.Get("Upload-Offset"), "Initial offset should be 0") + assert.Equal(t, strconv.Itoa(len(testData)), headResp1.Header.Get("Upload-Length")) + + // Upload half the data + halfLen := len(testData) / 2 + patchReq, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData[:halfLen])) + require.NoError(t, err) + patchReq.Header.Set("Tus-Resumable", TusVersion) + patchReq.Header.Set("Upload-Offset", "0") + patchReq.Header.Set("Content-Type", "application/offset+octet-stream") + + patchResp, err := client.Do(patchReq) + require.NoError(t, err) + patchResp.Body.Close() + require.Equal(t, http.StatusNoContent, patchResp.StatusCode) + + // HEAD after partial upload - offset should be halfLen + headReq2, err := http.NewRequest(http.MethodHead, cluster.FullURL(uploadLocation), nil) + require.NoError(t, err) + headReq2.Header.Set("Tus-Resumable", TusVersion) + + headResp2, err := client.Do(headReq2) + require.NoError(t, err) + defer headResp2.Body.Close() + + assert.Equal(t, http.StatusOK, headResp2.StatusCode) + assert.Equal(t, strconv.Itoa(halfLen), headResp2.Header.Get("Upload-Offset"), + "Offset should be %d after partial upload", halfLen) +} + +// TestTusDeleteUpload tests canceling an in-progress upload +func TestTusDeleteUpload(t *testing.T) { + if testing.Short() { + 
t.Skip("Skipping integration test in short mode") + } + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + cluster, err := startTestCluster(t, ctx) + require.NoError(t, err) + defer func() { + cluster.Stop() + os.RemoveAll(cluster.dataDir) + }() + + testData := []byte("Data to be deleted") + targetPath := "/deletetest/file.txt" + client := &http.Client{} + + // Create upload + createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil) + require.NoError(t, err) + createReq.Header.Set("Tus-Resumable", TusVersion) + createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData))) + + createResp, err := client.Do(createReq) + require.NoError(t, err) + defer createResp.Body.Close() + require.Equal(t, http.StatusCreated, createResp.StatusCode) + uploadLocation := createResp.Header.Get("Location") + + // Upload some data + patchReq, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData[:10])) + require.NoError(t, err) + patchReq.Header.Set("Tus-Resumable", TusVersion) + patchReq.Header.Set("Upload-Offset", "0") + patchReq.Header.Set("Content-Type", "application/offset+octet-stream") + + patchResp, err := client.Do(patchReq) + require.NoError(t, err) + patchResp.Body.Close() + + // Delete the upload + deleteReq, err := http.NewRequest(http.MethodDelete, cluster.FullURL(uploadLocation), nil) + require.NoError(t, err) + deleteReq.Header.Set("Tus-Resumable", TusVersion) + + deleteResp, err := client.Do(deleteReq) + require.NoError(t, err) + defer deleteResp.Body.Close() + + assert.Equal(t, http.StatusNoContent, deleteResp.StatusCode, "DELETE should return 204") + + // Verify upload is gone - HEAD should return 404 + headReq, err := http.NewRequest(http.MethodHead, cluster.FullURL(uploadLocation), nil) + require.NoError(t, err) + headReq.Header.Set("Tus-Resumable", TusVersion) + + headResp, err := client.Do(headReq) + require.NoError(t, err) + defer 
headResp.Body.Close() + + assert.Equal(t, http.StatusNotFound, headResp.StatusCode, "HEAD after DELETE should return 404") +} + +// TestTusInvalidOffset tests error handling for mismatched offsets +func TestTusInvalidOffset(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + cluster, err := startTestCluster(t, ctx) + require.NoError(t, err) + defer func() { + cluster.Stop() + os.RemoveAll(cluster.dataDir) + }() + + testData := []byte("Test data for offset validation") + targetPath := "/offsettest/file.txt" + client := &http.Client{} + + // Create upload + createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil) + require.NoError(t, err) + createReq.Header.Set("Tus-Resumable", TusVersion) + createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData))) + + createResp, err := client.Do(createReq) + require.NoError(t, err) + defer createResp.Body.Close() + require.Equal(t, http.StatusCreated, createResp.StatusCode) + uploadLocation := createResp.Header.Get("Location") + + // Try to upload with wrong offset (should be 0, but we send 100) + patchReq, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData)) + require.NoError(t, err) + patchReq.Header.Set("Tus-Resumable", TusVersion) + patchReq.Header.Set("Upload-Offset", "100") // Wrong offset! 
+ patchReq.Header.Set("Content-Type", "application/offset+octet-stream") + + patchResp, err := client.Do(patchReq) + require.NoError(t, err) + defer patchResp.Body.Close() + + assert.Equal(t, http.StatusConflict, patchResp.StatusCode, + "PATCH with wrong offset should return 409 Conflict") +} + +// TestTusUploadNotFound tests accessing a non-existent upload +func TestTusUploadNotFound(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + cluster, err := startTestCluster(t, ctx) + require.NoError(t, err) + defer func() { + cluster.Stop() + os.RemoveAll(cluster.dataDir) + }() + + client := &http.Client{} + fakeUploadURL := cluster.TusURL() + "/.uploads/nonexistent-upload-id" + + // HEAD on non-existent upload + headReq, err := http.NewRequest(http.MethodHead, fakeUploadURL, nil) + require.NoError(t, err) + headReq.Header.Set("Tus-Resumable", TusVersion) + + headResp, err := client.Do(headReq) + require.NoError(t, err) + defer headResp.Body.Close() + + assert.Equal(t, http.StatusNotFound, headResp.StatusCode, + "HEAD on non-existent upload should return 404") + + // PATCH on non-existent upload + patchReq, err := http.NewRequest(http.MethodPatch, fakeUploadURL, bytes.NewReader([]byte("data"))) + require.NoError(t, err) + patchReq.Header.Set("Tus-Resumable", TusVersion) + patchReq.Header.Set("Upload-Offset", "0") + patchReq.Header.Set("Content-Type", "application/offset+octet-stream") + + patchResp, err := client.Do(patchReq) + require.NoError(t, err) + defer patchResp.Body.Close() + + assert.Equal(t, http.StatusNotFound, patchResp.StatusCode, + "PATCH on non-existent upload should return 404") +} + +// TestTusCreationWithUpload tests the creation-with-upload extension +func TestTusCreationWithUpload(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + ctx, cancel := 
context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + cluster, err := startTestCluster(t, ctx) + require.NoError(t, err) + defer func() { + cluster.Stop() + os.RemoveAll(cluster.dataDir) + }() + + testData := []byte("Small file uploaded in creation request") + targetPath := "/creationwithupload/smallfile.txt" + client := &http.Client{} + + // Create upload with data in the same request + createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, bytes.NewReader(testData)) + require.NoError(t, err) + createReq.Header.Set("Tus-Resumable", TusVersion) + createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData))) + createReq.Header.Set("Content-Type", "application/offset+octet-stream") + + createResp, err := client.Do(createReq) + require.NoError(t, err) + defer createResp.Body.Close() + + assert.Equal(t, http.StatusCreated, createResp.StatusCode) + uploadLocation := createResp.Header.Get("Location") + assert.NotEmpty(t, uploadLocation) + + // Check Upload-Offset header - should indicate all data was received + uploadOffset := createResp.Header.Get("Upload-Offset") + assert.Equal(t, strconv.Itoa(len(testData)), uploadOffset, + "Upload-Offset should equal file size for complete upload") + + // Verify the file + getResp, err := client.Get(cluster.FilerURL() + targetPath) + require.NoError(t, err) + defer getResp.Body.Close() + + assert.Equal(t, http.StatusOK, getResp.StatusCode) + body, err := io.ReadAll(getResp.Body) + require.NoError(t, err) + assert.Equal(t, testData, body) +} + +// TestTusResumeAfterInterruption simulates resuming an upload after failure +func TestTusResumeAfterInterruption(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + cluster, err := startTestCluster(t, ctx) + require.NoError(t, err) + defer func() { + cluster.Stop() + os.RemoveAll(cluster.dataDir) + 
}() + + // 50KB test data + testData := make([]byte, 50*1024) + for i := range testData { + testData[i] = byte(i % 256) + } + targetPath := "/resume/interrupted.bin" + client := &http.Client{} + + // Create upload + createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil) + require.NoError(t, err) + createReq.Header.Set("Tus-Resumable", TusVersion) + createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData))) + + createResp, err := client.Do(createReq) + require.NoError(t, err) + defer createResp.Body.Close() + require.Equal(t, http.StatusCreated, createResp.StatusCode) + uploadLocation := createResp.Header.Get("Location") + + // Upload first 20KB + firstChunkSize := 20 * 1024 + patchReq1, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData[:firstChunkSize])) + require.NoError(t, err) + patchReq1.Header.Set("Tus-Resumable", TusVersion) + patchReq1.Header.Set("Upload-Offset", "0") + patchReq1.Header.Set("Content-Type", "application/offset+octet-stream") + + patchResp1, err := client.Do(patchReq1) + require.NoError(t, err) + patchResp1.Body.Close() + require.Equal(t, http.StatusNoContent, patchResp1.StatusCode) + + t.Log("Simulating network interruption...") + + // Simulate resumption: Query current offset with HEAD + headReq, err := http.NewRequest(http.MethodHead, cluster.FullURL(uploadLocation), nil) + require.NoError(t, err) + headReq.Header.Set("Tus-Resumable", TusVersion) + + headResp, err := client.Do(headReq) + require.NoError(t, err) + defer headResp.Body.Close() + + require.Equal(t, http.StatusOK, headResp.StatusCode) + currentOffset, _ := strconv.Atoi(headResp.Header.Get("Upload-Offset")) + t.Logf("Resumed upload at offset: %d", currentOffset) + require.Equal(t, firstChunkSize, currentOffset) + + // Resume upload from current offset + patchReq2, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData[currentOffset:])) + 
require.NoError(t, err) + patchReq2.Header.Set("Tus-Resumable", TusVersion) + patchReq2.Header.Set("Upload-Offset", strconv.Itoa(currentOffset)) + patchReq2.Header.Set("Content-Type", "application/offset+octet-stream") + + patchResp2, err := client.Do(patchReq2) + require.NoError(t, err) + patchResp2.Body.Close() + require.Equal(t, http.StatusNoContent, patchResp2.StatusCode) + + // Verify complete file + getResp, err := client.Get(cluster.FilerURL() + targetPath) + require.NoError(t, err) + defer getResp.Body.Close() + + assert.Equal(t, http.StatusOK, getResp.StatusCode) + body, err := io.ReadAll(getResp.Body) + require.NoError(t, err) + assert.Equal(t, testData, body, "Resumed upload should produce complete file") +} diff --git a/weed/command/filer.go b/weed/command/filer.go index 86991a181..314bb83b6 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -74,6 +74,7 @@ type FilerOptions struct { diskType *string allowedOrigins *string exposeDirectoryData *bool + tusPath *string certProvider certprovider.Provider } @@ -109,6 +110,7 @@ func init() { f.diskType = cmdFiler.Flag.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") f.allowedOrigins = cmdFiler.Flag.String("allowedOrigins", "*", "comma separated list of allowed origins") f.exposeDirectoryData = cmdFiler.Flag.Bool("exposeDirectoryData", true, "whether to return directory metadata and content in Filer UI") + f.tusPath = cmdFiler.Flag.String("tusBasePath", ".tus", "TUS resumable upload endpoint base path") // start s3 on filer filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") @@ -334,6 +336,7 @@ func (fo *FilerOptions) startFiler() { DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024, DiskType: *fo.diskType, AllowedOrigins: strings.Split(*fo.allowedOrigins, ","), + TusPath: *fo.tusPath, }) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) diff --git a/weed/command/server.go b/weed/command/server.go index 
d729502f0..ebd9f359a 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -129,6 +129,7 @@ func init() { filerOptions.downloadMaxMBps = cmdServer.Flag.Int("filer.downloadMaxMBps", 0, "download max speed for each download request, in MB per second") filerOptions.diskType = cmdServer.Flag.String("filer.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") filerOptions.exposeDirectoryData = cmdServer.Flag.Bool("filer.exposeDirectoryData", true, "expose directory data via filer. If false, filer UI will be innaccessible.") + filerOptions.tusPath = cmdServer.Flag.String("filer.tusBasePath", ".tus", "TUS resumable upload endpoint base path") serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port") diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index 09d009372..66d4ded80 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -167,6 +167,14 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request } // Copy extended attributes from source, filtering out conflicting encryption metadata + // Pre-compute encryption state once for efficiency + srcHasSSEC := IsSSECEncrypted(entry.Extended) + srcHasSSEKMS := IsSSEKMSEncrypted(entry.Extended) + srcHasSSES3 := IsSSES3EncryptedInternal(entry.Extended) + dstWantsSSEC := IsSSECRequest(r) + dstWantsSSEKMS := IsSSEKMSRequest(r) + dstWantsSSES3 := IsSSES3RequestInternal(r) + for k, v := range entry.Extended { // Skip encryption-specific headers that might conflict with destination encryption type skipHeader := false @@ -177,17 +185,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request skipHeader = true } - // If we're doing cross-encryption, skip conflicting headers - if !skipHeader && len(entry.GetChunks()) > 0 { 
- // Detect source and destination encryption types - srcHasSSEC := IsSSECEncrypted(entry.Extended) - srcHasSSEKMS := IsSSEKMSEncrypted(entry.Extended) - srcHasSSES3 := IsSSES3EncryptedInternal(entry.Extended) - dstWantsSSEC := IsSSECRequest(r) - dstWantsSSEKMS := IsSSEKMSRequest(r) - dstWantsSSES3 := IsSSES3RequestInternal(r) - - // Use helper function to determine if header should be skipped + // Filter conflicting headers for cross-encryption or encrypted→unencrypted copies + // This applies to both inline files (no chunks) and chunked files - fixes GitHub #7562 + if !skipHeader { skipHeader = shouldSkipEncryptionHeader(k, srcHasSSEC, srcHasSSEKMS, srcHasSSES3, dstWantsSSEC, dstWantsSSEKMS, dstWantsSSES3) @@ -212,10 +212,31 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request dstEntry.Extended[k] = v } - // For zero-size files or files without chunks, use the original approach + // For zero-size files or files without chunks, handle inline content + // This includes encrypted inline files that need decryption/re-encryption if entry.Attributes.FileSize == 0 || len(entry.GetChunks()) == 0 { - // Just copy the entry structure without chunks for zero-size files dstEntry.Chunks = nil + + // Handle inline encrypted content - fixes GitHub #7562 + if len(entry.Content) > 0 { + inlineContent, inlineMetadata, inlineErr := s3a.processInlineContentForCopy( + entry, r, dstBucket, dstObject, + srcHasSSEC, srcHasSSEKMS, srcHasSSES3, + dstWantsSSEC, dstWantsSSEKMS, dstWantsSSES3) + if inlineErr != nil { + glog.Errorf("CopyObjectHandler inline content error: %v", inlineErr) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + dstEntry.Content = inlineContent + + // Apply inline destination metadata + if inlineMetadata != nil { + for k, v := range inlineMetadata { + dstEntry.Extended[k] = v + } + } + } } else { // Use unified copy strategy approach dstChunks, dstMetadata, copyErr := s3a.executeUnifiedCopyStrategy(entry, r, 
dstBucket, srcObject, dstObject)
	// Default: don't skip the header
	return false
}

// processInlineContentForCopy handles encryption/decryption for inline content during copy
// This fixes GitHub #7562 where small files stored inline weren't properly decrypted/re-encrypted.
//
// entry: source object entry whose Content holds the inline bytes.
// srcSSEC/srcSSEKMS/srcSSES3: pre-computed source encryption state.
// dstSSEC/dstSSEKMS/dstSSES3: destination encryption requested on r.
// Returns the (possibly re-encrypted) content plus any destination encryption
// metadata to merge into the destination entry (nil when no encryption applied).
func (s3a *S3ApiServer) processInlineContentForCopy(
	entry *filer_pb.Entry, r *http.Request, dstBucket, dstObject string,
	srcSSEC, srcSSEKMS, srcSSES3 bool,
	dstSSEC, dstSSEKMS, dstSSES3 bool) ([]byte, map[string][]byte, error) {

	content := entry.Content
	var dstMetadata map[string][]byte

	// Check if source is encrypted and needs decryption
	srcEncrypted := srcSSEC || srcSSEKMS || srcSSES3

	// Check if destination needs encryption (explicit request or bucket default)
	dstNeedsEncryption := dstSSEC || dstSSEKMS || dstSSES3
	if !dstNeedsEncryption {
		// Check bucket default encryption
		// NOTE(review): a getBucketMetadata error is treated as "no default
		// encryption" (best-effort); confirm that is the intended policy.
		bucketMetadata, err := s3a.getBucketMetadata(dstBucket)
		if err == nil && bucketMetadata != nil && bucketMetadata.Encryption != nil {
			switch bucketMetadata.Encryption.SseAlgorithm {
			case "aws:kms":
				dstSSEKMS = true
				dstNeedsEncryption = true
			case "AES256":
				dstSSES3 = true
				dstNeedsEncryption = true
			}
		}
	}

	// Decrypt source content if encrypted
	if srcEncrypted {
		decryptedContent, decErr := s3a.decryptInlineContent(entry, srcSSEC, srcSSEKMS, srcSSES3, r)
		if decErr != nil {
			return nil, nil, fmt.Errorf("failed to decrypt inline content: %w", decErr)
		}
		content = decryptedContent
		glog.V(3).Infof("Decrypted inline content: %d bytes", len(content))
	}

	// Re-encrypt if destination needs encryption
	if dstNeedsEncryption {
		encryptedContent, encMetadata, encErr := s3a.encryptInlineContent(content, dstBucket, dstObject, dstSSEC, dstSSEKMS, dstSSES3, r)
		if encErr != nil {
			return nil, nil, fmt.Errorf("failed to encrypt inline content: %w", encErr)
		}
		content = encryptedContent
		dstMetadata = encMetadata
		glog.V(3).Infof("Encrypted inline content: %d bytes", len(content))
	}

	return content, dstMetadata, nil
}

// decryptInlineContent decrypts inline content from an encrypted source.
// Exactly one of srcSSEC/srcSSEKMS/srcSSES3 is expected to be true; when all
// are false the content is returned unchanged. Keys/IVs come from the entry's
// Extended metadata, except SSE-C where the key is taken from the copy-source
// request headers.
func (s3a *S3ApiServer) decryptInlineContent(entry *filer_pb.Entry, srcSSEC, srcSSEKMS, srcSSES3 bool, r *http.Request) ([]byte, error) {
	content := entry.Content

	if srcSSES3 {
		// Get SSE-S3 key from metadata
		keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]
		if !exists {
			return nil, fmt.Errorf("SSE-S3 key not found in metadata")
		}

		keyManager := GetSSES3KeyManager()
		sseKey, err := DeserializeSSES3Metadata(keyData, keyManager)
		if err != nil {
			return nil, fmt.Errorf("failed to deserialize SSE-S3 key: %w", err)
		}

		// Get IV (stored on the deserialized key)
		iv := sseKey.IV
		if len(iv) == 0 {
			return nil, fmt.Errorf("SSE-S3 IV not found")
		}

		// Decrypt content
		decryptedReader, err := CreateSSES3DecryptedReader(bytes.NewReader(content), sseKey, iv)
		if err != nil {
			return nil, fmt.Errorf("failed to create SSE-S3 decrypted reader: %w", err)
		}
		return io.ReadAll(decryptedReader)

	} else if srcSSEKMS {
		// Get SSE-KMS key from metadata
		keyData, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]
		if !exists {
			return nil, fmt.Errorf("SSE-KMS key not found in metadata")
		}

		sseKey, err := DeserializeSSEKMSMetadata(keyData)
		if err != nil {
			return nil, fmt.Errorf("failed to deserialize SSE-KMS key: %w", err)
		}

		// Decrypt content
		decryptedReader, err := CreateSSEKMSDecryptedReader(bytes.NewReader(content), sseKey)
		if err != nil {
			return nil, fmt.Errorf("failed to create SSE-KMS decrypted reader: %w", err)
		}
		return io.ReadAll(decryptedReader)

	} else if srcSSEC {
		// Get SSE-C key from request headers (copy-source variant)
		sourceKey, err := ParseSSECCopySourceHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("failed to parse SSE-C copy source headers: %w", err)
		}

		// Get IV from metadata
		iv, err := GetSSECIVFromMetadata(entry.Extended)
		if err != nil {
			return nil, fmt.Errorf("failed to get SSE-C IV: %w", err)
		}

		// Decrypt content
		decryptedReader, err := CreateSSECDecryptedReader(bytes.NewReader(content), sourceKey, iv)
		if err != nil {
			return nil, fmt.Errorf("failed to create SSE-C decrypted reader: %w", err)
		}
		return io.ReadAll(decryptedReader)
	}

	// Source not encrypted, return as-is
	return content, nil
}

// encryptInlineContent encrypts inline content for the destination.
// Exactly one of dstSSEC/dstSSEKMS/dstSSES3 is expected to be true; when all
// are false the content is returned unchanged with nil metadata. The returned
// map carries the serialized key material and the standard SSE response
// headers to store on the destination entry.
func (s3a *S3ApiServer) encryptInlineContent(content []byte, dstBucket, dstObject string,
	dstSSEC, dstSSEKMS, dstSSES3 bool, r *http.Request) ([]byte, map[string][]byte, error) {

	dstMetadata := make(map[string][]byte)

	if dstSSES3 {
		// Generate SSE-S3 key
		keyManager := GetSSES3KeyManager()
		key, err := keyManager.GetOrCreateKey("")
		if err != nil {
			return nil, nil, fmt.Errorf("failed to generate SSE-S3 key: %w", err)
		}

		// Encrypt content
		encryptedReader, iv, err := CreateSSES3EncryptedReader(bytes.NewReader(content), key)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to create SSE-S3 encrypted reader: %w", err)
		}

		encryptedContent, err := io.ReadAll(encryptedReader)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to read encrypted content: %w", err)
		}

		// Store IV on key and serialize metadata
		key.IV = iv
		keyData, err := SerializeSSES3Metadata(key)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to serialize SSE-S3 metadata: %w", err)
		}

		dstMetadata[s3_constants.SeaweedFSSSES3Key] = keyData
		dstMetadata[s3_constants.AmzServerSideEncryption] = []byte("AES256")

		return encryptedContent, dstMetadata, nil

	} else if dstSSEKMS {
		// Parse SSE-KMS headers
		keyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to parse SSE-KMS headers: %w", err)
		}

		// Build encryption context if needed
		if encryptionContext == nil {
			encryptionContext = BuildEncryptionContext(dstBucket, dstObject, bucketKeyEnabled)
		}

		// Encrypt content
		encryptedReader, sseKey, err := CreateSSEKMSEncryptedReaderWithBucketKey(
			bytes.NewReader(content), keyID, encryptionContext, bucketKeyEnabled)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to create SSE-KMS encrypted reader: %w", err)
		}

		encryptedContent, err := io.ReadAll(encryptedReader)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to read encrypted content: %w", err)
		}

		// Serialize metadata
		keyData, err := SerializeSSEKMSMetadata(sseKey)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to serialize SSE-KMS metadata: %w", err)
		}

		dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = keyData
		dstMetadata[s3_constants.AmzServerSideEncryption] = []byte("aws:kms")

		return encryptedContent, dstMetadata, nil

	} else if dstSSEC {
		// Parse SSE-C headers
		destKey, err := ParseSSECHeaders(r)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to parse SSE-C headers: %w", err)
		}

		// Encrypt content
		encryptedReader, iv, err := CreateSSECEncryptedReader(bytes.NewReader(content), destKey)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to create SSE-C encrypted reader: %w", err)
		}

		encryptedContent, err := io.ReadAll(encryptedReader)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to read encrypted content: %w", err)
		}

		// Store IV in metadata
		StoreSSECIVInMetadata(dstMetadata, iv)
		dstMetadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
		dstMetadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(destKey.KeyMD5)

		return encryptedContent, dstMetadata, nil
	}

	// No encryption needed
	return content, nil, nil
}
@@ type FilerOption struct { DiskType string AllowedOrigins []string ExposeDirectoryData bool + TusPath string } type FilerServer struct { @@ -195,6 +196,17 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) handleStaticResources(defaultMux) if !option.DisableHttp { defaultMux.HandleFunc("/healthz", requestIDMiddleware(fs.filerHealthzHandler)) + // TUS resumable upload protocol handler + if option.TusPath != "" { + tusPath := option.TusPath + if !strings.HasPrefix(tusPath, "/") { + tusPath = "/" + tusPath + } + if !strings.HasSuffix(tusPath, "/") { + tusPath += "/" + } + defaultMux.HandleFunc(tusPath, fs.filerGuard.WhiteList(requestIDMiddleware(fs.tusHandler))) + } defaultMux.HandleFunc("/", fs.filerGuard.WhiteList(requestIDMiddleware(fs.filerHandler))) } if defaultMux != readonlyMux { diff --git a/weed/server/filer_server_tus_handlers.go b/weed/server/filer_server_tus_handlers.go new file mode 100644 index 000000000..abb6ecca4 --- /dev/null +++ b/weed/server/filer_server_tus_handlers.go @@ -0,0 +1,415 @@ +package weed_server + +import ( + "bytes" + "context" + "encoding/base64" + "fmt" + "io" + "net/http" + "path" + "strconv" + "strings" + "time" + + "github.com/google/uuid" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// tusHandler is the main entry point for TUS protocol requests +func (fs *FilerServer) tusHandler(w http.ResponseWriter, r *http.Request) { + // Set common TUS response headers + w.Header().Set("Tus-Resumable", TusVersion) + + // Check Tus-Resumable header for non-OPTIONS requests + if r.Method != http.MethodOptions { + tusVersion := r.Header.Get("Tus-Resumable") + if tusVersion != TusVersion { + http.Error(w, "Unsupported TUS version", http.StatusPreconditionFailed) + return + } + } + + // Route based on method and path 
+ reqPath := r.URL.Path + tusPrefix := fs.option.TusPath + if tusPrefix == "" { + tusPrefix = ".tus" + } + if !strings.HasPrefix(tusPrefix, "/") { + tusPrefix = "/" + tusPrefix + } + + // Check if this is an upload location (contains upload ID after {tusPrefix}/.uploads/) + uploadsPrefix := tusPrefix + "/.uploads/" + if strings.HasPrefix(reqPath, uploadsPrefix) { + uploadID := strings.TrimPrefix(reqPath, uploadsPrefix) + uploadID = strings.Split(uploadID, "/")[0] // Get just the ID, not any trailing path + + switch r.Method { + case http.MethodHead: + fs.tusHeadHandler(w, r, uploadID) + case http.MethodPatch: + fs.tusPatchHandler(w, r, uploadID) + case http.MethodDelete: + fs.tusDeleteHandler(w, r, uploadID) + default: + w.WriteHeader(http.StatusMethodNotAllowed) + } + return + } + + // Handle creation endpoints (POST to /.tus/{path}) + switch r.Method { + case http.MethodOptions: + fs.tusOptionsHandler(w, r) + case http.MethodPost: + fs.tusCreateHandler(w, r) + default: + w.WriteHeader(http.StatusMethodNotAllowed) + } +} + +// tusOptionsHandler handles OPTIONS requests for capability discovery +func (fs *FilerServer) tusOptionsHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Tus-Version", TusVersion) + w.Header().Set("Tus-Extension", TusExtensions) + w.Header().Set("Tus-Max-Size", strconv.FormatInt(TusMaxSize, 10)) + w.WriteHeader(http.StatusOK) +} + +// tusCreateHandler handles POST requests to create new uploads +func (fs *FilerServer) tusCreateHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + // Parse Upload-Length header (required) + uploadLengthStr := r.Header.Get("Upload-Length") + if uploadLengthStr == "" { + http.Error(w, "Upload-Length header required", http.StatusBadRequest) + return + } + uploadLength, err := strconv.ParseInt(uploadLengthStr, 10, 64) + if err != nil || uploadLength < 0 { + http.Error(w, "Invalid Upload-Length", http.StatusBadRequest) + return + } + if uploadLength > TusMaxSize { + 
http.Error(w, "Upload-Length exceeds maximum", http.StatusRequestEntityTooLarge) + return + } + + // Parse Upload-Metadata header (optional) + metadata := parseTusMetadata(r.Header.Get("Upload-Metadata")) + + // Get TUS path prefix + tusPrefix := fs.option.TusPath + if tusPrefix == "" { + tusPrefix = ".tus" + } + if !strings.HasPrefix(tusPrefix, "/") { + tusPrefix = "/" + tusPrefix + } + + // Determine target path from request URL + targetPath := strings.TrimPrefix(r.URL.Path, tusPrefix) + if targetPath == "" || targetPath == "/" { + http.Error(w, "Target path required", http.StatusBadRequest) + return + } + + // Generate upload ID + uploadID := uuid.New().String() + + // Create upload session + session, err := fs.createTusSession(ctx, uploadID, targetPath, uploadLength, metadata) + if err != nil { + glog.Errorf("Failed to create TUS session: %v", err) + http.Error(w, "Failed to create upload", http.StatusInternalServerError) + return + } + + // Build upload location URL (ensure it starts with single /) + uploadLocation := path.Clean(fmt.Sprintf("%s/.uploads/%s", tusPrefix, uploadID)) + if !strings.HasPrefix(uploadLocation, "/") { + uploadLocation = "/" + uploadLocation + } + + // Handle creation-with-upload extension + // TUS requires Content-Length for uploads; reject chunked encoding + if r.Header.Get("Content-Type") == "application/offset+octet-stream" { + if r.ContentLength < 0 { + fs.deleteTusSession(ctx, uploadID) + http.Error(w, "Content-Length header required for creation-with-upload", http.StatusBadRequest) + return + } + if r.ContentLength == 0 { + // Empty body is allowed, just skip the upload + goto respond + } + // Upload data in the creation request + bytesWritten, uploadErr := fs.tusWriteData(ctx, session, 0, r.Body, r.ContentLength) + if uploadErr != nil { + // Cleanup session on failure + fs.deleteTusSession(ctx, uploadID) + glog.Errorf("Failed to write initial TUS data: %v", uploadErr) + http.Error(w, "Failed to write data", 
http.StatusInternalServerError) + return + } + + // Update offset in response header + w.Header().Set("Upload-Offset", strconv.FormatInt(bytesWritten, 10)) + + // Check if upload is complete + if bytesWritten == session.Size { + // Refresh session to get updated chunks + session, err = fs.getTusSession(ctx, uploadID) + if err != nil { + glog.Errorf("Failed to get updated TUS session: %v", err) + http.Error(w, "Failed to complete upload", http.StatusInternalServerError) + return + } + if err := fs.completeTusUpload(ctx, session); err != nil { + glog.Errorf("Failed to complete TUS upload: %v", err) + http.Error(w, "Failed to complete upload", http.StatusInternalServerError) + return + } + } + } + +respond: + w.Header().Set("Location", uploadLocation) + w.WriteHeader(http.StatusCreated) +} + +// tusHeadHandler handles HEAD requests to get current upload offset +func (fs *FilerServer) tusHeadHandler(w http.ResponseWriter, r *http.Request, uploadID string) { + ctx := r.Context() + + session, err := fs.getTusSession(ctx, uploadID) + if err != nil { + http.Error(w, "Upload not found", http.StatusNotFound) + return + } + + w.Header().Set("Upload-Offset", strconv.FormatInt(session.Offset, 10)) + w.Header().Set("Upload-Length", strconv.FormatInt(session.Size, 10)) + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusOK) +} + +// tusPatchHandler handles PATCH requests to upload data +func (fs *FilerServer) tusPatchHandler(w http.ResponseWriter, r *http.Request, uploadID string) { + ctx := r.Context() + + // Validate Content-Type + contentType := r.Header.Get("Content-Type") + if contentType != "application/offset+octet-stream" { + http.Error(w, "Content-Type must be application/offset+octet-stream", http.StatusUnsupportedMediaType) + return + } + + // Get current session + session, err := fs.getTusSession(ctx, uploadID) + if err != nil { + http.Error(w, "Upload not found", http.StatusNotFound) + return + } + + // Validate Upload-Offset header + 
uploadOffsetStr := r.Header.Get("Upload-Offset") + if uploadOffsetStr == "" { + http.Error(w, "Upload-Offset header required", http.StatusBadRequest) + return + } + uploadOffset, err := strconv.ParseInt(uploadOffsetStr, 10, 64) + if err != nil || uploadOffset < 0 { + http.Error(w, "Invalid Upload-Offset", http.StatusBadRequest) + return + } + + // Check offset matches current position + if uploadOffset != session.Offset { + http.Error(w, fmt.Sprintf("Offset mismatch: expected %d, got %d", session.Offset, uploadOffset), http.StatusConflict) + return + } + + // TUS requires Content-Length header for PATCH requests + if r.ContentLength < 0 { + http.Error(w, "Content-Length header required", http.StatusBadRequest) + return + } + + // Write data + bytesWritten, err := fs.tusWriteData(ctx, session, uploadOffset, r.Body, r.ContentLength) + if err != nil { + glog.Errorf("Failed to write TUS data: %v", err) + http.Error(w, "Failed to write data", http.StatusInternalServerError) + return + } + + newOffset := uploadOffset + bytesWritten + + // Check if upload is complete + if newOffset == session.Size { + // Refresh session to get updated chunks + session, err = fs.getTusSession(ctx, uploadID) + if err != nil { + glog.Errorf("Failed to get updated TUS session: %v", err) + http.Error(w, "Failed to complete upload", http.StatusInternalServerError) + return + } + + if err := fs.completeTusUpload(ctx, session); err != nil { + glog.Errorf("Failed to complete TUS upload: %v", err) + http.Error(w, "Failed to complete upload", http.StatusInternalServerError) + return + } + } + + w.Header().Set("Upload-Offset", strconv.FormatInt(newOffset, 10)) + w.WriteHeader(http.StatusNoContent) +} + +// tusDeleteHandler handles DELETE requests to cancel uploads +func (fs *FilerServer) tusDeleteHandler(w http.ResponseWriter, r *http.Request, uploadID string) { + ctx := r.Context() + + if err := fs.deleteTusSession(ctx, uploadID); err != nil { + glog.Errorf("Failed to delete TUS session: %v", err) + 
		http.Error(w, "Failed to delete upload", http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusNoContent)
}

// tusWriteData uploads data to volume servers and updates session.
//
// It streams up to contentLength bytes from reader into a newly assigned
// volume-server needle, records the resulting chunk in the session, and
// returns the number of bytes written.
//
// NOTE(review): contentLength greater than the remaining session size is
// silently truncated to the remainder rather than rejected — confirm this is
// the intended behavior for oversized PATCH bodies.
func (fs *FilerServer) tusWriteData(ctx context.Context, session *TusSession, offset int64, reader io.Reader, contentLength int64) (int64, error) {
	if contentLength == 0 {
		return 0, nil
	}

	// Limit content length to remaining size
	remaining := session.Size - offset
	if contentLength > remaining {
		contentLength = remaining
	}
	if contentLength <= 0 {
		return 0, nil
	}

	// Read data into buffer
	// Determine storage options based on target path
	so, err := fs.detectStorageOption0(ctx, session.TargetPath, "", "", "", "", "", "", "", "", "")
	if err != nil {
		return 0, fmt.Errorf("detect storage option: %w", err)
	}

	// Assign file ID from master
	fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
	if assignErr != nil {
		return 0, fmt.Errorf("assign volume: %w", assignErr)
	}

	// Upload to volume server
	uploader, uploaderErr := operation.NewUploader()
	if uploaderErr != nil {
		return 0, fmt.Errorf("create uploader: %w", uploaderErr)
	}

	// Read first bytes for MIME type detection, respecting contentLength
	// http.DetectContentType uses at most 512 bytes
	sniffSize := int64(512)
	if contentLength < sniffSize {
		sniffSize = contentLength
	}
	sniffBuf := make([]byte, sniffSize)
	sniffN, sniffErr := io.ReadFull(reader, sniffBuf)
	if sniffErr != nil && sniffErr != io.EOF && sniffErr != io.ErrUnexpectedEOF {
		return 0, fmt.Errorf("read data for mime detection: %w", sniffErr)
	}
	if sniffN == 0 {
		// Empty body despite a positive Content-Length: nothing to store
		return 0, nil
	}
	sniffBuf = sniffBuf[:sniffN]

	// Detect MIME type from sniffed bytes
	mimeType := http.DetectContentType(sniffBuf)

	// Create a reader that combines sniffed bytes with remaining data
	var dataReader io.Reader
	if int64(sniffN) >= contentLength {
		// All data fits in sniff buffer
		dataReader = bytes.NewReader(sniffBuf)
	} else {
		// Combine sniffed bytes with remaining stream
		dataReader = io.MultiReader(bytes.NewReader(sniffBuf), io.LimitReader(reader, contentLength-int64(sniffN)))
	}

	uploadResult, uploadErr, _ := uploader.Upload(ctx, dataReader, &operation.UploadOption{
		UploadUrl:         urlLocation,
		Filename:          "",
		Cipher:            fs.option.Cipher,
		IsInputCompressed: false,
		MimeType:          mimeType,
		PairMap:           nil,
		Jwt:               auth,
	})
	if uploadErr != nil {
		return 0, fmt.Errorf("upload data: %w", uploadErr)
	}

	// Create chunk info
	chunk := &TusChunkInfo{
		Offset:   offset,
		Size:     int64(uploadResult.Size),
		FileId:   fileId,
		UploadAt: time.Now().UnixNano(),
	}

	// Update session; on failure, best-effort delete of the orphaned chunk
	if err := fs.updateTusSessionOffset(ctx, session.ID, offset+int64(uploadResult.Size), chunk); err != nil {
		// Try to clean up the uploaded chunk
		fs.filer.DeleteChunks(ctx, util.FullPath(session.TargetPath), []*filer_pb.FileChunk{
			{FileId: fileId},
		})
		return 0, fmt.Errorf("update session: %w", err)
	}

	stats.FilerHandlerCounter.WithLabelValues("tusUploadChunk").Inc()

	return int64(uploadResult.Size), nil
}

// parseTusMetadata parses the Upload-Metadata header
// Format: key1 base64value1,key2 base64value2,...
// parseTusMetadata parses a TUS Upload-Metadata header into a key/value map.
//
// The header is a comma-separated list of entries; each entry is a key
// optionally followed by a space and a Base64-encoded value. Per TUS 1.0 a
// key MAY appear without a value ("flag"); such keys are stored with an
// empty-string value (previously they were silently dropped). Values with
// missing Base64 padding are also accepted. Entries that still fail to
// decode are skipped.
func parseTusMetadata(header string) map[string]string {
	metadata := make(map[string]string)
	if header == "" {
		return metadata
	}

	for _, pair := range strings.Split(header, ",") {
		pair = strings.TrimSpace(pair)
		if pair == "" {
			continue
		}

		parts := strings.SplitN(pair, " ", 2)
		key := strings.TrimSpace(parts[0])
		if key == "" {
			continue
		}

		// Key without a value: TUS flag entry
		if len(parts) == 1 {
			metadata[key] = ""
			continue
		}

		encodedValue := strings.TrimSpace(parts[1])
		if encodedValue == "" {
			metadata[key] = ""
			continue
		}

		value, err := base64.StdEncoding.DecodeString(encodedValue)
		if err != nil {
			// Tolerate unpadded Base64 emitted by some clients
			value, err = base64.RawStdEncoding.DecodeString(encodedValue)
		}
		if err != nil {
			// Undecodable value: skip the entry
			continue
		}
		metadata[key] = string(value)
	}

	return metadata
}
	`json:"size"`
	FileId   string `json:"file_id"`
	UploadAt int64  `json:"upload_at"`
}

// tusSessionDir returns the directory path for storing TUS upload sessions
func (fs *FilerServer) tusSessionDir() string {
	return "/" + TusUploadsFolder
}

// tusSessionPath returns the path to a specific upload session directory
func (fs *FilerServer) tusSessionPath(uploadID string) string {
	return fmt.Sprintf("/%s/%s", TusUploadsFolder, uploadID)
}

// tusSessionInfoPath returns the path to the session info file
func (fs *FilerServer) tusSessionInfoPath(uploadID string) string {
	return fmt.Sprintf("/%s/%s/%s", TusUploadsFolder, uploadID, TusInfoFileName)
}

// tusChunkPath returns the path to store a chunk info file
// Format: /{TusUploadsFolder}/{uploadID}/chunk_{offset}_{size}_{fileId}
func (fs *FilerServer) tusChunkPath(uploadID string, offset, size int64, fileId string) string {
	// Replace / in fileId with _ to make it a valid filename
	// NOTE(review): parseTusChunkPath reverses this by mapping every "_" back
	// to "/"; this round-trip is only lossless if fileIds never legitimately
	// contain "_" — confirm against the volume fileId format.
	safeFileId := strings.ReplaceAll(fileId, "/", "_")
	return fmt.Sprintf("/%s/%s/chunk_%016d_%016d_%s", TusUploadsFolder, uploadID, offset, size, safeFileId)
}

// parseTusChunkPath parses chunk info from a chunk file name.
// The name must follow the tusChunkPath format; UploadAt is not persisted in
// the name and is set to the parse time.
func parseTusChunkPath(name string) (*TusChunkInfo, error) {
	if !strings.HasPrefix(name, "chunk_") {
		return nil, fmt.Errorf("not a chunk file: %s", name)
	}
	parts := strings.SplitN(name[6:], "_", 3) // Skip "chunk_" prefix
	if len(parts) < 3 {
		return nil, fmt.Errorf("invalid chunk file name: %s", name)
	}
	offset, err := strconv.ParseInt(parts[0], 10, 64)
	if err != nil {
		return nil, fmt.Errorf("invalid offset in chunk file: %s", name)
	}
	size, err := strconv.ParseInt(parts[1], 10, 64)
	if err != nil {
		return nil, fmt.Errorf("invalid size in chunk file: %s", name)
	}
	// Restore / in fileId
	fileId := strings.ReplaceAll(parts[2], "_", "/")
	return &TusChunkInfo{
		Offset:   offset,
		Size:     size,
		FileId:   fileId,
		UploadAt: time.Now().UnixNano(),
	}, nil
}

// createTusSession creates a new TUS upload session: a session directory plus
// a JSON info file describing target path, total size, and client metadata.
func (fs *FilerServer) createTusSession(ctx context.Context, uploadID, targetPath string, size int64, metadata map[string]string) (*TusSession, error) {
	session := &TusSession{
		ID:         uploadID,
		TargetPath: targetPath,
		Size:       size,
		Offset:     0,
		Metadata:   metadata,
		CreatedAt:  time.Now(),
		ExpiresAt:  time.Now().Add(7 * 24 * time.Hour), // 7 days default expiration
		Chunks:     []*TusChunkInfo{},
	}

	// Create session directory
	sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
	if err := fs.filer.CreateEntry(ctx, &filer.Entry{
		FullPath: sessionDirPath,
		Attr: filer.Attr{
			Mode:   os.ModeDir | 0755,
			Crtime: time.Now(),
			Mtime:  time.Now(),
			Uid:    OS_UID,
			Gid:    OS_GID,
		},
	}, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
		return nil, fmt.Errorf("create session directory: %w", err)
	}

	// Save session info
	if err := fs.saveTusSession(ctx, session); err != nil {
		// Cleanup the directory on failure
		fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, true, false, false, nil, 0)
		return nil, fmt.Errorf("save session info: %w", err)
	}

	glog.V(2).Infof("Created TUS session %s for %s, size=%d", uploadID, targetPath, size)
	return session, nil
}

// saveTusSession saves the session info to the filer as a JSON content entry
// under the session directory.
func (fs *FilerServer) saveTusSession(ctx context.Context, session *TusSession) error {
	sessionData, err := json.Marshal(session)
	if err != nil {
		return fmt.Errorf("marshal session: %w", err)
	}

	infoPath := util.FullPath(fs.tusSessionInfoPath(session.ID))
	entry := &filer.Entry{
		FullPath: infoPath,
		Attr: filer.Attr{
			Mode:   0644,
			Crtime: session.CreatedAt,
			Mtime:  time.Now(),
			Uid:    OS_UID,
			Gid:    OS_GID,
		},
		Content: sessionData,
	}

	if err := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
		return fmt.Errorf("save session info entry: %w", err)
	}

	return nil
}

// getTusSession retrieves a TUS session by upload ID, including chunks from directory listing.
// The persisted Offset/Chunks in the info file are ignored; the authoritative
// state is reconstructed from the chunk_* entries so concurrent writers on
// different filers observe a consistent view.
func (fs *FilerServer) getTusSession(ctx context.Context, uploadID string) (*TusSession, error) {
	infoPath := util.FullPath(fs.tusSessionInfoPath(uploadID))
	entry, err := fs.filer.FindEntry(ctx, infoPath)
	if err != nil {
		if err == filer_pb.ErrNotFound {
			return nil, fmt.Errorf("session not found: %s", uploadID)
		}
		return nil, fmt.Errorf("find session: %w", err)
	}

	var session TusSession
	if err := json.Unmarshal(entry.Content, &session); err != nil {
		return nil, fmt.Errorf("unmarshal session: %w", err)
	}

	// Load chunks from directory listing (atomic read, no race condition)
	// NOTE(review): the listing is capped at 10000 entries; uploads with more
	// chunks than that would be silently truncated — confirm the cap.
	sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
	entries, _, err := fs.filer.ListDirectoryEntries(ctx, sessionDirPath, "", false, 10000, "", "", "")
	if err != nil {
		return nil, fmt.Errorf("list session directory: %w", err)
	}

	session.Chunks = nil
	session.Offset = 0
	for _, e := range entries {
		if strings.HasPrefix(e.Name(), "chunk_") {
			chunk, parseErr := parseTusChunkPath(e.Name())
			if parseErr != nil {
				glog.V(1).Infof("Skipping invalid chunk file %s: %v", e.Name(), parseErr)
				continue
			}
			session.Chunks = append(session.Chunks, chunk)
		}
	}

	// Sort chunks by offset and compute current offset
	if len(session.Chunks) > 0 {
		sort.Slice(session.Chunks, func(i, j int) bool {
			return session.Chunks[i].Offset < session.Chunks[j].Offset
		})
		// Current offset is the end of the last chunk
		// NOTE(review): this assumes chunks are contiguous; a gap between
		// chunks would be invisible here — confirm callers enforce ordering.
		lastChunk := session.Chunks[len(session.Chunks)-1]
		session.Offset = lastChunk.Offset + lastChunk.Size
	}

	return &session, nil
}

// updateTusSessionOffset stores the chunk info as a separate file entry
// This avoids read-modify-write race conditions across multiple filer instances
func (fs *FilerServer) updateTusSessionOffset(ctx context.Context, uploadID string, newOffset int64, chunk *TusChunkInfo) error {
	if chunk == nil {
		return nil
	}

	// Store chunk info as a separate file entry (atomic operation)
	chunkPath := util.FullPath(fs.tusChunkPath(uploadID, chunk.Offset, chunk.Size, chunk.FileId))
	chunkData, err := json.Marshal(chunk)
	if err != nil {
		return fmt.Errorf("marshal chunk info: %w", err)
	}

	if err := fs.filer.CreateEntry(ctx, &filer.Entry{
		FullPath: chunkPath,
		Attr: filer.Attr{
			Mode:   0644,
			Crtime: time.Now(),
			Mtime:  time.Now(),
			Uid:    OS_UID,
			Gid:    OS_GID,
		},
		Content: chunkData,
	}, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
		return fmt.Errorf("save chunk info: %w", err)
	}

	return nil
}

// deleteTusSession removes a TUS upload session and all its data:
// any volume-server chunks already uploaded, then the session directory.
// A missing session is treated as already deleted (returns nil).
func (fs *FilerServer) deleteTusSession(ctx context.Context, uploadID string) error {

	session, err := fs.getTusSession(ctx, uploadID)
	if err != nil {
		// Session might already be deleted or never existed
		glog.V(1).Infof("TUS session %s not found for deletion: %v", uploadID, err)
		return nil
	}

	// Delete any uploaded chunks from volume servers
	for _, chunk := range session.Chunks {
		if chunk.FileId != "" {
			fs.filer.DeleteChunks(ctx, util.FullPath(session.TargetPath), []*filer_pb.FileChunk{
				{FileId: chunk.FileId},
			})
		}
	}

	// Delete the session directory
	sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
	if err := fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, true, false, false, nil, 0); err != nil {
		return fmt.Errorf("delete session directory: %w", err)
	}

	glog.V(2).Infof("Deleted TUS session %s", uploadID)
	return nil
}

// completeTusUpload assembles all chunks and creates the final file
func (fs *FilerServer) completeTusUpload(ctx context.Context, session *TusSession) error {
	if session.Offset != session.Size {
		return fmt.Errorf("upload incomplete: offset=%d, expected=%d", session.Offset, session.Size)
	}

	// Sort chunks by offset to ensure correct order
	sort.Slice(session.Chunks, func(i, j
int) bool { + return session.Chunks[i].Offset < session.Chunks[j].Offset + }) + + // Assemble file chunks in order + var fileChunks []*filer_pb.FileChunk + + for _, chunk := range session.Chunks { + fid, fidErr := filer_pb.ToFileIdObject(chunk.FileId) + if fidErr != nil { + return fmt.Errorf("invalid file ID %s at offset %d: %w", chunk.FileId, chunk.Offset, fidErr) + } + + fileChunk := &filer_pb.FileChunk{ + FileId: chunk.FileId, + Offset: chunk.Offset, + Size: uint64(chunk.Size), + ModifiedTsNs: chunk.UploadAt, + Fid: fid, + } + fileChunks = append(fileChunks, fileChunk) + } + + // Determine content type from metadata + contentType := "" + if session.Metadata != nil { + if ct, ok := session.Metadata["content-type"]; ok { + contentType = ct + } + } + + // Create the final file entry + targetPath := util.FullPath(session.TargetPath) + entry := &filer.Entry{ + FullPath: targetPath, + Attr: filer.Attr{ + Mode: 0644, + Crtime: session.CreatedAt, + Mtime: time.Now(), + Uid: OS_UID, + Gid: OS_GID, + Mime: contentType, + }, + Chunks: fileChunks, + } + + // Ensure parent directory exists + if err := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil { + return fmt.Errorf("create final file entry: %w", err) + } + + // Delete the session (but keep the chunks since they're now part of the final file) + sessionDirPath := util.FullPath(fs.tusSessionPath(session.ID)) + if err := fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, false, false, false, nil, 0); err != nil { + glog.V(1).Infof("Failed to cleanup TUS session directory %s: %v", session.ID, err) + } + + glog.V(2).Infof("Completed TUS upload %s -> %s, size=%d, chunks=%d", + session.ID, session.TargetPath, session.Size, len(fileChunks)) + + return nil +}