mirror of https://gitlab.com/tildes/tildes.git
4 changed files with 172 additions and 3 deletions
-
56tildes/openapi_beta.yaml
-
7tildes/tildes/auth.py
-
1tildes/tildes/routes.py
-
111tildes/tildes/views/api/beta/auth.py
@ -0,0 +1,111 @@ |
|||
# Copyright (c) 2018 Tildes contributors <code@tildes.net> |
|||
# SPDX-License-Identifier: AGPL-3.0-or-later |
|||
|
|||
"""JSON API endpoints related to authentication.""" |
|||
|
|||
from datetime import timedelta |
|||
from pyramid.interfaces import IAuthenticationPolicy |
|||
from pyramid.request import Request |
|||
from pyramid.view import view_config |
|||
|
|||
from tildes.enums import LogEventType |
|||
from tildes.metrics import incr_counter |
|||
from tildes.models.log import Log |
|||
from tildes.models.user import User |
|||
from tildes.views.api.beta.api_utils import build_error_response |
|||
from tildes.auth import JWTAuthenticationPolicy |
|||
|
|||
|
|||
@view_config(route_name="apibeta.login", openapi=True, renderer="json") |
|||
def login(request: Request) -> dict: |
|||
"""Process a login request and return a JWT token if successful.""" |
|||
username = request.openapi_validated.body.get("username") |
|||
password = request.openapi_validated.body.get("password") |
|||
|
|||
incr_counter("logins") |
|||
|
|||
# Look up the user for the supplied username |
|||
user = ( |
|||
request.query(User) |
|||
.undefer_all_columns() |
|||
.filter(User.username == username) |
|||
.one_or_none() |
|||
) |
|||
|
|||
# If the username doesn't exist, tell them so - usually this isn't considered a good |
|||
# practice, but it's completely trivial to check if a username exists on Tildes |
|||
# anyway (by visiting /user/<username>), so it's better to just let people know if |
|||
# they're trying to log in with the wrong username |
|||
if not user: |
|||
incr_counter("login_failures") |
|||
|
|||
# log the failure - need to manually commit because of the exception |
|||
log_entry = Log( |
|||
LogEventType.USER_LOG_IN_FAIL, |
|||
request, |
|||
{"username": username, "reason": "Nonexistent username"}, |
|||
) |
|||
request.db_session.add(log_entry) |
|||
request.tm.commit() |
|||
|
|||
return build_error_response( |
|||
"That username does not exist", |
|||
field="username", |
|||
status=401, |
|||
error_type="AuthenticationError", |
|||
) |
|||
|
|||
if not user.is_correct_password(password): |
|||
incr_counter("login_failures") |
|||
|
|||
# log the failure - need to manually commit because of the exception |
|||
log_entry = Log( |
|||
LogEventType.USER_LOG_IN_FAIL, |
|||
request, |
|||
{"username": username, "reason": "Incorrect password"}, |
|||
) |
|||
request.db_session.add(log_entry) |
|||
request.tm.commit() |
|||
|
|||
return build_error_response( |
|||
"Incorrect password", |
|||
field="password", |
|||
status=401, |
|||
error_type="AuthenticationError", |
|||
) |
|||
|
|||
# Don't allow banned users to log in |
|||
if user.is_banned: |
|||
if user.ban_expiry_time: |
|||
# add an hour to the ban's expiry time since the cronjob runs hourly |
|||
unban_time = user.ban_expiry_time + timedelta(hours=1) |
|||
unban_time = unban_time.strftime("%Y-%m-%d %H:%M (UTC)") |
|||
|
|||
return build_error_response( |
|||
"That account is temporarily banned. " |
|||
f"The ban will be lifted at {unban_time}", |
|||
status=401, |
|||
error_type="AuthenticationError", |
|||
) |
|||
|
|||
return build_error_response( |
|||
"That account has been banned", status=401, error_type="AuthenticationError" |
|||
) |
|||
|
|||
# If 2FA is enabled, save username to session and make user enter code |
|||
if user.two_factor_enabled: |
|||
return build_error_response( |
|||
"Two-factor authentication is enabled for this account. " |
|||
"This is currently not supported by the API.", |
|||
status=401, |
|||
error_type="AuthenticationError", |
|||
) |
|||
|
|||
# Get the JWT authentication policy and generate a token |
|||
auth_policy = request.registry.queryUtility(IAuthenticationPolicy) |
|||
# We use MultiAuthenticationPolicy, find the JWT policy |
|||
jwt_policy = auth_policy.get_policy(JWTAuthenticationPolicy) |
|||
|
|||
token = jwt_policy.create_jwt_token(user.user_id) |
|||
|
|||
return {"token": token} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue