From bcb5a3e07956052f677afa8ff56cba499f02d4f3 Mon Sep 17 00:00:00 2001
From: Deimos
Date: Fri, 10 Jan 2020 20:45:18 -0700
Subject: [PATCH] Replace RabbitMQ uses with Redis streams

RabbitMQ was used to support asynchronous/background processing tasks,
such as determining the word count for text topics and scraping the
destinations or relevant APIs for link topics. This commit replaces
RabbitMQ's role (as the message broker) with Redis streams.

This includes building a new "PostgreSQL to Redis bridge" that takes
over the previous role of pg-amqp-bridge: listening for NOTIFY messages
on a particular PostgreSQL channel and translating them to messages in
the appropriate Redis streams.

One notable change is that the names of message "sources" were adjusted
slightly and standardized. For example, the routing key for a message
caused by a new comment was previously "comment.created", but is now
"comments.insert". Similarly, "comment.edited" became
"comments.update.markdown". The new naming scheme uses the table name,
the proper name of the SQL operation, and (for updates of a specific
column) the column name, instead of the previous unpredictable terms.
---
 ...ent_user_mentions_generator.service.jinja2 |   6 +-
 .../site_icon_downloader.service.jinja2       |   6 +-
 .../topic_embedly_extractor.service.jinja2    |   6 +-
 ...nteresting_activity_updater.service.jinja2 |   6 +-
 .../topic_metadata_generator.service.jinja2   |   6 +-
 .../topic_youtube_scraper.service.jinja2      |   6 +-
 salt/salt/redis/init.sls                      |  12 ++
 .../postgresql_redis_bridge.service.jinja2    |  16 ++
 .../4fb2c786c7a0_add_new_notify_triggers.py   | 181 ++++++++++++++++++
 .../comment_user_mentions_generator.py        |  22 +--
 tildes/consumers/site_icon_downloader.py      |  19 +-
 tildes/consumers/topic_embedly_extractor.py   |  25 +--
 .../topic_interesting_activity_updater.py     |  32 ++--
 tildes/consumers/topic_metadata_generator.py  |  27 +--
 tildes/consumers/topic_youtube_scraper.py     |  25 +--
 tildes/scripts/postgresql_redis_bridge.py     |  70 +++++++
 tildes/sql/init/functions/event_stream.sql    |   9 +
 .../triggers/comment_labels/event_stream.sql  |  25 +++
 .../init/triggers/comments/event_stream.sql   |  38 ++++
 .../triggers/scraper_results/event_stream.sql |  23 +++
 .../sql/init/triggers/topics/event_stream.sql |  33 ++++
 tildes/tildes/lib/event_stream.py             | 132 +++++++++++++
 22 files changed, 634 insertions(+), 91 deletions(-)
 create mode 100644 salt/salt/redis/postgresql_redis_bridge.service.jinja2
 create mode 100644 tildes/alembic/versions/4fb2c786c7a0_add_new_notify_triggers.py
 create mode 100644 tildes/scripts/postgresql_redis_bridge.py
 create mode 100644 tildes/sql/init/functions/event_stream.sql
 create mode 100644 tildes/sql/init/triggers/comment_labels/event_stream.sql
 create mode 100644 tildes/sql/init/triggers/comments/event_stream.sql
 create mode 100644 tildes/sql/init/triggers/scraper_results/event_stream.sql
 create mode 100644 tildes/sql/init/triggers/topics/event_stream.sql
 create mode 100644 tildes/tildes/lib/event_stream.py

diff --git a/salt/salt/consumers/comment_user_mentions_generator.service.jinja2 b/salt/salt/consumers/comment_user_mentions_generator.service.jinja2
index 4b8988e..6623512 100644
--- a/salt/salt/consumers/comment_user_mentions_generator.service.jinja2
+++ b/salt/salt/consumers/comment_user_mentions_generator.service.jinja2
@@ -1,9 +1,9 @@
 {% from 'common.jinja2' import app_dir, bin_dir -%}
 [Unit]
 Description=Comment User Mention Generator (Queue Consumer)
-Requires=rabbitmq-server.service
-After=rabbitmq-server.service
-PartOf=rabbitmq-server.service
+Requires=redis.service
+After=redis.service
+PartOf=redis.service
 
 [Service]
 WorkingDirectory={{ app_dir }}/consumers
diff --git a/salt/salt/consumers/site_icon_downloader.service.jinja2 b/salt/salt/consumers/site_icon_downloader.service.jinja2
index faae373..ee7a98a 100644
--- a/salt/salt/consumers/site_icon_downloader.service.jinja2
+++ b/salt/salt/consumers/site_icon_downloader.service.jinja2
@@ -1,9 +1,9 @@
 {% from 'common.jinja2' import app_dir, app_username, bin_dir -%}
 [Unit]
 Description=Site Icon Downloader (Queue Consumer)
-Requires=rabbitmq-server.service
-After=rabbitmq-server.service
-PartOf=rabbitmq-server.service
+Requires=redis.service
+After=redis.service
+PartOf=redis.service
 
 [Service]
 User={{ app_username }}
