You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

311 lines
10 KiB

  1. """Test module for KeycloakUMA."""
  2. import re
  3. import pytest
  4. from keycloak import KeycloakAdmin, KeycloakOpenIDConnection, KeycloakUMA
  5. from keycloak.exceptions import (
  6. KeycloakDeleteError,
  7. KeycloakGetError,
  8. KeycloakPostError,
  9. KeycloakPutError,
  10. )
  11. from keycloak.uma_permissions import UMAPermission
  12. def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection):
  13. """Test KeycloakUMA's init method.
  14. :param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz
  15. :type oid_connection_with_authz: KeycloakOpenIDConnection
  16. """
  17. connection = oid_connection_with_authz
  18. uma = KeycloakUMA(connection=connection)
  19. assert isinstance(uma.connection, KeycloakOpenIDConnection)
  20. # should initially be empty
  21. assert uma._well_known is None
  22. assert uma.uma_well_known
  23. # should be cached after first reference
  24. assert uma._well_known is not None
  25. def test_uma_well_known(uma: KeycloakUMA):
  26. """Test the well_known method.
  27. :param uma: Keycloak UMA client
  28. :type uma: KeycloakUMA
  29. """
  30. res = uma.uma_well_known
  31. assert res is not None
  32. assert res != dict()
  33. for key in ["resource_registration_endpoint"]:
  34. assert key in res
  35. def test_uma_resource_sets(uma: KeycloakUMA):
  36. """Test resource sets.
  37. :param uma: Keycloak UMA client
  38. :type uma: KeycloakUMA
  39. """
  40. # Check that only the default resource is present
  41. resource_sets = uma.resource_set_list()
  42. resource_set_list = list(resource_sets)
  43. assert len(resource_set_list) == 1, resource_set_list
  44. assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]
  45. # Test query for resource sets
  46. resource_set_list_ids = uma.resource_set_list_ids()
  47. assert len(resource_set_list_ids) == 1
  48. resource_set_list_ids2 = uma.resource_set_list_ids(name="Default")
  49. assert resource_set_list_ids2 == resource_set_list_ids
  50. resource_set_list_ids2 = uma.resource_set_list_ids(name="Default Resource")
  51. assert resource_set_list_ids2 == resource_set_list_ids
  52. resource_set_list_ids = uma.resource_set_list_ids(name="Default", exact_name=True)
  53. assert len(resource_set_list_ids) == 0
  54. resource_set_list_ids = uma.resource_set_list_ids(first=1)
  55. assert len(resource_set_list_ids) == 0
  56. resource_set_list_ids = uma.resource_set_list_ids(scope="Invalid")
  57. assert len(resource_set_list_ids) == 0
  58. resource_set_list_ids = uma.resource_set_list_ids(owner="Invalid")
  59. assert len(resource_set_list_ids) == 0
  60. resource_set_list_ids = uma.resource_set_list_ids(resource_type="Invalid")
  61. assert len(resource_set_list_ids) == 0
  62. resource_set_list_ids = uma.resource_set_list_ids(name="Invalid")
  63. assert len(resource_set_list_ids) == 0
  64. resource_set_list_ids = uma.resource_set_list_ids(uri="Invalid")
  65. assert len(resource_set_list_ids) == 0
  66. resource_set_list_ids = uma.resource_set_list_ids(maximum=0)
  67. assert len(resource_set_list_ids) == 0
  68. # Test create resource set
  69. resource_to_create = {
  70. "name": "mytest",
  71. "scopes": ["test:read", "test:write"],
  72. "type": "urn:test",
  73. }
  74. created_resource = uma.resource_set_create(resource_to_create)
  75. assert created_resource
  76. assert created_resource["_id"], created_resource
  77. assert set(resource_to_create).issubset(set(created_resource)), created_resource
  78. # Test create the same resource set
  79. with pytest.raises(KeycloakPostError) as err:
  80. uma.resource_set_create(resource_to_create)
  81. assert err.match(
  82. re.escape(
  83. '409: b\'{"error":"invalid_request","error_description":'
  84. '"Resource with name [mytest] already exists."}\''
  85. )
  86. )
  87. # Test get resource set
  88. latest_resource = uma.resource_set_read(created_resource["_id"])
  89. assert latest_resource["name"] == created_resource["name"]
  90. # Test update resource set
  91. latest_resource["name"] = "New Resource Name"
  92. res = uma.resource_set_update(created_resource["_id"], latest_resource)
  93. assert res == dict(), res
  94. updated_resource = uma.resource_set_read(created_resource["_id"])
  95. assert updated_resource["name"] == "New Resource Name"
  96. # Test update resource set fail
  97. with pytest.raises(KeycloakPutError) as err:
  98. uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
  99. assert err.match('400: b\'{"error":"Unrecognized field')
  100. # Test delete resource set
  101. res = uma.resource_set_delete(resource_id=created_resource["_id"])
  102. assert res == dict(), res
  103. with pytest.raises(KeycloakGetError) as err:
  104. uma.resource_set_read(created_resource["_id"])
  105. err.match("404: b''")
  106. # Test delete fail
  107. with pytest.raises(KeycloakDeleteError) as err:
  108. uma.resource_set_delete(resource_id=created_resource["_id"])
  109. assert err.match("404: b''")
  110. def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
  111. """Test policies.
  112. :param uma: Keycloak UMA client
  113. :type uma: KeycloakUMA
  114. :param admin: Keycloak Admin client
  115. :type admin: KeycloakAdmin
  116. """
  117. # Create some required test data
  118. resource_to_create = {
  119. "name": "mytest",
  120. "scopes": ["test:read", "test:write"],
  121. "type": "urn:test",
  122. "ownerManagedAccess": True,
  123. }
  124. created_resource = uma.resource_set_create(resource_to_create)
  125. group_id = admin.create_group({"name": "UMAPolicyGroup"})
  126. role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"})
  127. other_client_id = admin.create_client({"name": "UMAOtherClient"})
  128. client = admin.get_client(other_client_id)
  129. resource_id = created_resource["_id"]
  130. # Create a role policy
  131. policy_to_create = {
  132. "name": "TestPolicyRole",
  133. "description": "Test resource policy description",
  134. "scopes": ["test:read", "test:write"],
  135. "roles": ["roleUMAPolicy"],
  136. }
  137. policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  138. assert policy
  139. # Create a client policy
  140. policy_to_create = {
  141. "name": "TestPolicyClient",
  142. "description": "Test resource policy description",
  143. "scopes": ["test:read"],
  144. "clients": [client["clientId"]],
  145. }
  146. policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  147. assert policy
  148. policy_to_create = {
  149. "name": "TestPolicyGroup",
  150. "description": "Test resource policy description",
  151. "scopes": ["test:read"],
  152. "groups": ["/UMAPolicyGroup"],
  153. }
  154. policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  155. assert policy
  156. policies = uma.policy_query()
  157. assert len(policies) == 3
  158. policies = uma.policy_query(name="TestPolicyGroup")
  159. assert len(policies) == 1
  160. policy_id = policy["id"]
  161. uma.policy_delete(policy_id)
  162. with pytest.raises(KeycloakDeleteError) as err:
  163. uma.policy_delete(policy_id)
  164. assert err.match(
  165. '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
  166. )
  167. policies = uma.policy_query()
  168. assert len(policies) == 2
  169. policy = policies[0]
  170. uma.policy_update(policy_id=policy["id"], payload=policy)
  171. policies = uma.policy_query()
  172. assert len(policies) == 2
  173. policies = uma.policy_query(name="Invalid")
  174. assert len(policies) == 0
  175. policies = uma.policy_query(scope="Invalid")
  176. assert len(policies) == 0
  177. policies = uma.policy_query(resource="Invalid")
  178. assert len(policies) == 0
  179. policies = uma.policy_query(first=3)
  180. assert len(policies) == 0
  181. policies = uma.policy_query(maximum=0)
  182. assert len(policies) == 0
  183. policies = uma.policy_query(name=policy["name"])
  184. assert len(policies) == 1
  185. policies = uma.policy_query(scope=policy["scopes"][0])
  186. assert len(policies) == 2
  187. policies = uma.policy_query(resource=resource_id)
  188. assert len(policies) == 2
  189. uma.resource_set_delete(resource_id)
  190. admin.delete_client(other_client_id)
  191. admin.delete_realm_role(role_id)
  192. admin.delete_group(group_id)
  193. def test_uma_access(uma: KeycloakUMA):
  194. """Test permission access checks.
  195. :param uma: Keycloak UMA client
  196. :type uma: KeycloakUMA
  197. """
  198. resource_to_create = {
  199. "name": "mytest",
  200. "scopes": ["read", "write"],
  201. "type": "urn:test",
  202. "ownerManagedAccess": True,
  203. }
  204. resource = uma.resource_set_create(resource_to_create)
  205. policy_to_create = {
  206. "name": "TestPolicy",
  207. "description": "Test resource policy description",
  208. "scopes": [resource_to_create["scopes"][0]],
  209. "clients": [uma.connection.client_id],
  210. }
  211. uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
  212. token = uma.connection.token
  213. permissions = list()
  214. assert uma.permissions_check(token["access_token"], permissions)
  215. permissions.append(UMAPermission(resource=resource_to_create["name"]))
  216. assert uma.permissions_check(token["access_token"], permissions)
  217. permissions.append(UMAPermission(resource="not valid"))
  218. assert not uma.permissions_check(token["access_token"], permissions)
  219. uma.resource_set_delete(resource["_id"])
  220. def test_uma_permission_ticket(uma: KeycloakUMA):
  221. """Test permission ticket generation.
  222. :param uma: Keycloak UMA client
  223. :type uma: KeycloakUMA
  224. """
  225. resource_to_create = {
  226. "name": "mytest",
  227. "scopes": ["read", "write"],
  228. "type": "urn:test",
  229. "ownerManagedAccess": True,
  230. }
  231. resource = uma.resource_set_create(resource_to_create)
  232. policy_to_create = {
  233. "name": "TestPolicy",
  234. "description": "Test resource policy description",
  235. "scopes": [resource_to_create["scopes"][0]],
  236. "clients": [uma.connection.client_id],
  237. }
  238. uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
  239. permissions = (
  240. UMAPermission(resource=resource_to_create["name"], scope=resource_to_create["scopes"][0]),
  241. )
  242. response = uma.permission_ticket_create(permissions)
  243. rpt = uma.connection.keycloak_openid.token(
  244. grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"]
  245. )
  246. assert rpt
  247. assert "access_token" in rpt
  248. permissions = (UMAPermission(resource="invalid"),)
  249. with pytest.raises(KeycloakPostError):
  250. uma.permission_ticket_create(permissions)
  251. uma.resource_set_delete(resource["_id"])