Browse Source

address comments

pull/7160/head
chrislu 1 month ago
parent
commit
5b34eea26c
  1. 9
      test/s3/iam/README.md
  2. 7
      test/s3/iam/test_config.json
  3. 90
      weed/s3api/s3_iam_middleware.go
  4. 266
      weed/s3api/s3_iam_role_selection_test.go
  5. 286
      weed/s3api/s3_list_parts_action_test.go

9
test/s3/iam/README.md

@ -4,11 +4,13 @@ This directory contains comprehensive integration tests for the SeaweedFS S3 API
## Overview
**Important**: The STS service uses a **stateless JWT design** where all session information is embedded directly in the JWT token. No external session storage is required.
The S3 IAM integration tests validate the complete end-to-end functionality of:
- **JWT Authentication**: OIDC token-based authentication with S3 API
- **Policy Enforcement**: Fine-grained access control for S3 operations
- **Session Management**: STS session token validation and expiration
- **Stateless Session Management**: JWT-based session token validation and expiration (no external storage)
- **Role-Based Access Control (RBAC)**: IAM roles with different permission levels
- **Bucket Policies**: Resource-based access control integration
- **Multipart Upload IAM**: Policy enforcement for multipart operations
@ -24,8 +26,7 @@ The S3 IAM integration tests validate the complete end-to-end functionality of:
3. **STS Service** - Security Token Service for temporary credentials
4. **Policy Engine** - AWS IAM-compatible policy evaluation
5. **Identity Providers** - OIDC and LDAP authentication providers
6. **Session Store** - Persistent session storage using SeaweedFS filer
7. **Policy Store** - Persistent policy storage using SeaweedFS filer
6. **Policy Store** - Persistent policy storage using SeaweedFS filer
### Test Framework
@ -162,7 +163,7 @@ The test configuration defines:
- **Identity Providers**: OIDC and LDAP configurations
- **IAM Roles**: Role definitions with trust policies
- **IAM Policies**: Permission policies for different access levels
- **Session/Policy Stores**: Persistent storage configurations
- **Policy Stores**: Persistent storage configurations for IAM policies and roles
### Service Ports

7
test/s3/iam/test_config.json

