From fc6a70f459f3caafbd85bc29fa423890cd68caf0 Mon Sep 17 00:00:00 2001
From: Fredrik Lindner <fredrik.lindner@gmail.com>
Date: Mon, 3 Oct 2022 11:36:40 +0200
Subject: [PATCH] feat: attack detection API implementation

---
 src/keycloak/keycloak_admin.py |  38 ++++++++++
 src/keycloak/urls_patterns.py  |   5 ++
 tests/test_keycloak_admin.py   | 127 +++++++++++++++++++++++++++++++--
 3 files changed, 164 insertions(+), 6 deletions(-)

diff --git a/src/keycloak/keycloak_admin.py b/src/keycloak/keycloak_admin.py
index 798ba72..994b97d 100644
--- a/src/keycloak/keycloak_admin.py
+++ b/src/keycloak/keycloak_admin.py
@@ -3597,3 +3597,41 @@ class KeycloakAdmin:
             urls_patterns.URL_ADMIN_REQUIRED_ACTIONS_ALIAS.format(**params_path), data=payload
         )
         return raise_error_from_response(data_raw, KeycloakPutError)
+
+    def get_bruteforce_detection_status(self, user_id):
+        """Get bruteforce detection status for user.
+
+        :param user_id: User id
+        :type user_id: str
+        :return: Bruteforce status.
+        :rtype: dict
+        """
+        params_path = {"realm-name": self.realm_name, "id": user_id}
+        data_raw = self.raw_get(
+            urls_patterns.URL_ADMIN_ATTACK_DETECTION_USER.format(**params_path)
+        )
+        return raise_error_from_response(data_raw, KeycloakGetError)
+
+    def clear_bruteforce_attempts_for_user(self, user_id):
+        """Clear bruteforce attempts for user.
+
+        :param user_id: User id
+        :type user_id: str
+        :return: empty dictionary.
+        :rtype: dict
+        """
+        params_path = {"realm-name": self.realm_name, "id": user_id}
+        data_raw = self.raw_delete(
+            urls_patterns.URL_ADMIN_ATTACK_DETECTION_USER.format(**params_path)
+        )
+        return raise_error_from_response(data_raw, KeycloakDeleteError)
+
+    def clear_all_bruteforce_attempts(self):
+        """Clear bruteforce attempts for all users in realm.
+
+        :return: empty dictionary.
+        :rtype: dict
+        """
+        params_path = {"realm-name": self.realm_name}
+        data_raw = self.raw_delete(urls_patterns.URL_ADMIN_ATTACK_DETECTION.format(**params_path))
+        return raise_error_from_response(data_raw, KeycloakDeleteError)
diff --git a/src/keycloak/urls_patterns.py b/src/keycloak/urls_patterns.py
index f2a2188..b5f3277 100644
--- a/src/keycloak/urls_patterns.py
+++ b/src/keycloak/urls_patterns.py
@@ -195,3 +195,8 @@ URL_ADMIN_CLIENT_ROLE_CHILDREN = (
 URL_ADMIN_CLIENT_CERT_UPLOAD = URL_ADMIN_CLIENT_CERTS + "/upload-certificate"
 URL_ADMIN_REQUIRED_ACTIONS = URL_ADMIN_REALM + "/authentication/required-actions"
 URL_ADMIN_REQUIRED_ACTIONS_ALIAS = URL_ADMIN_REQUIRED_ACTIONS + "/{action-alias}"
+
+URL_ADMIN_ATTACK_DETECTION = "admin/realms/{realm-name}/attack-detection/brute-force/users"
+URL_ADMIN_ATTACK_DETECTION_USER = (
+    "admin/realms/{realm-name}/attack-detection/brute-force/users/{id}"
+)
diff --git a/tests/test_keycloak_admin.py b/tests/test_keycloak_admin.py
index b3ad951..b1625f0 100644
--- a/tests/test_keycloak_admin.py
+++ b/tests/test_keycloak_admin.py
@@ -1,11 +1,12 @@
 """Test the keycloak admin object."""
 
 import copy
+from typing import Tuple
 
 import pytest
 
 import keycloak
-from keycloak import KeycloakAdmin
+from keycloak import KeycloakAdmin, KeycloakOpenID
 from keycloak.connection import ConnectionManager
 from keycloak.exceptions import (
     KeycloakAuthenticationError,
@@ -1135,15 +1136,12 @@ def test_role_attributes(
     attribute_role = "test-realm-role-w-attr"
     test_attrs = {"attr1": ["val1"], "attr2": ["val2-1", "val2-2"]}
     role_id = admin.create_realm_role(
-        payload={"name": attribute_role, "attributes": test_attrs},
-        skip_exists=True,
+        payload={"name": attribute_role, "attributes": test_attrs}, skip_exists=True
     )
     assert role_id, role_id
 
     cli_role_id = admin.create_client_role(
-        client,
-        payload={"name": attribute_role, "attributes": test_attrs},
-        skip_exists=True,
+        client, payload={"name": attribute_role, "attributes": test_attrs}, skip_exists=True
     )
     assert cli_role_id, cli_role_id
 
@@ -2285,3 +2283,120 @@ def test_upload_certificate(admin: KeycloakAdmin, realm: str, client: str, selfs
     admin.upload_certificate(client, cert)
     cl = admin.get_client(client)
     assert cl["attributes"]["jwt.credential.certificate"] == "".join(cert.splitlines()[1:-1])
+
+
+def test_get_bruteforce_status_for_user(
+    admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str
+):
+    """Test users.
+
+    :param admin: Keycloak Admin client
+    :type admin: KeycloakAdmin
+    :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
+    :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
+    :param realm: Keycloak realm
+    :type realm: str
+    """
+    oid, username, password = oid_with_credentials
+    admin.realm_name = realm
+
+    # Turn on bruteforce protection
+    res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": True})
+    res = admin.get_realm(realm_name=realm)
+    assert res["bruteForceProtected"] is True
+
+    # Test login user with wrong credentials
+    try:
+        oid.token(username=username, password="wrongpassword")
+    except KeycloakAuthenticationError:
+        pass
+
+    user_id = admin.get_user_id(username)
+    bruteforce_status = admin.get_bruteforce_detection_status(user_id)
+
+    assert bruteforce_status["numFailures"] == 1
+
+    # Cleanup
+    res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": False})
+    res = admin.get_realm(realm_name=realm)
+    assert res["bruteForceProtected"] is False
+
+
+def test_clear_bruteforce_attempts_for_user(
+    admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str
+):
+    """Test users.
+
+    :param admin: Keycloak Admin client
+    :type admin: KeycloakAdmin
+    :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
+    :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
+    :param realm: Keycloak realm
+    :type realm: str
+    """
+    oid, username, password = oid_with_credentials
+    admin.realm_name = realm
+
+    # Turn on bruteforce protection
+    res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": True})
+    res = admin.get_realm(realm_name=realm)
+    assert res["bruteForceProtected"] is True
+
+    # Test login user with wrong credentials
+    try:
+        oid.token(username=username, password="wrongpassword")
+    except KeycloakAuthenticationError:
+        pass
+
+    user_id = admin.get_user_id(username)
+    bruteforce_status = admin.get_bruteforce_detection_status(user_id)
+    assert bruteforce_status["numFailures"] == 1
+
+    res = admin.clear_bruteforce_attempts_for_user(user_id)
+    bruteforce_status = admin.get_bruteforce_detection_status(user_id)
+    assert bruteforce_status["numFailures"] == 0
+
+    # Cleanup
+    res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": False})
+    res = admin.get_realm(realm_name=realm)
+    assert res["bruteForceProtected"] is False
+
+
+def test_clear_bruteforce_attempts_for_all_users(
+    admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str
+):
+    """Test users.
+
+    :param admin: Keycloak Admin client
+    :type admin: KeycloakAdmin
+    :param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
+    :type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
+    :param realm: Keycloak realm
+    :type realm: str
+    """
+    oid, username, password = oid_with_credentials
+    admin.realm_name = realm
+
+    # Turn on bruteforce protection
+    res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": True})
+    res = admin.get_realm(realm_name=realm)
+    assert res["bruteForceProtected"] is True
+
+    # Test login user with wrong credentials
+    try:
+        oid.token(username=username, password="wrongpassword")
+    except KeycloakAuthenticationError:
+        pass
+
+    user_id = admin.get_user_id(username)
+    bruteforce_status = admin.get_bruteforce_detection_status(user_id)
+    assert bruteforce_status["numFailures"] == 1
+
+    res = admin.clear_all_bruteforce_attempts()
+    bruteforce_status = admin.get_bruteforce_detection_status(user_id)
+    assert bruteforce_status["numFailures"] == 0
+
+    # Cleanup
+    res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": False})
+    res = admin.get_realm(realm_name=realm)
+    assert res["bruteForceProtected"] is False