diff --git a/test/s3/iam/README.md b/test/s3/iam/README.md index 62bb9fe37..0554efe2b 100644 --- a/test/s3/iam/README.md +++ b/test/s3/iam/README.md @@ -4,11 +4,13 @@ This directory contains comprehensive integration tests for the SeaweedFS S3 API ## Overview +**Important**: The STS service uses a **stateless JWT design** where all session information is embedded directly in the JWT token. No external session storage is required. + The S3 IAM integration tests validate the complete end-to-end functionality of: - **JWT Authentication**: OIDC token-based authentication with S3 API - **Policy Enforcement**: Fine-grained access control for S3 operations -- **Session Management**: STS session token validation and expiration +- **Stateless Session Management**: JWT-based session token validation and expiration (no external storage) - **Role-Based Access Control (RBAC)**: IAM roles with different permission levels - **Bucket Policies**: Resource-based access control integration - **Multipart Upload IAM**: Policy enforcement for multipart operations @@ -23,9 +25,8 @@ The S3 IAM integration tests validate the complete end-to-end functionality of: 2. **IAM Manager** - Core IAM orchestration and policy evaluation 3. **STS Service** - Security Token Service for temporary credentials 4. **Policy Engine** - AWS IAM-compatible policy evaluation -5. **Identity Providers** - OIDC and LDAP authentication providers -6. **Session Store** - Persistent session storage using SeaweedFS filer -7. **Policy Store** - Persistent policy storage using SeaweedFS filer +5. **Identity Providers** - OIDC and LDAP authentication providers +6. **Policy Store** - Persistent policy storage using SeaweedFS filer ### Test Framework @@ -162,7 +163,7 @@ The test configuration defines: - **Identity Providers**: OIDC and LDAP configurations - **IAM Roles**: Role definitions with trust policies - **IAM Policies**: Permission policies for different access levels -- **Session/Policy Stores**: Persistent storage configurations +- **Policy Stores**: Persistent storage configurations for IAM policies and roles ### Service Ports diff --git a/test/s3/iam/test_config.json b/test/s3/iam/test_config.json index 0a95cf693..b9ec5dc01 100644 --- a/test/s3/iam/test_config.json +++ b/test/s3/iam/test_config.json @@ -104,13 +104,6 @@ } } }, - "sessionStore": { - "type": "filer", - "config": { - "filerAddress": "localhost:8888", - "basePath": "/seaweedfs/iam/sessions" - } - }, "policyStore": { "type": "filer", "config": { diff --git a/weed/s3api/s3_iam_middleware.go b/weed/s3api/s3_iam_middleware.go index fd169e01c..b23e96476 100644 --- a/weed/s3api/s3_iam_middleware.go +++ b/weed/s3api/s3_iam_middleware.go @@ -6,7 +6,6 @@ import ( "net" "net/http" "net/url" - "sort" "strings" "time" @@ -311,6 +310,9 @@ func determineGranularS3Action(r *http.Request, fallbackAction Action, bucket st if _, hasVersions := query["versions"]; hasVersions { return "s3:GetObjectVersion" } + if _, hasUploadId := query["uploadId"]; hasUploadId { + return "s3:ListParts" + } // Default object read return "s3:GetObject" @@ -800,87 +802,17 @@ func (s3iam *S3IAMIntegration) validateOIDCToken(ctx context.Context, token stri return nil, fmt.Errorf("token not valid for any registered OIDC provider") } -// selectPrimaryRole intelligently selects the primary role from multiple available roles -// This provides deterministic role selection to prevent unpredictable access control behavior +// selectPrimaryRole simply picks the first role from the list +// The OIDC provider should return roles in priority order (most important first) func (s3iam *S3IAMIntegration) selectPrimaryRole(roles []string, externalIdentity *providers.ExternalIdentity) string { - if len(roles) == 1 { - return roles[0] - } - - glog.V(2).Infof("🔍 selectPrimaryRole: Selecting from %d roles: %v", len(roles), roles) - - // Strategy 1: Check for explicit primary_role claim - if primaryRole, exists := externalIdentity.Attributes["primary_role"]; exists && primaryRole != "" { - primaryRole = strings.TrimSpace(primaryRole) - // Verify the primary role is in the available roles list - for _, role := range roles { - if strings.EqualFold(role, primaryRole) { - glog.V(2).Infof("🔍 selectPrimaryRole: Using explicit primary_role: %s", role) - return role - } - } - glog.V(1).Infof("⚠️ selectPrimaryRole: primary_role '%s' not found in available roles, falling back", primaryRole) - } - - // Strategy 2: Role hierarchy - select most privileged role - selectedRole := s3iam.selectByRoleHierarchy(roles) - if selectedRole != "" { - glog.V(2).Infof("🔍 selectPrimaryRole: Using hierarchical selection: %s", selectedRole) - return selectedRole - } - - // Strategy 3: Deterministic fallback - alphabetical order (consistent behavior) - sort.Strings(roles) - glog.V(2).Infof("🔍 selectPrimaryRole: Using deterministic selection (first alphabetically): %s", roles[0]) - return roles[0] -} - -// selectByRoleHierarchy selects a role based on predefined privilege hierarchy -// Returns the most privileged role available, or empty string if no hierarchy match -func (s3iam *S3IAMIntegration) selectByRoleHierarchy(roles []string) string { - // Define role hierarchy from most privileged to least privileged - // This covers common enterprise role naming patterns - roleHierarchy := [][]string{ - // Tier 1: Super Admin roles - {"SuperAdmin", "super-admin", "super_admin", "root", "owner"}, - - // Tier 2: Admin roles - {"Admin", "admin", "Administrator", "administrator", "system-admin", "system_admin"}, - - // Tier 3: Manager/Power User roles - {"Manager", "manager", "PowerUser", "power-user", "power_user", "lead", "supervisor"}, - - // Tier 4: Editor/Write roles - {"Editor", "editor", "Writer", "writer", "Contributor", "contributor", "write", "readwrite", "read-write"}, - - // Tier 5: Viewer/Read roles - {"Viewer", "viewer", "Reader", "reader", "read-only", "read_only", "readonly", "read", "guest"}, + if len(roles) == 0 { + return "" } - // Find the highest priority role available - for _, tier := range roleHierarchy { - for _, privilegedRole := range tier { - for _, availableRole := range roles { - // Check for exact match or contains match (case-insensitive) - if strings.EqualFold(availableRole, privilegedRole) || - strings.Contains(strings.ToLower(availableRole), strings.ToLower(privilegedRole)) { - return availableRole - } - } - } - } - - // No hierarchy match found - return "" -} - -// getProviderNames returns a list of provider names for debugging -func getProviderNames(providers map[string]providers.IdentityProvider) []string { - names := make([]string, 0, len(providers)) - for name := range providers { - names = append(names, name) - } - return names + // Just pick the first one - keep it simple + selectedRole := roles[0] + glog.V(2).Infof("🔍 selectPrimaryRole: Selected first role: %s from %v", selectedRole, roles) + return selectedRole } // isSTSIssuer determines if an issuer belongs to the STS service diff --git a/weed/s3api/s3_iam_role_selection_test.go b/weed/s3api/s3_iam_role_selection_test.go index d6f1cba56..91b1f2822 100644 --- a/weed/s3api/s3_iam_role_selection_test.go +++ b/weed/s3api/s3_iam_role_selection_test.go @@ -1,7 +1,6 @@ package s3api import ( - "strings" "testing" "github.com/seaweedfs/seaweedfs/weed/iam/providers" @@ -11,249 +10,52 @@ import ( func TestSelectPrimaryRole(t *testing.T) { s3iam := &S3IAMIntegration{} - t.Run("single_role_returns_that_role", func(t *testing.T) { - roles := []string{"admin"} - externalIdentity := &providers.ExternalIdentity{ - Attributes: make(map[string]string), - } - - result := s3iam.selectPrimaryRole(roles, externalIdentity) - assert.Equal(t, "admin", result) - }) - - t.Run("explicit_primary_role_takes_precedence", func(t *testing.T) { - roles := []string{"admin", "reader", "writer"} - externalIdentity := &providers.ExternalIdentity{ - Attributes: map[string]string{ - "primary_role": "reader", - }, - } - - result := s3iam.selectPrimaryRole(roles, externalIdentity) - assert.Equal(t, "reader", result) - }) - - t.Run("explicit_primary_role_case_insensitive", func(t *testing.T) { - roles := []string{"Admin", "Reader", "Writer"} - externalIdentity := &providers.ExternalIdentity{ - Attributes: map[string]string{ - "primary_role": "admin", - }, - } - - result := s3iam.selectPrimaryRole(roles, externalIdentity) - assert.Equal(t, "Admin", result) - }) - - t.Run("invalid_primary_role_falls_back_to_hierarchy", func(t *testing.T) { - roles := []string{"admin", "reader", "writer"} - externalIdentity := &providers.ExternalIdentity{ - Attributes: map[string]string{ - "primary_role": "nonexistent", - }, - } - - result := s3iam.selectPrimaryRole(roles, externalIdentity) - assert.Equal(t, "admin", result) // Should select admin via hierarchy + t.Run("empty_roles_returns_empty", func(t *testing.T) { + identity := &providers.ExternalIdentity{Attributes: make(map[string]string)} + result := s3iam.selectPrimaryRole([]string{}, identity) + assert.Equal(t, "", result) }) - t.Run("hierarchy_selection_admin_over_reader", func(t *testing.T) { - roles := []string{"reader", "admin", "writer"} - externalIdentity := &providers.ExternalIdentity{ - Attributes: make(map[string]string), - } - - result := s3iam.selectPrimaryRole(roles, externalIdentity) - assert.Equal(t, "admin", result) // Admin has higher priority + t.Run("single_role_returns_that_role", func(t *testing.T) { + identity := &providers.ExternalIdentity{Attributes: make(map[string]string)} + result := s3iam.selectPrimaryRole([]string{"admin"}, identity) + assert.Equal(t, "admin", result) }) - t.Run("hierarchy_selection_case_insensitive", func(t *testing.T) { - roles := []string{"Reader", "ADMIN", "writer"} - externalIdentity := &providers.ExternalIdentity{ - Attributes: make(map[string]string), - } - - result := s3iam.selectPrimaryRole(roles, externalIdentity) - assert.Equal(t, "ADMIN", result) + t.Run("multiple_roles_returns_first", func(t *testing.T) { + identity := &providers.ExternalIdentity{Attributes: make(map[string]string)} + roles := []string{"viewer", "manager", "admin"} + result := s3iam.selectPrimaryRole(roles, identity) + assert.Equal(t, "viewer", result, "Should return first role") }) - t.Run("hierarchy_selection_contains_match", func(t *testing.T) { - roles := []string{"system-reader", "system-admin-user", "system-writer"} - externalIdentity := &providers.ExternalIdentity{ - Attributes: make(map[string]string), - } + t.Run("order_matters", func(t *testing.T) { + identity := &providers.ExternalIdentity{Attributes: make(map[string]string)} - result := s3iam.selectPrimaryRole(roles, externalIdentity) - assert.Equal(t, "system-admin-user", result) // Contains "admin" - }) + // Test different orderings + roles1 := []string{"admin", "viewer", "manager"} + result1 := s3iam.selectPrimaryRole(roles1, identity) + assert.Equal(t, "admin", result1) - t.Run("deterministic_fallback_alphabetical", func(t *testing.T) { - // Roles that don't match any hierarchy - roles := []string{"zebra", "alpha", "beta"} - externalIdentity := &providers.ExternalIdentity{ - Attributes: make(map[string]string), - } + roles2 := []string{"viewer", "admin", "manager"} + result2 := s3iam.selectPrimaryRole(roles2, identity) + assert.Equal(t, "viewer", result2) - result := s3iam.selectPrimaryRole(roles, externalIdentity) - assert.Equal(t, "alpha", result) // First alphabetically + roles3 := []string{"manager", "admin", "viewer"} + result3 := s3iam.selectPrimaryRole(roles3, identity) + assert.Equal(t, "manager", result3) }) t.Run("complex_enterprise_roles", func(t *testing.T) { + identity := &providers.ExternalIdentity{Attributes: make(map[string]string)} roles := []string{ - "app-user-readonly", - "app-user-contributor", - "app-admin-full", - "system-guest", - } - externalIdentity := &providers.ExternalIdentity{ - Attributes: make(map[string]string), - } - - result := s3iam.selectPrimaryRole(roles, externalIdentity) - assert.Equal(t, "app-admin-full", result) // Contains "admin" - }) -} - -func TestSelectByRoleHierarchy(t *testing.T) { - s3iam := &S3IAMIntegration{} - - t.Run("super_admin_highest_priority", func(t *testing.T) { - roles := []string{"admin", "super-admin", "reader"} - result := s3iam.selectByRoleHierarchy(roles) - assert.Equal(t, "super-admin", result) - }) - - t.Run("admin_over_manager", func(t *testing.T) { - roles := []string{"manager", "admin", "reader"} - result := s3iam.selectByRoleHierarchy(roles) - assert.Equal(t, "admin", result) - }) - - t.Run("manager_over_editor", func(t *testing.T) { - roles := []string{"editor", "manager", "reader"} - result := s3iam.selectByRoleHierarchy(roles) - assert.Equal(t, "manager", result) - }) - - t.Run("editor_over_viewer", func(t *testing.T) { - roles := []string{"viewer", "editor"} - result := s3iam.selectByRoleHierarchy(roles) - assert.Equal(t, "editor", result) - }) - - t.Run("no_hierarchy_match_returns_empty", func(t *testing.T) { - roles := []string{"custom-role-1", "custom-role-2", "special-user"} - result := s3iam.selectByRoleHierarchy(roles) - assert.Equal(t, "", result) - }) - - t.Run("multiple_same_tier_returns_first_found", func(t *testing.T) { - roles := []string{"viewer", "reader", "guest"} - result := s3iam.selectByRoleHierarchy(roles) - // Should return first match found in the hierarchy (viewer comes first in tier definition) - assert.Equal(t, "viewer", result) - }) - - t.Run("case_variations", func(t *testing.T) { - roles := []string{"ADMIN", "Reader", "writer"} - result := s3iam.selectByRoleHierarchy(roles) - assert.Equal(t, "ADMIN", result) - }) -} - -func TestRoleSelectionIntegration(t *testing.T) { - t.Run("real_world_enterprise_scenario", func(t *testing.T) { - // Simulate a real enterprise OIDC token with multiple roles - testCases := []struct { - name string - roles []string - primaryRole string // explicit primary_role claim - expectedRole string - selectionType string - }{ - { - name: "explicit_primary_overrides_hierarchy", - roles: []string{"admin", "reader", "writer"}, - primaryRole: "reader", - expectedRole: "reader", - selectionType: "explicit", - }, - { - name: "hierarchy_selects_admin_over_others", - roles: []string{"contributor", "admin", "viewer"}, - primaryRole: "", // No explicit primary - expectedRole: "admin", - selectionType: "hierarchy", - }, - { - name: "deterministic_fallback_for_unknown_roles", - roles: []string{"zebra-role", "alpha-role", "beta-role"}, - primaryRole: "", - expectedRole: "alpha-role", - selectionType: "deterministic", - }, - { - name: "complex_enterprise_naming", - roles: []string{"org-user-readonly", "org-power-user", "org-system-admin"}, - primaryRole: "", - expectedRole: "org-system-admin", // Contains "admin" - selectionType: "hierarchy", - }, - } - - s3iam := &S3IAMIntegration{} - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - externalIdentity := &providers.ExternalIdentity{ - Attributes: make(map[string]string), - } - if tc.primaryRole != "" { - externalIdentity.Attributes["primary_role"] = tc.primaryRole - } - - result := s3iam.selectPrimaryRole(tc.roles, externalIdentity) - assert.Equal(t, tc.expectedRole, result, - "Expected %s selection to return %s, got %s", - tc.selectionType, tc.expectedRole, result) - }) - } - }) -} - -// Test helper function to verify role parsing improvements -func TestRoleParsingImprovements(t *testing.T) { - t.Run("whitespace_handling", func(t *testing.T) { - // Test the improved role parsing logic - rolesStr := " admin , reader , writer " - roles := strings.Split(rolesStr, ",") - - // Clean up role names (this is what the main code does now) - var cleanRoles []string - for _, role := range roles { - cleanRole := strings.TrimSpace(role) - if cleanRole != "" { - cleanRoles = append(cleanRoles, cleanRole) - } - } - - expected := []string{"admin", "reader", "writer"} - assert.Equal(t, expected, cleanRoles) - }) - - t.Run("empty_roles_filtered", func(t *testing.T) { - rolesStr := "admin,,reader, ,writer" - roles := strings.Split(rolesStr, ",") - - var cleanRoles []string - for _, role := range roles { - cleanRole := strings.TrimSpace(role) - if cleanRole != "" { - cleanRoles = append(cleanRoles, cleanRole) - } - } - - expected := []string{"admin", "reader", "writer"} - assert.Equal(t, expected, cleanRoles) + "finance-readonly", + "hr-manager", + "it-system-admin", + "guest-viewer", + } + result := s3iam.selectPrimaryRole(roles, identity) + // Should return the first role + assert.Equal(t, "finance-readonly", result, "Should return first role in list") }) } diff --git a/weed/s3api/s3_list_parts_action_test.go b/weed/s3api/s3_list_parts_action_test.go new file mode 100644 index 000000000..4c0a28eff --- /dev/null +++ b/weed/s3api/s3_list_parts_action_test.go @@ -0,0 +1,286 @@ +package s3api + +import ( + "net/http" + "net/url" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/stretchr/testify/assert" +) + +// TestListPartsActionMapping tests the fix for the missing s3:ListParts action mapping +// when GET requests include an uploadId query parameter +func TestListPartsActionMapping(t *testing.T) { + testCases := []struct { + name string + method string + bucket string + objectKey string + queryParams map[string]string + fallbackAction Action + expectedAction string + description string + }{ + { + name: "get_object_without_uploadId", + method: "GET", + bucket: "test-bucket", + objectKey: "test-object.txt", + queryParams: map[string]string{}, + fallbackAction: s3_constants.ACTION_READ, + expectedAction: "s3:GetObject", + description: "GET request without uploadId should map to s3:GetObject", + }, + { + name: "get_object_with_uploadId", + method: "GET", + bucket: "test-bucket", + objectKey: "test-object.txt", + queryParams: map[string]string{"uploadId": "test-upload-id"}, + fallbackAction: s3_constants.ACTION_READ, + expectedAction: "s3:ListParts", + description: "GET request with uploadId should map to s3:ListParts (this was the missing mapping)", + }, + { + name: "get_object_with_uploadId_and_other_params", + method: "GET", + bucket: "test-bucket", + objectKey: "test-object.txt", + queryParams: map[string]string{ + "uploadId": "test-upload-id-123", + "max-parts": "100", + "part-number-marker": "50", + }, + fallbackAction: s3_constants.ACTION_READ, + expectedAction: "s3:ListParts", + description: "GET request with uploadId plus other multipart params should map to s3:ListParts", + }, + { + name: "get_object_versions", + method: "GET", + bucket: "test-bucket", + objectKey: "test-object.txt", + queryParams: map[string]string{"versions": ""}, + fallbackAction: s3_constants.ACTION_READ, + expectedAction: "s3:GetObjectVersion", + description: "GET request with versions should still map to s3:GetObjectVersion (precedence check)", + }, + { + name: "get_object_acl_without_uploadId", + method: "GET", + bucket: "test-bucket", + objectKey: "test-object.txt", + queryParams: map[string]string{"acl": ""}, + fallbackAction: s3_constants.ACTION_READ_ACP, + expectedAction: "s3:GetObjectAcl", + description: "GET request with acl should map to s3:GetObjectAcl (not affected by uploadId fix)", + }, + { + name: "post_multipart_upload_without_uploadId", + method: "POST", + bucket: "test-bucket", + objectKey: "test-object.txt", + queryParams: map[string]string{"uploads": ""}, + fallbackAction: s3_constants.ACTION_WRITE, + expectedAction: "s3:CreateMultipartUpload", + description: "POST request to initiate multipart upload should not be affected by uploadId fix", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create HTTP request with query parameters + req := &http.Request{ + Method: tc.method, + URL: &url.URL{Path: "/" + tc.bucket + "/" + tc.objectKey}, + } + + // Add query parameters + query := req.URL.Query() + for key, value := range tc.queryParams { + query.Set(key, value) + } + req.URL.RawQuery = query.Encode() + + // Call the granular action determination function + action := determineGranularS3Action(req, tc.fallbackAction, tc.bucket, tc.objectKey) + + // Verify the action mapping + assert.Equal(t, tc.expectedAction, action, + "Test case: %s - %s", tc.name, tc.description) + }) + } +} + +// TestListPartsActionMappingSecurityScenarios tests security scenarios for the ListParts fix +func TestListPartsActionMappingSecurityScenarios(t *testing.T) { + t.Run("privilege_separation_listparts_vs_getobject", func(t *testing.T) { + // Scenario: User has permission to list multipart upload parts but NOT to get the actual object content + // This is a common enterprise pattern where users can manage uploads but not read final objects + + // Test request 1: List parts with uploadId + req1 := &http.Request{ + Method: "GET", + URL: &url.URL{Path: "/secure-bucket/confidential-document.pdf"}, + } + query1 := req1.URL.Query() + query1.Set("uploadId", "active-upload-123") + req1.URL.RawQuery = query1.Encode() + action1 := determineGranularS3Action(req1, s3_constants.ACTION_READ, "secure-bucket", "confidential-document.pdf") + + // Test request 2: Get object without uploadId + req2 := &http.Request{ + Method: "GET", + URL: &url.URL{Path: "/secure-bucket/confidential-document.pdf"}, + } + action2 := determineGranularS3Action(req2, s3_constants.ACTION_READ, "secure-bucket", "confidential-document.pdf") + + // These should be different actions, allowing different permissions + assert.Equal(t, "s3:ListParts", action1, "Listing multipart parts should require s3:ListParts permission") + assert.Equal(t, "s3:GetObject", action2, "Reading object content should require s3:GetObject permission") + assert.NotEqual(t, action1, action2, "ListParts and GetObject should be separate permissions for security") + }) + + t.Run("policy_enforcement_precision", func(t *testing.T) { + // This test documents the security improvement - before the fix, both operations + // would incorrectly map to s3:GetObject, preventing fine-grained access control + + testCases := []struct { + description string + queryParams map[string]string + expectedAction string + securityNote string + }{ + { + description: "List multipart upload parts", + queryParams: map[string]string{"uploadId": "upload-abc123"}, + expectedAction: "s3:ListParts", + securityNote: "FIXED: Now correctly maps to s3:ListParts instead of s3:GetObject", + }, + { + description: "Get actual object content", + queryParams: map[string]string{}, + expectedAction: "s3:GetObject", + securityNote: "UNCHANGED: Still correctly maps to s3:GetObject", + }, + { + description: "Get object with complex upload ID", + queryParams: map[string]string{"uploadId": "complex-upload-id-with-hyphens-123-abc-def"}, + expectedAction: "s3:ListParts", + securityNote: "FIXED: Complex upload IDs now correctly detected", + }, + } + + for _, tc := range testCases { + req := &http.Request{ + Method: "GET", + URL: &url.URL{Path: "/test-bucket/test-object"}, + } + + query := req.URL.Query() + for key, value := range tc.queryParams { + query.Set(key, value) + } + req.URL.RawQuery = query.Encode() + + action := determineGranularS3Action(req, s3_constants.ACTION_READ, "test-bucket", "test-object") + + assert.Equal(t, tc.expectedAction, action, + "%s - %s", tc.description, tc.securityNote) + } + }) +} + +// TestListPartsActionRealWorldScenarios tests realistic enterprise multipart upload scenarios +func TestListPartsActionRealWorldScenarios(t *testing.T) { + t.Run("large_file_upload_workflow", func(t *testing.T) { + // Simulate a large file upload workflow where users need different permissions for each step + + // Step 1: Initiate multipart upload (POST with uploads query) + req1 := &http.Request{ + Method: "POST", + URL: &url.URL{Path: "/data/large-dataset.csv"}, + } + query1 := req1.URL.Query() + query1.Set("uploads", "") + req1.URL.RawQuery = query1.Encode() + action1 := determineGranularS3Action(req1, s3_constants.ACTION_WRITE, "data", "large-dataset.csv") + + // Step 2: List existing parts (GET with uploadId query) - THIS WAS THE MISSING MAPPING + req2 := &http.Request{ + Method: "GET", + URL: &url.URL{Path: "/data/large-dataset.csv"}, + } + query2 := req2.URL.Query() + query2.Set("uploadId", "dataset-upload-20240827-001") + req2.URL.RawQuery = query2.Encode() + action2 := determineGranularS3Action(req2, s3_constants.ACTION_READ, "data", "large-dataset.csv") + + // Step 3: Upload a part (PUT with uploadId and partNumber) + req3 := &http.Request{ + Method: "PUT", + URL: &url.URL{Path: "/data/large-dataset.csv"}, + } + query3 := req3.URL.Query() + query3.Set("uploadId", "dataset-upload-20240827-001") + query3.Set("partNumber", "5") + req3.URL.RawQuery = query3.Encode() + action3 := determineGranularS3Action(req3, s3_constants.ACTION_WRITE, "data", "large-dataset.csv") + + // Step 4: Complete multipart upload (POST with uploadId) + req4 := &http.Request{ + Method: "POST", + URL: &url.URL{Path: "/data/large-dataset.csv"}, + } + query4 := req4.URL.Query() + query4.Set("uploadId", "dataset-upload-20240827-001") + req4.URL.RawQuery = query4.Encode() + action4 := determineGranularS3Action(req4, s3_constants.ACTION_WRITE, "data", "large-dataset.csv") + + // Verify each step has the correct action mapping + assert.Equal(t, "s3:CreateMultipartUpload", action1, "Step 1: Initiate upload") + assert.Equal(t, "s3:ListParts", action2, "Step 2: List parts (FIXED by this PR)") + assert.Equal(t, "s3:UploadPart", action3, "Step 3: Upload part") + assert.Equal(t, "s3:CompleteMultipartUpload", action4, "Step 4: Complete upload") + + // Verify that each step requires different permissions (security principle) + actions := []string{action1, action2, action3, action4} + for i, action := range actions { + for j, otherAction := range actions { + if i != j { + assert.NotEqual(t, action, otherAction, + "Each multipart operation step should require different permissions for fine-grained control") + } + } + } + }) + + t.Run("edge_case_upload_ids", func(t *testing.T) { + // Test various upload ID formats to ensure the fix works with real AWS-compatible upload IDs + + testUploadIds := []string{ + "simple123", + "complex-upload-id-with-hyphens", + "upload_with_underscores_123", + "2VmVGvGhqM0sXnVeBjMNCqtRvr.ygGz0pWPLKAj.YW3zK7VmpFHYuLKVR8OOXnHEhP3WfwlwLKMYJxoHgkGYYv", + "very-long-upload-id-that-might-be-generated-by-aws-s3-or-compatible-services-abcd1234", + "uploadId-with.dots.and-dashes_and_underscores123", + } + + for _, uploadId := range testUploadIds { + req := &http.Request{ + Method: "GET", + URL: &url.URL{Path: "/test-bucket/test-file.bin"}, + } + query := req.URL.Query() + query.Set("uploadId", uploadId) + req.URL.RawQuery = query.Encode() + + action := determineGranularS3Action(req, s3_constants.ACTION_READ, "test-bucket", "test-file.bin") + + assert.Equal(t, "s3:ListParts", action, + "Upload ID format %s should be correctly detected and mapped to s3:ListParts", uploadId) + } + }) +}