Browse Source

Merge branch 'master' into feature/jwks-caching

pull/8311/head
YGoetschel 3 weeks ago
committed by GitHub
parent
commit
e1bea6495d
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 30
      .github/workflows/go.yml
  2. 2
      test/kafka/kafka-client-loadtest/tools/pom.xml
  3. 213
      test/s3tables/sts_integration/sts_integration_test.go
  4. 18
      test/volume_server/grpc/scrub_query_test.go
  5. 5
      weed/admin/dash/admin_data.go
  6. 5
      weed/admin/dash/service_account_management.go
  7. 4
      weed/admin/view/app/service_accounts.templ
  8. 4
      weed/admin/view/app/service_accounts_templ.go
  9. 126
      weed/iam/integration/iam_integration_test.go
  10. 168
      weed/iam/integration/iam_manager.go
  11. 62
      weed/iam/policy/policy_engine.go
  12. 9
      weed/iam/sts/session_claims.go
  13. 3
      weed/iam/sts/session_claims_test.go
  14. 35
      weed/iam/sts/session_policy.go
  15. 228
      weed/iam/sts/session_policy_test.go
  16. 29
      weed/iam/sts/sts_service.go
  17. 2
      weed/iam/sts/token_utils.go
  18. 33
      weed/iam/utils/arn_utils.go
  19. 2
      weed/iamapi/iamapi_handlers.go
  20. 63
      weed/iamapi/iamapi_management_handlers.go
  21. 1
      weed/iamapi/iamapi_response.go
  22. 52
      weed/iamapi/iamapi_test.go
  23. 8
      weed/mount/weedfs_rename.go
  24. 93
      weed/mount/weedfs_rename_test.go
  25. 7
      weed/pb/server_address.go
  26. 4
      weed/pb/volume_server_pb/volume_server.pb.go
  27. 102
      weed/pb/volume_server_pb/volume_server_grpc.pb.go
  28. 17
      weed/s3api/auth_credentials.go
  29. 45
      weed/s3api/auth_credentials_subscribe.go
  30. 124
      weed/s3api/auth_credentials_subscribe_test.go
  31. 14
      weed/s3api/auth_credentials_test.go
  32. 8
      weed/s3api/auth_signature_v4.go
  33. 3
      weed/s3api/s3api_object_handlers_list.go
  34. 86
      weed/s3api/s3api_object_handlers_list_test.go
  35. 11
      weed/s3api/s3api_server.go
  36. 141
      weed/s3api/s3api_sts.go
  37. 153
      weed/s3api/s3api_sts_assume_role_test.go
  38. 6
      weed/server/master_grpc_server.go
  39. 16
      weed/server/volume_grpc_scrub.go
  40. 2
      weed/shell/command_s3_configure.go
  41. 3
      weed/shell/command_volume_scrub.go
  42. 19
      weed/storage/erasure_coding/ec_volume_scrub.go
  43. 2
      weed/storage/needle/needle_read.go
  44. BIN
      weed/storage/test_files/bitrot_volume.dat
  45. BIN
      weed/storage/test_files/bitrot_volume.idx
  46. BIN
      weed/storage/test_files/healthy_volume.dat
  47. BIN
      weed/storage/test_files/healthy_volume.idx
  48. 124
      weed/storage/volume_checking.go
  49. 80
      weed/storage/volume_checking_test.go
  50. 16
      weed/topology/topology.go

30
.github/workflows/go.yml

@ -15,24 +15,20 @@ permissions:
jobs:
build:
name: Build
vet:
name: Go Vet
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2
with:
go-version: ^1.13
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2
- name: Get dependencies
run: |
cd weed; go get -v -t -d ./...
- name: Go Vet (excluding protobuf lock copying)
run: |
cd weed
@ -42,8 +38,30 @@ jobs:
# Fail only if there are actual vet errors (not counting the filtered lock warnings)
if grep -q "vet:" vet-output.txt; then exit 1; fi
build:
name: Build
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2
with:
go-version: ^1.13
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2
- name: Build
run: cd weed; go build -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v .
test:
name: Test
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@a5f9b05d2d216f63e13859e0d847461041025775 # v2
with:
go-version: ^1.13
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v2
- name: Test
run: cd weed; go test -tags "elastic gocdk sqlite ydb tarantool tikv rclone" -v ./...

2
test/kafka/kafka-client-loadtest/tools/pom.xml

@ -41,7 +41,7 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.4</version>
<version>1.11.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>

213
test/s3tables/sts_integration/sts_integration_test.go

