diff --git a/test/s3tables/catalog/iceberg_catalog_test.go b/test/s3tables/catalog/iceberg_catalog_test.go index d7ceaee1d..b4e13d5ad 100644 --- a/test/s3tables/catalog/iceberg_catalog_test.go +++ b/test/s3tables/catalog/iceberg_catalog_test.go @@ -6,6 +6,7 @@ import ( "bytes" "context" "encoding/json" + "flag" "fmt" "io" "net" @@ -18,6 +19,37 @@ import ( "time" ) +// sharedEnv is the single TestEnvironment shared across all tests in this package. +var sharedEnv *TestEnvironment + +// TestMain starts one weed mini instance for the whole package and tears it down +// after all tests have run. +func TestMain(m *testing.M) { + flag.Parse() + if os.Getenv("SHORT") != "" || testing.Short() { + // Let tests self-skip when run with -short. + os.Exit(m.Run()) + } + + env, err := newTestEnvironmentForMain() + if err != nil { + fmt.Fprintf(os.Stderr, "SKIP: setup failed: %v\n", err) + os.Exit(0) // Skip all tests rather than fail + } + sharedEnv = env + + if startErr := sharedEnv.startSeaweedFSForMain(); startErr != nil { + fmt.Fprintf(os.Stderr, "SKIP: weed mini failed to start: %v\n", startErr) + sharedEnv.cleanupForMain() + os.Exit(0) + } + + code := m.Run() + + sharedEnv.cleanupForMain() + os.Exit(code) +} + // TestEnvironment contains the test environment configuration type TestEnvironment struct { seaweedDir string @@ -54,17 +86,15 @@ func getFreePort() (int, net.Listener, error) { return addr.Port, listener, nil } -// NewTestEnvironment creates a new test environment -func NewTestEnvironment(t *testing.T) *TestEnvironment { - t.Helper() - +// newTestEnvironmentForMain creates a TestEnvironment without calling t.Fatalf so it +// can be used from TestMain (which has no *testing.T). +func newTestEnvironmentForMain() (*TestEnvironment, error) { // Find the SeaweedFS root directory wd, err := os.Getwd() if err != nil { - t.Fatalf("Failed to get working directory: %v", err) + return nil, fmt.Errorf("get working directory: %w", err) } - // Navigate up to find the SeaweedFS root (contains go.mod) seaweedDir := wd for i := 0; i < 5; i++ { if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil { @@ -76,82 +106,93 @@ func NewTestEnvironment(t *testing.T) *TestEnvironment { // Check for weed binary weedBinary := filepath.Join(seaweedDir, "weed", "weed") if _, err := os.Stat(weedBinary); os.IsNotExist(err) { - // Try system PATH weedBinary = "weed" if _, err := exec.LookPath(weedBinary); err != nil { - t.Skip("weed binary not found, skipping integration test") + return nil, fmt.Errorf("weed binary not found") } } // Create temporary data directory dataDir, err := os.MkdirTemp("", "seaweed-iceberg-test-*") if err != nil { - t.Fatalf("Failed to create temp dir: %v", err) + return nil, fmt.Errorf("create temp dir: %w", err) } // Allocate free ephemeral ports for each service var listeners []net.Listener - defer func() { + closeListeners := func() { for _, l := range listeners { l.Close() } - }() + } var l net.Listener s3Port, l, err := getFreePort() if err != nil { - t.Fatalf("Failed to get free port for S3: %v", err) + closeListeners() + return nil, fmt.Errorf("get free port for S3: %w", err) } listeners = append(listeners, l) icebergPort, l, err := getFreePort() if err != nil { - t.Fatalf("Failed to get free port for Iceberg: %v", err) + closeListeners() + return nil, fmt.Errorf("get free port for Iceberg: %w", err) } listeners = append(listeners, l) s3GrpcPort, l, err := getFreePort() if err != nil { - t.Fatalf("Failed to get free port for S3 gRPC: %v", err) + closeListeners() + return nil, fmt.Errorf("get free port for S3 gRPC: %w", err) } listeners = append(listeners, l) masterPort, l, err := getFreePort() if err != nil { - t.Fatalf("Failed to get free port for Master: %v", err) + closeListeners() + return nil, fmt.Errorf("get free port for Master: %w", err) } listeners = append(listeners, l) masterGrpcPort, l, err := getFreePort() if err != nil { - t.Fatalf("Failed to get free port for Master gRPC: %v", err) + closeListeners() + return nil, fmt.Errorf("get free port for Master gRPC: %w", err) } listeners = append(listeners, l) filerPort, l, err := getFreePort() if err != nil { - t.Fatalf("Failed to get free port for Filer: %v", err) + closeListeners() + return nil, fmt.Errorf("get free port for Filer: %w", err) } listeners = append(listeners, l) filerGrpcPort, l, err := getFreePort() if err != nil { - t.Fatalf("Failed to get free port for Filer gRPC: %v", err) + closeListeners() + return nil, fmt.Errorf("get free port for Filer gRPC: %w", err) } listeners = append(listeners, l) volumePort, l, err := getFreePort() if err != nil { - t.Fatalf("Failed to get free port for Volume: %v", err) + closeListeners() + return nil, fmt.Errorf("get free port for Volume: %w", err) } listeners = append(listeners, l) volumeGrpcPort, l, err := getFreePort() if err != nil { - t.Fatalf("Failed to get free port for Volume gRPC: %v", err) + closeListeners() + return nil, fmt.Errorf("get free port for Volume gRPC: %w", err) } listeners = append(listeners, l) + // Release the port reservations so weed mini can bind to them + closeListeners() + return &TestEnvironment{ seaweedDir: seaweedDir, weedBinary: weedBinary, @@ -166,13 +207,11 @@ func NewTestEnvironment(t *testing.T) *TestEnvironment { volumePort: volumePort, volumeGrpcPort: volumeGrpcPort, dockerAvailable: hasDocker(), - } + }, nil } -// StartSeaweedFS starts a SeaweedFS mini cluster -func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { - t.Helper() - +// startSeaweedFSForMain starts weed mini without a *testing.T (for use in TestMain). +func (env *TestEnvironment) startSeaweedFSForMain() error { ctx, cancel := context.WithCancel(context.Background()) env.weedCancel = cancel @@ -182,7 +221,8 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { for _, dir := range []string{masterDir, filerDir, volumeDir} { if err := os.MkdirAll(dir, 0755); err != nil { - t.Fatalf("Failed to create directory %s: %v", dir, err) + cancel() + return fmt.Errorf("create directory %s: %w", dir, err) } } @@ -203,13 +243,30 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { - t.Fatalf("Failed to start SeaweedFS: %v", err) + cancel() + return fmt.Errorf("start SeaweedFS: %w", err) } env.weedProcess = cmd - // Wait for services to be ready if !env.waitForService(fmt.Sprintf("http://127.0.0.1:%d/v1/config", env.icebergPort), 30*time.Second) { - t.Fatalf("Iceberg REST API did not become ready") + cancel() + cmd.Wait() + return fmt.Errorf("Iceberg REST API did not become ready") + } + return nil +} + +// cleanupForMain stops SeaweedFS and cleans up resources (no *testing.T needed). +func (env *TestEnvironment) cleanupForMain() { + if env.weedCancel != nil { + env.weedCancel() + } + if env.weedProcess != nil { + time.Sleep(2 * time.Second) + env.weedProcess.Wait() + } + if env.dataDir != "" { + os.RemoveAll(env.dataDir) } } @@ -230,25 +287,6 @@ func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bo return false } -// Cleanup stops SeaweedFS and cleans up resources -func (env *TestEnvironment) Cleanup(t *testing.T) { - t.Helper() - - if env.weedCancel != nil { - env.weedCancel() - } - - if env.weedProcess != nil { - // Give process time to shut down gracefully - time.Sleep(2 * time.Second) - env.weedProcess.Wait() - } - - if env.dataDir != "" { - os.RemoveAll(env.dataDir) - } -} - // IcebergURL returns the Iceberg REST Catalog URL func (env *TestEnvironment) IcebergURL() string { return fmt.Sprintf("http://127.0.0.1:%d", env.icebergPort) @@ -260,10 +298,7 @@ func TestIcebergConfig(t *testing.T) { t.Skip("Skipping integration test in short mode") } - env := NewTestEnvironment(t) - defer env.Cleanup(t) - - env.StartSeaweedFS(t) + env := sharedEnv // Test GET /v1/config resp, err := http.Get(env.IcebergURL() + "/v1/config") @@ -294,16 +329,14 @@ func TestIcebergNamespaces(t *testing.T) { t.Skip("Skipping integration test in short mode") } - env := NewTestEnvironment(t) - defer env.Cleanup(t) - - env.StartSeaweedFS(t) + env := sharedEnv // Create the default table bucket first via S3 - createTableBucket(t, env, "warehouse") + bucketName := "warehouse-ns-" + randomSuffix() + createTableBucket(t, env, bucketName) // Test GET /v1/namespaces (should return empty list initially) - resp, err := http.Get(env.IcebergURL() + "/v1/namespaces") + resp, err := http.Get(env.IcebergURL() + icebergPath(bucketName, "/v1/namespaces")) if err != nil { t.Fatalf("Failed to list namespaces: %v", err) } @@ -321,16 +354,14 @@ func TestStageCreateAndFinalizeFlow(t *testing.T) { t.Skip("Skipping integration test in short mode") } - env := NewTestEnvironment(t) - defer env.Cleanup(t) - - env.StartSeaweedFS(t) - createTableBucket(t, env, "warehouse") + env := sharedEnv + bucketName := "warehouse-stage-" + randomSuffix() + createTableBucket(t, env, bucketName) - namespace := "stage_ns" + namespace := "stage_ns_" + randomSuffix() tableName := "orders" - status, _, err := doIcebergJSONRequest(env, http.MethodPost, "/v1/namespaces", map[string]any{ + status, _, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, "/v1/namespaces"), map[string]any{ "namespace": []string{namespace}, }) if err != nil { @@ -340,7 +371,7 @@ func TestStageCreateAndFinalizeFlow(t *testing.T) { t.Fatalf("Create namespace status = %d, want 200 or 409", status) } - status, badReqResp, err := doIcebergJSONRequest(env, http.MethodPost, fmt.Sprintf("/v1/namespaces/%s/tables", namespace), map[string]any{ + status, badReqResp, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables", namespace)), map[string]any{ "stage-create": true, }) if err != nil { @@ -358,7 +389,7 @@ func TestStageCreateAndFinalizeFlow(t *testing.T) { t.Fatalf("error.message = %v, want it to include %q", errorObj["message"], "table name is required") } - status, stageResp, err := doIcebergJSONRequest(env, http.MethodPost, fmt.Sprintf("/v1/namespaces/%s/tables", namespace), map[string]any{ + status, stageResp, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables", namespace)), map[string]any{ "name": tableName, "stage-create": true, }) @@ -373,7 +404,7 @@ func TestStageCreateAndFinalizeFlow(t *testing.T) { t.Fatalf("stage metadata-location = %q, want suffix /metadata/v1.metadata.json", stageLocation) } - status, _, err = doIcebergJSONRequest(env, http.MethodGet, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), nil) + status, _, err = doIcebergJSONRequest(env, http.MethodGet, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), nil) if err != nil { t.Fatalf("Load staged table request failed: %v", err) } @@ -381,7 +412,7 @@ func TestStageCreateAndFinalizeFlow(t *testing.T) { t.Fatalf("Load staged table status = %d, want 404", status) } - status, commitResp, err := doIcebergJSONRequest(env, http.MethodPost, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), map[string]any{ + status, commitResp, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), map[string]any{ "requirements": []map[string]any{ {"type": "assert-create"}, }, @@ -398,7 +429,7 @@ func TestStageCreateAndFinalizeFlow(t *testing.T) { t.Fatalf("final metadata-location = %q, want suffix /metadata/v1.metadata.json", commitLocation) } - status, loadResp, err := doIcebergJSONRequest(env, http.MethodGet, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), nil) + status, loadResp, err := doIcebergJSONRequest(env, http.MethodGet, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), nil) if err != nil { t.Fatalf("Load finalized table request failed: %v", err) } @@ -417,16 +448,14 @@ func TestCommitMissingTableWithoutAssertCreate(t *testing.T) { t.Skip("Skipping integration test in short mode") } - env := NewTestEnvironment(t) - defer env.Cleanup(t) - - env.StartSeaweedFS(t) - createTableBucket(t, env, "warehouse") + env := sharedEnv + bucketName := "warehouse-missing-" + randomSuffix() + createTableBucket(t, env, bucketName) - namespace := "stage_missing_assert_ns" + namespace := "stage_missing_assert_ns_" + randomSuffix() tableName := "missing_table" - status, _, err := doIcebergJSONRequest(env, http.MethodPost, "/v1/namespaces", map[string]any{ + status, _, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, "/v1/namespaces"), map[string]any{ "namespace": []string{namespace}, }) if err != nil { @@ -436,7 +465,7 @@ func TestCommitMissingTableWithoutAssertCreate(t *testing.T) { t.Fatalf("Create namespace status = %d, want 200 or 409", status) } - status, _, err = doIcebergJSONRequest(env, http.MethodPost, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), map[string]any{ + status, _, err = doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), map[string]any{ "requirements": []any{}, "updates": []any{}, }) @@ -491,6 +520,21 @@ func doIcebergJSONRequest(env *TestEnvironment, method, path string, payload any return resp.StatusCode, decoded, nil } +// icebergPath inserts the table bucket prefix into Iceberg REST API paths. +// For example, "/v1/namespaces" with prefix "my-bucket" becomes +// "/v1/my-bucket/namespaces". +func icebergPath(prefix, path string) string { + if prefix == "" { + return path + } + const base = "/v1/" + if !strings.HasPrefix(path, base) { + return path + } + withPrefix := base + prefix + "/" + strings.TrimPrefix(path, base) + return withPrefix +} + // createTableBucket creates a table bucket via the S3Tables REST API func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { t.Helper() @@ -520,6 +564,11 @@ func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) { t.Logf("Created table bucket %s", bucketName) } +// randomSuffix returns a short random hex suffix for unique resource naming. +func randomSuffix() string { + return fmt.Sprintf("%x", time.Now().UnixNano()&0xffffffff) +} + // TestDuckDBIntegration tests Iceberg catalog operations using DuckDB // This test requires Docker to be available func TestDuckDBIntegration(t *testing.T) { @@ -527,15 +576,12 @@ func TestDuckDBIntegration(t *testing.T) { t.Skip("Skipping integration test in short mode") } - env := NewTestEnvironment(t) - defer env.Cleanup(t) + env := sharedEnv if !env.dockerAvailable { t.Skip("Docker not available, skipping DuckDB integration test") } - env.StartSeaweedFS(t) - // Create a temporary SQL file for DuckDB to execute sqlFile := filepath.Join(env.dataDir, "test.sql") sqlContent := fmt.Sprintf(` diff --git a/test/s3tables/catalog/pyiceberg_test.go b/test/s3tables/catalog/pyiceberg_test.go index 750207d74..54f4f84b8 100644 --- a/test/s3tables/catalog/pyiceberg_test.go +++ b/test/s3tables/catalog/pyiceberg_test.go @@ -26,17 +26,14 @@ func TestPyIcebergRestCatalog(t *testing.T) { t.Skip("Skipping integration test in short mode") } - env := NewTestEnvironment(t) - defer env.Cleanup(t) + env := sharedEnv if !env.dockerAvailable { t.Skip("Docker not available, skipping PyIceberg integration test") } - env.StartSeaweedFS(t) - // Create the test bucket first - bucketName := "pyiceberg-compat-test" + bucketName := "pyiceberg-compat-test-" + randomSuffix() createTableBucket(t, env, bucketName) // Build the test working directory path @@ -84,8 +81,7 @@ func TestPyIcebergRestCatalogAuthenticated(t *testing.T) { t.Skip("Skipping integration test in short mode") } - env := NewTestEnvironment(t) - defer env.Cleanup(t) + env := sharedEnv if !env.dockerAvailable { t.Skip("Docker not available, skipping PyIceberg integration test") @@ -95,11 +91,8 @@ func TestPyIcebergRestCatalogAuthenticated(t *testing.T) { testAccessKey := "admin" testSecretKey := "admin" - // Start SeaweedFS (it will use default admin credentials from environment if set) - env.StartSeaweedFS(t) - // Create the test bucket first (using unauthenticated request, which works with DefaultAllow) - bucketName := "pyiceberg-auth-test" + bucketName := "pyiceberg-auth-test-" + randomSuffix() createTableBucket(t, env, bucketName) // Build the test working directory path diff --git a/test/s3tables/catalog/pyiceberg_test_helpers.go b/test/s3tables/catalog/pyiceberg_test_helpers_test.go similarity index 100% rename from test/s3tables/catalog/pyiceberg_test_helpers.go rename to test/s3tables/catalog/pyiceberg_test_helpers_test.go diff --git a/test/s3tables/table-buckets/s3tables_integration_test.go b/test/s3tables/table-buckets/s3tables_integration_test.go index 3512dc4d5..e971a42fd 100644 --- a/test/s3tables/table-buckets/s3tables_integration_test.go +++ b/test/s3tables/table-buckets/s3tables_integration_test.go @@ -12,33 +12,57 @@ import ( "time" cryptorand "crypto/rand" - "sync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "flag" + "github.com/seaweedfs/seaweedfs/weed/command" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" - flag "github.com/seaweedfs/seaweedfs/weed/util/fla9" ) -var ( - miniClusterMutex sync.Mutex -) +// TestMain starts a single default weed mini cluster for the whole package and +// tears it down after all tests have completed. Tests that require a different +// cluster configuration (e.g. TestS3TablesCreateBucketIAMPolicy) start their +// own cluster independently. +func TestMain(m *testing.M) { + flag.Parse() + if testing.Short() { + // Tests self-skip with t.Skip when -short is set; no cluster needed. + os.Exit(m.Run()) + } + + // Create a temporary T-less context so we can use t.TempDir-equivalent. + testDir, err := os.MkdirTemp("", "seaweed-s3tables-shared-*") + if err != nil { + fmt.Fprintf(os.Stderr, "SKIP: failed to create shared temp dir: %v\n", err) + os.Exit(0) + } + + cluster, err := startMiniClusterInDir(testDir, nil) + if err != nil { + fmt.Fprintf(os.Stderr, "SKIP: failed to start shared weed mini cluster: %v\n", err) + os.RemoveAll(testDir) + os.Exit(0) + } + sharedCluster = cluster + + code := m.Run() + + sharedCluster.Stop() + os.RemoveAll(testDir) + os.Exit(code) +} func TestS3TablesIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } - // Create and start test cluster - cluster, err := startMiniCluster(t) - require.NoError(t, err) - defer cluster.Stop() - - // Create S3 Tables client - client := NewS3TablesClient(cluster.s3Endpoint, testRegion, testAccessKey, testSecretKey) + // Re-use the shared cluster started by TestMain. + client := NewS3TablesClient(sharedCluster.s3Endpoint, testRegion, testAccessKey, testSecretKey) // Run test suite t.Run("TableBucketLifecycle", func(t *testing.T) { @@ -70,6 +94,92 @@ func TestS3TablesIntegration(t *testing.T) { }) } +func TestS3TablesCreateBucketIAMPolicy(t *testing.T) { + if testing.Short() { + t.Skip("Skipping IAM integration test in short mode") + } + + t.Setenv("AWS_ACCESS_KEY_ID", "env-admin") + t.Setenv("AWS_SECRET_ACCESS_KEY", "env-secret") + + allowedBucket := "tables-allowed" + deniedBucket := "tables-denied" + iamConfigDir := t.TempDir() + iamConfigPath := filepath.Join(iamConfigDir, "iam_config.json") + iamConfig := fmt.Sprintf(`{ + "sts": { + "tokenDuration": "1h", + "maxSessionLength": "12h", + "issuer": "seaweedfs-sts", + "signingKey": "%s" + }, + "accounts": [ + { + "id": "%s", + "displayName": "tables-integration" + } + ], + "identities": [ + { + "name": "admin", + "credentials": [ + { + "accessKey": "%s", + "secretKey": "%s" + } + ], + "account": { + "id": "%s", + "displayName": "tables-integration" + }, + "policyNames": ["S3TablesBucketPolicy"] + } + ], + "policy": { + "defaultEffect": "Deny", + "storeType": "memory" + }, + "policies": [ + { + "name": "S3TablesBucketPolicy", + "document": { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": ["s3tables:CreateTableBucket"], + "Resource": [ + "arn:aws:s3tables:*:*:bucket/%s", + "arn:aws:s3:::%s" + ] + } + ] + } + } + ] +}`, testIAMSigningKey, testAccountID, testAccessKey, testSecretKey, testAccountID, allowedBucket, allowedBucket) + require.NoError(t, os.WriteFile(iamConfigPath, []byte(iamConfig), 0644)) + + cluster, err := startMiniClusterWithExtraArgs(t, []string{ + "-s3.config=" + iamConfigPath, + "-s3.iam.config=" + iamConfigPath, + }) + require.NoError(t, err, "failed to start cluster with IAM config") + defer cluster.Stop() + + client := NewS3TablesClient(cluster.s3Endpoint, testRegion, testAccessKey, testSecretKey) + + _, err = client.CreateTableBucket(deniedBucket, nil) + require.Error(t, err, "denied bucket creation should fail") + assert.Contains(t, err.Error(), "AccessDenied") + + allowedResp, err := client.CreateTableBucket(allowedBucket, nil) + require.NoError(t, err, "allowed bucket creation should succeed") + defer func() { + _ = client.DeleteTableBucket(allowedResp.ARN) + }() +} + func testTableBucketLifecycle(t *testing.T, client *S3TablesClient) { bucketName := "test-bucket-" + randomString(8) @@ -508,11 +618,12 @@ func findAvailablePorts(n int) ([]int, error) { return ports, nil } -// startMiniCluster starts a weed mini instance directly without exec -func startMiniCluster(t *testing.T) (*TestCluster, error) { - // Find available ports - // We need 8 unique ports: Master(2), Volume(2), Filer(2), S3(2) - ports, err := findAvailablePorts(8) +// startMiniClusterInDir starts a weed mini instance using testDir as the data +// directory. It does not require a *testing.T so it can be called from TestMain. +// extraArgs are appended to the default mini command flags. +func startMiniClusterInDir(testDir string, extraArgs []string) (*TestCluster, error) { + // We need 10 unique ports: Master(2), Volume(2), Filer(2), S3(2), Admin(2) + ports, err := findAvailablePorts(10) if err != nil { return nil, fmt.Errorf("failed to find available ports: %v", err) } @@ -525,8 +636,8 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) { filerGrpcPort := ports[5] s3Port := ports[6] s3GrpcPort := ports[7] - // Create temporary directory for test data - testDir := t.TempDir() + adminPort := ports[8] + adminGrpcPort := ports[9] // Ensure no configuration file from previous runs configFile := filepath.Join(testDir, "mini.options") @@ -537,7 +648,6 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) { s3Endpoint := fmt.Sprintf("http://127.0.0.1:%d", s3Port) cluster := &TestCluster{ - t: t, dataDir: testDir, ctx: ctx, cancel: cancel, @@ -550,18 +660,17 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) { // Create empty security.toml to disable JWT authentication in tests securityToml := filepath.Join(testDir, "security.toml") - err = os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644) - if err != nil { + if err = os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644); err != nil { cancel() return nil, fmt.Errorf("failed to create security.toml: %v", err) } - // Set environment variables for admin credentials safely for this test + // Ensure AWS credentials are set (don't use t.Setenv here — we are in TestMain). if os.Getenv("AWS_ACCESS_KEY_ID") == "" { - t.Setenv("AWS_ACCESS_KEY_ID", "admin") + os.Setenv("AWS_ACCESS_KEY_ID", "admin") } if os.Getenv("AWS_SECRET_ACCESS_KEY") == "" { - t.Setenv("AWS_SECRET_ACCESS_KEY", "admin") + os.Setenv("AWS_SECRET_ACCESS_KEY", "admin") } // Start weed mini in a goroutine by calling the command directly @@ -569,11 +678,7 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) { go func() { defer cluster.wg.Done() - // Protect global state mutation with a mutex - miniClusterMutex.Lock() - defer miniClusterMutex.Unlock() - - // Save current directory and args + // Save current directory and args, restore on exit. oldDir, _ := os.Getwd() oldArgs := os.Args defer func() { @@ -581,21 +686,24 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) { os.Args = oldArgs }() - // Change to test directory so mini picks up security.toml + // Change to test directory so mini picks up security.toml. os.Chdir(testDir) - // Configure args for mini command - os.Args = []string{ - "weed", + baseArgs := []string{ "-dir=" + testDir, + "-master.dir=" + testDir, "-master.port=" + strconv.Itoa(masterPort), "-master.port.grpc=" + strconv.Itoa(masterGrpcPort), "-volume.port=" + strconv.Itoa(volumePort), "-volume.port.grpc=" + strconv.Itoa(volumeGrpcPort), + "-volume.port.public=" + strconv.Itoa(volumePort), + "-volume.publicUrl=127.0.0.1:" + strconv.Itoa(volumePort), "-filer.port=" + strconv.Itoa(filerPort), "-filer.port.grpc=" + strconv.Itoa(filerGrpcPort), "-s3.port=" + strconv.Itoa(s3Port), "-s3.port.grpc=" + strconv.Itoa(s3GrpcPort), + "-admin.port=" + strconv.Itoa(adminPort), + "-admin.port.grpc=" + strconv.Itoa(adminGrpcPort), "-webdav.port=0", // Disable WebDAV "-admin.ui=false", // Disable admin UI "-master.volumeSizeLimitMB=32", // Small volumes for testing @@ -603,6 +711,10 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) { "-master.peers=none", // Faster startup "-s3.iam.readOnly=false", // Enable IAM write operations for tests } + if len(extraArgs) > 0 { + baseArgs = append(baseArgs, extraArgs...) + } + os.Args = append([]string{"weed"}, baseArgs...) // Suppress most logging during tests glog.MaxSize = 1024 * 1024 @@ -621,14 +733,34 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) { }() // Wait for S3 service to be ready - err = waitForS3Ready(cluster.s3Endpoint, 30*time.Second) - if err != nil { + if err = waitForS3Ready(cluster.s3Endpoint, 30*time.Second); err != nil { cancel() return nil, fmt.Errorf("S3 service failed to start: %v", err) } cluster.isRunning = true + return cluster, nil +} + +// startMiniClusterWithExtraArgs starts a weed mini instance for a single test. +// It uses t.TempDir() for data isolation and t.Setenv for credential scoping. +func startMiniClusterWithExtraArgs(t *testing.T, extraArgs []string) (*TestCluster, error) { + t.Helper() + testDir := t.TempDir() + // Scope credentials to the test so they are restored after test completion. + if os.Getenv("AWS_ACCESS_KEY_ID") == "" { + t.Setenv("AWS_ACCESS_KEY_ID", "admin") + } + if os.Getenv("AWS_SECRET_ACCESS_KEY") == "" { + t.Setenv("AWS_SECRET_ACCESS_KEY", "admin") + } + + cluster, err := startMiniClusterInDir(testDir, extraArgs) + if err != nil { + return nil, err + } + cluster.t = t t.Logf("Test cluster started successfully at %s", cluster.s3Endpoint) return cluster, nil } @@ -654,22 +786,15 @@ func (c *TestCluster) Stop() { case <-done: // Goroutine finished case <-timer.C: - // Timeout - goroutine doesn't respond to context cancel - // This may indicate the mini cluster didn't shut down cleanly - c.t.Log("Warning: Test cluster shutdown timed out after 2 seconds") - } - - // Reset the global cmdMini flags to prevent state leakage to other tests - for _, cmd := range command.Commands { - if cmd.Name() == "mini" { - // Reset flags to defaults - cmd.Flag.VisitAll(func(f *flag.Flag) { - // Reset to default value - f.Value.Set(f.DefValue) - }) - break + // Timeout - goroutine doesn't respond to context cancel. + // This may indicate the mini cluster didn't shut down cleanly. + if c.t != nil { + c.t.Log("Warning: Test cluster shutdown timed out after 2 seconds") + } else { + fmt.Println("Warning: Test cluster shutdown timed out after 2 seconds") } } + } // waitForS3Ready waits for the S3 service to be ready diff --git a/test/s3tables/table-buckets/setup.go b/test/s3tables/table-buckets/setup.go index 2d0c9a3c0..18627e598 100644 --- a/test/s3tables/table-buckets/setup.go +++ b/test/s3tables/table-buckets/setup.go @@ -8,6 +8,11 @@ import ( "time" ) +// sharedCluster is the single default TestCluster shared across all tests +// that do not require a specialised cluster configuration. +// It is initialised by TestMain and must not be modified by individual tests. +var sharedCluster *TestCluster + // TestCluster manages the weed mini instance for integration testing type TestCluster struct { t *testing.T @@ -46,8 +51,9 @@ func NewS3TablesClient(endpoint, region, accessKey, secretKey string) *S3TablesC // Test configuration constants const ( - testRegion = "us-west-2" - testAccessKey = "admin" - testSecretKey = "admin" - testAccountID = "111122223333" + testRegion = "us-west-2" + testAccessKey = "admin" + testSecretKey = "admin" + testAccountID = "111122223333" + testIAMSigningKey = "dGVzdC1zaWduaW5nLWtleS1mb3Itc3RzLWludGVncmF0aW9uLXRlc3Rz" ) diff --git a/weed/s3api/identity_reflection_test.go b/weed/s3api/identity_reflection_test.go new file mode 100644 index 000000000..b40cc31a7 --- /dev/null +++ b/weed/s3api/identity_reflection_test.go @@ -0,0 +1,39 @@ +package s3api + +import ( + "reflect" + "testing" +) + +// TestIdentityFieldsForS3TablesReflection ensures the identity struct keeps the +// fields relied on by s3tables.getIdentityPrincipalArn, getIdentityPolicyNames, +// and getIdentityClaims via reflection. +func TestIdentityFieldsForS3TablesReflection(t *testing.T) { + typ := reflect.TypeOf(Identity{}) + checkField(t, typ, "PrincipalArn", reflect.String) + field, ok := typ.FieldByName("PolicyNames") + if !ok { + t.Fatalf("Identity.PolicyNames missing") + } + if field.Type.Kind() != reflect.Slice { + t.Fatalf("Identity.PolicyNames must be a slice, got %s", field.Type.Kind()) + } + field, ok = typ.FieldByName("Claims") + if !ok { + t.Fatalf("Identity.Claims missing") + } + if field.Type.Kind() != reflect.Map || field.Type.Key().Kind() != reflect.String { + t.Fatalf("Identity.Claims must be map[string]..., got %s/%s", field.Type.Kind(), field.Type.Key().Kind()) + } +} + +func checkField(t *testing.T, typ reflect.Type, name string, kind reflect.Kind) { + t.Helper() + field, ok := typ.FieldByName(name) + if !ok { + t.Fatalf("Identity.%s missing", name) + } + if field.Type.Kind() != kind { + t.Fatalf("Identity.%s must be %s, got %s", name, kind, field.Type.Kind()) + } +} diff --git a/weed/s3api/s3api_tables.go b/weed/s3api/s3api_tables.go index ac4a37604..b27943334 100644 --- a/weed/s3api/s3api_tables.go +++ b/weed/s3api/s3api_tables.go @@ -48,6 +48,11 @@ func (st *S3TablesApiServer) SetDefaultAllow(allow bool) { st.handler.SetDefaultAllow(allow) } +// SetIAMAuthorizer injects the IAM authorizer for S3 Tables IAM checks. +func (st *S3TablesApiServer) SetIAMAuthorizer(authorizer s3tables.IAMAuthorizer) { + st.handler.SetIAMAuthorizer(authorizer) +} + // S3TablesHandler handles S3 Tables API requests func (st *S3TablesApiServer) S3TablesHandler(w http.ResponseWriter, r *http.Request) { st.handler.HandleRequest(w, r, st) @@ -64,6 +69,9 @@ func (s3a *S3ApiServer) registerS3TablesRoutes(router *mux.Router) { s3TablesApi := NewS3TablesApiServer(s3a) if s3a.iam != nil && s3a.iam.iamIntegration != nil { s3TablesApi.SetDefaultAllow(s3a.iam.iamIntegration.DefaultAllow()) + if s3Integration, ok := s3a.iam.iamIntegration.(*S3IAMIntegration); ok && s3Integration.iamManager != nil { + s3TablesApi.SetIAMAuthorizer(s3Integration.iamManager) + } } else { // If IAM is not configured, allow all access by default s3TablesApi.SetDefaultAllow(true) diff --git a/weed/s3api/s3tables/handler.go b/weed/s3api/s3tables/handler.go index 962d3bfc9..1ead82309 100644 --- a/weed/s3api/s3tables/handler.go +++ b/weed/s3api/s3tables/handler.go @@ -44,9 +44,10 @@ const ( // S3TablesHandler handles S3 Tables API requests type S3TablesHandler struct { - region string - accountID string - defaultAllow bool // Whether to allow access by default (for zero-config IAM) + region string + accountID string + defaultAllow bool // Whether to allow access by default (for zero-config IAM) + iamAuthorizer IAMAuthorizer } // NewS3TablesHandler creates a new S3 Tables handler @@ -259,20 +260,10 @@ func normalizePrincipalID(id string) string { // getIdentityActions extracts the action list from the identity object in the request context. // Uses reflection to avoid import cycles with s3api package. func getIdentityActions(r *http.Request) []string { - identityRaw := s3_constants.GetIdentityFromContext(r) - if identityRaw == nil { - return nil - } - - // Use reflection to access the Actions field to avoid import cycle - val := reflect.ValueOf(identityRaw) - if val.Kind() == reflect.Ptr { - val = val.Elem() - } - if val.Kind() != reflect.Struct { + val, ok := getIdentityStructValue(r) + if !ok { return nil } - actionsField := val.FieldByName("Actions") if !actionsField.IsValid() || actionsField.Kind() != reflect.Slice { return nil diff --git a/weed/s3api/s3tables/handler_bucket_create.go b/weed/s3api/s3tables/handler_bucket_create.go index c093931dd..9e7f4e283 100644 --- a/weed/s3api/s3tables/handler_bucket_create.go +++ b/weed/s3api/s3tables/handler_bucket_create.go @@ -14,15 +14,6 @@ import ( // handleCreateTableBucket creates a new table bucket func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error { - // Check permission - principal := h.getAccountID(r) - if !CheckPermissionWithContext("CreateTableBucket", principal, principal, "", "", &PolicyContext{ - DefaultAllow: h.defaultAllow, - }) { - h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create table buckets") - return NewAuthError("CreateTableBucket", principal, "not authorized to create table buckets") - } - var req CreateTableBucketRequest if err := h.readRequestBody(r, &req); err != nil { h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error()) @@ -35,6 +26,40 @@ func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http return err } + principal := h.getAccountID(r) + identityActions := getIdentityActions(r) + identityPolicyNames := getIdentityPolicyNames(r) + useIAM := h.shouldUseIAM(r, identityActions, identityPolicyNames) + useLegacy := !useIAM + if useIAM { + allowed, err := h.authorizeIAMAction(r, identityPolicyNames, "CreateTableBucket", h.generateTableBucketARN(principal, req.Name), fmt.Sprintf("arn:aws:s3:::%s", req.Name)) + if err != nil { + h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create table buckets") + return NewAuthError("CreateTableBucket", principal, "not authorized to create table buckets") + } + if !allowed { + if h.defaultAllow && len(identityActions) == 0 && len(identityPolicyNames) == 0 { + useLegacy = true + } else { + h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create table buckets") + return NewAuthError("CreateTableBucket", principal, "not authorized to create table buckets") + } + } + } + if useLegacy { + owner := h.accountID + if owner == "" { + owner = DefaultAccountID + } + if !CheckPermissionWithContext("CreateTableBucket", principal, owner, "", "", &PolicyContext{ + IdentityActions: identityActions, + DefaultAllow: h.defaultAllow, + }) { + h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create table buckets") + return NewAuthError("CreateTableBucket", principal, "not authorized to create table buckets") + } + } + bucketPath := GetTableBucketPath(req.Name) // Check if bucket already exists and ensure no conflict with object store buckets @@ -89,7 +114,7 @@ func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http metadata := &tableBucketMetadata{ Name: req.Name, CreatedAt: now, - OwnerAccountID: h.getAccountID(r), + OwnerAccountID: principal, } metadataBytes, err := json.Marshal(metadata) diff --git a/weed/s3api/s3tables/iam.go b/weed/s3api/s3tables/iam.go new file mode 100644 index 000000000..993eeaeeb --- /dev/null +++ b/weed/s3api/s3tables/iam.go @@ -0,0 +1,266 @@ +package s3tables + +import ( + "context" + "fmt" + "net/http" + "reflect" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/iam/integration" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" +) + +// IAMAuthorizer allows s3tables handlers to evaluate IAM policies without importing s3api. +type IAMAuthorizer interface { + IsActionAllowed(ctx context.Context, request *integration.ActionRequest) (bool, error) +} + +// SetIAMAuthorizer injects the IAM authorizer for policy-based access checks. +func (h *S3TablesHandler) SetIAMAuthorizer(authorizer IAMAuthorizer) { + h.iamAuthorizer = authorizer +} + +func (h *S3TablesHandler) shouldUseIAM(r *http.Request, identityActions, identityPolicyNames []string) bool { + if h.iamAuthorizer == nil || r == nil { + return false + } + if s3_constants.GetIdentityFromContext(r) == nil { + return false + } + // When default-allow is enabled, keep anonymous requests on the legacy path + // to preserve zero-config behavior (IAM policies are not available for anonymous). + if h.defaultAllow && isAnonymousIdentity(r) { + return false + } + // An empty inline `identityActions` slice doesn't mean the identity has no + // permissions—it just means authorization lives in IAM policies or session + // tokens instead of static action lists. We therefore prefer the IAM path + // whenever inline actions are absent and fall back to default policy names + // or session tokens. + if hasSessionToken(r) { + return true + } + if len(identityActions) == 0 { + return true + } + return len(identityPolicyNames) > 0 +} + +func isAnonymousIdentity(r *http.Request) bool { + val, ok := getIdentityStructValue(r) + if !ok { + return false + } + if nameField := val.FieldByName("Name"); nameField.IsValid() && nameField.Kind() == reflect.String { + if nameField.String() == s3_constants.AccountAnonymousId { + return true + } + } + accountField := val.FieldByName("Account") + if accountField.IsValid() && !accountField.IsNil() { + if accountField.Kind() == reflect.Ptr { + accountField = accountField.Elem() + } + if accountField.Kind() == reflect.Struct { + if idField := accountField.FieldByName("Id"); idField.IsValid() && idField.Kind() == reflect.String { + if idField.String() == s3_constants.AccountAnonymousId { + return true + } + } + } + } + return false +} + +func hasSessionToken(r *http.Request) bool { + return extractSessionToken(r) != "" +} + +func extractSessionToken(r *http.Request) string { + if token := r.Header.Get("X-SeaweedFS-Session-Token"); token != "" { + return token + } + if token := r.Header.Get("X-Amz-Security-Token"); token != "" { + return token + } + return r.URL.Query().Get("X-Amz-Security-Token") +} + +func (h *S3TablesHandler) authorizeIAMAction(r *http.Request, identityPolicyNames []string, action string, resources ...string) (bool, error) { + if h.iamAuthorizer == nil { + err := fmt.Errorf("nil iamAuthorizer in authorizeIAMAction") + glog.V(2).Infof("S3Tables: %v", err) + return false, err + } + principal := r.Header.Get("X-SeaweedFS-Principal") + if principal == "" { + principal = getIdentityPrincipalArn(r) + } + if principal == "" { + return false, fmt.Errorf("missing principal for IAM authorization") + } + + if !strings.Contains(action, ":") { + action = "s3tables:" + action + } + + sessionToken := extractSessionToken(r) + + requestContext := buildIAMRequestContext(r, getIdentityClaims(r)) + policyNames := identityPolicyNames + if len(policyNames) == 0 { + policyNames = getIdentityPolicyNames(r) + } + + if len(resources) == 0 { + return false, fmt.Errorf("no resources provided to authorizeIAMAction") + } + checkedResource := false + for _, resource := range resources { + if resource == "" { + continue + } + checkedResource = true + allowed, err := h.iamAuthorizer.IsActionAllowed(r.Context(), &integration.ActionRequest{ + Principal: principal, + Action: action, + Resource: resource, + SessionToken: sessionToken, + RequestContext: requestContext, + PolicyNames: policyNames, + }) + if err != nil { + glog.V(2).Infof("S3Tables: IAM authorization error action=%s resource=%s principal=%s: %v", action, resource, principal, err) + return false, err + } + if !allowed { + err := fmt.Errorf("access denied by IAM for resource %s", resource) + return false, err + } + } + if !checkedResource { + return false, fmt.Errorf("no non-empty resources provided to authorizeIAMAction") + } + return true, nil +} + +func getIdentityPrincipalArn(r *http.Request) string { + val, ok := getIdentityStructValue(r) + if !ok { + return "" + } + field := val.FieldByName("PrincipalArn") + if field.IsValid() && field.Kind() == reflect.String { + return field.String() + } + return "" +} + +func getIdentityPolicyNames(r *http.Request) []string { + val, ok := getIdentityStructValue(r) + if !ok { + return nil + } + field := val.FieldByName("PolicyNames") + if !field.IsValid() || field.Kind() != reflect.Slice { + return nil + } + policies := make([]string, 0, field.Len()) + for i := 0; i < field.Len(); i++ { + item := field.Index(i) + if item.Kind() == reflect.String { + policies = append(policies, item.String()) + } else if item.CanInterface() { + policies = append(policies, fmt.Sprint(item.Interface())) + } + } + if len(policies) == 0 { + return nil + } + return policies +} + +func getIdentityClaims(r *http.Request) map[string]interface{} { + val, ok := getIdentityStructValue(r) + if !ok { + return nil + } + field := val.FieldByName("Claims") + if !field.IsValid() || field.Kind() != reflect.Map || field.IsNil() { + return nil + } + if field.Type().Key().Kind() != reflect.String { + return nil + } + claims := make(map[string]interface{}, field.Len()) + for _, key := range field.MapKeys() { + if key.Kind() != reflect.String { + continue + } + val := field.MapIndex(key) + if !val.IsValid() { + continue + } + claims[key.String()] = val.Interface() + } + if len(claims) == 0 { + return nil + } + return claims +} + +func buildIAMRequestContext(r *http.Request, claims map[string]interface{}) map[string]interface{} { + ctx := make(map[string]interface{}) + if ua := r.Header.Get("User-Agent"); ua != "" { + ctx["userAgent"] = ua + } + if referer := r.Header.Get("Referer"); referer != "" { + ctx["referer"] = referer + } + for k, v := range claims { + if strings.HasPrefix(k, "jwt:") { + if _, exists := ctx[k]; !exists { + ctx[k] = v + } + } + } + for k, v := range claims { + if strings.HasPrefix(k, "jwt:") { + continue + } + if _, exists := ctx[k]; !exists { + ctx[k] = v + } + jwtKey := "jwt:" + k + if _, exists := ctx[jwtKey]; !exists { + ctx[jwtKey] = v + } + } + if len(ctx) == 0 { + return nil + } + return ctx +} + +// getIdentityStructValue fetches the identity struct held in the request context. +// The identity is expected to be a pointer to a struct with the fields used by +// the reflection helpers (PrincipalArn string, PolicyNames []string, +// Claims map[string]interface{}). +// This helper centralizes the nil-check and ptr-deref logic so callers focus on +// reading the specific fields they need. +func getIdentityStructValue(r *http.Request) (reflect.Value, bool) { + identityRaw := s3_constants.GetIdentityFromContext(r) + if identityRaw == nil { + return reflect.Value{}, false + } + val := reflect.ValueOf(identityRaw) + if val.Kind() == reflect.Ptr { + val = val.Elem() + } + if val.Kind() != reflect.Struct { + return reflect.Value{}, false + } + return val, true +}