Browse Source

Initial import

pull/12/head
Marcos Pereira 7 years ago
parent
commit
bc4c8f1b66
  1. 3
      .gitignore
  2. 35
      keycloak/__init__.py
  3. 139
      keycloak/connection.py
  4. 105
      keycloak/exceptions.py
  5. 0
      keycloak/tests/__init__.py
  6. 3
      keycloak/urls_patterns.py
  7. 1
      requirements.txt

3
.gitignore

@ -99,3 +99,6 @@ ENV/
# mypy
.mypy_cache/
.idea/
main.py

35
keycloak/__init__.py

@ -0,0 +1,35 @@
"""
"""
import json
from keycloak.exceptions import raise_error_from_response, KeycloakGetError
from .connection import ConnectionManager
from .urls_patterns import URL_WELL_KNOWN
class Keycloak:
def __init__(self, server_url, client_id, realm_name, client_secret_key=None):
self.__client_id = client_id
self.__client_secret_key = client_secret_key
self.__realm_name = realm_name
self.__connection = ConnectionManager(base_url=server_url,
headers={},
timeout=60)
def get_well_know(self):
params = {"realm-name": self.__realm_name}
data_raw = self.__connection.raw_get(URL_WELL_KNOWN.format(**params))
raise_error_from_response(data_raw, KeycloakGetError)
return json.loads(data_raw.text)
def auth(self):
"""
http://openid.net/specs/openid-connect-core-1_0.html#AuthorizationEndpoint
:return:
"""

139
keycloak/connection.py

@ -0,0 +1,139 @@
"""
"""
import requests
from urllib.parse import urljoin
from .exceptions import *
class ConnectionManager(object):
""" Represents a simple server connection.
Args:
base_url (str): The URL server
headers (dict): The header parameters of the requests to the server.
timeout (int): Timeout to use for requests to the server.
"""
def __init__(self, base_url, headers={}, timeout=60):
self.__base_url = base_url
self.__headers = headers
self.__timeout = timeout
def get_url(self):
""" Return base url in use for requests to the server. """
return self.__base_url
def get_timeout(self):
""" Return timeout in use for request to the server. """
return self.__timeout
def set_headers(self, params):
""" Update header request to the server.
:arg
params (dict): Parameters header request.
"""
self.__headers = params
def get_headers(self):
""" Return header request to the server. """
return self.__headers
def get_param_headers(self, key):
""" Return a single header parameter.
:arg
key (str): Key of the header parameters.
:return:
If the header parameters exist, return value him.
"""
return self.__headers[key] if key in self.__headers.keys() else None
def clean_headers(self):
""" Clear header parameters. """
self.__headers = {}
def exist_param_headers(self, key):
""" Check if the parameter exist in header.
:arg
key (str): Key of the header parameters.
:return:
If the header parameters exist, return True.
"""
return True if self.get_param_headers(key) else False
def add_param_headers(self, key, value):
""" Add a single parameter in header.
:arg
key (str): Key of the header parameters.
value (str): Value for the header parameter.
"""
request_headers = self.__headers.copy()
request_headers.update({key: value})
self.set_headers(request_headers)
def del_param_headers(self, key):
""" Remove a single header parameter.
:arg
key (str): Key of the header parameters.
"""
if self.get_param_headers(key):
del self.__headers[key]
def raw_get(self, path, **kwargs):
""" Submit get request to the path.
:arg
path (str): Path for request.
:return
Response the request.
:exception
HttpError: Can't connect to server.
"""
try:
return requests.get(urljoin(self.get_url(), path),
params=kwargs,
headers=self.get_headers(),
timeout=self.get_timeout())
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
def raw_post(self, path, data, **kwargs):
""" Submit post request to the path.
:arg
path (str): Path for request.
data (dict): Payload for request.
:return
Response the request.
:exception
HttpError: Can't connect to server.
"""
try:
return requests.post(urljoin(self.get_url(), path),
params=kwargs,
data=data,
headers=self.get_headers(),
timeout=self.get_timeout())
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)
def raw_put(self, path, data, **kwargs):
""" Submit put request to the path.
:arg
path (str): Path for request.
data (dict): Payload for request.
:return
Response the request.
:exception
HttpError: Can't connect to server.
"""
try:
return requests.put(urljoin(self.get_url(), path),
params=kwargs,
data=data,
headers=self.get_headers(),
timeout=self.get_timeout())
except Exception as e:
raise KeycloakConnectionError(
"Can't connect to server (%s)" % e)

105
keycloak/exceptions.py

@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class KeycloakError(Exception):
def __init__(self, error_message="", response_code=None,
response_body=None):
Exception.__init__(self, error_message)
self.response_code = response_code
self.response_body = response_body
self.error_message = error_message
def __str__(self):
if self.response_code is not None:
return "{0}: {1}".format(self.response_code, self.error_message)
else:
return "{0}".format(self.error_message)
class KeycloakAuthenticationError(KeycloakError):
pass
class KeycloakConnectionError(KeycloakError):
pass
class KeycloakOperationError(KeycloakError):
pass
class KeycloakListError(KeycloakOperationError):
pass
class KeycloakGetError(KeycloakOperationError):
pass
class KeycloakCreateError(KeycloakOperationError):
pass
class KeycloakUpdateError(KeycloakOperationError):
pass
class KeycloakDeleteError(KeycloakOperationError):
pass
class KeycloakProtectError(KeycloakOperationError):
pass
class KeycloakTransferProjectError(KeycloakOperationError):
pass
class KeycloakBuildCancelError(KeycloakOperationError):
pass
class KeycloakBuildRetryError(KeycloakOperationError):
pass
class KeycloakBlockError(KeycloakOperationError):
pass
def raise_error_from_response(response, error, expected_code=200):
if expected_code == response.status_code:
return response.json()
try:
message = response.json()['message']
except (KeyError, ValueError):
message = response.content
if isinstance(error, dict):
error = error.get(response.status_code, KeycloakOperationError)
else:
if response.status_code == 401:
error = KeycloakAuthenticationError
raise error(error_message=message,
response_code=response.status_code,
response_body=response.content)

0
keycloak/tests/__init__.py

3
keycloak/urls_patterns.py

@ -0,0 +1,3 @@
URL_WELL_KNOWN = "realms/{realm-name}/.well-known/openid-configuration"
URL_WELL_KNOWN = "realms/{realm-name}/protocol/openid-connect/auth"

1
requirements.txt

@ -0,0 +1 @@
requests==2.18.3
Loading…
Cancel
Save