You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

650 lines
22 KiB

9 months ago
  1. """Test module for KeycloakUMA."""
  2. import re
  3. from inspect import iscoroutinefunction, signature
  4. import pytest
  5. from keycloak import KeycloakAdmin, KeycloakOpenIDConnection, KeycloakUMA
  6. from keycloak.exceptions import (
  7. KeycloakDeleteError,
  8. KeycloakGetError,
  9. KeycloakPostError,
  10. KeycloakPutError,
  11. )
  12. from keycloak.uma_permissions import UMAPermission
  13. def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection):
  14. """Test KeycloakUMA's init method.
  15. :param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz
  16. :type oid_connection_with_authz: KeycloakOpenIDConnection
  17. """
  18. connection = oid_connection_with_authz
  19. uma = KeycloakUMA(connection=connection)
  20. assert isinstance(uma.connection, KeycloakOpenIDConnection)
  21. # should initially be empty
  22. assert uma._well_known is None
  23. assert uma.uma_well_known
  24. # should be cached after first reference
  25. assert uma._well_known is not None
  26. def test_uma_well_known(uma: KeycloakUMA):
  27. """Test the well_known method.
  28. :param uma: Keycloak UMA client
  29. :type uma: KeycloakUMA
  30. """
  31. res = uma.uma_well_known
  32. assert res is not None
  33. assert res != dict()
  34. for key in ["resource_registration_endpoint"]:
  35. assert key in res
  36. def test_uma_resource_sets(uma: KeycloakUMA):
  37. """Test resource sets.
  38. :param uma: Keycloak UMA client
  39. :type uma: KeycloakUMA
  40. """
  41. # Check that only the default resource is present
  42. resource_sets = uma.resource_set_list()
  43. resource_set_list = list(resource_sets)
  44. assert len(resource_set_list) == 1, resource_set_list
  45. assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]
  46. # Test query for resource sets
  47. resource_set_list_ids = uma.resource_set_list_ids()
  48. assert len(resource_set_list_ids) == 1
  49. resource_set_list_ids2 = uma.resource_set_list_ids(name="Default")
  50. assert resource_set_list_ids2 == resource_set_list_ids
  51. resource_set_list_ids2 = uma.resource_set_list_ids(name="Default Resource")
  52. assert resource_set_list_ids2 == resource_set_list_ids
  53. resource_set_list_ids = uma.resource_set_list_ids(name="Default", exact_name=True)
  54. assert len(resource_set_list_ids) == 0
  55. resource_set_list_ids = uma.resource_set_list_ids(first=1)
  56. assert len(resource_set_list_ids) == 0
  57. resource_set_list_ids = uma.resource_set_list_ids(scope="Invalid")
  58. assert len(resource_set_list_ids) == 0
  59. resource_set_list_ids = uma.resource_set_list_ids(owner="Invalid")
  60. assert len(resource_set_list_ids) == 0
  61. resource_set_list_ids = uma.resource_set_list_ids(resource_type="Invalid")
  62. assert len(resource_set_list_ids) == 0
  63. resource_set_list_ids = uma.resource_set_list_ids(name="Invalid")
  64. assert len(resource_set_list_ids) == 0
  65. resource_set_list_ids = uma.resource_set_list_ids(uri="Invalid")
  66. assert len(resource_set_list_ids) == 0
  67. resource_set_list_ids = uma.resource_set_list_ids(maximum=0)
  68. assert len(resource_set_list_ids) == 0
  69. # Test create resource set
  70. resource_to_create = {
  71. "name": "mytest",
  72. "scopes": ["test:read", "test:write"],
  73. "type": "urn:test",
  74. "uris": ["/some_resources/*"],
  75. }
  76. created_resource = uma.resource_set_create(resource_to_create)
  77. assert created_resource
  78. assert created_resource["_id"], created_resource
  79. assert set(resource_to_create).issubset(set(created_resource)), created_resource
  80. # Test getting resource with wildcard
  81. # Without matchingUri query option
  82. resource_set_list_ids = uma.resource_set_list_ids(uri="/some_resources/resource")
  83. assert len(resource_set_list_ids) == 0
  84. # With matchingUri query option
  85. resource_set_list_ids = uma.resource_set_list_ids(
  86. uri="/some_resources/resource", matchingUri=True
  87. )
  88. assert len(resource_set_list_ids) == 1
  89. # Test create the same resource set
  90. with pytest.raises(KeycloakPostError) as err:
  91. uma.resource_set_create(resource_to_create)
  92. assert err.match(
  93. re.escape(
  94. '409: b\'{"error":"invalid_request","error_description":'
  95. '"Resource with name [mytest] already exists."}\''
  96. )
  97. )
  98. # Test get resource set
  99. latest_resource = uma.resource_set_read(created_resource["_id"])
  100. assert latest_resource["name"] == created_resource["name"]
  101. # Test update resource set
  102. latest_resource["name"] = "New Resource Name"
  103. res = uma.resource_set_update(created_resource["_id"], latest_resource)
  104. assert res == dict(), res
  105. updated_resource = uma.resource_set_read(created_resource["_id"])
  106. assert updated_resource["name"] == "New Resource Name"
  107. # Test update resource set fail
  108. with pytest.raises(KeycloakPutError) as err:
  109. uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
  110. assert err.match('400: b\'{"error":"Unrecognized field')
  111. # Test delete resource set
  112. res = uma.resource_set_delete(resource_id=created_resource["_id"])
  113. assert res == dict(), res
  114. with pytest.raises(KeycloakGetError) as err:
  115. uma.resource_set_read(created_resource["_id"])
  116. err.match("404: b''")
  117. # Test delete fail
  118. with pytest.raises(KeycloakDeleteError) as err:
  119. uma.resource_set_delete(resource_id=created_resource["_id"])
  120. assert err.match("404: b''")
  121. def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
  122. """Test policies.
  123. :param uma: Keycloak UMA client
  124. :type uma: KeycloakUMA
  125. :param admin: Keycloak Admin client
  126. :type admin: KeycloakAdmin
  127. """
  128. # Create some required test data
  129. resource_to_create = {
  130. "name": "mytest",
  131. "scopes": ["test:read", "test:write"],
  132. "type": "urn:test",
  133. "ownerManagedAccess": True,
  134. }
  135. created_resource = uma.resource_set_create(resource_to_create)
  136. group_id = admin.create_group({"name": "UMAPolicyGroup"})
  137. role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"})
  138. other_client_id = admin.create_client({"name": "UMAOtherClient"})
  139. client = admin.get_client(other_client_id)
  140. resource_id = created_resource["_id"]
  141. # Create a role policy
  142. policy_to_create = {
  143. "name": "TestPolicyRole",
  144. "description": "Test resource policy description",
  145. "scopes": ["test:read", "test:write"],
  146. "roles": ["roleUMAPolicy"],
  147. }
  148. policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  149. assert policy
  150. # Create a client policy
  151. policy_to_create = {
  152. "name": "TestPolicyClient",
  153. "description": "Test resource policy description",
  154. "scopes": ["test:read"],
  155. "clients": [client["clientId"]],
  156. }
  157. policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  158. assert policy
  159. policy_to_create = {
  160. "name": "TestPolicyGroup",
  161. "description": "Test resource policy description",
  162. "scopes": ["test:read"],
  163. "groups": ["/UMAPolicyGroup"],
  164. }
  165. policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  166. assert policy
  167. policies = uma.policy_query()
  168. assert len(policies) == 3
  169. policies = uma.policy_query(name="TestPolicyGroup")
  170. assert len(policies) == 1
  171. policy_id = policy["id"]
  172. uma.policy_delete(policy_id)
  173. with pytest.raises(KeycloakDeleteError) as err:
  174. uma.policy_delete(policy_id)
  175. assert err.match(
  176. '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
  177. )
  178. policies = uma.policy_query()
  179. assert len(policies) == 2
  180. policy = policies[0]
  181. uma.policy_update(policy_id=policy["id"], payload=policy)
  182. policies = uma.policy_query()
  183. assert len(policies) == 2
  184. policies = uma.policy_query(name="Invalid")
  185. assert len(policies) == 0
  186. policies = uma.policy_query(scope="Invalid")
  187. assert len(policies) == 0
  188. policies = uma.policy_query(resource="Invalid")
  189. assert len(policies) == 0
  190. policies = uma.policy_query(first=3)
  191. assert len(policies) == 0
  192. policies = uma.policy_query(maximum=0)
  193. assert len(policies) == 0
  194. policies = uma.policy_query(name=policy["name"])
  195. assert len(policies) == 1
  196. policies = uma.policy_query(scope=policy["scopes"][0])
  197. assert len(policies) == 2
  198. policies = uma.policy_query(resource=resource_id)
  199. assert len(policies) == 2
  200. uma.resource_set_delete(resource_id)
  201. admin.delete_client(other_client_id)
  202. admin.delete_realm_role(role_id)
  203. admin.delete_group(group_id)
  204. def test_uma_access(uma: KeycloakUMA):
  205. """Test permission access checks.
  206. :param uma: Keycloak UMA client
  207. :type uma: KeycloakUMA
  208. """
  209. resource_to_create = {
  210. "name": "mytest",
  211. "scopes": ["read", "write"],
  212. "type": "urn:test",
  213. "ownerManagedAccess": True,
  214. }
  215. resource = uma.resource_set_create(resource_to_create)
  216. policy_to_create = {
  217. "name": "TestPolicy",
  218. "description": "Test resource policy description",
  219. "scopes": [resource_to_create["scopes"][0]],
  220. "clients": [uma.connection.client_id],
  221. }
  222. uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
  223. token = uma.connection.token
  224. permissions = list()
  225. assert uma.permissions_check(token["access_token"], permissions)
  226. permissions.append(UMAPermission(resource=resource_to_create["name"]))
  227. assert uma.permissions_check(token["access_token"], permissions)
  228. permissions.append(UMAPermission(resource="not valid"))
  229. assert not uma.permissions_check(token["access_token"], permissions)
  230. uma.resource_set_delete(resource["_id"])
  231. def test_uma_permission_ticket(uma: KeycloakUMA):
  232. """Test permission ticket generation.
  233. :param uma: Keycloak UMA client
  234. :type uma: KeycloakUMA
  235. """
  236. resource_to_create = {
  237. "name": "mytest",
  238. "scopes": ["read", "write"],
  239. "type": "urn:test",
  240. "ownerManagedAccess": True,
  241. }
  242. resource = uma.resource_set_create(resource_to_create)
  243. policy_to_create = {
  244. "name": "TestPolicy",
  245. "description": "Test resource policy description",
  246. "scopes": [resource_to_create["scopes"][0]],
  247. "clients": [uma.connection.client_id],
  248. }
  249. uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
  250. permissions = (
  251. UMAPermission(resource=resource_to_create["name"], scope=resource_to_create["scopes"][0]),
  252. )
  253. response = uma.permission_ticket_create(permissions)
  254. rpt = uma.connection.keycloak_openid.token(
  255. grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"]
  256. )
  257. assert rpt
  258. assert "access_token" in rpt
  259. permissions = (UMAPermission(resource="invalid"),)
  260. with pytest.raises(KeycloakPostError):
  261. uma.permission_ticket_create(permissions)
  262. uma.resource_set_delete(resource["_id"])
  263. # async function start
  264. @pytest.mark.asyncio
  265. async def test_a_uma_well_known(uma: KeycloakUMA):
  266. """Test the well_known method.
  267. :param uma: Keycloak UMA client
  268. :type uma: KeycloakUMA
  269. """
  270. res = uma.uma_well_known
  271. assert res is not None
  272. assert res != dict()
  273. for key in ["resource_registration_endpoint"]:
  274. assert key in res
  275. @pytest.mark.asyncio
  276. async def test_a_uma_resource_sets(uma: KeycloakUMA):
  277. """Test resource sets.
  278. :param uma: Keycloak UMA client
  279. :type uma: KeycloakUMA
  280. """
  281. # Check that only the default resource is present
  282. resource_sets = uma.resource_set_list()
  283. resource_set_list = list(resource_sets)
  284. assert len(resource_set_list) == 1, resource_set_list
  285. assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]
  286. # Test query for resource sets
  287. resource_set_list_ids = await uma.a_resource_set_list_ids()
  288. assert len(resource_set_list_ids) == 1
  289. resource_set_list_ids2 = await uma.a_resource_set_list_ids(name="Default")
  290. assert resource_set_list_ids2 == resource_set_list_ids
  291. resource_set_list_ids2 = await uma.a_resource_set_list_ids(name="Default Resource")
  292. assert resource_set_list_ids2 == resource_set_list_ids
  293. resource_set_list_ids = await uma.a_resource_set_list_ids(name="Default", exact_name=True)
  294. assert len(resource_set_list_ids) == 0
  295. resource_set_list_ids = await uma.a_resource_set_list_ids(first=1)
  296. assert len(resource_set_list_ids) == 0
  297. resource_set_list_ids = await uma.a_resource_set_list_ids(scope="Invalid")
  298. assert len(resource_set_list_ids) == 0
  299. resource_set_list_ids = await uma.a_resource_set_list_ids(owner="Invalid")
  300. assert len(resource_set_list_ids) == 0
  301. resource_set_list_ids = await uma.a_resource_set_list_ids(resource_type="Invalid")
  302. assert len(resource_set_list_ids) == 0
  303. resource_set_list_ids = await uma.a_resource_set_list_ids(name="Invalid")
  304. assert len(resource_set_list_ids) == 0
  305. resource_set_list_ids = await uma.a_resource_set_list_ids(uri="Invalid")
  306. assert len(resource_set_list_ids) == 0
  307. resource_set_list_ids = await uma.a_resource_set_list_ids(maximum=0)
  308. assert len(resource_set_list_ids) == 0
  309. # Test create resource set
  310. resource_to_create = {
  311. "name": "mytest",
  312. "scopes": ["test:read", "test:write"],
  313. "type": "urn:test",
  314. "uris": ["/some_resources/*"],
  315. }
  316. created_resource = await uma.a_resource_set_create(resource_to_create)
  317. assert created_resource
  318. assert created_resource["_id"], created_resource
  319. assert set(resource_to_create).issubset(set(created_resource)), created_resource
  320. # Test getting resource with wildcard
  321. # Without matchingUri query option
  322. resource_set_list_ids = await uma.a_resource_set_list_ids(uri="/some_resources/resource")
  323. assert len(resource_set_list_ids) == 0
  324. # With matchingUri query option
  325. resource_set_list_ids = await uma.a_resource_set_list_ids(
  326. uri="/some_resources/resource", matchingUri=True
  327. )
  328. assert len(resource_set_list_ids) == 1
  329. # Test create the same resource set
  330. with pytest.raises(KeycloakPostError) as err:
  331. await uma.a_resource_set_create(resource_to_create)
  332. assert err.match(
  333. re.escape(
  334. '409: b\'{"error":"invalid_request","error_description":'
  335. '"Resource with name [mytest] already exists."}\''
  336. )
  337. )
  338. # Test get resource set
  339. latest_resource = await uma.a_resource_set_read(created_resource["_id"])
  340. assert latest_resource["name"] == created_resource["name"]
  341. # Test update resource set
  342. latest_resource["name"] = "New Resource Name"
  343. res = await uma.a_resource_set_update(created_resource["_id"], latest_resource)
  344. assert res == dict(), res
  345. updated_resource = await uma.a_resource_set_read(created_resource["_id"])
  346. assert updated_resource["name"] == "New Resource Name"
  347. # Test update resource set fail
  348. with pytest.raises(KeycloakPutError) as err:
  349. uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
  350. assert err.match('400: b\'{"error":"Unrecognized field')
  351. # Test delete resource set
  352. res = await uma.a_resource_set_delete(resource_id=created_resource["_id"])
  353. assert res == dict(), res
  354. with pytest.raises(KeycloakGetError) as err:
  355. await uma.a_resource_set_read(created_resource["_id"])
  356. err.match("404: b''")
  357. # Test delete fail
  358. with pytest.raises(KeycloakDeleteError) as err:
  359. await uma.a_resource_set_delete(resource_id=created_resource["_id"])
  360. assert err.match("404: b''")
  361. @pytest.mark.asyncio
  362. async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
  363. """Test policies.
  364. :param uma: Keycloak UMA client
  365. :type uma: KeycloakUMA
  366. :param admin: Keycloak Admin client
  367. :type admin: KeycloakAdmin
  368. """
  369. # Create some required test data
  370. resource_to_create = {
  371. "name": "mytest",
  372. "scopes": ["test:read", "test:write"],
  373. "type": "urn:test",
  374. "ownerManagedAccess": True,
  375. }
  376. created_resource = await uma.a_resource_set_create(resource_to_create)
  377. group_id = admin.create_group({"name": "UMAPolicyGroup"})
  378. role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"})
  379. other_client_id = admin.create_client({"name": "UMAOtherClient"})
  380. client = admin.get_client(other_client_id)
  381. resource_id = created_resource["_id"]
  382. # Create a role policy
  383. policy_to_create = {
  384. "name": "TestPolicyRole",
  385. "description": "Test resource policy description",
  386. "scopes": ["test:read", "test:write"],
  387. "roles": ["roleUMAPolicy"],
  388. }
  389. policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  390. assert policy
  391. # Create a client policy
  392. policy_to_create = {
  393. "name": "TestPolicyClient",
  394. "description": "Test resource policy description",
  395. "scopes": ["test:read"],
  396. "clients": [client["clientId"]],
  397. }
  398. policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  399. assert policy
  400. policy_to_create = {
  401. "name": "TestPolicyGroup",
  402. "description": "Test resource policy description",
  403. "scopes": ["test:read"],
  404. "groups": ["/UMAPolicyGroup"],
  405. }
  406. policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  407. assert policy
  408. policies = await uma.a_policy_query()
  409. assert len(policies) == 3
  410. policies = await uma.a_policy_query(name="TestPolicyGroup")
  411. assert len(policies) == 1
  412. policy_id = policy["id"]
  413. await uma.a_policy_delete(policy_id)
  414. with pytest.raises(KeycloakDeleteError) as err:
  415. await uma.a_policy_delete(policy_id)
  416. assert err.match(
  417. '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
  418. )
  419. policies = await uma.a_policy_query()
  420. assert len(policies) == 2
  421. policy = policies[0]
  422. await uma.a_policy_update(policy_id=policy["id"], payload=policy)
  423. policies = await uma.a_policy_query()
  424. assert len(policies) == 2
  425. policies = await uma.a_policy_query(name="Invalid")
  426. assert len(policies) == 0
  427. policies = await uma.a_policy_query(scope="Invalid")
  428. assert len(policies) == 0
  429. policies = await uma.a_policy_query(resource="Invalid")
  430. assert len(policies) == 0
  431. policies = await uma.a_policy_query(first=3)
  432. assert len(policies) == 0
  433. policies = await uma.a_policy_query(maximum=0)
  434. assert len(policies) == 0
  435. policies = await uma.a_policy_query(name=policy["name"])
  436. assert len(policies) == 1
  437. policies = await uma.a_policy_query(scope=policy["scopes"][0])
  438. assert len(policies) == 2
  439. policies = await uma.a_policy_query(resource=resource_id)
  440. assert len(policies) == 2
  441. await uma.a_resource_set_delete(resource_id)
  442. await admin.a_delete_client(other_client_id)
  443. await admin.a_delete_realm_role(role_id)
  444. await admin.a_delete_group(group_id)
  445. @pytest.mark.asyncio
  446. async def test_a_uma_access(uma: KeycloakUMA):
  447. """Test permission access checks.
  448. :param uma: Keycloak UMA client
  449. :type uma: KeycloakUMA
  450. """
  451. resource_to_create = {
  452. "name": "mytest",
  453. "scopes": ["read", "write"],
  454. "type": "urn:test",
  455. "ownerManagedAccess": True,
  456. }
  457. resource = await uma.a_resource_set_create(resource_to_create)
  458. policy_to_create = {
  459. "name": "TestPolicy",
  460. "description": "Test resource policy description",
  461. "scopes": [resource_to_create["scopes"][0]],
  462. "clients": [uma.connection.client_id],
  463. }
  464. await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
  465. token = uma.connection.token
  466. permissions = list()
  467. assert await uma.a_permissions_check(token["access_token"], permissions)
  468. permissions.append(UMAPermission(resource=resource_to_create["name"]))
  469. assert await uma.a_permissions_check(token["access_token"], permissions)
  470. permissions.append(UMAPermission(resource="not valid"))
  471. assert not await uma.a_permissions_check(token["access_token"], permissions)
  472. uma.resource_set_delete(resource["_id"])
  473. @pytest.mark.asyncio
  474. async def test_a_uma_permission_ticket(uma: KeycloakUMA):
  475. """Test permission ticket generation.
  476. :param uma: Keycloak UMA client
  477. :type uma: KeycloakUMA
  478. """
  479. resource_to_create = {
  480. "name": "mytest",
  481. "scopes": ["read", "write"],
  482. "type": "urn:test",
  483. "ownerManagedAccess": True,
  484. }
  485. resource = await uma.a_resource_set_create(resource_to_create)
  486. policy_to_create = {
  487. "name": "TestPolicy",
  488. "description": "Test resource policy description",
  489. "scopes": [resource_to_create["scopes"][0]],
  490. "clients": [uma.connection.client_id],
  491. }
  492. await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
  493. permissions = (
  494. UMAPermission(resource=resource_to_create["name"], scope=resource_to_create["scopes"][0]),
  495. )
  496. response = await uma.a_permission_ticket_create(permissions)
  497. rpt = await uma.connection.keycloak_openid.a_token(
  498. grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"]
  499. )
  500. assert rpt
  501. assert "access_token" in rpt
  502. permissions = (UMAPermission(resource="invalid"),)
  503. with pytest.raises(KeycloakPostError):
  504. uma.permission_ticket_create(permissions)
  505. await uma.a_resource_set_delete(resource["_id"])
  506. def test_counter_part():
  507. """Test that each function has its async counter part."""
  508. uma_methods = [func for func in dir(KeycloakUMA) if callable(getattr(KeycloakUMA, func))]
  509. sync_methods = [
  510. method
  511. for method in uma_methods
  512. if not method.startswith("a_") and not method.startswith("_")
  513. ]
  514. async_methods = [
  515. method for method in uma_methods if iscoroutinefunction(getattr(KeycloakUMA, method))
  516. ]
  517. for method in sync_methods:
  518. async_method = f"a_{method}"
  519. assert (async_method in uma_methods) is True
  520. sync_sign = signature(getattr(KeycloakUMA, method))
  521. async_sign = signature(getattr(KeycloakUMA, async_method))
  522. assert sync_sign.parameters == async_sign.parameters
  523. for async_method in async_methods:
  524. if async_method[2:].startswith("_"):
  525. continue
  526. assert async_method[2:] in sync_methods