From 5495d5b4d2ec2dfa6ec41aabedbe37cac1028bcb Mon Sep 17 00:00:00 2001 From: Deimos Date: Wed, 29 Jan 2020 19:56:13 -0700 Subject: [PATCH] Store all topic visits, adjust triggers/queries This changes from storing only a single topic visit per user to just storing all of them. I don't intend to keep all of these and will probably find a way to "quantize" repeated visits soon. However, I want to get an idea of the volume first, and also use this to see how the new querying methods work in production. On that note, I'm not sure that the LATERAL outer join is the best method, but it seems interesting (and was kind of a pain in the ass in SQLAlchemy), so we'll see how it looks. As part of this, I also changed the method of adjusting num_comments on past topic visits to be done entirely in triggers, instead of the previous approach of doing it in _increment_topic_comments_seen(). However, this really just made me realize how incorrect this idea is and how many edge cases can come up that will mess up the comment counters on the visits (e.g. post a comment and then delete it immediately). Hopefully this can go away in the somewhat near future with some other changes to notifications. --- ...add_visit_time_to_topic_visits_primary_.py | 112 ++++++++++++++++++ .../comment_notifications/topic_visits.sql | 34 ++++++ .../init/triggers/comments/topic_visits.sql | 8 +- tildes/tildes/models/topic/topic_query.py | 28 +++-- tildes/tildes/models/topic/topic_visit.py | 39 ++---- tildes/tildes/views/api/web/comment.py | 72 ++--------- tildes/tildes/views/topic.py | 6 +- 7 files changed, 197 insertions(+), 102 deletions(-) create mode 100644 tildes/alembic/versions/19400b1efe8b_add_visit_time_to_topic_visits_primary_.py create mode 100644 tildes/sql/init/triggers/comment_notifications/topic_visits.sql diff --git a/tildes/alembic/versions/19400b1efe8b_add_visit_time_to_topic_visits_primary_.py b/tildes/alembic/versions/19400b1efe8b_add_visit_time_to_topic_visits_primary_.py new file mode 100644 index 0000000..e5c8454 --- /dev/null +++ b/tildes/alembic/versions/19400b1efe8b_add_visit_time_to_topic_visits_primary_.py @@ -0,0 +1,112 @@ +"""Add visit_time to topic_visits primary key + +Revision ID: 19400b1efe8b +Revises: 6c840340ab86 +Create Date: 2020-01-30 00:14:47.511461 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "19400b1efe8b" +down_revision = "6c840340ab86" +branch_labels = None +depends_on = None + + +def upgrade(): + op.execute("alter table topic_visits drop constraint pk_topic_visits") + op.execute( + "alter table topic_visits add constraint pk_topic_visits primary key (user_id, topic_id, visit_time)" + ) + op.alter_column("topic_visits", "visit_time", server_default=sa.text("NOW()")) + + op.execute( + """ + CREATE OR REPLACE FUNCTION increment_user_topic_visit_num_comments() RETURNS TRIGGER AS $$ + BEGIN + UPDATE topic_visits + SET num_comments = num_comments + 1 + WHERE user_id = NEW.user_id + AND topic_id = NEW.topic_id + AND visit_time = ( + SELECT MAX(visit_time) + FROM topic_visits + WHERE topic_id = NEW.topic_id + AND user_id = NEW.user_id + ); + + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + """ + ) + + op.execute( + """ + create or replace function update_last_topic_visit_num_comments() returns trigger as $$ + declare + comment comments%rowtype; + begin + select * INTO comment from comments where comment_id = NEW.comment_id; + + -- if marking a notification as read, increment the comment count on the user's + -- last visit to the topic as long as it was before the comment was posted + if (OLD.is_unread = true and NEW.is_unread = false) then + update topic_visits + set num_comments = num_comments + 1 + where topic_id = comment.topic_id + and user_id = NEW.user_id + and visit_time < comment.created_time + and visit_time = ( + select max(visit_time) + from topic_visits + where topic_id = comment.topic_id + and user_id = NEW.user_id + ); + end if; + + return null; + end + $$ language plpgsql; + """ + ) + + op.execute( + """ + create trigger update_last_topic_visit_num_comments_update + after update of is_unread on comment_notifications + for each row + execute procedure update_last_topic_visit_num_comments(); + """ + ) + + +def downgrade(): + op.execute("alter table topic_visits drop constraint pk_topic_visits") + op.execute( + "alter table topic_visits add constraint pk_topic_visits primary key (user_id, topic_id)" + ) + op.alter_column("topic_visits", "visit_time", server_default=None) + + op.execute( + "drop trigger update_last_topic_visit_num_comments_update on comment_notifications" + ) + op.execute("drop function update_last_topic_visit_num_comments") + + op.execute( + """ + CREATE OR REPLACE FUNCTION increment_user_topic_visit_num_comments() RETURNS TRIGGER AS $$ + BEGIN + UPDATE topic_visits + SET num_comments = num_comments + 1 + WHERE user_id = NEW.user_id + AND topic_id = NEW.topic_id; + + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + """ + ) diff --git a/tildes/sql/init/triggers/comment_notifications/topic_visits.sql b/tildes/sql/init/triggers/comment_notifications/topic_visits.sql new file mode 100644 index 0000000..29efd47 --- /dev/null +++ b/tildes/sql/init/triggers/comment_notifications/topic_visits.sql @@ -0,0 +1,34 @@ +-- Copyright (c) 2020 Tildes contributors +-- SPDX-License-Identifier: AGPL-3.0-or-later + +create or replace function update_last_topic_visit_num_comments() returns trigger as $$ +declare + comment comments%rowtype; +begin + select * INTO comment from comments where comment_id = NEW.comment_id; + + -- if marking a notification as read, increment the comment count on the user's + -- last visit to the topic as long as it was before the comment was posted + if (OLD.is_unread = true and NEW.is_unread = false) then + update topic_visits + set num_comments = num_comments + 1 + where topic_id = comment.topic_id + and user_id = NEW.user_id + and visit_time < comment.created_time + and visit_time = ( + select max(visit_time) + from topic_visits + where topic_id = comment.topic_id + and user_id = NEW.user_id + ); + end if; + + return null; +end +$$ language plpgsql; + + +create trigger update_last_topic_visit_num_comments_update + after update of is_unread on comment_notifications + for each row + execute procedure update_last_topic_visit_num_comments(); diff --git a/tildes/sql/init/triggers/comments/topic_visits.sql b/tildes/sql/init/triggers/comments/topic_visits.sql index 73e4e40..f71253f 100644 --- a/tildes/sql/init/triggers/comments/topic_visits.sql +++ b/tildes/sql/init/triggers/comments/topic_visits.sql @@ -7,7 +7,13 @@ BEGIN UPDATE topic_visits SET num_comments = num_comments + 1 WHERE user_id = NEW.user_id - AND topic_id = NEW.topic_id; + AND topic_id = NEW.topic_id + AND visit_time = ( + SELECT MAX(visit_time) + FROM topic_visits + WHERE topic_id = NEW.topic_id + AND user_id = NEW.user_id + ); RETURN NULL; END; diff --git a/tildes/tildes/models/topic/topic_query.py b/tildes/tildes/models/topic/topic_query.py index 7218f36..6a06818 100644 --- a/tildes/tildes/models/topic/topic_query.py +++ b/tildes/tildes/models/topic/topic_query.py @@ -7,7 +7,7 @@ from typing import Any, Sequence from pyramid.request import Request from sqlalchemy import func -from sqlalchemy.sql.expression import and_, label +from sqlalchemy.sql.expression import and_, desc, label, text from tildes.enums import TopicSortOption from tildes.lib.datetime import SimpleHoursPeriod, utc_now @@ -94,14 +94,26 @@ class TopicQuery(PaginatedQuery): def _attach_visit_data(self) -> "TopicQuery": """Join the data related to the user's last visit to the topic(s).""" - query = self.outerjoin( - TopicVisit, - and_( + # subquery using LATERAL to select only the newest visit for each topic + lateral_subquery = ( + self.request.db_session.query( + TopicVisit.visit_time, TopicVisit.num_comments + ) + .filter( TopicVisit.topic_id == Topic.topic_id, TopicVisit.user == self.request.user, - ), + ) + .order_by(desc(TopicVisit.visit_time)) + .limit(1) + .correlate(Topic) + .subquery() + .lateral() ) - query = query.add_columns(TopicVisit.visit_time, TopicVisit.num_comments) + + # join on "true" since the subquery already restricts to the row we want + query = self.outerjoin(lateral_subquery, text("true")) + + query = query.add_columns(lateral_subquery) return query @@ -148,7 +160,7 @@ class TopicQuery(PaginatedQuery): return topic def apply_sort_option( - self, sort: TopicSortOption, desc: bool = True + self, sort: TopicSortOption, is_desc: bool = True ) -> "TopicQuery": """Apply a TopicSortOption sorting method (generative).""" if sort == TopicSortOption.VOTES: @@ -162,7 +174,7 @@ class TopicQuery(PaginatedQuery): elif sort == TopicSortOption.ALL_ACTIVITY: self._sort_column = Topic.last_activity_time - self.sort_desc = desc + self.sort_desc = is_desc return self diff --git a/tildes/tildes/models/topic/topic_visit.py b/tildes/tildes/models/topic/topic_visit.py index 04b8cb4..877ce32 100644 --- a/tildes/tildes/models/topic/topic_visit.py +++ b/tildes/tildes/models/topic/topic_visit.py @@ -6,11 +6,9 @@ from datetime import datetime from sqlalchemy import Column, ForeignKey, Integer, TIMESTAMP -from sqlalchemy.dialects.postgresql import insert -from sqlalchemy.dialects.postgresql.dml import Insert from sqlalchemy.orm import relationship +from sqlalchemy.sql.expression import text -from tildes.lib.datetime import utc_now from tildes.models import DatabaseModel from tildes.models.user import User @@ -18,11 +16,7 @@ from .topic import Topic class TopicVisit(DatabaseModel): - """Model for a user's visit to a topic. - - New visits should not be created through __init__(), but by executing the statement - returned by the `generate_insert_statement` method. This will take advantage of - postgresql's ability to update any existing visit. + """Model for a user's visits to a topic. Trigger behavior: Incoming: @@ -40,26 +34,19 @@ class TopicVisit(DatabaseModel): topic_id: int = Column( Integer, ForeignKey("topics.topic_id"), nullable=False, primary_key=True ) - visit_time: datetime = Column(TIMESTAMP(timezone=True), nullable=False) + visit_time: datetime = Column( + TIMESTAMP(timezone=True), + nullable=False, + primary_key=True, + server_default=text("NOW()"), + ) num_comments: int = Column(Integer, nullable=False) user: User = relationship("User", innerjoin=True) topic: Topic = relationship("Topic", innerjoin=True) - @classmethod - def generate_insert_statement(cls, user: User, topic: Topic) -> Insert: - """Return a INSERT ... ON CONFLICT DO UPDATE statement for a visit.""" - visit_time = utc_now() - return ( - insert(cls.__table__) - .values( - user_id=user.user_id, - topic_id=topic.topic_id, - visit_time=visit_time, - num_comments=topic.num_comments, - ) - .on_conflict_do_update( - constraint=cls.__table__.primary_key, - set_={"visit_time": visit_time, "num_comments": topic.num_comments}, - ) - ) + def __init__(self, user: User, topic: Topic): + """Create a new visit to a topic.""" + self.user = user + self.topic = topic + self.num_comments = topic.num_comments diff --git a/tildes/tildes/views/api/web/comment.py b/tildes/tildes/views/api/web/comment.py index 1160378..c6dc784 100644 --- a/tildes/tildes/views/api/web/comment.py +++ b/tildes/tildes/views/api/web/comment.py @@ -7,15 +7,11 @@ from marshmallow.fields import Boolean from pyramid.httpexceptions import HTTPUnprocessableEntity from pyramid.request import Request from pyramid.response import Response -from sqlalchemy.dialects.postgresql import insert from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm import joinedload from sqlalchemy.orm.exc import FlushError from webargs.pyramidparser import use_kwargs -from zope.sqlalchemy import mark_changed from tildes.enums import CommentLabelOption, CommentNotificationType, LogEventType -from tildes.lib.datetime import utc_now from tildes.models.comment import ( Comment, CommentBookmark, @@ -24,7 +20,6 @@ from tildes.models.comment import ( CommentVote, ) from tildes.models.log import LogComment -from tildes.models.topic import TopicVisit from tildes.schemas.comment import CommentLabelSchema, CommentSchema from tildes.views import IC_NOOP from tildes.views.decorators import ic_view_config, rate_limit_view @@ -38,42 +33,11 @@ def _mark_comment_read_from_interaction(request: Request, comment: Comment) -> N if not request.user.interact_mark_notifications_read: return - with request.db_session.no_autoflush: - request.query(CommentNotification).filter( - CommentNotification.user == request.user, - CommentNotification.comment == comment, - CommentNotification.is_unread == True, # noqa - ).update({"is_unread": False}, synchronize_session=False) - _increment_topic_comments_seen(request, comment) - - -def _increment_topic_comments_seen(request: Request, comment: Comment) -> None: - """Increment the number of comments in a topic the user has viewed. - - We want to increment the number of comments they've seen in the thread that the - comment came from, so that they don't *both* get a notification as well as have the - thread highlight with "(1 new)". This should only happen if their last visit was - before the comment was posted, however. Below, this is implemented as a - INSERT ... ON CONFLICT DO UPDATE so that it will insert a new topic visit with - 1 comment if they didn't previously have one at all. - """ - statement = ( - insert(TopicVisit.__table__) - .values( - user_id=request.user.user_id, - topic_id=comment.topic_id, - visit_time=utc_now(), - num_comments=1, - ) - .on_conflict_do_update( - constraint=TopicVisit.__table__.primary_key, - set_={"num_comments": TopicVisit.num_comments + 1}, - where=TopicVisit.visit_time < comment.created_time, - ) - ) - - request.db_session.execute(statement) - mark_changed(request.db_session) + request.query(CommentNotification).filter( + CommentNotification.user == request.user, + CommentNotification.comment == comment, + CommentNotification.is_unread == True, # noqa + ).update({"is_unread": False}, synchronize_session=False) @ic_view_config( @@ -362,31 +326,15 @@ def put_mark_comments_read(request: Request, mark_all_previous: bool) -> Respons notification = request.context if mark_all_previous: - prev_notifications = ( - request.query(CommentNotification) - .filter( - CommentNotification.user == request.user, - CommentNotification.is_unread == True, # noqa - CommentNotification.created_time <= notification.created_time, - ) - .options(joinedload(CommentNotification.comment)) - .all() - ) - - # sort the notifications by created_time of their comment so that the - # INSERT ... ON CONFLICT DO UPDATE statements work as expected - prev_notifications = sorted( - prev_notifications, key=lambda c: c.comment.created_time - ) - - for comment_notification in prev_notifications: - comment_notification.is_unread = False - _increment_topic_comments_seen(request, comment_notification.comment) + request.query(CommentNotification).filter( + CommentNotification.user == request.user, + CommentNotification.is_unread == True, # noqa + CommentNotification.created_time <= notification.created_time, + ).update({"is_unread": False}, synchronize_session=False) return Response("Your comment notifications have been cleared.") notification.is_unread = False - _increment_topic_comments_seen(request, notification.comment) return IC_NOOP diff --git a/tildes/tildes/views/topic.py b/tildes/tildes/views/topic.py index c56f4bc..3ddd0e3 100644 --- a/tildes/tildes/views/topic.py +++ b/tildes/tildes/views/topic.py @@ -20,7 +20,6 @@ from sqlalchemy.orm.session import Session from sqlalchemy.sql.expression import any_, desc, text from sqlalchemy_utils import Ltree from webargs.pyramidparser import use_kwargs -from zope.sqlalchemy import mark_changed from tildes.enums import ( CommentLabelOption, @@ -441,10 +440,7 @@ def get_topic(request: Request, comment_order: CommentTreeSortOption) -> dict: tree.collapse_from_labels() if request.user: - # update their last visit time for this topic - statement = TopicVisit.generate_insert_statement(request.user, topic) - request.db_session.execute(statement) - mark_changed(request.db_session) + request.db_session.add(TopicVisit(request.user, topic)) # collapse old comments if the user has a previous visit to the topic # (and doesn't have that behavior disabled)