You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

449 lines
14 KiB

package s3api
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/k0kubun/pp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// S3TestConfig holds configuration for S3 tests
type S3TestConfig struct {
Endpoint string
AccessKey string
SecretKey string
Region string
BucketPrefix string
UseSSL bool
SkipVerifySSL bool
}
// Default test configuration - should match s3tests.conf
var defaultConfig = &S3TestConfig{
Endpoint: "http://localhost:8333", // Default SeaweedFS S3 port
AccessKey: "some_access_key1",
SecretKey: "some_secret_key1",
Region: "us-east-1",
BucketPrefix: "test-versioning-",
UseSSL: false,
SkipVerifySSL: true,
}
// getS3Client creates an AWS S3 client for testing
func getS3Client(t *testing.T) *s3.Client {
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion(defaultConfig.Region),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
defaultConfig.AccessKey,
defaultConfig.SecretKey,
"",
)),
config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(
func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{
URL: defaultConfig.Endpoint,
SigningRegion: defaultConfig.Region,
HostnameImmutable: true,
}, nil
})),
)
require.NoError(t, err)
return s3.NewFromConfig(cfg, func(o *s3.Options) {
o.UsePathStyle = true // Important for SeaweedFS
})
}
// getNewBucketName generates a unique bucket name
func getNewBucketName() string {
timestamp := time.Now().UnixNano()
return fmt.Sprintf("%s%d", defaultConfig.BucketPrefix, timestamp)
}
// createBucket creates a new bucket for testing
func createBucket(t *testing.T, client *s3.Client, bucketName string) {
_, err := client.CreateBucket(context.TODO(), &s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})
require.NoError(t, err)
}
// deleteBucket deletes a bucket and all its contents
func deleteBucket(t *testing.T, client *s3.Client, bucketName string) {
// First, delete all objects and versions
err := deleteAllObjectVersions(t, client, bucketName)
if err != nil {
t.Logf("Warning: failed to delete all object versions: %v", err)
}
// Then delete the bucket
_, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
Bucket: aws.String(bucketName),
})
if err != nil {
t.Logf("Warning: failed to delete bucket %s: %v", bucketName, err)
}
}
// deleteAllObjectVersions deletes all object versions in a bucket
func deleteAllObjectVersions(t *testing.T, client *s3.Client, bucketName string) error {
// List all object versions
paginator := s3.NewListObjectVersionsPaginator(client, &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
})
for paginator.HasMorePages() {
page, err := paginator.NextPage(context.TODO())
if err != nil {
return err
}
var objectsToDelete []types.ObjectIdentifier
// Add versions
for _, version := range page.Versions {
objectsToDelete = append(objectsToDelete, types.ObjectIdentifier{
Key: version.Key,
VersionId: version.VersionId,
})
}
// Add delete markers
for _, deleteMarker := range page.DeleteMarkers {
objectsToDelete = append(objectsToDelete, types.ObjectIdentifier{
Key: deleteMarker.Key,
VersionId: deleteMarker.VersionId,
})
}
// Delete objects in batches
if len(objectsToDelete) > 0 {
_, err := client.DeleteObjects(context.TODO(), &s3.DeleteObjectsInput{
Bucket: aws.String(bucketName),
Delete: &types.Delete{
Objects: objectsToDelete,
Quiet: aws.Bool(true),
},
})
if err != nil {
return err
}
}
}
return nil
}
// enableVersioning enables versioning on a bucket
func enableVersioning(t *testing.T, client *s3.Client, bucketName string) {
_, err := client.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{
Bucket: aws.String(bucketName),
VersioningConfiguration: &types.VersioningConfiguration{
Status: types.BucketVersioningStatusEnabled,
},
})
require.NoError(t, err)
}
// checkVersioningStatus verifies the versioning status of a bucket
func checkVersioningStatus(t *testing.T, client *s3.Client, bucketName string, expectedStatus types.BucketVersioningStatus) {
resp, err := client.GetBucketVersioning(context.TODO(), &s3.GetBucketVersioningInput{
Bucket: aws.String(bucketName),
})
require.NoError(t, err)
assert.Equal(t, expectedStatus, resp.Status)
}
// checkVersioningStatusEmpty verifies that a bucket has no versioning configuration (newly created bucket)
func checkVersioningStatusEmpty(t *testing.T, client *s3.Client, bucketName string) {
resp, err := client.GetBucketVersioning(context.TODO(), &s3.GetBucketVersioningInput{
Bucket: aws.String(bucketName),
})
require.NoError(t, err)
// AWS S3 returns an empty versioning configuration (no Status field) for buckets that have never had versioning configured, such as newly created buckets.
assert.Empty(t, resp.Status, "Newly created bucket should have empty versioning status")
}
// putObject puts an object into a bucket
func putObject(t *testing.T, client *s3.Client, bucketName, key, content string) *s3.PutObjectOutput {
resp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: strings.NewReader(content),
})
require.NoError(t, err)
return resp
}
// headObject gets object metadata
func headObject(t *testing.T, client *s3.Client, bucketName, key string) *s3.HeadObjectOutput {
resp, err := client.HeadObject(context.TODO(), &s3.HeadObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
})
require.NoError(t, err)
return resp
}
// TestBucketListReturnDataVersioning is the Go equivalent of test_bucket_list_return_data_versioning
func TestBucketListReturnDataVersioning(t *testing.T) {
client := getS3Client(t)
bucketName := getNewBucketName()
// Create bucket
createBucket(t, client, bucketName)
defer deleteBucket(t, client, bucketName)
// Enable versioning
enableVersioning(t, client, bucketName)
checkVersioningStatus(t, client, bucketName, types.BucketVersioningStatusEnabled)
// Create test objects
keyNames := []string{"bar", "baz", "foo"}
objectData := make(map[string]map[string]interface{})
for _, keyName := range keyNames {
// Put the object
putResp := putObject(t, client, bucketName, keyName, keyName) // content = key name
// Get object metadata
headResp := headObject(t, client, bucketName, keyName)
// Store expected data for later comparison
objectData[keyName] = map[string]interface{}{
"ETag": *headResp.ETag,
"LastModified": *headResp.LastModified,
"ContentLength": headResp.ContentLength,
"VersionId": *headResp.VersionId,
}
// Verify version ID was returned
require.NotNil(t, putResp.VersionId)
require.NotEmpty(t, *putResp.VersionId)
assert.Equal(t, *putResp.VersionId, *headResp.VersionId)
}
// List object versions
resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
})
require.NoError(t, err)
// Verify we have the expected number of versions
assert.Len(t, resp.Versions, len(keyNames))
// Check each version matches our stored data
versionsByKey := make(map[string]types.ObjectVersion)
for _, version := range resp.Versions {
versionsByKey[*version.Key] = version
}
for _, keyName := range keyNames {
version, exists := versionsByKey[keyName]
require.True(t, exists, "Expected version for key %s", keyName)
expectedData := objectData[keyName]
// Compare ETag
assert.Equal(t, expectedData["ETag"], *version.ETag)
// Compare Size
assert.Equal(t, expectedData["ContentLength"], version.Size)
// Compare VersionId
assert.Equal(t, expectedData["VersionId"], *version.VersionId)
// Compare LastModified (within reasonable tolerance)
expectedTime := expectedData["LastModified"].(time.Time)
actualTime := *version.LastModified
timeDiff := actualTime.Sub(expectedTime)
if timeDiff < 0 {
timeDiff = -timeDiff
}
assert.True(t, timeDiff < time.Minute, "LastModified times should be close")
// Verify this is marked as the latest version
assert.True(t, *version.IsLatest)
// Verify it's not a delete marker
// (delete markers should be in resp.DeleteMarkers, not resp.Versions)
}
// Verify no delete markers
assert.Empty(t, resp.DeleteMarkers)
t.Logf("Successfully verified %d versioned objects", len(keyNames))
}
// TestVersioningBasicWorkflow tests basic versioning operations
func TestVersioningBasicWorkflow(t *testing.T) {
client := getS3Client(t)
bucketName := getNewBucketName()
// Create bucket
createBucket(t, client, bucketName)
defer deleteBucket(t, client, bucketName)
// Initially, versioning should be unset/empty (not suspended) for newly created buckets
// This matches AWS S3 behavior where new buckets have no versioning status
checkVersioningStatusEmpty(t, client, bucketName)
// Enable versioning
enableVersioning(t, client, bucketName)
checkVersioningStatus(t, client, bucketName, types.BucketVersioningStatusEnabled)
// Put same object multiple times to create versions
key := "test-object"
version1 := putObject(t, client, bucketName, key, "content-v1")
version2 := putObject(t, client, bucketName, key, "content-v2")
version3 := putObject(t, client, bucketName, key, "content-v3")
// Verify each put returned a different version ID
require.NotEqual(t, *version1.VersionId, *version2.VersionId)
require.NotEqual(t, *version2.VersionId, *version3.VersionId)
require.NotEqual(t, *version1.VersionId, *version3.VersionId)
// List versions
resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
})
require.NoError(t, err)
// Should have 3 versions
assert.Len(t, resp.Versions, 3)
// Only the latest should be marked as latest
latestCount := 0
for _, version := range resp.Versions {
if *version.IsLatest {
latestCount++
assert.Equal(t, *version3.VersionId, *version.VersionId)
}
}
assert.Equal(t, 1, latestCount, "Only one version should be marked as latest")
t.Logf("Successfully created and verified %d versions", len(resp.Versions))
}
// TestVersioningDeleteMarkers tests delete marker creation
func TestVersioningDeleteMarkers(t *testing.T) {
client := getS3Client(t)
bucketName := getNewBucketName()
// Create bucket and enable versioning
createBucket(t, client, bucketName)
defer deleteBucket(t, client, bucketName)
enableVersioning(t, client, bucketName)
// Put an object
key := "test-delete-marker"
putResp := putObject(t, client, bucketName, key, "content")
require.NotNil(t, putResp.VersionId)
// Delete the object (should create delete marker)
deleteResp, err := client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
})
require.NoError(t, err)
require.NotNil(t, deleteResp.VersionId)
// List versions to see the delete marker
listResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
})
require.NoError(t, err)
// Should have 1 version and 1 delete marker
assert.Len(t, listResp.Versions, 1)
assert.Len(t, listResp.DeleteMarkers, 1)
// The delete marker should be the latest
deleteMarker := listResp.DeleteMarkers[0]
assert.True(t, *deleteMarker.IsLatest)
assert.Equal(t, *deleteResp.VersionId, *deleteMarker.VersionId)
// The original version should not be latest
version := listResp.Versions[0]
assert.False(t, *version.IsLatest)
assert.Equal(t, *putResp.VersionId, *version.VersionId)
t.Logf("Successfully created and verified delete marker")
}
// TestVersioningConcurrentOperations tests concurrent versioning operations
func TestVersioningConcurrentOperations(t *testing.T) {
client := getS3Client(t)
bucketName := getNewBucketName()
// Create bucket and enable versioning
createBucket(t, client, bucketName)
defer deleteBucket(t, client, bucketName)
enableVersioning(t, client, bucketName)
// Concurrently create multiple objects
numObjects := 10
objectKey := "concurrent-test"
// Channel to collect version IDs
versionIds := make(chan string, numObjects)
errors := make(chan error, numObjects)
// Launch concurrent puts
for i := 0; i < numObjects; i++ {
go func(index int) {
content := fmt.Sprintf("content-%d", index)
resp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(objectKey),
Body: strings.NewReader(content),
})
if err != nil {
errors <- err
return
}
versionIds <- *resp.VersionId
}(i)
}
// Collect results
var collectedVersionIds []string
for i := 0; i < numObjects; i++ {
select {
case versionId := <-versionIds:
t.Logf("Received Version ID %d: %s", i, versionId)
collectedVersionIds = append(collectedVersionIds, versionId)
case err := <-errors:
t.Fatalf("Concurrent put failed: %v", err)
case <-time.After(30 * time.Second):
t.Fatalf("Timeout waiting for concurrent operations")
}
}
// Verify all version IDs are unique
versionIdSet := make(map[string]bool)
for _, versionId := range collectedVersionIds {
assert.False(t, versionIdSet[versionId], "Version ID should be unique: %s", versionId)
versionIdSet[versionId] = true
}
// List versions and verify count
listResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucketName),
})
pp.Println(listResp)
require.NoError(t, err)
assert.Len(t, listResp.Versions, numObjects)
t.Logf("Successfully created %d concurrent versions with unique IDs", numObjects)
}