mirror of https://gitlab.com/tildes/tildes.git
Browse Source
Merge branch 'add-api-jwt-authentication' into 'develop-1.101'
Merge branch 'add-api-jwt-authentication' into 'develop-1.101'
Tildes API | JWT Bearer Token Authentication See merge request tildes/tildes!165merge-requests/165/merge
9 changed files with 310 additions and 7 deletions
-
3tildes/development.ini
-
73tildes/openapi_beta.yaml
-
3tildes/production.ini.example
-
2tildes/requirements-dev.txt
-
2tildes/requirements.in
-
2tildes/requirements.txt
-
106tildes/tildes/auth.py
-
2tildes/tildes/routes.py
-
124tildes/tildes/views/api/beta/auth.py
@ -0,0 +1,124 @@ |
|||||
|
# Copyright (c) 2018 Tildes contributors <code@tildes.net> |
||||
|
# SPDX-License-Identifier: AGPL-3.0-or-later |
||||
|
|
||||
|
"""JSON API endpoints related to authentication.""" |
||||
|
|
||||
|
from datetime import timedelta |
||||
|
from pyramid.interfaces import IAuthenticationPolicy |
||||
|
from pyramid.request import Request |
||||
|
from pyramid.view import view_config |
||||
|
|
||||
|
from tildes.enums import LogEventType |
||||
|
from tildes.metrics import incr_counter |
||||
|
from tildes.models.log import Log |
||||
|
from tildes.models.user import User |
||||
|
from tildes.views.api.beta.api_utils import build_error_response |
||||
|
from tildes.auth import JWTAuthenticationPolicy |
||||
|
|
||||
|
|
||||
|
@view_config(route_name="apibeta.login", openapi=True, renderer="json") |
||||
|
def login(request: Request) -> dict: |
||||
|
"""Process a login request and return a JWT token if successful.""" |
||||
|
username = request.openapi_validated.body.get("username") |
||||
|
password = request.openapi_validated.body.get("password") |
||||
|
|
||||
|
incr_counter("logins") |
||||
|
|
||||
|
# Look up the user for the supplied username |
||||
|
user = ( |
||||
|
request.query(User) |
||||
|
.undefer_all_columns() |
||||
|
.filter(User.username == username) |
||||
|
.one_or_none() |
||||
|
) |
||||
|
|
||||
|
# If the username doesn't exist, tell them so - usually this isn't considered a good |
||||
|
# practice, but it's completely trivial to check if a username exists on Tildes |
||||
|
# anyway (by visiting /user/<username>), so it's better to just let people know if |
||||
|
# they're trying to log in with the wrong username |
||||
|
if not user: |
||||
|
incr_counter("login_failures") |
||||
|
|
||||
|
# log the failure - need to manually commit because of the exception |
||||
|
log_entry = Log( |
||||
|
LogEventType.USER_LOG_IN_FAIL, |
||||
|
request, |
||||
|
{"username": username, "reason": "Nonexistent username"}, |
||||
|
) |
||||
|
request.db_session.add(log_entry) |
||||
|
request.tm.commit() |
||||
|
|
||||
|
return build_error_response( |
||||
|
"That username does not exist", |
||||
|
field="username", |
||||
|
status=401, |
||||
|
error_type="AuthenticationError", |
||||
|
) |
||||
|
|
||||
|
if not user.is_correct_password(password): |
||||
|
incr_counter("login_failures") |
||||
|
|
||||
|
# log the failure - need to manually commit because of the exception |
||||
|
log_entry = Log( |
||||
|
LogEventType.USER_LOG_IN_FAIL, |
||||
|
request, |
||||
|
{"username": username, "reason": "Incorrect password"}, |
||||
|
) |
||||
|
request.db_session.add(log_entry) |
||||
|
request.tm.commit() |
||||
|
|
||||
|
return build_error_response( |
||||
|
"Incorrect password", |
||||
|
field="password", |
||||
|
status=401, |
||||
|
error_type="AuthenticationError", |
||||
|
) |
||||
|
|
||||
|
# Don't allow banned users to log in |
||||
|
if user.is_banned: |
||||
|
if user.ban_expiry_time: |
||||
|
# add an hour to the ban's expiry time since the cronjob runs hourly |
||||
|
unban_time = user.ban_expiry_time + timedelta(hours=1) |
||||
|
unban_time = unban_time.strftime("%Y-%m-%d %H:%M (UTC)") |
||||
|
|
||||
|
return build_error_response( |
||||
|
"That account is temporarily banned. " |
||||
|
f"The ban will be lifted at {unban_time}", |
||||
|
status=401, |
||||
|
error_type="AuthenticationError", |
||||
|
) |
||||
|
|
||||
|
return build_error_response( |
||||
|
"That account has been banned", status=401, error_type="AuthenticationError" |
||||
|
) |
||||
|
|
||||
|
# If 2FA is enabled, save username to session and make user enter code |
||||
|
if user.two_factor_enabled: |
||||
|
return build_error_response( |
||||
|
"Two-factor authentication is enabled for this account. " |
||||
|
"This is currently not supported by the API.", |
||||
|
status=401, |
||||
|
error_type="AuthenticationError", |
||||
|
) |
||||
|
|
||||
|
# Get the JWT authentication policy and generate a token |
||||
|
auth_policy = request.registry.queryUtility(IAuthenticationPolicy) |
||||
|
# We use MultiAuthenticationPolicy, find the JWT policy |
||||
|
jwt_policy = auth_policy.get_policy(JWTAuthenticationPolicy) |
||||
|
|
||||
|
token = jwt_policy.create_jwt_token(user.user_id) |
||||
|
|
||||
|
return {"token": token} |
||||
|
|
||||
|
|
||||
|
@view_config(route_name="apibeta.whoami", openapi=True, renderer="json") |
||||
|
def whoami(request: Request) -> dict: |
||||
|
"""Endpoint to let a user verify that they are authenticated.""" |
||||
|
|
||||
|
user = request.user |
||||
|
if not user: |
||||
|
msg = "You are not logged in" |
||||
|
else: |
||||
|
msg = f"You are logged in as {user.username}" |
||||
|
|
||||
|
return {"msg": msg} |
||||
Write
Preview
Loading…
Cancel
Save
Reference in new issue