Browse Source

Merge pull request #379 from fredlb/feat-attackdetection-api

feat: attack detection API implementation
pull/382/head v2.6.0
Richard Nemeth 2 years ago
committed by GitHub
parent
commit
83e6f54a74
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 38
      src/keycloak/keycloak_admin.py
  2. 5
      src/keycloak/urls_patterns.py
  3. 127
      tests/test_keycloak_admin.py

38
src/keycloak/keycloak_admin.py

@ -3597,3 +3597,41 @@ class KeycloakAdmin:
urls_patterns.URL_ADMIN_REQUIRED_ACTIONS_ALIAS.format(**params_path), data=payload urls_patterns.URL_ADMIN_REQUIRED_ACTIONS_ALIAS.format(**params_path), data=payload
) )
return raise_error_from_response(data_raw, KeycloakPutError) return raise_error_from_response(data_raw, KeycloakPutError)
def get_bruteforce_detection_status(self, user_id):
"""Get bruteforce detection status for user.
:param user_id: User id
:type user_id: str
:return: Bruteforce status.
:rtype: dict
"""
params_path = {"realm-name": self.realm_name, "id": user_id}
data_raw = self.raw_get(
urls_patterns.URL_ADMIN_ATTACK_DETECTION_USER.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakGetError)
def clear_bruteforce_attempts_for_user(self, user_id):
"""Clear bruteforce attempts for user.
:param user_id: User id
:type user_id: str
:return: empty dictionary.
:rtype: dict
"""
params_path = {"realm-name": self.realm_name, "id": user_id}
data_raw = self.raw_delete(
urls_patterns.URL_ADMIN_ATTACK_DETECTION_USER.format(**params_path)
)
return raise_error_from_response(data_raw, KeycloakDeleteError)
def clear_all_bruteforce_attempts(self):
"""Clear bruteforce attempts for all users in realm.
:return: empty dictionary.
:rtype: dict
"""
params_path = {"realm-name": self.realm_name}
data_raw = self.raw_delete(urls_patterns.URL_ADMIN_ATTACK_DETECTION.format(**params_path))
return raise_error_from_response(data_raw, KeycloakDeleteError)

5
src/keycloak/urls_patterns.py