diff --git a/salt/salt/consumers/topic_embedly_extractor.service.jinja2 b/salt/salt/consumers/topic_embedly_extractor.service.jinja2
index 47cb0e0..0663337 100644
--- a/salt/salt/consumers/topic_embedly_extractor.service.jinja2
+++ b/salt/salt/consumers/topic_embedly_extractor.service.jinja2
@@ -1,9 +1,9 @@
 {% from 'common.jinja2' import app_dir, bin_dir -%}
 [Unit]
 Description=Topic Embedly Extractor (Queue Consumer)
-Requires=rabbitmq-server.service
-After=rabbitmq-server.service
-PartOf=rabbitmq-server.service
+Requires=redis.service
+After=redis.service
+PartOf=redis.service
 
 [Service]
 WorkingDirectory={{ app_dir }}/consumers
diff --git a/salt/salt/consumers/topic_interesting_activity_updater.service.jinja2 b/salt/salt/consumers/topic_interesting_activity_updater.service.jinja2
index 050ab6c..a19c3a3 100644
--- a/salt/salt/consumers/topic_interesting_activity_updater.service.jinja2
+++ b/salt/salt/consumers/topic_interesting_activity_updater.service.jinja2
@@ -1,9 +1,9 @@
 {% from 'common.jinja2' import app_dir, bin_dir -%}
 [Unit]
 Description=Topic Interesting Activity Updater (Queue Consumer)
-Requires=rabbitmq-server.service
-After=rabbitmq-server.service
-PartOf=rabbitmq-server.service
+Requires=redis.service
+After=redis.service
+PartOf=redis.service
 
 [Service]
 WorkingDirectory={{ app_dir }}/consumers
diff --git a/salt/salt/consumers/topic_metadata_generator.service.jinja2 b/salt/salt/consumers/topic_metadata_generator.service.jinja2
index 1025c03..0545f21 100644
--- a/salt/salt/consumers/topic_metadata_generator.service.jinja2
+++ b/salt/salt/consumers/topic_metadata_generator.service.jinja2
@@ -1,9 +1,9 @@
 {% from 'common.jinja2' import app_dir, bin_dir -%}
 [Unit]
 Description=Topic Metadata Generator (Queue Consumer)
-Requires=rabbitmq-server.service
-After=rabbitmq-server.service
-PartOf=rabbitmq-server.service
+Requires=redis.service
+After=redis.service
+PartOf=redis.service
 
 [Service]
 WorkingDirectory={{ app_dir }}/consumers
diff --git a/salt/salt/consumers/topic_youtube_scraper.service.jinja2 b/salt/salt/consumers/topic_youtube_scraper.service.jinja2
index 5768a36..4fcf5d0 100644
--- a/salt/salt/consumers/topic_youtube_scraper.service.jinja2
+++ b/salt/salt/consumers/topic_youtube_scraper.service.jinja2
@@ -1,9 +1,9 @@
 {% from 'common.jinja2' import app_dir, bin_dir -%}
 [Unit]
 Description=Topic Youtube Scraper (Queue Consumer)
-Requires=rabbitmq-server.service
-After=rabbitmq-server.service
-PartOf=rabbitmq-server.service
+Requires=redis.service
+After=redis.service
+PartOf=redis.service
 
 [Service]
 WorkingDirectory={{ app_dir }}/consumers
diff --git a/salt/salt/redis/init.sls b/salt/salt/redis/init.sls
index 8df19b7..9d71192 100644
--- a/salt/salt/redis/init.sls
+++ b/salt/salt/redis/init.sls
@@ -116,3 +116,15 @@ redis.service:
     - require:
       - user: redis-user
       - cmd: install-redis
