Browse Source

Enforce IAM for S3 Tables bucket creation (#8388)

* Enforce IAM for s3tables bucket creation

* Prefer IAM path when policies exist

* Ensure IAM enforcement honors default allow

* address comments

* Reused the precomputed principal when setting tableBucketMetadata.OwnerAccountID, avoiding the redundant getAccountID call.

* get identity

* fix

* dedup

* fix

* comments

* fix tests

* update iam config

* go fmt

* fix ports

* fix flags

* mini clean shutdown

* Revert "update iam config"

This reverts commit ca48fdbb0a.

Revert "mini clean shutdown"

This reverts commit 9e17f6baff.

Revert "fix flags"

This reverts commit e9e7b29d2f.

Revert "go fmt"

This reverts commit bd3241960b.

* test/s3tables: share single weed mini per test package via TestMain

Previously each top-level test function in the catalog and s3tables
package started and stopped its own weed mini instance. This caused
failures when a prior instance wasn't cleanly stopped before the next
one started (port conflicts, leaked global state).

Changes:
- catalog/iceberg_catalog_test.go: introduce TestMain that starts one
  shared TestEnvironment (external weed binary) before all tests and
  tears it down after. All individual test functions now use sharedEnv.
  Added randomSuffix() for unique resource names across tests.
- catalog/pyiceberg_test.go: updated to use sharedEnv instead of
  per-test environments.
- catalog/pyiceberg_test_helpers.go -> pyiceberg_test_helpers_test.go:
  renamed to a _test.go file so it can access TestEnvironment which is
  defined in a test file.
- table-buckets/setup.go: add package-level sharedCluster variable.
- table-buckets/s3tables_integration_test.go: introduce TestMain that
  starts one shared TestCluster before all tests. TestS3TablesIntegration
  now uses sharedCluster. Extract startMiniClusterInDir (no *testing.T)
  for TestMain use. TestS3TablesCreateBucketIAMPolicy keeps its own
  cluster (different IAM config). Remove miniClusterMutex (no longer
  needed). Fix Stop() to not panic when t is nil.

* delete

* parse

* default allow should work with anonymous

* fix port

* iceberg route

The failures are from Iceberg REST using the default bucket warehouse when no prefix is provided. Your tests create random buckets, so /v1/namespaces was looking in warehouse and failing. I updated the tests to use the prefixed Iceberg routes (/v1/{bucket}/...) via a small helper.

* test(s3tables): fix port conflicts and IAM ARN matching in integration tests

- Pass -master.dir explicitly to prevent filer store directory collision
  between shared cluster and per-test clusters running in the same process
- Pass -volume.port.public and -volume.publicUrl to prevent the global
  publicPort flag (mutated from 0 → concrete port by first cluster) from
  being reused by a second cluster, causing 'address already in use'
- Remove the flag-reset loop in Stop() that reset global flag values while
  other goroutines were reading them (race → panic)
- Fix IAM policy Resource ARN in TestS3TablesCreateBucketIAMPolicy to use
  wildcards (arn:aws:s3tables:*:*:bucket/<name>) because the handler
  generates ARNs with its own DefaultRegion (us-east-1) and principal name
  ('admin'), not the test constants testRegion/testAccountID
pull/7183/merge
Chris Lu 15 hours ago
committed by GitHub
parent
commit
36c469e34e
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 210
      test/s3tables/catalog/iceberg_catalog_test.go
  2. 15
      test/s3tables/catalog/pyiceberg_test.go
  3. 0
      test/s3tables/catalog/pyiceberg_test_helpers_test.go
  4. 225
      test/s3tables/table-buckets/s3tables_integration_test.go
  5. 14
      test/s3tables/table-buckets/setup.go
  6. 39
      weed/s3api/identity_reflection_test.go
  7. 8
      weed/s3api/s3api_tables.go
  8. 21
      weed/s3api/s3tables/handler.go
  9. 45
      weed/s3api/s3tables/handler_bucket_create.go
  10. 266
      weed/s3api/s3tables/iam.go

210
test/s3tables/catalog/iceberg_catalog_test.go

@ -6,6 +6,7 @@ import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net"
@ -18,6 +19,37 @@ import (
"time"
)
// sharedEnv is the single TestEnvironment shared across all tests in this package.
var sharedEnv *TestEnvironment
// TestMain starts one weed mini instance for the whole package and tears it down
// after all tests have run.
func TestMain(m *testing.M) {
	flag.Parse()

	// Under -short (or the SHORT env var) every test skips itself, so there
	// is no point paying for cluster startup here.
	if testing.Short() || os.Getenv("SHORT") != "" {
		os.Exit(m.Run())
	}

	environment, err := newTestEnvironmentForMain()
	if err != nil {
		fmt.Fprintf(os.Stderr, "SKIP: setup failed: %v\n", err)
		os.Exit(0) // Skip all tests rather than fail
	}
	sharedEnv = environment

	if err := sharedEnv.startSeaweedFSForMain(); err != nil {
		fmt.Fprintf(os.Stderr, "SKIP: weed mini failed to start: %v\n", err)
		sharedEnv.cleanupForMain()
		os.Exit(0)
	}

	exitCode := m.Run()
	sharedEnv.cleanupForMain()
	os.Exit(exitCode)
}
// TestEnvironment contains the test environment configuration
type TestEnvironment struct {
seaweedDir string
@ -54,17 +86,15 @@ func getFreePort() (int, net.Listener, error) {
return addr.Port, listener, nil
}
// NewTestEnvironment creates a new test environment
func NewTestEnvironment(t *testing.T) *TestEnvironment {
t.Helper()
// newTestEnvironmentForMain creates a TestEnvironment without calling t.Fatalf so it
// can be used from TestMain (which has no *testing.T).
func newTestEnvironmentForMain() (*TestEnvironment, error) {
// Find the SeaweedFS root directory
wd, err := os.Getwd()
if err != nil {
t.Fatalf("Failed to get working directory: %v", err)
return nil, fmt.Errorf("get working directory: %w", err)
}
// Navigate up to find the SeaweedFS root (contains go.mod)
seaweedDir := wd
for i := 0; i < 5; i++ {
if _, err := os.Stat(filepath.Join(seaweedDir, "go.mod")); err == nil {
@ -76,82 +106,93 @@ func NewTestEnvironment(t *testing.T) *TestEnvironment {
// Check for weed binary
weedBinary := filepath.Join(seaweedDir, "weed", "weed")
if _, err := os.Stat(weedBinary); os.IsNotExist(err) {
// Try system PATH
weedBinary = "weed"
if _, err := exec.LookPath(weedBinary); err != nil {
t.Skip("weed binary not found, skipping integration test")
return nil, fmt.Errorf("weed binary not found")
}
}
// Create temporary data directory
dataDir, err := os.MkdirTemp("", "seaweed-iceberg-test-*")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
return nil, fmt.Errorf("create temp dir: %w", err)
}
// Allocate free ephemeral ports for each service
var listeners []net.Listener
defer func() {
closeListeners := func() {
for _, l := range listeners {
l.Close()
}
}()
}
var l net.Listener
s3Port, l, err := getFreePort()
if err != nil {
t.Fatalf("Failed to get free port for S3: %v", err)
closeListeners()
return nil, fmt.Errorf("get free port for S3: %w", err)
}
listeners = append(listeners, l)
icebergPort, l, err := getFreePort()
if err != nil {
t.Fatalf("Failed to get free port for Iceberg: %v", err)
closeListeners()
return nil, fmt.Errorf("get free port for Iceberg: %w", err)
}
listeners = append(listeners, l)
s3GrpcPort, l, err := getFreePort()
if err != nil {
t.Fatalf("Failed to get free port for S3 gRPC: %v", err)
closeListeners()
return nil, fmt.Errorf("get free port for S3 gRPC: %w", err)
}
listeners = append(listeners, l)
masterPort, l, err := getFreePort()
if err != nil {
t.Fatalf("Failed to get free port for Master: %v", err)
closeListeners()
return nil, fmt.Errorf("get free port for Master: %w", err)
}
listeners = append(listeners, l)
masterGrpcPort, l, err := getFreePort()
if err != nil {
t.Fatalf("Failed to get free port for Master gRPC: %v", err)
closeListeners()
return nil, fmt.Errorf("get free port for Master gRPC: %w", err)
}
listeners = append(listeners, l)
filerPort, l, err := getFreePort()
if err != nil {
t.Fatalf("Failed to get free port for Filer: %v", err)
closeListeners()
return nil, fmt.Errorf("get free port for Filer: %w", err)
}
listeners = append(listeners, l)
filerGrpcPort, l, err := getFreePort()
if err != nil {
t.Fatalf("Failed to get free port for Filer gRPC: %v", err)
closeListeners()
return nil, fmt.Errorf("get free port for Filer gRPC: %w", err)
}
listeners = append(listeners, l)
volumePort, l, err := getFreePort()
if err != nil {
t.Fatalf("Failed to get free port for Volume: %v", err)
closeListeners()
return nil, fmt.Errorf("get free port for Volume: %w", err)
}
listeners = append(listeners, l)
volumeGrpcPort, l, err := getFreePort()
if err != nil {
t.Fatalf("Failed to get free port for Volume gRPC: %v", err)
closeListeners()
return nil, fmt.Errorf("get free port for Volume gRPC: %w", err)
}
listeners = append(listeners, l)
// Release the port reservations so weed mini can bind to them
closeListeners()
return &TestEnvironment{
seaweedDir: seaweedDir,
weedBinary: weedBinary,
@ -166,13 +207,11 @@ func NewTestEnvironment(t *testing.T) *TestEnvironment {
volumePort: volumePort,
volumeGrpcPort: volumeGrpcPort,
dockerAvailable: hasDocker(),
}
}, nil
}
// StartSeaweedFS starts a SeaweedFS mini cluster
func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
t.Helper()
// startSeaweedFSForMain starts weed mini without a *testing.T (for use in TestMain).
func (env *TestEnvironment) startSeaweedFSForMain() error {
ctx, cancel := context.WithCancel(context.Background())
env.weedCancel = cancel
@ -182,7 +221,8 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
for _, dir := range []string{masterDir, filerDir, volumeDir} {
if err := os.MkdirAll(dir, 0755); err != nil {
t.Fatalf("Failed to create directory %s: %v", dir, err)
cancel()
return fmt.Errorf("create directory %s: %w", dir, err)
}
}
@ -203,13 +243,30 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
t.Fatalf("Failed to start SeaweedFS: %v", err)
cancel()
return fmt.Errorf("start SeaweedFS: %w", err)
}
env.weedProcess = cmd
// Wait for services to be ready
if !env.waitForService(fmt.Sprintf("http://127.0.0.1:%d/v1/config", env.icebergPort), 30*time.Second) {
t.Fatalf("Iceberg REST API did not become ready")
cancel()
cmd.Wait()
return fmt.Errorf("Iceberg REST API did not become ready")
}
return nil
}
// cleanupForMain tears down the weed mini process and removes its data
// directory without needing a *testing.T, so TestMain can call it. It is
// safe to call even when startup only partially succeeded.
func (env *TestEnvironment) cleanupForMain() {
	if cancel := env.weedCancel; cancel != nil {
		cancel()
	}
	if proc := env.weedProcess; proc != nil {
		// Allow a short grace period for shutdown, then reap the process.
		time.Sleep(2 * time.Second)
		proc.Wait()
	}
	if dir := env.dataDir; dir != "" {
		os.RemoveAll(dir)
	}
}
@ -230,25 +287,6 @@ func (env *TestEnvironment) waitForService(url string, timeout time.Duration) bo
return false
}
// Cleanup stops SeaweedFS and cleans up resources
func (env *TestEnvironment) Cleanup(t *testing.T) {
t.Helper()
if env.weedCancel != nil {
env.weedCancel()
}
if env.weedProcess != nil {
// Give process time to shut down gracefully
time.Sleep(2 * time.Second)
env.weedProcess.Wait()
}
if env.dataDir != "" {
os.RemoveAll(env.dataDir)
}
}
// IcebergURL returns the Iceberg REST Catalog URL
func (env *TestEnvironment) IcebergURL() string {
return fmt.Sprintf("http://127.0.0.1:%d", env.icebergPort)
@ -260,10 +298,7 @@ func TestIcebergConfig(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
env := NewTestEnvironment(t)
defer env.Cleanup(t)
env.StartSeaweedFS(t)
env := sharedEnv
// Test GET /v1/config
resp, err := http.Get(env.IcebergURL() + "/v1/config")
@ -294,16 +329,14 @@ func TestIcebergNamespaces(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
env := NewTestEnvironment(t)
defer env.Cleanup(t)
env.StartSeaweedFS(t)
env := sharedEnv
// Create the default table bucket first via S3
createTableBucket(t, env, "warehouse")
bucketName := "warehouse-ns-" + randomSuffix()
createTableBucket(t, env, bucketName)
// Test GET /v1/namespaces (should return empty list initially)
resp, err := http.Get(env.IcebergURL() + "/v1/namespaces")
resp, err := http.Get(env.IcebergURL() + icebergPath(bucketName, "/v1/namespaces"))
if err != nil {
t.Fatalf("Failed to list namespaces: %v", err)
}
@ -321,16 +354,14 @@ func TestStageCreateAndFinalizeFlow(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
env := NewTestEnvironment(t)
defer env.Cleanup(t)
env.StartSeaweedFS(t)
createTableBucket(t, env, "warehouse")
env := sharedEnv
bucketName := "warehouse-stage-" + randomSuffix()
createTableBucket(t, env, bucketName)
namespace := "stage_ns"
namespace := "stage_ns_" + randomSuffix()
tableName := "orders"
status, _, err := doIcebergJSONRequest(env, http.MethodPost, "/v1/namespaces", map[string]any{
status, _, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, "/v1/namespaces"), map[string]any{
"namespace": []string{namespace},
})
if err != nil {
@ -340,7 +371,7 @@ func TestStageCreateAndFinalizeFlow(t *testing.T) {
t.Fatalf("Create namespace status = %d, want 200 or 409", status)
}
status, badReqResp, err := doIcebergJSONRequest(env, http.MethodPost, fmt.Sprintf("/v1/namespaces/%s/tables", namespace), map[string]any{
status, badReqResp, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables", namespace)), map[string]any{
"stage-create": true,
})
if err != nil {
@ -358,7 +389,7 @@ func TestStageCreateAndFinalizeFlow(t *testing.T) {
t.Fatalf("error.message = %v, want it to include %q", errorObj["message"], "table name is required")
}
status, stageResp, err := doIcebergJSONRequest(env, http.MethodPost, fmt.Sprintf("/v1/namespaces/%s/tables", namespace), map[string]any{
status, stageResp, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables", namespace)), map[string]any{
"name": tableName,
"stage-create": true,
})
@ -373,7 +404,7 @@ func TestStageCreateAndFinalizeFlow(t *testing.T) {
t.Fatalf("stage metadata-location = %q, want suffix /metadata/v1.metadata.json", stageLocation)
}
status, _, err = doIcebergJSONRequest(env, http.MethodGet, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), nil)
status, _, err = doIcebergJSONRequest(env, http.MethodGet, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), nil)
if err != nil {
t.Fatalf("Load staged table request failed: %v", err)
}
@ -381,7 +412,7 @@ func TestStageCreateAndFinalizeFlow(t *testing.T) {
t.Fatalf("Load staged table status = %d, want 404", status)
}
status, commitResp, err := doIcebergJSONRequest(env, http.MethodPost, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), map[string]any{
status, commitResp, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), map[string]any{
"requirements": []map[string]any{
{"type": "assert-create"},
},
@ -398,7 +429,7 @@ func TestStageCreateAndFinalizeFlow(t *testing.T) {
t.Fatalf("final metadata-location = %q, want suffix /metadata/v1.metadata.json", commitLocation)
}
status, loadResp, err := doIcebergJSONRequest(env, http.MethodGet, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), nil)
status, loadResp, err := doIcebergJSONRequest(env, http.MethodGet, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), nil)
if err != nil {
t.Fatalf("Load finalized table request failed: %v", err)
}
@ -417,16 +448,14 @@ func TestCommitMissingTableWithoutAssertCreate(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
env := NewTestEnvironment(t)
defer env.Cleanup(t)
env.StartSeaweedFS(t)
createTableBucket(t, env, "warehouse")
env := sharedEnv
bucketName := "warehouse-missing-" + randomSuffix()
createTableBucket(t, env, bucketName)
namespace := "stage_missing_assert_ns"
namespace := "stage_missing_assert_ns_" + randomSuffix()
tableName := "missing_table"
status, _, err := doIcebergJSONRequest(env, http.MethodPost, "/v1/namespaces", map[string]any{
status, _, err := doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, "/v1/namespaces"), map[string]any{
"namespace": []string{namespace},
})
if err != nil {
@ -436,7 +465,7 @@ func TestCommitMissingTableWithoutAssertCreate(t *testing.T) {
t.Fatalf("Create namespace status = %d, want 200 or 409", status)
}
status, _, err = doIcebergJSONRequest(env, http.MethodPost, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName), map[string]any{
status, _, err = doIcebergJSONRequest(env, http.MethodPost, icebergPath(bucketName, fmt.Sprintf("/v1/namespaces/%s/tables/%s", namespace, tableName)), map[string]any{
"requirements": []any{},
"updates": []any{},
})
@ -491,6 +520,21 @@ func doIcebergJSONRequest(env *TestEnvironment, method, path string, payload any
return resp.StatusCode, decoded, nil
}
// icebergPath rewrites an Iceberg REST API path so that it targets a specific
// table bucket. For example, "/v1/namespaces" with prefix "my-bucket" becomes
// "/v1/my-bucket/namespaces". An empty prefix or a path outside "/v1/" is
// returned unchanged.
func icebergPath(prefix, path string) string {
	const base = "/v1/"
	if prefix == "" || !strings.HasPrefix(path, base) {
		return path
	}
	return base + prefix + "/" + strings.TrimPrefix(path, base)
}
// createTableBucket creates a table bucket via the S3Tables REST API
func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) {
t.Helper()
@ -520,6 +564,11 @@ func createTableBucket(t *testing.T, env *TestEnvironment, bucketName string) {
t.Logf("Created table bucket %s", bucketName)
}
// randomSuffix returns a short hex suffix for unique resource naming.
//
// NOTE(review): the value is the low 32 bits of the current wall-clock
// nanoseconds, not a random source, so two calls within the same clock tick
// could collide — presumably acceptable for test resource names, but the
// "random" in the name slightly overstates it; confirm this is intentional.
func randomSuffix() string {
	return fmt.Sprintf("%x", time.Now().UnixNano()&0xffffffff)
}
// TestDuckDBIntegration tests Iceberg catalog operations using DuckDB
// This test requires Docker to be available
func TestDuckDBIntegration(t *testing.T) {
@ -527,15 +576,12 @@ func TestDuckDBIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
env := NewTestEnvironment(t)
defer env.Cleanup(t)
env := sharedEnv
if !env.dockerAvailable {
t.Skip("Docker not available, skipping DuckDB integration test")
}
env.StartSeaweedFS(t)
// Create a temporary SQL file for DuckDB to execute
sqlFile := filepath.Join(env.dataDir, "test.sql")
sqlContent := fmt.Sprintf(`

15
test/s3tables/catalog/pyiceberg_test.go

@ -26,17 +26,14 @@ func TestPyIcebergRestCatalog(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
env := NewTestEnvironment(t)
defer env.Cleanup(t)
env := sharedEnv
if !env.dockerAvailable {
t.Skip("Docker not available, skipping PyIceberg integration test")
}
env.StartSeaweedFS(t)
// Create the test bucket first
bucketName := "pyiceberg-compat-test"
bucketName := "pyiceberg-compat-test-" + randomSuffix()
createTableBucket(t, env, bucketName)
// Build the test working directory path
@ -84,8 +81,7 @@ func TestPyIcebergRestCatalogAuthenticated(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
env := NewTestEnvironment(t)
defer env.Cleanup(t)
env := sharedEnv
if !env.dockerAvailable {
t.Skip("Docker not available, skipping PyIceberg integration test")
@ -95,11 +91,8 @@ func TestPyIcebergRestCatalogAuthenticated(t *testing.T) {
testAccessKey := "admin"
testSecretKey := "admin"
// Start SeaweedFS (it will use default admin credentials from environment if set)
env.StartSeaweedFS(t)
// Create the test bucket first (using unauthenticated request, which works with DefaultAllow)
bucketName := "pyiceberg-auth-test"
bucketName := "pyiceberg-auth-test-" + randomSuffix()
createTableBucket(t, env, bucketName)
// Build the test working directory path

0
test/s3tables/catalog/pyiceberg_test_helpers.go → test/s3tables/catalog/pyiceberg_test_helpers_test.go

225
test/s3tables/table-buckets/s3tables_integration_test.go

@ -12,33 +12,57 @@ import (
"time"
cryptorand "crypto/rand"
"sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"flag"
"github.com/seaweedfs/seaweedfs/weed/command"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
flag "github.com/seaweedfs/seaweedfs/weed/util/fla9"
)
var (
miniClusterMutex sync.Mutex
)
// TestMain starts a single default weed mini cluster for the whole package and
// tears it down after all tests have completed. Tests that require a different
// cluster configuration (e.g. TestS3TablesCreateBucketIAMPolicy) start their
// own cluster independently.
func TestMain(m *testing.M) {
	flag.Parse()
	if testing.Short() {
		// Every test self-skips under -short, so skip cluster startup too.
		os.Exit(m.Run())
	}

	// TestMain has no *testing.T, so create the shared data dir by hand.
	testDir, err := os.MkdirTemp("", "seaweed-s3tables-shared-*")
	if err != nil {
		fmt.Fprintf(os.Stderr, "SKIP: failed to create shared temp dir: %v\n", err)
		os.Exit(0)
	}

	cluster, startErr := startMiniClusterInDir(testDir, nil)
	if startErr != nil {
		fmt.Fprintf(os.Stderr, "SKIP: failed to start shared weed mini cluster: %v\n", startErr)
		os.RemoveAll(testDir)
		os.Exit(0)
	}
	sharedCluster = cluster

	exitCode := m.Run()
	sharedCluster.Stop()
	os.RemoveAll(testDir)
	os.Exit(exitCode)
}
func TestS3TablesIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
// Create and start test cluster
cluster, err := startMiniCluster(t)
require.NoError(t, err)
defer cluster.Stop()
// Create S3 Tables client
client := NewS3TablesClient(cluster.s3Endpoint, testRegion, testAccessKey, testSecretKey)
// Re-use the shared cluster started by TestMain.
client := NewS3TablesClient(sharedCluster.s3Endpoint, testRegion, testAccessKey, testSecretKey)
// Run test suite
t.Run("TableBucketLifecycle", func(t *testing.T) {
@ -70,6 +94,92 @@ func TestS3TablesIntegration(t *testing.T) {
})
}
// TestS3TablesCreateBucketIAMPolicy verifies that S3 Tables bucket creation is
// gated by IAM: with defaultEffect "Deny" and a policy that allows
// s3tables:CreateTableBucket only on one bucket's ARN, creating any other
// bucket must fail with AccessDenied while the allowed bucket succeeds.
// This test starts its own cluster (with an IAM config) rather than using the
// shared one from TestMain.
func TestS3TablesCreateBucketIAMPolicy(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping IAM integration test in short mode")
	}

	// Scope throwaway credentials to this test via t.Setenv so the cluster
	// helper does not fall back to the package defaults.
	t.Setenv("AWS_ACCESS_KEY_ID", "env-admin")
	t.Setenv("AWS_SECRET_ACCESS_KEY", "env-secret")

	allowedBucket := "tables-allowed"
	deniedBucket := "tables-denied"

	// Write an IAM config whose only policy permits CreateTableBucket on the
	// allowed bucket; everything else falls through to defaultEffect: Deny.
	// The Resource ARNs use wildcards for region/account because the handler
	// generates ARNs with its own default region and principal, not the
	// test constants testRegion/testAccountID.
	iamConfigDir := t.TempDir()
	iamConfigPath := filepath.Join(iamConfigDir, "iam_config.json")
	iamConfig := fmt.Sprintf(`{
"sts": {
"tokenDuration": "1h",
"maxSessionLength": "12h",
"issuer": "seaweedfs-sts",
"signingKey": "%s"
},
"accounts": [
{
"id": "%s",
"displayName": "tables-integration"
}
],
"identities": [
{
"name": "admin",
"credentials": [
{
"accessKey": "%s",
"secretKey": "%s"
}
],
"account": {
"id": "%s",
"displayName": "tables-integration"
},
"policyNames": ["S3TablesBucketPolicy"]
}
],
"policy": {
"defaultEffect": "Deny",
"storeType": "memory"
},
"policies": [
{
"name": "S3TablesBucketPolicy",
"document": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3tables:CreateTableBucket"],
"Resource": [
"arn:aws:s3tables:*:*:bucket/%s",
"arn:aws:s3:::%s"
]
}
]
}
}
]
}`, testIAMSigningKey, testAccountID, testAccessKey, testSecretKey, testAccountID, allowedBucket, allowedBucket)
	require.NoError(t, os.WriteFile(iamConfigPath, []byte(iamConfig), 0644))

	// Start a dedicated cluster that loads the IAM config written above.
	cluster, err := startMiniClusterWithExtraArgs(t, []string{
		"-s3.config=" + iamConfigPath,
		"-s3.iam.config=" + iamConfigPath,
	})
	require.NoError(t, err, "failed to start cluster with IAM config")
	defer cluster.Stop()

	client := NewS3TablesClient(cluster.s3Endpoint, testRegion, testAccessKey, testSecretKey)

	// A bucket not covered by the policy must be rejected with AccessDenied.
	_, err = client.CreateTableBucket(deniedBucket, nil)
	require.Error(t, err, "denied bucket creation should fail")
	assert.Contains(t, err.Error(), "AccessDenied")

	// The bucket named in the policy must be creatable; clean it up afterwards.
	allowedResp, err := client.CreateTableBucket(allowedBucket, nil)
	require.NoError(t, err, "allowed bucket creation should succeed")
	defer func() {
		_ = client.DeleteTableBucket(allowedResp.ARN)
	}()
}
func testTableBucketLifecycle(t *testing.T, client *S3TablesClient) {
bucketName := "test-bucket-" + randomString(8)
@ -508,11 +618,12 @@ func findAvailablePorts(n int) ([]int, error) {
return ports, nil
}
// startMiniCluster starts a weed mini instance directly without exec
func startMiniCluster(t *testing.T) (*TestCluster, error) {
// Find available ports
// We need 8 unique ports: Master(2), Volume(2), Filer(2), S3(2)
ports, err := findAvailablePorts(8)
// startMiniClusterInDir starts a weed mini instance using testDir as the data
// directory. It does not require a *testing.T so it can be called from TestMain.
// extraArgs are appended to the default mini command flags.
func startMiniClusterInDir(testDir string, extraArgs []string) (*TestCluster, error) {
// We need 10 unique ports: Master(2), Volume(2), Filer(2), S3(2), Admin(2)
ports, err := findAvailablePorts(10)
if err != nil {
return nil, fmt.Errorf("failed to find available ports: %v", err)
}
@ -525,8 +636,8 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) {
filerGrpcPort := ports[5]
s3Port := ports[6]
s3GrpcPort := ports[7]
// Create temporary directory for test data
testDir := t.TempDir()
adminPort := ports[8]
adminGrpcPort := ports[9]
// Ensure no configuration file from previous runs
configFile := filepath.Join(testDir, "mini.options")
@ -537,7 +648,6 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) {
s3Endpoint := fmt.Sprintf("http://127.0.0.1:%d", s3Port)
cluster := &TestCluster{
t: t,
dataDir: testDir,
ctx: ctx,
cancel: cancel,
@ -550,18 +660,17 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) {
// Create empty security.toml to disable JWT authentication in tests
securityToml := filepath.Join(testDir, "security.toml")
err = os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644)
if err != nil {
if err = os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644); err != nil {
cancel()
return nil, fmt.Errorf("failed to create security.toml: %v", err)
}
// Set environment variables for admin credentials safely for this test
// Ensure AWS credentials are set (don't use t.Setenv here — we are in TestMain).
if os.Getenv("AWS_ACCESS_KEY_ID") == "" {
t.Setenv("AWS_ACCESS_KEY_ID", "admin")
os.Setenv("AWS_ACCESS_KEY_ID", "admin")
}
if os.Getenv("AWS_SECRET_ACCESS_KEY") == "" {
t.Setenv("AWS_SECRET_ACCESS_KEY", "admin")
os.Setenv("AWS_SECRET_ACCESS_KEY", "admin")
}
// Start weed mini in a goroutine by calling the command directly
@ -569,11 +678,7 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) {
go func() {
defer cluster.wg.Done()
// Protect global state mutation with a mutex
miniClusterMutex.Lock()
defer miniClusterMutex.Unlock()
// Save current directory and args
// Save current directory and args, restore on exit.
oldDir, _ := os.Getwd()
oldArgs := os.Args
defer func() {
@ -581,21 +686,24 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) {
os.Args = oldArgs
}()
// Change to test directory so mini picks up security.toml
// Change to test directory so mini picks up security.toml.
os.Chdir(testDir)
// Configure args for mini command
os.Args = []string{
"weed",
baseArgs := []string{
"-dir=" + testDir,
"-master.dir=" + testDir,
"-master.port=" + strconv.Itoa(masterPort),
"-master.port.grpc=" + strconv.Itoa(masterGrpcPort),
"-volume.port=" + strconv.Itoa(volumePort),
"-volume.port.grpc=" + strconv.Itoa(volumeGrpcPort),
"-volume.port.public=" + strconv.Itoa(volumePort),
"-volume.publicUrl=127.0.0.1:" + strconv.Itoa(volumePort),
"-filer.port=" + strconv.Itoa(filerPort),
"-filer.port.grpc=" + strconv.Itoa(filerGrpcPort),
"-s3.port=" + strconv.Itoa(s3Port),
"-s3.port.grpc=" + strconv.Itoa(s3GrpcPort),
"-admin.port=" + strconv.Itoa(adminPort),
"-admin.port.grpc=" + strconv.Itoa(adminGrpcPort),
"-webdav.port=0", // Disable WebDAV
"-admin.ui=false", // Disable admin UI
"-master.volumeSizeLimitMB=32", // Small volumes for testing
@ -603,6 +711,10 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) {
"-master.peers=none", // Faster startup
"-s3.iam.readOnly=false", // Enable IAM write operations for tests
}
if len(extraArgs) > 0 {
baseArgs = append(baseArgs, extraArgs...)
}
os.Args = append([]string{"weed"}, baseArgs...)
// Suppress most logging during tests
glog.MaxSize = 1024 * 1024
@ -621,14 +733,34 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) {
}()
// Wait for S3 service to be ready
err = waitForS3Ready(cluster.s3Endpoint, 30*time.Second)
if err != nil {
if err = waitForS3Ready(cluster.s3Endpoint, 30*time.Second); err != nil {
cancel()
return nil, fmt.Errorf("S3 service failed to start: %v", err)
}
cluster.isRunning = true
return cluster, nil
}
// startMiniClusterWithExtraArgs starts a weed mini instance for a single test.
// It uses t.TempDir() for data isolation and t.Setenv for credential scoping.
func startMiniClusterWithExtraArgs(t *testing.T, extraArgs []string) (*TestCluster, error) {
	t.Helper()

	// t.Setenv restores the variables when the test ends; only fill in
	// defaults when the caller has not already exported credentials.
	for _, key := range []string{"AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"} {
		if os.Getenv(key) == "" {
			t.Setenv(key, "admin")
		}
	}

	cluster, err := startMiniClusterInDir(t.TempDir(), extraArgs)
	if err != nil {
		return nil, err
	}
	cluster.t = t
	t.Logf("Test cluster started successfully at %s", cluster.s3Endpoint)
	return cluster, nil
}
@ -654,22 +786,15 @@ func (c *TestCluster) Stop() {
case <-done:
// Goroutine finished
case <-timer.C:
// Timeout - goroutine doesn't respond to context cancel
// This may indicate the mini cluster didn't shut down cleanly
c.t.Log("Warning: Test cluster shutdown timed out after 2 seconds")
}
// Reset the global cmdMini flags to prevent state leakage to other tests
for _, cmd := range command.Commands {
if cmd.Name() == "mini" {
// Reset flags to defaults
cmd.Flag.VisitAll(func(f *flag.Flag) {
// Reset to default value
f.Value.Set(f.DefValue)
})
break
// Timeout - goroutine doesn't respond to context cancel.
// This may indicate the mini cluster didn't shut down cleanly.
if c.t != nil {
c.t.Log("Warning: Test cluster shutdown timed out after 2 seconds")
} else {
fmt.Println("Warning: Test cluster shutdown timed out after 2 seconds")
}
}
}
// waitForS3Ready waits for the S3 service to be ready

14
test/s3tables/table-buckets/setup.go

@ -8,6 +8,11 @@ import (
"time"
)
// sharedCluster is the single default TestCluster shared across all tests
// that do not require a specialised cluster configuration.
// It is initialised by TestMain and must not be modified by individual tests.
var sharedCluster *TestCluster
// TestCluster manages the weed mini instance for integration testing
type TestCluster struct {
t *testing.T
@ -46,8 +51,9 @@ func NewS3TablesClient(endpoint, region, accessKey, secretKey string) *S3TablesC
// Test configuration constants
const (
testRegion = "us-west-2"
testAccessKey = "admin"
testSecretKey = "admin"
testAccountID = "111122223333"
testRegion = "us-west-2"
testAccessKey = "admin"
testSecretKey = "admin"
testAccountID = "111122223333"
testIAMSigningKey = "dGVzdC1zaWduaW5nLWtleS1mb3Itc3RzLWludGVncmF0aW9uLXRlc3Rz"
)

39
weed/s3api/identity_reflection_test.go

@ -0,0 +1,39 @@
package s3api
import (
"reflect"
"testing"
)
// TestIdentityFieldsForS3TablesReflection ensures the identity struct keeps the
// fields relied on by s3tables.getIdentityPrincipalArn, getIdentityPolicyNames,
// and getIdentityClaims via reflection. The s3tables package reads these
// fields reflectively (to avoid an import cycle with s3api), so the compiler
// cannot catch a rename or type change — this test does.
func TestIdentityFieldsForS3TablesReflection(t *testing.T) {
	typ := reflect.TypeOf(Identity{})

	// PrincipalArn must remain a plain string.
	checkField(t, typ, "PrincipalArn", reflect.String)

	// PolicyNames must remain a slice.
	field, ok := typ.FieldByName("PolicyNames")
	if !ok {
		t.Fatalf("Identity.PolicyNames missing")
	}
	if field.Type.Kind() != reflect.Slice {
		t.Fatalf("Identity.PolicyNames must be a slice, got %s", field.Type.Kind())
	}

	// Claims must remain a map keyed by string. Check the Kind before calling
	// Key(): reflect.Type.Key panics for non-map kinds, so a combined
	// condition that formats Key() in its failure message would panic
	// (instead of failing cleanly) if Claims ever stopped being a map.
	field, ok = typ.FieldByName("Claims")
	if !ok {
		t.Fatalf("Identity.Claims missing")
	}
	if field.Type.Kind() != reflect.Map {
		t.Fatalf("Identity.Claims must be a map, got %s", field.Type.Kind())
	}
	if field.Type.Key().Kind() != reflect.String {
		t.Fatalf("Identity.Claims must be map[string]..., got %s/%s", field.Type.Kind(), field.Type.Key().Kind())
	}
}
func checkField(t *testing.T, typ reflect.Type, name string, kind reflect.Kind) {
t.Helper()
field, ok := typ.FieldByName(name)
if !ok {
t.Fatalf("Identity.%s missing", name)
}
if field.Type.Kind() != kind {
t.Fatalf("Identity.%s must be %s, got %s", name, kind, field.Type.Kind())
}
}

8
weed/s3api/s3api_tables.go

@ -48,6 +48,11 @@ func (st *S3TablesApiServer) SetDefaultAllow(allow bool) {
st.handler.SetDefaultAllow(allow)
}
// SetIAMAuthorizer injects the IAM authorizer for S3 Tables IAM checks.
func (st *S3TablesApiServer) SetIAMAuthorizer(authorizer s3tables.IAMAuthorizer) {
st.handler.SetIAMAuthorizer(authorizer)
}
// S3TablesHandler handles S3 Tables API requests
func (st *S3TablesApiServer) S3TablesHandler(w http.ResponseWriter, r *http.Request) {
st.handler.HandleRequest(w, r, st)
@ -64,6 +69,9 @@ func (s3a *S3ApiServer) registerS3TablesRoutes(router *mux.Router) {
s3TablesApi := NewS3TablesApiServer(s3a)
if s3a.iam != nil && s3a.iam.iamIntegration != nil {
s3TablesApi.SetDefaultAllow(s3a.iam.iamIntegration.DefaultAllow())
if s3Integration, ok := s3a.iam.iamIntegration.(*S3IAMIntegration); ok && s3Integration.iamManager != nil {
s3TablesApi.SetIAMAuthorizer(s3Integration.iamManager)
}
} else {
// If IAM is not configured, allow all access by default
s3TablesApi.SetDefaultAllow(true)

21
weed/s3api/s3tables/handler.go

@ -44,9 +44,10 @@ const (
// S3TablesHandler handles S3 Tables API requests
type S3TablesHandler struct {
region string
accountID string
defaultAllow bool // Whether to allow access by default (for zero-config IAM)
region string
accountID string
defaultAllow bool // Whether to allow access by default (for zero-config IAM)
iamAuthorizer IAMAuthorizer
}
// NewS3TablesHandler creates a new S3 Tables handler
@ -259,20 +260,10 @@ func normalizePrincipalID(id string) string {
// getIdentityActions extracts the action list from the identity object in the request context.
// Uses reflection to avoid import cycles with s3api package.
func getIdentityActions(r *http.Request) []string {
identityRaw := s3_constants.GetIdentityFromContext(r)
if identityRaw == nil {
return nil
}
// Use reflection to access the Actions field to avoid import cycle
val := reflect.ValueOf(identityRaw)
if val.Kind() == reflect.Ptr {
val = val.Elem()
}
if val.Kind() != reflect.Struct {
val, ok := getIdentityStructValue(r)
if !ok {
return nil
}
actionsField := val.FieldByName("Actions")
if !actionsField.IsValid() || actionsField.Kind() != reflect.Slice {
return nil

45
weed/s3api/s3tables/handler_bucket_create.go

@ -14,15 +14,6 @@ import (
// handleCreateTableBucket creates a new table bucket
func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http.Request, filerClient FilerClient) error {
// Check permission
principal := h.getAccountID(r)
if !CheckPermissionWithContext("CreateTableBucket", principal, principal, "", "", &PolicyContext{
DefaultAllow: h.defaultAllow,
}) {
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create table buckets")
return NewAuthError("CreateTableBucket", principal, "not authorized to create table buckets")
}
var req CreateTableBucketRequest
if err := h.readRequestBody(r, &req); err != nil {
h.writeError(w, http.StatusBadRequest, ErrCodeInvalidRequest, err.Error())
@ -35,6 +26,40 @@ func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http
return err
}
principal := h.getAccountID(r)
identityActions := getIdentityActions(r)
identityPolicyNames := getIdentityPolicyNames(r)
useIAM := h.shouldUseIAM(r, identityActions, identityPolicyNames)
useLegacy := !useIAM
if useIAM {
allowed, err := h.authorizeIAMAction(r, identityPolicyNames, "CreateTableBucket", h.generateTableBucketARN(principal, req.Name), fmt.Sprintf("arn:aws:s3:::%s", req.Name))
if err != nil {
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create table buckets")
return NewAuthError("CreateTableBucket", principal, "not authorized to create table buckets")
}
if !allowed {
if h.defaultAllow && len(identityActions) == 0 && len(identityPolicyNames) == 0 {
useLegacy = true
} else {
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create table buckets")
return NewAuthError("CreateTableBucket", principal, "not authorized to create table buckets")
}
}
}
if useLegacy {
owner := h.accountID
if owner == "" {
owner = DefaultAccountID
}
if !CheckPermissionWithContext("CreateTableBucket", principal, owner, "", "", &PolicyContext{
IdentityActions: identityActions,
DefaultAllow: h.defaultAllow,
}) {
h.writeError(w, http.StatusForbidden, ErrCodeAccessDenied, "not authorized to create table buckets")
return NewAuthError("CreateTableBucket", principal, "not authorized to create table buckets")
}
}
bucketPath := GetTableBucketPath(req.Name)
// Check if bucket already exists and ensure no conflict with object store buckets
@ -89,7 +114,7 @@ func (h *S3TablesHandler) handleCreateTableBucket(w http.ResponseWriter, r *http
metadata := &tableBucketMetadata{
Name: req.Name,
CreatedAt: now,
OwnerAccountID: h.getAccountID(r),
OwnerAccountID: principal,
}
metadataBytes, err := json.Marshal(metadata)

266
weed/s3api/s3tables/iam.go

@ -0,0 +1,266 @@
package s3tables
import (
"context"
"fmt"
"net/http"
"reflect"
"strings"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/iam/integration"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
)
// IAMAuthorizer allows s3tables handlers to evaluate IAM policies without importing s3api.
// It is satisfied by the s3api IAM manager, which is injected via SetIAMAuthorizer
// at route-registration time.
type IAMAuthorizer interface {
	// IsActionAllowed reports whether the action/resource described by the
	// request is permitted for its principal.
	IsActionAllowed(ctx context.Context, request *integration.ActionRequest) (bool, error)
}

// SetIAMAuthorizer injects the IAM authorizer for policy-based access checks.
// Leaving the authorizer nil keeps IAM evaluation disabled (shouldUseIAM
// returns false and handlers stay on the legacy permission path).
func (h *S3TablesHandler) SetIAMAuthorizer(authorizer IAMAuthorizer) {
	h.iamAuthorizer = authorizer
}
// shouldUseIAM reports whether the request should be authorized through the
// IAM policy engine rather than the legacy static-action check.
func (h *S3TablesHandler) shouldUseIAM(r *http.Request, identityActions, identityPolicyNames []string) bool {
	// IAM evaluation requires an injected authorizer, a live request, and an
	// authenticated identity in the request context.
	if h.iamAuthorizer == nil || r == nil || s3_constants.GetIdentityFromContext(r) == nil {
		return false
	}
	// When default-allow is enabled, keep anonymous requests on the legacy path
	// to preserve zero-config behavior (IAM policies are not available for anonymous).
	if h.defaultAllow && isAnonymousIdentity(r) {
		return false
	}
	// An empty inline `identityActions` slice doesn't mean the identity has no
	// permissions—it just means authorization lives in IAM policies or session
	// tokens instead of static action lists. We therefore prefer the IAM path
	// whenever a session token is present, inline actions are absent, or
	// explicit policy names are attached to the identity.
	return hasSessionToken(r) || len(identityActions) == 0 || len(identityPolicyNames) > 0
}
// isAnonymousIdentity reports whether the identity attached to the request is
// the anonymous account, matching either the identity's Name field or its
// Account.Id field against s3_constants.AccountAnonymousId.
func isAnonymousIdentity(r *http.Request) bool {
	val, ok := getIdentityStructValue(r)
	if !ok {
		return false
	}
	if nameField := val.FieldByName("Name"); nameField.IsValid() && nameField.Kind() == reflect.String {
		if nameField.String() == s3_constants.AccountAnonymousId {
			return true
		}
	}
	accountField := val.FieldByName("Account")
	// Bug fix: reflect.Value.IsNil panics for non-nilable kinds (e.g. if the
	// Account field were declared as a plain struct), so verify the field is a
	// pointer BEFORE the nil check. Only a pointer-to-struct Account could
	// reach the Id lookup before, so behavior on valid identities is unchanged.
	if !accountField.IsValid() || accountField.Kind() != reflect.Ptr || accountField.IsNil() {
		return false
	}
	account := accountField.Elem()
	if account.Kind() != reflect.Struct {
		return false
	}
	if idField := account.FieldByName("Id"); idField.IsValid() && idField.Kind() == reflect.String {
		return idField.String() == s3_constants.AccountAnonymousId
	}
	return false
}
func hasSessionToken(r *http.Request) bool {
return extractSessionToken(r) != ""
}
func extractSessionToken(r *http.Request) string {
if token := r.Header.Get("X-SeaweedFS-Session-Token"); token != "" {
return token
}
if token := r.Header.Get("X-Amz-Security-Token"); token != "" {
return token
}
return r.URL.Query().Get("X-Amz-Security-Token")
}
// authorizeIAMAction evaluates the IAM policy engine for the given action
// against every non-empty resource ARN. ALL resources must be allowed; the
// first evaluation error or denial short-circuits with (false, err). It is an
// error to call this with no authorizer, no resolvable principal, or no
// non-empty resources.
func (h *S3TablesHandler) authorizeIAMAction(r *http.Request, identityPolicyNames []string, action string, resources ...string) (bool, error) {
	if h.iamAuthorizer == nil {
		err := fmt.Errorf("nil iamAuthorizer in authorizeIAMAction")
		glog.V(2).Infof("S3Tables: %v", err)
		return false, err
	}
	// Prefer the explicit principal header; fall back to the identity's ARN.
	principal := r.Header.Get("X-SeaweedFS-Principal")
	if principal == "" {
		principal = getIdentityPrincipalArn(r)
	}
	if principal == "" {
		return false, fmt.Errorf("missing principal for IAM authorization")
	}
	// Qualify bare action names with the s3tables service prefix.
	if !strings.Contains(action, ":") {
		action = "s3tables:" + action
	}
	if len(resources) == 0 {
		return false, fmt.Errorf("no resources provided to authorizeIAMAction")
	}
	sessionToken := extractSessionToken(r)
	requestContext := buildIAMRequestContext(r, getIdentityClaims(r))
	policyNames := identityPolicyNames
	if len(policyNames) == 0 {
		policyNames = getIdentityPolicyNames(r)
	}
	evaluated := false
	for _, resource := range resources {
		if resource == "" {
			continue
		}
		evaluated = true
		request := &integration.ActionRequest{
			Principal:      principal,
			Action:         action,
			Resource:       resource,
			SessionToken:   sessionToken,
			RequestContext: requestContext,
			PolicyNames:    policyNames,
		}
		allowed, err := h.iamAuthorizer.IsActionAllowed(r.Context(), request)
		if err != nil {
			glog.V(2).Infof("S3Tables: IAM authorization error action=%s resource=%s principal=%s: %v", action, resource, principal, err)
			return false, err
		}
		if !allowed {
			return false, fmt.Errorf("access denied by IAM for resource %s", resource)
		}
	}
	if !evaluated {
		return false, fmt.Errorf("no non-empty resources provided to authorizeIAMAction")
	}
	return true, nil
}
// getIdentityPrincipalArn returns the identity's PrincipalArn string via
// reflection, or "" when there is no identity or the field is absent.
func getIdentityPrincipalArn(r *http.Request) string {
	identity, ok := getIdentityStructValue(r)
	if !ok {
		return ""
	}
	if f := identity.FieldByName("PrincipalArn"); f.IsValid() && f.Kind() == reflect.String {
		return f.String()
	}
	return ""
}
// getIdentityPolicyNames returns the identity's PolicyNames slice via
// reflection. String elements are taken as-is; other element types are
// stringified with fmt.Sprint. Returns nil when there is no identity, the
// field is absent/not a slice, or the resulting list is empty.
func getIdentityPolicyNames(r *http.Request) []string {
	identity, ok := getIdentityStructValue(r)
	if !ok {
		return nil
	}
	field := identity.FieldByName("PolicyNames")
	if !field.IsValid() || field.Kind() != reflect.Slice {
		return nil
	}
	var names []string
	for i := 0; i < field.Len(); i++ {
		elem := field.Index(i)
		switch {
		case elem.Kind() == reflect.String:
			names = append(names, elem.String())
		case elem.CanInterface():
			names = append(names, fmt.Sprint(elem.Interface()))
		}
	}
	if len(names) == 0 {
		return nil
	}
	return names
}
// getIdentityClaims returns the identity's Claims as a string-keyed map via
// reflection. Returns nil when there is no identity, the field is absent, not
// a string-keyed map, nil, or empty.
func getIdentityClaims(r *http.Request) map[string]interface{} {
	identity, ok := getIdentityStructValue(r)
	if !ok {
		return nil
	}
	field := identity.FieldByName("Claims")
	if !field.IsValid() || field.Kind() != reflect.Map || field.IsNil() {
		return nil
	}
	if field.Type().Key().Kind() != reflect.String {
		return nil
	}
	claims := make(map[string]interface{}, field.Len())
	for _, key := range field.MapKeys() {
		if key.Kind() != reflect.String {
			continue
		}
		// Renamed from `val` to avoid shadowing the identity value above.
		entry := field.MapIndex(key)
		if entry.IsValid() {
			claims[key.String()] = entry.Interface()
		}
	}
	if len(claims) == 0 {
		return nil
	}
	return claims
}
func buildIAMRequestContext(r *http.Request, claims map[string]interface{}) map[string]interface{} {
ctx := make(map[string]interface{})
if ua := r.Header.Get("User-Agent"); ua != "" {
ctx["userAgent"] = ua
}
if referer := r.Header.Get("Referer"); referer != "" {
ctx["referer"] = referer
}
for k, v := range claims {
if strings.HasPrefix(k, "jwt:") {
if _, exists := ctx[k]; !exists {
ctx[k] = v
}
}
}
for k, v := range claims {
if strings.HasPrefix(k, "jwt:") {
continue
}
if _, exists := ctx[k]; !exists {
ctx[k] = v
}
jwtKey := "jwt:" + k
if _, exists := ctx[jwtKey]; !exists {
ctx[jwtKey] = v
}
}
if len(ctx) == 0 {
return nil
}
return ctx
}
// getIdentityStructValue fetches the identity struct held in the request context.
// The identity is expected to be a pointer to a struct with the fields used by
// the reflection helpers (PrincipalArn string, PolicyNames []string,
// Claims map[string]interface{}).
// This helper centralizes the nil-check and ptr-deref logic so callers focus on
// reading the specific fields they need. The second result is false when no
// identity is present or it does not dereference (once) to a struct.
func getIdentityStructValue(r *http.Request) (reflect.Value, bool) {
	raw := s3_constants.GetIdentityFromContext(r)
	if raw == nil {
		return reflect.Value{}, false
	}
	// Deliberately dereference at most one level of pointer.
	v := reflect.ValueOf(raw)
	if v.Kind() == reflect.Ptr {
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return reflect.Value{}, false
	}
	return v, true
}
Loading…
Cancel
Save