mirror of https://gitlab.com/tildes/tildes.git
5 changed files with 214 additions and 147 deletions
-
19tildes/openapi_beta.yaml
-
18tildes/tildes/lib/id.py
-
86tildes/tildes/views/api/beta/api_utils.py
-
79tildes/tildes/views/api/beta/comment.py
-
159tildes/tildes/views/api/beta/topic.py
@ -0,0 +1,86 @@ |
|||||
|
# Copyright (c) 2018 Tildes contributors <code@tildes.net> |
||||
|
# SPDX-License-Identifier: AGPL-3.0-or-later |
||||
|
|
||||
|
"""JSON API utils.""" |
||||
|
|
||||
|
from typing import Tuple |
||||
|
from pyramid.request import Request |
||||
|
from pyramid.response import Response |
||||
|
from tildes.lib.id import split_anchored_id |
||||
|
from tildes.models.pagination import PaginatedQuery, PaginatedResults |
||||
|
|
||||
|
|
||||
|
def query_apply_pagination( # noqa |
||||
|
query, before, after, error_if_no_anchor: bool = False |
||||
|
) -> PaginatedQuery: |
||||
|
"""Apply pagination parameters to a query.""" |
||||
|
# Start by parsing the before/after parameters and extracting the anchor type |
||||
|
# We don't know if the ID has an anchor, so we just try to split it |
||||
|
# If it doesn't have an anchor, we just use the ID as is. |
||||
|
anchor_type = None |
||||
|
if before and after: |
||||
|
raise ValueError("Cannot specify both before and after parameters") |
||||
|
if before: |
||||
|
try: |
||||
|
anchor_type, before = split_anchored_id(before) |
||||
|
except ValueError as exc: |
||||
|
if error_if_no_anchor: |
||||
|
raise ValueError( |
||||
|
"Expected an anchored ID for 'before' parameter" |
||||
|
) from exc |
||||
|
if after: |
||||
|
try: |
||||
|
anchor_type, after = split_anchored_id(after) |
||||
|
except ValueError as exc: |
||||
|
if error_if_no_anchor: |
||||
|
raise ValueError( |
||||
|
"Expected an anchored ID for 'after' parameter" |
||||
|
) from exc |
||||
|
|
||||
|
if anchor_type: |
||||
|
query = query.anchor_type(anchor_type) |
||||
|
if before: |
||||
|
query = query.before_id36(before) |
||||
|
if after: |
||||
|
query = query.after_id36(after) |
||||
|
return query |
||||
|
|
||||
|
|
||||
|
def get_next_and_prev_link(request: Request, page: PaginatedResults) -> Tuple[str, str]: |
||||
|
"""Get the next and previous links for pagination.""" |
||||
|
next_link = None |
||||
|
prev_link = None |
||||
|
|
||||
|
if page.has_next_page: |
||||
|
query_vars = request.GET.copy() |
||||
|
query_vars.pop("before", None) |
||||
|
query_vars.update({"after": page.next_page_after_id36}) |
||||
|
next_link = request.current_route_url(_query=query_vars) |
||||
|
|
||||
|
if page.has_prev_page: |
||||
|
query_vars = request.GET.copy() |
||||
|
query_vars.pop("after", None) |
||||
|
query_vars.update({"before": page.prev_page_before_id36}) |
||||
|
prev_link = request.current_route_url(_query=query_vars) |
||||
|
|
||||
|
return (next_link, prev_link) |
||||
|
|
||||
|
|
||||
|
def build_error_response( |
||||
|
message: str, |
||||
|
status: int = 400, |
||||
|
field: str = "N/A", |
||||
|
error_type: str = "ValidationError", |
||||
|
) -> Response: |
||||
|
"""Build a standardized error response.""" |
||||
|
return Response( |
||||
|
status=status, |
||||
|
content_type="application/json", |
||||
|
json=[ |
||||
|
{ |
||||
|
"message": message, |
||||
|
"field": field, |
||||
|
"exception": error_type, |
||||
|
} |
||||
|
], |
||||
|
) |
||||
@ -0,0 +1,79 @@ |
|||||
|
# Copyright (c) 2018 Tildes contributors <code@tildes.net> |
||||
|
# SPDX-License-Identifier: AGPL-3.0-or-later |
||||
|
|
||||
|
"""JSON API helper functions related to comments.""" |
||||
|
|
||||
|
from pyramid.request import Request |
||||
|
from tildes.models.comment import Comment |
||||
|
from tildes.models.comment.comment_tree import CommentInTree |
||||
|
|
||||
|
|
||||
|
def comment_to_dict(request: Request, comment: Comment) -> dict: |
||||
|
"""Convert a Comment object to a dictionary for JSON serialization.""" |
||||
|
|
||||
|
# Check permissions for viewing comment details (and set safe defaults) |
||||
|
author = None |
||||
|
rendered_html = None |
||||
|
exemplary = None |
||||
|
is_op = None |
||||
|
is_me = None |
||||
|
is_new = None |
||||
|
if request.has_permission("view", comment): |
||||
|
author = comment.user.username |
||||
|
rendered_html = comment.rendered_html |
||||
|
exemplary = comment.is_label_active("exemplary") |
||||
|
is_me = request.user == comment.user if request.user else False |
||||
|
if request.has_permission("view_author", comment.topic): |
||||
|
is_op = comment.user == comment.topic.user |
||||
|
is_new = ( |
||||
|
(comment.created_time > comment.topic.last_visit_time) |
||||
|
if ( |
||||
|
hasattr(comment.topic, "last_visit_time") |
||||
|
and comment.topic.last_visit_time |
||||
|
and not is_me |
||||
|
) |
||||
|
else False |
||||
|
) |
||||
|
|
||||
|
return { |
||||
|
"id": comment.comment_id36, |
||||
|
"topic_id": comment.topic.topic_id36, |
||||
|
"author": author, |
||||
|
"rendered_html": rendered_html, |
||||
|
"created_at": comment.created_time.isoformat(), |
||||
|
"edited_at": ( |
||||
|
comment.last_edited_time.isoformat() if comment.last_edited_time else None |
||||
|
), |
||||
|
"votes": comment.num_votes, |
||||
|
"is_removed": comment.is_removed, |
||||
|
"is_deleted": comment.is_deleted, |
||||
|
"exemplary": exemplary, |
||||
|
"collapsed": ( |
||||
|
(comment.collapsed_state == "full") |
||||
|
if hasattr(comment, "collapsed_state") |
||||
|
else None |
||||
|
), |
||||
|
"collapsed_individual": ( |
||||
|
(comment.collapsed_state == "individual") |
||||
|
if hasattr(comment, "collapsed_state") |
||||
|
else None |
||||
|
), |
||||
|
"is_op": is_op, |
||||
|
"is_me": is_me, |
||||
|
"is_new": is_new, |
||||
|
"voted": comment.user_voted, |
||||
|
"bookmarked": comment.user_bookmarked, |
||||
|
} |
||||
|
|
||||
|
|
||||
|
def comment_subtree_to_dict(request: Request, comments: list[CommentInTree]) -> list: |
||||
|
"""Convert a comment subtree to a list of dictionaries for JSON serialization.""" |
||||
|
comments_list = [] |
||||
|
for comment in comments: |
||||
|
comment_dict = comment_to_dict(request, comment) |
||||
|
comment_dict["depth"] = comment.depth |
||||
|
comment_dict["children"] = ( |
||||
|
comment_subtree_to_dict(request, comment.replies) if comment.replies else [] |
||||
|
) |
||||
|
comments_list.append(comment_dict) |
||||
|
return comments_list |
||||
Write
Preview
Loading…
Cancel
Save
Reference in new issue