+
+/etc/systemd/system/postgresql_redis_bridge.service:
+  file.managed:
+    - source: salt://redis/postgresql_redis_bridge.service.jinja2
+    - template: jinja
+    - user: root
+    - group: root
+    - mode: 644
+
+postgresql_redis_bridge.service:
+  service.running:
+    - enable: True
diff --git a/salt/salt/redis/postgresql_redis_bridge.service.jinja2 b/salt/salt/redis/postgresql_redis_bridge.service.jinja2
new file mode 100644
index 0000000..d972f0e
--- /dev/null
+++ b/salt/salt/redis/postgresql_redis_bridge.service.jinja2
@@ -0,0 +1,16 @@
+{% from 'common.jinja2' import app_dir, bin_dir -%}
+[Unit]
+Description=postgresql_redis_bridge - convert NOTIFY to Redis streams
+Requires=redis.service
+After=redis.service
+PartOf=redis.service
+
+[Service]
+WorkingDirectory={{ app_dir }}/scripts
+Environment="INI_FILE={{ app_dir }}/{{ pillar['ini_file'] }}"
+ExecStart={{ bin_dir }}/python postgresql_redis_bridge.py
+Restart=always
+RestartSec=5
+
+[Install]
+WantedBy=multi-user.target
diff --git a/tildes/alembic/versions/4fb2c786c7a0_add_new_notify_triggers.py b/tildes/alembic/versions/4fb2c786c7a0_add_new_notify_triggers.py
new file mode 100644
index 0000000..c4a8ce0
--- /dev/null
+++ b/tildes/alembic/versions/4fb2c786c7a0_add_new_notify_triggers.py
@@ -0,0 +1,181 @@
+"""Add new NOTIFY triggers
+
+Revision ID: 4fb2c786c7a0
+Revises: f4e1ef359307
+Create Date: 2020-01-19 19:45:32.460821
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "4fb2c786c7a0"
+down_revision = "f4e1ef359307"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.execute(
+        """
+        create or replace function add_to_event_stream(stream_name_pieces text[], fields text[]) returns void as $$
+        select pg_notify(
+            'postgresql_events',
+            array_to_string(stream_name_pieces, '.') || ':' || json_object(fields)
+        );
+        $$ language sql;
+        """
+    )
+
+    # comments
+    op.execute(
+        """
+        create or replace function comments_events_trigger() returns trigger as $$
+        declare
+            affected_row record := coalesce(NEW, OLD);
+            stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
+
+            -- in general, only the below declaration of payload_fields should be edited
+            payload_fields text[] := array[
+                'comment_id', affected_row.comment_id
+            ]::text[];
+        begin
+            perform add_to_event_stream(stream_name_pieces, payload_fields);
+
+            return null;
+        end;
+        $$ language plpgsql;
+
+        create trigger comments_events_insert_delete
+            after insert or delete on comments
+            for each row
+            execute function comments_events_trigger();
+
+        create trigger comments_events_update_markdown
+            after update of markdown on comments
+            for each row
+            execute function comments_events_trigger('markdown');
+
+        create trigger comments_events_update_is_deleted
+            after update of is_deleted on comments
+            for each row
+            execute function comments_events_trigger('is_deleted');
+
+        create trigger comments_events_update_is_removed
+            after update of is_removed on comments
+            for each row
+            execute function comments_events_trigger('is_removed');
+        """
+    )
+
+    # topics
+    op.execute(
+        """
+        create or replace function topics_events_trigger() returns trigger as $$
+        declare
+            affected_row record := coalesce(NEW, OLD);
+            stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
+
+            -- in general, only the below declaration of payload_fields should be edited
+            payload_fields text[] := array[
+                'topic_id', affected_row.topic_id
+            ]::text[];
+        begin
+            perform add_to_event_stream(stream_name_pieces, payload_fields);
+
+            return null;
+        end;
+        $$ language plpgsql;
+
+        create trigger topics_events_insert_delete
+            after insert or delete on topics
+            for each row
+            execute function topics_events_trigger();
+
+        create trigger topics_events_update_markdown
+            after update of markdown on topics
+            for each row
+            execute function topics_events_trigger('markdown');
+
+        create trigger topics_events_update_link
+            after update of link on topics
+            for each row
+            execute function topics_events_trigger('link');
+        """
+    )
+
+    # comment_labels
+    op.execute(
+        """
+        create or replace function comment_labels_events_trigger() returns trigger as $$
+        declare
+            affected_row record := coalesce(NEW, OLD);
+            stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
+
+            -- in general, only the below declaration of payload_fields should be edited
+            payload_fields text[] := array[
+                'comment_id', affected_row.comment_id,
+                'user_id', affected_row.user_id,
+                'label', affected_row.label
+            ]::text[];
+        begin
+            perform add_to_event_stream(stream_name_pieces, payload_fields);
+
+            return null;
+        end;
+        $$ language plpgsql;
+
+        create trigger comment_labels_events_insert_delete
+            after insert or delete on comment_labels
+            for each row
+            execute function comment_labels_events_trigger();
+        """
+    )
+
+    # scraper_results
+    op.execute(
+        """
+        create or replace function scraper_results_events_trigger() returns trigger as $$
+        declare
+            affected_row record := coalesce(NEW, OLD);
+            stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
+
+            -- in general, only the below declaration of payload_fields should be edited
+            payload_fields text[] := array[
+                'result_id', affected_row.result_id
+            ]::text[];
+        begin
+            perform add_to_event_stream(stream_name_pieces, payload_fields);
+
+            return null;
+        end;
+        $$ language plpgsql;
+
+        create trigger scraper_results_events_insert_delete
+            after insert or delete on scraper_results
+            for each row
+            execute function scraper_results_events_trigger();
+        """
+    )
+
+
+def downgrade():
+    op.execute("drop trigger scraper_results_events_insert_delete on scraper_results")
+    op.execute("drop function scraper_results_events_trigger")
+
+    op.execute("drop trigger comment_labels_events_insert_delete on comment_labels")
+    op.execute("drop function comment_labels_events_trigger")
+
+    op.execute("drop trigger topics_events_update_link on topics")
+    op.execute("drop trigger topics_events_update_markdown on topics")
+    op.execute("drop trigger topics_events_insert_delete on topics")
+    op.execute("drop function topics_events_trigger")
+
+    op.execute("drop trigger comments_events_update_is_removed on comments")
+    op.execute("drop trigger comments_events_update_is_deleted on comments")
+    op.execute("drop trigger comments_events_update_markdown on comments")
+    op.execute("drop trigger comments_events_insert_delete on comments")
+    op.execute("drop function comments_events_trigger")
+
+    op.execute("drop function add_to_event_stream")
diff --git a/tildes/consumers/comment_user_mentions_generator.py b/tildes/consumers/comment_user_mentions_generator.py
index c96de76..d926bce 100644
--- a/tildes/consumers/comment_user_mentions_generator.py
+++ b/tildes/consumers/comment_user_mentions_generator.py
@@ -3,20 +3,18 @@
 
 """Consumer that generates user mentions for comments."""
 
-from amqpy import Message
-
-from tildes.lib.amqp import PgsqlQueueConsumer
+from tildes.lib.event_stream import EventStreamConsumer, Message
 from tildes.models.comment import Comment, CommentNotification
 
 
-class CommentUserMentionGenerator(PgsqlQueueConsumer):
+class CommentUserMentionGenerator(EventStreamConsumer):
     """Consumer that generates user mentions for comments."""
 
-    def run(self, msg: Message) -> None:
-        """Process a delivered message."""
+    def process_message(self, message: Message) -> None:
+        """Process a message from the stream."""
         comment = (
             self.db_session.query(Comment)
-            .filter_by(comment_id=msg.body["comment_id"])
+            .filter_by(comment_id=message.fields["comment_id"])
             .one()
         )
 
@@ -28,10 +26,10 @@ class CommentUserMentionGenerator(PgsqlQueueConsumer):
             self.db_session, comment
         )
 
-        if msg.delivery_info["routing_key"] == "comment.created":
+        if message.stream == "comments.insert":
             for user_mention in new_mentions:
                 self.db_session.add(user_mention)
-        elif msg.delivery_info["routing_key"] == "comment.edited":
+        elif message.stream == "comments.update.markdown":
             to_delete, to_add = CommentNotification.prevent_duplicate_notifications(
                 self.db_session, comment, new_mentions
             )
@@ -45,6 +43,6 @@ class CommentUserMentionGenerator(PgsqlQueueConsumer):
 
 if __name__ == "__main__":
     CommentUserMentionGenerator(
-        queue_name="comment_user_mentions_generator.q",
-        routing_keys=["comment.created", "comment.edited"],
-    ).consume_queue()
+        "comment_user_mentions_generator",
+        source_streams=["comments.insert", "comments.update.markdown"],
+    ).consume_streams()
diff --git a/tildes/consumers/site_icon_downloader.py b/tildes/consumers/site_icon_downloader.py
index 2c177cb..7e29174 100644
--- a/tildes/consumers/site_icon_downloader.py
+++ b/tildes/consumers/site_icon_downloader.py
@@ -9,33 +9,32 @@
 from typing import Optional, Sequence
 
 import publicsuffix
 import requests
-from amqpy import Message
 from PIL import Image
 
 from tildes.enums import ScraperType
-from tildes.lib.amqp import PgsqlQueueConsumer
+from tildes.lib.event_stream import EventStreamConsumer, Message
 from tildes.lib.url import get_domain_from_url
 from tildes.models.scraper import ScraperResult
 
 
-class SiteIconDownloader(PgsqlQueueConsumer):
+class SiteIconDownloader(EventStreamConsumer):
     """Consumer that generates content_metadata for topics."""
 
     ICON_FOLDER = "/opt/tildes/static/images/site-icons"
 
-    def __init__(self, queue_name: str, routing_keys: Sequence[str]):
+    def __init__(self, consumer_group: str, source_streams: Sequence[str]):
         """Initialize the consumer, including the public suffix list."""
-        super().__init__(queue_name, routing_keys)
+        super().__init__(consumer_group, source_streams)
 
         # download the public suffix list (would be good to add caching here)
         psl_file = publicsuffix.fetch()
         self.public_suffix_list = publicsuffix.PublicSuffixList(psl_file)
 
-    def run(self, msg: Message) -> None:
-        """Process a delivered message."""
+    def process_message(self, message: Message) -> None:
+        """Process a message from the stream."""
         result = (
             self.db_session.query(ScraperResult)
-            .filter_by(result_id=msg.body["result_id"])
+            .filter_by(result_id=message.fields["result_id"])
             .one()
         )
 
@@ -97,5 +96,5 @@ class SiteIconDownloader(PgsqlQueueConsumer):
 
 if __name__ == "__main__":
     SiteIconDownloader(
-        queue_name="site_icon_downloader.q", routing_keys=["scraper_result.created"]
-    ).consume_queue()
+        "site_icon_downloader", source_streams=["scraper_results.insert"]
+    ).consume_streams()
diff --git a/tildes/consumers/topic_embedly_extractor.py b/tildes/consumers/topic_embedly_extractor.py
index ad11e1f..b6912f7 100644
--- a/tildes/consumers/topic_embedly_extractor.py
+++ b/tildes/consumers/topic_embedly_extractor.py
@@ -7,15 +7,14 @@ import os
 from datetime import timedelta
 from typing import Sequence
 
-from amqpy import Message
 from pyramid.paster import get_appsettings
 from requests.exceptions import HTTPError, Timeout
 from sqlalchemy import cast, desc, func
 from sqlalchemy.dialects.postgresql import JSONB
 
 from tildes.enums import ScraperType
-from tildes.lib.amqp import PgsqlQueueConsumer
 from tildes.lib.datetime import utc_now
+from tildes.lib.event_stream import EventStreamConsumer, Message
 from tildes.models.scraper import ScraperResult
 from tildes.models.topic import Topic
 from tildes.scrapers import EmbedlyScraper
@@ -25,19 +24,23 @@ from tildes.scrapers import EmbedlyScraper
 RESCRAPE_DELAY = timedelta(hours=24)
 
 
-class TopicEmbedlyExtractor(PgsqlQueueConsumer):
+class TopicEmbedlyExtractor(EventStreamConsumer):
     """Consumer that fetches data from Embedly's Extract API for link topics."""
 
