committed by
GitHub
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 2895 additions and 13 deletions
-
114.github/workflows/tus-tests.yml
-
1.gitignore
-
505test/s3/sse/github_7562_copy_test.go
-
226test/tus/Makefile
-
241test/tus/README.md
-
772test/tus/tus_integration_test.go
-
3weed/command/filer.go
-
1weed/command/server.go
-
277weed/s3api/s3api_object_handlers_copy.go
-
12weed/server/filer_server.go
-
415weed/server/filer_server_tus_handlers.go
-
341weed/server/filer_server_tus_session.go
@ -0,0 +1,114 @@ |
|||
name: "TUS Protocol Tests" |
|||
|
|||
on: |
|||
pull_request: |
|||
paths: |
|||
- 'weed/server/filer_server_tus*.go' |
|||
- 'weed/server/filer_server.go' |
|||
- 'test/tus/**' |
|||
- '.github/workflows/tus-tests.yml' |
|||
push: |
|||
branches: [ master, main ] |
|||
paths: |
|||
- 'weed/server/filer_server_tus*.go' |
|||
- 'weed/server/filer_server.go' |
|||
- 'test/tus/**' |
|||
|
|||
concurrency: |
|||
group: ${{ github.head_ref || github.ref }}/tus-tests |
|||
cancel-in-progress: true |
|||
|
|||
permissions: |
|||
contents: read |
|||
|
|||
defaults: |
|||
run: |
|||
working-directory: weed |
|||
|
|||
jobs: |
|||
tus-integration-tests: |
|||
name: TUS Protocol Integration Tests |
|||
runs-on: ubuntu-22.04 |
|||
timeout-minutes: 20 |
|||
|
|||
steps: |
|||
- name: Check out code |
|||
uses: actions/checkout@v6 |
|||
|
|||
- name: Set up Go |
|||
uses: actions/setup-go@v6 |
|||
with: |
|||
go-version-file: 'go.mod' |
|||
id: go |
|||
|
|||
- name: Install SeaweedFS |
|||
run: | |
|||
go install -buildvcs=false |
|||
|
|||
- name: Run TUS Integration Tests |
|||
timeout-minutes: 15 |
|||
working-directory: test/tus |
|||
run: | |
|||
set -x |
|||
echo "=== System Information ===" |
|||
uname -a |
|||
free -h |
|||
df -h |
|||
echo "=== Starting TUS Tests ===" |
|||
|
|||
# Run tests with automatic server management |
|||
make test-with-server || { |
|||
echo "TUS integration tests failed, checking logs..." |
|||
if [ -f /tmp/seaweedfs-tus-filer.log ]; then |
|||
echo "=== Filer logs ===" |
|||
tail -100 /tmp/seaweedfs-tus-filer.log |
|||
fi |
|||
if [ -f /tmp/seaweedfs-tus-master.log ]; then |
|||
echo "=== Master logs ===" |
|||
tail -50 /tmp/seaweedfs-tus-master.log |
|||
fi |
|||
if [ -f /tmp/seaweedfs-tus-volume.log ]; then |
|||
echo "=== Volume logs ===" |
|||
tail -50 /tmp/seaweedfs-tus-volume.log |
|||
fi |
|||
exit 1 |
|||
} |
|||
|
|||
- name: Show server logs on failure |
|||
if: failure() |
|||
working-directory: test/tus |
|||
run: | |
|||
echo "=== Filer Server Logs ===" |
|||
if [ -f /tmp/seaweedfs-tus-filer.log ]; then |
|||
echo "Last 100 lines of filer logs:" |
|||
tail -100 /tmp/seaweedfs-tus-filer.log |
|||
else |
|||
echo "No filer log file found" |
|||
fi |
|||
|
|||
echo "=== Master Server Logs ===" |
|||
if [ -f /tmp/seaweedfs-tus-master.log ]; then |
|||
tail -50 /tmp/seaweedfs-tus-master.log |
|||
else |
|||
echo "No master log file found" |
|||
fi |
|||
|
|||
echo "=== Volume Server Logs ===" |
|||
if [ -f /tmp/seaweedfs-tus-volume.log ]; then |
|||
tail -50 /tmp/seaweedfs-tus-volume.log |
|||
else |
|||
echo "No volume log file found" |
|||
fi |
|||
|
|||
echo "=== Test Environment ===" |
|||
ps aux | grep -E "(weed|test)" || true |
|||
netstat -tlnp 2>/dev/null | grep -E "(18888|19333|18080)" || true |
|||
|
|||
- name: Upload test logs on failure |
|||
if: failure() |
|||
uses: actions/upload-artifact@v5 |
|||
with: |
|||
name: tus-test-logs |
|||
path: | |
|||
/tmp/seaweedfs-tus-*.log |
|||
retention-days: 3 |
|||
@ -0,0 +1,505 @@ |
|||
package sse_test |
|||
|
|||
import ( |
|||
"bytes" |
|||
"context" |
|||
"fmt" |
|||
"io" |
|||
"testing" |
|||
|
|||
"github.com/aws/aws-sdk-go-v2/aws" |
|||
"github.com/aws/aws-sdk-go-v2/service/s3" |
|||
"github.com/aws/aws-sdk-go-v2/service/s3/types" |
|||
"github.com/stretchr/testify/assert" |
|||
"github.com/stretchr/testify/require" |
|||
) |
|||
|
|||
// TestGitHub7562CopyFromEncryptedToTempToEncrypted reproduces the exact scenario from
|
|||
// GitHub issue #7562: copying from an encrypted bucket to a temp bucket, then to another
|
|||
// encrypted bucket fails with InternalError.
|
|||
//
|
|||
// Reproduction steps:
|
|||
// 1. Create source bucket with SSE-S3 encryption enabled
|
|||
// 2. Upload object (automatically encrypted)
|
|||
// 3. Create temp bucket (no encryption)
|
|||
// 4. Copy object from source to temp (decrypts)
|
|||
// 5. Delete source bucket
|
|||
// 6. Create destination bucket with SSE-S3 encryption
|
|||
// 7. Copy object from temp to dest (should re-encrypt) - THIS FAILS
|
|||
func TestGitHub7562CopyFromEncryptedToTempToEncrypted(t *testing.T) { |
|||
ctx := context.Background() |
|||
client, err := createS3Client(ctx, defaultConfig) |
|||
require.NoError(t, err, "Failed to create S3 client") |
|||
|
|||
// Create three buckets
|
|||
srcBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-src-") |
|||
require.NoError(t, err, "Failed to create source bucket") |
|||
|
|||
tempBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-temp-") |
|||
require.NoError(t, err, "Failed to create temp bucket") |
|||
|
|||
destBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-dest-") |
|||
require.NoError(t, err, "Failed to create destination bucket") |
|||
|
|||
// Cleanup at the end
|
|||
defer func() { |
|||
// Clean up in reverse order of creation
|
|||
cleanupTestBucket(ctx, client, destBucket) |
|||
cleanupTestBucket(ctx, client, tempBucket) |
|||
// Note: srcBucket is deleted during the test
|
|||
}() |
|||
|
|||
testData := []byte("Test data for GitHub issue #7562 - copy from encrypted to temp to encrypted bucket") |
|||
objectKey := "demo-file.txt" |
|||
|
|||
t.Logf("[1] Creating source bucket with SSE-S3 default encryption: %s", srcBucket) |
|||
|
|||
// Step 1: Enable SSE-S3 default encryption on source bucket
|
|||
_, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ |
|||
Bucket: aws.String(srcBucket), |
|||
ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ |
|||
Rules: []types.ServerSideEncryptionRule{ |
|||
{ |
|||
ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ |
|||
SSEAlgorithm: types.ServerSideEncryptionAes256, |
|||
}, |
|||
}, |
|||
}, |
|||
}, |
|||
}) |
|||
require.NoError(t, err, "Failed to set source bucket default encryption") |
|||
|
|||
t.Log("[2] Uploading demo object to source bucket") |
|||
|
|||
// Step 2: Upload object to source bucket (will be automatically encrypted)
|
|||
_, err = client.PutObject(ctx, &s3.PutObjectInput{ |
|||
Bucket: aws.String(srcBucket), |
|||
Key: aws.String(objectKey), |
|||
Body: bytes.NewReader(testData), |
|||
// No encryption header - bucket default applies
|
|||
}) |
|||
require.NoError(t, err, "Failed to upload to source bucket") |
|||
|
|||
// Verify source object is encrypted
|
|||
srcHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ |
|||
Bucket: aws.String(srcBucket), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
require.NoError(t, err, "Failed to HEAD source object") |
|||
assert.Equal(t, types.ServerSideEncryptionAes256, srcHead.ServerSideEncryption, |
|||
"Source object should be SSE-S3 encrypted") |
|||
t.Logf("Source object encryption: %v", srcHead.ServerSideEncryption) |
|||
|
|||
t.Logf("[3] Creating temp bucket (no encryption): %s", tempBucket) |
|||
// Temp bucket already created without encryption
|
|||
|
|||
t.Log("[4] Copying object from source to temp (should decrypt)") |
|||
|
|||
// Step 4: Copy to temp bucket (no encryption = decrypts)
|
|||
_, err = client.CopyObject(ctx, &s3.CopyObjectInput{ |
|||
Bucket: aws.String(tempBucket), |
|||
Key: aws.String(objectKey), |
|||
CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)), |
|||
// No encryption header - data stored unencrypted
|
|||
}) |
|||
require.NoError(t, err, "Failed to copy to temp bucket") |
|||
|
|||
// Verify temp object is NOT encrypted
|
|||
tempHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ |
|||
Bucket: aws.String(tempBucket), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
require.NoError(t, err, "Failed to HEAD temp object") |
|||
assert.Empty(t, tempHead.ServerSideEncryption, "Temp object should NOT be encrypted") |
|||
t.Logf("Temp object encryption: %v (should be empty)", tempHead.ServerSideEncryption) |
|||
|
|||
// Verify temp object content
|
|||
tempGet, err := client.GetObject(ctx, &s3.GetObjectInput{ |
|||
Bucket: aws.String(tempBucket), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
require.NoError(t, err, "Failed to GET temp object") |
|||
tempData, err := io.ReadAll(tempGet.Body) |
|||
tempGet.Body.Close() |
|||
require.NoError(t, err, "Failed to read temp object") |
|||
assertDataEqual(t, testData, tempData, "Temp object data should match original") |
|||
|
|||
t.Log("[5] Deleting original source bucket") |
|||
|
|||
// Step 5: Delete source bucket
|
|||
err = cleanupTestBucket(ctx, client, srcBucket) |
|||
require.NoError(t, err, "Failed to delete source bucket") |
|||
|
|||
t.Logf("[6] Creating destination bucket with SSE-S3 encryption: %s", destBucket) |
|||
|
|||
// Step 6: Enable SSE-S3 default encryption on destination bucket
|
|||
_, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ |
|||
Bucket: aws.String(destBucket), |
|||
ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ |
|||
Rules: []types.ServerSideEncryptionRule{ |
|||
{ |
|||
ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ |
|||
SSEAlgorithm: types.ServerSideEncryptionAes256, |
|||
}, |
|||
}, |
|||
}, |
|||
}, |
|||
}) |
|||
require.NoError(t, err, "Failed to set destination bucket default encryption") |
|||
|
|||
t.Log("[7] Copying object from temp to dest (should re-encrypt) - THIS IS WHERE #7562 FAILS") |
|||
|
|||
// Step 7: Copy from temp to dest bucket (should re-encrypt with SSE-S3)
|
|||
// THIS IS THE STEP THAT FAILS IN GITHUB ISSUE #7562
|
|||
_, err = client.CopyObject(ctx, &s3.CopyObjectInput{ |
|||
Bucket: aws.String(destBucket), |
|||
Key: aws.String(objectKey), |
|||
CopySource: aws.String(fmt.Sprintf("%s/%s", tempBucket, objectKey)), |
|||
// No encryption header - bucket default should apply
|
|||
}) |
|||
require.NoError(t, err, "GitHub #7562: Failed to copy from temp to encrypted dest bucket") |
|||
|
|||
// Verify destination object is encrypted
|
|||
destHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ |
|||
Bucket: aws.String(destBucket), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
require.NoError(t, err, "Failed to HEAD destination object") |
|||
assert.Equal(t, types.ServerSideEncryptionAes256, destHead.ServerSideEncryption, |
|||
"Destination object should be SSE-S3 encrypted via bucket default") |
|||
t.Logf("Destination object encryption: %v", destHead.ServerSideEncryption) |
|||
|
|||
// Verify destination object content is correct
|
|||
destGet, err := client.GetObject(ctx, &s3.GetObjectInput{ |
|||
Bucket: aws.String(destBucket), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
require.NoError(t, err, "Failed to GET destination object") |
|||
destData, err := io.ReadAll(destGet.Body) |
|||
destGet.Body.Close() |
|||
require.NoError(t, err, "Failed to read destination object") |
|||
assertDataEqual(t, testData, destData, "GitHub #7562: Destination object data mismatch after re-encryption") |
|||
|
|||
t.Log("[done] GitHub #7562 reproduction test completed successfully!") |
|||
} |
|||
|
|||
// TestGitHub7562SimpleScenario tests the simpler variant: just copy unencrypted to encrypted bucket
|
|||
func TestGitHub7562SimpleScenario(t *testing.T) { |
|||
ctx := context.Background() |
|||
client, err := createS3Client(ctx, defaultConfig) |
|||
require.NoError(t, err, "Failed to create S3 client") |
|||
|
|||
// Create two buckets
|
|||
srcBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-simple-src-") |
|||
require.NoError(t, err, "Failed to create source bucket") |
|||
defer cleanupTestBucket(ctx, client, srcBucket) |
|||
|
|||
destBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-simple-dest-") |
|||
require.NoError(t, err, "Failed to create destination bucket") |
|||
defer cleanupTestBucket(ctx, client, destBucket) |
|||
|
|||
testData := []byte("Simple test for unencrypted to encrypted copy") |
|||
objectKey := "test-object.txt" |
|||
|
|||
t.Logf("Source bucket (no encryption): %s", srcBucket) |
|||
t.Logf("Dest bucket (SSE-S3 default): %s", destBucket) |
|||
|
|||
// Upload to unencrypted source bucket
|
|||
_, err = client.PutObject(ctx, &s3.PutObjectInput{ |
|||
Bucket: aws.String(srcBucket), |
|||
Key: aws.String(objectKey), |
|||
Body: bytes.NewReader(testData), |
|||
}) |
|||
require.NoError(t, err, "Failed to upload to source bucket") |
|||
|
|||
// Enable SSE-S3 on destination bucket
|
|||
_, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ |
|||
Bucket: aws.String(destBucket), |
|||
ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ |
|||
Rules: []types.ServerSideEncryptionRule{ |
|||
{ |
|||
ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ |
|||
SSEAlgorithm: types.ServerSideEncryptionAes256, |
|||
}, |
|||
}, |
|||
}, |
|||
}, |
|||
}) |
|||
require.NoError(t, err, "Failed to set dest bucket encryption") |
|||
|
|||
// Copy to encrypted bucket (should use bucket default encryption)
|
|||
_, err = client.CopyObject(ctx, &s3.CopyObjectInput{ |
|||
Bucket: aws.String(destBucket), |
|||
Key: aws.String(objectKey), |
|||
CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)), |
|||
}) |
|||
require.NoError(t, err, "Failed to copy to encrypted bucket") |
|||
|
|||
// Verify destination is encrypted
|
|||
destHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ |
|||
Bucket: aws.String(destBucket), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
require.NoError(t, err, "Failed to HEAD dest object") |
|||
assert.Equal(t, types.ServerSideEncryptionAes256, destHead.ServerSideEncryption, |
|||
"Object should be encrypted via bucket default") |
|||
|
|||
// Verify content
|
|||
destGet, err := client.GetObject(ctx, &s3.GetObjectInput{ |
|||
Bucket: aws.String(destBucket), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
require.NoError(t, err, "Failed to GET dest object") |
|||
destData, err := io.ReadAll(destGet.Body) |
|||
destGet.Body.Close() |
|||
require.NoError(t, err, "Failed to read dest object") |
|||
assertDataEqual(t, testData, destData, "Data mismatch") |
|||
} |
|||
|
|||
// TestGitHub7562DebugMetadata helps debug what metadata is present on objects at each step
|
|||
func TestGitHub7562DebugMetadata(t *testing.T) { |
|||
ctx := context.Background() |
|||
client, err := createS3Client(ctx, defaultConfig) |
|||
require.NoError(t, err, "Failed to create S3 client") |
|||
|
|||
// Create three buckets
|
|||
srcBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-debug-src-") |
|||
require.NoError(t, err, "Failed to create source bucket") |
|||
|
|||
tempBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-debug-temp-") |
|||
require.NoError(t, err, "Failed to create temp bucket") |
|||
|
|||
destBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-debug-dest-") |
|||
require.NoError(t, err, "Failed to create destination bucket") |
|||
|
|||
defer func() { |
|||
cleanupTestBucket(ctx, client, destBucket) |
|||
cleanupTestBucket(ctx, client, tempBucket) |
|||
}() |
|||
|
|||
testData := []byte("Debug metadata test for GitHub #7562") |
|||
objectKey := "debug-file.txt" |
|||
|
|||
// Enable SSE-S3 on source
|
|||
_, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ |
|||
Bucket: aws.String(srcBucket), |
|||
ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ |
|||
Rules: []types.ServerSideEncryptionRule{ |
|||
{ |
|||
ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ |
|||
SSEAlgorithm: types.ServerSideEncryptionAes256, |
|||
}, |
|||
}, |
|||
}, |
|||
}, |
|||
}) |
|||
require.NoError(t, err, "Failed to set source bucket encryption") |
|||
|
|||
// Upload
|
|||
_, err = client.PutObject(ctx, &s3.PutObjectInput{ |
|||
Bucket: aws.String(srcBucket), |
|||
Key: aws.String(objectKey), |
|||
Body: bytes.NewReader(testData), |
|||
}) |
|||
require.NoError(t, err, "Failed to upload") |
|||
|
|||
// Log source object headers
|
|||
srcHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ |
|||
Bucket: aws.String(srcBucket), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
require.NoError(t, err, "Failed to HEAD source") |
|||
t.Logf("=== SOURCE OBJECT (encrypted) ===") |
|||
t.Logf("ServerSideEncryption: %v", srcHead.ServerSideEncryption) |
|||
t.Logf("Metadata: %v", srcHead.Metadata) |
|||
t.Logf("ContentLength: %d", aws.ToInt64(srcHead.ContentLength)) |
|||
|
|||
// Copy to temp
|
|||
_, err = client.CopyObject(ctx, &s3.CopyObjectInput{ |
|||
Bucket: aws.String(tempBucket), |
|||
Key: aws.String(objectKey), |
|||
CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)), |
|||
}) |
|||
require.NoError(t, err, "Failed to copy to temp") |
|||
|
|||
// Log temp object headers
|
|||
tempHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ |
|||
Bucket: aws.String(tempBucket), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
require.NoError(t, err, "Failed to HEAD temp") |
|||
t.Logf("=== TEMP OBJECT (should be unencrypted) ===") |
|||
t.Logf("ServerSideEncryption: %v (should be empty)", tempHead.ServerSideEncryption) |
|||
t.Logf("Metadata: %v", tempHead.Metadata) |
|||
t.Logf("ContentLength: %d", aws.ToInt64(tempHead.ContentLength)) |
|||
|
|||
// Verify temp is NOT encrypted
|
|||
if tempHead.ServerSideEncryption != "" { |
|||
t.Logf("WARNING: Temp object unexpectedly has encryption: %v", tempHead.ServerSideEncryption) |
|||
} |
|||
|
|||
// Delete source bucket
|
|||
cleanupTestBucket(ctx, client, srcBucket) |
|||
|
|||
// Enable SSE-S3 on dest
|
|||
_, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ |
|||
Bucket: aws.String(destBucket), |
|||
ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ |
|||
Rules: []types.ServerSideEncryptionRule{ |
|||
{ |
|||
ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ |
|||
SSEAlgorithm: types.ServerSideEncryptionAes256, |
|||
}, |
|||
}, |
|||
}, |
|||
}, |
|||
}) |
|||
require.NoError(t, err, "Failed to set dest bucket encryption") |
|||
|
|||
// Copy to dest - THIS IS WHERE #7562 FAILS
|
|||
t.Log("=== COPYING TO ENCRYPTED DEST ===") |
|||
_, err = client.CopyObject(ctx, &s3.CopyObjectInput{ |
|||
Bucket: aws.String(destBucket), |
|||
Key: aws.String(objectKey), |
|||
CopySource: aws.String(fmt.Sprintf("%s/%s", tempBucket, objectKey)), |
|||
}) |
|||
if err != nil { |
|||
t.Logf("!!! COPY FAILED (GitHub #7562): %v", err) |
|||
t.FailNow() |
|||
} |
|||
|
|||
// Log dest object headers
|
|||
destHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ |
|||
Bucket: aws.String(destBucket), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
require.NoError(t, err, "Failed to HEAD dest") |
|||
t.Logf("=== DEST OBJECT (should be encrypted) ===") |
|||
t.Logf("ServerSideEncryption: %v", destHead.ServerSideEncryption) |
|||
t.Logf("Metadata: %v", destHead.Metadata) |
|||
t.Logf("ContentLength: %d", aws.ToInt64(destHead.ContentLength)) |
|||
|
|||
// Verify dest IS encrypted
|
|||
assert.Equal(t, types.ServerSideEncryptionAes256, destHead.ServerSideEncryption, |
|||
"Dest object should be encrypted") |
|||
|
|||
// Verify content is readable
|
|||
destGet, err := client.GetObject(ctx, &s3.GetObjectInput{ |
|||
Bucket: aws.String(destBucket), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
require.NoError(t, err, "Failed to GET dest") |
|||
destData, err := io.ReadAll(destGet.Body) |
|||
destGet.Body.Close() |
|||
require.NoError(t, err, "Failed to read dest") |
|||
assertDataEqual(t, testData, destData, "Data mismatch") |
|||
|
|||
t.Log("=== DEBUG TEST PASSED ===") |
|||
} |
|||
|
|||
// TestGitHub7562LargeFile tests the issue with larger files that might trigger multipart handling
|
|||
func TestGitHub7562LargeFile(t *testing.T) { |
|||
ctx := context.Background() |
|||
client, err := createS3Client(ctx, defaultConfig) |
|||
require.NoError(t, err, "Failed to create S3 client") |
|||
|
|||
srcBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-large-src-") |
|||
require.NoError(t, err, "Failed to create source bucket") |
|||
|
|||
tempBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-large-temp-") |
|||
require.NoError(t, err, "Failed to create temp bucket") |
|||
|
|||
destBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-large-dest-") |
|||
require.NoError(t, err, "Failed to create destination bucket") |
|||
|
|||
defer func() { |
|||
cleanupTestBucket(ctx, client, destBucket) |
|||
cleanupTestBucket(ctx, client, tempBucket) |
|||
}() |
|||
|
|||
// Use larger file to potentially trigger different code paths
|
|||
testData := generateTestData(5 * 1024 * 1024) // 5MB
|
|||
objectKey := "large-file.bin" |
|||
|
|||
t.Logf("Testing with %d byte file", len(testData)) |
|||
|
|||
// Enable SSE-S3 on source
|
|||
_, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ |
|||
Bucket: aws.String(srcBucket), |
|||
ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ |
|||
Rules: []types.ServerSideEncryptionRule{ |
|||
{ |
|||
ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ |
|||
SSEAlgorithm: types.ServerSideEncryptionAes256, |
|||
}, |
|||
}, |
|||
}, |
|||
}, |
|||
}) |
|||
require.NoError(t, err, "Failed to set source bucket encryption") |
|||
|
|||
// Upload
|
|||
_, err = client.PutObject(ctx, &s3.PutObjectInput{ |
|||
Bucket: aws.String(srcBucket), |
|||
Key: aws.String(objectKey), |
|||
Body: bytes.NewReader(testData), |
|||
}) |
|||
require.NoError(t, err, "Failed to upload") |
|||
|
|||
// Copy to temp (decrypt)
|
|||
_, err = client.CopyObject(ctx, &s3.CopyObjectInput{ |
|||
Bucket: aws.String(tempBucket), |
|||
Key: aws.String(objectKey), |
|||
CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)), |
|||
}) |
|||
require.NoError(t, err, "Failed to copy to temp") |
|||
|
|||
// Delete source
|
|||
cleanupTestBucket(ctx, client, srcBucket) |
|||
|
|||
// Enable SSE-S3 on dest
|
|||
_, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ |
|||
Bucket: aws.String(destBucket), |
|||
ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ |
|||
Rules: []types.ServerSideEncryptionRule{ |
|||
{ |
|||
ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ |
|||
SSEAlgorithm: types.ServerSideEncryptionAes256, |
|||
}, |
|||
}, |
|||
}, |
|||
}, |
|||
}) |
|||
require.NoError(t, err, "Failed to set dest bucket encryption") |
|||
|
|||
// Copy to dest (re-encrypt) - GitHub #7562
|
|||
_, err = client.CopyObject(ctx, &s3.CopyObjectInput{ |
|||
Bucket: aws.String(destBucket), |
|||
Key: aws.String(objectKey), |
|||
CopySource: aws.String(fmt.Sprintf("%s/%s", tempBucket, objectKey)), |
|||
}) |
|||
require.NoError(t, err, "GitHub #7562: Large file copy to encrypted bucket failed") |
|||
|
|||
// Verify
|
|||
destHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{ |
|||
Bucket: aws.String(destBucket), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
require.NoError(t, err, "Failed to HEAD dest") |
|||
assert.Equal(t, types.ServerSideEncryptionAes256, destHead.ServerSideEncryption) |
|||
assert.Equal(t, int64(len(testData)), aws.ToInt64(destHead.ContentLength)) |
|||
|
|||
// Verify content
|
|||
destGet, err := client.GetObject(ctx, &s3.GetObjectInput{ |
|||
Bucket: aws.String(destBucket), |
|||
Key: aws.String(objectKey), |
|||
}) |
|||
require.NoError(t, err, "Failed to GET dest") |
|||
destData, err := io.ReadAll(destGet.Body) |
|||
destGet.Body.Close() |
|||
require.NoError(t, err, "Failed to read dest") |
|||
assertDataEqual(t, testData, destData, "Large file data mismatch") |
|||
|
|||
t.Log("Large file test passed!") |
|||
} |
|||
|
|||
@ -0,0 +1,226 @@ |
|||
# Makefile for TUS Protocol Integration Tests
|
|||
# This Makefile provides targets for running TUS (resumable upload) integration tests
|
|||
|
|||
# Default values
|
|||
SEAWEEDFS_BINARY ?= weed |
|||
FILER_PORT ?= 18888 |
|||
VOLUME_PORT ?= 18080 |
|||
MASTER_PORT ?= 19333 |
|||
TEST_TIMEOUT ?= 10m |
|||
VOLUME_MAX_SIZE_MB ?= 50 |
|||
VOLUME_MAX_COUNT ?= 100 |
|||
|
|||
# Test directory
|
|||
TEST_DIR := $(shell pwd) |
|||
SEAWEEDFS_ROOT := $(shell cd ../.. && pwd) |
|||
|
|||
# Colors for output
|
|||
RED := \033[0;31m |
|||
GREEN := \033[0;32m |
|||
YELLOW := \033[1;33m |
|||
NC := \033[0m # No Color |
|||
|
|||
.PHONY: all test clean start-seaweedfs stop-seaweedfs check-binary build-weed help test-basic test-chunked test-resume test-errors test-with-server |
|||
|
|||
all: test |
|||
|
|||
# Build SeaweedFS binary
|
|||
build-weed: |
|||
@echo "Building SeaweedFS binary..." |
|||
@cd $(SEAWEEDFS_ROOT)/weed && go build -o weed |
|||
@echo "$(GREEN)SeaweedFS binary built successfully$(NC)" |
|||
|
|||
help: |
|||
@echo "SeaweedFS TUS Protocol Integration Tests" |
|||
@echo "" |
|||
@echo "Available targets:" |
|||
@echo " test - Run all TUS integration tests" |
|||
@echo " test-basic - Run basic TUS upload tests" |
|||
@echo " test-chunked - Run chunked upload tests" |
|||
@echo " test-resume - Run upload resume tests" |
|||
@echo " test-errors - Run error handling tests" |
|||
@echo " test-with-server - Run tests with automatic server management" |
|||
@echo " start-seaweedfs - Start SeaweedFS server for testing" |
|||
@echo " stop-seaweedfs - Stop SeaweedFS server" |
|||
@echo " clean - Clean up test artifacts" |
|||
@echo " check-binary - Check if SeaweedFS binary exists" |
|||
@echo " build-weed - Build SeaweedFS binary" |
|||
@echo "" |
|||
@echo "Configuration:" |
|||
@echo " SEAWEEDFS_BINARY=$(SEAWEEDFS_BINARY)" |
|||
@echo " FILER_PORT=$(FILER_PORT)" |
|||
@echo " VOLUME_PORT=$(VOLUME_PORT)" |
|||
@echo " MASTER_PORT=$(MASTER_PORT)" |
|||
@echo " TEST_TIMEOUT=$(TEST_TIMEOUT)" |
|||
|
|||
check-binary: |
|||
@if ! command -v $(SEAWEEDFS_BINARY) > /dev/null 2>&1 && [ ! -f "$(SEAWEEDFS_ROOT)/weed/weed" ]; then \
|
|||
echo "$(RED)Error: SeaweedFS binary not found$(NC)"; \
|
|||
echo "Please build SeaweedFS first: make build-weed"; \
|
|||
exit 1; \
|
|||
fi |
|||
@echo "$(GREEN)SeaweedFS binary found$(NC)" |
|||
|
|||
start-seaweedfs: check-binary |
|||
@echo "$(YELLOW)Starting SeaweedFS server for TUS testing...$(NC)" |
|||
@# Clean up any existing processes on our test ports |
|||
@lsof -ti :$(MASTER_PORT) | xargs kill -TERM 2>/dev/null || true |
|||
@lsof -ti :$(VOLUME_PORT) | xargs kill -TERM 2>/dev/null || true |
|||
@lsof -ti :$(FILER_PORT) | xargs kill -TERM 2>/dev/null || true |
|||
@sleep 2 |
|||
|
|||
# Create necessary directories |
|||
@mkdir -p /tmp/seaweedfs-test-tus-master |
|||
@mkdir -p /tmp/seaweedfs-test-tus-volume |
|||
@mkdir -p /tmp/seaweedfs-test-tus-filer |
|||
|
|||
# Start master server (use freshly built binary) |
|||
@echo "Starting master server..." |
|||
@nohup $(SEAWEEDFS_ROOT)/weed/weed master \
|
|||
-port=$(MASTER_PORT) \
|
|||
-mdir=/tmp/seaweedfs-test-tus-master \
|
|||
-volumeSizeLimitMB=$(VOLUME_MAX_SIZE_MB) \
|
|||
-ip=127.0.0.1 \
|
|||
> /tmp/seaweedfs-tus-master.log 2>&1 & |
|||
@sleep 3 |
|||
|
|||
# Start volume server |
|||
@echo "Starting volume server..." |
|||
@nohup $(SEAWEEDFS_ROOT)/weed/weed volume \
|
|||
-port=$(VOLUME_PORT) \
|
|||
-mserver=127.0.0.1:$(MASTER_PORT) \
|
|||
-dir=/tmp/seaweedfs-test-tus-volume \
|
|||
-max=$(VOLUME_MAX_COUNT) \
|
|||
-ip=127.0.0.1 \
|
|||
> /tmp/seaweedfs-tus-volume.log 2>&1 & |
|||
@sleep 3 |
|||
|
|||
# Start filer server with TUS enabled (default tusBasePath is .tus) |
|||
@echo "Starting filer server..." |
|||
@nohup $(SEAWEEDFS_ROOT)/weed/weed filer \
|
|||
-port=$(FILER_PORT) \
|
|||
-master=127.0.0.1:$(MASTER_PORT) \
|
|||
-ip=127.0.0.1 \
|
|||
> /tmp/seaweedfs-tus-filer.log 2>&1 & |
|||
@sleep 5 |
|||
|
|||
# Wait for filer to be ready |
|||
@echo "$(YELLOW)Waiting for filer to be ready...$(NC)" |
|||
@for i in $$(seq 1 30); do \
|
|||
if curl -s -f http://127.0.0.1:$(FILER_PORT)/ > /dev/null 2>&1; then \
|
|||
echo "$(GREEN)Filer is ready$(NC)"; \
|
|||
break; \
|
|||
fi; \
|
|||
if [ $$i -eq 30 ]; then \
|
|||
echo "$(RED)Filer failed to start within 30 seconds$(NC)"; \
|
|||
$(MAKE) debug-logs; \
|
|||
exit 1; \
|
|||
fi; \
|
|||
echo "Waiting for filer... ($$i/30)"; \
|
|||
sleep 1; \
|
|||
done |
|||
|
|||
@echo "$(GREEN)SeaweedFS server started successfully for TUS testing$(NC)" |
|||
@echo "Master: http://localhost:$(MASTER_PORT)" |
|||
@echo "Volume: http://localhost:$(VOLUME_PORT)" |
|||
@echo "Filer: http://localhost:$(FILER_PORT)" |
|||
@echo "TUS Endpoint: http://localhost:$(FILER_PORT)/.tus/" |
|||
|
|||
stop-seaweedfs: |
|||
@echo "$(YELLOW)Stopping SeaweedFS server...$(NC)" |
|||
@lsof -ti :$(MASTER_PORT) | xargs -r kill -TERM 2>/dev/null || true |
|||
@lsof -ti :$(VOLUME_PORT) | xargs -r kill -TERM 2>/dev/null || true |
|||
@lsof -ti :$(FILER_PORT) | xargs -r kill -TERM 2>/dev/null || true |
|||
@sleep 2 |
|||
@echo "$(GREEN)SeaweedFS server stopped$(NC)" |
|||
|
|||
clean: |
|||
@echo "$(YELLOW)Cleaning up TUS test artifacts...$(NC)" |
|||
@rm -rf /tmp/seaweedfs-test-tus-* |
|||
@rm -f /tmp/seaweedfs-tus-*.log |
|||
@echo "$(GREEN)TUS test cleanup completed$(NC)" |
|||
|
|||
# Run all tests
|
|||
test: check-binary |
|||
@echo "$(YELLOW)Running all TUS integration tests...$(NC)" |
|||
@cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) ./test/tus/... |
|||
@echo "$(GREEN)All TUS tests completed$(NC)" |
|||
|
|||
# Run basic upload tests
|
|||
test-basic: check-binary |
|||
@echo "$(YELLOW)Running basic TUS upload tests...$(NC)" |
|||
@cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) -run "TestTusBasicUpload|TestTusOptionsHandler" ./test/tus/... |
|||
@echo "$(GREEN)Basic TUS tests completed$(NC)" |
|||
|
|||
# Run chunked upload tests
|
|||
test-chunked: check-binary |
|||
@echo "$(YELLOW)Running chunked TUS upload tests...$(NC)" |
|||
@cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) -run "TestTusChunkedUpload" ./test/tus/... |
|||
@echo "$(GREEN)Chunked TUS tests completed$(NC)" |
|||
|
|||
# Run resume tests
|
|||
test-resume: check-binary |
|||
@echo "$(YELLOW)Running TUS upload resume tests...$(NC)" |
|||
@cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) -run "TestTusResumeAfterInterruption|TestTusHeadRequest" ./test/tus/... |
|||
@echo "$(GREEN)TUS resume tests completed$(NC)" |
|||
|
|||
# Run error handling tests
|
|||
test-errors: check-binary |
|||
@echo "$(YELLOW)Running TUS error handling tests...$(NC)" |
|||
@cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) -run "TestTusInvalidOffset|TestTusUploadNotFound|TestTusDeleteUpload" ./test/tus/... |
|||
@echo "$(GREEN)TUS error tests completed$(NC)" |
|||
|
|||
# Run tests with automatic server management
|
|||
test-with-server: build-weed |
|||
@echo "$(YELLOW)Running TUS tests with automatic server management...$(NC)" |
|||
@$(MAKE) -C $(TEST_DIR) start-seaweedfs && \
|
|||
sleep 3 && \
|
|||
cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) ./test/tus/...; \
|
|||
TEST_RESULT=$$?; \
|
|||
$(MAKE) -C $(TEST_DIR) stop-seaweedfs; \
|
|||
$(MAKE) -C $(TEST_DIR) clean; \
|
|||
if [ $$TEST_RESULT -eq 0 ]; then echo "$(GREEN)All TUS tests passed!$(NC)"; fi; \
|
|||
exit $$TEST_RESULT |
|||
|
|||
# Debug targets
|
|||
debug-logs: |
|||
@echo "$(YELLOW)=== Master Log ===$(NC)" |
|||
@tail -n 50 /tmp/seaweedfs-tus-master.log 2>/dev/null || echo "No master log found" |
|||
@echo "$(YELLOW)=== Volume Log ===$(NC)" |
|||
@tail -n 50 /tmp/seaweedfs-tus-volume.log 2>/dev/null || echo "No volume log found" |
|||
@echo "$(YELLOW)=== Filer Log ===$(NC)" |
|||
@tail -n 50 /tmp/seaweedfs-tus-filer.log 2>/dev/null || echo "No filer log found" |
|||
|
|||
debug-status: |
|||
@echo "$(YELLOW)=== Process Status ===$(NC)" |
|||
@ps aux | grep -E "(weed|seaweedfs)" | grep -v grep || echo "No SeaweedFS processes found" |
|||
@echo "$(YELLOW)=== Port Status ===$(NC)" |
|||
@lsof -i :$(MASTER_PORT) -i :$(VOLUME_PORT) -i :$(FILER_PORT) 2>/dev/null || echo "No ports in use" |
|||
|
|||
# Manual testing targets
|
|||
manual-start: start-seaweedfs |
|||
@echo "$(GREEN)SeaweedFS is now running for manual TUS testing$(NC)" |
|||
@echo "" |
|||
@echo "TUS Endpoints:" |
|||
@echo " OPTIONS /.tus/ - Capability discovery" |
|||
@echo " POST /.tus/{path} - Create upload" |
|||
@echo " HEAD /.tus/.uploads/{id} - Get offset" |
|||
@echo " PATCH /.tus/.uploads/{id} - Upload data" |
|||
@echo " DELETE /.tus/.uploads/{id} - Cancel upload" |
|||
@echo "" |
|||
@echo "Example curl commands:" |
|||
@echo " curl -X OPTIONS http://localhost:$(FILER_PORT)/.tus/ -H 'Tus-Resumable: 1.0.0'" |
|||
@echo "" |
|||
@echo "Run 'make manual-stop' when finished" |
|||
|
|||
manual-stop: stop-seaweedfs clean |
|||
|
|||
# CI targets
|
|||
ci-test: test-with-server |
|||
|
|||
# Skip integration tests (short mode)
|
|||
test-short: |
|||
@echo "$(YELLOW)Running TUS tests in short mode (skipping integration tests)...$(NC)" |
|||
@cd $(SEAWEEDFS_ROOT) && go test -v -short ./test/tus/... |
|||
@echo "$(GREEN)Short tests completed$(NC)" |
|||
|
|||
@ -0,0 +1,241 @@ |
|||
# TUS Protocol Integration Tests |
|||
|
|||
This directory contains integration tests for the TUS (resumable upload) protocol support in SeaweedFS Filer. |
|||
|
|||
## Overview |
|||
|
|||
TUS is an open protocol for resumable file uploads over HTTP. It allows clients to upload files in chunks and resume uploads after network failures or interruptions. |
|||
|
|||
### Why TUS? |
|||
|
|||
- **Resumable uploads**: Resume interrupted uploads without re-sending data |
|||
- **Chunked uploads**: Upload large files in smaller pieces |
|||
- **Simple protocol**: Standard HTTP methods with custom headers |
|||
- **Wide client support**: Libraries available for JavaScript, Python, Go, and more |
|||
|
|||
## TUS Protocol Endpoints |
|||
|
|||
| Method | Path | Description | |
|||
|--------|------|-------------| |
|||
| `OPTIONS` | `/.tus/` | Server capability discovery | |
|||
| `POST` | `/.tus/{path}` | Create new upload session | |
|||
| `HEAD` | `/.tus/.uploads/{id}` | Get current upload offset | |
|||
| `PATCH` | `/.tus/.uploads/{id}` | Upload data at offset | |
|||
| `DELETE` | `/.tus/.uploads/{id}` | Cancel upload | |
|||
|
|||
### TUS Headers |
|||
|
|||
**Request Headers:** |
|||
- `Tus-Resumable: 1.0.0` - Protocol version (required) |
|||
- `Upload-Length` - Total file size in bytes (required on POST) |
|||
- `Upload-Offset` - Current byte offset (required on PATCH) |
|||
- `Upload-Metadata` - Base64-encoded key-value pairs (optional) |
|||
- `Content-Type: application/offset+octet-stream` (required on PATCH) |
|||
|
|||
**Response Headers:** |
|||
- `Tus-Resumable` - Protocol version |
|||
- `Tus-Version` - Supported versions |
|||
- `Tus-Extension` - Supported extensions |
|||
- `Tus-Max-Size` - Maximum upload size |
|||
- `Upload-Offset` - Current byte offset |
|||
- `Location` - Upload URL (on POST) |
|||
|
|||
## Enabling TUS |
|||
|
|||
TUS protocol support is enabled by default at `/.tus` path. You can customize the path using the `-tusBasePath` flag: |
|||
|
|||
```bash |
|||
# Start filer with default TUS path (/.tus) |
|||
weed filer -master=localhost:9333 |
|||
|
|||
# Use a custom path |
|||
weed filer -master=localhost:9333 -tusBasePath=uploads/tus |
|||
|
|||
# Disable TUS by setting empty path |
|||
weed filer -master=localhost:9333 -tusBasePath= |
|||
``` |
|||
|
|||
## Test Structure |
|||
|
|||
### Integration Tests |
|||
|
|||
The tests cover: |
|||
|
|||
1. **Basic Functionality** |
|||
- `TestTusOptionsHandler` - Capability discovery |
|||
- `TestTusBasicUpload` - Simple complete upload |
|||
- `TestTusCreationWithUpload` - Creation-with-upload extension |
|||
|
|||
2. **Chunked Uploads** |
|||
- `TestTusChunkedUpload` - Upload in multiple chunks |
|||
|
|||
3. **Resumable Uploads** |
|||
- `TestTusHeadRequest` - Offset tracking |
|||
- `TestTusResumeAfterInterruption` - Resume after failure |
|||
|
|||
4. **Error Handling** |
|||
- `TestTusInvalidOffset` - Offset mismatch (409 Conflict) |
|||
- `TestTusUploadNotFound` - Missing upload (404 Not Found) |
|||
- `TestTusDeleteUpload` - Upload cancellation |
|||
|
|||
## Running Tests |
|||
|
|||
### Prerequisites |
|||
|
|||
1. **Build SeaweedFS**: |
|||
```bash |
|||
make build-weed |
|||
# or |
|||
cd ../../weed && go build -o weed |
|||
``` |
|||
|
|||
### Using Makefile |
|||
|
|||
```bash |
|||
# Show available targets |
|||
make help |
|||
|
|||
# Run all tests with automatic server management |
|||
make test-with-server |
|||
|
|||
# Run all tests (requires running server) |
|||
make test |
|||
|
|||
# Run specific test categories |
|||
make test-basic # Basic upload tests |
|||
make test-chunked # Chunked upload tests |
|||
make test-resume # Resume/HEAD tests |
|||
make test-errors # Error handling tests |
|||
|
|||
# Manual testing |
|||
make manual-start # Start SeaweedFS for manual testing |
|||
make manual-stop # Stop and cleanup |
|||
``` |
|||
|
|||
### Using Go Test Directly |
|||
|
|||
```bash |
|||
# Run all TUS tests |
|||
go test -v ./test/tus/... |
|||
|
|||
# Run specific test |
|||
go test -v ./test/tus -run TestTusBasicUpload |
|||
|
|||
# Skip integration tests (short mode) |
|||
go test -v -short ./test/tus/... |
|||
``` |
|||
|
|||
### Debug |
|||
|
|||
```bash |
|||
# View server logs |
|||
make debug-logs |
|||
|
|||
# Check process and port status |
|||
make debug-status |
|||
``` |
|||
|
|||
## Test Environment |
|||
|
|||
Each test run: |
|||
1. Starts a SeaweedFS cluster (master, volume, filer) |
|||
2. Creates uploads using TUS protocol |
|||
3. Verifies files are stored correctly |
|||
4. Cleans up test data |
|||
|
|||
### Default Ports |
|||
|
|||
| Service | Port | |
|||
|---------|------| |
|||
| Master | 19333 | |
|||
| Volume | 18080 | |
|||
| Filer | 18888 | |
|||
|
|||
### Configuration |
|||
|
|||
Override defaults via environment or Makefile variables: |
|||
```bash |
|||
FILER_PORT=8889 MASTER_PORT=9334 make test |
|||
``` |
|||
|
|||
## Example Usage |
|||
|
|||
### Create Upload |
|||
|
|||
```bash |
|||
curl -X POST http://localhost:18888/.tus/mydir/file.txt \ |
|||
-H "Tus-Resumable: 1.0.0" \ |
|||
-H "Upload-Length: 1000" \ |
|||
-H "Upload-Metadata: filename dGVzdC50eHQ=" |
|||
``` |
|||
|
|||
### Upload Data |
|||
|
|||
```bash |
|||
curl -X PATCH http://localhost:18888/.tus/.uploads/{upload-id} \ |
|||
-H "Tus-Resumable: 1.0.0" \ |
|||
-H "Upload-Offset: 0" \ |
|||
-H "Content-Type: application/offset+octet-stream" \ |
|||
--data-binary @file.txt |
|||
``` |
|||
|
|||
### Check Offset |
|||
|
|||
```bash |
|||
curl -I http://localhost:18888/.tus/.uploads/{upload-id} \ |
|||
-H "Tus-Resumable: 1.0.0" |
|||
``` |
|||
|
|||
### Cancel Upload |
|||
|
|||
```bash |
|||
curl -X DELETE http://localhost:18888/.tus/.uploads/{upload-id} \ |
|||
-H "Tus-Resumable: 1.0.0" |
|||
``` |
|||
|
|||
## TUS Extensions Supported |
|||
|
|||
- **creation**: Create new uploads with POST |
|||
- **creation-with-upload**: Send data in creation request |
|||
- **termination**: Cancel uploads with DELETE |
|||
|
|||
## Architecture |
|||
|
|||
```text |
|||
Client Filer Volume Servers |
|||
| | | |
|||
|-- POST /.tus/path/file.mp4 ->| | |
|||
| |-- Create session dir ------->| |
|||
|<-- 201 Location: /.../{id} --| | |
|||
| | | |
|||
|-- PATCH /.tus/.uploads/{id} >| | |
|||
| Upload-Offset: 0 |-- Assign volume ------------>| |
|||
| [chunk data] |-- Upload chunk ------------->| |
|||
|<-- 204 Upload-Offset: N -----| | |
|||
| | | |
|||
| (network failure) | | |
|||
| | | |
|||
|-- HEAD /.tus/.uploads/{id} ->| | |
|||
|<-- Upload-Offset: N ---------| | |
|||
| | | |
|||
|-- PATCH (resume) ----------->|-- Upload remaining -------->| |
|||
|<-- 204 (complete) -----------|-- Assemble final file ----->| |
|||
``` |
|||
|
|||
## Comparison with S3 Multipart |
|||
|
|||
| Feature | TUS | S3 Multipart | |
|||
|---------|-----|--------------| |
|||
| Protocol | Custom HTTP headers | S3 API | |
|||
| Session Init | POST with Upload-Length | CreateMultipartUpload | |
|||
| Upload Data | PATCH with offset | UploadPart with partNumber | |
|||
| Resume | HEAD to get offset | ListParts | |
|||
| Complete | Automatic at final offset | CompleteMultipartUpload | |
|||
| Ordering | Sequential (offset-based) | Parallel (part numbers) | |
|||
|
|||
## Related Resources |
|||
|
|||
- [TUS Protocol Specification](https://tus.io/protocols/resumable-upload) |
|||
- [tus-js-client](https://github.com/tus/tus-js-client) - JavaScript client |
|||
- [go-tus](https://github.com/eventials/go-tus) - Go client |
|||
- [SeaweedFS S3 API](../../weed/s3api) - Alternative multipart upload |
|||
@ -0,0 +1,772 @@ |
|||
package tus |
|||
|
|||
import ( |
|||
"bytes" |
|||
"context" |
|||
"encoding/base64" |
|||
"fmt" |
|||
"io" |
|||
"net/http" |
|||
"os" |
|||
"os/exec" |
|||
"path/filepath" |
|||
"strconv" |
|||
"strings" |
|||
"testing" |
|||
"time" |
|||
|
|||
"github.com/stretchr/testify/assert" |
|||
"github.com/stretchr/testify/require" |
|||
) |
|||
|
|||
const ( |
|||
TusVersion = "1.0.0" |
|||
testFilerPort = "18888" |
|||
testMasterPort = "19333" |
|||
testVolumePort = "18080" |
|||
) |
|||
|
|||
// TestCluster represents a running SeaweedFS cluster for testing
|
|||
type TestCluster struct { |
|||
masterCmd *exec.Cmd |
|||
volumeCmd *exec.Cmd |
|||
filerCmd *exec.Cmd |
|||
dataDir string |
|||
} |
|||
|
|||
func (c *TestCluster) Stop() { |
|||
if c.filerCmd != nil && c.filerCmd.Process != nil { |
|||
c.filerCmd.Process.Signal(os.Interrupt) |
|||
c.filerCmd.Wait() |
|||
} |
|||
if c.volumeCmd != nil && c.volumeCmd.Process != nil { |
|||
c.volumeCmd.Process.Signal(os.Interrupt) |
|||
c.volumeCmd.Wait() |
|||
} |
|||
if c.masterCmd != nil && c.masterCmd.Process != nil { |
|||
c.masterCmd.Process.Signal(os.Interrupt) |
|||
c.masterCmd.Wait() |
|||
} |
|||
} |
|||
|
|||
func (c *TestCluster) FilerURL() string { |
|||
return fmt.Sprintf("http://127.0.0.1:%s", testFilerPort) |
|||
} |
|||
|
|||
func (c *TestCluster) TusURL() string { |
|||
return fmt.Sprintf("%s/.tus", c.FilerURL()) |
|||
} |
|||
|
|||
// FullURL converts a relative path to a full URL
|
|||
func (c *TestCluster) FullURL(path string) string { |
|||
if strings.HasPrefix(path, "http://") || strings.HasPrefix(path, "https://") { |
|||
return path |
|||
} |
|||
return fmt.Sprintf("http://127.0.0.1:%s%s", testFilerPort, path) |
|||
} |
|||
|
|||
// startTestCluster starts a SeaweedFS cluster for testing
|
|||
func startTestCluster(t *testing.T, ctx context.Context) (*TestCluster, error) { |
|||
weedBinary := findWeedBinary() |
|||
if weedBinary == "" { |
|||
return nil, fmt.Errorf("weed binary not found - please build it first: cd weed && go build") |
|||
} |
|||
|
|||
dataDir, err := os.MkdirTemp("", "seaweedfs_tus_test_") |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
cluster := &TestCluster{dataDir: dataDir} |
|||
|
|||
// Create subdirectories
|
|||
masterDir := filepath.Join(dataDir, "master") |
|||
volumeDir := filepath.Join(dataDir, "volume") |
|||
filerDir := filepath.Join(dataDir, "filer") |
|||
os.MkdirAll(masterDir, 0755) |
|||
os.MkdirAll(volumeDir, 0755) |
|||
os.MkdirAll(filerDir, 0755) |
|||
|
|||
// Start master
|
|||
masterCmd := exec.CommandContext(ctx, weedBinary, "master", |
|||
"-port", testMasterPort, |
|||
"-mdir", masterDir, |
|||
"-ip", "127.0.0.1", |
|||
) |
|||
masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) |
|||
if err != nil { |
|||
os.RemoveAll(dataDir) |
|||
return nil, fmt.Errorf("failed to create master log: %v", err) |
|||
} |
|||
masterCmd.Stdout = masterLogFile |
|||
masterCmd.Stderr = masterLogFile |
|||
if err := masterCmd.Start(); err != nil { |
|||
os.RemoveAll(dataDir) |
|||
return nil, fmt.Errorf("failed to start master: %v", err) |
|||
} |
|||
cluster.masterCmd = masterCmd |
|||
|
|||
// Wait for master to be ready
|
|||
if err := waitForHTTPServer("http://127.0.0.1:"+testMasterPort+"/dir/status", 30*time.Second); err != nil { |
|||
cluster.Stop() |
|||
os.RemoveAll(dataDir) |
|||
return nil, fmt.Errorf("master not ready: %v", err) |
|||
} |
|||
|
|||
// Start volume server
|
|||
volumeCmd := exec.CommandContext(ctx, weedBinary, "volume", |
|||
"-port", testVolumePort, |
|||
"-dir", volumeDir, |
|||
"-mserver", "127.0.0.1:"+testMasterPort, |
|||
"-ip", "127.0.0.1", |
|||
) |
|||
volumeLogFile, err := os.Create(filepath.Join(volumeDir, "volume.log")) |
|||
if err != nil { |
|||
cluster.Stop() |
|||
os.RemoveAll(dataDir) |
|||
return nil, fmt.Errorf("failed to create volume log: %v", err) |
|||
} |
|||
volumeCmd.Stdout = volumeLogFile |
|||
volumeCmd.Stderr = volumeLogFile |
|||
if err := volumeCmd.Start(); err != nil { |
|||
cluster.Stop() |
|||
os.RemoveAll(dataDir) |
|||
return nil, fmt.Errorf("failed to start volume server: %v", err) |
|||
} |
|||
cluster.volumeCmd = volumeCmd |
|||
|
|||
// Wait for volume server to register with master
|
|||
if err := waitForHTTPServer("http://127.0.0.1:"+testVolumePort+"/status", 30*time.Second); err != nil { |
|||
cluster.Stop() |
|||
os.RemoveAll(dataDir) |
|||
return nil, fmt.Errorf("volume server not ready: %v", err) |
|||
} |
|||
|
|||
// Start filer with TUS enabled
|
|||
filerCmd := exec.CommandContext(ctx, weedBinary, "filer", |
|||
"-port", testFilerPort, |
|||
"-master", "127.0.0.1:"+testMasterPort, |
|||
"-ip", "127.0.0.1", |
|||
"-defaultStoreDir", filerDir, |
|||
) |
|||
filerLogFile, err := os.Create(filepath.Join(filerDir, "filer.log")) |
|||
if err != nil { |
|||
cluster.Stop() |
|||
os.RemoveAll(dataDir) |
|||
return nil, fmt.Errorf("failed to create filer log: %v", err) |
|||
} |
|||
filerCmd.Stdout = filerLogFile |
|||
filerCmd.Stderr = filerLogFile |
|||
if err := filerCmd.Start(); err != nil { |
|||
cluster.Stop() |
|||
os.RemoveAll(dataDir) |
|||
return nil, fmt.Errorf("failed to start filer: %v", err) |
|||
} |
|||
cluster.filerCmd = filerCmd |
|||
|
|||
// Wait for filer
|
|||
if err := waitForHTTPServer("http://127.0.0.1:"+testFilerPort+"/", 30*time.Second); err != nil { |
|||
cluster.Stop() |
|||
os.RemoveAll(dataDir) |
|||
return nil, fmt.Errorf("filer not ready: %v", err) |
|||
} |
|||
|
|||
// Wait a bit more for the cluster to fully stabilize
|
|||
// Volumes are created lazily, and we need to ensure the master topology is ready
|
|||
time.Sleep(5 * time.Second) |
|||
|
|||
return cluster, nil |
|||
} |
|||
|
|||
func findWeedBinary() string { |
|||
candidates := []string{ |
|||
"../../weed/weed", |
|||
"../weed/weed", |
|||
"./weed/weed", |
|||
"weed", |
|||
} |
|||
for _, candidate := range candidates { |
|||
if _, err := os.Stat(candidate); err == nil { |
|||
return candidate |
|||
} |
|||
} |
|||
if path, err := exec.LookPath("weed"); err == nil { |
|||
return path |
|||
} |
|||
return "" |
|||
} |
|||
|
|||
func waitForHTTPServer(url string, timeout time.Duration) error { |
|||
start := time.Now() |
|||
client := &http.Client{Timeout: 1 * time.Second} |
|||
for time.Since(start) < timeout { |
|||
resp, err := client.Get(url) |
|||
if err == nil { |
|||
resp.Body.Close() |
|||
return nil |
|||
} |
|||
time.Sleep(500 * time.Millisecond) |
|||
} |
|||
return fmt.Errorf("timeout waiting for %s", url) |
|||
} |
|||
|
|||
// encodeTusMetadata encodes key-value pairs for Upload-Metadata header
|
|||
func encodeTusMetadata(metadata map[string]string) string { |
|||
var parts []string |
|||
for k, v := range metadata { |
|||
encoded := base64.StdEncoding.EncodeToString([]byte(v)) |
|||
parts = append(parts, fmt.Sprintf("%s %s", k, encoded)) |
|||
} |
|||
return strings.Join(parts, ",") |
|||
} |
|||
|
|||
// TestTusOptionsHandler tests the OPTIONS endpoint for capability discovery
|
|||
func TestTusOptionsHandler(t *testing.T) { |
|||
if testing.Short() { |
|||
t.Skip("Skipping integration test in short mode") |
|||
} |
|||
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) |
|||
defer cancel() |
|||
|
|||
cluster, err := startTestCluster(t, ctx) |
|||
require.NoError(t, err) |
|||
defer func() { |
|||
cluster.Stop() |
|||
os.RemoveAll(cluster.dataDir) |
|||
}() |
|||
|
|||
// Test OPTIONS request
|
|||
req, err := http.NewRequest(http.MethodOptions, cluster.TusURL()+"/", nil) |
|||
require.NoError(t, err) |
|||
req.Header.Set("Tus-Resumable", TusVersion) |
|||
|
|||
client := &http.Client{} |
|||
resp, err := client.Do(req) |
|||
require.NoError(t, err) |
|||
defer resp.Body.Close() |
|||
|
|||
// Verify TUS headers
|
|||
assert.Equal(t, http.StatusOK, resp.StatusCode, "OPTIONS should return 200 OK") |
|||
assert.Equal(t, TusVersion, resp.Header.Get("Tus-Resumable"), "Should return Tus-Resumable header") |
|||
assert.NotEmpty(t, resp.Header.Get("Tus-Version"), "Should return Tus-Version header") |
|||
assert.NotEmpty(t, resp.Header.Get("Tus-Extension"), "Should return Tus-Extension header") |
|||
assert.NotEmpty(t, resp.Header.Get("Tus-Max-Size"), "Should return Tus-Max-Size header") |
|||
} |
|||
|
|||
// TestTusBasicUpload tests a simple complete upload
|
|||
func TestTusBasicUpload(t *testing.T) { |
|||
if testing.Short() { |
|||
t.Skip("Skipping integration test in short mode") |
|||
} |
|||
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) |
|||
defer cancel() |
|||
|
|||
cluster, err := startTestCluster(t, ctx) |
|||
require.NoError(t, err) |
|||
defer func() { |
|||
cluster.Stop() |
|||
os.RemoveAll(cluster.dataDir) |
|||
}() |
|||
|
|||
testData := []byte("Hello, TUS Protocol! This is a test file.") |
|||
targetPath := "/testdir/testfile.txt" |
|||
|
|||
// Step 1: Create upload (POST)
|
|||
createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil) |
|||
require.NoError(t, err) |
|||
createReq.Header.Set("Tus-Resumable", TusVersion) |
|||
createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData))) |
|||
createReq.Header.Set("Upload-Metadata", encodeTusMetadata(map[string]string{ |
|||
"filename": "testfile.txt", |
|||
"content-type": "text/plain", |
|||
})) |
|||
|
|||
client := &http.Client{} |
|||
createResp, err := client.Do(createReq) |
|||
require.NoError(t, err) |
|||
defer createResp.Body.Close() |
|||
|
|||
assert.Equal(t, http.StatusCreated, createResp.StatusCode, "POST should return 201 Created") |
|||
uploadLocation := createResp.Header.Get("Location") |
|||
assert.NotEmpty(t, uploadLocation, "Should return Location header with upload URL") |
|||
t.Logf("Upload location: %s", uploadLocation) |
|||
|
|||
// Step 2: Upload data (PATCH)
|
|||
patchReq, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData)) |
|||
require.NoError(t, err) |
|||
patchReq.Header.Set("Tus-Resumable", TusVersion) |
|||
patchReq.Header.Set("Upload-Offset", "0") |
|||
patchReq.Header.Set("Content-Type", "application/offset+octet-stream") |
|||
patchReq.Header.Set("Content-Length", strconv.Itoa(len(testData))) |
|||
|
|||
patchResp, err := client.Do(patchReq) |
|||
require.NoError(t, err) |
|||
defer patchResp.Body.Close() |
|||
|
|||
assert.Equal(t, http.StatusNoContent, patchResp.StatusCode, "PATCH should return 204 No Content") |
|||
newOffset := patchResp.Header.Get("Upload-Offset") |
|||
assert.Equal(t, strconv.Itoa(len(testData)), newOffset, "Upload-Offset should equal total file size") |
|||
|
|||
// Step 3: Verify the file was created
|
|||
getResp, err := client.Get(cluster.FilerURL() + targetPath) |
|||
require.NoError(t, err) |
|||
defer getResp.Body.Close() |
|||
|
|||
assert.Equal(t, http.StatusOK, getResp.StatusCode, "GET should return 200 OK") |
|||
body, err := io.ReadAll(getResp.Body) |
|||
require.NoError(t, err) |
|||
assert.Equal(t, testData, body, "File content should match uploaded data") |
|||
} |
|||
|
|||
// TestTusChunkedUpload tests uploading a file in multiple chunks
|
|||
func TestTusChunkedUpload(t *testing.T) { |
|||
if testing.Short() { |
|||
t.Skip("Skipping integration test in short mode") |
|||
} |
|||
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) |
|||
defer cancel() |
|||
|
|||
cluster, err := startTestCluster(t, ctx) |
|||
require.NoError(t, err) |
|||
defer func() { |
|||
cluster.Stop() |
|||
os.RemoveAll(cluster.dataDir) |
|||
}() |
|||
|
|||
// Create test data (100KB)
|
|||
testData := make([]byte, 100*1024) |
|||
for i := range testData { |
|||
testData[i] = byte(i % 256) |
|||
} |
|||
chunkSize := 32 * 1024 // 32KB chunks
|
|||
targetPath := "/chunked/largefile.bin" |
|||
|
|||
client := &http.Client{} |
|||
|
|||
// Step 1: Create upload
|
|||
createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil) |
|||
require.NoError(t, err) |
|||
createReq.Header.Set("Tus-Resumable", TusVersion) |
|||
createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData))) |
|||
|
|||
createResp, err := client.Do(createReq) |
|||
require.NoError(t, err) |
|||
defer createResp.Body.Close() |
|||
|
|||
require.Equal(t, http.StatusCreated, createResp.StatusCode) |
|||
uploadLocation := createResp.Header.Get("Location") |
|||
require.NotEmpty(t, uploadLocation) |
|||
t.Logf("Upload location: %s", uploadLocation) |
|||
|
|||
// Step 2: Upload in chunks
|
|||
offset := 0 |
|||
for offset < len(testData) { |
|||
end := offset + chunkSize |
|||
if end > len(testData) { |
|||
end = len(testData) |
|||
} |
|||
chunk := testData[offset:end] |
|||
|
|||
patchReq, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(chunk)) |
|||
require.NoError(t, err) |
|||
patchReq.Header.Set("Tus-Resumable", TusVersion) |
|||
patchReq.Header.Set("Upload-Offset", strconv.Itoa(offset)) |
|||
patchReq.Header.Set("Content-Type", "application/offset+octet-stream") |
|||
patchReq.Header.Set("Content-Length", strconv.Itoa(len(chunk))) |
|||
|
|||
patchResp, err := client.Do(patchReq) |
|||
require.NoError(t, err) |
|||
patchResp.Body.Close() |
|||
|
|||
require.Equal(t, http.StatusNoContent, patchResp.StatusCode, |
|||
"PATCH chunk at offset %d should return 204", offset) |
|||
newOffset, _ := strconv.Atoi(patchResp.Header.Get("Upload-Offset")) |
|||
require.Equal(t, end, newOffset, "New offset should be %d", end) |
|||
|
|||
t.Logf("Uploaded chunk: offset=%d, size=%d, newOffset=%d", offset, len(chunk), newOffset) |
|||
offset = end |
|||
} |
|||
|
|||
// Step 3: Verify the complete file
|
|||
getResp, err := client.Get(cluster.FilerURL() + targetPath) |
|||
require.NoError(t, err) |
|||
defer getResp.Body.Close() |
|||
|
|||
assert.Equal(t, http.StatusOK, getResp.StatusCode) |
|||
body, err := io.ReadAll(getResp.Body) |
|||
require.NoError(t, err) |
|||
assert.Equal(t, testData, body, "File content should match uploaded data") |
|||
} |
|||
|
|||
// TestTusHeadRequest tests the HEAD endpoint to get upload offset
|
|||
func TestTusHeadRequest(t *testing.T) { |
|||
if testing.Short() { |
|||
t.Skip("Skipping integration test in short mode") |
|||
} |
|||
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) |
|||
defer cancel() |
|||
|
|||
cluster, err := startTestCluster(t, ctx) |
|||
require.NoError(t, err) |
|||
defer func() { |
|||
cluster.Stop() |
|||
os.RemoveAll(cluster.dataDir) |
|||
}() |
|||
|
|||
testData := []byte("Test data for HEAD request verification") |
|||
targetPath := "/headtest/file.txt" |
|||
client := &http.Client{} |
|||
|
|||
// Create upload
|
|||
createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil) |
|||
require.NoError(t, err) |
|||
createReq.Header.Set("Tus-Resumable", TusVersion) |
|||
createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData))) |
|||
|
|||
createResp, err := client.Do(createReq) |
|||
require.NoError(t, err) |
|||
defer createResp.Body.Close() |
|||
require.Equal(t, http.StatusCreated, createResp.StatusCode) |
|||
uploadLocation := createResp.Header.Get("Location") |
|||
|
|||
// HEAD before any data uploaded - offset should be 0
|
|||
headReq1, err := http.NewRequest(http.MethodHead, cluster.FullURL(uploadLocation), nil) |
|||
require.NoError(t, err) |
|||
headReq1.Header.Set("Tus-Resumable", TusVersion) |
|||
|
|||
headResp1, err := client.Do(headReq1) |
|||
require.NoError(t, err) |
|||
defer headResp1.Body.Close() |
|||
|
|||
assert.Equal(t, http.StatusOK, headResp1.StatusCode) |
|||
assert.Equal(t, "0", headResp1.Header.Get("Upload-Offset"), "Initial offset should be 0") |
|||
assert.Equal(t, strconv.Itoa(len(testData)), headResp1.Header.Get("Upload-Length")) |
|||
|
|||
// Upload half the data
|
|||
halfLen := len(testData) / 2 |
|||
patchReq, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData[:halfLen])) |
|||
require.NoError(t, err) |
|||
patchReq.Header.Set("Tus-Resumable", TusVersion) |
|||
patchReq.Header.Set("Upload-Offset", "0") |
|||
patchReq.Header.Set("Content-Type", "application/offset+octet-stream") |
|||
|
|||
patchResp, err := client.Do(patchReq) |
|||
require.NoError(t, err) |
|||
patchResp.Body.Close() |
|||
require.Equal(t, http.StatusNoContent, patchResp.StatusCode) |
|||
|
|||
// HEAD after partial upload - offset should be halfLen
|
|||
headReq2, err := http.NewRequest(http.MethodHead, cluster.FullURL(uploadLocation), nil) |
|||
require.NoError(t, err) |
|||
headReq2.Header.Set("Tus-Resumable", TusVersion) |
|||
|
|||
headResp2, err := client.Do(headReq2) |
|||
require.NoError(t, err) |
|||
defer headResp2.Body.Close() |
|||
|
|||
assert.Equal(t, http.StatusOK, headResp2.StatusCode) |
|||
assert.Equal(t, strconv.Itoa(halfLen), headResp2.Header.Get("Upload-Offset"), |
|||
"Offset should be %d after partial upload", halfLen) |
|||
} |
|||
|
|||
// TestTusDeleteUpload tests canceling an in-progress upload
|
|||
func TestTusDeleteUpload(t *testing.T) { |
|||
if testing.Short() { |
|||
t.Skip("Skipping integration test in short mode") |
|||
} |
|||
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) |
|||
defer cancel() |
|||
|
|||
cluster, err := startTestCluster(t, ctx) |
|||
require.NoError(t, err) |
|||
defer func() { |
|||
cluster.Stop() |
|||
os.RemoveAll(cluster.dataDir) |
|||
}() |
|||
|
|||
testData := []byte("Data to be deleted") |
|||
targetPath := "/deletetest/file.txt" |
|||
client := &http.Client{} |
|||
|
|||
// Create upload
|
|||
createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil) |
|||
require.NoError(t, err) |
|||
createReq.Header.Set("Tus-Resumable", TusVersion) |
|||
createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData))) |
|||
|
|||
createResp, err := client.Do(createReq) |
|||
require.NoError(t, err) |
|||
defer createResp.Body.Close() |
|||
require.Equal(t, http.StatusCreated, createResp.StatusCode) |
|||
uploadLocation := createResp.Header.Get("Location") |
|||
|
|||
// Upload some data
|
|||
patchReq, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData[:10])) |
|||
require.NoError(t, err) |
|||
patchReq.Header.Set("Tus-Resumable", TusVersion) |
|||
patchReq.Header.Set("Upload-Offset", "0") |
|||
patchReq.Header.Set("Content-Type", "application/offset+octet-stream") |
|||
|
|||
patchResp, err := client.Do(patchReq) |
|||
require.NoError(t, err) |
|||
patchResp.Body.Close() |
|||
|
|||
// Delete the upload
|
|||
deleteReq, err := http.NewRequest(http.MethodDelete, cluster.FullURL(uploadLocation), nil) |
|||
require.NoError(t, err) |
|||
deleteReq.Header.Set("Tus-Resumable", TusVersion) |
|||
|
|||
deleteResp, err := client.Do(deleteReq) |
|||
require.NoError(t, err) |
|||
defer deleteResp.Body.Close() |
|||
|
|||
assert.Equal(t, http.StatusNoContent, deleteResp.StatusCode, "DELETE should return 204") |
|||
|
|||
// Verify upload is gone - HEAD should return 404
|
|||
headReq, err := http.NewRequest(http.MethodHead, cluster.FullURL(uploadLocation), nil) |
|||
require.NoError(t, err) |
|||
headReq.Header.Set("Tus-Resumable", TusVersion) |
|||
|
|||
headResp, err := client.Do(headReq) |
|||
require.NoError(t, err) |
|||
defer headResp.Body.Close() |
|||
|
|||
assert.Equal(t, http.StatusNotFound, headResp.StatusCode, "HEAD after DELETE should return 404") |
|||
} |
|||
|
|||
// TestTusInvalidOffset tests error handling for mismatched offsets
|
|||
func TestTusInvalidOffset(t *testing.T) { |
|||
if testing.Short() { |
|||
t.Skip("Skipping integration test in short mode") |
|||
} |
|||
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) |
|||
defer cancel() |
|||
|
|||
cluster, err := startTestCluster(t, ctx) |
|||
require.NoError(t, err) |
|||
defer func() { |
|||
cluster.Stop() |
|||
os.RemoveAll(cluster.dataDir) |
|||
}() |
|||
|
|||
testData := []byte("Test data for offset validation") |
|||
targetPath := "/offsettest/file.txt" |
|||
client := &http.Client{} |
|||
|
|||
// Create upload
|
|||
createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil) |
|||
require.NoError(t, err) |
|||
createReq.Header.Set("Tus-Resumable", TusVersion) |
|||
createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData))) |
|||
|
|||
createResp, err := client.Do(createReq) |
|||
require.NoError(t, err) |
|||
defer createResp.Body.Close() |
|||
require.Equal(t, http.StatusCreated, createResp.StatusCode) |
|||
uploadLocation := createResp.Header.Get("Location") |
|||
|
|||
// Try to upload with wrong offset (should be 0, but we send 100)
|
|||
patchReq, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData)) |
|||
require.NoError(t, err) |
|||
patchReq.Header.Set("Tus-Resumable", TusVersion) |
|||
patchReq.Header.Set("Upload-Offset", "100") // Wrong offset!
|
|||
patchReq.Header.Set("Content-Type", "application/offset+octet-stream") |
|||
|
|||
patchResp, err := client.Do(patchReq) |
|||
require.NoError(t, err) |
|||
defer patchResp.Body.Close() |
|||
|
|||
assert.Equal(t, http.StatusConflict, patchResp.StatusCode, |
|||
"PATCH with wrong offset should return 409 Conflict") |
|||
} |
|||
|
|||
// TestTusUploadNotFound tests accessing a non-existent upload
|
|||
func TestTusUploadNotFound(t *testing.T) { |
|||
if testing.Short() { |
|||
t.Skip("Skipping integration test in short mode") |
|||
} |
|||
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) |
|||
defer cancel() |
|||
|
|||
cluster, err := startTestCluster(t, ctx) |
|||
require.NoError(t, err) |
|||
defer func() { |
|||
cluster.Stop() |
|||
os.RemoveAll(cluster.dataDir) |
|||
}() |
|||
|
|||
client := &http.Client{} |
|||
fakeUploadURL := cluster.TusURL() + "/.uploads/nonexistent-upload-id" |
|||
|
|||
// HEAD on non-existent upload
|
|||
headReq, err := http.NewRequest(http.MethodHead, fakeUploadURL, nil) |
|||
require.NoError(t, err) |
|||
headReq.Header.Set("Tus-Resumable", TusVersion) |
|||
|
|||
headResp, err := client.Do(headReq) |
|||
require.NoError(t, err) |
|||
defer headResp.Body.Close() |
|||
|
|||
assert.Equal(t, http.StatusNotFound, headResp.StatusCode, |
|||
"HEAD on non-existent upload should return 404") |
|||
|
|||
// PATCH on non-existent upload
|
|||
patchReq, err := http.NewRequest(http.MethodPatch, fakeUploadURL, bytes.NewReader([]byte("data"))) |
|||
require.NoError(t, err) |
|||
patchReq.Header.Set("Tus-Resumable", TusVersion) |
|||
patchReq.Header.Set("Upload-Offset", "0") |
|||
patchReq.Header.Set("Content-Type", "application/offset+octet-stream") |
|||
|
|||
patchResp, err := client.Do(patchReq) |
|||
require.NoError(t, err) |
|||
defer patchResp.Body.Close() |
|||
|
|||
assert.Equal(t, http.StatusNotFound, patchResp.StatusCode, |
|||
"PATCH on non-existent upload should return 404") |
|||
} |
|||
|
|||
// TestTusCreationWithUpload tests the creation-with-upload extension
|
|||
func TestTusCreationWithUpload(t *testing.T) { |
|||
if testing.Short() { |
|||
t.Skip("Skipping integration test in short mode") |
|||
} |
|||
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) |
|||
defer cancel() |
|||
|
|||
cluster, err := startTestCluster(t, ctx) |
|||
require.NoError(t, err) |
|||
defer func() { |
|||
cluster.Stop() |
|||
os.RemoveAll(cluster.dataDir) |
|||
}() |
|||
|
|||
testData := []byte("Small file uploaded in creation request") |
|||
targetPath := "/creationwithupload/smallfile.txt" |
|||
client := &http.Client{} |
|||
|
|||
// Create upload with data in the same request
|
|||
createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, bytes.NewReader(testData)) |
|||
require.NoError(t, err) |
|||
createReq.Header.Set("Tus-Resumable", TusVersion) |
|||
createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData))) |
|||
createReq.Header.Set("Content-Type", "application/offset+octet-stream") |
|||
|
|||
createResp, err := client.Do(createReq) |
|||
require.NoError(t, err) |
|||
defer createResp.Body.Close() |
|||
|
|||
assert.Equal(t, http.StatusCreated, createResp.StatusCode) |
|||
uploadLocation := createResp.Header.Get("Location") |
|||
assert.NotEmpty(t, uploadLocation) |
|||
|
|||
// Check Upload-Offset header - should indicate all data was received
|
|||
uploadOffset := createResp.Header.Get("Upload-Offset") |
|||
assert.Equal(t, strconv.Itoa(len(testData)), uploadOffset, |
|||
"Upload-Offset should equal file size for complete upload") |
|||
|
|||
// Verify the file
|
|||
getResp, err := client.Get(cluster.FilerURL() + targetPath) |
|||
require.NoError(t, err) |
|||
defer getResp.Body.Close() |
|||
|
|||
assert.Equal(t, http.StatusOK, getResp.StatusCode) |
|||
body, err := io.ReadAll(getResp.Body) |
|||
require.NoError(t, err) |
|||
assert.Equal(t, testData, body) |
|||
} |
|||
|
|||
// TestTusResumeAfterInterruption simulates resuming an upload after failure
|
|||
func TestTusResumeAfterInterruption(t *testing.T) { |
|||
if testing.Short() { |
|||
t.Skip("Skipping integration test in short mode") |
|||
} |
|||
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) |
|||
defer cancel() |
|||
|
|||
cluster, err := startTestCluster(t, ctx) |
|||
require.NoError(t, err) |
|||
defer func() { |
|||
cluster.Stop() |
|||
os.RemoveAll(cluster.dataDir) |
|||
}() |
|||
|
|||
// 50KB test data
|
|||
testData := make([]byte, 50*1024) |
|||
for i := range testData { |
|||
testData[i] = byte(i % 256) |
|||
} |
|||
targetPath := "/resume/interrupted.bin" |
|||
client := &http.Client{} |
|||
|
|||
// Create upload
|
|||
createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil) |
|||
require.NoError(t, err) |
|||
createReq.Header.Set("Tus-Resumable", TusVersion) |
|||
createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData))) |
|||
|
|||
createResp, err := client.Do(createReq) |
|||
require.NoError(t, err) |
|||
defer createResp.Body.Close() |
|||
require.Equal(t, http.StatusCreated, createResp.StatusCode) |
|||
uploadLocation := createResp.Header.Get("Location") |
|||
|
|||
// Upload first 20KB
|
|||
firstChunkSize := 20 * 1024 |
|||
patchReq1, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData[:firstChunkSize])) |
|||
require.NoError(t, err) |
|||
patchReq1.Header.Set("Tus-Resumable", TusVersion) |
|||
patchReq1.Header.Set("Upload-Offset", "0") |
|||
patchReq1.Header.Set("Content-Type", "application/offset+octet-stream") |
|||
|
|||
patchResp1, err := client.Do(patchReq1) |
|||
require.NoError(t, err) |
|||
patchResp1.Body.Close() |
|||
require.Equal(t, http.StatusNoContent, patchResp1.StatusCode) |
|||
|
|||
t.Log("Simulating network interruption...") |
|||
|
|||
// Simulate resumption: Query current offset with HEAD
|
|||
headReq, err := http.NewRequest(http.MethodHead, cluster.FullURL(uploadLocation), nil) |
|||
require.NoError(t, err) |
|||
headReq.Header.Set("Tus-Resumable", TusVersion) |
|||
|
|||
headResp, err := client.Do(headReq) |
|||
require.NoError(t, err) |
|||
defer headResp.Body.Close() |
|||
|
|||
require.Equal(t, http.StatusOK, headResp.StatusCode) |
|||
currentOffset, _ := strconv.Atoi(headResp.Header.Get("Upload-Offset")) |
|||
t.Logf("Resumed upload at offset: %d", currentOffset) |
|||
require.Equal(t, firstChunkSize, currentOffset) |
|||
|
|||
// Resume upload from current offset
|
|||
patchReq2, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData[currentOffset:])) |
|||
require.NoError(t, err) |
|||
patchReq2.Header.Set("Tus-Resumable", TusVersion) |
|||
patchReq2.Header.Set("Upload-Offset", strconv.Itoa(currentOffset)) |
|||
patchReq2.Header.Set("Content-Type", "application/offset+octet-stream") |
|||
|
|||
patchResp2, err := client.Do(patchReq2) |
|||
require.NoError(t, err) |
|||
patchResp2.Body.Close() |
|||
require.Equal(t, http.StatusNoContent, patchResp2.StatusCode) |
|||
|
|||
// Verify complete file
|
|||
getResp, err := client.Get(cluster.FilerURL() + targetPath) |
|||
require.NoError(t, err) |
|||
defer getResp.Body.Close() |
|||
|
|||
assert.Equal(t, http.StatusOK, getResp.StatusCode) |
|||
body, err := io.ReadAll(getResp.Body) |
|||
require.NoError(t, err) |
|||
assert.Equal(t, testData, body, "Resumed upload should produce complete file") |
|||
} |
|||
@ -0,0 +1,415 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"bytes" |
|||
"context" |
|||
"encoding/base64" |
|||
"fmt" |
|||
"io" |
|||
"net/http" |
|||
"path" |
|||
"strconv" |
|||
"strings" |
|||
"time" |
|||
|
|||
"github.com/google/uuid" |
|||
"github.com/seaweedfs/seaweedfs/weed/glog" |
|||
"github.com/seaweedfs/seaweedfs/weed/operation" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/stats" |
|||
"github.com/seaweedfs/seaweedfs/weed/util" |
|||
) |
|||
|
|||
// tusHandler is the main entry point for TUS protocol requests
|
|||
func (fs *FilerServer) tusHandler(w http.ResponseWriter, r *http.Request) { |
|||
// Set common TUS response headers
|
|||
w.Header().Set("Tus-Resumable", TusVersion) |
|||
|
|||
// Check Tus-Resumable header for non-OPTIONS requests
|
|||
if r.Method != http.MethodOptions { |
|||
tusVersion := r.Header.Get("Tus-Resumable") |
|||
if tusVersion != TusVersion { |
|||
http.Error(w, "Unsupported TUS version", http.StatusPreconditionFailed) |
|||
return |
|||
} |
|||
} |
|||
|
|||
// Route based on method and path
|
|||
reqPath := r.URL.Path |
|||
tusPrefix := fs.option.TusPath |
|||
if tusPrefix == "" { |
|||
tusPrefix = ".tus" |
|||
} |
|||
if !strings.HasPrefix(tusPrefix, "/") { |
|||
tusPrefix = "/" + tusPrefix |
|||
} |
|||
|
|||
// Check if this is an upload location (contains upload ID after {tusPrefix}/.uploads/)
|
|||
uploadsPrefix := tusPrefix + "/.uploads/" |
|||
if strings.HasPrefix(reqPath, uploadsPrefix) { |
|||
uploadID := strings.TrimPrefix(reqPath, uploadsPrefix) |
|||
uploadID = strings.Split(uploadID, "/")[0] // Get just the ID, not any trailing path
|
|||
|
|||
switch r.Method { |
|||
case http.MethodHead: |
|||
fs.tusHeadHandler(w, r, uploadID) |
|||
case http.MethodPatch: |
|||
fs.tusPatchHandler(w, r, uploadID) |
|||
case http.MethodDelete: |
|||
fs.tusDeleteHandler(w, r, uploadID) |
|||
default: |
|||
w.WriteHeader(http.StatusMethodNotAllowed) |
|||
} |
|||
return |
|||
} |
|||
|
|||
// Handle creation endpoints (POST to /.tus/{path})
|
|||
switch r.Method { |
|||
case http.MethodOptions: |
|||
fs.tusOptionsHandler(w, r) |
|||
case http.MethodPost: |
|||
fs.tusCreateHandler(w, r) |
|||
default: |
|||
w.WriteHeader(http.StatusMethodNotAllowed) |
|||
} |
|||
} |
|||
|
|||
// tusOptionsHandler handles OPTIONS requests for capability discovery
|
|||
func (fs *FilerServer) tusOptionsHandler(w http.ResponseWriter, r *http.Request) { |
|||
w.Header().Set("Tus-Version", TusVersion) |
|||
w.Header().Set("Tus-Extension", TusExtensions) |
|||
w.Header().Set("Tus-Max-Size", strconv.FormatInt(TusMaxSize, 10)) |
|||
w.WriteHeader(http.StatusOK) |
|||
} |
|||
|
|||
// tusCreateHandler handles POST requests to create new uploads
|
|||
func (fs *FilerServer) tusCreateHandler(w http.ResponseWriter, r *http.Request) { |
|||
ctx := r.Context() |
|||
|
|||
// Parse Upload-Length header (required)
|
|||
uploadLengthStr := r.Header.Get("Upload-Length") |
|||
if uploadLengthStr == "" { |
|||
http.Error(w, "Upload-Length header required", http.StatusBadRequest) |
|||
return |
|||
} |
|||
uploadLength, err := strconv.ParseInt(uploadLengthStr, 10, 64) |
|||
if err != nil || uploadLength < 0 { |
|||
http.Error(w, "Invalid Upload-Length", http.StatusBadRequest) |
|||
return |
|||
} |
|||
if uploadLength > TusMaxSize { |
|||
http.Error(w, "Upload-Length exceeds maximum", http.StatusRequestEntityTooLarge) |
|||
return |
|||
} |
|||
|
|||
// Parse Upload-Metadata header (optional)
|
|||
metadata := parseTusMetadata(r.Header.Get("Upload-Metadata")) |
|||
|
|||
// Get TUS path prefix
|
|||
tusPrefix := fs.option.TusPath |
|||
if tusPrefix == "" { |
|||
tusPrefix = ".tus" |
|||
} |
|||
if !strings.HasPrefix(tusPrefix, "/") { |
|||
tusPrefix = "/" + tusPrefix |
|||
} |
|||
|
|||
// Determine target path from request URL
|
|||
targetPath := strings.TrimPrefix(r.URL.Path, tusPrefix) |
|||
if targetPath == "" || targetPath == "/" { |
|||
http.Error(w, "Target path required", http.StatusBadRequest) |
|||
return |
|||
} |
|||
|
|||
// Generate upload ID
|
|||
uploadID := uuid.New().String() |
|||
|
|||
// Create upload session
|
|||
session, err := fs.createTusSession(ctx, uploadID, targetPath, uploadLength, metadata) |
|||
if err != nil { |
|||
glog.Errorf("Failed to create TUS session: %v", err) |
|||
http.Error(w, "Failed to create upload", http.StatusInternalServerError) |
|||
return |
|||
} |
|||
|
|||
// Build upload location URL (ensure it starts with single /)
|
|||
uploadLocation := path.Clean(fmt.Sprintf("%s/.uploads/%s", tusPrefix, uploadID)) |
|||
if !strings.HasPrefix(uploadLocation, "/") { |
|||
uploadLocation = "/" + uploadLocation |
|||
} |
|||
|
|||
// Handle creation-with-upload extension
|
|||
// TUS requires Content-Length for uploads; reject chunked encoding
|
|||
if r.Header.Get("Content-Type") == "application/offset+octet-stream" { |
|||
if r.ContentLength < 0 { |
|||
fs.deleteTusSession(ctx, uploadID) |
|||
http.Error(w, "Content-Length header required for creation-with-upload", http.StatusBadRequest) |
|||
return |
|||
} |
|||
if r.ContentLength == 0 { |
|||
// Empty body is allowed, just skip the upload
|
|||
goto respond |
|||
} |
|||
// Upload data in the creation request
|
|||
bytesWritten, uploadErr := fs.tusWriteData(ctx, session, 0, r.Body, r.ContentLength) |
|||
if uploadErr != nil { |
|||
// Cleanup session on failure
|
|||
fs.deleteTusSession(ctx, uploadID) |
|||
glog.Errorf("Failed to write initial TUS data: %v", uploadErr) |
|||
http.Error(w, "Failed to write data", http.StatusInternalServerError) |
|||
return |
|||
} |
|||
|
|||
// Update offset in response header
|
|||
w.Header().Set("Upload-Offset", strconv.FormatInt(bytesWritten, 10)) |
|||
|
|||
// Check if upload is complete
|
|||
if bytesWritten == session.Size { |
|||
// Refresh session to get updated chunks
|
|||
session, err = fs.getTusSession(ctx, uploadID) |
|||
if err != nil { |
|||
glog.Errorf("Failed to get updated TUS session: %v", err) |
|||
http.Error(w, "Failed to complete upload", http.StatusInternalServerError) |
|||
return |
|||
} |
|||
if err := fs.completeTusUpload(ctx, session); err != nil { |
|||
glog.Errorf("Failed to complete TUS upload: %v", err) |
|||
http.Error(w, "Failed to complete upload", http.StatusInternalServerError) |
|||
return |
|||
} |
|||
} |
|||
} |
|||
|
|||
respond: |
|||
w.Header().Set("Location", uploadLocation) |
|||
w.WriteHeader(http.StatusCreated) |
|||
} |
|||
|
|||
// tusHeadHandler handles HEAD requests to get current upload offset
|
|||
func (fs *FilerServer) tusHeadHandler(w http.ResponseWriter, r *http.Request, uploadID string) { |
|||
ctx := r.Context() |
|||
|
|||
session, err := fs.getTusSession(ctx, uploadID) |
|||
if err != nil { |
|||
http.Error(w, "Upload not found", http.StatusNotFound) |
|||
return |
|||
} |
|||
|
|||
w.Header().Set("Upload-Offset", strconv.FormatInt(session.Offset, 10)) |
|||
w.Header().Set("Upload-Length", strconv.FormatInt(session.Size, 10)) |
|||
w.Header().Set("Cache-Control", "no-store") |
|||
w.WriteHeader(http.StatusOK) |
|||
} |
|||
|
|||
// tusPatchHandler handles PATCH requests to upload data
|
|||
func (fs *FilerServer) tusPatchHandler(w http.ResponseWriter, r *http.Request, uploadID string) { |
|||
ctx := r.Context() |
|||
|
|||
// Validate Content-Type
|
|||
contentType := r.Header.Get("Content-Type") |
|||
if contentType != "application/offset+octet-stream" { |
|||
http.Error(w, "Content-Type must be application/offset+octet-stream", http.StatusUnsupportedMediaType) |
|||
return |
|||
} |
|||
|
|||
// Get current session
|
|||
session, err := fs.getTusSession(ctx, uploadID) |
|||
if err != nil { |
|||
http.Error(w, "Upload not found", http.StatusNotFound) |
|||
return |
|||
} |
|||
|
|||
// Validate Upload-Offset header
|
|||
uploadOffsetStr := r.Header.Get("Upload-Offset") |
|||
if uploadOffsetStr == "" { |
|||
http.Error(w, "Upload-Offset header required", http.StatusBadRequest) |
|||
return |
|||
} |
|||
uploadOffset, err := strconv.ParseInt(uploadOffsetStr, 10, 64) |
|||
if err != nil || uploadOffset < 0 { |
|||
http.Error(w, "Invalid Upload-Offset", http.StatusBadRequest) |
|||
return |
|||
} |
|||
|
|||
// Check offset matches current position
|
|||
if uploadOffset != session.Offset { |
|||
http.Error(w, fmt.Sprintf("Offset mismatch: expected %d, got %d", session.Offset, uploadOffset), http.StatusConflict) |
|||
return |
|||
} |
|||
|
|||
// TUS requires Content-Length header for PATCH requests
|
|||
if r.ContentLength < 0 { |
|||
http.Error(w, "Content-Length header required", http.StatusBadRequest) |
|||
return |
|||
} |
|||
|
|||
// Write data
|
|||
bytesWritten, err := fs.tusWriteData(ctx, session, uploadOffset, r.Body, r.ContentLength) |
|||
if err != nil { |
|||
glog.Errorf("Failed to write TUS data: %v", err) |
|||
http.Error(w, "Failed to write data", http.StatusInternalServerError) |
|||
return |
|||
} |
|||
|
|||
newOffset := uploadOffset + bytesWritten |
|||
|
|||
// Check if upload is complete
|
|||
if newOffset == session.Size { |
|||
// Refresh session to get updated chunks
|
|||
session, err = fs.getTusSession(ctx, uploadID) |
|||
if err != nil { |
|||
glog.Errorf("Failed to get updated TUS session: %v", err) |
|||
http.Error(w, "Failed to complete upload", http.StatusInternalServerError) |
|||
return |
|||
} |
|||
|
|||
if err := fs.completeTusUpload(ctx, session); err != nil { |
|||
glog.Errorf("Failed to complete TUS upload: %v", err) |
|||
http.Error(w, "Failed to complete upload", http.StatusInternalServerError) |
|||
return |
|||
} |
|||
} |
|||
|
|||
w.Header().Set("Upload-Offset", strconv.FormatInt(newOffset, 10)) |
|||
w.WriteHeader(http.StatusNoContent) |
|||
} |
|||
|
|||
// tusDeleteHandler handles DELETE requests to cancel uploads
|
|||
func (fs *FilerServer) tusDeleteHandler(w http.ResponseWriter, r *http.Request, uploadID string) { |
|||
ctx := r.Context() |
|||
|
|||
if err := fs.deleteTusSession(ctx, uploadID); err != nil { |
|||
glog.Errorf("Failed to delete TUS session: %v", err) |
|||
http.Error(w, "Failed to delete upload", http.StatusInternalServerError) |
|||
return |
|||
} |
|||
|
|||
w.WriteHeader(http.StatusNoContent) |
|||
} |
|||
|
|||
// tusWriteData uploads data to volume servers and updates session
|
|||
func (fs *FilerServer) tusWriteData(ctx context.Context, session *TusSession, offset int64, reader io.Reader, contentLength int64) (int64, error) { |
|||
if contentLength == 0 { |
|||
return 0, nil |
|||
} |
|||
|
|||
// Limit content length to remaining size
|
|||
remaining := session.Size - offset |
|||
if contentLength > remaining { |
|||
contentLength = remaining |
|||
} |
|||
if contentLength <= 0 { |
|||
return 0, nil |
|||
} |
|||
|
|||
// Read data into buffer
|
|||
// Determine storage options based on target path
|
|||
so, err := fs.detectStorageOption0(ctx, session.TargetPath, "", "", "", "", "", "", "", "", "") |
|||
if err != nil { |
|||
return 0, fmt.Errorf("detect storage option: %w", err) |
|||
} |
|||
|
|||
// Assign file ID from master
|
|||
fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so) |
|||
if assignErr != nil { |
|||
return 0, fmt.Errorf("assign volume: %w", assignErr) |
|||
} |
|||
|
|||
// Upload to volume server
|
|||
uploader, uploaderErr := operation.NewUploader() |
|||
if uploaderErr != nil { |
|||
return 0, fmt.Errorf("create uploader: %w", uploaderErr) |
|||
} |
|||
|
|||
// Read first bytes for MIME type detection, respecting contentLength
|
|||
// http.DetectContentType uses at most 512 bytes
|
|||
sniffSize := int64(512) |
|||
if contentLength < sniffSize { |
|||
sniffSize = contentLength |
|||
} |
|||
sniffBuf := make([]byte, sniffSize) |
|||
sniffN, sniffErr := io.ReadFull(reader, sniffBuf) |
|||
if sniffErr != nil && sniffErr != io.EOF && sniffErr != io.ErrUnexpectedEOF { |
|||
return 0, fmt.Errorf("read data for mime detection: %w", sniffErr) |
|||
} |
|||
if sniffN == 0 { |
|||
return 0, nil |
|||
} |
|||
sniffBuf = sniffBuf[:sniffN] |
|||
|
|||
// Detect MIME type from sniffed bytes
|
|||
mimeType := http.DetectContentType(sniffBuf) |
|||
|
|||
// Create a reader that combines sniffed bytes with remaining data
|
|||
var dataReader io.Reader |
|||
if int64(sniffN) >= contentLength { |
|||
// All data fits in sniff buffer
|
|||
dataReader = bytes.NewReader(sniffBuf) |
|||
} else { |
|||
// Combine sniffed bytes with remaining stream
|
|||
dataReader = io.MultiReader(bytes.NewReader(sniffBuf), io.LimitReader(reader, contentLength-int64(sniffN))) |
|||
} |
|||
|
|||
uploadResult, uploadErr, _ := uploader.Upload(ctx, dataReader, &operation.UploadOption{ |
|||
UploadUrl: urlLocation, |
|||
Filename: "", |
|||
Cipher: fs.option.Cipher, |
|||
IsInputCompressed: false, |
|||
MimeType: mimeType, |
|||
PairMap: nil, |
|||
Jwt: auth, |
|||
}) |
|||
if uploadErr != nil { |
|||
return 0, fmt.Errorf("upload data: %w", uploadErr) |
|||
} |
|||
|
|||
// Create chunk info
|
|||
chunk := &TusChunkInfo{ |
|||
Offset: offset, |
|||
Size: int64(uploadResult.Size), |
|||
FileId: fileId, |
|||
UploadAt: time.Now().UnixNano(), |
|||
} |
|||
|
|||
// Update session
|
|||
if err := fs.updateTusSessionOffset(ctx, session.ID, offset+int64(uploadResult.Size), chunk); err != nil { |
|||
// Try to clean up the uploaded chunk
|
|||
fs.filer.DeleteChunks(ctx, util.FullPath(session.TargetPath), []*filer_pb.FileChunk{ |
|||
{FileId: fileId}, |
|||
}) |
|||
return 0, fmt.Errorf("update session: %w", err) |
|||
} |
|||
|
|||
stats.FilerHandlerCounter.WithLabelValues("tusUploadChunk").Inc() |
|||
|
|||
return int64(uploadResult.Size), nil |
|||
} |
|||
|
|||
// parseTusMetadata parses the Upload-Metadata header
|
|||
// Format: key1 base64value1,key2 base64value2,...
|
|||
func parseTusMetadata(header string) map[string]string { |
|||
metadata := make(map[string]string) |
|||
if header == "" { |
|||
return metadata |
|||
} |
|||
|
|||
pairs := strings.Split(header, ",") |
|||
for _, pair := range pairs { |
|||
pair = strings.TrimSpace(pair) |
|||
parts := strings.SplitN(pair, " ", 2) |
|||
if len(parts) != 2 { |
|||
continue |
|||
} |
|||
key := strings.TrimSpace(parts[0]) |
|||
encodedValue := strings.TrimSpace(parts[1]) |
|||
|
|||
value, err := base64.StdEncoding.DecodeString(encodedValue) |
|||
if err != nil { |
|||
glog.V(1).Infof("Failed to decode TUS metadata value for key %s: %v", key, err) |
|||
continue |
|||
} |
|||
metadata[key] = string(value) |
|||
} |
|||
|
|||
return metadata |
|||
} |
|||
@ -0,0 +1,341 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"context" |
|||
"encoding/json" |
|||
"fmt" |
|||
"os" |
|||
"sort" |
|||
"strconv" |
|||
"strings" |
|||
"time" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/weed/filer" |
|||
"github.com/seaweedfs/seaweedfs/weed/glog" |
|||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/util" |
|||
) |
|||
|
|||
const ( |
|||
TusVersion = "1.0.0" |
|||
TusMaxSize = int64(5 * 1024 * 1024 * 1024) // 5GB default max size
|
|||
TusUploadsFolder = ".uploads.tus" |
|||
TusInfoFileName = ".info" |
|||
TusChunkExt = ".chunk" |
|||
TusExtensions = "creation,creation-with-upload,termination" |
|||
) |
|||
|
|||
// TusSession represents an in-progress TUS upload session
|
|||
type TusSession struct { |
|||
ID string `json:"id"` |
|||
TargetPath string `json:"target_path"` |
|||
Size int64 `json:"size"` |
|||
Offset int64 `json:"offset"` |
|||
Metadata map[string]string `json:"metadata,omitempty"` |
|||
CreatedAt time.Time `json:"created_at"` |
|||
ExpiresAt time.Time `json:"expires_at,omitempty"` |
|||
Chunks []*TusChunkInfo `json:"chunks,omitempty"` |
|||
} |
|||
|
|||
// TusChunkInfo tracks individual chunk uploads within a session
|
|||
type TusChunkInfo struct { |
|||
Offset int64 `json:"offset"` |
|||
Size int64 `json:"size"` |
|||
FileId string `json:"file_id"` |
|||
UploadAt int64 `json:"upload_at"` |
|||
} |
|||
|
|||
// tusSessionDir returns the directory path for storing TUS upload sessions
|
|||
func (fs *FilerServer) tusSessionDir() string { |
|||
return "/" + TusUploadsFolder |
|||
} |
|||
|
|||
// tusSessionPath returns the path to a specific upload session directory
|
|||
func (fs *FilerServer) tusSessionPath(uploadID string) string { |
|||
return fmt.Sprintf("/%s/%s", TusUploadsFolder, uploadID) |
|||
} |
|||
|
|||
// tusSessionInfoPath returns the path to the session info file
|
|||
func (fs *FilerServer) tusSessionInfoPath(uploadID string) string { |
|||
return fmt.Sprintf("/%s/%s/%s", TusUploadsFolder, uploadID, TusInfoFileName) |
|||
} |
|||
|
|||
// tusChunkPath returns the path to store a chunk info file
|
|||
// Format: /{TusUploadsFolder}/{uploadID}/chunk_{offset}_{size}_{fileId}
|
|||
func (fs *FilerServer) tusChunkPath(uploadID string, offset, size int64, fileId string) string { |
|||
// Replace / in fileId with _ to make it a valid filename
|
|||
safeFileId := strings.ReplaceAll(fileId, "/", "_") |
|||
return fmt.Sprintf("/%s/%s/chunk_%016d_%016d_%s", TusUploadsFolder, uploadID, offset, size, safeFileId) |
|||
} |
|||
|
|||
// parseTusChunkPath parses chunk info from a chunk file name
|
|||
func parseTusChunkPath(name string) (*TusChunkInfo, error) { |
|||
if !strings.HasPrefix(name, "chunk_") { |
|||
return nil, fmt.Errorf("not a chunk file: %s", name) |
|||
} |
|||
parts := strings.SplitN(name[6:], "_", 3) // Skip "chunk_" prefix
|
|||
if len(parts) < 3 { |
|||
return nil, fmt.Errorf("invalid chunk file name: %s", name) |
|||
} |
|||
offset, err := strconv.ParseInt(parts[0], 10, 64) |
|||
if err != nil { |
|||
return nil, fmt.Errorf("invalid offset in chunk file: %s", name) |
|||
} |
|||
size, err := strconv.ParseInt(parts[1], 10, 64) |
|||
if err != nil { |
|||
return nil, fmt.Errorf("invalid size in chunk file: %s", name) |
|||
} |
|||
// Restore / in fileId
|
|||
fileId := strings.ReplaceAll(parts[2], "_", "/") |
|||
return &TusChunkInfo{ |
|||
Offset: offset, |
|||
Size: size, |
|||
FileId: fileId, |
|||
UploadAt: time.Now().UnixNano(), |
|||
}, nil |
|||
} |
|||
|
|||
// createTusSession creates a new TUS upload session
|
|||
func (fs *FilerServer) createTusSession(ctx context.Context, uploadID, targetPath string, size int64, metadata map[string]string) (*TusSession, error) { |
|||
session := &TusSession{ |
|||
ID: uploadID, |
|||
TargetPath: targetPath, |
|||
Size: size, |
|||
Offset: 0, |
|||
Metadata: metadata, |
|||
CreatedAt: time.Now(), |
|||
ExpiresAt: time.Now().Add(7 * 24 * time.Hour), // 7 days default expiration
|
|||
Chunks: []*TusChunkInfo{}, |
|||
} |
|||
|
|||
// Create session directory
|
|||
sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID)) |
|||
if err := fs.filer.CreateEntry(ctx, &filer.Entry{ |
|||
FullPath: sessionDirPath, |
|||
Attr: filer.Attr{ |
|||
Mode: os.ModeDir | 0755, |
|||
Crtime: time.Now(), |
|||
Mtime: time.Now(), |
|||
Uid: OS_UID, |
|||
Gid: OS_GID, |
|||
}, |
|||
}, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil { |
|||
return nil, fmt.Errorf("create session directory: %w", err) |
|||
} |
|||
|
|||
// Save session info
|
|||
if err := fs.saveTusSession(ctx, session); err != nil { |
|||
// Cleanup the directory on failure
|
|||
fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, true, false, false, nil, 0) |
|||
return nil, fmt.Errorf("save session info: %w", err) |
|||
} |
|||
|
|||
glog.V(2).Infof("Created TUS session %s for %s, size=%d", uploadID, targetPath, size) |
|||
return session, nil |
|||
} |
|||
|
|||
// saveTusSession saves the session info to the filer
|
|||
func (fs *FilerServer) saveTusSession(ctx context.Context, session *TusSession) error { |
|||
sessionData, err := json.Marshal(session) |
|||
if err != nil { |
|||
return fmt.Errorf("marshal session: %w", err) |
|||
} |
|||
|
|||
infoPath := util.FullPath(fs.tusSessionInfoPath(session.ID)) |
|||
entry := &filer.Entry{ |
|||
FullPath: infoPath, |
|||
Attr: filer.Attr{ |
|||
Mode: 0644, |
|||
Crtime: session.CreatedAt, |
|||
Mtime: time.Now(), |
|||
Uid: OS_UID, |
|||
Gid: OS_GID, |
|||
}, |
|||
Content: sessionData, |
|||
} |
|||
|
|||
if err := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil { |
|||
return fmt.Errorf("save session info entry: %w", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
// getTusSession retrieves a TUS session by upload ID, including chunks from directory listing
|
|||
func (fs *FilerServer) getTusSession(ctx context.Context, uploadID string) (*TusSession, error) { |
|||
infoPath := util.FullPath(fs.tusSessionInfoPath(uploadID)) |
|||
entry, err := fs.filer.FindEntry(ctx, infoPath) |
|||
if err != nil { |
|||
if err == filer_pb.ErrNotFound { |
|||
return nil, fmt.Errorf("session not found: %s", uploadID) |
|||
} |
|||
return nil, fmt.Errorf("find session: %w", err) |
|||
} |
|||
|
|||
var session TusSession |
|||
if err := json.Unmarshal(entry.Content, &session); err != nil { |
|||
return nil, fmt.Errorf("unmarshal session: %w", err) |
|||
} |
|||
|
|||
// Load chunks from directory listing (atomic read, no race condition)
|
|||
sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID)) |
|||
entries, _, err := fs.filer.ListDirectoryEntries(ctx, sessionDirPath, "", false, 10000, "", "", "") |
|||
if err != nil { |
|||
return nil, fmt.Errorf("list session directory: %w", err) |
|||
} |
|||
|
|||
session.Chunks = nil |
|||
session.Offset = 0 |
|||
for _, e := range entries { |
|||
if strings.HasPrefix(e.Name(), "chunk_") { |
|||
chunk, parseErr := parseTusChunkPath(e.Name()) |
|||
if parseErr != nil { |
|||
glog.V(1).Infof("Skipping invalid chunk file %s: %v", e.Name(), parseErr) |
|||
continue |
|||
} |
|||
session.Chunks = append(session.Chunks, chunk) |
|||
} |
|||
} |
|||
|
|||
// Sort chunks by offset and compute current offset
|
|||
if len(session.Chunks) > 0 { |
|||
sort.Slice(session.Chunks, func(i, j int) bool { |
|||
return session.Chunks[i].Offset < session.Chunks[j].Offset |
|||
}) |
|||
// Current offset is the end of the last chunk
|
|||
lastChunk := session.Chunks[len(session.Chunks)-1] |
|||
session.Offset = lastChunk.Offset + lastChunk.Size |
|||
} |
|||
|
|||
return &session, nil |
|||
} |
|||
|
|||
// updateTusSessionOffset stores the chunk info as a separate file entry
|
|||
// This avoids read-modify-write race conditions across multiple filer instances
|
|||
func (fs *FilerServer) updateTusSessionOffset(ctx context.Context, uploadID string, newOffset int64, chunk *TusChunkInfo) error { |
|||
if chunk == nil { |
|||
return nil |
|||
} |
|||
|
|||
// Store chunk info as a separate file entry (atomic operation)
|
|||
chunkPath := util.FullPath(fs.tusChunkPath(uploadID, chunk.Offset, chunk.Size, chunk.FileId)) |
|||
chunkData, err := json.Marshal(chunk) |
|||
if err != nil { |
|||
return fmt.Errorf("marshal chunk info: %w", err) |
|||
} |
|||
|
|||
if err := fs.filer.CreateEntry(ctx, &filer.Entry{ |
|||
FullPath: chunkPath, |
|||
Attr: filer.Attr{ |
|||
Mode: 0644, |
|||
Crtime: time.Now(), |
|||
Mtime: time.Now(), |
|||
Uid: OS_UID, |
|||
Gid: OS_GID, |
|||
}, |
|||
Content: chunkData, |
|||
}, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil { |
|||
return fmt.Errorf("save chunk info: %w", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
// deleteTusSession removes a TUS upload session and all its data
|
|||
func (fs *FilerServer) deleteTusSession(ctx context.Context, uploadID string) error { |
|||
|
|||
session, err := fs.getTusSession(ctx, uploadID) |
|||
if err != nil { |
|||
// Session might already be deleted or never existed
|
|||
glog.V(1).Infof("TUS session %s not found for deletion: %v", uploadID, err) |
|||
return nil |
|||
} |
|||
|
|||
// Delete any uploaded chunks from volume servers
|
|||
for _, chunk := range session.Chunks { |
|||
if chunk.FileId != "" { |
|||
fs.filer.DeleteChunks(ctx, util.FullPath(session.TargetPath), []*filer_pb.FileChunk{ |
|||
{FileId: chunk.FileId}, |
|||
}) |
|||
} |
|||
} |
|||
|
|||
// Delete the session directory
|
|||
sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID)) |
|||
if err := fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, true, false, false, nil, 0); err != nil { |
|||
return fmt.Errorf("delete session directory: %w", err) |
|||
} |
|||
|
|||
glog.V(2).Infof("Deleted TUS session %s", uploadID) |
|||
return nil |
|||
} |
|||
|
|||
// completeTusUpload assembles all chunks and creates the final file
|
|||
func (fs *FilerServer) completeTusUpload(ctx context.Context, session *TusSession) error { |
|||
if session.Offset != session.Size { |
|||
return fmt.Errorf("upload incomplete: offset=%d, expected=%d", session.Offset, session.Size) |
|||
} |
|||
|
|||
// Sort chunks by offset to ensure correct order
|
|||
sort.Slice(session.Chunks, func(i, j int) bool { |
|||
return session.Chunks[i].Offset < session.Chunks[j].Offset |
|||
}) |
|||
|
|||
// Assemble file chunks in order
|
|||
var fileChunks []*filer_pb.FileChunk |
|||
|
|||
for _, chunk := range session.Chunks { |
|||
fid, fidErr := filer_pb.ToFileIdObject(chunk.FileId) |
|||
if fidErr != nil { |
|||
return fmt.Errorf("invalid file ID %s at offset %d: %w", chunk.FileId, chunk.Offset, fidErr) |
|||
} |
|||
|
|||
fileChunk := &filer_pb.FileChunk{ |
|||
FileId: chunk.FileId, |
|||
Offset: chunk.Offset, |
|||
Size: uint64(chunk.Size), |
|||
ModifiedTsNs: chunk.UploadAt, |
|||
Fid: fid, |
|||
} |
|||
fileChunks = append(fileChunks, fileChunk) |
|||
} |
|||
|
|||
// Determine content type from metadata
|
|||
contentType := "" |
|||
if session.Metadata != nil { |
|||
if ct, ok := session.Metadata["content-type"]; ok { |
|||
contentType = ct |
|||
} |
|||
} |
|||
|
|||
// Create the final file entry
|
|||
targetPath := util.FullPath(session.TargetPath) |
|||
entry := &filer.Entry{ |
|||
FullPath: targetPath, |
|||
Attr: filer.Attr{ |
|||
Mode: 0644, |
|||
Crtime: session.CreatedAt, |
|||
Mtime: time.Now(), |
|||
Uid: OS_UID, |
|||
Gid: OS_GID, |
|||
Mime: contentType, |
|||
}, |
|||
Chunks: fileChunks, |
|||
} |
|||
|
|||
// Ensure parent directory exists
|
|||
if err := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil { |
|||
return fmt.Errorf("create final file entry: %w", err) |
|||
} |
|||
|
|||
// Delete the session (but keep the chunks since they're now part of the final file)
|
|||
sessionDirPath := util.FullPath(fs.tusSessionPath(session.ID)) |
|||
if err := fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, false, false, false, nil, 0); err != nil { |
|||
glog.V(1).Infof("Failed to cleanup TUS session directory %s: %v", session.ID, err) |
|||
} |
|||
|
|||
glog.V(2).Infof("Completed TUS upload %s -> %s, size=%d, chunks=%d", |
|||
session.ID, session.TargetPath, session.Size, len(fileChunks)) |
|||
|
|||
return nil |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue