You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

650 lines
22 KiB

11 months ago
  1. """Test module for KeycloakUMA."""
  2. import re
  3. from inspect import iscoroutinefunction, signature
  4. import pytest
  5. from keycloak import KeycloakAdmin, KeycloakOpenIDConnection, KeycloakUMA
  6. from keycloak.exceptions import (
  7. KeycloakDeleteError,
  8. KeycloakGetError,
  9. KeycloakPostError,
  10. KeycloakPutError,
  11. )
  12. from keycloak.uma_permissions import UMAPermission
  13. def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection):
  14. """Test KeycloakUMA's init method.
  15. :param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz
  16. :type oid_connection_with_authz: KeycloakOpenIDConnection
  17. """
  18. connection = oid_connection_with_authz
  19. uma = KeycloakUMA(connection=connection)
  20. assert isinstance(uma.connection, KeycloakOpenIDConnection)
  21. # should initially be empty
  22. assert uma._well_known is None
  23. assert uma.uma_well_known
  24. # should be cached after first reference
  25. assert uma._well_known is not None
  26. def test_uma_well_known(uma: KeycloakUMA):
  27. """Test the well_known method.
  28. :param uma: Keycloak UMA client
  29. :type uma: KeycloakUMA
  30. """
  31. res = uma.uma_well_known
  32. assert res is not None
  33. assert res != dict()
  34. for key in ["resource_registration_endpoint"]:
  35. assert key in res
  36. def test_uma_resource_sets(uma: KeycloakUMA):
  37. """Test resource sets.
  38. :param uma: Keycloak UMA client
  39. :type uma: KeycloakUMA
  40. """
  41. # Check that only the default resource is present
  42. resource_sets = uma.resource_set_list()
  43. resource_set_list = list(resource_sets)
  44. assert len(resource_set_list) == 1, resource_set_list
  45. assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]
  46. # Test query for resource sets
  47. resource_set_list_ids = uma.resource_set_list_ids()
  48. assert len(resource_set_list_ids) == 1
  49. resource_set_list_ids2 = uma.resource_set_list_ids(name="Default")
  50. assert resource_set_list_ids2 == resource_set_list_ids
  51. resource_set_list_ids2 = uma.resource_set_list_ids(name="Default Resource")
  52. assert resource_set_list_ids2 == resource_set_list_ids
  53. resource_set_list_ids = uma.resource_set_list_ids(name="Default", exact_name=True)
  54. assert len(resource_set_list_ids) == 0
  55. resource_set_list_ids = uma.resource_set_list_ids(first=1)
  56. assert len(resource_set_list_ids) == 0
  57. resource_set_list_ids = uma.resource_set_list_ids(scope="Invalid")
  58. assert len(resource_set_list_ids) == 0
  59. resource_set_list_ids = uma.resource_set_list_ids(owner="Invalid")
  60. assert len(resource_set_list_ids) == 0
  61. resource_set_list_ids = uma.resource_set_list_ids(resource_type="Invalid")
  62. assert len(resource_set_list_ids) == 0
  63. resource_set_list_ids = uma.resource_set_list_ids(name="Invalid")
  64. assert len(resource_set_list_ids) == 0
  65. resource_set_list_ids = uma.resource_set_list_ids(uri="Invalid")
  66. assert len(resource_set_list_ids) == 0
  67. resource_set_list_ids = uma.resource_set_list_ids(maximum=0)
  68. assert len(resource_set_list_ids) == 0
  69. # Test create resource set
  70. resource_to_create = {
  71. "name": "mytest",
  72. "scopes": ["test:read", "test:write"],
  73. "type": "urn:test",
  74. "uris": ["/some_resources/*"],
  75. }
  76. created_resource = uma.resource_set_create(resource_to_create)
  77. assert created_resource
  78. assert created_resource["_id"], created_resource
  79. assert set(resource_to_create).issubset(set(created_resource)), created_resource
  80. # Test getting resource with wildcard
  81. # Without matchingUri query option
  82. resource_set_list_ids = uma.resource_set_list_ids(uri="/some_resources/resource")
  83. assert len(resource_set_list_ids) == 0
  84. # With matchingUri query option
  85. resource_set_list_ids = uma.resource_set_list_ids(
  86. uri="/some_resources/resource", matchingUri=True
  87. )
  88. assert len(resource_set_list_ids) == 1
  89. # Test create the same resource set
  90. with pytest.raises(KeycloakPostError) as err:
  91. uma.resource_set_create(resource_to_create)
  92. assert err.match(
  93. re.escape(
  94. '409: b\'{"error":"invalid_request","error_description":'
  95. '"Resource with name [mytest] already exists."}\''
  96. )
  97. )
  98. # Test get resource set
  99. latest_resource = uma.resource_set_read(created_resource["_id"])
  100. assert latest_resource["name"] == created_resource["name"]
  101. # Test update resource set
  102. latest_resource["name"] = "New Resource Name"
  103. res = uma.resource_set_update(created_resource["_id"], latest_resource)
  104. assert res == dict(), res
  105. updated_resource = uma.resource_set_read(created_resource["_id"])
  106. assert updated_resource["name"] == "New Resource Name"
  107. # Test update resource set fail
  108. with pytest.raises(KeycloakPutError) as err:
  109. uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
  110. assert err.match('400: b\'{"error":"Unrecognized field')
  111. # Test delete resource set
  112. res = uma.resource_set_delete(resource_id=created_resource["_id"])
  113. assert res == dict(), res
  114. with pytest.raises(KeycloakGetError) as err:
  115. uma.resource_set_read(created_resource["_id"])
  116. err.match("404: b''")
  117. # Test delete fail
  118. with pytest.raises(KeycloakDeleteError) as err:
  119. uma.resource_set_delete(resource_id=created_resource["_id"])
  120. assert err.match("404: b''")
  121. def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
  122. """Test policies.
  123. :param uma: Keycloak UMA client
  124. :type uma: KeycloakUMA
  125. :param admin: Keycloak Admin client
  126. :type admin: KeycloakAdmin
  127. """
  128. # Create some required test data
  129. resource_to_create = {
  130. "name": "mytest",
  131. "scopes": ["test:read", "test:write"],
  132. "type": "urn:test",
  133. "ownerManagedAccess": True,
  134. }
  135. created_resource = uma.resource_set_create(resource_to_create)
  136. group_id = admin.create_group({"name": "UMAPolicyGroup"})
  137. role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"})
  138. other_client_id = admin.create_client({"name": "UMAOtherClient"})
  139. client = admin.get_client(other_client_id)
  140. resource_id = created_resource["_id"]
  141. # Create a role policy
  142. policy_to_create = {
  143. "name": "TestPolicyRole",
  144. "description": "Test resource policy description",
  145. "scopes": ["test:read", "test:write"],
  146. "roles": ["roleUMAPolicy"],
  147. }
  148. policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  149. assert policy
  150. # Create a client policy
  151. policy_to_create = {
  152. "name": "TestPolicyClient",
  153. "description": "Test resource policy description",
  154. "scopes": ["test:read"],
  155. "clients": [client["clientId"]],
  156. }
  157. policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  158. assert policy
  159. policy_to_create = {
  160. "name": "TestPolicyGroup",
  161. "description": "Test resource policy description",
  162. "scopes": ["test:read"],
  163. "groups": ["/UMAPolicyGroup"],
  164. }
  165. policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  166. assert policy
  167. policies = uma.policy_query()
  168. assert len(policies) == 3
  169. policies = uma.policy_query(name="TestPolicyGroup")
  170. assert len(policies) == 1
  171. policy_id = policy["id"]
  172. uma.policy_delete(policy_id)
  173. with pytest.raises(KeycloakDeleteError) as err:
  174. uma.policy_delete(policy_id)
  175. assert err.match(
  176. '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
  177. )
  178. policies = uma.policy_query()
  179. assert len(policies) == 2
  180. policy = policies[0]
  181. uma.policy_update(policy_id=policy["id"], payload=policy)
  182. policies = uma.policy_query()
  183. assert len(policies) == 2
  184. policies = uma.policy_query(name="Invalid")
  185. assert len(policies) == 0
  186. policies = uma.policy_query(scope="Invalid")
  187. assert len(policies) == 0
  188. policies = uma.policy_query(resource="Invalid")
  189. assert len(policies) == 0
  190. policies = uma.policy_query(first=3)
  191. assert len(policies) == 0
  192. policies = uma.policy_query(maximum=0)
  193. assert len(policies) == 0
  194. policies = uma.policy_query(name=policy["name"])
  195. assert len(policies) == 1
  196. policies = uma.policy_query(scope=policy["scopes"][0])
  197. assert len(policies) == 2
  198. policies = uma.policy_query(resource=resource_id)
  199. assert len(policies) == 2
  200. uma.resource_set_delete(resource_id)
  201. admin.delete_client(other_client_id)
  202. admin.delete_realm_role(role_id)
  203. admin.delete_group(group_id)
  204. def test_uma_access(uma: KeycloakUMA):
  205. """Test permission access checks.
  206. :param uma: Keycloak UMA client
  207. :type uma: KeycloakUMA
  208. """
  209. resource_to_create = {
  210. "name": "mytest",
  211. "scopes": ["read", "write"],
  212. "type": "urn:test",
  213. "ownerManagedAccess": True,
  214. }
  215. resource = uma.resource_set_create(resource_to_create)
  216. policy_to_create = {
  217. "name": "TestPolicy",
  218. "description": "Test resource policy description",
  219. "scopes": [resource_to_create["scopes"][0]],
  220. "clients": [uma.connection.client_id],
  221. }
  222. uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
  223. token = uma.connection.token
  224. permissions = list()
  225. assert uma.permissions_check(token["access_token"], permissions)
  226. permissions.append(UMAPermission(resource=resource_to_create["name"]))
  227. assert uma.permissions_check(token["access_token"], permissions)
  228. permissions.append(UMAPermission(resource="not valid"))
  229. assert not uma.permissions_check(token["access_token"], permissions)
  230. uma.resource_set_delete(resource["_id"])
  231. def test_uma_permission_ticket(uma: KeycloakUMA):
  232. """Test permission ticket generation.
  233. :param uma: Keycloak UMA client
  234. :type uma: KeycloakUMA
  235. """
  236. resource_to_create = {
  237. "name": "mytest",
  238. "scopes": ["read", "write"],
  239. "type": "urn:test",
  240. "ownerManagedAccess": True,
  241. }
  242. resource = uma.resource_set_create(resource_to_create)
  243. policy_to_create = {
  244. "name": "TestPolicy",
  245. "description": "Test resource policy description",
  246. "scopes": [resource_to_create["scopes"][0]],
  247. "clients": [uma.connection.client_id],
  248. }
  249. uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
  250. permissions = (
  251. UMAPermission(resource=resource_to_create["name"], scope=resource_to_create["scopes"][0]),
  252. )
  253. response = uma.permission_ticket_create(permissions)
  254. rpt = uma.connection.keycloak_openid.token(
  255. grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"]
  256. )
  257. assert rpt
  258. assert "access_token" in rpt
  259. permissions = (UMAPermission(resource="invalid"),)
  260. with pytest.raises(KeycloakPostError):
  261. uma.permission_ticket_create(permissions)
  262. uma.resource_set_delete(resource["_id"])
