You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

628 lines
21 KiB

10 months ago
  1. """Test module for KeycloakUMA."""
  2. import re
  3. from inspect import iscoroutinefunction, signature
  4. import pytest
  5. from keycloak import KeycloakAdmin, KeycloakOpenIDConnection, KeycloakUMA
  6. from keycloak.exceptions import (
  7. KeycloakDeleteError,
  8. KeycloakGetError,
  9. KeycloakPostError,
  10. KeycloakPutError,
  11. )
  12. from keycloak.uma_permissions import UMAPermission
  13. def test_keycloak_uma_init(oid_connection_with_authz: KeycloakOpenIDConnection):
  14. """Test KeycloakUMA's init method.
  15. :param oid_connection_with_authz: Keycloak OpenID connection manager with preconfigured authz
  16. :type oid_connection_with_authz: KeycloakOpenIDConnection
  17. """
  18. connection = oid_connection_with_authz
  19. uma = KeycloakUMA(connection=connection)
  20. assert isinstance(uma.connection, KeycloakOpenIDConnection)
  21. # should initially be empty
  22. assert uma._well_known is None
  23. assert uma.uma_well_known
  24. # should be cached after first reference
  25. assert uma._well_known is not None
  26. def test_uma_well_known(uma: KeycloakUMA):
  27. """Test the well_known method.
  28. :param uma: Keycloak UMA client
  29. :type uma: KeycloakUMA
  30. """
  31. res = uma.uma_well_known
  32. assert res is not None
  33. assert res != dict()
  34. for key in ["resource_registration_endpoint"]:
  35. assert key in res
  36. def test_uma_resource_sets(uma: KeycloakUMA):
  37. """Test resource sets.
  38. :param uma: Keycloak UMA client
  39. :type uma: KeycloakUMA
  40. """
  41. # Check that only the default resource is present
  42. resource_sets = uma.resource_set_list()
  43. resource_set_list = list(resource_sets)
  44. assert len(resource_set_list) == 1, resource_set_list
  45. assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]
  46. # Test query for resource sets
  47. resource_set_list_ids = uma.resource_set_list_ids()
  48. assert len(resource_set_list_ids) == 1
  49. resource_set_list_ids2 = uma.resource_set_list_ids(name="Default")
  50. assert resource_set_list_ids2 == resource_set_list_ids
  51. resource_set_list_ids2 = uma.resource_set_list_ids(name="Default Resource")
  52. assert resource_set_list_ids2 == resource_set_list_ids
  53. resource_set_list_ids = uma.resource_set_list_ids(name="Default", exact_name=True)
  54. assert len(resource_set_list_ids) == 0
  55. resource_set_list_ids = uma.resource_set_list_ids(first=1)
  56. assert len(resource_set_list_ids) == 0
  57. resource_set_list_ids = uma.resource_set_list_ids(scope="Invalid")
  58. assert len(resource_set_list_ids) == 0
  59. resource_set_list_ids = uma.resource_set_list_ids(owner="Invalid")
  60. assert len(resource_set_list_ids) == 0
  61. resource_set_list_ids = uma.resource_set_list_ids(resource_type="Invalid")
  62. assert len(resource_set_list_ids) == 0
  63. resource_set_list_ids = uma.resource_set_list_ids(name="Invalid")
  64. assert len(resource_set_list_ids) == 0
  65. resource_set_list_ids = uma.resource_set_list_ids(uri="Invalid")
  66. assert len(resource_set_list_ids) == 0
  67. resource_set_list_ids = uma.resource_set_list_ids(maximum=0)
  68. assert len(resource_set_list_ids) == 0
  69. # Test create resource set
  70. resource_to_create = {
  71. "name": "mytest",
  72. "scopes": ["test:read", "test:write"],
  73. "type": "urn:test",
  74. }
  75. created_resource = uma.resource_set_create(resource_to_create)
  76. assert created_resource
  77. assert created_resource["_id"], created_resource
  78. assert set(resource_to_create).issubset(set(created_resource)), created_resource
  79. # Test create the same resource set
  80. with pytest.raises(KeycloakPostError) as err:
  81. uma.resource_set_create(resource_to_create)
  82. assert err.match(
  83. re.escape(
  84. '409: b\'{"error":"invalid_request","error_description":'
  85. '"Resource with name [mytest] already exists."}\''
  86. )
  87. )
  88. # Test get resource set
  89. latest_resource = uma.resource_set_read(created_resource["_id"])
  90. assert latest_resource["name"] == created_resource["name"]
  91. # Test update resource set
  92. latest_resource["name"] = "New Resource Name"
  93. res = uma.resource_set_update(created_resource["_id"], latest_resource)
  94. assert res == dict(), res
  95. updated_resource = uma.resource_set_read(created_resource["_id"])
  96. assert updated_resource["name"] == "New Resource Name"
  97. # Test update resource set fail
  98. with pytest.raises(KeycloakPutError) as err:
  99. uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
  100. assert err.match('400: b\'{"error":"Unrecognized field')
  101. # Test delete resource set
  102. res = uma.resource_set_delete(resource_id=created_resource["_id"])
  103. assert res == dict(), res
  104. with pytest.raises(KeycloakGetError) as err:
  105. uma.resource_set_read(created_resource["_id"])
  106. err.match("404: b''")
  107. # Test delete fail
  108. with pytest.raises(KeycloakDeleteError) as err:
  109. uma.resource_set_delete(resource_id=created_resource["_id"])
  110. assert err.match("404: b''")
  111. def test_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
  112. """Test policies.
  113. :param uma: Keycloak UMA client
  114. :type uma: KeycloakUMA
  115. :param admin: Keycloak Admin client
  116. :type admin: KeycloakAdmin
  117. """
  118. # Create some required test data
  119. resource_to_create = {
  120. "name": "mytest",
  121. "scopes": ["test:read", "test:write"],
  122. "type": "urn:test",
  123. "ownerManagedAccess": True,
  124. }
  125. created_resource = uma.resource_set_create(resource_to_create)
  126. group_id = admin.create_group({"name": "UMAPolicyGroup"})
  127. role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"})
  128. other_client_id = admin.create_client({"name": "UMAOtherClient"})
  129. client = admin.get_client(other_client_id)
  130. resource_id = created_resource["_id"]
  131. # Create a role policy
  132. policy_to_create = {
  133. "name": "TestPolicyRole",
  134. "description": "Test resource policy description",
  135. "scopes": ["test:read", "test:write"],
  136. "roles": ["roleUMAPolicy"],
  137. }
  138. policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  139. assert policy
  140. # Create a client policy
  141. policy_to_create = {
  142. "name": "TestPolicyClient",
  143. "description": "Test resource policy description",
  144. "scopes": ["test:read"],
  145. "clients": [client["clientId"]],
  146. }
  147. policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  148. assert policy
  149. policy_to_create = {
  150. "name": "TestPolicyGroup",
  151. "description": "Test resource policy description",
  152. "scopes": ["test:read"],
  153. "groups": ["/UMAPolicyGroup"],
  154. }
  155. policy = uma.policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  156. assert policy
  157. policies = uma.policy_query()
  158. assert len(policies) == 3
  159. policies = uma.policy_query(name="TestPolicyGroup")
  160. assert len(policies) == 1
  161. policy_id = policy["id"]
  162. uma.policy_delete(policy_id)
  163. with pytest.raises(KeycloakDeleteError) as err:
  164. uma.policy_delete(policy_id)
  165. assert err.match(
  166. '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
  167. )
  168. policies = uma.policy_query()
  169. assert len(policies) == 2
  170. policy = policies[0]
  171. uma.policy_update(policy_id=policy["id"], payload=policy)
  172. policies = uma.policy_query()
  173. assert len(policies) == 2
  174. policies = uma.policy_query(name="Invalid")
  175. assert len(policies) == 0
  176. policies = uma.policy_query(scope="Invalid")
  177. assert len(policies) == 0
  178. policies = uma.policy_query(resource="Invalid")
  179. assert len(policies) == 0
  180. policies = uma.policy_query(first=3)
  181. assert len(policies) == 0
  182. policies = uma.policy_query(maximum=0)
  183. assert len(policies) == 0
  184. policies = uma.policy_query(name=policy["name"])
  185. assert len(policies) == 1
  186. policies = uma.policy_query(scope=policy["scopes"][0])
  187. assert len(policies) == 2
  188. policies = uma.policy_query(resource=resource_id)
  189. assert len(policies) == 2
  190. uma.resource_set_delete(resource_id)
  191. admin.delete_client(other_client_id)
  192. admin.delete_realm_role(role_id)
  193. admin.delete_group(group_id)
  194. def test_uma_access(uma: KeycloakUMA):
  195. """Test permission access checks.
  196. :param uma: Keycloak UMA client
  197. :type uma: KeycloakUMA
  198. """
  199. resource_to_create = {
  200. "name": "mytest",
  201. "scopes": ["read", "write"],
  202. "type": "urn:test",
  203. "ownerManagedAccess": True,
  204. }
  205. resource = uma.resource_set_create(resource_to_create)
  206. policy_to_create = {
  207. "name": "TestPolicy",
  208. "description": "Test resource policy description",
  209. "scopes": [resource_to_create["scopes"][0]],
  210. "clients": [uma.connection.client_id],
  211. }
  212. uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
  213. token = uma.connection.token
  214. permissions = list()
  215. assert uma.permissions_check(token["access_token"], permissions)
  216. permissions.append(UMAPermission(resource=resource_to_create["name"]))
  217. assert uma.permissions_check(token["access_token"], permissions)
  218. permissions.append(UMAPermission(resource="not valid"))
  219. assert not uma.permissions_check(token["access_token"], permissions)
  220. uma.resource_set_delete(resource["_id"])
  221. def test_uma_permission_ticket(uma: KeycloakUMA):
  222. """Test permission ticket generation.
  223. :param uma: Keycloak UMA client
  224. :type uma: KeycloakUMA
  225. """
  226. resource_to_create = {
  227. "name": "mytest",
  228. "scopes": ["read", "write"],
  229. "type": "urn:test",
  230. "ownerManagedAccess": True,
  231. }
  232. resource = uma.resource_set_create(resource_to_create)
  233. policy_to_create = {
  234. "name": "TestPolicy",
  235. "description": "Test resource policy description",
  236. "scopes": [resource_to_create["scopes"][0]],
  237. "clients": [uma.connection.client_id],
  238. }
  239. uma.policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
  240. permissions = (
  241. UMAPermission(resource=resource_to_create["name"], scope=resource_to_create["scopes"][0]),
  242. )
  243. response = uma.permission_ticket_create(permissions)
  244. rpt = uma.connection.keycloak_openid.token(
  245. grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"]
  246. )
  247. assert rpt
  248. assert "access_token" in rpt
  249. permissions = (UMAPermission(resource="invalid"),)
  250. with pytest.raises(KeycloakPostError):
  251. uma.permission_ticket_create(permissions)
  252. uma.resource_set_delete(resource["_id"])
  253. # async function start
  254. @pytest.mark.asyncio
  255. async def test_a_uma_well_known(uma: KeycloakUMA):
  256. """Test the well_known method.
  257. :param uma: Keycloak UMA client
  258. :type uma: KeycloakUMA
  259. """
  260. res = uma.uma_well_known
  261. assert res is not None
  262. assert res != dict()
  263. for key in ["resource_registration_endpoint"]:
  264. assert key in res
  265. @pytest.mark.asyncio
  266. async def test_a_uma_resource_sets(uma: KeycloakUMA):
  267. """Test resource sets.
  268. :param uma: Keycloak UMA client
  269. :type uma: KeycloakUMA
  270. """
  271. # Check that only the default resource is present
  272. resource_sets = uma.resource_set_list()
  273. resource_set_list = list(resource_sets)
  274. assert len(resource_set_list) == 1, resource_set_list
  275. assert resource_set_list[0]["name"] == "Default Resource", resource_set_list[0]["name"]
  276. # Test query for resource sets
  277. resource_set_list_ids = await uma.a_resource_set_list_ids()
  278. assert len(resource_set_list_ids) == 1
  279. resource_set_list_ids2 = await uma.a_resource_set_list_ids(name="Default")
  280. assert resource_set_list_ids2 == resource_set_list_ids
  281. resource_set_list_ids2 = await uma.a_resource_set_list_ids(name="Default Resource")
  282. assert resource_set_list_ids2 == resource_set_list_ids
  283. resource_set_list_ids = await uma.a_resource_set_list_ids(name="Default", exact_name=True)
  284. assert len(resource_set_list_ids) == 0
  285. resource_set_list_ids = await uma.a_resource_set_list_ids(first=1)
  286. assert len(resource_set_list_ids) == 0
  287. resource_set_list_ids = await uma.a_resource_set_list_ids(scope="Invalid")
  288. assert len(resource_set_list_ids) == 0
  289. resource_set_list_ids = await uma.a_resource_set_list_ids(owner="Invalid")
  290. assert len(resource_set_list_ids) == 0
  291. resource_set_list_ids = await uma.a_resource_set_list_ids(resource_type="Invalid")
  292. assert len(resource_set_list_ids) == 0
  293. resource_set_list_ids = await uma.a_resource_set_list_ids(name="Invalid")
  294. assert len(resource_set_list_ids) == 0
  295. resource_set_list_ids = await uma.a_resource_set_list_ids(uri="Invalid")
  296. assert len(resource_set_list_ids) == 0
  297. resource_set_list_ids = await uma.a_resource_set_list_ids(maximum=0)
  298. assert len(resource_set_list_ids) == 0
  299. # Test create resource set
  300. resource_to_create = {
  301. "name": "mytest",
  302. "scopes": ["test:read", "test:write"],
  303. "type": "urn:test",
  304. }
  305. created_resource = await uma.a_resource_set_create(resource_to_create)
  306. assert created_resource
  307. assert created_resource["_id"], created_resource
  308. assert set(resource_to_create).issubset(set(created_resource)), created_resource
  309. # Test create the same resource set
  310. with pytest.raises(KeycloakPostError) as err:
  311. await uma.a_resource_set_create(resource_to_create)
  312. assert err.match(
  313. re.escape(
  314. '409: b\'{"error":"invalid_request","error_description":'
  315. '"Resource with name [mytest] already exists."}\''
  316. )
  317. )
  318. # Test get resource set
  319. latest_resource = await uma.a_resource_set_read(created_resource["_id"])
  320. assert latest_resource["name"] == created_resource["name"]
  321. # Test update resource set
  322. latest_resource["name"] = "New Resource Name"
  323. res = await uma.a_resource_set_update(created_resource["_id"], latest_resource)
  324. assert res == dict(), res
  325. updated_resource = await uma.a_resource_set_read(created_resource["_id"])
  326. assert updated_resource["name"] == "New Resource Name"
  327. # Test update resource set fail
  328. with pytest.raises(KeycloakPutError) as err:
  329. uma.resource_set_update(resource_id=created_resource["_id"], payload={"wrong": "payload"})
  330. assert err.match('400: b\'{"error":"Unrecognized field')
  331. # Test delete resource set
  332. res = await uma.a_resource_set_delete(resource_id=created_resource["_id"])
  333. assert res == dict(), res
  334. with pytest.raises(KeycloakGetError) as err:
  335. await uma.a_resource_set_read(created_resource["_id"])
  336. err.match("404: b''")
  337. # Test delete fail
  338. with pytest.raises(KeycloakDeleteError) as err:
  339. await uma.a_resource_set_delete(resource_id=created_resource["_id"])
  340. assert err.match("404: b''")
  341. @pytest.mark.asyncio
  342. async def test_a_uma_policy(uma: KeycloakUMA, admin: KeycloakAdmin):
  343. """Test policies.
  344. :param uma: Keycloak UMA client
  345. :type uma: KeycloakUMA
  346. :param admin: Keycloak Admin client
  347. :type admin: KeycloakAdmin
  348. """
  349. # Create some required test data
  350. resource_to_create = {
  351. "name": "mytest",
  352. "scopes": ["test:read", "test:write"],
  353. "type": "urn:test",
  354. "ownerManagedAccess": True,
  355. }
  356. created_resource = await uma.a_resource_set_create(resource_to_create)
  357. group_id = admin.create_group({"name": "UMAPolicyGroup"})
  358. role_id = admin.create_realm_role(payload={"name": "roleUMAPolicy"})
  359. other_client_id = admin.create_client({"name": "UMAOtherClient"})
  360. client = admin.get_client(other_client_id)
  361. resource_id = created_resource["_id"]
  362. # Create a role policy
  363. policy_to_create = {
  364. "name": "TestPolicyRole",
  365. "description": "Test resource policy description",
  366. "scopes": ["test:read", "test:write"],
  367. "roles": ["roleUMAPolicy"],
  368. }
  369. policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  370. assert policy
  371. # Create a client policy
  372. policy_to_create = {
  373. "name": "TestPolicyClient",
  374. "description": "Test resource policy description",
  375. "scopes": ["test:read"],
  376. "clients": [client["clientId"]],
  377. }
  378. policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  379. assert policy
  380. policy_to_create = {
  381. "name": "TestPolicyGroup",
  382. "description": "Test resource policy description",
  383. "scopes": ["test:read"],
  384. "groups": ["/UMAPolicyGroup"],
  385. }
  386. policy = await uma.a_policy_resource_create(resource_id=resource_id, payload=policy_to_create)
  387. assert policy
  388. policies = await uma.a_policy_query()
  389. assert len(policies) == 3
  390. policies = await uma.a_policy_query(name="TestPolicyGroup")
  391. assert len(policies) == 1
  392. policy_id = policy["id"]
  393. await uma.a_policy_delete(policy_id)
  394. with pytest.raises(KeycloakDeleteError) as err:
  395. await uma.a_policy_delete(policy_id)
  396. assert err.match(
  397. '404: b\'{"error":"invalid_request","error_description":"Policy with .* does not exist"}\''
  398. )
  399. policies = await uma.a_policy_query()
  400. assert len(policies) == 2
  401. policy = policies[0]
  402. await uma.a_policy_update(policy_id=policy["id"], payload=policy)
  403. policies = await uma.a_policy_query()
  404. assert len(policies) == 2
  405. policies = await uma.a_policy_query(name="Invalid")
  406. assert len(policies) == 0
  407. policies = await uma.a_policy_query(scope="Invalid")
  408. assert len(policies) == 0
  409. policies = await uma.a_policy_query(resource="Invalid")
  410. assert len(policies) == 0
  411. policies = await uma.a_policy_query(first=3)
  412. assert len(policies) == 0
  413. policies = await uma.a_policy_query(maximum=0)
  414. assert len(policies) == 0
  415. policies = await uma.a_policy_query(name=policy["name"])
  416. assert len(policies) == 1
  417. policies = await uma.a_policy_query(scope=policy["scopes"][0])
  418. assert len(policies) == 2
  419. policies = await uma.a_policy_query(resource=resource_id)
  420. assert len(policies) == 2
  421. await uma.a_resource_set_delete(resource_id)
  422. await admin.a_delete_client(other_client_id)
  423. await admin.a_delete_realm_role(role_id)
  424. await admin.a_delete_group(group_id)
  425. @pytest.mark.asyncio
  426. async def test_a_uma_access(uma: KeycloakUMA):
  427. """Test permission access checks.
  428. :param uma: Keycloak UMA client
  429. :type uma: KeycloakUMA
  430. """
  431. resource_to_create = {
  432. "name": "mytest",
  433. "scopes": ["read", "write"],
  434. "type": "urn:test",
  435. "ownerManagedAccess": True,
  436. }
  437. resource = await uma.a_resource_set_create(resource_to_create)
  438. policy_to_create = {
  439. "name": "TestPolicy",
  440. "description": "Test resource policy description",
  441. "scopes": [resource_to_create["scopes"][0]],
  442. "clients": [uma.connection.client_id],
  443. }
  444. await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
  445. token = uma.connection.token
  446. permissions = list()
  447. assert await uma.a_permissions_check(token["access_token"], permissions)
  448. permissions.append(UMAPermission(resource=resource_to_create["name"]))
  449. assert await uma.a_permissions_check(token["access_token"], permissions)
  450. permissions.append(UMAPermission(resource="not valid"))
  451. assert not await uma.a_permissions_check(token["access_token"], permissions)
  452. uma.resource_set_delete(resource["_id"])
  453. @pytest.mark.asyncio
  454. async def test_a_uma_permission_ticket(uma: KeycloakUMA):
  455. """Test permission ticket generation.
  456. :param uma: Keycloak UMA client
  457. :type uma: KeycloakUMA
  458. """
  459. resource_to_create = {
  460. "name": "mytest",
  461. "scopes": ["read", "write"],
  462. "type": "urn:test",
  463. "ownerManagedAccess": True,
  464. }
  465. resource = await uma.a_resource_set_create(resource_to_create)
  466. policy_to_create = {
  467. "name": "TestPolicy",
  468. "description": "Test resource policy description",
  469. "scopes": [resource_to_create["scopes"][0]],
  470. "clients": [uma.connection.client_id],
  471. }
  472. await uma.a_policy_resource_create(resource_id=resource["_id"], payload=policy_to_create)
  473. permissions = (
  474. UMAPermission(resource=resource_to_create["name"], scope=resource_to_create["scopes"][0]),
  475. )
  476. response = await uma.a_permission_ticket_create(permissions)
  477. rpt = await uma.connection.keycloak_openid.a_token(
  478. grant_type="urn:ietf:params:oauth:grant-type:uma-ticket", ticket=response["ticket"]
  479. )
  480. assert rpt
  481. assert "access_token" in rpt
  482. permissions = (UMAPermission(resource="invalid"),)
  483. with pytest.raises(KeycloakPostError):
  484. uma.permission_ticket_create(permissions)
  485. await uma.a_resource_set_delete(resource["_id"])
  486. def test_counter_part():
  487. """Test that each function has its async counter part."""
  488. uma_methods = [func for func in dir(KeycloakUMA) if callable(getattr(KeycloakUMA, func))]
  489. sync_methods = [
  490. method
  491. for method in uma_methods
  492. if not method.startswith("a_") and not method.startswith("_")
  493. ]
  494. async_methods = [
  495. method for method in uma_methods if iscoroutinefunction(getattr(KeycloakUMA, method))
  496. ]
  497. for method in sync_methods:
  498. async_method = f"a_{method}"
  499. assert (async_method in uma_methods) is True
  500. sync_sign = signature(getattr(KeycloakUMA, method))
  501. async_sign = signature(getattr(KeycloakUMA, async_method))
  502. assert sync_sign.parameters == async_sign.parameters
  503. for async_method in async_methods:
  504. if async_method[2:].startswith("_"):
  505. continue
  506. assert async_method[2:] in sync_methods