@ -33,6 +33,8 @@ type TestEnvironment struct {
secretKey string
}
const testSTSIntegrationSigningKey = "dGVzdC1zaWduaW5nLWtleS1mb3Itc3RzLWludGVncmF0aW9uLXRlc3Rz" // gitleaks:allow - test-signing-key-for-sts-integration-tests
func TestSTSIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
@ -110,17 +112,70 @@ func NewTestEnvironment(t *testing.T) *TestEnvironment {
volumePort: volumePort,
volumeGrpcPort: volumeGrpcPort,
dockerAvailable: testutil.HasDocker(),
accessKey: "admin", // Matching default in testutil.WriteIAMConfig
secretKey: "admin",
accessKey: "admin",
secretKey: "adminadmin",
}
}
func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
t.Helper()
// Create IAM config file
iamConfigPath, err := testutil.WriteIAMConfig(env.dataDir, env.accessKey, env.secretKey)
if err != nil {
iamConfigPath := filepath.Join(env.dataDir, "iam.json")
// Note: signingKey must be base64 encoded for []byte JSON unmarshaling
iamConfig := fmt.Sprintf(`{
"identities": [
{
"name": "admin",
"credentials": [
{ "accessKey": "%s", "secretKey": "%s" }
],
"actions": ["Admin", "Read", "Write", "List", "Tagging"]
}
],
"sts": {
"tokenDuration": "1h",
"maxSessionLength": "12h",
"issuer": "seaweedfs-sts",
"signingKey": "%s"
},
"policy": {
"defaultEffect": "Deny",
"storeType": "memory"
},
"policies": [
{
"name": "S3FullAccessPolicy",
"document": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:*"],
"Resource": ["*"]
}
]
}
}
],
"roles": [
{
"roleName": "TestRole",
"roleArn": "arn:aws:iam::role/TestRole",
"attachedPolicies": ["S3FullAccessPolicy"],
"trustPolicy": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": "*",
"Action": ["sts:AssumeRole"]
}
]
}
}
]
}`, env.accessKey, env.secretKey, testSTSIntegrationSigningKey)
if err := os.WriteFile(iamConfigPath, []byte(iamConfig), 0644); err != nil {
t.Fatalf("Failed to create IAM config: %v", err)
}
@ -143,6 +198,8 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) {
"-s3.port", fmt.Sprintf("%d", env.s3Port),
"-s3.port.grpc", fmt.Sprintf("%d", env.s3GrpcPort),
"-s3.config", iamConfigPath,
"-s3.iam.config", iamConfigPath,
"-s3.iam.readOnly", "false",
"-ip", env.bindIP,
"-ip.bind", "0.0.0.0",
"-dir", env.dataDir,
@ -190,22 +247,62 @@ func runPythonSTSClient(t *testing.T, env *TestEnvironment) {
import boto3
import botocore.config
from botocore.exceptions import ClientError
import os
import json
import sys
import time
import urllib.error
import urllib.request
print("Starting STS test...")
print("Starting STS inline session policy test...")
endpoint_url = "http://host.docker.internal:%d"
primary_endpoint = "http://host.docker.internal:%d"
fallback_endpoint = "http://%s:%d"
access_key = "%s"
secret_key = "%s"
region = "us-east-1"
print(f"Connecting to {endpoint_url} with key {access_key}")
try:
def wait_for_endpoint(url, timeout=30):
deadline = time.time() + timeout
while time.time() < deadline:
try:
with urllib.request.urlopen(url, timeout=2):
return True
except urllib.error.HTTPError:
return True
except Exception:
time.sleep(1)
return False
def select_endpoint(urls):
for url in urls:
if wait_for_endpoint(url):
return url
raise Exception("No reachable S3 endpoint from container")
endpoint_url = select_endpoint([primary_endpoint, fallback_endpoint])
print(f"Using endpoint {endpoint_url}")
config = botocore.config.Config(
retries={'max_attempts': 0}
retries={'max_attempts': 0},
s3={'addressing_style': 'path'}
)
admin_s3 = boto3.client(
's3',
endpoint_url=endpoint_url,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name=region,
config=config
)
bucket = f"sts-inline-policy-{int(time.time() * 1000)}"
key = "allowed.txt"
print(f"Creating bucket {bucket} with admin credentials")
admin_s3.create_bucket(Bucket=bucket)
admin_s3.put_object(Bucket=bucket, Key=key, Body=b"ok")
sts = boto3.client(
'sts',
endpoint_url=endpoint_url,
@ -215,45 +312,77 @@ try:
config=config
)
role_arn = "arn:aws:iam::000000000000:role/test-role"
role_arn = "arn:aws:iam::role/TestRole"
session_name = "test-session"
print(f"Calling AssumeRole on {role_arn}")
# This call typically sends parameters in POST body by default in boto3
session_policy = json.dumps({
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:ListBucket"],
"Resource": [f"arn:aws:s3:::{bucket}"]
},
{
"Effect": "Allow",
"Action": ["s3:GetObject"],
"Resource": [f"arn:aws:s3:::{bucket}/*"]
}
]
})
print(f"Calling AssumeRole on {role_arn} with inline session policy")
response = sts.assume_role(
RoleArn=role_arn,
RoleSessionName=session_name
RoleSessionName=session_name,
Policy=session_policy
)
print("Success! Got credentials:")
print(response['Credentials'])
except ClientError as e:
# Print available keys for debugging if needed
# print(e.response.keys())
response_meta = e.response.get('ResponseMetadata', {})
http_code = response_meta.get('HTTPStatusCode')
error_data = e.response.get('Error', {})
error_code = error_data.get('Code', 'Unknown')
print(f"Got error: {http_code} {error_code}")
# We expect 503 ServiceUnavailable because stsHandlers is nil in weed mini
# This confirms the request was routed to STS handler logic (UnifiedPostHandler)
# instead of IAM handler (which would return 403 AccessDenied or 501 NotImplemented)
if http_code == 503:
print("SUCCESS: Got expected 503 Service Unavailable (STS not configured)")
sys.exit(0)
print(f"FAILED: Unexpected error {e}")
sys.exit(1)
creds = response['Credentials']
vended_s3 = boto3.client(
's3',
endpoint_url=endpoint_url,
aws_access_key_id=creds['AccessKeyId'],
aws_secret_access_key=creds['SecretAccessKey'],
aws_session_token=creds['SessionToken'],
region_name=region,
config=config
)
print("Listing objects (allowed)")
list_resp = vended_s3.list_objects_v2(Bucket=bucket)
keys = [obj.get('Key') for obj in list_resp.get('Contents', [])]
if key not in keys:
print(f"FAILED: Expected to see {key} in list_objects_v2 results")
sys.exit(1)
print("Getting object (allowed)")
body = vended_s3.get_object(Bucket=bucket, Key=key)['Body'].read()
if body != b"ok":
print("FAILED: Unexpected object content")
sys.exit(1)
print("Putting object (expected to be denied)")
try:
vended_s3.put_object(Bucket=bucket, Key="denied.txt", Body=b"no")
print("FAILED: PutObject unexpectedly succeeded")
sys.exit(1)
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code', '')
if error_code != 'AccessDenied':
print(f"FAILED: Expected AccessDenied, got {error_code}")
sys.exit(1)
print("PutObject correctly denied by inline session policy")
print("SUCCESS: Inline session policy downscoping verified")
sys.exit(0)
except Exception as e:
print(f"FAILED: {e}")
if hasattr(e, 'response'):
print(f"Response: {e.response}")
import traceback
traceback.print_exc()
sys.exit(1)
`, env.s3Port, env.accessKey, env.secretKey)
`, env.s3Port, env.bindIP, env.s3Port, env.accessKey, env.secretKey)
scriptPath := filepath.Join(env.dataDir, "sts_test.py")
if err := os.WriteFile(scriptPath, []byte(scriptContent), 0644); err != nil {

18
test/volume_server/grpc/scrub_query_test.go

@ -144,7 +144,7 @@ func TestQueryInvalidAndMissingFileIDPaths(t *testing.T) {
}
}
func TestScrubVolumeAutoSelectAndNotImplementedModes(t *testing.T) {
func TestScrubVolumeAutoSelectAndAllModes(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
@ -158,6 +158,11 @@ func TestScrubVolumeAutoSelectAndNotImplementedModes(t *testing.T) {
framework.AllocateVolume(t, grpcClient, volumeIDA, "")
framework.AllocateVolume(t, grpcClient, volumeIDB, "")
// upload some data so index files are not zero-sized
httpClient := framework.NewHTTPClient()
framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), framework.NewFileID(volumeIDA, 1, 1), []byte("test data A"))
framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), framework.NewFileID(volumeIDB, 2, 2), []byte("test data B"))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
@ -181,11 +186,8 @@ func TestScrubVolumeAutoSelectAndNotImplementedModes(t *testing.T) {
if localResp.GetTotalVolumes() != 1 {
t.Fatalf("ScrubVolume local mode expected total_volumes=1, got %d", localResp.GetTotalVolumes())
}
if len(localResp.GetBrokenVolumeIds()) != 1 || localResp.GetBrokenVolumeIds()[0] != volumeIDA {
t.Fatalf("ScrubVolume local mode expected broken volume %d, got %v", volumeIDA, localResp.GetBrokenVolumeIds())
}
if len(localResp.GetDetails()) == 0 || !strings.Contains(strings.Join(localResp.GetDetails(), " "), "not implemented") {
t.Fatalf("ScrubVolume local mode expected not-implemented details, got %v", localResp.GetDetails())
if len(localResp.GetBrokenVolumeIds()) != 0 {
t.Fatalf("ScrubVolume local mode expected no broken volumes, got %v: %v", localResp.GetBrokenVolumeIds(), localResp.GetDetails())
}
fullResp, err := grpcClient.ScrubVolume(ctx, &volume_server_pb.ScrubVolumeRequest{
@ -198,8 +200,8 @@ func TestScrubVolumeAutoSelectAndNotImplementedModes(t *testing.T) {
if fullResp.GetTotalVolumes() != 1 {
t.Fatalf("ScrubVolume full mode expected total_volumes=1, got %d", fullResp.GetTotalVolumes())
}
if len(fullResp.GetDetails()) == 0 || !strings.Contains(strings.Join(fullResp.GetDetails(), " "), "not implemented") {
t.Fatalf("ScrubVolume full mode expected not-implemented details, got %v", fullResp.GetDetails())
if len(fullResp.GetBrokenVolumeIds()) != 0 {
t.Fatalf("ScrubVolume full mode expected no broken volumes, got %v: %v", fullResp.GetBrokenVolumeIds(), fullResp.GetDetails())
}
}

5
weed/admin/dash/admin_data.go

@ -9,13 +9,14 @@ import (
"github.com/gin-gonic/gin"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/iam"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
// Access key status constants
const (
AccessKeyStatusActive = "Active"
AccessKeyStatusInactive = "Inactive"
AccessKeyStatusActive = iam.AccessKeyStatusActive
AccessKeyStatusInactive = iam.AccessKeyStatusInactive
)
type AdminData struct {

5
weed/admin/dash/service_account_management.go

@ -8,6 +8,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/iam"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
)
@ -21,8 +22,8 @@ const (
accessKeyPrefix = "ABIA" // Service account access keys use ABIA prefix
// Status constants
StatusActive = "Active"
StatusInactive = "Inactive"
StatusActive = iam.AccessKeyStatusActive
StatusInactive = iam.AccessKeyStatusInactive
)
// GetServiceAccounts returns all service accounts, optionally filtered by parent user

4
weed/admin/view/app/service_accounts.templ

@ -126,7 +126,7 @@ templ ServiceAccounts(data dash.ServiceAccountsData) {
<code class="text-muted">{sa.AccessKeyId}</code>
</td>
<td>
if sa.Status == "Active" {
if sa.Status == dash.StatusActive {
<span class="badge bg-success">Active</span>
} else {
<span class="badge bg-secondary">Inactive</span>
@ -141,7 +141,7 @@ templ ServiceAccounts(data dash.ServiceAccountsData) {
</button>
<button type="button" class="btn btn-outline-primary"
data-action="toggle-sa-status" data-sa-id={ sa.ID } data-current-status={ sa.Status }>
if sa.Status == "Active" {
if sa.Status == dash.StatusActive {
<i class="fas fa-pause"></i>
} else {
<i class="fas fa-play"></i>

4
weed/admin/view/app/service_accounts_templ.go

@ -121,7 +121,7 @@ func ServiceAccounts(data dash.ServiceAccountsData) templ.Component {
if templ_7745c5c3_Err != nil {
return templ_7745c5c3_Err
}
if sa.Status == "Active" {
if sa.Status == dash.StatusActive {
templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 9, "<span class=\"badge bg-success\">Active</span>")
if templ_7745c5c3_Err != nil {
return templ_7745c5c3_Err
@ -188,7 +188,7 @@ func ServiceAccounts(data dash.ServiceAccountsData) templ.Component {
if templ_7745c5c3_Err != nil {
return templ_7745c5c3_Err
}
if sa.Status == "Active" {
if sa.Status == dash.StatusActive {
templ_7745c5c3_Err = templruntime.WriteString(templ_7745c5c3_Buffer, 16, "<i class=\"fas fa-pause\"></i>")
if templ_7745c5c3_Err != nil {
return templ_7745c5c3_Err

126
weed/iam/integration/iam_integration_test.go

@ -251,6 +251,132 @@ func TestPolicyEnforcement(t *testing.T) {
}
}
// TestSessionPolicyBoundary verifies that inline session policies restrict permissions.
func TestSessionPolicyBoundary(t *testing.T) {
iamManager := setupIntegratedIAMSystem(t)
ctx := context.Background()
stsService := iamManager.GetSTSService()
require.NotNil(t, stsService)
sessionPolicy := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":["s3:GetObject"],"Resource":["arn:aws:s3:::test-bucket/allowed/*"]}]}`
sessionId, err := sts.GenerateSessionId()
require.NoError(t, err)
expiresAt := time.Now().Add(time.Hour)
principal := "arn:aws:sts::000000000000:assumed-role/S3ReadOnlyRole/policy-session"
claims := sts.NewSTSSessionClaims(sessionId, stsService.Config.Issuer, expiresAt).
WithSessionName("policy-session").
WithRoleInfo("arn:aws:iam::role/S3ReadOnlyRole", principal, principal).
WithSessionPolicy(sessionPolicy)
sessionToken, err := stsService.GetTokenGenerator().GenerateJWTWithClaims(claims)
require.NoError(t, err)
allowed, err := iamManager.IsActionAllowed(ctx, &ActionRequest{
Principal: principal,
Action: "s3:GetObject",
Resource: "arn:aws:s3:::test-bucket/allowed/file.txt",
SessionToken: sessionToken,
})
require.NoError(t, err)
assert.True(t, allowed, "Session policy should allow GetObject within allowed prefix")
allowed, err = iamManager.IsActionAllowed(ctx, &ActionRequest{
Principal: principal,
Action: "s3:GetObject",
Resource: "arn:aws:s3:::test-bucket/other/file.txt",
SessionToken: sessionToken,
})
require.NoError(t, err)
assert.False(t, allowed, "Session policy should deny GetObject outside allowed prefix")
allowed, err = iamManager.IsActionAllowed(ctx, &ActionRequest{
Principal: principal,
Action: "s3:ListBucket",
Resource: "arn:aws:s3:::test-bucket",
SessionToken: sessionToken,
})
require.NoError(t, err)
assert.False(t, allowed, "Session policy should deny ListBucket when not explicitly allowed")
}
// TestAssumeRoleWithWebIdentitySessionPolicy verifies Policy downscoping is applied to web identity sessions.
func TestAssumeRoleWithWebIdentitySessionPolicy(t *testing.T) {
iamManager := setupIntegratedIAMSystem(t)
ctx := context.Background()
validJWTToken := createTestJWT(t, "https://test-issuer.com", "test-user-123", "test-signing-key")
sessionPolicy := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":["s3:GetObject"],"Resource":["arn:aws:s3:::test-bucket/allowed/*"]}]}`
assumeRequest := &sts.AssumeRoleWithWebIdentityRequest{
RoleArn: "arn:aws:iam::role/S3ReadOnlyRole",
WebIdentityToken: validJWTToken,
RoleSessionName: "policy-web-identity",
Policy: &sessionPolicy,
}
response, err := iamManager.AssumeRoleWithWebIdentity(ctx, assumeRequest)
require.NoError(t, err)
allowed, err := iamManager.IsActionAllowed(ctx, &ActionRequest{
Principal: response.AssumedRoleUser.Arn,
Action: "s3:GetObject",
Resource: "arn:aws:s3:::test-bucket/allowed/file.txt",
SessionToken: response.Credentials.SessionToken,
})
require.NoError(t, err)
assert.True(t, allowed, "Session policy should allow GetObject within allowed prefix")
allowed, err = iamManager.IsActionAllowed(ctx, &ActionRequest{
Principal: response.AssumedRoleUser.Arn,
Action: "s3:GetObject",
Resource: "arn:aws:s3:::test-bucket/other/file.txt",
SessionToken: response.Credentials.SessionToken,
})
require.NoError(t, err)
assert.False(t, allowed, "Session policy should deny GetObject outside allowed prefix")
}
// TestAssumeRoleWithCredentialsSessionPolicy verifies Policy downscoping is applied to credentials sessions.
func TestAssumeRoleWithCredentialsSessionPolicy(t *testing.T) {
iamManager := setupIntegratedIAMSystem(t)
ctx := context.Background()
sessionPolicy := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":["filer:CreateEntry"],"Resource":["arn:aws:filer::path/user-docs/allowed/*"]}]}`
assumeRequest := &sts.AssumeRoleWithCredentialsRequest{
RoleArn: "arn:aws:iam::role/LDAPUserRole",
Username: "testuser",
Password: "testpass",
RoleSessionName: "policy-ldap",
ProviderName: "test-ldap",
Policy: &sessionPolicy,
}
response, err := iamManager.AssumeRoleWithCredentials(ctx, assumeRequest)
require.NoError(t, err)
allowed, err := iamManager.IsActionAllowed(ctx, &ActionRequest{
Principal: response.AssumedRoleUser.Arn,
Action: "filer:CreateEntry",
Resource: "arn:aws:filer::path/user-docs/allowed/file.txt",
SessionToken: response.Credentials.SessionToken,
})
require.NoError(t, err)
assert.True(t, allowed, "Session policy should allow CreateEntry within allowed prefix")
allowed, err = iamManager.IsActionAllowed(ctx, &ActionRequest{
Principal: response.AssumedRoleUser.Arn,
Action: "filer:CreateEntry",
Resource: "arn:aws:filer::path/user-docs/other/file.txt",
SessionToken: response.Credentials.SessionToken,
})
require.NoError(t, err)
assert.False(t, allowed, "Session policy should deny CreateEntry outside allowed prefix")
}
// TestSessionExpiration tests session expiration and cleanup
func TestSessionExpiration(t *testing.T) {
iamManager := setupIntegratedIAMSystem(t)

168
weed/iam/integration/iam_manager.go

@ -7,10 +7,12 @@ import (
"fmt"
"strings"
"github.com/golang-jwt/jwt/v5"
"github.com/seaweedfs/seaweedfs/weed/iam/policy"
"github.com/seaweedfs/seaweedfs/weed/iam/providers"
"github.com/seaweedfs/seaweedfs/weed/iam/sts"
"github.com/seaweedfs/seaweedfs/weed/iam/utils"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
)
// maxPoliciesForEvaluation defines an upper bound on the number of policies that
@ -23,6 +25,7 @@ type IAMManager struct {
stsService *sts.STSService
policyEngine *policy.PolicyEngine
roleStore RoleStore
userStore UserStore
filerAddressProvider func() string // Function to get current filer address
initialized bool
}
@ -48,6 +51,11 @@ type RoleStoreConfig struct {
StoreConfig map[string]interface{} `json:"storeConfig,omitempty"`
}
// UserStore defines the interface for retrieving IAM user policy attachments.
type UserStore interface {
GetUser(ctx context.Context, username string) (*iam_pb.Identity, error)
}
// RoleDefinition defines a role with its trust policy and attached policies
type RoleDefinition struct {
// RoleName is the name of the role
@ -92,6 +100,11 @@ func NewIAMManager() *IAMManager {
return &IAMManager{}
}
// SetUserStore assigns the user store used to resolve IAM user policy attachments.
func (m *IAMManager) SetUserStore(store UserStore) {
m.userStore = store
}
// Initialize initializes the IAM manager with all components
func (m *IAMManager) Initialize(config *IAMConfig, filerAddressProvider func() string) error {
if config == nil {
@ -310,12 +323,30 @@ func (m *IAMManager) IsActionAllowed(ctx context.Context, request *ActionRequest
return false, fmt.Errorf("IAM manager not initialized")
}
// Validate session token if present (skip for OIDC tokens which are already validated,
// and skip for empty tokens which represent static access keys)
if request.SessionToken != "" && !isOIDCToken(request.SessionToken) {
_, err := m.stsService.ValidateSessionToken(ctx, request.SessionToken)
if err != nil {
return false, fmt.Errorf("invalid session: %w", err)
// Validate session token if present
// We always try to validate with the internal STS service first if it's a SeaweedFS token.
// This ensures that session policies embedded in the token are correctly extracted and enforced.
var sessionInfo *sts.SessionInfo
if request.SessionToken != "" {
// Parse unverified to check issuer
parsed, _, err := new(jwt.Parser).ParseUnverified(request.SessionToken, jwt.MapClaims{})
isInternal := false
if err == nil {
if claims, ok := parsed.Claims.(jwt.MapClaims); ok {
if issuer, ok := claims["iss"].(string); ok && m.stsService != nil && m.stsService.Config != nil {
if issuer == m.stsService.Config.Issuer {
isInternal = true
}
}
}
}
if isInternal || !isOIDCToken(request.SessionToken) {
var err error
sessionInfo, err = m.stsService.ValidateSessionToken(ctx, request.SessionToken)
if err != nil {
return false, fmt.Errorf("invalid session: %w", err)
}
}
}
@ -334,7 +365,17 @@ func (m *IAMManager) IsActionAllowed(ctx context.Context, request *ActionRequest
// Add principal to context for policy matching
// The PolicyEngine checks RequestContext["principal"] or RequestContext["aws:PrincipalArn"]
evalCtx.RequestContext["principal"] = request.Principal
evalCtx.RequestContext["aws:PrincipalArn"] = request.Principal
evalCtx.RequestContext["aws:PrincipalArn"] = request.Principal // AWS standard key
// Check if this is an admin request - bypass policy evaluation if so
// This mirrors the logic in auth_signature_v4.go but applies it at authorization time
isAdmin := false
if request.RequestContext != nil {
if val, ok := request.RequestContext["is_admin"].(bool); ok && val {
isAdmin = true
}
// Print full request context for debugging
}
// Parse principal ARN to extract details for context variables (e.g. ${aws:username})
arnInfo := utils.ParsePrincipalARN(request.Principal)
@ -349,6 +390,9 @@ func (m *IAMManager) IsActionAllowed(ctx context.Context, request *ActionRequest
evalCtx.RequestContext["aws:username"] = awsUsername
evalCtx.RequestContext["aws:userid"] = arnInfo.RoleName
} else if userName := utils.ExtractUserNameFromPrincipal(request.Principal); userName != "" {
evalCtx.RequestContext["aws:username"] = userName
evalCtx.RequestContext["aws:userid"] = userName
}
if arnInfo.AccountID != "" {
evalCtx.RequestContext["aws:PrincipalAccount"] = arnInfo.AccountID
@ -364,58 +408,83 @@ func (m *IAMManager) IsActionAllowed(ctx context.Context, request *ActionRequest
}
}
// If explicit policy names are provided (e.g. from user identity), evaluate them directly
if len(request.PolicyNames) > 0 {
var baseResult *policy.EvaluationResult
var err error
if isAdmin {
// Admin always has base access allowed
baseResult = &policy.EvaluationResult{Effect: policy.EffectAllow}
} else {
policies := request.PolicyNames
if len(policies) == 0 {
// Extract role name from principal ARN
roleName := utils.ExtractRoleNameFromPrincipal(request.Principal)
if roleName == "" {
userName := utils.ExtractUserNameFromPrincipal(request.Principal)
if userName == "" {
return false, fmt.Errorf("could not extract role from principal: %s", request.Principal)
}
if m.userStore == nil {
return false, fmt.Errorf("user store unavailable for principal: %s", request.Principal)
}
user, err := m.userStore.GetUser(ctx, userName)
if err != nil || user == nil {
return false, fmt.Errorf("user not found for principal: %s (user=%s)", request.Principal, userName)
}
policies = user.GetPolicyNames()
} else {
// Get role definition
roleDef, err := m.roleStore.GetRole(ctx, m.getFilerAddress(), roleName)
if err != nil {
return false, fmt.Errorf("role not found: %s", roleName)
}
policies = roleDef.AttachedPolicies
}
}
if bucketPolicyName != "" {
// Enforce an upper bound on the number of policies to avoid excessive allocations
if len(policies) >= maxPoliciesForEvaluation {
return false, fmt.Errorf("too many policies for evaluation: %d >= %d", len(policies), maxPoliciesForEvaluation)
}
// Create a new slice to avoid modifying the request and append the bucket policy
// Create a new slice to avoid modifying the original and append the bucket policy
copied := make([]string, len(policies))
copy(copied, policies)
policies = append(copied, bucketPolicyName)
}
result, err := m.policyEngine.Evaluate(ctx, "", evalCtx, policies)
baseResult, err = m.policyEngine.Evaluate(ctx, "", evalCtx, policies)
if err != nil {
return false, fmt.Errorf("policy evaluation failed: %w", err)
}
return result.Effect == policy.EffectAllow, nil
}
// Extract role name from principal ARN
roleName := utils.ExtractRoleNameFromPrincipal(request.Principal)
if roleName == "" {
return false, fmt.Errorf("could not extract role from principal: %s", request.Principal)
// Base policy must allow; if it doesn't, deny immediately (session policy can only further restrict)
if baseResult.Effect != policy.EffectAllow {
return false, nil
}
// Get role definition
roleDef, err := m.roleStore.GetRole(ctx, m.getFilerAddress(), roleName)
if err != nil {
return false, fmt.Errorf("role not found: %s", roleName)
}
// Evaluate policies attached to the role
policies := roleDef.AttachedPolicies
if bucketPolicyName != "" {
// Enforce an upper bound on the number of policies to avoid excessive allocations
if len(policies) >= maxPoliciesForEvaluation {
return false, fmt.Errorf("too many policies for evaluation: %d >= %d", len(policies), maxPoliciesForEvaluation)
// If there's a session policy, it must also allow the action
if sessionInfo != nil && sessionInfo.SessionPolicy != "" {
var sessionPolicy policy.PolicyDocument
if err := json.Unmarshal([]byte(sessionInfo.SessionPolicy), &sessionPolicy); err != nil {
return false, fmt.Errorf("invalid session policy JSON: %w", err)
}
if err := policy.ValidatePolicyDocument(&sessionPolicy); err != nil {
return false, fmt.Errorf("invalid session policy document: %w", err)
}
sessionResult, err := m.policyEngine.EvaluatePolicyDocument(ctx, evalCtx, "session-policy", &sessionPolicy, policy.EffectDeny)
if err != nil {
return false, fmt.Errorf("session policy evaluation failed: %w", err)
}
if sessionResult.Effect != policy.EffectAllow {
// Session policy does not allow this action
return false, nil
}
// Create a new slice to avoid modifying the role definition and append the bucket policy
copied := make([]string, len(policies))
copy(copied, policies)
policies = append(copied, bucketPolicyName)
}
result, err := m.policyEngine.Evaluate(ctx, "", evalCtx, policies)
if err != nil {
return false, fmt.Errorf("policy evaluation failed: %w", err)
}
return result.Effect == policy.EffectAllow, nil
return true, nil
}
// ValidateTrustPolicy validates if a principal can assume a role (for testing)
@ -643,7 +712,28 @@ func isOIDCToken(token string) bool {
}
// JWT tokens typically start with "eyJ" (base64 encoded JSON starting with "{")
return strings.HasPrefix(token, "eyJ")
if !strings.HasPrefix(token, "eyJ") {
return false
}
parsed, _, err := new(jwt.Parser).ParseUnverified(token, jwt.MapClaims{})
if err != nil {
return false
}
claims, ok := parsed.Claims.(jwt.MapClaims)
if !ok {
return false
}
if typ, ok := claims["typ"].(string); ok && typ == sts.TokenTypeSession {
return false
}
if typ, ok := claims[sts.JWTClaimTokenType].(string); ok && typ == sts.TokenTypeSession {
return false
}
return true
}
// TrustPolicyValidator interface implementation

62
weed/iam/policy/policy_engine.go

@ -474,6 +474,68 @@ func (e *PolicyEngine) EvaluateTrustPolicy(ctx context.Context, trustPolicy *Pol
return result, nil
}
// EvaluatePolicyDocument evaluates a single policy document without storing it.
//
// Evaluation follows AWS semantics for a single document: an explicit Deny in
// any matching statement always wins, otherwise any matching Allow grants
// access, otherwise defaultEffect is returned. defaultEffect controls the
// fallback result when no statements match.
//
// Parameters:
//   - evalCtx: the request context (principal, action, resource) to match against
//   - policyName: label recorded in evaluation details; defaults to "inline-policy"
//   - policyDoc: the policy document to evaluate
//   - defaultEffect: effect returned when no statement matches
func (e *PolicyEngine) EvaluatePolicyDocument(ctx context.Context, evalCtx *EvaluationContext, policyName string, policyDoc *PolicyDocument, defaultEffect Effect) (*EvaluationResult, error) {
	if !e.initialized {
		return nil, fmt.Errorf("policy engine not initialized")
	}
	if evalCtx == nil {
		return nil, fmt.Errorf("evaluation context cannot be nil")
	}
	if policyDoc == nil {
		return nil, fmt.Errorf("policy document cannot be nil")
	}
	if policyName == "" {
		policyName = "inline-policy"
	}
	result := &EvaluationResult{
		Effect: defaultEffect,
		EvaluationDetails: &EvaluationDetails{
			Principal:         evalCtx.Principal,
			Action:            evalCtx.Action,
			Resource:          evalCtx.Resource,
			PoliciesEvaluated: []string{policyName},
		},
	}
	var matchingStatements []StatementMatch
	explicitDeny := false
	hasAllow := false
	// Iterate by index to avoid copying each Statement struct on every
	// iteration (statements carry slices/maps and can be large).
	for i := range policyDoc.Statement {
		statement := &policyDoc.Statement[i]
		if !e.statementMatches(statement, evalCtx) {
			continue
		}
		matchingStatements = append(matchingStatements, StatementMatch{
			PolicyName:   policyName,
			StatementSid: statement.Sid,
			Effect:       Effect(statement.Effect),
			Reason:       "Action, Resource, and Condition matched",
		})
		switch statement.Effect {
		case "Deny":
			explicitDeny = true
		case "Allow":
			hasAllow = true
		}
	}
	result.MatchingStatements = matchingStatements
	// Explicit Deny overrides any Allow, per AWS policy evaluation logic.
	if explicitDeny {
		result.Effect = EffectDeny
	} else if hasAllow {
		result.Effect = EffectAllow
	}
	return result, nil
}
// statementMatches checks if a statement matches the evaluation context
func (e *PolicyEngine) statementMatches(statement *Statement, evalCtx *EvaluationContext) bool {
// Check principal match (for trust policies)

9
weed/iam/sts/session_claims.go

@ -31,6 +31,8 @@ type STSSessionClaims struct {
// Authorization data
Policies []string `json:"pol,omitempty"` // policies (abbreviated)
// SessionPolicy contains inline session policy JSON (optional)
SessionPolicy string `json:"spol,omitempty"`
// Identity provider information
IdentityProvider string `json:"idp"` // identity_provider
@ -88,6 +90,7 @@ func (c *STSSessionClaims) ToSessionInfo() *SessionInfo {
AssumedRoleUser: c.AssumedRole,
Principal: c.Principal,
Policies: c.Policies,
SessionPolicy: c.SessionPolicy,
ExpiresAt: expiresAt,
IdentityProvider: c.IdentityProvider,
ExternalUserId: c.ExternalUserId,
@ -148,6 +151,12 @@ func (c *STSSessionClaims) WithPolicies(policies []string) *STSSessionClaims {
return c
}
// WithSessionPolicy sets the inline session policy JSON for this session.
// The value is carried in the JWT "spol" claim and surfaced again via
// ToSessionInfo. It is expected to be already-validated, normalized JSON
// (see NormalizeSessionPolicy). Returns the receiver for call chaining.
func (c *STSSessionClaims) WithSessionPolicy(policy string) *STSSessionClaims {
	c.SessionPolicy = policy
	return c
}
// WithIdentityProvider sets identity provider information
func (c *STSSessionClaims) WithIdentityProvider(providerName, externalUserId, providerIssuer string) *STSSessionClaims {
c.IdentityProvider = providerName

3
weed/iam/sts/session_claims_test.go

@ -89,6 +89,7 @@ func TestSTSSessionClaimsToSessionInfoPreservesAllFields(t *testing.T) {
expiresAt := time.Now().Add(2 * time.Hour)
policies := []string{"policy1", "policy2"}
sessionPolicy := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":["s3:GetObject"],"Resource":["arn:aws:s3:::bucket/*"]}]}`
requestContext := map[string]interface{}{
"sourceIp": "192.168.1.1",
"userAgent": "test-agent",
@ -99,6 +100,7 @@ func TestSTSSessionClaimsToSessionInfoPreservesAllFields(t *testing.T) {
WithRoleInfo("role-arn", "assumed-role", "principal").
WithIdentityProvider("provider", "external-id", "issuer").
WithPolicies(policies).
WithSessionPolicy(sessionPolicy).
WithRequestContext(requestContext).
WithMaxDuration(2 * time.Hour)
@ -114,6 +116,7 @@ func TestSTSSessionClaimsToSessionInfoPreservesAllFields(t *testing.T) {
assert.Equal(t, "external-id", sessionInfo.ExternalUserId)
assert.Equal(t, "issuer", sessionInfo.ProviderIssuer)
assert.Equal(t, policies, sessionInfo.Policies)
assert.Equal(t, sessionPolicy, sessionInfo.SessionPolicy)
assert.Equal(t, requestContext, sessionInfo.RequestContext)
assert.WithinDuration(t, expiresAt, sessionInfo.ExpiresAt, 1*time.Second)
}

35
weed/iam/sts/session_policy.go

@ -0,0 +1,35 @@
package sts
import (
"encoding/json"
"fmt"
"strings"
"github.com/seaweedfs/seaweedfs/weed/iam/policy"
)
// NormalizeSessionPolicy validates and normalizes inline session policy JSON.
// It returns an empty string if the input is empty or whitespace.
func NormalizeSessionPolicy(policyJSON string) (string, error) {
	const maxSessionPolicySize = 2048

	policyText := strings.TrimSpace(policyJSON)
	if policyText == "" {
		return "", nil
	}
	if len(policyText) > maxSessionPolicySize {
		return "", fmt.Errorf("session policy exceeds maximum size of %d characters", maxSessionPolicySize)
	}

	// Parse and validate the document, then re-serialize it so callers
	// always receive the policy in a canonical JSON form.
	var doc policy.PolicyDocument
	if err := json.Unmarshal([]byte(policyText), &doc); err != nil {
		return "", fmt.Errorf("invalid session policy JSON: %w", err)
	}
	if err := policy.ValidatePolicyDocument(&doc); err != nil {
		return "", fmt.Errorf("invalid session policy document: %w", err)
	}

	canonical, err := json.Marshal(&doc)
	if err != nil {
		return "", fmt.Errorf("failed to normalize session policy: %w", err)
	}
	return string(canonical), nil
}

228
weed/iam/sts/session_policy_test.go

@ -25,194 +25,88 @@ func createSessionPolicyTestJWT(t *testing.T, issuer, subject string) string {
return tokenString
}
// TestAssumeRoleWithWebIdentity_SessionPolicy tests the handling of the Policy field
// in AssumeRoleWithWebIdentityRequest to ensure users are properly informed that
// session policies are not currently supported
// TestAssumeRoleWithWebIdentity_SessionPolicy verifies inline session policies are preserved in tokens.
func TestAssumeRoleWithWebIdentity_SessionPolicy(t *testing.T) {
service := setupTestSTSService(t)
ctx := context.Background()
t.Run("should_reject_request_with_session_policy", func(t *testing.T) {
ctx := context.Background()
// Create a request with a session policy
sessionPolicy := `{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::example-bucket/*"
}]
}`
sessionPolicy := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:GetObject","Resource":"arn:aws:s3:::example-bucket/*"}]}`
testToken := createSessionPolicyTestJWT(t, "test-issuer", "test-user")
testToken := createSessionPolicyTestJWT(t, "test-issuer", "test-user")
request := &AssumeRoleWithWebIdentityRequest{
RoleArn: "arn:aws:iam::role/TestRole",
WebIdentityToken: testToken,
RoleSessionName: "test-session",
Policy: &sessionPolicy,
}
request := &AssumeRoleWithWebIdentityRequest{
RoleArn: "arn:aws:iam::role/TestRole",
WebIdentityToken: testToken,
RoleSessionName: "test-session",
DurationSeconds: nil, // Use default
Policy: &sessionPolicy, // ← Session policy provided
}
response, err := service.AssumeRoleWithWebIdentity(ctx, request)
require.NoError(t, err)
require.NotNil(t, response)
// Should return an error indicating session policies are not supported
response, err := service.AssumeRoleWithWebIdentity(ctx, request)
sessionInfo, err := service.ValidateSessionToken(ctx, response.Credentials.SessionToken)
require.NoError(t, err)
// Verify the error
assert.Error(t, err)
assert.Nil(t, response)
assert.Contains(t, err.Error(), "session policies are not currently supported")
assert.Contains(t, err.Error(), "Policy parameter must be omitted")
})
normalized, err := NormalizeSessionPolicy(sessionPolicy)
require.NoError(t, err)
assert.Equal(t, normalized, sessionInfo.SessionPolicy)
t.Run("should_succeed_without_session_policy", func(t *testing.T) {
ctx := context.Background()
testToken := createSessionPolicyTestJWT(t, "test-issuer", "test-user")
request := &AssumeRoleWithWebIdentityRequest{
RoleArn: "arn:aws:iam::role/TestRole",
WebIdentityToken: testToken,
WebIdentityToken: createSessionPolicyTestJWT(t, "test-issuer", "test-user"),
RoleSessionName: "test-session",
DurationSeconds: nil, // Use default
Policy: nil, // ← No session policy
}
// Should succeed without session policy
response, err := service.AssumeRoleWithWebIdentity(ctx, request)
// Verify success
require.NoError(t, err)
require.NotNil(t, response)
assert.NotNil(t, response.Credentials)
assert.NotEmpty(t, response.Credentials.AccessKeyId)
assert.NotEmpty(t, response.Credentials.SecretAccessKey)
assert.NotEmpty(t, response.Credentials.SessionToken)
})
t.Run("should_succeed_with_empty_policy_pointer", func(t *testing.T) {
ctx := context.Background()
testToken := createSessionPolicyTestJWT(t, "test-issuer", "test-user")
request := &AssumeRoleWithWebIdentityRequest{
RoleArn: "arn:aws:iam::role/TestRole",
WebIdentityToken: testToken,
RoleSessionName: "test-session",
Policy: nil, // ← Explicitly nil
}
// Should succeed with nil policy pointer
response, err := service.AssumeRoleWithWebIdentity(ctx, request)
sessionInfo, err := service.ValidateSessionToken(ctx, response.Credentials.SessionToken)
require.NoError(t, err)
require.NotNil(t, response)
assert.NotNil(t, response.Credentials)
assert.Empty(t, sessionInfo.SessionPolicy)
})
}
t.Run("should_reject_empty_string_policy", func(t *testing.T) {
ctx := context.Background()
// Test edge case scenarios for the Policy field handling
func TestAssumeRoleWithWebIdentity_SessionPolicy_EdgeCases(t *testing.T) {
service := setupTestSTSService(t)
ctx := context.Background()
emptyPolicy := "" // Empty string, but still a non-nil pointer
t.Run("malformed_json_policy_rejected", func(t *testing.T) {
malformedPolicy := `{"Version": "2012-10-17", "Statement": [` // Incomplete JSON
request := &AssumeRoleWithWebIdentityRequest{
RoleArn: "arn:aws:iam::role/TestRole",
WebIdentityToken: createSessionPolicyTestJWT(t, "test-issuer", "test-user"),
RoleSessionName: "test-session",
Policy: &emptyPolicy, // ← Non-nil pointer to empty string
Policy: &malformedPolicy,
}
// Should still reject because pointer is not nil
response, err := service.AssumeRoleWithWebIdentity(ctx, request)
assert.Error(t, err)
assert.Nil(t, response)
assert.Contains(t, err.Error(), "session policies are not currently supported")
assert.Contains(t, err.Error(), "invalid session policy JSON")
})
}
// TestAssumeRoleWithWebIdentity_SessionPolicy_ErrorMessage tests that the error message
// is clear and helps users understand what they need to do
func TestAssumeRoleWithWebIdentity_SessionPolicy_ErrorMessage(t *testing.T) {
service := setupTestSTSService(t)
ctx := context.Background()
complexPolicy := `{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowS3Access",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::my-bucket/*",
"arn:aws:s3:::my-bucket"
],
"Condition": {
"StringEquals": {
"s3:prefix": ["documents/", "images/"]
}
}
}
]
}`
testToken := createSessionPolicyTestJWT(t, "test-issuer", "test-user")
request := &AssumeRoleWithWebIdentityRequest{
RoleArn: "arn:aws:iam::role/TestRole",
WebIdentityToken: testToken,
RoleSessionName: "test-session-with-complex-policy",
Policy: &complexPolicy,
}
response, err := service.AssumeRoleWithWebIdentity(ctx, request)
// Verify error details
require.Error(t, err)
assert.Nil(t, response)
errorMsg := err.Error()
// The error should be clear and actionable
assert.Contains(t, errorMsg, "session policies are not currently supported",
"Error should explain that session policies aren't supported")
assert.Contains(t, errorMsg, "Policy parameter must be omitted",
"Error should specify what action the user needs to take")
// Should NOT contain internal implementation details
assert.NotContains(t, errorMsg, "nil pointer",
"Error should not expose internal implementation details")
assert.NotContains(t, errorMsg, "struct field",
"Error should not expose internal struct details")
}
// Test edge case scenarios for the Policy field handling
func TestAssumeRoleWithWebIdentity_SessionPolicy_EdgeCases(t *testing.T) {
service := setupTestSTSService(t)
t.Run("malformed_json_policy_still_rejected", func(t *testing.T) {
ctx := context.Background()
malformedPolicy := `{"Version": "2012-10-17", "Statement": [` // Incomplete JSON
t.Run("invalid_policy_document_rejected", func(t *testing.T) {
invalidPolicy := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow"}]}`
request := &AssumeRoleWithWebIdentityRequest{
RoleArn: "arn:aws:iam::role/TestRole",
WebIdentityToken: createSessionPolicyTestJWT(t, "test-issuer", "test-user"),
RoleSessionName: "test-session",
Policy: &malformedPolicy,
Policy: &invalidPolicy,
}
// Should reject before even parsing the policy JSON
response, err := service.AssumeRoleWithWebIdentity(ctx, request)
assert.Error(t, err)
assert.Nil(t, response)
assert.Contains(t, err.Error(), "session policies are not currently supported")
assert.Contains(t, err.Error(), "invalid session policy document")
})
t.Run("policy_with_whitespace_still_rejected", func(t *testing.T) {
ctx := context.Background()
whitespacePolicy := " \t\n " // Only whitespace
t.Run("whitespace_policy_ignored", func(t *testing.T) {
whitespacePolicy := " \t\n "
request := &AssumeRoleWithWebIdentityRequest{
RoleArn: "arn:aws:iam::role/TestRole",
@ -221,58 +115,54 @@ func TestAssumeRoleWithWebIdentity_SessionPolicy_EdgeCases(t *testing.T) {
Policy: &whitespacePolicy,
}
// Should reject any non-nil policy, even whitespace
response, err := service.AssumeRoleWithWebIdentity(ctx, request)
require.NoError(t, err)
require.NotNil(t, response)
assert.Error(t, err)
assert.Nil(t, response)
assert.Contains(t, err.Error(), "session policies are not currently supported")
sessionInfo, err := service.ValidateSessionToken(ctx, response.Credentials.SessionToken)
require.NoError(t, err)
assert.Empty(t, sessionInfo.SessionPolicy)
})
}
// TestAssumeRoleWithWebIdentity_PolicyFieldDocumentation verifies that the struct
// field is properly documented to help developers understand the limitation
// TestAssumeRoleWithWebIdentity_PolicyFieldDocumentation verifies that the struct field exists and is optional.
func TestAssumeRoleWithWebIdentity_PolicyFieldDocumentation(t *testing.T) {
// This test documents the current behavior and ensures the struct field
// exists with proper typing
request := &AssumeRoleWithWebIdentityRequest{}
// Verify the Policy field exists and has the correct type
assert.IsType(t, (*string)(nil), request.Policy,
"Policy field should be *string type for optional JSON policy")
// Verify initial value is nil (no policy by default)
assert.Nil(t, request.Policy,
"Policy field should default to nil (no session policy)")
// Test that we can set it to a string pointer (even though it will be rejected)
policyValue := `{"Version": "2012-10-17"}`
request.Policy = &policyValue
assert.NotNil(t, request.Policy, "Should be able to assign policy value")
assert.Equal(t, policyValue, *request.Policy, "Policy value should be preserved")
}
// TestAssumeRoleWithCredentials_NoSessionPolicySupport verifies that
// AssumeRoleWithCredentialsRequest doesn't have a Policy field, which is correct
// since credential-based role assumption typically doesn't support session policies
func TestAssumeRoleWithCredentials_NoSessionPolicySupport(t *testing.T) {
// Verify that AssumeRoleWithCredentialsRequest doesn't have a Policy field
// This is the expected behavior since session policies are typically only
// supported with web identity (OIDC/SAML) flows in AWS STS
// TestAssumeRoleWithCredentials_SessionPolicy verifies session policy support for credentials-based flow.
func TestAssumeRoleWithCredentials_SessionPolicy(t *testing.T) {
service := setupTestSTSService(t)
ctx := context.Background()
sessionPolicy := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"filer:CreateEntry","Resource":"arn:aws:filer::path/user-docs/*"}]}`
request := &AssumeRoleWithCredentialsRequest{
RoleArn: "arn:aws:iam::role/TestRole",
Username: "testuser",
Password: "testpass",
RoleSessionName: "test-session",
ProviderName: "ldap",
ProviderName: "test-ldap",
Policy: &sessionPolicy,
}
// The struct should compile and work without a Policy field
assert.NotNil(t, request)
assert.Equal(t, "arn:aws:iam::role/TestRole", request.RoleArn)
assert.Equal(t, "testuser", request.Username)
response, err := service.AssumeRoleWithCredentials(ctx, request)
require.NoError(t, err)
require.NotNil(t, response)
sessionInfo, err := service.ValidateSessionToken(ctx, response.Credentials.SessionToken)
require.NoError(t, err)
// This documents that credential-based assume role does NOT support session policies
// which matches AWS STS behavior where session policies are primarily for
// web identity (OIDC/SAML) and federation scenarios
normalized, err := NormalizeSessionPolicy(sessionPolicy)
require.NoError(t, err)
assert.Equal(t, normalized, sessionInfo.SessionPolicy)
}

29
weed/iam/sts/sts_service.go

@ -161,6 +161,9 @@ type AssumeRoleWithCredentialsRequest struct {
// DurationSeconds is the duration of the role session (optional)
DurationSeconds *int64 `json:"DurationSeconds,omitempty"`
// Policy is an optional session policy (optional)
Policy *string `json:"Policy,omitempty"`
}
// AssumeRoleResponse represents the response from assume role operations
@ -237,6 +240,9 @@ type SessionInfo struct {
// Policies are the policies associated with this session
Policies []string `json:"policies"`
// SessionPolicy is the inline session policy JSON (optional)
SessionPolicy string `json:"sessionPolicy,omitempty"`
// RequestContext contains additional request context for policy evaluation
RequestContext map[string]interface{} `json:"requestContext,omitempty"`
@ -418,9 +424,13 @@ func (s *STSService) AssumeRoleWithWebIdentity(ctx context.Context, request *Ass
return nil, fmt.Errorf("invalid request: %w", err)
}
// Check for unsupported session policy
sessionPolicy := ""
if request.Policy != nil {
return nil, fmt.Errorf("session policies are not currently supported - Policy parameter must be omitted")
normalized, err := NormalizeSessionPolicy(*request.Policy)
if err != nil {
return nil, fmt.Errorf("invalid session policy: %w", err)
}
sessionPolicy = normalized
}
// 1. Validate the web identity token with appropriate provider
@ -485,6 +495,9 @@ func (s *STSService) AssumeRoleWithWebIdentity(ctx context.Context, request *Ass
WithIdentityProvider(provider.Name(), externalIdentity.UserID, "").
WithMaxDuration(sessionDuration).
WithRequestContext(requestContext)
if sessionPolicy != "" {
sessionClaims.WithSessionPolicy(sessionPolicy)
}
// Generate self-contained JWT token with all session information
jwtToken, err := s.tokenGenerator.GenerateJWTWithClaims(sessionClaims)
@ -517,6 +530,15 @@ func (s *STSService) AssumeRoleWithCredentials(ctx context.Context, request *Ass
return nil, fmt.Errorf("invalid request: %w", err)
}
sessionPolicy := ""
if request.Policy != nil {
normalized, err := NormalizeSessionPolicy(*request.Policy)
if err != nil {
return nil, fmt.Errorf("invalid session policy: %w", err)
}
sessionPolicy = normalized
}
// 1. Get the specified provider
provider, exists := s.providers[request.ProviderName]
if !exists {
@ -565,6 +587,9 @@ func (s *STSService) AssumeRoleWithCredentials(ctx context.Context, request *Ass
WithRoleInfo(request.RoleArn, assumedRoleUser.Arn, assumedRoleUser.Arn).
WithIdentityProvider(provider.Name(), externalIdentity.UserID, "").
WithMaxDuration(sessionDuration)
if sessionPolicy != "" {
sessionClaims.WithSessionPolicy(sessionPolicy)
}
// Generate self-contained JWT token with all session information
jwtToken, err := s.tokenGenerator.GenerateJWTWithClaims(sessionClaims)

2
weed/iam/sts/token_utils.go

@ -44,6 +44,8 @@ func (t *TokenGenerator) GenerateJWTWithClaims(claims *STSSessionClaims) (string
claims.Issuer = t.issuer
}
// SECURITY: Use deterministic signing results for troubleshooting if needed,
// but standard HS256 with common secret is usually sufficient.
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString(t.signingKey)
}

33
weed/iam/utils/arn_utils.go

@ -16,6 +16,9 @@ const (
// iamRoleMarker is the marker that identifies IAM role ARNs
iamRoleMarker = "role/"
// iamUserMarker is the marker that identifies IAM user ARNs
iamUserMarker = "user/"
)
// ARNInfo contains structured information about a parsed AWS ARN.
@ -88,6 +91,36 @@ func ExtractRoleNameFromPrincipal(principal string) string {
return ExtractRoleNameFromArn(principal)
}
// ExtractUserNameFromPrincipal extracts the user name from an AWS IAM principal ARN.
//
// It handles both legacy and standard AWS IAM user ARN formats:
//   - arn:aws:iam::user/UserName (legacy format without account ID)
//   - arn:aws:iam::ACCOUNT:user/UserName (standard AWS format with account ID)
//
// Returns an empty string if the principal does not represent an IAM user.
func ExtractUserNameFromPrincipal(principal string) string {
	remainder, isIam := strings.CutPrefix(principal, iamPrefix)
	if !isIam {
		return ""
	}
	// In the standard format an account ID precedes the resource part,
	// separated by a colon; drop everything up to and including it.
	if _, after, hasColon := strings.Cut(remainder, ":"); hasColon {
		remainder = after
	}
	userName, isUser := strings.CutPrefix(remainder, iamUserMarker)
	if !isUser || userName == "" {
		return ""
	}
	return userName
}
// ExtractRoleNameFromArn extracts the role name from an AWS IAM role ARN.
//
// It handles both legacy and standard AWS IAM role ARN formats:

2
weed/iamapi/iamapi_handlers.go

@ -34,7 +34,7 @@ func writeIamErrorResponse(w http.ResponseWriter, r *http.Request, iamError *Iam
switch errCode {
case iam.ErrCodeNoSuchEntityException:
s3err.WriteXMLResponse(w, r, http.StatusNotFound, errorResp)
case iam.ErrCodeMalformedPolicyDocumentException:
case iam.ErrCodeMalformedPolicyDocumentException, iam.ErrCodeInvalidInputException:
s3err.WriteXMLResponse(w, r, http.StatusBadRequest, errorResp)
case iam.ErrCodeServiceFailureException:
// We do not want to expose internal server error to the client

63
weed/iamapi/iamapi_management_handlers.go

@ -35,6 +35,8 @@ const (
StatementActionTagging = iamlib.StatementActionTagging
StatementActionDelete = iamlib.StatementActionDelete
USER_DOES_NOT_EXIST = iamlib.UserDoesNotExist
accessKeyStatusActive = iamlib.AccessKeyStatusActive
accessKeyStatusInactive = iamlib.AccessKeyStatusInactive
)
var (
@ -67,6 +69,17 @@ func stringSlicesEqual(a, b []string) bool {
return iamlib.StringSlicesEqual(a, b)
}
// validateAccessKeyStatus checks that the Status parameter supplied for an
// access key operation is present and is one of the supported values
// (Active or Inactive). It returns nil when the status is valid.
func validateAccessKeyStatus(status string) error {
	if status == "" {
		return fmt.Errorf("Status parameter is required")
	}
	if status != accessKeyStatusActive && status != accessKeyStatusInactive {
		return fmt.Errorf("Status must be '%s' or '%s'", accessKeyStatusActive, accessKeyStatusInactive)
	}
	return nil
}
func (iama *IamApiServer) ListUsers(s3cfg *iam_pb.S3ApiConfiguration, values url.Values) (resp ListUsersResponse) {
for _, ident := range s3cfg.Identities {
resp.ListUsersResult.Users = append(resp.ListUsersResult.Users, &iam.User{UserName: &ident.Name})
@ -75,15 +88,20 @@ func (iama *IamApiServer) ListUsers(s3cfg *iam_pb.S3ApiConfiguration, values url
}
func (iama *IamApiServer) ListAccessKeys(s3cfg *iam_pb.S3ApiConfiguration, values url.Values) (resp ListAccessKeysResponse) {
status := iam.StatusTypeActive
userName := values.Get("UserName")
for _, ident := range s3cfg.Identities {
if userName != "" && userName != ident.Name {
continue
}
for _, cred := range ident.Credentials {
status := cred.Status
if status == "" {
status = accessKeyStatusActive
}
identName := ident.Name
accessKey := cred.AccessKey
resp.ListAccessKeysResult.AccessKeyMetadata = append(resp.ListAccessKeysResult.AccessKeyMetadata,
&iam.AccessKeyMetadata{UserName: &ident.Name, AccessKeyId: &cred.AccessKey, Status: &status},
&iam.AccessKeyMetadata{UserName: &identName, AccessKeyId: &accessKey, Status: &status},
)
}
}
@ -325,7 +343,7 @@ func (iama *IamApiServer) CreateAccessKey(s3cfg *iam_pb.S3ApiConfiguration, valu
for _, ident := range s3cfg.Identities {
if userName == ident.Name {
ident.Credentials = append(ident.Credentials,
&iam_pb.Credential{AccessKey: accessKeyId, SecretKey: secretAccessKey})
&iam_pb.Credential{AccessKey: accessKeyId, SecretKey: secretAccessKey, Status: accessKeyStatusActive})
changed = true
break
}
@ -338,6 +356,7 @@ func (iama *IamApiServer) CreateAccessKey(s3cfg *iam_pb.S3ApiConfiguration, valu
{
AccessKey: accessKeyId,
SecretKey: secretAccessKey,
Status: accessKeyStatusActive,
},
},
},
@ -346,6 +365,37 @@ func (iama *IamApiServer) CreateAccessKey(s3cfg *iam_pb.S3ApiConfiguration, valu
return resp, nil
}
// UpdateAccessKey updates the status of an access key (Active or Inactive).
func (iama *IamApiServer) UpdateAccessKey(s3cfg *iam_pb.S3ApiConfiguration, values url.Values) (resp UpdateAccessKeyResponse, err *IamError) {
	userName := values.Get("UserName")
	accessKeyId := values.Get("AccessKeyId")
	status := values.Get("Status")

	// Validate required parameters first so callers get an InvalidInput
	// error rather than a misleading NoSuchEntity.
	switch {
	case userName == "":
		return resp, &IamError{Code: iam.ErrCodeInvalidInputException, Error: fmt.Errorf("UserName is required")}
	case accessKeyId == "":
		return resp, &IamError{Code: iam.ErrCodeInvalidInputException, Error: fmt.Errorf("AccessKeyId is required")}
	}
	if statusErr := validateAccessKeyStatus(status); statusErr != nil {
		return resp, &IamError{Code: iam.ErrCodeInvalidInputException, Error: statusErr}
	}

	// Locate the user, then the credential, and update its status in place.
	for _, identity := range s3cfg.Identities {
		if identity.Name != userName {
			continue
		}
		for _, credential := range identity.Credentials {
			if credential.AccessKey == accessKeyId {
				credential.Status = status
				return resp, nil
			}
		}
		return resp, &IamError{Code: iam.ErrCodeNoSuchEntityException, Error: fmt.Errorf("the access key with id %s for user %s cannot be found", accessKeyId, userName)}
	}
	return resp, &IamError{Code: iam.ErrCodeNoSuchEntityException, Error: fmt.Errorf(USER_DOES_NOT_EXIST, userName)}
}
func (iama *IamApiServer) DeleteAccessKey(s3cfg *iam_pb.S3ApiConfiguration, values url.Values) (resp DeleteAccessKeyResponse) {
userName := values.Get("UserName")
accessKeyId := values.Get("AccessKeyId")
@ -475,6 +525,13 @@ func (iama *IamApiServer) DoActions(w http.ResponseWriter, r *http.Request) {
case "DeleteAccessKey":
iama.handleImplicitUsername(r, values)
response = iama.DeleteAccessKey(s3cfg, values)
case "UpdateAccessKey":
iama.handleImplicitUsername(r, values)
response, iamError = iama.UpdateAccessKey(s3cfg, values)
if iamError != nil {
writeIamErrorResponse(w, r, iamError)
return
}
case "CreatePolicy":
response, iamError = iama.CreatePolicy(s3cfg, values)
if iamError != nil {

1
weed/iamapi/iamapi_response.go

@ -19,6 +19,7 @@ type (
GetUserResponse = iamlib.GetUserResponse
UpdateUserResponse = iamlib.UpdateUserResponse
CreateAccessKeyResponse = iamlib.CreateAccessKeyResponse
UpdateAccessKeyResponse = iamlib.UpdateAccessKeyResponse
PutUserPolicyResponse = iamlib.PutUserPolicyResponse
DeleteUserPolicyResponse = iamlib.DeleteUserPolicyResponse
GetUserPolicyResponse = iamlib.GetUserPolicyResponse

52
weed/iamapi/iamapi_test.go

@ -84,6 +84,58 @@ func TestListAccessKeys(t *testing.T) {
assert.Equal(t, http.StatusOK, response.Code)
}
// TestUpdateAccessKey exercises the UpdateAccessKey IAM action end to end:
// it creates an access key for user "Test", flips its status to Inactive,
// and verifies via ListAccessKeys that the new status is persisted.
func TestUpdateAccessKey(t *testing.T) {
	svc := iam.New(session.New())
	// Step 1: create an access key to operate on.
	createReq, _ := svc.CreateAccessKeyRequest(&iam.CreateAccessKeyInput{UserName: aws.String("Test")})
	_ = createReq.Build()
	createOut := CreateAccessKeyResponse{}
	response, err := executeRequest(createReq.HTTPRequest, createOut)
	assert.Equal(t, nil, err)
	assert.Equal(t, http.StatusOK, response.Code)
	var createResp CreateAccessKeyResponse
	err = xml.Unmarshal(response.Body.Bytes(), &createResp)
	assert.Equal(t, nil, err)
	accessKeyId := createResp.CreateAccessKeyResult.AccessKey.AccessKeyId
	if accessKeyId == nil {
		t.Fatalf("expected access key id to be set")
	}
	// Step 2: set the key's status to Inactive.
	updateReq, _ := svc.UpdateAccessKeyRequest(&iam.UpdateAccessKeyInput{
		UserName:    aws.String("Test"),
		AccessKeyId: accessKeyId,
		Status:      aws.String("Inactive"),
	})
	_ = updateReq.Build()
	updateOut := UpdateAccessKeyResponse{}
	response, err = executeRequest(updateReq.HTTPRequest, updateOut)
	assert.Equal(t, nil, err)
	assert.Equal(t, http.StatusOK, response.Code)
	// Step 3: list the user's keys and confirm the updated status took effect.
	listReq, _ := svc.ListAccessKeysRequest(&iam.ListAccessKeysInput{UserName: aws.String("Test")})
	_ = listReq.Build()
	listOut := ListAccessKeysResponse{}
	response, err = executeRequest(listReq.HTTPRequest, listOut)
	assert.Equal(t, nil, err)
	assert.Equal(t, http.StatusOK, response.Code)
	var listResp ListAccessKeysResponse
	err = xml.Unmarshal(response.Body.Bytes(), &listResp)
	assert.Equal(t, nil, err)
	// The freshly created key must appear with Status == "Inactive".
	found := false
	for _, key := range listResp.ListAccessKeysResult.AccessKeyMetadata {
		if key.AccessKeyId != nil && *key.AccessKeyId == *accessKeyId {
			found = true
			if assert.NotNil(t, key.Status) {
				assert.Equal(t, "Inactive", *key.Status)
			}
			break
		}
	}
	assert.True(t, found)
}
func TestGetUser(t *testing.T) {
userName := aws.String("Test")
params := &iam.GetUserInput{UserName: userName}

8
weed/mount/weedfs_rename.go

@ -245,6 +245,14 @@ func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR
oldPath := oldParent.Child(oldName)
newPath := newParent.Child(newName)
// Keep the renamed destination immediately readable even when the directory
// itself is not marked as fully cached.
if !wfs.metaCache.IsDirectoryCached(newParent) {
if err := wfs.metaCache.InsertEntry(ctx, newEntry); err != nil {
return err
}
}
sourceInode, targetInode := wfs.inodeToPath.MovePath(oldPath, newPath)
if sourceInode != 0 {
fh, foundFh := wfs.fhMap.FindFileHandle(sourceInode)

93
weed/mount/weedfs_rename_test.go

@ -0,0 +1,93 @@
package mount
import (
"context"
"path/filepath"
"testing"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// TestHandleRenameResponseCachesTargetForUncachedDirectory verifies that
// handleRenameResponse inserts the renamed destination entry into the meta
// cache even when the destination directory has not been marked as fully
// cached, so the renamed file is immediately readable afterwards (e.g. the
// git config.lock -> config rename pattern).
func TestHandleRenameResponseCachesTargetForUncachedDirectory(t *testing.T) {
	uidGidMapper, err := meta_cache.NewUidGidMapper("", "")
	if err != nil {
		t.Fatalf("create uid/gid mapper: %v", err)
	}
	root := util.FullPath("/")
	inodeToPath := NewInodeToPath(root, 1)
	// Wire the meta cache's children-cached callbacks to the inode-to-path
	// map, mirroring how WFS normally constructs its cache.
	mc := meta_cache.NewMetaCache(
		filepath.Join(t.TempDir(), "meta"),
		uidGidMapper,
		root,
		func(path util.FullPath) {
			inodeToPath.MarkChildrenCached(path)
		},
		func(path util.FullPath) bool {
			return inodeToPath.IsChildrenCached(path)
		},
		func(util.FullPath, *filer_pb.Entry) {},
		nil,
	)
	defer mc.Shutdown()
	parentPath := util.FullPath("/repo/.git")
	sourcePath := parentPath.Child("config.lock")
	targetPath := parentPath.Child("config")
	// Register the directory and both file paths so the rename has inode
	// mappings to move; the directory is deliberately NOT children-cached.
	inodeToPath.Lookup(parentPath, 1, true, false, 0, true)
	sourceInode := inodeToPath.Lookup(sourcePath, 1, false, false, 0, true)
	inodeToPath.Lookup(targetPath, 1, false, false, 0, true)
	wfs := &WFS{
		metaCache: mc,
		inodeToPath: inodeToPath,
		fhMap: NewFileHandleToInode(),
	}
	// Simulate the filer streaming a rename event: config.lock -> config
	// within the same parent directory.
	resp := &filer_pb.StreamRenameEntryResponse{
		Directory: string(parentPath),
		EventNotification: &filer_pb.EventNotification{
			OldEntry: &filer_pb.Entry{
				Name: "config.lock",
			},
			NewEntry: &filer_pb.Entry{
				Name: "config",
				Attributes: &filer_pb.FuseAttributes{
					Crtime: 1,
					Mtime: 1,
					FileMode: 0100644,
					FileSize: 53,
					Inode: sourceInode,
				},
			},
			NewParentPath: string(parentPath),
		},
	}
	if err := wfs.handleRenameResponse(context.Background(), resp); err != nil {
		t.Fatalf("handle rename response: %v", err)
	}
	// The destination entry must be present in the cache with its attributes.
	entry, findErr := mc.FindEntry(context.Background(), targetPath)
	if findErr != nil {
		t.Fatalf("find target entry: %v", findErr)
	}
	if entry == nil {
		t.Fatalf("target entry %s not cached", targetPath)
	}
	if entry.FileSize != 53 {
		t.Fatalf("cached file size = %d, want 53", entry.FileSize)
	}
	// The source inode should now be mapped to the destination path.
	updatedInode, found := inodeToPath.GetInode(targetPath)
	if !found {
		t.Fatalf("target path %s missing inode mapping", targetPath)
	}
	if updatedInode != sourceInode {
		t.Fatalf("target inode = %d, want %d", updatedInode, sourceInode)
	}
}

7
weed/pb/server_address.go

@ -62,6 +62,13 @@ func (sa ServerAddress) ToHttpAddress() string {
return string(sa)
}
// Equals reports whether two server addresses refer to the same server,
// falling back to comparing normalized HTTP addresses when the raw
// strings differ.
func (sa ServerAddress) Equals(other ServerAddress) bool {
	return sa == other || sa.ToHttpAddress() == other.ToHttpAddress()
}
func (sa ServerAddress) ToGrpcAddress() string {
portsSepIndex := strings.LastIndex(string(sa), ":")
if portsSepIndex < 0 {

4
weed/pb/volume_server_pb/volume_server.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.11
// protoc v3.21.12
// protoc-gen-go v1.36.6
// protoc v6.33.4
// source: volume_server.proto
package volume_server_pb

102
weed/pb/volume_server_pb/volume_server_grpc.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.6.1
// - protoc v3.21.12
// - protoc-gen-go-grpc v1.5.1
// - protoc v6.33.4
// source: volume_server.proto
package volume_server_pb
@ -781,148 +781,148 @@ type VolumeServerServer interface {
type UnimplementedVolumeServerServer struct{}
func (UnimplementedVolumeServerServer) BatchDelete(context.Context, *BatchDeleteRequest) (*BatchDeleteResponse, error) {
return nil, status.Error(codes.Unimplemented, "method BatchDelete not implemented")
return nil, status.Errorf(codes.Unimplemented, "method BatchDelete not implemented")
}
func (UnimplementedVolumeServerServer) VacuumVolumeCheck(context.Context, *VacuumVolumeCheckRequest) (*VacuumVolumeCheckResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VacuumVolumeCheck not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VacuumVolumeCheck not implemented")
}
func (UnimplementedVolumeServerServer) VacuumVolumeCompact(*VacuumVolumeCompactRequest, grpc.ServerStreamingServer[VacuumVolumeCompactResponse]) error {
return status.Error(codes.Unimplemented, "method VacuumVolumeCompact not implemented")
return status.Errorf(codes.Unimplemented, "method VacuumVolumeCompact not implemented")
}
func (UnimplementedVolumeServerServer) VacuumVolumeCommit(context.Context, *VacuumVolumeCommitRequest) (*VacuumVolumeCommitResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VacuumVolumeCommit not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VacuumVolumeCommit not implemented")
}
func (UnimplementedVolumeServerServer) VacuumVolumeCleanup(context.Context, *VacuumVolumeCleanupRequest) (*VacuumVolumeCleanupResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VacuumVolumeCleanup not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VacuumVolumeCleanup not implemented")
}
func (UnimplementedVolumeServerServer) DeleteCollection(context.Context, *DeleteCollectionRequest) (*DeleteCollectionResponse, error) {
return nil, status.Error(codes.Unimplemented, "method DeleteCollection not implemented")
return nil, status.Errorf(codes.Unimplemented, "method DeleteCollection not implemented")
}
func (UnimplementedVolumeServerServer) AllocateVolume(context.Context, *AllocateVolumeRequest) (*AllocateVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "method AllocateVolume not implemented")
return nil, status.Errorf(codes.Unimplemented, "method AllocateVolume not implemented")
}
func (UnimplementedVolumeServerServer) VolumeSyncStatus(context.Context, *VolumeSyncStatusRequest) (*VolumeSyncStatusResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeSyncStatus not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeSyncStatus not implemented")
}
func (UnimplementedVolumeServerServer) VolumeIncrementalCopy(*VolumeIncrementalCopyRequest, grpc.ServerStreamingServer[VolumeIncrementalCopyResponse]) error {
return status.Error(codes.Unimplemented, "method VolumeIncrementalCopy not implemented")
return status.Errorf(codes.Unimplemented, "method VolumeIncrementalCopy not implemented")
}
func (UnimplementedVolumeServerServer) VolumeMount(context.Context, *VolumeMountRequest) (*VolumeMountResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeMount not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeMount not implemented")
}
func (UnimplementedVolumeServerServer) VolumeUnmount(context.Context, *VolumeUnmountRequest) (*VolumeUnmountResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeUnmount not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeUnmount not implemented")
}
func (UnimplementedVolumeServerServer) VolumeDelete(context.Context, *VolumeDeleteRequest) (*VolumeDeleteResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeDelete not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeDelete not implemented")
}
func (UnimplementedVolumeServerServer) VolumeMarkReadonly(context.Context, *VolumeMarkReadonlyRequest) (*VolumeMarkReadonlyResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeMarkReadonly not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeMarkReadonly not implemented")
}
func (UnimplementedVolumeServerServer) VolumeMarkWritable(context.Context, *VolumeMarkWritableRequest) (*VolumeMarkWritableResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeMarkWritable not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeMarkWritable not implemented")
}
func (UnimplementedVolumeServerServer) VolumeConfigure(context.Context, *VolumeConfigureRequest) (*VolumeConfigureResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeConfigure not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeConfigure not implemented")
}
func (UnimplementedVolumeServerServer) VolumeStatus(context.Context, *VolumeStatusRequest) (*VolumeStatusResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeStatus not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeStatus not implemented")
}
func (UnimplementedVolumeServerServer) GetState(context.Context, *GetStateRequest) (*GetStateResponse, error) {
return nil, status.Error(codes.Unimplemented, "method GetState not implemented")
return nil, status.Errorf(codes.Unimplemented, "method GetState not implemented")
}
func (UnimplementedVolumeServerServer) SetState(context.Context, *SetStateRequest) (*SetStateResponse, error) {
return nil, status.Error(codes.Unimplemented, "method SetState not implemented")
return nil, status.Errorf(codes.Unimplemented, "method SetState not implemented")
}
func (UnimplementedVolumeServerServer) VolumeCopy(*VolumeCopyRequest, grpc.ServerStreamingServer[VolumeCopyResponse]) error {
return status.Error(codes.Unimplemented, "method VolumeCopy not implemented")
return status.Errorf(codes.Unimplemented, "method VolumeCopy not implemented")
}
func (UnimplementedVolumeServerServer) ReadVolumeFileStatus(context.Context, *ReadVolumeFileStatusRequest) (*ReadVolumeFileStatusResponse, error) {
return nil, status.Error(codes.Unimplemented, "method ReadVolumeFileStatus not implemented")
return nil, status.Errorf(codes.Unimplemented, "method ReadVolumeFileStatus not implemented")
}
func (UnimplementedVolumeServerServer) CopyFile(*CopyFileRequest, grpc.ServerStreamingServer[CopyFileResponse]) error {
return status.Error(codes.Unimplemented, "method CopyFile not implemented")
return status.Errorf(codes.Unimplemented, "method CopyFile not implemented")
}
func (UnimplementedVolumeServerServer) ReceiveFile(grpc.ClientStreamingServer[ReceiveFileRequest, ReceiveFileResponse]) error {
return status.Error(codes.Unimplemented, "method ReceiveFile not implemented")
return status.Errorf(codes.Unimplemented, "method ReceiveFile not implemented")
}
func (UnimplementedVolumeServerServer) ReadNeedleBlob(context.Context, *ReadNeedleBlobRequest) (*ReadNeedleBlobResponse, error) {
return nil, status.Error(codes.Unimplemented, "method ReadNeedleBlob not implemented")
return nil, status.Errorf(codes.Unimplemented, "method ReadNeedleBlob not implemented")
}
func (UnimplementedVolumeServerServer) ReadNeedleMeta(context.Context, *ReadNeedleMetaRequest) (*ReadNeedleMetaResponse, error) {
return nil, status.Error(codes.Unimplemented, "method ReadNeedleMeta not implemented")
return nil, status.Errorf(codes.Unimplemented, "method ReadNeedleMeta not implemented")
}
func (UnimplementedVolumeServerServer) WriteNeedleBlob(context.Context, *WriteNeedleBlobRequest) (*WriteNeedleBlobResponse, error) {
return nil, status.Error(codes.Unimplemented, "method WriteNeedleBlob not implemented")
return nil, status.Errorf(codes.Unimplemented, "method WriteNeedleBlob not implemented")
}
func (UnimplementedVolumeServerServer) ReadAllNeedles(*ReadAllNeedlesRequest, grpc.ServerStreamingServer[ReadAllNeedlesResponse]) error {
return status.Error(codes.Unimplemented, "method ReadAllNeedles not implemented")
return status.Errorf(codes.Unimplemented, "method ReadAllNeedles not implemented")
}
func (UnimplementedVolumeServerServer) VolumeTailSender(*VolumeTailSenderRequest, grpc.ServerStreamingServer[VolumeTailSenderResponse]) error {
return status.Error(codes.Unimplemented, "method VolumeTailSender not implemented")
return status.Errorf(codes.Unimplemented, "method VolumeTailSender not implemented")
}
func (UnimplementedVolumeServerServer) VolumeTailReceiver(context.Context, *VolumeTailReceiverRequest) (*VolumeTailReceiverResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeTailReceiver not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeTailReceiver not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsGenerate(context.Context, *VolumeEcShardsGenerateRequest) (*VolumeEcShardsGenerateResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsGenerate not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsGenerate not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsRebuild(context.Context, *VolumeEcShardsRebuildRequest) (*VolumeEcShardsRebuildResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsRebuild not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsRebuild not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsCopy(context.Context, *VolumeEcShardsCopyRequest) (*VolumeEcShardsCopyResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsCopy not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsCopy not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsDelete(context.Context, *VolumeEcShardsDeleteRequest) (*VolumeEcShardsDeleteResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsDelete not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsDelete not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsMount(context.Context, *VolumeEcShardsMountRequest) (*VolumeEcShardsMountResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsMount not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsMount not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsUnmount(context.Context, *VolumeEcShardsUnmountRequest) (*VolumeEcShardsUnmountResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsUnmount not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsUnmount not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardRead(*VolumeEcShardReadRequest, grpc.ServerStreamingServer[VolumeEcShardReadResponse]) error {
return status.Error(codes.Unimplemented, "method VolumeEcShardRead not implemented")
return status.Errorf(codes.Unimplemented, "method VolumeEcShardRead not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcBlobDelete(context.Context, *VolumeEcBlobDeleteRequest) (*VolumeEcBlobDeleteResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeEcBlobDelete not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcBlobDelete not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsToVolume(context.Context, *VolumeEcShardsToVolumeRequest) (*VolumeEcShardsToVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsToVolume not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsToVolume not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsInfo(context.Context, *VolumeEcShardsInfoRequest) (*VolumeEcShardsInfoResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsInfo not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsInfo not implemented")
}
func (UnimplementedVolumeServerServer) VolumeTierMoveDatToRemote(*VolumeTierMoveDatToRemoteRequest, grpc.ServerStreamingServer[VolumeTierMoveDatToRemoteResponse]) error {
return status.Error(codes.Unimplemented, "method VolumeTierMoveDatToRemote not implemented")
return status.Errorf(codes.Unimplemented, "method VolumeTierMoveDatToRemote not implemented")
}
func (UnimplementedVolumeServerServer) VolumeTierMoveDatFromRemote(*VolumeTierMoveDatFromRemoteRequest, grpc.ServerStreamingServer[VolumeTierMoveDatFromRemoteResponse]) error {
return status.Error(codes.Unimplemented, "method VolumeTierMoveDatFromRemote not implemented")
return status.Errorf(codes.Unimplemented, "method VolumeTierMoveDatFromRemote not implemented")
}
func (UnimplementedVolumeServerServer) VolumeServerStatus(context.Context, *VolumeServerStatusRequest) (*VolumeServerStatusResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeServerStatus not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeServerStatus not implemented")
}
func (UnimplementedVolumeServerServer) VolumeServerLeave(context.Context, *VolumeServerLeaveRequest) (*VolumeServerLeaveResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeServerLeave not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeServerLeave not implemented")
}
func (UnimplementedVolumeServerServer) FetchAndWriteNeedle(context.Context, *FetchAndWriteNeedleRequest) (*FetchAndWriteNeedleResponse, error) {
return nil, status.Error(codes.Unimplemented, "method FetchAndWriteNeedle not implemented")
return nil, status.Errorf(codes.Unimplemented, "method FetchAndWriteNeedle not implemented")
}
func (UnimplementedVolumeServerServer) ScrubVolume(context.Context, *ScrubVolumeRequest) (*ScrubVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "method ScrubVolume not implemented")
return nil, status.Errorf(codes.Unimplemented, "method ScrubVolume not implemented")
}
func (UnimplementedVolumeServerServer) ScrubEcVolume(context.Context, *ScrubEcVolumeRequest) (*ScrubEcVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "method ScrubEcVolume not implemented")
return nil, status.Errorf(codes.Unimplemented, "method ScrubEcVolume not implemented")
}
func (UnimplementedVolumeServerServer) Query(*QueryRequest, grpc.ServerStreamingServer[QueriedStripe]) error {
return status.Error(codes.Unimplemented, "method Query not implemented")
return status.Errorf(codes.Unimplemented, "method Query not implemented")
}
func (UnimplementedVolumeServerServer) VolumeNeedleStatus(context.Context, *VolumeNeedleStatusRequest) (*VolumeNeedleStatusResponse, error) {
return nil, status.Error(codes.Unimplemented, "method VolumeNeedleStatus not implemented")
return nil, status.Errorf(codes.Unimplemented, "method VolumeNeedleStatus not implemented")
}
func (UnimplementedVolumeServerServer) Ping(context.Context, *PingRequest) (*PingResponse, error) {
return nil, status.Error(codes.Unimplemented, "method Ping not implemented")
return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
}
func (UnimplementedVolumeServerServer) mustEmbedUnimplementedVolumeServerServer() {}
func (UnimplementedVolumeServerServer) testEmbeddedByValue() {}
@ -935,7 +935,7 @@ type UnsafeVolumeServerServer interface {
}
func RegisterVolumeServerServer(s grpc.ServiceRegistrar, srv VolumeServerServer) {
// If the following call panics, it indicates UnimplementedVolumeServerServer was
// If the following call pancis, it indicates UnimplementedVolumeServerServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.

17
weed/s3api/auth_credentials.go

@ -300,7 +300,8 @@ func (iam *IdentityAccessManagement) loadEnvironmentVariableCredentials() {
Actions: []Action{
s3_constants.ACTION_ADMIN,
},
IsStatic: true,
PrincipalArn: generatePrincipalArn(identityName),
IsStatic: true,
}
iam.m.Lock()
@ -1562,14 +1563,22 @@ func (iam *IdentityAccessManagement) VerifyActionPermission(r *http.Request, ide
}
// Traditional identities (with Actions from -s3.config) use legacy auth,
// JWT/STS identities (no Actions) use IAM authorization
// JWT/STS identities (no Actions or having a session token) use IAM authorization.
// IMPORTANT: We MUST prioritize IAM authorization for any request with a session token
// to ensure that session policies are correctly enforced.
hasSessionToken := r.Header.Get("X-SeaweedFS-Session-Token") != "" ||
r.Header.Get("X-Amz-Security-Token") != "" ||
r.URL.Query().Get("X-Amz-Security-Token") != ""
if (len(identity.Actions) == 0 || hasSessionToken) && iam.iamIntegration != nil {
return iam.authorizeWithIAM(r, identity, action, bucket, object)
}
if len(identity.Actions) > 0 {
if !identity.CanDo(action, bucket, object) {
return s3err.ErrAccessDenied
}
return s3err.ErrNone
} else if iam.iamIntegration != nil {
return iam.authorizeWithIAM(r, identity, action, bucket, object)
}
return s3err.ErrAccessDenied

45
weed/s3api/auth_credentials_subscribe.go

@ -61,40 +61,43 @@ func (s3a *S3ApiServer) onIamConfigChange(dir string, oldEntry *filer_pb.Entry,
glog.V(1).Infof("Skipping IAM config update for static configuration")
return nil
}
if s3a.iam == nil {
return nil
}
// 1. Handle traditional single identity.json file
if dir == filer.IamConfigDirectory {
// Handle deletion: reset to empty config
if newEntry == nil && oldEntry != nil && oldEntry.Name == filer.IamIdentityFile {
glog.V(1).Infof("IAM config file deleted, clearing identities")
if err := s3a.iam.LoadS3ApiConfigurationFromBytes([]byte{}); err != nil {
glog.Warningf("failed to clear IAM config on deletion: %v", err)
return err
}
return nil
reloadIamConfig := func(reason string) error {
glog.V(1).Infof("IAM change detected in %s, reloading configuration", reason)
if err := s3a.iam.LoadS3ApiConfigurationFromCredentialManager(); err != nil {
glog.Errorf("failed to reload IAM configuration after change in %s: %v", reason, err)
return err
}
return nil
}
// Handle create/update
if newEntry != nil && newEntry.Name == filer.IamIdentityFile {
if err := s3a.iam.LoadS3ApiConfigurationFromBytes(newEntry.Content); err != nil {
// 1. Handle traditional single identity.json file
if dir == filer.IamConfigDirectory {
// Handle create/update/delete events on legacy identity.json.
// During migration this file is renamed, which emits a delete event.
// Always reload from the credential manager so we keep the migrated identities.
if (oldEntry != nil && oldEntry.Name == filer.IamIdentityFile) ||
(newEntry != nil && newEntry.Name == filer.IamIdentityFile) {
if err := reloadIamConfig(dir + "/" + filer.IamIdentityFile); err != nil {
return err
}
glog.V(1).Infof("updated %s/%s", dir, newEntry.Name)
}
return nil
}
// 2. Handle multiple-file identities and policies
// Watch /etc/seaweedfs/identities and /etc/seaweedfs/policies
isIdentityDir := strings.HasPrefix(dir, "/etc/seaweedfs/identities")
isPolicyDir := strings.HasPrefix(dir, "/etc/seaweedfs/policies")
// Watch /etc/iam/{identities,policies,service_accounts}
isIdentityDir := dir == filer.IamConfigDirectory+"/identities" || strings.HasPrefix(dir, filer.IamConfigDirectory+"/identities/")
isPolicyDir := dir == filer.IamConfigDirectory+"/policies" || strings.HasPrefix(dir, filer.IamConfigDirectory+"/policies/")
isServiceAccountDir := dir == filer.IamConfigDirectory+"/service_accounts" || strings.HasPrefix(dir, filer.IamConfigDirectory+"/service_accounts/")
if isIdentityDir || isPolicyDir {
if isIdentityDir || isPolicyDir || isServiceAccountDir {
// For multiple-file mode, any change in these directories should trigger a full reload
// from the credential manager (which handles the details of loading from multiple files).
glog.V(1).Infof("IAM change detected in %s, reloading configuration", dir)
if err := s3a.iam.LoadS3ApiConfigurationFromCredentialManager(); err != nil {
glog.Errorf("failed to reload IAM configuration after change in %s: %v", dir, err)
if err := reloadIamConfig(dir); err != nil {
return err
}
}

124
weed/s3api/auth_credentials_subscribe_test.go

@ -0,0 +1,124 @@
package s3api
import (
"context"
"sync"
"testing"
"github.com/seaweedfs/seaweedfs/weed/credential"
_ "github.com/seaweedfs/seaweedfs/weed/credential/memory"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
)
func TestOnIamConfigChangeLegacyIdentityDeletionReloadsConfiguration(t *testing.T) {
s3a := newTestS3ApiServerWithMemoryIAM(t, []*iam_pb.Identity{
{
Name: "anonymous",
Actions: []string{
"Read:test",
},
},
})
err := s3a.onIamConfigChange(
filer.IamConfigDirectory,
&filer_pb.Entry{Name: filer.IamIdentityFile},
nil,
)
if err != nil {
t.Fatalf("onIamConfigChange returned error for legacy identity deletion: %v", err)
}
if !hasIdentity(s3a.iam, "anonymous") {
t.Fatalf("expected anonymous identity to remain loaded after legacy identity deletion event")
}
}
func TestOnIamConfigChangeReloadsOnIamIdentityDirectoryChanges(t *testing.T) {
s3a := newTestS3ApiServerWithMemoryIAM(t, []*iam_pb.Identity{
{Name: "anonymous"},
})
// Seed initial in-memory IAM state.
if err := s3a.iam.LoadS3ApiConfigurationFromCredentialManager(); err != nil {
t.Fatalf("failed to load initial IAM configuration: %v", err)
}
if hasIdentity(s3a.iam, "alice") {
t.Fatalf("did not expect alice identity before creating user")
}
if err := s3a.iam.credentialManager.CreateUser(context.Background(), &iam_pb.Identity{Name: "alice"}); err != nil {
t.Fatalf("failed to create alice in memory credential manager: %v", err)
}
err := s3a.onIamConfigChange(
filer.IamConfigDirectory+"/identities",
nil,
&filer_pb.Entry{Name: "alice.json"},
)
if err != nil {
t.Fatalf("onIamConfigChange returned error for identities directory update: %v", err)
}
if !hasIdentity(s3a.iam, "alice") {
t.Fatalf("expected alice identity to be loaded after /etc/iam/identities update")
}
}
func newTestS3ApiServerWithMemoryIAM(t *testing.T, identities []*iam_pb.Identity) *S3ApiServer {
t.Helper()
// Create S3ApiConfiguration for test with provided identities
config := &iam_pb.S3ApiConfiguration{
Identities: identities,
Accounts: []*iam_pb.Account{},
ServiceAccounts: []*iam_pb.ServiceAccount{},
}
// Create memory credential manager
cm, err := credential.NewCredentialManager(credential.StoreTypeMemory, nil, "")
if err != nil {
t.Fatalf("failed to create memory credential manager: %v", err)
}
// Save test configuration
if err := cm.SaveConfiguration(context.Background(), config); err != nil {
t.Fatalf("failed to save test configuration: %v", err)
}
// Create a test IAM instance
iam := &IdentityAccessManagement{
m: sync.RWMutex{},
nameToIdentity: make(map[string]*Identity),
accessKeyIdent: make(map[string]*Identity),
identities: []*Identity{},
policies: make(map[string]*iam_pb.Policy),
accounts: make(map[string]*Account),
emailAccount: make(map[string]*Account),
hashes: make(map[string]*sync.Pool),
hashCounters: make(map[string]*int32),
isAuthEnabled: false,
stopChan: make(chan struct{}),
useStaticConfig: false,
credentialManager: cm,
}
// Load test configuration
if err := iam.ReplaceS3ApiConfiguration(config); err != nil {
t.Fatalf("failed to load test configuration: %v", err)
}
return &S3ApiServer{
iam: iam,
}
}
func hasIdentity(iam *IdentityAccessManagement, identityName string) bool {
iam.m.RLock()
defer iam.m.RUnlock()
_, ok := iam.nameToIdentity[identityName]
return ok
}

14
weed/s3api/auth_credentials_test.go

@ -427,6 +427,13 @@ func TestNewIdentityAccessManagementWithStoreEnvVars(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Reset the memory store to avoid test pollution
if store := credential.Stores[0]; store.GetName() == credential.StoreTypeMemory {
if memStore, ok := store.(interface{ Reset() }); ok {
memStore.Reset()
}
}
// Set up environment variables
if tt.accessKeyId != "" {
os.Setenv("AWS_ACCESS_KEY_ID", tt.accessKeyId)
@ -467,6 +474,13 @@ func TestNewIdentityAccessManagementWithStoreEnvVars(t *testing.T) {
// but contains no identities (e.g., only KMS settings), environment variables should still work.
// This test validates the fix for issue #7311.
func TestConfigFileWithNoIdentitiesAllowsEnvVars(t *testing.T) {
// Reset the memory store to avoid test pollution
if store := credential.Stores[0]; store.GetName() == credential.StoreTypeMemory {
if memStore, ok := store.(interface{ Reset() }); ok {
memStore.Reset()
}
}
// Set environment variables
testAccessKey := "AKIATEST1234567890AB"
testSecretKey := "testSecret1234567890123456789012345678901234"

8
weed/s3api/auth_signature_v4.go

@ -36,6 +36,7 @@ import (
"unicode/utf8"
"github.com/seaweedfs/seaweedfs/weed/glog"
weed_iam "github.com/seaweedfs/seaweedfs/weed/iam"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
@ -410,7 +411,7 @@ func (iam *IdentityAccessManagement) validateSTSSessionToken(r *http.Request, se
cred := &Credential{
AccessKey: sessionInfo.Credentials.AccessKeyId,
SecretKey: sessionInfo.Credentials.SecretAccessKey,
Status: "Active",
Status: weed_iam.AccessKeyStatusActive,
Expiration: sessionInfo.ExpiresAt.Unix(),
}
@ -433,6 +434,11 @@ func (iam *IdentityAccessManagement) validateSTSSessionToken(r *http.Request, se
Claims: claims, // Populate Claims for policy variable substitution
}
// Restore admin privileges if the session was created by an admin
// if isAdmin, ok := claims["is_admin"].(bool); ok && isAdmin {
// identity.Actions = append(identity.Actions, s3_constants.ACTION_ADMIN)
// }
glog.V(2).Infof("Successfully validated STS session token for principal: %s, assumed role user: %s",
sessionInfo.Principal, sessionInfo.AssumedRoleUser)
return identity, cred, s3err.ErrNone

3
weed/s3api/s3api_object_handlers_list.go

@ -520,9 +520,6 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
StartFromFileName: marker,
InclusiveStartFrom: inclusiveStartFrom,
}
if cursor.prefixEndsOnDelimiter {
request.Limit = uint32(1)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

86
weed/s3api/s3api_object_handlers_list_test.go

@ -55,6 +55,12 @@ func (c *testFilerClient) ListEntries(ctx context.Context, in *filer_pb.ListEntr
}
entries = filtered
}
// Respect Limit
if in.Limit > 0 && int(in.Limit) < len(entries) {
entries = entries[:in.Limit]
}
return &testListEntriesStream{entries: entries}, nil
}
@ -594,3 +600,83 @@ func TestObjectLevelListPermissions(t *testing.T) {
t.Log("Object-level List permissions like 'List:bucket/prefix/*' now work correctly")
t.Log("Middleware properly extracts prefix for permission validation")
}
func TestListObjectsV2_Regression(t *testing.T) {
// Reproduce issue: ListObjectsV2 without delimiter returns 0 objects even though files exist
// Structure: s3://reports/reports/[timestamp]/file
// Request: ListObjectsV2(Bucket='reports', Prefix='reports/')
s3a := &S3ApiServer{}
client := &testFilerClient{
entriesByDir: map[string][]*filer_pb.Entry{
"/buckets/reports": {
{Name: "reports", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
},
"/buckets/reports/reports": {
{Name: "01771152617961894200", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
},
"/buckets/reports/reports/01771152617961894200": {
{Name: "file1", IsDirectory: false, Attributes: &filer_pb.FuseAttributes{}},
},
},
}
// s3.list_objects_v2(Bucket='reports', Prefix='reports/')
// normalized: requestDir="", prefix="reports"
// doListFilerEntries called with dir="/buckets/reports", prefix="reports", delimiter=""
cursor := &ListingCursor{maxKeys: 1000, prefixEndsOnDelimiter: true} // set based on "reports/" original prefix
var results []string
// Call doListFilerEntries directly to unit test listing logic in isolation,
// simulating parameters passed from listFilerEntries for prefix "reports/".
_, err := s3a.doListFilerEntries(client, "/buckets/reports", "reports", cursor, "", "", false, "reports", func(dir string, entry *filer_pb.Entry) {
if !entry.IsDirectory {
results = append(results, entry.Name)
}
})
assert.NoError(t, err)
assert.Contains(t, results, "file1", "Should return the nested file")
}
func TestListObjectsV2_Regression_Sorting(t *testing.T) {
// Verify that listing logic correctly finds the target directory even when
// other entries with a similar prefix are returned first by the filer,
// a scenario where the removed Limit=1 optimization would fail.
s3a := &S3ApiServer{}
client := &testFilerClient{
entriesByDir: map[string][]*filer_pb.Entry{
"/buckets/reports": {
{Name: "reports-archive", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
{Name: "reports", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
},
"/buckets/reports/reports": {
{Name: "01771152617961894200", IsDirectory: true, Attributes: &filer_pb.FuseAttributes{}},
},
"/buckets/reports/reports/01771152617961894200": {
{Name: "file1", IsDirectory: false, Attributes: &filer_pb.FuseAttributes{}},
},
},
}
// This cursor setup mimics what happens in listFilerEntries
cursor := &ListingCursor{maxKeys: 1000, prefixEndsOnDelimiter: true}
var results []string
// Without the fix, Limit=1 would cause the lister to stop after "reports-archive",
// missing the intended "reports" directory.
_, err := s3a.doListFilerEntries(client, "/buckets/reports", "reports", cursor, "", "", false, "reports", func(dir string, entry *filer_pb.Entry) {
if !entry.IsDirectory {
results = append(results, entry.Name)
}
})
assert.NoError(t, err)
// With Limit=1, this fails because it only sees "reports-archive"
// With fix, it sees both and processes "reports"
assert.Contains(t, results, "file1", "Should return the nested file even if 'reports' directory is not the first match")
}

11
weed/s3api/s3api_server.go

@ -189,6 +189,9 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
if err != nil {
glog.Errorf("Failed to load IAM configuration: %v", err)
} else {
if iam.credentialManager != nil {
iamManager.SetUserStore(iam.credentialManager)
}
glog.V(1).Infof("IAM Manager loaded, creating integration")
// Create S3 IAM integration with the loaded IAM manager
// filerAddress not actually used, just for backward compatibility
@ -251,7 +254,13 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
return nil, fmt.Errorf("failed to initialize SSE-S3 key manager: %w", err)
}
go s3ApiServer.subscribeMetaEvents("s3", startTsNs, filer.DirectoryEtcRoot, []string{option.BucketsPath})
go s3ApiServer.subscribeMetaEvents("s3", startTsNs, filer.DirectoryEtcRoot, []string{
option.BucketsPath,
filer.IamConfigDirectory,
filer.IamConfigDirectory + "/identities",
filer.IamConfigDirectory + "/policies",
filer.IamConfigDirectory + "/service_accounts",
})
// Start bucket size metrics collection in background
go s3ApiServer.startBucketSizeMetricsLoop(context.Background())

141
weed/s3api/s3api_sts.go

@ -165,12 +165,25 @@ func (h *STSHandlers) handleAssumeRoleWithWebIdentity(w http.ResponseWriter, r *
return
}
sessionPolicyJSON, err := sts.NormalizeSessionPolicy(r.FormValue("Policy"))
if err != nil {
h.writeSTSErrorResponse(w, r, STSErrMalformedPolicyDocument,
fmt.Errorf("invalid Policy document: %w", err))
return
}
var sessionPolicyPtr *string
if sessionPolicyJSON != "" {
sessionPolicyPtr = &sessionPolicyJSON
}
// Build request for STS service
request := &sts.AssumeRoleWithWebIdentityRequest{
RoleArn: roleArn,
WebIdentityToken: webIdentityToken,
RoleSessionName: roleSessionName,
DurationSeconds: durationSeconds,
Policy: sessionPolicyPtr,
}
// Call STS service
@ -216,17 +229,15 @@ func (h *STSHandlers) handleAssumeRoleWithWebIdentity(w http.ResponseWriter, r *
// handleAssumeRole handles the AssumeRole API action
// This requires AWS Signature V4 authentication
// Inline session policies (Policy parameter) are supported for AssumeRole,
// AssumeRoleWithWebIdentity, and AssumeRoleWithLDAPIdentity.
func (h *STSHandlers) handleAssumeRole(w http.ResponseWriter, r *http.Request) {
// Extract parameters from form
roleArn := r.FormValue("RoleArn")
roleSessionName := r.FormValue("RoleSessionName")
// Validate required parameters
if roleArn == "" {
h.writeSTSErrorResponse(w, r, STSErrMissingParameter,
fmt.Errorf("RoleArn is required"))
return
}
// RoleArn is optional to support S3-compatible clients that omit it
if roleSessionName == "" {
h.writeSTSErrorResponse(w, r, STSErrMissingParameter,
@ -275,23 +286,60 @@ func (h *STSHandlers) handleAssumeRole(w http.ResponseWriter, r *http.Request) {
// Check if the caller is authorized to assume the role (sts:AssumeRole permission)
// This validates that the caller has a policy allowing sts:AssumeRole on the target role
if authErr := h.iam.VerifyActionPermission(r, identity, Action("sts:AssumeRole"), "", roleArn); authErr != s3err.ErrNone {
glog.V(2).Infof("AssumeRole: caller %s is not authorized to assume role %s", identity.Name, roleArn)
h.writeSTSErrorResponse(w, r, STSErrAccessDenied,
fmt.Errorf("user %s is not authorized to assume role %s", identity.Name, roleArn))
return
// Check authorizations
if roleArn != "" {
// Check if the caller is authorized to assume the role (sts:AssumeRole permission)
if authErr := h.iam.VerifyActionPermission(r, identity, Action("sts:AssumeRole"), "", roleArn); authErr != s3err.ErrNone {
glog.V(2).Infof("AssumeRole: caller %s is not authorized to assume role %s", identity.Name, roleArn)
h.writeSTSErrorResponse(w, r, STSErrAccessDenied,
fmt.Errorf("user %s is not authorized to assume role %s", identity.Name, roleArn))
return
}
// Validate that the target role trusts the caller (Trust Policy)
if err := h.iam.ValidateTrustPolicyForPrincipal(r.Context(), roleArn, identity.PrincipalArn); err != nil {
glog.V(2).Infof("AssumeRole: trust policy validation failed for %s to assume %s: %v", identity.Name, roleArn, err)
h.writeSTSErrorResponse(w, r, STSErrAccessDenied, fmt.Errorf("trust policy denies access"))
return
}
} else {
// If RoleArn is missing, default to the caller's identity (User Context)
// This allows the user to "assume" a session for themselves, inheriting their own permissions.
roleArn = identity.PrincipalArn
glog.V(2).Infof("AssumeRole: no RoleArn provided, defaulting to caller identity: %s", roleArn)
// We still enforce the global "sts:AssumeRole" permission check for self-assumption.
// AWS would not require this for GetSessionToken, but since this is the AssumeRole
// action we keep the check for safety and for consistency with the explicit-RoleArn path.
// Admin/Global check when no specific role is requested
if authErr := h.iam.VerifyActionPermission(r, identity, Action("sts:AssumeRole"), "", ""); authErr != s3err.ErrNone {
glog.Warningf("AssumeRole: caller %s attempted to assume role without RoleArn and lacks global sts:AssumeRole permission", identity.Name)
h.writeSTSErrorResponse(w, r, STSErrAccessDenied, fmt.Errorf("access denied"))
return
}
}
// Validate that the target role trusts the caller (Trust Policy)
// This ensures the role's trust policy explicitly allows the principal to assume it
if err := h.iam.ValidateTrustPolicyForPrincipal(r.Context(), roleArn, identity.PrincipalArn); err != nil {
glog.V(2).Infof("AssumeRole: trust policy validation failed for %s to assume %s: %v", identity.Name, roleArn, err)
h.writeSTSErrorResponse(w, r, STSErrAccessDenied, fmt.Errorf("trust policy denies access"))
sessionPolicyJSON, err := sts.NormalizeSessionPolicy(r.FormValue("Policy"))
if err != nil {
h.writeSTSErrorResponse(w, r, STSErrMalformedPolicyDocument,
fmt.Errorf("invalid Policy document: %w", err))
return
}
// Prepare custom claims for the session
var modifyClaims func(claims *sts.STSSessionClaims)
if identity.isAdmin() {
modifyClaims = func(claims *sts.STSSessionClaims) {
if claims.RequestContext == nil {
claims.RequestContext = make(map[string]interface{})
}
claims.RequestContext["is_admin"] = true
}
}
// Generate common STS components
stsCreds, assumedUser, err := h.prepareSTSCredentials(roleArn, roleSessionName, durationSeconds, nil)
stsCreds, assumedUser, err := h.prepareSTSCredentials(roleArn, roleSessionName, durationSeconds, sessionPolicyJSON, modifyClaims)
if err != nil {
h.writeSTSErrorResponse(w, r, STSErrInternalError, err)
return
@ -420,12 +468,19 @@ func (h *STSHandlers) handleAssumeRoleWithLDAPIdentity(w http.ResponseWriter, r
return
}
sessionPolicyJSON, err := sts.NormalizeSessionPolicy(r.FormValue("Policy"))
if err != nil {
h.writeSTSErrorResponse(w, r, STSErrMalformedPolicyDocument,
fmt.Errorf("invalid Policy document: %w", err))
return
}
// Generate common STS components with LDAP-specific claims
modifyClaims := func(claims *sts.STSSessionClaims) {
claims.WithIdentityProvider("ldap", identity.UserID, identity.Provider)
}
stsCreds, assumedUser, err := h.prepareSTSCredentials(roleArn, roleSessionName, durationSeconds, modifyClaims)
stsCreds, assumedUser, err := h.prepareSTSCredentials(roleArn, roleSessionName, durationSeconds, sessionPolicyJSON, modifyClaims)
if err != nil {
h.writeSTSErrorResponse(w, r, STSErrInternalError, err)
return
@ -445,7 +500,7 @@ func (h *STSHandlers) handleAssumeRoleWithLDAPIdentity(w http.ResponseWriter, r
// prepareSTSCredentials extracts common shared logic for credential generation
func (h *STSHandlers) prepareSTSCredentials(roleArn, roleSessionName string,
durationSeconds *int64, modifyClaims func(*sts.STSSessionClaims)) (STSCredentials, *AssumedRoleUser, error) {
durationSeconds *int64, sessionPolicy string, modifyClaims func(*sts.STSSessionClaims)) (STSCredentials, *AssumedRoleUser, error) {
// Calculate duration
duration := time.Hour // Default 1 hour
@ -462,7 +517,12 @@ func (h *STSHandlers) prepareSTSCredentials(roleArn, roleSessionName string,
expiration := time.Now().Add(duration)
// Extract role name from ARN for proper response formatting
roleName := utils.ExtractRoleNameFromArn(roleArn)
roleName := utils.ExtractRoleNameFromPrincipal(roleArn)
if roleName == "" {
// Try to extract user name if it's a user ARN (for "User Context" assumption)
roleName = utils.ExtractUserNameFromPrincipal(roleArn)
}
if roleName == "" {
roleName = roleArn // Fallback to full ARN if extraction fails
}
@ -472,12 +532,23 @@ func (h *STSHandlers) prepareSTSCredentials(roleArn, roleSessionName string,
// Construct AssumedRoleUser ARN - this will be used as the principal for the vended token
assumedRoleArn := fmt.Sprintf("arn:aws:sts::%s:assumed-role/%s/%s", accountID, roleName, roleSessionName)
// Use assumedRoleArn as RoleArn in claims if original RoleArn is empty
// This ensures STSSessionClaims.IsValid() passes (it requires non-empty RoleArn)
effectiveRoleArn := roleArn
if effectiveRoleArn == "" {
effectiveRoleArn = assumedRoleArn
}
// Create session claims with role information
// SECURITY: Use the assumedRoleArn as the principal in the token.
// This ensures that subsequent requests using this token are correctly identified as the assumed role.
claims := sts.NewSTSSessionClaims(sessionId, h.stsService.Config.Issuer, expiration).
WithSessionName(roleSessionName).
WithRoleInfo(roleArn, fmt.Sprintf("%s:%s", roleName, roleSessionName), assumedRoleArn)
WithRoleInfo(effectiveRoleArn, fmt.Sprintf("%s:%s", roleName, roleSessionName), assumedRoleArn)
if sessionPolicy != "" {
claims.WithSessionPolicy(sessionPolicy)
}
// Apply custom claims if provided (e.g., LDAP identity)
if modifyClaims != nil {
@ -582,13 +653,14 @@ type LDAPIdentityResult struct {
type STSErrorCode string
const (
STSErrAccessDenied STSErrorCode = "AccessDenied"
STSErrExpiredToken STSErrorCode = "ExpiredTokenException"
STSErrInvalidAction STSErrorCode = "InvalidAction"
STSErrInvalidParameterValue STSErrorCode = "InvalidParameterValue"
STSErrMissingParameter STSErrorCode = "MissingParameter"
STSErrSTSNotReady STSErrorCode = "ServiceUnavailable"
STSErrInternalError STSErrorCode = "InternalError"
STSErrAccessDenied STSErrorCode = "AccessDenied"
STSErrExpiredToken STSErrorCode = "ExpiredTokenException"
STSErrInvalidAction STSErrorCode = "InvalidAction"
STSErrInvalidParameterValue STSErrorCode = "InvalidParameterValue"
STSErrMalformedPolicyDocument STSErrorCode = "MalformedPolicyDocument"
STSErrMissingParameter STSErrorCode = "MissingParameter"
STSErrSTSNotReady STSErrorCode = "ServiceUnavailable"
STSErrInternalError STSErrorCode = "InternalError"
)
// stsErrorResponses maps error codes to HTTP status and messages
@ -596,13 +668,14 @@ var stsErrorResponses = map[STSErrorCode]struct {
HTTPStatusCode int
Message string
}{
STSErrAccessDenied: {http.StatusForbidden, "Access Denied"},
STSErrExpiredToken: {http.StatusBadRequest, "Token has expired"},
STSErrInvalidAction: {http.StatusBadRequest, "Invalid action"},
STSErrInvalidParameterValue: {http.StatusBadRequest, "Invalid parameter value"},
STSErrMissingParameter: {http.StatusBadRequest, "Missing required parameter"},
STSErrSTSNotReady: {http.StatusServiceUnavailable, "STS service not ready"},
STSErrInternalError: {http.StatusInternalServerError, "Internal error"},
STSErrAccessDenied: {http.StatusForbidden, "Access Denied"},
STSErrExpiredToken: {http.StatusBadRequest, "Token has expired"},
STSErrInvalidAction: {http.StatusBadRequest, "Invalid action"},
STSErrInvalidParameterValue: {http.StatusBadRequest, "Invalid parameter value"},
STSErrMalformedPolicyDocument: {http.StatusBadRequest, "Malformed policy document"},
STSErrMissingParameter: {http.StatusBadRequest, "Missing required parameter"},
STSErrSTSNotReady: {http.StatusServiceUnavailable, "STS service not ready"},
STSErrInternalError: {http.StatusInternalServerError, "Internal error"},
}
// STSErrorResponse is the XML error response format

153
weed/s3api/s3api_sts_assume_role_test.go

@ -0,0 +1,153 @@
package s3api
import (
"context"
"fmt"
"net/http"
"net/url"
"testing"
"github.com/seaweedfs/seaweedfs/weed/iam/sts"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestAssumeRole_CallerIdentityFallback tests the fallback logic when RoleArn is missing
func TestAssumeRole_CallerIdentityFallback(t *testing.T) {
// Setup STS service
stsService, _ := setupTestSTSService(t)
// Create IAM integration mock
iamMock := &MockIAMIntegration{
authorizeFunc: func(ctx context.Context, identity *IAMIdentity, action Action, bucket, object string, r *http.Request) s3err.ErrorCode {
// Allow global sts:AssumeRole
if action == "sts:AssumeRole" {
return s3err.ErrNone
}
return s3err.ErrAccessDenied
},
validateTrustPolicyFunc: func(ctx context.Context, roleArn, principalArn string) error {
// Allow all trust policies for this test
return nil
},
}
// Create IAM service with the mock integration
iam := &IdentityAccessManagement{
iamIntegration: iamMock,
}
// Create STS handlers
stsHandlers := NewSTSHandlers(stsService, iam)
// Test case 1: Caller is an IAM User, RoleArn is missing
t.Run("Caller is IAM User, No RoleArn", func(t *testing.T) {
// Mock request
req, err := http.NewRequest("POST", "/", nil)
require.NoError(t, err)
req.Form = url.Values{}
req.Form.Set("Action", "AssumeRole")
req.Form.Set("RoleSessionName", "test-session")
req.Form.Set("Version", "2011-06-15")
// Mock the authenticated identity (IAM User)
callerIdentity := &Identity{
Name: "alice",
Account: &AccountAdmin,
PrincipalArn: fmt.Sprintf("arn:aws:iam::%s:user/alice", defaultAccountID),
Actions: []Action{s3_constants.ACTION_ADMIN},
}
// 1. Test prepareSTSCredentials with NO RoleArn (simulating the fallback logic having passed PrincipalArn)
// expected RoleArn passed to prepareSTSCredentials would be the caller's PrincipalArn
fallbackRoleArn := callerIdentity.PrincipalArn
// Prepare custom claims for the session (mimicking handleAssumeRole logic)
var modifyClaims func(claims *sts.STSSessionClaims)
if callerIdentity.isAdmin() {
modifyClaims = func(claims *sts.STSSessionClaims) {
if claims.RequestContext == nil {
claims.RequestContext = make(map[string]interface{})
}
claims.RequestContext["is_admin"] = true
}
}
stsCreds, assumedUser, err := stsHandlers.prepareSTSCredentials(fallbackRoleArn, "test-session", nil, "", modifyClaims)
require.NoError(t, err)
// Assertions
// The role name should be extracted from the user ARN ("alice")
assert.Contains(t, assumedUser.Arn, fmt.Sprintf("assumed-role/alice/test-session"))
assert.Contains(t, assumedUser.AssumedRoleId, "alice:test-session")
// Verify token claims using ValidateSessionToken
sessionInfo, err := stsService.ValidateSessionToken(context.Background(), stsCreds.SessionToken)
require.NoError(t, err)
// The RoleArn in session info should match the fallback ARN (user ARN)
assert.Equal(t, fallbackRoleArn, sessionInfo.RoleArn)
// Verify is_admin claim is present
isAdmin, ok := sessionInfo.RequestContext["is_admin"].(bool)
assert.True(t, ok, "is_admin claim should be present")
assert.True(t, isAdmin, "is_admin claim should be true")
})
// Test case 2: Caller is an STS Assumed Role, No RoleArn
t.Run("Caller is STS Assumed Role, No RoleArn", func(t *testing.T) {
// Mock identity
callerIdentity := &Identity{
Name: "arn:aws:sts::111122223333:assumed-role/admin/session1",
Account: &AccountAdmin,
PrincipalArn: "arn:aws:sts::111122223333:assumed-role/admin/session1",
}
fallbackRoleArn := callerIdentity.PrincipalArn
stsCreds, assumedUser, err := stsHandlers.prepareSTSCredentials(fallbackRoleArn, "nested-session", nil, "", nil)
require.NoError(t, err)
// The role name should be extracted from the assumed role ARN ("admin")
assert.Contains(t, assumedUser.Arn, "assumed-role/admin/nested-session")
assert.Contains(t, assumedUser.AssumedRoleId, "admin:nested-session")
// Check claims
sessionInfo, err := stsService.ValidateSessionToken(context.Background(), stsCreds.SessionToken)
require.NoError(t, err)
assert.Equal(t, fallbackRoleArn, sessionInfo.RoleArn)
})
// Test case 3: Explicit RoleArn provided (Standard AssumeRole)
t.Run("Explicit RoleArn Provided", func(t *testing.T) {
explicitRoleArn := "arn:aws:iam::111122223333:role/TargetRole"
stsCreds, assumedUser, err := stsHandlers.prepareSTSCredentials(explicitRoleArn, "explicit-session", nil, "", nil)
require.NoError(t, err)
// Role name should be "TargetRole"
assert.Contains(t, assumedUser.Arn, "assumed-role/TargetRole/explicit-session")
// Check claims
sessionInfo, err := stsService.ValidateSessionToken(context.Background(), stsCreds.SessionToken)
require.NoError(t, err)
assert.Equal(t, explicitRoleArn, sessionInfo.RoleArn)
})
// Test case 4: Malformed ARN (Edge case)
t.Run("Malformed ARN", func(t *testing.T) {
malformedArn := "invalid-arn"
stsCreds, assumedUser, err := stsHandlers.prepareSTSCredentials(malformedArn, "bad-session", nil, "", nil)
require.NoError(t, err)
// Fallback behavior: use full string as role name if extraction fails
assert.Contains(t, assumedUser.Arn, "assumed-role/invalid-arn/bad-session")
sessionInfo, err := stsService.ValidateSessionToken(context.Background(), stsCreds.SessionToken)
require.NoError(t, err)
assert.Equal(t, malformedArn, sessionInfo.RoleArn)
})
}

6
weed/server/master_grpc_server.go

@ -110,10 +110,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
if !ms.Topo.IsLeader() {
// tell the volume servers about the leader
newLeader, err := ms.Topo.Leader()
if err != nil {
newLeader, err := ms.Topo.MaybeLeader()
if err != nil || newLeader == "" {
glog.Warningf("SendHeartbeat find leader: %v", err)
return err
return raft.NotLeaderError
}
if err := stream.Send(&master_pb.HeartbeatResponse{
Leader: string(newLeader),

16
weed/server/volume_grpc_scrub.go

@ -5,7 +5,6 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
@ -35,11 +34,12 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S
var serrs []error
switch m := req.GetMode(); m {
case volume_server_pb.VolumeScrubMode_INDEX:
files, serrs = v.CheckIndex()
files, serrs = v.ScrubIndex()
case volume_server_pb.VolumeScrubMode_LOCAL:
files, serrs = scrubVolumeLocal(ctx, v)
// LOCAL is equivalent to FULL for regular volumes
fallthrough
case volume_server_pb.VolumeScrubMode_FULL:
files, serrs = scrubVolumeFull(ctx, v)
files, serrs = v.Scrub()
default:
return nil, fmt.Errorf("unsupported volume scrub mode %d", m)
}
@ -63,14 +63,6 @@ func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.S
return res, nil
}
func scrubVolumeLocal(ctx context.Context, v *storage.Volume) (int64, []error) {
return 0, []error{fmt.Errorf("scrubVolumeLocal(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")}
}
func scrubVolumeFull(ctx context.Context, v *storage.Volume) (int64, []error) {
return 0, []error{fmt.Errorf("scrubVolumeFull(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")}
}
func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb.ScrubEcVolumeRequest) (*volume_server_pb.ScrubEcVolumeResponse, error) {
vids := []needle.VolumeId{}
if len(req.GetVolumeIds()) == 0 {

2
weed/shell/command_s3_configure.go

@ -9,6 +9,7 @@ import (
"strings"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/iam"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"google.golang.org/grpc"
@ -240,6 +241,7 @@ func (c *commandS3Configure) applyChanges(identity *iam_pb.Identity, isNewUser b
identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
AccessKey: *accessKey,
SecretKey: *secretKey,
Status: iam.AccessKeyStatusActive,
})
}
}

3
weed/shell/command_volume_scrub.go

@ -50,8 +50,7 @@ func (c *commandVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io
volScrubCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
nodesStr := volScrubCommand.String("node", "", "comma-separated list of volume server <host>:<port> (optional)")
volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated volume IDs to process (optional)")
// TODO: switch default mode to LOCAL, once implemented.
mode := volScrubCommand.String("mode", "index", "scrubbing mode (index/local/full)")
mode := volScrubCommand.String("mode", "full", "scrubbing mode (index/local/full)")
maxParallelization := volScrubCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
if err = volScrubCommand.Parse(args); err != nil {

19
weed/storage/erasure_coding/ec_volume_scrub.go

@ -45,6 +45,9 @@ func (ecv *EcVolume) ScrubLocal() (int64, []*volume_server_pb.EcShardInfo, []err
}
var read int64
var hasRemoteChunks bool
var data []byte
locations := ecv.LocateEcShardNeedleInterval(ecv.Version, offset.ToActualOffset(), size)
for i, iv := range locations {
@ -53,6 +56,7 @@ func (ecv *EcVolume) ScrubLocal() (int64, []*volume_server_pb.EcShardInfo, []err
shard, found := ecv.FindEcVolumeShard(sid)
if !found {
// shard is not local :( skip it
hasRemoteChunks = true
read += ssize
continue
}
@ -61,8 +65,8 @@ func (ecv *EcVolume) ScrubLocal() (int64, []*volume_server_pb.EcShardInfo, []err
continue
}
buf := make([]byte, ssize)
got, err := shard.ReadAt(buf, soffset)
chunk := make([]byte, ssize)
got, err := shard.ReadAt(chunk, soffset)
if err != nil {
flagShardBroken(shard, "failed to read chunk %d/%d for needle %d from local shard %d at offset %d: %v", i+1, len(locations), id, sid, soffset, err)
continue
@ -71,12 +75,23 @@ func (ecv *EcVolume) ScrubLocal() (int64, []*volume_server_pb.EcShardInfo, []err
flagShardBroken(shard, "expected %d bytes for chunk %d/%d for needle %d from local shard %d, got %d", ssize, i+1, len(locations), id, sid, got)
continue
}
if !hasRemoteChunks {
data = append(data, chunk...)
}
read += int64(got)
}
if got, want := read, needle.GetActualSize(size, ecv.Version); got != want {
return fmt.Errorf("expected %d bytes for needle %d, got %d", want, id, got)
}
if !hasRemoteChunks {
// The needle was fully reconstructed from local shards, so verify its integrity.
n := needle.Needle{}
if err := n.ReadBytes(data, 0, size, ecv.Version); err != nil {
errs = append(errs, fmt.Errorf("needle %d on volume %d: %v", id, ecv.VolumeId, err))
}
}
return nil
})

2
weed/storage/needle/needle_read.go

@ -55,7 +55,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Versio
if n.Size != size {
if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatchOffsetSize).Inc()
glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
glog.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
return ErrorSizeMismatch
}
stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorSizeMismatch).Inc()

BIN
weed/storage/test_files/bitrot_volume.dat

BIN
weed/storage/test_files/bitrot_volume.idx

BIN
weed/storage/test_files/healthy_volume.dat

BIN
weed/storage/test_files/healthy_volume.idx

124
weed/storage/volume_checking.go

@ -10,30 +10,108 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func (v *Volume) CheckIndex() (int64, []error) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()
// openIndex returns a file descriptor for the volume's index, and the index size in bytes.
func (v *Volume) openIndex() (*os.File, int64, error) {
idxFileName := v.FileName(".idx")
idxFile, err := os.OpenFile(idxFileName, os.O_RDONLY, 0644)
if err != nil {
return 0, []error{fmt.Errorf("failed to open IDX file %s for volume %v: %v", idxFileName, v.Id, err)}
return nil, 0, fmt.Errorf("failed to open IDX file %s for volume %v: %v", idxFileName, v.Id, err)
}
defer idxFile.Close()
idxStat, err := idxFile.Stat()
if err != nil {
return 0, []error{fmt.Errorf("failed to stat IDX file %s for volume %v: %v", idxFileName, v.Id, err)}
idxFile.Close()
return nil, 0, fmt.Errorf("failed to stat IDX file %s for volume %v: %v", idxFileName, v.Id, err)
}
if idxStat.Size() == 0 {
return 0, []error{fmt.Errorf("zero-size IDX file for volume %v at %s", v.Id, idxFileName)}
idxFile.Close()
return nil, 0, fmt.Errorf("zero-size IDX file for volume %v at %s", v.Id, idxFileName)
}
return idxFile, idxStat.Size(), nil
}
// ScrubIndex checks the volume's index for issues.
func (v *Volume) ScrubIndex() (int64, []error) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()
idxFile, idxFileSize, err := v.openIndex()
if err != nil {
return 0, []error{err}
}
defer idxFile.Close()
return idx.CheckIndexFile(idxFile, idxFileSize, v.Version())
}
// scrubVolumeData checks a volume content + index for issues.
// It first validates the index file, then walks every indexed needle and reads its
// data from dataFile, accumulating one error per problem found. It returns the
// number of index entries visited and the collected errors (which may be empty).
func (v *Volume) scrubVolumeData(dataFile backend.BackendStorageFile, idxFile *os.File, idxFileSize int64) (int64, []error) {
	// full scrubbing means also scrubbing the index
	var count int64
	_, errs := idx.CheckIndexFile(idxFile, idxFileSize, v.Version())

	// read and check every indexed needle
	var totalRead int64
	version := v.Version()
	err := idx.WalkIndexFile(idxFile, 0, func(id types.NeedleId, offset types.Offset, size types.Size) error {
		count++

		// compute the actual size of the needle in disk, including needle header, body and alignment padding.
		actualSize := int64(needle.GetActualSize(size, version))

		// TODO: Needle.ReadData() is currently broken for deleted files, which have a types.Size < 0. Fix
		// so deleted needles get properly scrubbed as well.
		// TODO: idx.WalkIndexFile() returns a size -1 (and actual size of 32 bytes) for deleted needles. We
		// want to scrub deleted needles whenever possible.
		if size.IsDeleted() {
			// Deleted needles are skipped but still counted toward the expected file size.
			totalRead += actualSize
			return nil
		}

		// Read the needle body; ReadData verifies its checksum internally, so a read
		// error here covers both I/O failures and on-disk corruption.
		n := needle.Needle{}
		if err := n.ReadData(dataFile, offset.ToActualOffset(), size, version); err != nil {
			errs = append(errs, fmt.Errorf("needle %d on volume %d: %v", id, v.Id, err))
		}
		totalRead += actualSize
		return nil
	})
	if err != nil {
		// WalkIndexFile itself failed (e.g. truncated index); record it alongside needle errors.
		errs = append(errs, err)
	}

	// check total volume file size: the data file should be exactly the super block
	// plus the space accounted for by every indexed needle.
	wantSize := totalRead + super_block.SuperBlockSize
	dataSize, _, err := dataFile.GetStat()
	if err != nil {
		errs = append(errs, fmt.Errorf("failed to stat data file for volume %d: %v", v.Id, err))
	} else {
		if dataSize < wantSize {
			// Data file truncated: smaller than the needles the index claims it holds.
			errs = append(errs, fmt.Errorf("data file for volume %d is smaller (%d) than the %d needles it contains (%d)", v.Id, dataSize, count, wantSize))
		} else if dataSize != wantSize {
			// Data file larger than expected: trailing bytes not covered by the index.
			errs = append(errs, fmt.Errorf("data file size for volume %d (%d) doesn't match the size for %d needles read (%d)", v.Id, dataSize, count, wantSize))
		}
	}

	return count, errs
}
// Scrub checks the entire volume content for issues.
func (v *Volume) Scrub() (int64, []error) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()
idxFile, idxFileSize, err := v.openIndex()
if err != nil {
return 0, []error{err}
}
defer idxFile.Close()
return idx.CheckIndexFile(idxFile, idxStat.Size(), v.Version())
return v.scrubVolumeData(v.DataBackend, idxFile, idxFileSize)
}
func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) {
@ -45,11 +123,11 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin
return 0, nil
}
healthyIndexSize := indexSize
for i := 1; i <= 10 && indexSize >= int64(i)*NeedleMapEntrySize; i++ {
for i := 1; i <= 10 && indexSize >= int64(i)*types.NeedleMapEntrySize; i++ {
// check and fix last 10 entries
lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize)
lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*types.NeedleMapEntrySize)
if err == io.EOF {
healthyIndexSize = indexSize - int64(i)*NeedleMapEntrySize
healthyIndexSize = indexSize - int64(i)*types.NeedleMapEntrySize
continue
}
if err != ErrorSizeMismatch {
@ -79,7 +157,7 @@ func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (
} else {
if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset(), key, size); err != nil {
if err == ErrorSizeMismatch {
return verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset()+int64(MaxPossibleVolumeSize), key, size)
return verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset()+int64(types.MaxPossibleVolumeSize), key, size)
}
return lastAppendAtNs, err
}
@ -89,7 +167,7 @@ func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
if indexSize, err = util.GetFileSize(indexFile); err == nil {
if indexSize%NeedleMapEntrySize != 0 {
if indexSize%types.NeedleMapEntrySize != 0 {
err = fmt.Errorf("index file's size is %d bytes, maybe corrupted", indexSize)
}
}
@ -101,16 +179,16 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
err = fmt.Errorf("offset %d for index file is invalid", offset)
return
}
bytes = make([]byte, NeedleMapEntrySize)
bytes = make([]byte, types.NeedleMapEntrySize)
var readCount int
readCount, err = indexFile.ReadAt(bytes, offset)
if err == io.EOF && readCount == NeedleMapEntrySize {
if err == io.EOF && readCount == types.NeedleMapEntrySize {
err = nil
}
return
}
func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) {
func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key types.NeedleId, size types.Size) (lastAppendAtNs uint64, err error) {
n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset)
if err == io.EOF {
return 0, err
@ -122,10 +200,10 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version,
return 0, ErrorSizeMismatch
}
if v == needle.Version3 {
bytes := make([]byte, TimestampSize)
bytes := make([]byte, types.TimestampSize)
var readCount int
readCount, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize)
if err == io.EOF && readCount == TimestampSize {
readCount, err = datFile.ReadAt(bytes, offset+types.NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize)
if err == io.EOF && readCount == types.TimestampSize {
err = nil
}
if err == io.EOF {
@ -158,7 +236,7 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version,
return n.AppendAtNs, err
}
func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, key NeedleId) (lastAppendAtNs uint64, err error) {
func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, key types.NeedleId) (lastAppendAtNs uint64, err error) {
n := new(needle.Needle)
size := n.DiskSize(v)
var fileSize int64
@ -166,7 +244,7 @@ func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.V
if err != nil {
return 0, fmt.Errorf("GetStat: %w", err)
}
if err = n.ReadData(datFile, fileSize-size, Size(0), v); err != nil {
if err = n.ReadData(datFile, fileSize-size, types.Size(0), v); err != nil {
return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", fileSize-size, size, err)
}
if n.Id != key {

80
weed/storage/volume_checking_test.go

@ -0,0 +1,80 @@
package storage
import (
"fmt"
"os"
"reflect"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
// TestScrubVolumeData exercises Volume.scrubVolumeData against pre-built on-disk
// fixtures: a healthy volume (no errors expected) and a volume with deliberate
// bit rot (CRC mismatches and a size discrepancy expected). Error slices are
// compared with reflect.DeepEqual, so wantErrs must match message-for-message.
func TestScrubVolumeData(t *testing.T) {
	testCases := []struct {
		name      string
		dataPath  string // path to the .dat fixture
		indexPath string // path to the matching .idx fixture
		version   needle.Version
		want      int64   // expected number of index entries processed
		wantErrs  []error // expected scrub errors, in order
	}{
		{
			name:      "healthy volume",
			dataPath:  "./test_files/healthy_volume.dat",
			indexPath: "./test_files/healthy_volume.idx",
			version:   needle.Version3,
			want:      27,
			wantErrs:  []error{},
		},
		{
			name:      "bitrot volume",
			dataPath:  "./test_files/bitrot_volume.dat",
			indexPath: "./test_files/bitrot_volume.idx",
			version:   needle.Version3,
			want:      27,
			// Two corrupted needles plus a trailing-bytes size mismatch.
			wantErrs: []error{
				fmt.Errorf("needle 3 on volume 0: invalid CRC for needle 3 (got 0b243a0d, want 4af853fb), data on disk corrupted"),
				fmt.Errorf("needle 48 on volume 0: invalid CRC for needle 30 (got 3c40e8d5, want 5077fea1), data on disk corrupted"),
				fmt.Errorf("data file size for volume 0 (942864) doesn't match the size for 27 needles read (942856)"),
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			datFile, err := os.OpenFile(tc.dataPath, os.O_RDONLY, 0)
			if err != nil {
				t.Fatalf("failed to open data file: %v", err)
			}
			defer datFile.Close()

			idxFile, err := os.OpenFile(tc.indexPath, os.O_RDONLY, 0)
			if err != nil {
				t.Fatalf("failed to open index file: %v", err)
			}
			defer idxFile.Close()

			idxStat, err := idxFile.Stat()
			if err != nil {
				t.Fatalf("failed to stat index file: %v", err)
			}

			// Minimal Volume: scrubVolumeData only needs the version from volumeInfo;
			// the data backend and index are passed in explicitly.
			v := Volume{
				volumeInfo: &volume_server_pb.VolumeInfo{
					Version: uint32(tc.version),
				},
			}

			got, gotErrs := v.scrubVolumeData(backend.NewDiskFile(datFile), idxFile, idxStat.Size())
			if got != tc.want {
				t.Errorf("expected %d files processed, got %d", tc.want, got)
			}
			if !reflect.DeepEqual(gotErrs, tc.wantErrs) {
				t.Errorf("expected errors %v, got %v", tc.wantErrs, gotErrs)
			}
		})
	}
}

16
weed/topology/topology.go

@ -117,8 +117,10 @@ func (t *Topology) IsLeader() bool {
if t.RaftServer.State() == raft.Leader {
return true
}
if leader, err := t.Leader(); err == nil {
if pb.ServerAddress(t.RaftServer.Name()) == leader {
// Directly check leader to avoid re-acquiring lock via MaybeLeader()
leader := pb.ServerAddress(t.RaftServer.Leader())
if leader != "" {
if pb.ServerAddress(t.RaftServer.Name()).Equals(leader) {
return true
}
}
@ -175,6 +177,16 @@ func (t *Topology) Leader() (l pb.ServerAddress, err error) {
func() (l pb.ServerAddress, err error) {
l, err = t.MaybeLeader()
if err == nil && l == "" {
// Thread-safe check if we are the leader
t.RaftServerAccessLock.RLock()
if t.RaftServer != nil && t.RaftServer.State() == raft.Leader {
l = pb.ServerAddress(t.RaftServer.Name())
}
t.RaftServerAccessLock.RUnlock()
if l != "" {
return l, nil
}
err = leaderNotSelected
}
return l, err

Loading…
Cancel
Save