@ -104,13 +104,6 @@
}
}
},
"sessionStore": {
"type": "filer",
"config": {
"filerAddress": "localhost:8888",
"basePath": "/seaweedfs/iam/sessions"
}
},
"policyStore": {
"type": "filer",
"config": {

90
weed/s3api/s3_iam_middleware.go

@ -6,7 +6,6 @@ import (
"net"
"net/http"
"net/url"
"sort"
"strings"
"time"
@ -311,6 +310,9 @@ func determineGranularS3Action(r *http.Request, fallbackAction Action, bucket st
if _, hasVersions := query["versions"]; hasVersions {
return "s3:GetObjectVersion"
}
if _, hasUploadId := query["uploadId"]; hasUploadId {
return "s3:ListParts"
}
// Default object read
return "s3:GetObject"
@ -800,87 +802,17 @@ func (s3iam *S3IAMIntegration) validateOIDCToken(ctx context.Context, token stri
return nil, fmt.Errorf("token not valid for any registered OIDC provider")
}
// selectPrimaryRole intelligently selects the primary role from multiple available roles
// This provides deterministic role selection to prevent unpredictable access control behavior
// selectPrimaryRole simply picks the first role from the list
// The OIDC provider should return roles in priority order (most important first)
func (s3iam *S3IAMIntegration) selectPrimaryRole(roles []string, externalIdentity *providers.ExternalIdentity) string {
if len(roles) == 1 {
return roles[0]
}
glog.V(2).Infof("🔍 selectPrimaryRole: Selecting from %d roles: %v", len(roles), roles)
// Strategy 1: Check for explicit primary_role claim
if primaryRole, exists := externalIdentity.Attributes["primary_role"]; exists && primaryRole != "" {
primaryRole = strings.TrimSpace(primaryRole)
// Verify the primary role is in the available roles list
for _, role := range roles {
if strings.EqualFold(role, primaryRole) {
glog.V(2).Infof("🔍 selectPrimaryRole: Using explicit primary_role: %s", role)
return role
}
}
glog.V(1).Infof("⚠️ selectPrimaryRole: primary_role '%s' not found in available roles, falling back", primaryRole)
}
// Strategy 2: Role hierarchy - select most privileged role
selectedRole := s3iam.selectByRoleHierarchy(roles)
if selectedRole != "" {
glog.V(2).Infof("🔍 selectPrimaryRole: Using hierarchical selection: %s", selectedRole)
return selectedRole
}
// Strategy 3: Deterministic fallback - alphabetical order (consistent behavior)
sort.Strings(roles)
glog.V(2).Infof("🔍 selectPrimaryRole: Using deterministic selection (first alphabetically): %s", roles[0])
return roles[0]
}
// selectByRoleHierarchy selects a role based on predefined privilege hierarchy
// Returns the most privileged role available, or empty string if no hierarchy match
func (s3iam *S3IAMIntegration) selectByRoleHierarchy(roles []string) string {
// Define role hierarchy from most privileged to least privileged
// This covers common enterprise role naming patterns
roleHierarchy := [][]string{
// Tier 1: Super Admin roles
{"SuperAdmin", "super-admin", "super_admin", "root", "owner"},
// Tier 2: Admin roles
{"Admin", "admin", "Administrator", "administrator", "system-admin", "system_admin"},
// Tier 3: Manager/Power User roles
{"Manager", "manager", "PowerUser", "power-user", "power_user", "lead", "supervisor"},
// Tier 4: Editor/Write roles
{"Editor", "editor", "Writer", "writer", "Contributor", "contributor", "write", "readwrite", "read-write"},
// Tier 5: Viewer/Read roles
{"Viewer", "viewer", "Reader", "reader", "read-only", "read_only", "readonly", "read", "guest"},
if len(roles) == 0 {
return ""
}
// Find the highest priority role available
for _, tier := range roleHierarchy {
for _, privilegedRole := range tier {
for _, availableRole := range roles {
// Check for exact match or contains match (case-insensitive)
if strings.EqualFold(availableRole, privilegedRole) ||
strings.Contains(strings.ToLower(availableRole), strings.ToLower(privilegedRole)) {
return availableRole
}
}
}
}
// No hierarchy match found
return ""
}
// getProviderNames returns a list of provider names for debugging
func getProviderNames(providers map[string]providers.IdentityProvider) []string {
names := make([]string, 0, len(providers))
for name := range providers {
names = append(names, name)
}
return names
// Just pick the first one - keep it simple
selectedRole := roles[0]
glog.V(2).Infof("🔍 selectPrimaryRole: Selected first role: %s from %v", selectedRole, roles)
return selectedRole
}
// isSTSIssuer determines if an issuer belongs to the STS service

266
weed/s3api/s3_iam_role_selection_test.go

@ -1,7 +1,6 @@
package s3api
import (
"strings"
"testing"
"github.com/seaweedfs/seaweedfs/weed/iam/providers"
@ -11,249 +10,52 @@ import (
func TestSelectPrimaryRole(t *testing.T) {
s3iam := &S3IAMIntegration{}
t.Run("single_role_returns_that_role", func(t *testing.T) {
roles := []string{"admin"}
externalIdentity := &providers.ExternalIdentity{
Attributes: make(map[string]string),
}
result := s3iam.selectPrimaryRole(roles, externalIdentity)
assert.Equal(t, "admin", result)
})
t.Run("explicit_primary_role_takes_precedence", func(t *testing.T) {
roles := []string{"admin", "reader", "writer"}
externalIdentity := &providers.ExternalIdentity{
Attributes: map[string]string{
"primary_role": "reader",
},
}
result := s3iam.selectPrimaryRole(roles, externalIdentity)
assert.Equal(t, "reader", result)
})
t.Run("explicit_primary_role_case_insensitive", func(t *testing.T) {
roles := []string{"Admin", "Reader", "Writer"}
externalIdentity := &providers.ExternalIdentity{
Attributes: map[string]string{
"primary_role": "admin",
},
}
result := s3iam.selectPrimaryRole(roles, externalIdentity)
assert.Equal(t, "Admin", result)
})
t.Run("invalid_primary_role_falls_back_to_hierarchy", func(t *testing.T) {
roles := []string{"admin", "reader", "writer"}
externalIdentity := &providers.ExternalIdentity{
Attributes: map[string]string{
"primary_role": "nonexistent",
},
}
result := s3iam.selectPrimaryRole(roles, externalIdentity)
assert.Equal(t, "admin", result) // Should select admin via hierarchy
t.Run("empty_roles_returns_empty", func(t *testing.T) {
identity := &providers.ExternalIdentity{Attributes: make(map[string]string)}
result := s3iam.selectPrimaryRole([]string{}, identity)
assert.Equal(t, "", result)
})
t.Run("hierarchy_selection_admin_over_reader", func(t *testing.T) {
roles := []string{"reader", "admin", "writer"}
externalIdentity := &providers.ExternalIdentity{
Attributes: make(map[string]string),
}
result := s3iam.selectPrimaryRole(roles, externalIdentity)
assert.Equal(t, "admin", result) // Admin has higher priority
t.Run("single_role_returns_that_role", func(t *testing.T) {
identity := &providers.ExternalIdentity{Attributes: make(map[string]string)}
result := s3iam.selectPrimaryRole([]string{"admin"}, identity)
assert.Equal(t, "admin", result)
})
t.Run("hierarchy_selection_case_insensitive", func(t *testing.T) {
roles := []string{"Reader", "ADMIN", "writer"}
externalIdentity := &providers.ExternalIdentity{
Attributes: make(map[string]string),
}
result := s3iam.selectPrimaryRole(roles, externalIdentity)
assert.Equal(t, "ADMIN", result)
t.Run("multiple_roles_returns_first", func(t *testing.T) {
identity := &providers.ExternalIdentity{Attributes: make(map[string]string)}
roles := []string{"viewer", "manager", "admin"}
result := s3iam.selectPrimaryRole(roles, identity)
assert.Equal(t, "viewer", result, "Should return first role")
})
t.Run("hierarchy_selection_contains_match", func(t *testing.T) {
roles := []string{"system-reader", "system-admin-user", "system-writer"}
externalIdentity := &providers.ExternalIdentity{
Attributes: make(map[string]string),
}
t.Run("order_matters", func(t *testing.T) {
identity := &providers.ExternalIdentity{Attributes: make(map[string]string)}
result := s3iam.selectPrimaryRole(roles, externalIdentity)
assert.Equal(t, "system-admin-user", result) // Contains "admin"
})
// Test different orderings
roles1 := []string{"admin", "viewer", "manager"}
result1 := s3iam.selectPrimaryRole(roles1, identity)
assert.Equal(t, "admin", result1)
t.Run("deterministic_fallback_alphabetical", func(t *testing.T) {
// Roles that don't match any hierarchy
roles := []string{"zebra", "alpha", "beta"}
externalIdentity := &providers.ExternalIdentity{
Attributes: make(map[string]string),
}
roles2 := []string{"viewer", "admin", "manager"}
result2 := s3iam.selectPrimaryRole(roles2, identity)
assert.Equal(t, "viewer", result2)
result := s3iam.selectPrimaryRole(roles, externalIdentity)
assert.Equal(t, "alpha", result) // First alphabetically
roles3 := []string{"manager", "admin", "viewer"}
result3 := s3iam.selectPrimaryRole(roles3, identity)
assert.Equal(t, "manager", result3)
})
t.Run("complex_enterprise_roles", func(t *testing.T) {
identity := &providers.ExternalIdentity{Attributes: make(map[string]string)}
roles := []string{
"app-user-readonly",
"app-user-contributor",
"app-admin-full",
"system-guest",
}
externalIdentity := &providers.ExternalIdentity{
Attributes: make(map[string]string),
}
result := s3iam.selectPrimaryRole(roles, externalIdentity)
assert.Equal(t, "app-admin-full", result) // Contains "admin"
})
}
func TestSelectByRoleHierarchy(t *testing.T) {
s3iam := &S3IAMIntegration{}
t.Run("super_admin_highest_priority", func(t *testing.T) {
roles := []string{"admin", "super-admin", "reader"}
result := s3iam.selectByRoleHierarchy(roles)
assert.Equal(t, "super-admin", result)
})
t.Run("admin_over_manager", func(t *testing.T) {
roles := []string{"manager", "admin", "reader"}
result := s3iam.selectByRoleHierarchy(roles)
assert.Equal(t, "admin", result)
})
t.Run("manager_over_editor", func(t *testing.T) {
roles := []string{"editor", "manager", "reader"}
result := s3iam.selectByRoleHierarchy(roles)
assert.Equal(t, "manager", result)
})
t.Run("editor_over_viewer", func(t *testing.T) {
roles := []string{"viewer", "editor"}
result := s3iam.selectByRoleHierarchy(roles)
assert.Equal(t, "editor", result)
})
t.Run("no_hierarchy_match_returns_empty", func(t *testing.T) {
roles := []string{"custom-role-1", "custom-role-2", "special-user"}
result := s3iam.selectByRoleHierarchy(roles)
assert.Equal(t, "", result)
})
t.Run("multiple_same_tier_returns_first_found", func(t *testing.T) {
roles := []string{"viewer", "reader", "guest"}
result := s3iam.selectByRoleHierarchy(roles)
// Should return first match found in the hierarchy (viewer comes first in tier definition)
assert.Equal(t, "viewer", result)
})
t.Run("case_variations", func(t *testing.T) {
roles := []string{"ADMIN", "Reader", "writer"}
result := s3iam.selectByRoleHierarchy(roles)
assert.Equal(t, "ADMIN", result)
})
}
func TestRoleSelectionIntegration(t *testing.T) {
t.Run("real_world_enterprise_scenario", func(t *testing.T) {
// Simulate a real enterprise OIDC token with multiple roles
testCases := []struct {
name string
roles []string
primaryRole string // explicit primary_role claim
expectedRole string
selectionType string
}{
{
name: "explicit_primary_overrides_hierarchy",
roles: []string{"admin", "reader", "writer"},
primaryRole: "reader",
expectedRole: "reader",
selectionType: "explicit",
},
{
name: "hierarchy_selects_admin_over_others",
roles: []string{"contributor", "admin", "viewer"},
primaryRole: "", // No explicit primary
expectedRole: "admin",
selectionType: "hierarchy",
},
{
name: "deterministic_fallback_for_unknown_roles",
roles: []string{"zebra-role", "alpha-role", "beta-role"},
primaryRole: "",
expectedRole: "alpha-role",
selectionType: "deterministic",
},
{
name: "complex_enterprise_naming",
roles: []string{"org-user-readonly", "org-power-user", "org-system-admin"},
primaryRole: "",
expectedRole: "org-system-admin", // Contains "admin"
selectionType: "hierarchy",
},
}
s3iam := &S3IAMIntegration{}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
externalIdentity := &providers.ExternalIdentity{
Attributes: make(map[string]string),
}
if tc.primaryRole != "" {
externalIdentity.Attributes["primary_role"] = tc.primaryRole
}
result := s3iam.selectPrimaryRole(tc.roles, externalIdentity)
assert.Equal(t, tc.expectedRole, result,
"Expected %s selection to return %s, got %s",
tc.selectionType, tc.expectedRole, result)
})
}
})
}
// Test helper function to verify role parsing improvements
func TestRoleParsingImprovements(t *testing.T) {
t.Run("whitespace_handling", func(t *testing.T) {
// Test the improved role parsing logic
rolesStr := " admin , reader , writer "
roles := strings.Split(rolesStr, ",")
// Clean up role names (this is what the main code does now)
var cleanRoles []string
for _, role := range roles {
cleanRole := strings.TrimSpace(role)
if cleanRole != "" {
cleanRoles = append(cleanRoles, cleanRole)
}
}
expected := []string{"admin", "reader", "writer"}
assert.Equal(t, expected, cleanRoles)
})
t.Run("empty_roles_filtered", func(t *testing.T) {
rolesStr := "admin,,reader, ,writer"
roles := strings.Split(rolesStr, ",")
var cleanRoles []string
for _, role := range roles {
cleanRole := strings.TrimSpace(role)
if cleanRole != "" {
cleanRoles = append(cleanRoles, cleanRole)
}
}
expected := []string{"admin", "reader", "writer"}
assert.Equal(t, expected, cleanRoles)
"finance-readonly",
"hr-manager",
"it-system-admin",
"guest-viewer",
}
result := s3iam.selectPrimaryRole(roles, identity)
// Should return the first role
assert.Equal(t, "finance-readonly", result, "Should return first role in list")
})
}

286
weed/s3api/s3_list_parts_action_test.go

@ -0,0 +1,286 @@
package s3api
import (
"net/http"
"net/url"
"testing"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/stretchr/testify/assert"
)
// TestListPartsActionMapping tests the fix for the missing s3:ListParts action mapping
// when GET requests include an uploadId query parameter
func TestListPartsActionMapping(t *testing.T) {
testCases := []struct {
name string
method string
bucket string
objectKey string
queryParams map[string]string
fallbackAction Action
expectedAction string
description string
}{
{
name: "get_object_without_uploadId",
method: "GET",
bucket: "test-bucket",
objectKey: "test-object.txt",
queryParams: map[string]string{},
fallbackAction: s3_constants.ACTION_READ,
expectedAction: "s3:GetObject",
description: "GET request without uploadId should map to s3:GetObject",
},
{
name: "get_object_with_uploadId",
method: "GET",
bucket: "test-bucket",
objectKey: "test-object.txt",
queryParams: map[string]string{"uploadId": "test-upload-id"},
fallbackAction: s3_constants.ACTION_READ,
expectedAction: "s3:ListParts",
description: "GET request with uploadId should map to s3:ListParts (this was the missing mapping)",
},
{
name: "get_object_with_uploadId_and_other_params",
method: "GET",
bucket: "test-bucket",
objectKey: "test-object.txt",
queryParams: map[string]string{
"uploadId": "test-upload-id-123",
"max-parts": "100",
"part-number-marker": "50",
},
fallbackAction: s3_constants.ACTION_READ,
expectedAction: "s3:ListParts",
description: "GET request with uploadId plus other multipart params should map to s3:ListParts",
},
{
name: "get_object_versions",
method: "GET",
bucket: "test-bucket",
objectKey: "test-object.txt",
queryParams: map[string]string{"versions": ""},
fallbackAction: s3_constants.ACTION_READ,
expectedAction: "s3:GetObjectVersion",
description: "GET request with versions should still map to s3:GetObjectVersion (precedence check)",
},
{
name: "get_object_acl_without_uploadId",
method: "GET",
bucket: "test-bucket",
objectKey: "test-object.txt",
queryParams: map[string]string{"acl": ""},
fallbackAction: s3_constants.ACTION_READ_ACP,
expectedAction: "s3:GetObjectAcl",
description: "GET request with acl should map to s3:GetObjectAcl (not affected by uploadId fix)",
},
{
name: "post_multipart_upload_without_uploadId",
method: "POST",
bucket: "test-bucket",
objectKey: "test-object.txt",
queryParams: map[string]string{"uploads": ""},
fallbackAction: s3_constants.ACTION_WRITE,
expectedAction: "s3:CreateMultipartUpload",
description: "POST request to initiate multipart upload should not be affected by uploadId fix",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Create HTTP request with query parameters
req := &http.Request{
Method: tc.method,
URL: &url.URL{Path: "/" + tc.bucket + "/" + tc.objectKey},
}
// Add query parameters
query := req.URL.Query()
for key, value := range tc.queryParams {
query.Set(key, value)
}
req.URL.RawQuery = query.Encode()
// Call the granular action determination function
action := determineGranularS3Action(req, tc.fallbackAction, tc.bucket, tc.objectKey)
// Verify the action mapping
assert.Equal(t, tc.expectedAction, action,
"Test case: %s - %s", tc.name, tc.description)
})
}
}
// TestListPartsActionMappingSecurityScenarios tests security scenarios for the ListParts fix
func TestListPartsActionMappingSecurityScenarios(t *testing.T) {
t.Run("privilege_separation_listparts_vs_getobject", func(t *testing.T) {
// Scenario: User has permission to list multipart upload parts but NOT to get the actual object content
// This is a common enterprise pattern where users can manage uploads but not read final objects
// Test request 1: List parts with uploadId
req1 := &http.Request{
Method: "GET",
URL: &url.URL{Path: "/secure-bucket/confidential-document.pdf"},
}
query1 := req1.URL.Query()
query1.Set("uploadId", "active-upload-123")
req1.URL.RawQuery = query1.Encode()
action1 := determineGranularS3Action(req1, s3_constants.ACTION_READ, "secure-bucket", "confidential-document.pdf")
// Test request 2: Get object without uploadId
req2 := &http.Request{
Method: "GET",
URL: &url.URL{Path: "/secure-bucket/confidential-document.pdf"},
}
action2 := determineGranularS3Action(req2, s3_constants.ACTION_READ, "secure-bucket", "confidential-document.pdf")
// These should be different actions, allowing different permissions
assert.Equal(t, "s3:ListParts", action1, "Listing multipart parts should require s3:ListParts permission")
assert.Equal(t, "s3:GetObject", action2, "Reading object content should require s3:GetObject permission")
assert.NotEqual(t, action1, action2, "ListParts and GetObject should be separate permissions for security")
})
t.Run("policy_enforcement_precision", func(t *testing.T) {
// This test documents the security improvement - before the fix, both operations
// would incorrectly map to s3:GetObject, preventing fine-grained access control
testCases := []struct {
description string
queryParams map[string]string
expectedAction string
securityNote string
}{
{
description: "List multipart upload parts",
queryParams: map[string]string{"uploadId": "upload-abc123"},
expectedAction: "s3:ListParts",
securityNote: "FIXED: Now correctly maps to s3:ListParts instead of s3:GetObject",
},
{
description: "Get actual object content",
queryParams: map[string]string{},
expectedAction: "s3:GetObject",
securityNote: "UNCHANGED: Still correctly maps to s3:GetObject",
},
{
description: "Get object with complex upload ID",
queryParams: map[string]string{"uploadId": "complex-upload-id-with-hyphens-123-abc-def"},
expectedAction: "s3:ListParts",
securityNote: "FIXED: Complex upload IDs now correctly detected",
},
}
for _, tc := range testCases {
req := &http.Request{
Method: "GET",
URL: &url.URL{Path: "/test-bucket/test-object"},
}
query := req.URL.Query()
for key, value := range tc.queryParams {
query.Set(key, value)
}
req.URL.RawQuery = query.Encode()
action := determineGranularS3Action(req, s3_constants.ACTION_READ, "test-bucket", "test-object")
assert.Equal(t, tc.expectedAction, action,
"%s - %s", tc.description, tc.securityNote)
}
})
}
// TestListPartsActionRealWorldScenarios tests realistic enterprise multipart upload scenarios
func TestListPartsActionRealWorldScenarios(t *testing.T) {
t.Run("large_file_upload_workflow", func(t *testing.T) {
// Simulate a large file upload workflow where users need different permissions for each step
// Step 1: Initiate multipart upload (POST with uploads query)
req1 := &http.Request{
Method: "POST",
URL: &url.URL{Path: "/data/large-dataset.csv"},
}
query1 := req1.URL.Query()
query1.Set("uploads", "")
req1.URL.RawQuery = query1.Encode()
action1 := determineGranularS3Action(req1, s3_constants.ACTION_WRITE, "data", "large-dataset.csv")
// Step 2: List existing parts (GET with uploadId query) - THIS WAS THE MISSING MAPPING
req2 := &http.Request{
Method: "GET",
URL: &url.URL{Path: "/data/large-dataset.csv"},
}
query2 := req2.URL.Query()
query2.Set("uploadId", "dataset-upload-20240827-001")
req2.URL.RawQuery = query2.Encode()
action2 := determineGranularS3Action(req2, s3_constants.ACTION_READ, "data", "large-dataset.csv")
// Step 3: Upload a part (PUT with uploadId and partNumber)
req3 := &http.Request{
Method: "PUT",
URL: &url.URL{Path: "/data/large-dataset.csv"},
}
query3 := req3.URL.Query()
query3.Set("uploadId", "dataset-upload-20240827-001")
query3.Set("partNumber", "5")
req3.URL.RawQuery = query3.Encode()
action3 := determineGranularS3Action(req3, s3_constants.ACTION_WRITE, "data", "large-dataset.csv")
// Step 4: Complete multipart upload (POST with uploadId)
req4 := &http.Request{
Method: "POST",
URL: &url.URL{Path: "/data/large-dataset.csv"},
}
query4 := req4.URL.Query()
query4.Set("uploadId", "dataset-upload-20240827-001")
req4.URL.RawQuery = query4.Encode()
action4 := determineGranularS3Action(req4, s3_constants.ACTION_WRITE, "data", "large-dataset.csv")
// Verify each step has the correct action mapping
assert.Equal(t, "s3:CreateMultipartUpload", action1, "Step 1: Initiate upload")
assert.Equal(t, "s3:ListParts", action2, "Step 2: List parts (FIXED by this PR)")
assert.Equal(t, "s3:UploadPart", action3, "Step 3: Upload part")
assert.Equal(t, "s3:CompleteMultipartUpload", action4, "Step 4: Complete upload")
// Verify that each step requires different permissions (security principle)
actions := []string{action1, action2, action3, action4}
for i, action := range actions {
for j, otherAction := range actions {
if i != j {
assert.NotEqual(t, action, otherAction,
"Each multipart operation step should require different permissions for fine-grained control")
}
}
}
})
t.Run("edge_case_upload_ids", func(t *testing.T) {
// Test various upload ID formats to ensure the fix works with real AWS-compatible upload IDs
testUploadIds := []string{
"simple123",
"complex-upload-id-with-hyphens",
"upload_with_underscores_123",
"2VmVGvGhqM0sXnVeBjMNCqtRvr.ygGz0pWPLKAj.YW3zK7VmpFHYuLKVR8OOXnHEhP3WfwlwLKMYJxoHgkGYYv",
"very-long-upload-id-that-might-be-generated-by-aws-s3-or-compatible-services-abcd1234",
"uploadId-with.dots.and-dashes_and_underscores123",
}
for _, uploadId := range testUploadIds {
req := &http.Request{
Method: "GET",
URL: &url.URL{Path: "/test-bucket/test-file.bin"},
}
query := req.URL.Query()
query.Set("uploadId", uploadId)
req.URL.RawQuery = query.Encode()
action := determineGranularS3Action(req, s3_constants.ACTION_READ, "test-bucket", "test-file.bin")
assert.Equal(t, "s3:ListParts", action,
"Upload ID format %s should be correctly detected and mapped to s3:ListParts", uploadId)
}
})
}
Loading…
Cancel
Save