Browse Source

Merged in kukrilabs/python-keycloak (pull request #22)

Adding import Realm

Approved-by: Marcos Pereira <marcospereira.mpj@gmail.com>
pull/12/head
Ewan Jones 6 years ago
committed by Marcos Pereira
parent
commit
5b1dd5711d
  1. 14
      Pipfile
  2. 107
      Pipfile.lock
  3. 2
      bitbucket-pipelines.yml
  4. 2
      keycloak/__init__.py
  5. 2
      keycloak/authorization/__init__.py
  6. 1
      keycloak/authorization/permission.py
  7. 3
      keycloak/connection.py
  8. 1
      keycloak/exceptions.py
  9. 55
      keycloak/keycloak_admin.py
  10. 9
      keycloak/keycloak_openid.py
  11. 41
      keycloak/tests/test_connection.py
  12. 1
      keycloak/urls_patterns.py

14
Pipfile

@ -0,0 +1,14 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
requests = ">=2.18.4"
httmock = ">=1.2.5"
python-jose = ">=1.4.0"
[dev-packages]
[requires]
python_version = "3.7"

107
Pipfile.lock

@ -0,0 +1,107 @@
{
"_meta": {
"hash": {
"sha256": "2e38b123d04c65ce270c4f49048a74068545017ba69af6daf4612a5f43f64014"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3.7"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {
"certifi": {
"hashes": [
"sha256:376690d6f16d32f9d1fe8932551d80b23e9d393a8578c5633a2ed39a64861638",
"sha256:456048c7e371c089d0a77a5212fb37a2c2dce1e24146e3b7e0261736aaeaa22a"
],
"version": "==2018.8.24"
},
"chardet": {
"hashes": [
"sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae",
"sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691"
],
"version": "==3.0.4"
},
"ecdsa": {
"hashes": [
"sha256:40d002cf360d0e035cf2cb985e1308d41aaa087cbfc135b2dc2d844296ea546c",
"sha256:64cf1ee26d1cde3c73c6d7d107f835fed7c6a2904aef9eac223d57ad800c43fa"
],
"version": "==0.13"
},
"future": {
"hashes": [
"sha256:e39ced1ab767b5936646cedba8bcce582398233d6a627067d4c6a454c90cfedb"
],
"version": "==0.16.0"
},
"httmock": {
"hashes": [
"sha256:4696306d1ff835c3ca865fdef2684d7e130b4120cc00126f862ba4797b1602ac"
],
"index": "pypi",
"version": "==1.2.6"
},
"idna": {
"hashes": [
"sha256:156a6814fb5ac1fc6850fb002e0852d56c0c8d2531923a51032d1b70760e186e",
"sha256:684a38a6f903c1d71d6d5fac066b58d7768af4de2b832e426ec79c30daa94a16"
],
"version": "==2.7"
},
"pyasn1": {
"hashes": [
"sha256:b9d3abc5031e61927c82d4d96c1cec1e55676c1a991623cfed28faea73cdd7ca",
"sha256:f58f2a3d12fd754aa123e9fa74fb7345333000a035f3921dbdaa08597aa53137"
],
"version": "==0.4.4"
},
"python-jose": {
"hashes": [
"sha256:29701d998fe560e52f17246c3213a882a4a39da7e42c7015bcc1f7823ceaff1c",
"sha256:ed7387f0f9af2ea0ddc441d83a6eb47a5909bd0c8a72ac3250e75afec2cc1371"
],
"index": "pypi",
"version": "==3.0.1"
},
"requests": {
"hashes": [
"sha256:63b52e3c866428a224f97cab011de738c36aec0185aa91cfacd418b5d58911d1",
"sha256:ec22d826a36ed72a7358ff3fe56cbd4ba69dd7a6718ffd450ff0e9df7a47ce6a"
],
"index": "pypi",
"version": "==2.19.1"
},
"rsa": {
"hashes": [
"sha256:25df4e10c263fb88b5ace923dd84bf9aa7f5019687b5e55382ffcdb8bede9db5",
"sha256:43f682fea81c452c98d09fc316aae12de6d30c4b5c84226642cf8f8fd1c93abd"
],
"version": "==3.4.2"
},
"six": {
"hashes": [
"sha256:70e8a77beed4562e7f14fe23a786b54f6296e34344c23bc42f07b15018ff98e9",
"sha256:832dc0e10feb1aa2c68dcc57dbb658f1c7e65b9b61af69048abc87a2db00a0eb"
],
"version": "==1.11.0"
},
"urllib3": {
"hashes": [
"sha256:a68ac5e15e76e7e5dd2b8f94007233e01effe3e50e8daddf69acfd81cb686baf",
"sha256:b5725a0bd4ba422ab0e66e89e030c806576753ea3ee08554382c14e685d117b5"
],
"markers": "python_version != '3.2.*' and python_version != '3.3.*' and python_version >= '2.6' and python_version < '4' and python_version != '3.1.*' and python_version != '3.0.*'",
"version": "==1.23"
}
},
"develop": {}
}

2
bitbucket-pipelines.yml

@ -3,7 +3,7 @@
# Only use spaces to indent your .yml configuration.
# -----
# You can specify a custom docker image from Docker Hub as your build environment.
image: python:3.5.1
image: python:3.7
pipelines:
default:

2
keycloak/__init__.py

@ -15,5 +15,5 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .keycloak_openid import *
from .keycloak_admin import *
from .keycloak_openid import *

2
keycloak/authorization/__init__.py

@ -75,7 +75,6 @@ class Authorization:
self.policies[policy_name].add_permission(permission)
if pol['type'] == 'resource':
from pprint import pprint
permission = Permission(name=pol['name'],
type=pol['type'],
logic=pol['logic'],
@ -86,4 +85,3 @@ class Authorization:
for policy_name in ast.literal_eval(pol['config']['applyPolicies']):
if self.policies.get(policy_name) is not None:
self.policies[policy_name].add_permission(permission)

1
keycloak/authorization/permission.py

@ -101,4 +101,3 @@ class Permission:
@scopes.setter
def scopes(self, value):
self._scopes = value

3
keycloak/connection.py

@ -26,9 +26,10 @@ try:
except ImportError:
from urlparse import urljoin
from .exceptions import *
import requests
from .exceptions import (KeycloakConnectionError)
class ConnectionManager(object):
""" Represents a simple server connection.

1
keycloak/exceptions.py

@ -74,7 +74,6 @@ class KeycloakInvalidTokenError(KeycloakOperationError):
def raise_error_from_response(response, error, expected_code=200, skip_exists=False):
if expected_code == response.status_code:
if expected_code == requests.codes.no_content:
return {}

55
keycloak/keycloak_admin.py

@ -24,18 +24,19 @@
# Unless otherwise stated in the comments, "id", in e.g. user_id, refers to the
# internal Keycloak server ID, usually a uuid string
from .urls_patterns import *
from .keycloak_openid import KeycloakOpenID
from .exceptions import raise_error_from_response, KeycloakGetError
from .urls_patterns import (
URL_ADMIN_USERS,
)
import json
from .connection import ConnectionManager
import json
from .exceptions import raise_error_from_response, KeycloakGetError
from .keycloak_openid import KeycloakOpenID
from .urls_patterns import URL_ADMIN_SERVER_INFO, URL_ADMIN_CLIENT_AUTHZ_RESOURCES, URL_ADMIN_CLIENT_ROLES, \
URL_ADMIN_GET_SESSIONS, URL_ADMIN_RESET_PASSWORD, URL_ADMIN_SEND_UPDATE_ACCOUNT, \
URL_ADMIN_USER_CLIENT_ROLES_COMPOSITE, URL_ADMIN_USER_GROUP, URL_ADMIN_REALM_ROLES, URL_ADMIN_GROUP_CHILD, \
URL_ADMIN_USER_CONSENTS, URL_ADMIN_SEND_VERIFY_EMAIL, URL_ADMIN_CLIENT, URL_ADMIN_USER, URL_ADMIN_CLIENT_ROLE, \
URL_ADMIN_USER_GROUPS, URL_ADMIN_CLIENTS, URL_ADMIN_FLOWS_EXECUTIONS, URL_ADMIN_GROUPS, URL_ADMIN_USER_CLIENT_ROLES, \
URL_ADMIN_REALM_IMPORT, URL_ADMIN_USERS_COUNT, URL_ADMIN_FLOWS, URL_ADMIN_GROUP, URL_ADMIN_CLIENT_AUTHZ_SETTINGS, \
URL_ADMIN_GROUP_MEMBERS, URL_ADMIN_USER_STORAGE, URL_ADMIN_GROUP_PERMISSIONS, URL_ADMIN_IDPS, \
URL_ADMIN_USER_CLIENT_ROLES_AVAILABLE, URL_ADMIN_USERS
class KeycloakAdmin:
@ -114,6 +115,22 @@ class KeycloakAdmin:
def token(self, value):
self._token = value
def import_realm(self, payload):
"""
Import a new realm from a RealmRepresentation. Realm name must be unique.
RealmRepresentation
https://www.keycloak.org/docs-api/4.4/rest-api/index.html#_realmrepresentation
:param payload: RealmRepresentation
:return: RealmRepresentation
"""
data_raw = self.connection.raw_post(URL_ADMIN_REALM_IMPORT,
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201)
def get_users(self, query=None):
"""
Get users Returns a list of users, filtered according to query parameters
@ -375,7 +392,7 @@ class KeycloakAdmin:
if subgroup['path'] == path:
return subgroup
elif subgroup["subGroups"]:
for subgroup in group["subGroups"]:
for subgroup in group["subGroups"]:
return self.get_subgroups(subgroup, path)
return None
@ -438,7 +455,7 @@ class KeycloakAdmin:
exists = None
if name is None and path is not None:
path="/" + name
path = "/" + name
elif path is not None:
exists = self.get_group_by_path(path=path, search_in_subgroups=True)
@ -447,13 +464,13 @@ class KeycloakAdmin:
return str(exists)
if parent is None:
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_post(URL_ADMIN_GROUPS.format(**params_path),
data=json.dumps(payload))
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_post(URL_ADMIN_GROUPS.format(**params_path),
data=json.dumps(payload))
else:
params_path = {"realm-name": self.realm_name, "id": parent,}
data_raw = self.connection.raw_post(URL_ADMIN_GROUP_CHILD.format(**params_path),
data=json.dumps(payload))
params_path = {"realm-name": self.realm_name, "id": parent, }
data_raw = self.connection.raw_post(URL_ADMIN_GROUP_CHILD.format(**params_path),
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201, skip_exists=skip_exists)
def group_set_permissions(self, group_id, enabled=True):
@ -828,7 +845,7 @@ class KeycloakAdmin:
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias}
data_raw = self.connection.raw_put(URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path),
data=payload)
data=payload)
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
def sync_users(self, storage_id, action):