@ -195,3 +195,8 @@ URL_ADMIN_CLIENT_ROLE_CHILDREN = (
URL_ADMIN_CLIENT_CERT_UPLOAD = URL_ADMIN_CLIENT_CERTS + "/upload-certificate" URL_ADMIN_CLIENT_CERT_UPLOAD = URL_ADMIN_CLIENT_CERTS + "/upload-certificate"
URL_ADMIN_REQUIRED_ACTIONS = URL_ADMIN_REALM + "/authentication/required-actions" URL_ADMIN_REQUIRED_ACTIONS = URL_ADMIN_REALM + "/authentication/required-actions"
URL_ADMIN_REQUIRED_ACTIONS_ALIAS = URL_ADMIN_REQUIRED_ACTIONS + "/{action-alias}" URL_ADMIN_REQUIRED_ACTIONS_ALIAS = URL_ADMIN_REQUIRED_ACTIONS + "/{action-alias}"
URL_ADMIN_ATTACK_DETECTION = "admin/realms/{realm-name}/attack-detection/brute-force/users"
URL_ADMIN_ATTACK_DETECTION_USER = (
"admin/realms/{realm-name}/attack-detection/brute-force/users/{id}"
)

127
tests/test_keycloak_admin.py

@ -1,11 +1,12 @@
"""Test the keycloak admin object.""" """Test the keycloak admin object."""
import copy import copy
from typing import Tuple
import pytest import pytest
import keycloak import keycloak
from keycloak import KeycloakAdmin
from keycloak import KeycloakAdmin, KeycloakOpenID
from keycloak.connection import ConnectionManager from keycloak.connection import ConnectionManager
from keycloak.exceptions import ( from keycloak.exceptions import (
KeycloakAuthenticationError, KeycloakAuthenticationError,
@ -1135,15 +1136,12 @@ def test_role_attributes(
attribute_role = "test-realm-role-w-attr" attribute_role = "test-realm-role-w-attr"
test_attrs = {"attr1": ["val1"], "attr2": ["val2-1", "val2-2"]} test_attrs = {"attr1": ["val1"], "attr2": ["val2-1", "val2-2"]}
role_id = admin.create_realm_role( role_id = admin.create_realm_role(
payload={"name": attribute_role, "attributes": test_attrs},
skip_exists=True,
payload={"name": attribute_role, "attributes": test_attrs}, skip_exists=True
) )
assert role_id, role_id assert role_id, role_id
cli_role_id = admin.create_client_role( cli_role_id = admin.create_client_role(
client,
payload={"name": attribute_role, "attributes": test_attrs},
skip_exists=True,
client, payload={"name": attribute_role, "attributes": test_attrs}, skip_exists=True
) )
assert cli_role_id, cli_role_id assert cli_role_id, cli_role_id
@ -2285,3 +2283,120 @@ def test_upload_certificate(admin: KeycloakAdmin, realm: str, client: str, selfs
admin.upload_certificate(client, cert) admin.upload_certificate(client, cert)
cl = admin.get_client(client) cl = admin.get_client(client)
assert cl["attributes"]["jwt.credential.certificate"] == "".join(cert.splitlines()[1:-1]) assert cl["attributes"]["jwt.credential.certificate"] == "".join(cert.splitlines()[1:-1])
def test_get_bruteforce_status_for_user(
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str
):
"""Test users.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
:param realm: Keycloak realm
:type realm: str
"""
oid, username, password = oid_with_credentials
admin.realm_name = realm
# Turn on bruteforce protection
res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": True})
res = admin.get_realm(realm_name=realm)
assert res["bruteForceProtected"] is True
# Test login user with wrong credentials
try:
oid.token(username=username, password="wrongpassword")
except KeycloakAuthenticationError:
pass
user_id = admin.get_user_id(username)
bruteforce_status = admin.get_bruteforce_detection_status(user_id)
assert bruteforce_status["numFailures"] == 1
# Cleanup
res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": False})
res = admin.get_realm(realm_name=realm)
assert res["bruteForceProtected"] is False
def test_clear_bruteforce_attempts_for_user(
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str
):
"""Test users.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
:param realm: Keycloak realm
:type realm: str
"""
oid, username, password = oid_with_credentials
admin.realm_name = realm
# Turn on bruteforce protection
res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": True})
res = admin.get_realm(realm_name=realm)
assert res["bruteForceProtected"] is True
# Test login user with wrong credentials
try:
oid.token(username=username, password="wrongpassword")
except KeycloakAuthenticationError:
pass
user_id = admin.get_user_id(username)
bruteforce_status = admin.get_bruteforce_detection_status(user_id)
assert bruteforce_status["numFailures"] == 1
res = admin.clear_bruteforce_attempts_for_user(user_id)
bruteforce_status = admin.get_bruteforce_detection_status(user_id)
assert bruteforce_status["numFailures"] == 0
# Cleanup
res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": False})
res = admin.get_realm(realm_name=realm)
assert res["bruteForceProtected"] is False
def test_clear_bruteforce_attempts_for_all_users(
admin: KeycloakAdmin, oid_with_credentials: Tuple[KeycloakOpenID, str, str], realm: str
):
"""Test users.
:param admin: Keycloak Admin client
:type admin: KeycloakAdmin
:param oid_with_credentials: Keycloak OpenID client with pre-configured user credentials
:type oid_with_credentials: Tuple[KeycloakOpenID, str, str]
:param realm: Keycloak realm
:type realm: str
"""
oid, username, password = oid_with_credentials
admin.realm_name = realm
# Turn on bruteforce protection
res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": True})
res = admin.get_realm(realm_name=realm)
assert res["bruteForceProtected"] is True
# Test login user with wrong credentials
try:
oid.token(username=username, password="wrongpassword")
except KeycloakAuthenticationError:
pass
user_id = admin.get_user_id(username)
bruteforce_status = admin.get_bruteforce_detection_status(user_id)
assert bruteforce_status["numFailures"] == 1
res = admin.clear_all_bruteforce_attempts()
bruteforce_status = admin.get_bruteforce_detection_status(user_id)
assert bruteforce_status["numFailures"] == 0
# Cleanup
res = admin.update_realm(realm_name=realm, payload={"bruteForceProtected": False})
res = admin.get_realm(realm_name=realm)
assert res["bruteForceProtected"] is False
Loading…
Cancel
Save