-    def __init__(self, api_key: str, queue_name: str, routing_keys: Sequence[str]):
+    def __init__(
+        self, api_key: str, consumer_group: str, source_streams: Sequence[str]
+    ):
         """Initialize the consumer, including creating a scraper instance."""
-        super().__init__(queue_name, routing_keys)
+        super().__init__(consumer_group, source_streams)
 
         self.scraper = EmbedlyScraper(api_key)
 
-    def run(self, msg: Message) -> None:
-        """Process a delivered message."""
+    def process_message(self, message: Message) -> None:
+        """Process a message from the stream."""
         topic = (
-            self.db_session.query(Topic).filter_by(topic_id=msg.body["topic_id"]).one()
+            self.db_session.query(Topic)
+            .filter_by(topic_id=message.fields["topic_id"])
+            .one()
         )
 
         if not topic.is_link_type:
@@ -95,6 +98,6 @@ if __name__ == "__main__":
 
     TopicEmbedlyExtractor(
         embedly_api_key,
-        queue_name="topic_embedly_extractor.q",
-        routing_keys=["topic.created", "topic.link_edited"],
-    ).consume_queue()
+        "topic_embedly_extractor",
+        source_streams=["topics.insert", "topics.update.link"],
+    ).consume_streams()
diff --git a/tildes/consumers/topic_interesting_activity_updater.py b/tildes/consumers/topic_interesting_activity_updater.py
index 5e900ec..b497a0d 100644
--- a/tildes/consumers/topic_interesting_activity_updater.py
+++ b/tildes/consumers/topic_interesting_activity_updater.py
@@ -6,21 +6,19 @@
 from datetime import datetime
 from typing import Optional
 
