Browse Source

PEP8 recommendations. Thanks @marcelodufe!

hotfix/merge
Marcos Pereira 7 years ago
parent
commit
4e608d4014
  1. 100
      keycloak/__init__.py
  2. 50
      keycloak/connection.py
  3. 55
      keycloak/tests/test_connection.py

100
keycloak/__init__.py

@ -18,8 +18,16 @@
from .exceptions import raise_error_from_response, KeycloakGetError, KeycloakSecretNotFound, \
KeycloakRPTNotFound
from .urls_patterns import URL_AUTH, URL_TOKEN, URL_USERINFO, URL_WELL_KNOWN, URL_LOGOUT, \
URL_CERTS, URL_ENTITLEMENT, URL_INTROSPECT
from .urls_patterns import (
URL_AUTH,
URL_TOKEN,
URL_USERINFO,
URL_WELL_KNOWN,
URL_LOGOUT,
URL_CERTS,
URL_ENTITLEMENT,
URL_INTROSPECT
)
from .connection import ConnectionManager
from jose import jwt
@ -27,23 +35,39 @@ from jose import jwt
class Keycloak:
def __init__(self, server_url, client_id, realm_name, client_secret_key=None):
self.__client_id = client_id
self.__client_secret_key = client_secret_key
self.__realm_name = realm_name
self.client_id = client_id
self.client_secret_key = client_secret_key
self.realm_name = realm_name
self.__connection = ConnectionManager(base_url=server_url,
headers={},
timeout=60)
self.connection = ConnectionManager(base_url=server_url,
headers={},
timeout=60)
def __add_secret_key(self, payload):
@property
def get_client_id(self):
return self.client_id
@property
def get_client_secret_key(self):
return self.client_secret_key
@property
def get_realm_name(self):
return self.realm_name
@property
def get_connection(self):
return self.connection
def _add_secret_key(self, payload):
"""
Add secret key if exist.
:param payload:
:return:
"""
if self.__client_secret_key:
payload.update({"client_secret": self.__client_secret_key})
if self.client_secret_key:
payload.update({"client_secret": self.client_secret_key})
return payload
@ -55,8 +79,8 @@ class Keycloak:
:return It lists endpoints and other configuration options relevant.
"""
params_path = {"realm-name": self.__realm_name}
data_raw = self.__connection.raw_get(URL_WELL_KNOWN.format(**params_path))
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_WELL_KNOWN.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
@ -83,13 +107,13 @@ class Keycloak:
:param grant_type:
:return:
"""
params_path = {"realm-name": self.__realm_name}
params_path = {"realm-name": self.realm_name}
payload = {"username": username, "password": password,
"client_id": self.__client_id, "grant_type": grant_type}
"client_id": self.client_id, "grant_type": grant_type}
payload = self.__add_secret_key(payload)
data_raw = self.__connection.raw_post(URL_TOKEN.format(**params_path),
data=payload)
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_TOKEN.format(**params_path),
data=payload)
return raise_error_from_response(data_raw, KeycloakGetError)
def userinfo(self, token):
@ -103,10 +127,10 @@ class Keycloak:
:return:
"""
self.__connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.__realm_name}
self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name}
data_raw = self.__connection.raw_get(URL_USERINFO.format(**params_path))
data_raw = self.connection.raw_get(URL_USERINFO.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
@ -116,12 +140,12 @@ class Keycloak:
:param refresh_token:
:return:
"""
params_path = {"realm-name": self.__realm_name}
payload = {"client_id": self.__client_id, "refresh_token": refresh_token}
params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.client_id, "refresh_token": refresh_token}
payload = self.__add_secret_key(payload)
data_raw = self.__connection.raw_post(URL_LOGOUT.format(**params_path),
data=payload)
payload = self._add_secret_key(payload)
data_raw = self.connection.raw_post(URL_LOGOUT.format(**params_path),
data=payload)
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
@ -135,8 +159,8 @@ class Keycloak:
:return:
"""
params_path = {"realm-name": self.__realm_name}
data_raw = self.__connection.raw_get(URL_CERTS.format(**params_path))
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_get(URL_CERTS.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
def entitlement(self, token, resource_server_id):
@ -149,9 +173,9 @@ class Keycloak:
:return:
"""
self.__connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.__realm_name, "resource-server-id": resource_server_id}
data_raw = self.__connection.raw_get(URL_ENTITLEMENT.format(**params_path))
self.connection.add_param_headers("Authorization", "Bearer " + token)
params_path = {"realm-name": self.realm_name, "resource-server-id": resource_server_id}
data_raw = self.connection.raw_get(URL_ENTITLEMENT.format(**params_path))
return raise_error_from_response(data_raw, KeycloakGetError)
@ -168,21 +192,21 @@ class Keycloak:
:return:
"""
params_path = {"realm-name": self.__realm_name}
params_path = {"realm-name": self.realm_name}
payload = {"client_id": self.__client_id, "token": token}
payload = {"client_id": self.client_id, "token": token}
if token_type_hint == 'requesting_party_token':
if rpt:
payload.update({"token": rpt, "token_type_hint": token_type_hint})
self.__connection.add_param_headers("Authorization", "Bearer " + token)
self.connection.add_param_headers("Authorization", "Bearer " + token)
else:
raise KeycloakRPTNotFound("Can't found RPT.")
payload = self.__add_secret_key(payload)
payload = self._add_secret_key(payload)
data_raw = self.__connection.raw_post(URL_INTROSPECT.format(**params_path),
data=payload)
data_raw = self.connection.raw_post(URL_INTROSPECT.format(**params_path),
data=payload)
return raise_error_from_response(data_raw, KeycloakGetError)
@ -204,4 +228,4 @@ class Keycloak:
"""
return jwt.decode(token, key, algorithms=algorithms,
audience=self.__client_id, **kwargs)
audience=self.client_id, **kwargs)

