Browse Source

Merge pull request #39 from Sispheor/features/custom_headers

[Feature] add custom headers. Closes #38
pull/44/head
Marcos Pereira 5 years ago
committed by GitHub
parent
commit
07f3e936b9
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      docs/source/index.rst
  2. 18
      keycloak/keycloak_admin.py
  3. 9
      keycloak/keycloak_openid.py
  4. 46
      keycloak/tests/test_connection.py

16
docs/source/index.rst

@ -92,6 +92,14 @@ Main methods::
client_secret_key="secret",
verify=True)
# Optionally, you can pass custom headers that will be added to all HTTP calls
# keycloak_openid = KeycloakOpenID(server_url="http://localhost:8080/auth/",
# client_id="example_client",
# realm_name="example_realm",
# client_secret_key="secret",
# verify=True,
# custom_headers={'CustomHeader': 'value'})
# Get WellKnow
config_well_know = keycloak_openid.well_know()
@ -143,6 +151,14 @@ Main methods::
realm_name="example_realm",
verify=True)
# Optionally, you can pass custom headers that will be added to all HTTP calls
#keycloak_admin = KeycloakAdmin(server_url="http://localhost:8080/auth/",
# username='example-admin',
# password='secret',
# realm_name="example_realm",
# verify=True,
# custom_headers={'CustomHeader': 'value'})
# Add user
new_user = keycloak_admin.create_user({"email": "example@example.com",
"username": "example@example.com",

18
keycloak/keycloak_admin.py

@ -45,7 +45,8 @@ class KeycloakAdmin:
PAGE_SIZE = 100
def __init__(self, server_url, username, password, realm_name='master', client_id='admin-cli', verify=True, client_secret_key=None):
def __init__(self, server_url, username, password, realm_name='master', client_id='admin-cli', verify=True,
client_secret_key=None, custom_headers=None):
"""
:param server_url: Keycloak server url
@ -55,6 +56,7 @@ class KeycloakAdmin:
:param client_id: client id
:param verify: True if want check connection SSL
:param client_secret_key: client secret key
:param custom_headers: dict of custom header to pass to each HTML request
"""
self._username = username
self._password = password
@ -63,15 +65,23 @@ class KeycloakAdmin:
# Get token Admin
keycloak_openid = KeycloakOpenID(server_url=server_url, client_id=client_id, realm_name=realm_name,
verify=verify, client_secret_key=client_secret_key)
verify=verify, client_secret_key=client_secret_key,
custom_headers=custom_headers)
grant_type = ["password"]
if client_secret_key:
grant_type = ["client_credentials"]
self._token = keycloak_openid.token(username, password, grant_type=grant_type)
headers = {
'Authorization': 'Bearer ' + self.token.get('access_token'),
'Content-Type': 'application/json'
}
if custom_headers is not None:
# merge custom headers to main headers
headers.update(custom_headers)
self._connection = ConnectionManager(base_url=server_url,
headers={'Authorization': 'Bearer ' + self.token.get('access_token'),
'Content-Type': 'application/json'},
headers=headers,
timeout=60,
verify=verify)

9
keycloak/keycloak_openid.py

@ -43,7 +43,7 @@ from .urls_patterns import (
class KeycloakOpenID:
def __init__(self, server_url, realm_name, client_id, client_secret_key=None, verify=True):
def __init__(self, server_url, realm_name, client_id, client_secret_key=None, verify=True, custom_headers=None):
"""
:param server_url: Keycloak server url
@ -51,12 +51,17 @@ class KeycloakOpenID:
:param realm_name: realm name
:param client_secret_key: client secret key
:param verify: True if want check connection SSL
:param custom_headers: dict of custom header to pass to each HTML request
"""
self._client_id = client_id
self._client_secret_key = client_secret_key
self._realm_name = realm_name
headers = dict()
if custom_headers is not None:
# merge custom headers to main headers
headers.update(custom_headers)
self._connection = ConnectionManager(base_url=server_url,
headers={},
headers=headers,
timeout=60,
verify=verify)

46
keycloak/tests/test_connection.py

@ -14,9 +14,11 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from unittest import mock
from httmock import urlmatch, response, HTTMock, all_requests
from keycloak import KeycloakAdmin, KeycloakOpenID
from ..connection import ConnectionManager
try:
@ -141,3 +143,47 @@ class TestConnection(unittest.TestCase):
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
{"test": "value"})
def test_KeycloakAdmin_custom_header(self):
class FakeToken:
@staticmethod
def get(string_val):
return "faketoken"
fake_token = FakeToken()
with mock.patch.object(KeycloakOpenID, "__init__", return_value=None) as mock_keycloak_open_id:
with mock.patch("keycloak.keycloak_openid.KeycloakOpenID.token", return_value=fake_token):
with mock.patch("keycloak.connection.ConnectionManager.__init__", return_value=None) as mock_connection_manager:
server_url = "https://localhost/auth/"
username = "admin"
password = "secret"
realm_name = "master"
headers = {
'Custom': 'test-custom-header'
}
KeycloakAdmin(server_url=server_url,
username=username,
password=password,
realm_name=realm_name,
verify=False,
custom_headers=headers)
mock_keycloak_open_id.assert_called_with(server_url=server_url,
realm_name=realm_name,
client_id='admin-cli',
client_secret_key=None,
verify=False,
custom_headers=headers)
expected_header = {'Authorization': 'Bearer faketoken',
'Content-Type': 'application/json',
'Custom': 'test-custom-header'
}
mock_connection_manager.assert_called_with(base_url=server_url,
headers=expected_header,
timeout=60,
verify=False)
Loading…
Cancel
Save