Browse Source

pep8 formatting

pull/12/head
Ewan Jones 6 years ago
parent
commit
4763a5f4ab
No known key found for this signature in database GPG Key ID: E34E7F21D7272734
  1. 2
      keycloak/__init__.py
  2. 2
      keycloak/authorization/__init__.py
  3. 1
      keycloak/authorization/permission.py
  4. 3
      keycloak/connection.py
  5. 1
      keycloak/exceptions.py
  6. 29
      keycloak/keycloak_admin.py
  7. 9
      keycloak/keycloak_openid.py
  8. 41
      keycloak/tests/test_connection.py

2
keycloak/__init__.py

@ -15,5 +15,5 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .keycloak_openid import *
from .keycloak_admin import *
from .keycloak_openid import *

2
keycloak/authorization/__init__.py

@ -75,7 +75,6 @@ class Authorization:
self.policies[policy_name].add_permission(permission)
if pol['type'] == 'resource':
from pprint import pprint
permission = Permission(name=pol['name'],
type=pol['type'],
logic=pol['logic'],
@ -86,4 +85,3 @@ class Authorization:
for policy_name in ast.literal_eval(pol['config']['applyPolicies']):
if self.policies.get(policy_name) is not None:
self.policies[policy_name].add_permission(permission)

1
keycloak/authorization/permission.py

@ -101,4 +101,3 @@ class Permission:
@scopes.setter
def scopes(self, value):
self._scopes = value

3
keycloak/connection.py

@ -26,9 +26,10 @@ try:
except ImportError:
from urlparse import urljoin
from .exceptions import *
import requests
from .exceptions import (KeycloakConnectionError)
class ConnectionManager(object):
""" Represents a simple server connection.

1
keycloak/exceptions.py

@ -74,7 +74,6 @@ class KeycloakInvalidTokenError(KeycloakOperationError):
def raise_error_from_response(response, error, expected_code=200, skip_exists=False):
if expected_code == response.status_code:
if expected_code == requests.codes.no_content:
return {}

29
keycloak/keycloak_admin.py

@ -24,19 +24,16 @@
# Unless otherwise stated in the comments, "id", in e.g. user_id, refers to the
# internal Keycloak server ID, usually a uuid string
from .urls_patterns import *
from .keycloak_openid import KeycloakOpenID
import json
from .connection import ConnectionManager
from .exceptions import raise_error_from_response, KeycloakGetError
from .keycloak_openid import KeycloakOpenID
from .urls_patterns import *
from .urls_patterns import (
URL_ADMIN_USERS,
)
from .connection import ConnectionManager
import json
class KeycloakAdmin:
@ -391,7 +388,7 @@ class KeycloakAdmin:
if subgroup['path'] == path:
return subgroup
elif subgroup["subGroups"]:
for subgroup in group["subGroups"]:
for subgroup in group["subGroups"]:
return self.get_subgroups(subgroup, path)
return None
@ -451,7 +448,7 @@ class KeycloakAdmin:
exists = None
if name is None and path is not None:
path="/" + name
path = "/" + name
elif path is not None:
exists = self.get_group_by_path(path=path, search_in_subgroups=True)
@ -460,13 +457,13 @@ class KeycloakAdmin:
return str(exists)
if parent is None:
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_post(URL_ADMIN_GROUPS.format(**params_path),
data=json.dumps(payload))
params_path = {"realm-name": self.realm_name}
data_raw = self.connection.raw_post(URL_ADMIN_GROUPS.format(**params_path),
data=json.dumps(payload))
else:
params_path = {"realm-name": self.realm_name, "id": parent,}
data_raw = self.connection.raw_post(URL_ADMIN_GROUP_CHILD.format(**params_path),
data=json.dumps(payload))
params_path = {"realm-name": self.realm_name, "id": parent, }
data_raw = self.connection.raw_post(URL_ADMIN_GROUP_CHILD.format(**params_path),
data=json.dumps(payload))
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=201, skip_exists=skip_exists)
def group_set_permissions(self, group_id, enabled=True):
@ -841,7 +838,7 @@ class KeycloakAdmin:
params_path = {"realm-name": self.realm_name, "flow-alias": flow_alias}
data_raw = self.connection.raw_put(URL_ADMIN_FLOWS_EXECUTIONS.format(**params_path),
data=payload)
data=payload)
return raise_error_from_response(data_raw, KeycloakGetError, expected_code=204)
def sync_users(self, storage_id, action):

9
keycloak/keycloak_openid.py

@ -21,7 +21,12 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import json
from jose import jwt
from .authorization import Authorization
from .connection import ConnectionManager
from .exceptions import raise_error_from_response, KeycloakGetError, \
KeycloakRPTNotFound, KeycloakAuthorizationConfigError, KeycloakInvalidTokenError
from .urls_patterns import (
@ -33,9 +38,6 @@ from .urls_patterns import (
URL_ENTITLEMENT,
URL_INTROSPECT
)
from .connection import ConnectionManager
from jose import jwt
import json
class KeycloakOpenID:
@ -397,4 +399,3 @@ class KeycloakOpenID:
permissions += policy.permissions
return list(set(permissions))

41
keycloak/tests/test_connection.py

@ -19,44 +19,42 @@ from httmock import urlmatch, response, HTTMock, all_requests
from ..connection import ConnectionManager
try:
import unittest
except ImportError:
import unittest2 as unittest
class TestConnection(unittest.TestCase):
def setUp(self):
self._conn = ConnectionManager(
base_url="http://localhost/",
headers={},
timeout=60)
base_url="http://localhost/",
headers={},
timeout=60)
@all_requests
def response_content_success(self, url, request):
headers = {'content-type': 'application/json'}
content = b'response_ok'
return response(200, content, headers, None, 5, request)
def test_raw_get(self):
def test_raw_get(self):
with HTTMock(self.response_content_success):
resp = self._conn.raw_get("/known_path")
self.assertEqual(resp.content, b'response_ok')
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.status_code, 200)
def test_raw_post(self):
@urlmatch(path="/known_path", method="post")
def response_post_success(url, request):
headers = {'content-type': 'application/json'}
content = 'response'.encode("utf-8")
return response(201, content, headers, None, 5, request)
with HTTMock(response_post_success):
resp = self._conn.raw_post("/known_path",
{'field': 'value'})
{'field': 'value'})
self.assertEqual(resp.content, b'response')
self.assertEqual(resp.status_code, 201)
@ -69,32 +67,30 @@ class TestConnection(unittest.TestCase):
with HTTMock(response_put_success):
resp = self._conn.raw_put("/known_path",
{'field': 'value'})
{'field': 'value'})
self.assertEqual(resp.content, b'response')
self.assertEqual(resp.status_code, 200)
def test_raw_get_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="get")
def response_get_fail(url, request):
headers = {'content-type': 'application/json'}
content = "404 page not found".encode("utf-8")
return response(404, content, headers, None, 5, request)
with HTTMock(response_get_fail):
resp = self._conn.raw_get("/known_path")
self.assertEqual(resp.content, b"404 page not found")
self.assertEqual(resp.status_code, 404)
def test_raw_post_fail(self):
self.assertEqual(resp.status_code, 404)
def test_raw_post_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="post")
def response_post_fail(url, request):
headers = {'content-type': 'application/json'}
content = str(["Start can't be blank"]).encode("utf-8")
return response(404, content, headers, None, 5, request)
with HTTMock(response_post_fail):
resp = self._conn.raw_post("/known_path",
{'field': 'value'})
@ -102,7 +98,6 @@ class TestConnection(unittest.TestCase):
self.assertEqual(resp.status_code, 404)
def test_raw_put_fail(self):
@urlmatch(netloc="localhost", path="/known_path", method="put")
def response_put_fail(url, request):
headers = {'content-type': 'application/json'}
@ -124,7 +119,7 @@ class TestConnection(unittest.TestCase):
self._conn.add_param_headers("test", "value")
self._conn.del_param_headers("test")
self.assertEqual(self._conn.headers, {})
def test_clean_param_headers(self):
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,
@ -136,12 +131,12 @@ class TestConnection(unittest.TestCase):
self._conn.add_param_headers("test", "value")
self.assertTrue(self._conn.exist_param_headers("test"))
self.assertFalse(self._conn.exist_param_headers("test_no"))
def test_get_param_headers(self):
self._conn.add_param_headers("test", "value")
self.assertTrue(self._conn.exist_param_headers("test"))
self.assertFalse(self._conn.exist_param_headers("test_no"))
def test_get_headers(self):
self._conn.add_param_headers("test", "value")
self.assertEqual(self._conn.headers,

Loading…
Cancel
Save