-from amqpy import Message
-
 from tildes.enums import CommentTreeSortOption
-from tildes.lib.amqp import PgsqlQueueConsumer
+from tildes.lib.event_stream import EventStreamConsumer, Message
 from tildes.models.comment import Comment, CommentInTree, CommentTree
 
 
-class TopicInterestingActivityUpdater(PgsqlQueueConsumer):
+class TopicInterestingActivityUpdater(EventStreamConsumer):
     """Consumer that updates topics' last_interesting_activity_time."""
 
-    def run(self, msg: Message) -> None:
-        """Process a delivered message."""
+    def process_message(self, message: Message) -> None:
+        """Process a message from the stream."""
         trigger_comment = (
             self.db_session.query(Comment)
-            .filter_by(comment_id=msg.body["comment_id"])
+            .filter_by(comment_id=message.fields["comment_id"])
             .one()
         )
 
@@ -82,15 +80,13 @@ class TopicInterestingActivityUpdater(PgsqlQueueConsumer):
 
 if __name__ == "__main__":
     TopicInterestingActivityUpdater(
-        queue_name="topic_interesting_activity_updater.q",
-        routing_keys=[
-            "comment.created",
-            "comment.deleted",
-            "comment.edited",
-            "comment.removed",
-            "comment.undeleted",
-            "comment.unremoved",
-            "comment_label.created",
-            "comment_label.deleted",
+        "topic_interesting_activity_updater",
+        source_streams=[
"comments.insert", + "comments.update.is_deleted", + "comments.update.markdown", + "comments.update.is_removed", + "comment_labels.insert", + "comment_labels.delete", ], - ).consume_queue() + ).consume_streams() diff --git a/tildes/consumers/topic_metadata_generator.py b/tildes/consumers/topic_metadata_generator.py index e973085..e63c89f 100644 --- a/tildes/consumers/topic_metadata_generator.py +++ b/tildes/consumers/topic_metadata_generator.py @@ -6,31 +6,32 @@ from typing import Any, Dict, Sequence import publicsuffix -from amqpy import Message from sqlalchemy import cast, func from sqlalchemy.dialects.postgresql import JSONB -from tildes.lib.amqp import PgsqlQueueConsumer +from tildes.lib.event_stream import EventStreamConsumer, Message from tildes.lib.string import extract_text_from_html, truncate_string, word_count from tildes.lib.url import get_domain_from_url from tildes.models.topic import Topic -class TopicMetadataGenerator(PgsqlQueueConsumer): +class TopicMetadataGenerator(EventStreamConsumer): """Consumer that generates content_metadata for topics.""" - def __init__(self, queue_name: str, routing_keys: Sequence[str]): + def __init__(self, consumer_group: str, source_streams: Sequence[str]): """Initialize the consumer, including the public suffix list.""" - super().__init__(queue_name, routing_keys) + super().__init__(consumer_group, source_streams) # download the public suffix list (would be good to add caching here) psl_file = publicsuffix.fetch() self.public_suffix_list = publicsuffix.PublicSuffixList(psl_file) - def run(self, msg: Message) -> None: - """Process a delivered message.""" + def process_message(self, message: Message) -> None: + """Process a message from the stream.""" topic = ( - self.db_session.query(Topic).filter_by(topic_id=msg.body["topic_id"]).one() + self.db_session.query(Topic) + .filter_by(topic_id=message.fields["topic_id"]) + .one() ) if topic.is_deleted: @@ -76,6 +77,10 @@ class TopicMetadataGenerator(PgsqlQueueConsumer): if __name__ == "__main__": TopicMetadataGenerator( - queue_name="topic_metadata_generator.q", - routing_keys=["topic.created", "topic.edited", "topic.link_edited"], - ).consume_queue() + "topic_metadata_generator", + source_streams=[ + "topics.insert", + "topics.update.markdown", + "topics.update.link", + ], + ).consume_streams() diff --git a/tildes/consumers/topic_youtube_scraper.py b/tildes/consumers/topic_youtube_scraper.py index 4ebb02e..cb2ad05 100644 --- a/tildes/consumers/topic_youtube_scraper.py +++ b/tildes/consumers/topic_youtube_scraper.py @@ -7,15 +7,14 @@ import os from datetime import timedelta from typing import Sequence -from amqpy import Message from pyramid.paster import get_appsettings from requests.exceptions import HTTPError, Timeout from sqlalchemy import cast, desc, func from sqlalchemy.dialects.postgresql import JSONB from tildes.enums import ScraperType -from tildes.lib.amqp import PgsqlQueueConsumer from tildes.lib.datetime import utc_now +from tildes.lib.event_stream import EventStreamConsumer, Message from tildes.models.scraper import ScraperResult from tildes.models.topic import Topic from tildes.scrapers import ScraperError, YoutubeScraper @@ -25,19 +24,23 @@ from tildes.scrapers import ScraperError, YoutubeScraper RESCRAPE_DELAY = timedelta(hours=24) -class TopicYoutubeScraper(PgsqlQueueConsumer): +class TopicYoutubeScraper(EventStreamConsumer): """Consumer that fetches data from YouTube's data API for relevant link topics.""" - def __init__(self, api_key: str, queue_name: str, routing_keys: 
+    def __init__(
+        self, api_key: str, consumer_group: str, source_streams: Sequence[str]
+    ):
         """Initialize the consumer, including creating a scraper instance."""