# Async counterparts of the synchronous tests above.
  264. @pytest.mark.asyncio
  265. async def test_a_uma_well_known(uma: KeycloakUMA):
  266. """Test the well_known method.
  267. :param uma: Keycloak UMA client
  268. :type uma: KeycloakUMA
  269. """
  270. res = uma.uma_well_known
  271. assert res is not None
  272. assert res != dict()
  273. for key in ["resource_registration_endpoint"]:
  274. assert key in res
  275. @pytest.mark.asyncio
  276. async def test_a_uma_resource_sets(uma: KeycloakUMA):
  277. """Test resource sets.
  278. :param uma: Keycloak UMA client
  279. :type uma: KeycloakUMA
  280. """
  281. # Check that only the default resource is present
  282. resource_sets = uma.resource_set_list()
  283. resource_set_list = list(resource_sets)
  284. assert len(resource_set_list) == 1, resource_set_list
  285. assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]
  286. # Test query for resource sets
  287. resource_set_list_ids = await uma.a_resource_set_list_ids()
  288. assert len(resource_set_list_ids) == 1
  289. resource_set_list_ids2 = await uma.a_resource_set_list_ids(name="Default")
  290. assert resource_set_list_ids2 == resource_set_list_ids
  291. resource_set_list_ids2 = await uma.a_resource_set_list_ids(name="Default Resource")
  292. assert resource_set_list_ids2 == resource_set_list_ids
  293. resource_set_list_ids = await uma.a_resource_set_list_ids(name="Default", exact_name=True)
  294. assert len(resource_set_list_ids) == 0
  295. resource_set_list_ids = await uma.a_resource_set_list_ids(first=1)
  296. assert len(resource_set_list_ids) == 0
  297. resource_set_list_ids = await uma.a_resource_set_list_ids(scope="Invalid")
  298. assert len(resource_set_list_ids) == 0
  299. resource_set_list_ids = await uma.a_resource_set_list_ids(owner="Invalid")
  300. assert len(resource_set_list_ids) == 0
  301. resource_set_list_ids = await uma.a_resource_set_list_ids(resource_type="Invalid")
  302. assert len(resource_set_list_ids) == 0
  303. resource_set_list_ids = await uma.a_resource_set_list_ids(name="Invalid")
  304. assert len(resource_set_list_ids) == 0
  305. resource_set_list_ids = await uma.a_resource_set_list_ids(uri="Invalid")
  306. assert len(resource_set_list_ids) == 0
  307. resource_set_list_ids = await uma.a_resource_set_list_ids(maximum=0)
  308. assert len(resource_set_list_ids) == 0
  309. # Test create resource set
  310. resource_to_create = {
  311. "name": "mytest",
  312. "scopes": ["test:read", "test:write"],
  313. "type": "urn:test",
  314. "uris": ["/some_resources/*"],
  315. }
  316. created_resource = await uma.a_resource_set_create(resource_to_create)
  317. assert created_resource
  318. assert created_resource["_id"], created_resource
  319. assert set(resource_to_create).issubset(set(created_resource)), created_resource
  320. # Test getting resource with wildcard
  321. # Without matchingUri query option
  322. resource_set_list_ids = await uma.a_resource_set_list_ids(uri="/some_resources/resource")
  323. assert len(resource_set_list_ids) == 0
  324. # With matchingUri query option
  325. resource_set_list_ids = await uma.a_resource_set_list_ids(
  326. uri="/some_resources/resource", matchingUri=True
  327. )
  328. assert len(resource_set_list_ids) == 1
  329. # Test create the same resource set
  330. with pytest.raises(KeycloakPostError) as err:
  331. await uma.a_resource_set_create(resource_to_create)
  332. assert err.match(
  333. re.escape(
  334. '409: b\'{"error":"invalid_request","error_description":'
  335. '"Resource with name [mytest] already exists."}\''
  336. )
  337. )
  338. # Test get resource set
  339. latest_resource = await uma.a_resource_set_read(created_resource["_id"])
  340. assert latest_resource["name"] == created_resource["name"]
  341. # Test update resource set
  342. latest_resource["name"] = "New Resource Name"
  343. res = await uma.a_resource_set_update(created_resource["_id"], latest_resource)
  344. assert res == dict(), res
  345. updated_resource = await uma.a_resource_set_read(created_resource["_id"])
  346. assert updated_resource["name"] == "New Resource Name"
  347. # Test update resource set fail
  348. with pytest.raises(KeycloakPutError) as err:
  349. uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
  350. assert err.match('400: b\'{"error":"Unrecognized field')
  351. # Test delete resource set
  352. res = await uma.a_resource_set_delete(resource_id=created_resource["_id"])
  353. assert res == dict(), res
  354. with pytest.raises(KeycloakGetError) as err:
  355. await uma.a_resource_set_read(created_resource["_id"])
  356. err.match("404: b''")
  357. # Test delete fail
  358. with pytest.raises(KeycloakDeleteError) as err:
  359. await uma.a_resource_set_delete(resource_id=created_resource["_id"])
  360. assert err.match("404: b''")
  361. @pytest.mark.asyncio
  362. async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
  363. """Test policies.
  364. :param uma: Keycloak UMA client
  365. :type uma: KeycloakUMA
  366. :param admin: Keycloak Admin client
  367. :type admin: KeycloakAdmin
  368. """
  369. # Create some required test data
  370. resource_to_create = {
  371. "name": "mytest",
  372. "scopes": ["test:read", "test:write"],
  373. "type": "urn:test",
  374. "ownerManagedAccess": True,
  375. }
  376. created_resource = await uma.a_resource_set_create(resource_to_create)
  377. group_id = admin.create_group({"name": "UMAPolicyGroup"})
  378. role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"})
  379. other_client_id = admin.create_client({"name": "UMAOtherClient"})
  380. client = admin.get_client(other_client_id)
  381. resource_id = created_resource["_id"]
  382. # Create a role policy
  383. policy_to_create = {
  384. "name": "TestPolicyRole",
  385. "description": "Test resource policy description",
  386. "scopes": ["test:read", "test:write"],
  387. "roles": ["roleUMAPolicy"],
  388. }
  389. policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  390. assert policy
  391. # Create a client policy
  392. policy_to_create = {
  393. "name": "TestPolicyClient",
  394. "description": "Test resource policy description",
  395. "scopes": ["test:read"],
  396. "clients": [client["clientId"]],
  397. }
  398. policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  399. assert policy
  400. policy_to_create = {
  401. "name": "TestPolicyGroup",
  402. "description": "Test resource policy description",
  403. "scopes": ["test:read"],
  404. "groups": ["/UMAPolicyGroup"],
  405. }
  406. policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  407. assert policy
  408. policies = await uma.a_policy_query()
  409. assert len(policies) == 3
  410. policies = await uma.a_policy_query(name="TestPolicyGroup")
  411. assert len(policies) == 1
  412. policy_id = policy["id"]
  413. await uma.a_policy_delete(policy_id)
  414. with pytest.raises(KeycloakDeleteError) as err:
  415. await uma.a_policy_delete(policy_id)
  416. assert err.match(
  417. '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
  418. )
  419. policies = await uma.a_policy_query()
  420. assert len(policies) == 2
  421. policy = policies[0]
  422. await uma.a_policy_update(policy_id=policy["id"], payload=policy)
  423. policies = await uma.a_policy_query()
  424. assert len(policies) == 2
  425. policies = await uma.a_policy_query(name="Invalid")
  426. assert len(policies) == 0
  427. policies = await uma.a_policy_query(scope="Invalid")
  428. assert len(policies) == 0
  429. policies = await uma.a_policy_query(resource="Invalid")
  430. assert len(policies) == 0
  431. policies = await uma.a_policy_query(first=3)
  432. assert len(policies) == 0
  433. policies = await uma.a_policy_query(maximum=0)
  434. assert len(policies) == 0
  435. policies = await uma.a_policy_query(name=policy["name"])
  436. assert len(policies) == 1
  437. policies = await uma.a_policy_query(scope=policy["scopes"][0])
  438. assert len(policies) == 2
  439. policies = await uma.a_policy_query(resource=resource_id)
  440. assert len(policies) == 2
  441. await uma.a_resource_set_delete(resource_id)
  442. await admin.a_delete_client(other_client_id)
  443. await admin.a_delete_realm_role(role_id)
  444. await admin.a_delete_group(group_id)
  445. @pytest.mark.asyncio
  446. async def test_a_uma_access(uma: KeycloakUMA):
  447. """Test permission access checks.
  448. :param uma: Keycloak UMA client
  449. :type uma: KeycloakUMA
  450. """
  451. resource_to_create = {
  452. "name": "mytest",
  453. "scopes": ["read", "write"],
  454. "type": "urn:test",
  455. "ownerManagedAccess": True,
  456. }
  457. resource = await uma.a_resource_set_create(resource_to_create)
  458. policy_to_create = {
  459. "name": "TestPolicy",
  460. "description": "Test resource policy description",
  461. "scopes": [resource_to_create["scopes"][0]],
  462. "clients": [uma.connection.client_id],
  463. }
  464. await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
  465. token = uma.connection.token
  466. permissions = list()
  467. assert await uma.a_permissions_check(token["access_token"], permissions)
  468. permissions.append(UMAPermission(resource=resource_to_create["name"]))
  469. assert await uma.a_permissions_check(token["access_token"], permissions)
  470. permissions.append(UMAPermission(resource="not valid"))
  471. assert not await uma.a_permissions_check(token["access_token"], permissions)
  472. uma.resource_set_delete(resource["_id"])
  473. @pytest.mark.asyncio
  474. async def test_a_uma_permission_ticket(uma: KeycloakUMA):
  475. """Test permission ticket generation.
  476. :param uma: Keycloak UMA client
  477. :type uma: KeycloakUMA
  478. """
  479. resource_to_create = {
  480. "name": "mytest",
  481. "scopes": ["read", "write"],
  482. "type": "urn:test",
  483. "ownerManagedAccess": True,
  484. }
  485. resource = await uma.a_resource_set_create(resource_to_create)
  486. policy_to_create = {
  487. "name": "TestPolicy",
  488. "description": "Test resource policy description",
  489. "scopes": [resource_to_create["scopes"][0]],
  490. "clients": [uma.connection.client_id],
  491. }
  492. await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
  493. permissions = (
  494. UMAPermission(resource=resource_to_create["name"], scope=resource_to_create["scopes"][0]),
  495. )
  496. response = await uma.a_permission_ticket_create(permissions)
  497. rpt = await uma.connection.keycloak_openid.a_token(
  498. grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"]
  499. )
  500. assert rpt
  501. assert "access_token" in rpt
  502. permissions = (UMAPermission(resource="invalid"),)
  503. with pytest.raises(KeycloakPostError):
  504. uma.permission_ticket_create(permissions)
  505. await uma.a_resource_set_delete(resource["_id"])
  506. def test_counter_part():
  507. """Test that each function has its async counter part."""
  508. uma_methods = [func for func in dir(KeycloakUMA) if callable(getattr(KeycloakUMA, func))]
  509. sync_methods = [
  510. method
  511. for method in uma_methods
  512. if not method.startswith("a_") and not method.startswith("_")
  513. ]
  514. async_methods = [
  515. method for method in uma_methods if iscoroutinefunction(getattr(KeycloakUMA, method))
  516. ]
  517. for method in sync_methods:
  518. async_method = f"a_{method}"
  519. assert (async_method in uma_methods) is True
  520. sync_sign = signature(getattr(KeycloakUMA, method))
  521. async_sign = signature(getattr(KeycloakUMA, async_method))
  522. assert sync_sign.parameters == async_sign.parameters
  523. for async_method in async_methods:
  524. if async_method[2:].startswith("_"):
  525. continue
  526. assert async_method[2:] in sync_methods