9
keycloak/keycloak_openid.py

@ -21,7 +21,12 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import json
from jose import jwt
from .authorization import Authorization
from .connection import ConnectionManager
from .exceptions import raise_error_from_response, KeycloakGetError, \
KeycloakRPTNotFound, KeycloakAuthorizationConfigError, KeycloakInvalidTokenError
from .urls_patterns import (
@ -33,9 +38,6 @@ from .urls_patterns import (
URL_ENTITLEMENT,
URL_INTROSPECT
)
from .connection import ConnectionManager
from jose import jwt
import json
class KeycloakOpenID:
@ -397,4 +399,3 @@ class KeycloakOpenID:
permissions += policy.permissions
return list(set(permissions))

41
keycloak/tests/test_connection.py

@ -19,44 +19,42 @@ from httmock import urlmatch, response, HTTMock, all_requests
from ..connection import ConnectionManager
try:
import unittest
except ImportError:
import unittest2 as unittest
class TestConnection(unittest.TestCase):
def setUp(self):
self._conn = ConnectionManager(
base_url="http://localhost/",
headers={},
timeout=60)
base_url="http://localhost/",
headers={},
timeout=60)
@all_requests
def response_content_success(self, url, request):
headers = {'content-type': 'application/json'}
content = b'response_ok'
return response(200, content, headers, None, 5, request)
def test_raw_get(self):
def test_raw_get(self):
with HTTMock(self.response_content_success):
resp = self._conn.raw_get("/known_path")
self.assertEqual(resp.content, b'response_ok')
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.status_code, 200)
def test_raw_post(self):
@urlmatch(path="/known_path", method="post")
def response_post_success(url, request):
headers = {'content-type': 'application/json'}
content = 'response'.encode("utf-8")
return response(201, content, headers, None, 5, request)
with HTTMock(response_post_success):
resp = self._conn.raw_post("/known_path",
{'field': 'value'})
{'field': 'value'})
self.assertEqual(resp.content, b'response')
self.assertEqual(resp.status_code, 201)
@ -69,32 +67,30 @@ class TestConnection(unittest.TestCase):
with HTTMock(response_put_success):
resp = self._conn.raw_put("/known_path",
{'field': 'value'})
{'field': 'value'})
self.assertEqual(resp.content, b'response')
self.assertEqual(resp.status_code, 200)
def test_raw_get_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="get")
def response_get_fail(url, request):
headers = {'content-type': 'application/json'}
content = "404 page not found".encode("utf-8")
return response(404, content, headers, None, 5, request)
with HTTMock(response_get_fail):
resp = self._conn.raw_get("/known_path")
self.assertEqual(resp.content, b"404 page not found")
self.assertEqual(resp.status_code, 404)
def test_raw_post_fail(self):
self.assertEqual(resp.status_code, 404)
def test_raw_post_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="post")
def response_post_fail(url, request):
headers = {'content-type': 'application/json'}
content = str(["Start can't be blank"]).encode("utf-8")
return response(404, content, headers, None, 5, request)
with HTTMock(response_post_fail):
resp = self._conn.raw_post("/known_path",
{'field': 'value'})
@ -102,7 +98,6 @@ class TestConnection(unittest.TestCase):
self.assertEqual(resp.status_code, 404)
def test_raw_put_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="put")
def response_put_fail(url, request):
headers = {'content-type': 'application/json'}
@ -124,7 +119,7 @@ class TestConnection(unittest.TestCase):
self._conn.add_param_headers("test", "value")
self._conn.del_param_headers("test")
self.assertEqual(self._conn.headers, {})
def test_clean_param_headers(self):
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
@ -136,12 +131,12 @@ class TestConnection(unittest.TestCase):
self._conn.add_param_headers("test", "value")
self.assertTrue(self._conn.exist_param_headers("test"))
self.assertFalse(self._conn.exist_param_headers("test_no"))
def test_get_param_headers(self):
self._conn.add_param_headers("test", "value")
self.assertTrue(self._conn.exist_param_headers("test"))
self.assertFalse(self._conn.exist_param_headers("test_no"))
def test_get_headers(self):
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,

1
keycloak/urls_patterns.py

@ -64,6 +64,7 @@ URL_ADMIN_CLIENT_AUTHZ_RESOURCES = "admin/realms/{realm-name}/clients/{id}/authz
URL_ADMIN_CLIENT_CERTS = "admin/realms/{realm-name}/clients/{id}/certificates/{attr}"
URL_ADMIN_REALM_ROLES = "admin/realms/{realm-name}/roles"
URL_ADMIN_REALM_IMPORT = "admin/realms"
URL_ADMIN_IDPS = "admin/realms/{realm-name}/identity-provider/instances"
URL_ADMIN_FLOWS = "admin/realms/{realm-name}/authentication/flows"

Loading…
Cancel
Save