-        super().__init__(queue_name, routing_keys)
+        super().__init__(consumer_group, source_streams)
 
         self.scraper = YoutubeScraper(api_key)
 
-    def run(self, msg: Message) -> None:
-        """Process a delivered message."""
+    def process_message(self, message: Message) -> None:
+        """Process a message from the stream."""
         topic = (
-            self.db_session.query(Topic).filter_by(topic_id=msg.body["topic_id"]).one()
+            self.db_session.query(Topic)
+            .filter_by(topic_id=message.fields["topic_id"])
+            .one()
         )
 
         if not topic.is_link_type:
@@ -95,6 +98,6 @@ if __name__ == "__main__":
 
     TopicYoutubeScraper(
         youtube_api_key,
-        queue_name="topic_youtube_scraper.q",
-        routing_keys=["topic.created", "topic.link_edited"],
-    ).consume_queue()
+        consumer_group="topic_youtube_scraper",
+        source_streams=["topics.insert", "topics.update.link"],
+    ).consume_streams()
diff --git a/tildes/scripts/postgresql_redis_bridge.py b/tildes/scripts/postgresql_redis_bridge.py
new file mode 100644
index 0000000..fe2f823
--- /dev/null
+++ b/tildes/scripts/postgresql_redis_bridge.py
@@ -0,0 +1,70 @@
+# Copyright (c) 2020 Tildes contributors
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+"""Script that converts NOTIFY events on a PostgreSQL channel to Redis stream entries.
+
+Should be kept running at all times as a service.
+"""
+
+import json
+import os
+from configparser import ConfigParser
+from select import select
+
+from redis import Redis
+from sqlalchemy.engine.url import make_url
+import psycopg2
+
+from tildes.lib.event_stream import REDIS_KEY_PREFIX
+
+
+NOTIFY_CHANNEL = "postgresql_events"
+
+
+def postgresql_redis_bridge(config_path: str) -> None:
+    """Listen for NOTIFY events and add them to Redis streams."""
+    config = ConfigParser()
+    config.read(config_path)
+
+    redis = Redis(unix_socket_path=config.get("app:main", "redis.unix_socket_path"))
+
+    postgresql_url = make_url(config.get("app:main", "sqlalchemy.url"))
+    postgresql = psycopg2.connect(
+        user=postgresql_url.username, dbname=postgresql_url.database
+    )
+    postgresql.autocommit = True
+
+    with postgresql.cursor() as cursor:
+        cursor.execute(f"listen {NOTIFY_CHANNEL}")
+
+    while True:
+        # block until a NOTIFY comes through on the channel
+        select([postgresql], [], [])
+
+        # fetch any notifications without needing to execute a query
+        postgresql.poll()
+
+        # add each NOTIFY to the specified stream(s), using a Redis pipeline to avoid
+        # round trips when there are multiple sent by the same PostgreSQL transaction
+        with redis.pipeline(transaction=False) as pipe:
+            while postgresql.notifies:
+                notify = postgresql.notifies.pop(0)
+
+                # the payload format should be "<stream name>:<fields as JSON>"
+                try:
+                    stream_name, fields_json = notify.payload.split(":", maxsplit=1)
+                except ValueError:
+                    continue
+
+                try:
+                    fields = json.loads(fields_json)
+                except json.decoder.JSONDecodeError:
+                    continue
+
+                pipe.xadd(f"{REDIS_KEY_PREFIX}{stream_name}", fields)
+
+            pipe.execute()
+
+
+if __name__ == "__main__":
+    postgresql_redis_bridge(os.environ["INI_FILE"])
diff --git a/tildes/sql/init/functions/event_stream.sql b/tildes/sql/init/functions/event_stream.sql
new file mode 100644
index 0000000..d025db8
--- /dev/null
+++ b/tildes/sql/init/functions/event_stream.sql
@@ -0,0 +1,9 @@
+-- Copyright (c) 2020 Tildes contributors
+-- SPDX-License-Identifier: AGPL-3.0-or-later
+
+create or replace function add_to_event_stream(stream_name_pieces text[], fields text[]) returns void as $$
+    select pg_notify(
+        'postgresql_events',
+        array_to_string(stream_name_pieces, '.') || ':' || json_object(fields)
+    );
+$$ language sql;
diff --git a/tildes/sql/init/triggers/comment_labels/event_stream.sql b/tildes/sql/init/triggers/comment_labels/event_stream.sql
new file mode 100644
index 0000000..ceb5738
--- /dev/null
+++ b/tildes/sql/init/triggers/comment_labels/event_stream.sql
@@ -0,0 +1,25 @@
+-- Copyright (c) 2020 Tildes contributors
+-- SPDX-License-Identifier: AGPL-3.0-or-later
+
+create or replace function comment_labels_events_trigger() returns trigger as $$
+declare
+    affected_row record := coalesce(NEW, OLD);
+    stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
+
+    -- in general, only the below declaration of payload_fields should be edited
+    payload_fields text[] := array[
+        'comment_id', affected_row.comment_id,
+        'user_id', affected_row.user_id,
+        'label', affected_row.label
+    ]::text[];
+begin
+    perform add_to_event_stream(stream_name_pieces, payload_fields);
+
+    return null;
+end;
+$$ language plpgsql;
+
+create trigger comment_labels_events_insert_delete
+    after insert or delete on comment_labels
+    for each row
+    execute function comment_labels_events_trigger();
diff --git a/tildes/sql/init/triggers/comments/event_stream.sql b/tildes/sql/init/triggers/comments/event_stream.sql
new file mode 100644
index 0000000..39611d1
--- /dev/null
+++ b/tildes/sql/init/triggers/comments/event_stream.sql
@@ -0,0 +1,38 @@
+-- Copyright (c) 2020 Tildes contributors
+-- SPDX-License-Identifier: AGPL-3.0-or-later
+
+create or replace function comments_events_trigger() returns trigger as $$
+declare
+    affected_row record := coalesce(NEW, OLD);
+    stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
+
+    -- in general, only the below declaration of payload_fields should be edited
+    payload_fields text[] := array[
+        'comment_id', affected_row.comment_id
+    ]::text[];
+begin
+    perform add_to_event_stream(stream_name_pieces, payload_fields);
+
+    return null;
+end;
+$$ language plpgsql;
+
+create trigger comments_events_insert_delete
+    after insert or delete on comments
+    for each row
+    execute function comments_events_trigger();
+
+create trigger comments_events_update_markdown
+    after update of markdown on comments
+    for each row
+    execute function comments_events_trigger('markdown');
+
+create trigger comments_events_update_is_deleted
+    after update of is_deleted on comments
+    for each row
+    execute function comments_events_trigger('is_deleted');
+
+create trigger comments_events_update_is_removed
+    after update of is_removed on comments
+    for each row
+    execute function comments_events_trigger('is_removed');
diff --git a/tildes/sql/init/triggers/scraper_results/event_stream.sql b/tildes/sql/init/triggers/scraper_results/event_stream.sql
new file mode 100644
index 0000000..3dde324
--- /dev/null
+++ b/tildes/sql/init/triggers/scraper_results/event_stream.sql
@@ -0,0 +1,23 @@
+-- Copyright (c) 2020 Tildes contributors
+-- SPDX-License-Identifier: AGPL-3.0-or-later
+
+create or replace function scraper_results_events_trigger() returns trigger as $$
+declare
+    affected_row record := coalesce(NEW, OLD);
+    stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
+
+    -- in general, only the below declaration of payload_fields should be edited
+    payload_fields text[] := array[
+        'result_id', affected_row.result_id
+    ]::text[];
+begin
+    perform add_to_event_stream(stream_name_pieces, payload_fields);
+
+    return null;
+end;
+$$ language plpgsql;
+
+create trigger scraper_results_events_insert_delete
+    after insert or delete on scraper_results
+    for each row
+    execute function scraper_results_events_trigger();
diff --git a/tildes/sql/init/triggers/topics/event_stream.sql b/tildes/sql/init/triggers/topics/event_stream.sql
new file mode 100644
index 0000000..3700787
--- /dev/null
+++ b/tildes/sql/init/triggers/topics/event_stream.sql
@@ -0,0 +1,33 @@
+-- Copyright (c) 2020 Tildes contributors
+-- SPDX-License-Identifier: AGPL-3.0-or-later
+
+create or replace function topics_events_trigger() returns trigger as $$
+declare
+    affected_row record := coalesce(NEW, OLD);
+    stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
+
+    -- in general, only the below declaration of payload_fields should be edited
+    payload_fields text[] := array[
+        'topic_id', affected_row.topic_id
+    ]::text[];
+begin
+    perform add_to_event_stream(stream_name_pieces, payload_fields);
+
+    return null;
+end;
+$$ language plpgsql;
+
+create trigger topics_events_insert_delete
+    after insert or delete on topics
+    for each row
+    execute function topics_events_trigger();
+
+create trigger topics_events_update_markdown
+    after update of markdown on topics
+    for each row
+    execute function topics_events_trigger('markdown');
+
+create trigger topics_events_update_link
+    after update of link on topics
+    for each row
+    execute function topics_events_trigger('link');
diff --git a/tildes/tildes/lib/event_stream.py b/tildes/tildes/lib/event_stream.py
new file mode 100644
index 0000000..8375841
--- /dev/null
+++ b/tildes/tildes/lib/event_stream.py
@@ -0,0 +1,132 @@
+# Copyright (c) 2020 Tildes contributors
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+"""Contains classes related to handling the Redis-based event streams."""
+
+import os
+from abc import abstractmethod
+from configparser import ConfigParser
+from typing import Any, Dict, List, Sequence
+
+from redis import Redis, ResponseError
+
+from tildes.lib.database import get_session_from_config
+
+REDIS_KEY_PREFIX = "event_stream:"
+
+
+class Message:
+    """Represents a single message taken from a stream."""
+
+    def __init__(
+        self, redis: Redis, stream: str, message_id: str, fields: Dict[str, str]
+    ):
+        """Initialize a new message from a Redis stream."""
+        self.redis = redis
+        self.stream = stream
+        self.message_id = message_id
+        self.fields = fields
+
+    def ack(self, consumer_group: str) -> None:
+        """Acknowledge the message, removing it from the pending entries list."""
+        self.redis.xack(
+            f"{REDIS_KEY_PREFIX}{self.stream}", consumer_group, self.message_id
+        )
+
+
+class EventStreamConsumer:
+    """Base class for consumers of events retrieved from a stream in Redis.
+
+    This class is intended to be used in a completely "stand-alone" manner, such as
+    inside a script being run separately as a background job. As such, it also includes
+    connecting to Redis, creating the consumer group and the relevant streams, and
+    (optionally) connecting to the database to be able to fetch and modify data as
+    necessary. It relies on the environment variable INI_FILE being set.
+ """ + + def __init__( + self, consumer_group: str, source_streams: Sequence[str], uses_db: bool = True, + ): + """Initialize a new consumer, creating consumer groups and streams if needed.""" + ini_file_path = os.environ["INI_FILE"] + config = ConfigParser() + config.read(ini_file_path) + + self.redis = Redis( + unix_socket_path=config.get("app:main", "redis.unix_socket_path") + ) + self.consumer_group = consumer_group + self.source_streams = [ + f"{REDIS_KEY_PREFIX}{stream}" for stream in source_streams + ] + + # hardcoded for now, will need to change for multiple consumers in same group + self.name = f"{consumer_group}-1" + + # create all the consumer groups and streams (if necessary) + for stream in self.source_streams: + try: + self.redis.xgroup_create(stream, consumer_group, mkstream=True) + except ResponseError as error: + # if the consumer group already exists, a BUSYGROUP error will be + # returned, so we want to ignore that one but raise anything else + if not str(error).startswith("BUSYGROUP"): + raise + + if uses_db: + self.db_session = get_session_from_config(ini_file_path) + else: + self.db_session = None + + def consume_streams(self) -> None: + """Process messages from the streams indefinitely.""" + while True: + # Get any messages from the source streams that haven't already been + # delivered to a consumer in this group - will fetch a maximum of one + # message from each stream, and block indefinitely if none are available + response = self.redis.xreadgroup( + self.consumer_group, + self.name, + {stream: ">" for stream in self.source_streams}, + count=1, + block=0, + ) + + messages = self._xreadgroup_response_to_messages(response) + + for message in messages: + self.process_message(message) + + # after processing finishes, commit the transaction and ack the message + if self.db_session: + self.db_session.commit() + + message.ack(self.consumer_group) + + def _xreadgroup_response_to_messages(self, response: Any) -> List[Message]: + """Convert a response from XREADGROUP to a list of Messages.""" + messages = [] + + # responses come back in an ugly format, a list of (one for each stream): + # [b'', [(b'', {})]] + for stream_response in response: + stream_name = stream_response[0].decode("utf-8") + + for entry in stream_response[1]: + message = Message( + self.redis, + stream_name[len(REDIS_KEY_PREFIX) :], + message_id=entry[0].decode("utf-8"), + fields={ + key.decode("utf-8"): value.decode("utf-8") + for key, value in entry[1].items() + }, + ) + messages.append(message) + + return messages + + @abstractmethod + def process_message(self, message: Message) -> None: + """Process a message from the stream (subclasses must implement).""" + pass