Browse Source

Added Instropect RPT and Token.

hotfix/merge
Marcos Pereira 7 years ago
parent
commit
8069a5101e
  1. 8
      README.md
  2. 8
      docs/source/index.rst
  3. 87
      keycloak/__init__.py
  4. 16
      keycloak/exceptions.py
  5. 1
      requirements.txt

8
README.md

@ -71,7 +71,11 @@ certs = keycloak.certs()
token = keycloak.token("user", "password") token = keycloak.token("user", "password")
rpt = keycloak.entitlement(token['access_token'], "resource_id") rpt = keycloak.entitlement(token['access_token'], "resource_id")
# Instropect
keycloak.instropect(token['access_token'], rpt['rpt'])
# Instropect RPT
token_rpt_info = keycloak.instropect(keycloak.instropect(token['access_token'], rpt=rpt['rpt'],
token_type_hint="requesting_party_token"))
# Instropect Token
token_info = keycloak.instropect(token['access_token']))
``` ```

8
docs/source/index.rst

@ -100,6 +100,10 @@ Main methods::
token = keycloak.token("user", "password") token = keycloak.token("user", "password")
rpt = keycloak.entitlement(token['access_token'], "resource_id") rpt = keycloak.entitlement(token['access_token'], "resource_id")
# Instropect
keycloak.instropect(token['access_token'], rpt['rpt'])
# Instropect RPT
token_rpt_info = keycloak.instropect(keycloak.instropect(token['access_token'], rpt=rpt['rpt'],
token_type_hint="requesting_party_token"))
# Instropect Token
token_info = keycloak.instropect(token['access_token']))

87
keycloak/__init__.py

@ -1,11 +1,27 @@
"""
"""
from keycloak.exceptions import raise_error_from_response, KeycloakGetError
# -*- coding: utf-8 -*-
#
# Copyright (C) 2017 Marcos Pereira <marcospereira.mpj@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .exceptions import raise_error_from_response, KeycloakGetError, KeycloakSecretNotFound, \
KeycloakRPTNotFound
from .urls_patterns import URL_AUTH, URL_TOKEN, URL_USERINFO, URL_WELL_KNOWN, URL_LOGOUT, \ from .urls_patterns import URL_AUTH, URL_TOKEN, URL_USERINFO, URL_WELL_KNOWN, URL_LOGOUT, \
URL_CERTS, URL_ENTITLEMENT, URL_INTROSPECT URL_CERTS, URL_ENTITLEMENT, URL_INTROSPECT
from .connection import ConnectionManager from .connection import ConnectionManager
import jwt
class Keycloak: class Keycloak:
@ -19,6 +35,18 @@ class Keycloak:
headers={}, headers={},
timeout=60) timeout=60)
def __add_secret_key(self, payload):
"""
Add secret key if exist.
:param payload:
:return:
"""
if self.__client_secret_key:
payload.update({"client_secret": self.__client_secret_key})
return payload
def well_know(self): def well_know(self):
""" The most important endpoint to understand is the well-known configuration """ The most important endpoint to understand is the well-known configuration
endpoint. It lists endpoints and other configuration options relevant to endpoint. It lists endpoints and other configuration options relevant to
@ -59,9 +87,7 @@ class Keycloak:
payload = {"username": username, "password": password, payload = {"username": username, "password": password,
"client_id": self.__client_id, "grant_type": grant_type} "client_id": self.__client_id, "grant_type": grant_type}
if self.__client_secret_key:
payload.update({"client_secret": self.__client_secret_key})
payload = self.__add_secret_key(payload)
data_raw = self.__connection.raw_post(URL_TOKEN.format(**params_path), data_raw = self.__connection.raw_post(URL_TOKEN.format(**params_path),
data=payload) data=payload)
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
@ -93,9 +119,7 @@ class Keycloak:
params_path = {"realm-name": self.__realm_name} params_path = {"realm-name": self.__realm_name}
payload = {"client_id": self.__client_id, "refresh_token": refresh_token} payload = {"client_id": self.__client_id, "refresh_token": refresh_token}
if self.__client_secret_key:
payload.update({"client_secret": self.__client_secret_key})
payload = self.__add_secret_key(payload)
data_raw = self.__connection.raw_post(URL_LOGOUT.format(**params_path), data_raw = self.__connection.raw_post(URL_LOGOUT.format(**params_path),
data=payload) data=payload)
@ -131,7 +155,7 @@ class Keycloak:
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def instropect(self, token, rpt, token_type_hint="requesting_party_token"):
def instropect(self, token, rpt=None, token_type_hint=None):
""" """
The introspection endpoint is used to retrieve the active state of a token. It is can only be The introspection endpoint is used to retrieve the active state of a token. It is can only be
invoked by confidential clients. invoked by confidential clients.
@ -145,14 +169,45 @@ class Keycloak:
:return: :return:
""" """
params_path = {"realm-name": self.__realm_name} params_path = {"realm-name": self.__realm_name}
payload = {"client_id": self.__client_id, "token": rpt,
'token_type_hint': token_type_hint}
if self.__client_secret_key:
payload.update({"client_secret": self.__client_secret_key})
payload = {"client_id": self.__client_id, "token": token}
if token_type_hint == 'requesting_party_token':
if rpt:
payload.update({"token": rpt, "token_type_hint": token_type_hint})
self.__connection.add_param_headers("Authorization", "Bearer " + token) self.__connection.add_param_headers("Authorization", "Bearer " + token)
else:
raise KeycloakRPTNotFound("Can't found RPT.")
payload = self.__add_secret_key(payload)
data_raw = self.__connection.raw_post(URL_INTROSPECT.format(**params_path), data_raw = self.__connection.raw_post(URL_INTROSPECT.format(**params_path),
data=payload) data=payload)
return raise_error_from_response(data_raw, KeycloakGetError) return raise_error_from_response(data_raw, KeycloakGetError)
def decode_token(self, token, secret='', verify=False, algorithms=['RS256']):
"""
A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data
structure that represents a cryptographic key. This specification
also defines a JWK Set JSON data structure that represents a set of
JWKs. Cryptographic algorithms and identifiers for use with this
specification are described in the separate JSON Web Algorithms (JWA)
specification and IANA registries established by that specification.
https://tools.ietf.org/html/rfc7517
:param token:
:param secret:
:param verify:
:param algorithms:
:return:
"""
if verify:
if secret:
return jwt.decode(token, secret=secret, verify=verify, algorithms=algorithms)
raise KeycloakSecretNotFound("Can't found secret key.")
return jwt.decode(token, verify=verify, algorithms=algorithms)

16
keycloak/exceptions.py

@ -67,23 +67,11 @@ class KeycloakDeleteError(KeycloakOperationError):
pass pass
class KeycloakProtectError(KeycloakOperationError):
class KeycloakSecretNotFound(KeycloakOperationError):
pass pass
class KeycloakTransferProjectError(KeycloakOperationError):
pass
class KeycloakBuildCancelError(KeycloakOperationError):
pass
class KeycloakBuildRetryError(KeycloakOperationError):
pass
class KeycloakBlockError(KeycloakOperationError):
class KeycloakRPTNotFound(KeycloakOperationError):
pass pass

1
requirements.txt

@ -1,2 +1,3 @@
requests==2.18.3 requests==2.18.3
httmock==1.2.5 httmock==1.2.5
PyJWT==1.5.2
Loading…
Cancel
Save