50
keycloak/connection.py

@ -33,28 +33,24 @@ class ConnectionManager(object):
"""
def __init__(self, base_url, headers={}, timeout=60):
self.__base_url = base_url
self.__headers = headers
self.__timeout = timeout
self.base_url = base_url
self.headers = headers
self.timeout = timeout
def get_url(self):
@property
def get_base_url(self):
""" Return base url in use for requests to the server. """
return self.__base_url
return self.base_url
@property
def get_timeout(self):
""" Return timeout in use for request to the server. """
return self.__timeout
def set_headers(self, params):
""" Update header request to the server.
:arg
params (dict): Parameters header request.
"""
self.__headers = params
return self.timeout
@property
def get_headers(self):
""" Return header request to the server. """
return self.__headers
return self.headers
def get_param_headers(self, key):
""" Return a specific header parameter.
@ -63,11 +59,11 @@ class ConnectionManager(object):
:return:
If the header parameters exist, return its value.
"""
return self.__headers.get(key)
return self.headers.get(key)
def clean_headers(self):
""" Clear header parameters. """
self.__headers = {}
self.headers = {}
def exist_param_headers(self, key):
""" Check if the parameter exists in the header.
@ -84,14 +80,14 @@ class ConnectionManager(object):
key (str): Header parameters key.
value (str): Value to be added.
"""
self.__headers[key] = value
self.headers[key] = value
def del_param_headers(self, key):
""" Remove a specific parameter.
:arg
key (str): Key of the header parameters.
"""
self.__headers.pop(key, None)
self.headers.pop(key, None)
def raw_get(self, path, **kwargs):
""" Submit get request to the path.
@ -104,10 +100,10 @@ class ConnectionManager(object):
"""
try:
return requests.get(urljoin(self.get_url(), path),
return requests.get(urljoin(self.base_url, path),
params=kwargs,
headers=self.get_headers(),
timeout=self.get_timeout())
headers=self.headers,
timeout=self.timeout)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
@ -123,11 +119,11 @@ class ConnectionManager(object):
HttpError: Can't connect to server.
"""
try:
return requests.post(urljoin(self.get_url(), path),
return requests.post(urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.get_headers(),
timeout=self.get_timeout())
headers=self.headers,
timeout=self.timeout)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
@ -143,11 +139,11 @@ class ConnectionManager(object):
HttpError: Can't connect to server.
"""
try:
return requests.put(urljoin(self.get_url(), path),
return requests.put(urljoin(self.base_url, path),
params=kwargs,
data=data,
headers=self.get_headers(),
timeout=self.get_timeout())
headers=self.headers,
timeout=self.timeout)
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)

55
keycloak/tests/test_connection.py

@ -29,7 +29,7 @@ except ImportError:
class TestConnection(unittest.TestCase):
def setUp(self):
self.__conn = ConnectionManager(
self._conn = ConnectionManager(
base_url="http://localhost/",
headers={},
timeout=60)
@ -42,7 +42,7 @@ class TestConnection(unittest.TestCase):
def test_raw_get(self):
with HTTMock(self.response_content_success):
resp = self.__conn.raw_get("/known_path")
resp = self._conn.raw_get("/known_path")
self.assertEqual(resp.content, b'response_ok')
self.assertEqual(resp.status_code, 200)
@ -55,7 +55,7 @@ class TestConnection(unittest.TestCase):
return response(201, content, headers, None, 5, request)
with HTTMock(response_post_success):
resp = self.__conn.raw_post("/known_path",
resp = self._conn.raw_post("/known_path",
{'field': 'value'})
self.assertEqual(resp.content, b'response')
self.assertEqual(resp.status_code, 201)
@ -68,7 +68,7 @@ class TestConnection(unittest.TestCase):
return response(200, content, headers, None, 5, request)
with HTTMock(response_put_success):
resp = self.__conn.raw_put("/known_path",
resp = self._conn.raw_put("/known_path",
{'field': 'value'})
self.assertEqual(resp.content, b'response')
self.assertEqual(resp.status_code, 200)
@ -82,7 +82,7 @@ class TestConnection(unittest.TestCase):
return response(404, content, headers, None, 5, request)
with HTTMock(response_get_fail):
resp = self.__conn.raw_get("/known_path")
resp = self._conn.raw_get("/known_path")
self.assertEqual(resp.content, b"404 page not found")
self.assertEqual(resp.status_code, 404)
@ -96,7 +96,7 @@ class TestConnection(unittest.TestCase):
return response(404, content, headers, None, 5, request)
with HTTMock(response_post_fail):
resp = self.__conn.raw_post("/known_path",
resp = self._conn.raw_post("/known_path",
{'field': 'value'})
self.assertEqual(resp.content, str(["Start can't be blank"]).encode("utf-8"))
self.assertEqual(resp.status_code, 404)
@ -110,44 +110,39 @@ class TestConnection(unittest.TestCase):
return response(404, content, headers, None, 5, request)
with HTTMock(response_put_fail):
resp = self.__conn.raw_put("/known_path",
{'field': 'value'})
resp = self._conn.raw_put("/known_path",
{'field': 'value'})
self.assertEqual(resp.content, str(["Start can't be blank"]).encode("utf-8"))
self.assertEqual(resp.status_code, 404)
def test_add_param_headers(self):
self.__conn.add_param_headers("test", "value")
self.assertEqual(self.__conn.get_headers(),
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
{"test": "value"})
def test_del_param_headers(self):
self.__conn.add_param_headers("test", "value")
self.__conn.del_param_headers("test")
self.assertEqual(self.__conn.get_headers(), {})
self._conn.add_param_headers("test", "value")
self._conn.del_param_headers("test")
self.assertEqual(self._conn.headers, {})
def test_clean_param_headers(self):
self.__conn.add_param_headers("test", "value")
self.assertEqual(self.__conn.get_headers(),
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
{"test": "value"})
self.__conn.clean_headers()
self.assertEqual(self.__conn.get_headers(), {})
self._conn.clean_headers()
self.assertEqual(self._conn.headers, {})
def test_exist_param_headers(self):
self.__conn.add_param_headers("test", "value")
self.assertTrue(self.__conn.exist_param_headers("test"))
self.assertFalse(self.__conn.exist_param_headers("test_no"))
self._conn.add_param_headers("test", "value")
self.assertTrue(self._conn.exist_param_headers("test"))
self.assertFalse(self._conn.exist_param_headers("test_no"))
def test_get_param_headers(self):
self.__conn.add_param_headers("test", "value")
self.assertTrue(self.__conn.exist_param_headers("test"))
self.assertFalse(self.__conn.exist_param_headers("test_no"))
self._conn.add_param_headers("test", "value")
self.assertTrue(self._conn.exist_param_headers("test"))
self.assertFalse(self._conn.exist_param_headers("test_no"))
def test_get_headers(self):
self.__conn.add_param_headers("test", "value")
self.assertEqual(self.__conn.get_headers(),
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
{"test": "value"})
def test_set_headers(self):
self.__conn.set_headers({"test": "value"})
self.assertTrue(self.__conn.get_headers(), {"test": "value"})
self.assertFalse(self.__conn.exist_param_headers("test_no"))
Loading…
Cancel
Save