You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

312 lines
10 KiB

3 months ago
  1. """Test module for KeycloakUMA."""
  2. import re
  3. import pytest
  4. from keycloak import KeycloakAdmin, KeycloakOpenIDConnection, KeycloakUMA
  5. from keycloak.exceptions import (
  6. KeycloakDeleteError,
  7. KeycloakGetError,
  8. KeycloakPostError,
  9. KeycloakPutError,
  10. )
  11. from keycloak.uma_permissions import UMAPermission
  def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection):
      """Check KeycloakUMA construction and the caching of its well-known data.

      :param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz
      :type oid_connection_with_authz: KeycloakOpenIDConnection
      """
      uma_client = KeycloakUMA(connection=oid_connection_with_authz)
      assert isinstance(uma_client.connection, KeycloakOpenIDConnection)

      # Before the first access the private cache is expected to be unset
      assert uma_client._well_known is None

      # The first access must yield a truthy value ...
      assert uma_client.uma_well_known

      # ... and is expected to leave the cache filled afterwards
      assert uma_client._well_known is not None
  def test_uma_well_known(uma: KeycloakUMA):
      """Check that the UMA well-known data is present and exposes the expected keys.

      :param uma: Keycloak UMA client
      :type uma: KeycloakUMA
      """
      well_known = uma.uma_well_known
      assert well_known is not None
      assert well_known != {}

      # Keys that must be present in the well-known data
      expected_keys = ["resource_registration_endpoint"]
      for expected in expected_keys:
          assert expected in well_known
  def test_uma_resource_sets(uma: KeycloakUMA):
      """Test listing, querying, creating, reading, updating and deleting resource sets.

      :param uma: Keycloak UMA client
      :type uma: KeycloakUMA
      """
      # Check that only the default resource is present
      resource_set_list = list(uma.resource_set_list())
      assert len(resource_set_list) == 1, resource_set_list
      assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]

      # Test query for resource sets
      resource_set_list_ids = uma.resource_set_list_ids()
      assert len(resource_set_list_ids) == 1

      # Without exact_name, both a partial and the full name are expected to match
      for name in ("Default", "Default Resource"):
          assert uma.resource_set_list_ids(name=name) == resource_set_list_ids, name

      # Each of these filters is expected to exclude the single default resource
      no_match_queries = (
          {"name": "Default", "exact_name": True},
          {"first": 1},
          {"scope": "Invalid"},
          {"owner": "Invalid"},
          {"resource_type": "Invalid"},
          {"name": "Invalid"},
          {"uri": "Invalid"},
          {"maximum": 0},
      )
      for query in no_match_queries:
          assert len(uma.resource_set_list_ids(**query)) == 0, query

      # Test create resource set
      resource_to_create = {
          "name": "mytest",
          "scopes": ["test:read", "test:write"],
          "type": "urn:test",
      }
      created_resource = uma.resource_set_create(resource_to_create)
      assert created_resource
      assert created_resource["_id"], created_resource
      # Every key that was sent must be present in the returned representation
      assert set(resource_to_create).issubset(set(created_resource)), created_resource

      # Test create the same resource set
      with pytest.raises(KeycloakPostError) as err:
          uma.resource_set_create(resource_to_create)
      # The expected message contains regex metacharacters ("[mytest]"), hence re.escape
      assert err.match(
          re.escape(
              '409: b\'{"error":"invalid_request","error_description":'
              '"Resource with name [mytest] already exists."}\''
          )
      )

      # Test get resource set
      latest_resource = uma.resource_set_read(created_resource["_id"])
      assert latest_resource["name"] == created_resource["name"]

      # Test update resource set
      latest_resource["name"] = "New Resource Name"
      res = uma.resource_set_update(created_resource["_id"], latest_resource)
      assert res == {}, res
      updated_resource = uma.resource_set_read(created_resource["_id"])
      assert updated_resource["name"] == "New Resource Name"

      # Test update resource set fail
      with pytest.raises(KeycloakPutError) as err:
          uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
      assert err.match('400: b\'{"error":"Unrecognized field')

      # Test delete resource set
      res = uma.resource_set_delete(resource_id=created_resource["_id"])
      assert res == {}, res
      with pytest.raises(KeycloakGetError) as err:
          uma.resource_set_read(created_resource["_id"])
      assert err.match("404: b''")

      # Test delete fail
      with pytest.raises(KeycloakDeleteError) as err:
          uma.resource_set_delete(resource_id=created_resource["_id"])
      assert err.match("404: b''")
  def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
      """Exercise creation, querying, updating and deletion of UMA policies.

      :param uma: Keycloak UMA client
      :type uma: KeycloakUMA
      :param admin: Keycloak Admin client
      :type admin: KeycloakAdmin
      """
      # Test data the policies below refer to: a resource, a group, a realm role and another client
      resource_payload = {
          "name": "mytest",
          "scopes": ["test:read", "test:write"],
          "type": "urn:test",
          "ownerManagedAccess": True,
      }
      created_resource = uma.resource_set_create(resource_payload)
      group_id = admin.create_group({"name": "UMAPolicyGroup"})
      role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"})
      other_client_id = admin.create_client({"name": "UMAOtherClient"})
      other_client = admin.get_client(other_client_id)
      resource_id = created_resource["_id"]

      # Policy bound to a role
      role_policy = uma.policy_resource_create(
          resource_id=resource_id,
          payload={
              "name": "TestPolicyRole",
              "description": "Test resource policy description",
              "scopes": ["test:read", "test:write"],
              "roles": ["roleUMAPolicy"],
          },
      )
      assert role_policy

      # Policy bound to a client
      client_policy = uma.policy_resource_create(
          resource_id=resource_id,
          payload={
              "name": "TestPolicyClient",
              "description": "Test resource policy description",
              "scopes": ["test:read"],
              "clients": [other_client["clientId"]],
          },
      )
      assert client_policy

      # Policy bound to a group
      group_policy = uma.policy_resource_create(
          resource_id=resource_id,
          payload={
              "name": "TestPolicyGroup",
              "description": "Test resource policy description",
              "scopes": ["test:read"],
              "groups": ["/UMAPolicyGroup"],
          },
      )
      assert group_policy

      # All three policies are listed; a name filter narrows the result to one
      assert len(uma.policy_query()) == 3
      assert len(uma.policy_query(name="TestPolicyGroup")) == 1

      # Delete the group policy; deleting it a second time must fail
      deleted_policy_id = group_policy["id"]
      uma.policy_delete(deleted_policy_id)
      with pytest.raises(KeycloakDeleteError) as err:
          uma.policy_delete(deleted_policy_id)
      assert err.match(
          '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
      )

      remaining_policies = uma.policy_query()
      assert len(remaining_policies) == 2

      # Sending a policy back unchanged must not alter the number of policies
      kept_policy = remaining_policies[0]
      uma.policy_update(policy_id=kept_policy["id"], payload=kept_policy)
      assert len(uma.policy_query()) == 2

      # Queries that are expected to return nothing
      for empty_query in (
          {"name": "Invalid"},
          {"scope": "Invalid"},
          {"resource": "Invalid"},
          {"first": 3},
          {"maximum": 0},
      ):
          assert len(uma.policy_query(**empty_query)) == 0

      # Queries built from the data of an existing policy
      assert len(uma.policy_query(name=kept_policy["name"])) == 1
      assert len(uma.policy_query(scope=kept_policy["scopes"][0])) == 2
      assert len(uma.policy_query(resource=resource_id)) == 2

      # Remove the test data created at the top
      uma.resource_set_delete(resource_id)
      admin.delete_client(other_client_id)
      admin.delete_realm_role(role_id)
      admin.delete_group(group_id)
  def test_uma_access(uma: KeycloakUMA):
      """Check permission checks for an empty, a known and an unknown resource request.

      :param uma: Keycloak UMA client
      :type uma: KeycloakUMA
      """
      resource_payload = {
          "name": "mytest",
          "scopes": ["read", "write"],
          "type": "urn:test",
          "ownerManagedAccess": True,
      }
      resource = uma.resource_set_create(resource_payload)

      # Policy on the first scope of the resource for the client of this connection
      uma.policy_resource_create(
          resource_id=resource["_id"],
          payload={
              "name": "TestPolicy",
              "description": "Test resource policy description",
              "scopes": [resource_payload["scopes"][0]],
              "clients": [uma.connection.client_id],
          },
      )

      access_token = uma.connection.token["access_token"]
      requested = []

      # An empty permission list is expected to pass
      assert uma.permissions_check(access_token, requested)

      # A request for the created resource is expected to pass
      requested.append(UMAPermission(resource=resource_payload["name"]))
      assert uma.permissions_check(access_token, requested)

      # Adding a resource that was never created is expected to make the check fail
      requested.append(UMAPermission(resource="not valid"))
      assert not uma.permissions_check(access_token, requested)

      uma.resource_set_delete(resource["_id"])
  def test_uma_permission_ticket(uma: KeycloakUMA):
      """Check permission ticket creation and its exchange for a token.

      :param uma: Keycloak UMA client
      :type uma: KeycloakUMA
      """
      resource_payload = {
          "name": "mytest",
          "scopes": ["read", "write"],
          "type": "urn:test",
          "ownerManagedAccess": True,
      }
      resource = uma.resource_set_create(resource_payload)
      first_scope = resource_payload["scopes"][0]

      # Policy on the first scope of the resource for the client of this connection
      uma.policy_resource_create(
          resource_id=resource["_id"],
          payload={
              "name": "TestPolicy",
              "description": "Test resource policy description",
              "scopes": [first_scope],
              "clients": [uma.connection.client_id],
          },
      )

      # Request a ticket for the created resource and scope
      valid_request = (UMAPermission(resource=resource_payload["name"], scope=first_scope),)
      ticket_response = uma.permission_ticket_create(valid_request)

      # Exchange the ticket through the UMA ticket grant type
      rpt = uma.connection.keycloak_openid.token(
          grant_type="urn:ietf:params:oauth:grant-type:uma-ticket",
          ticket=ticket_response["ticket"],
      )
      assert rpt
      assert "access_token" in rpt

      # Requesting a ticket for a resource that was never created must be rejected
      invalid_request = (UMAPermission(resource="invalid"),)
      with pytest.raises(KeycloakPostError):
          uma.permission_ticket_create(invalid_request)

      uma.resource_set_delete(resource["_id"])