diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py index 798ba72..994b97d 100644 --- a/src/keycloak/keycloak_admin.py +++ b/src/keycloak/keycloak_admin.py @@ -3597,3 +3597,41 @@ class KeycloakAdmin: urls_patterns.URL_ADMIN_REQUIRED_ACTIONS_ALIAS.format(**params_path), data=payload ) return raise_error_from_response(data_raw, KeycloakPutError) + + def get_bruteforce_detection_status(self, user_id): + """Get bruteforce detection status for user. + + :param user_id: User id + :type user_id: str + :return: Bruteforce status. + :rtype: dict + """ + params_path = {"realm-name": self.realm_name, "id": user_id} + data_raw = self.raw_get( + urls_patterns.URL_ADMIN_ATTACK_DETECTION_USER.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakGetError) + + def clear_bruteforce_attempts_for_user(self, user_id): + """Clear bruteforce attempts for user. + + :param user_id: User id + :type user_id: str + :return: empty dictionary. + :rtype: dict + """ + params_path = {"realm-name": self.realm_name, "id": user_id} + data_raw = self.raw_delete( + urls_patterns.URL_ADMIN_ATTACK_DETECTION_USER.format(**params_path) + ) + return raise_error_from_response(data_raw, KeycloakDeleteError) + + def clear_all_bruteforce_attempts(self): + """Clear bruteforce attempts for all users in realm. + + :return: empty dictionary. + :rtype: dict + """ + params_path = {"realm-name": self.realm_name} + data_raw = self.raw_delete(urls_patterns.URL_ADMIN_ATTACK_DETECTION.format(**params_path)) + return raise_error_from_response(data_raw, KeycloakDeleteError) diff --git a/src/keycloak/urls_patterns.py b/src/keycloak/urls_patterns.py index f2a2188..b5f3277 100644 --- a/src/keycloak/urls_patterns.py +++ b/src/keycloak/urls_patterns.py @@ -195,3 +195,8 @@ URL_ADMIN_CLIENT_ROLE_CHILDREN = ( URL_ADMIN_CLIENT_CERT_UPLOAD = URL_ADMIN_CLIENT_CERTS + "/upload-certificate" URL_ADMIN_REQUIRED_ACTIONS = URL_ADMIN_REALM + "/authentication/required-actions" URL_ADMIN_REQUIRED_ACTIONS_ALIAS = URL_ADMIN_REQUIRED_ACTIONS + "/{action-alias}" + +URL_ADMIN_ATTACK_DETECTION = "admin/realms/{realm-name}/attack-detection/brute-force/users" +URL_ADMIN_ATTACK_DETECTION_USER = ( + "admin/realms/{realm-name}/attack-detection/brute-force/users/{id}" +) diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py index b3ad951..b1625f0 100644 --- a/tests/test_keycloak_admin.py +++ b/tests/test_keycloak_admin.py @@ -1,11 +1,12 @@ """Test the keycloak admin object.""" import copy +from typing import Tuple import pytest import keycloak -from keycloak import KeycloakAdmin +from keycloak import KeycloakAdmin, KeycloakOpenID from keycloak.connection import ConnectionManager from keycloak.exceptions import ( KeycloakAuthenticationError, @@ -1135,15 +1136,12 @@ def test_role_attributes( attribute_role = "test-realm-role-w-attr" test_attrs = {"attr1": ["val1"], "attr2": ["val2-1", "val2-2"]} role_id = admin.create_realm_role( - payload={"name": attribute_role, "attributes": test_attrs}, - skip_exists=True, + payload={"name": attribute_role, "attributes": test_attrs}, skip_exists=True ) assert role_id, role_id cli_role_id = admin.create_client_role( - client, - payload={"name": attribute_role, "attributes": test_attrs}, - skip_exists=True, + client, payload={"name": attribute_role, "attributes": test_attrs}, skip_exists=True ) assert cli_role_id, cli_role_id @@ -2285,3 +2283,120 @@ def test_upload_certificate(admin: KeycloakAdmin, realm: str, client: str, selfs admin.upload_certificate(client, cert) cl = admin.get_client(client) assert cl["attributes"]["jwt.credential.certificate"] == "".join(cert.splitlines()[1:-1]) + + +def test_get_bruteforce_status_for_user( + admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str +): + """Test users. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + :param realm: Keycloak realm + :type realm: str + """ + oid, username, password = oid_with_credentials + admin.realm_name = realm + + # Turn on bruteforce protection + res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": True}) + res = admin.get_realm(realm_name=realm) + assert res["bruteForceProtected"] is True + + # Test login user with wrong credentials + try: + oid.token(username=username, password="wrongpassword") + except KeycloakAuthenticationError: + pass + + user_id = admin.get_user_id(username) + bruteforce_status = admin.get_bruteforce_detection_status(user_id) + + assert bruteforce_status["numFailures"] == 1 + + # Cleanup + res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": False}) + res = admin.get_realm(realm_name=realm) + assert res["bruteForceProtected"] is False + + +def test_clear_bruteforce_attempts_for_user( + admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str +): + """Test users. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + :param realm: Keycloak realm + :type realm: str + """ + oid, username, password = oid_with_credentials + admin.realm_name = realm + + # Turn on bruteforce protection + res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": True}) + res = admin.get_realm(realm_name=realm) + assert res["bruteForceProtected"] is True + + # Test login user with wrong credentials + try: + oid.token(username=username, password="wrongpassword") + except KeycloakAuthenticationError: + pass + + user_id = admin.get_user_id(username) + bruteforce_status = admin.get_bruteforce_detection_status(user_id) + assert bruteforce_status["numFailures"] == 1 + + res = admin.clear_bruteforce_attempts_for_user(user_id) + bruteforce_status = admin.get_bruteforce_detection_status(user_id) + assert bruteforce_status["numFailures"] == 0 + + # Cleanup + res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": False}) + res = admin.get_realm(realm_name=realm) + assert res["bruteForceProtected"] is False + + +def test_clear_bruteforce_attempts_for_all_users( + admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str +): + """Test users. + + :param admin: Keycloak Admin client + :type admin: KeycloakAdmin + :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials + :type oid_with_credentials: Tuple[KeycloakOpenID, str, str] + :param realm: Keycloak realm + :type realm: str + """ + oid, username, password = oid_with_credentials + admin.realm_name = realm + + # Turn on bruteforce protection + res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": True}) + res = admin.get_realm(realm_name=realm) + assert res["bruteForceProtected"] is True + + # Test login user with wrong credentials + try: + oid.token(username=username, password="wrongpassword") + except KeycloakAuthenticationError: + pass + + user_id = admin.get_user_id(username) + bruteforce_status = admin.get_bruteforce_detection_status(user_id) + assert bruteforce_status["numFailures"] == 1 + + res = admin.clear_all_bruteforce_attempts() + bruteforce_status = admin.get_bruteforce_detection_status(user_id) + assert bruteforce_status["numFailures"] == 0 + + # Cleanup + res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": False}) + res = admin.get_realm(realm_name=realm) + assert res["bruteForceProtected"] is False