
Replace RabbitMQ uses with Redis streams

RabbitMQ was used to support asynchronous/background processing tasks,
such as determining word count for text topics and scraping the
destinations or relevant APIs for link topics. This commit replaces
RabbitMQ's role (as the message broker) with Redis streams.

This included building a new "PostgreSQL to Redis bridge" that takes
over the previous role of pg-amqp-bridge: listening for NOTIFY messages
on a particular PostgreSQL channel and translating them to messages in
appropriate Redis streams.
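
For orientation, here is a minimal sketch of the translation step the bridge performs (the full script appears later in this diff). It assumes psycopg2 and redis-py, plus the payload convention "<destination stream name>:<json dict>" emitted by the new add_to_event_stream() SQL function; pg_dsn and redis_socket_path are placeholder parameters, not names from the commit.

import json
import select

import psycopg2
from redis import Redis


def relay_notifications(pg_dsn: str, redis_socket_path: str) -> None:
    """Sketch only: forward NOTIFYs on 'postgresql_events' to Redis streams."""
    redis = Redis(unix_socket_path=redis_socket_path)
    conn = psycopg2.connect(pg_dsn)
    conn.autocommit = True
    with conn.cursor() as cursor:
        cursor.execute("listen postgresql_events")
    while True:
        select.select([conn], [], [])  # block until at least one NOTIFY arrives
        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            # payload format: "<destination stream name>:<json dict>"
            stream_name, fields_json = notify.payload.split(":", maxsplit=1)
            redis.xadd(f"event_stream:{stream_name}", json.loads(fields_json))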

One particular change of note is that the names of message "sources" were
standardized. For example, the routing key for a message caused by a new
comment was previously "comment.created" but is now "comments.insert";
similarly, "comment.edited" became "comments.update.markdown". The new naming
scheme uses the table name, the SQL operation, and (where relevant) the
affected column name, instead of the previous ad hoc terms.
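
As an illustration of that convention, the stream name is the table name, the lowercased SQL operation, and optionally the column passed as a trigger argument, joined with dots. The helper below is hypothetical, not code from this commit:

from typing import Optional


def stream_name(table: str, operation: str, column: Optional[str] = None) -> str:
    """Illustrative helper: build an event stream name from its pieces."""
    pieces = [table, operation.lower()]
    if column:
        pieces.append(column)
    return ".".join(pieces)


# examples matching the commit's new names
stream_name("comments", "INSERT")              # -> "comments.insert"
stream_name("comments", "UPDATE", "markdown")  # -> "comments.update.markdown"
stream_name("topics", "UPDATE", "link")        # -> "topics.update.link"
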
merge-requests/88/merge · Deimos, 5 years ago · commit bcb5a3e079
22 changed files (changed line counts in parentheses):

  1. salt/salt/consumers/comment_user_mentions_generator.service.jinja2 (6)
  2. salt/salt/consumers/site_icon_downloader.service.jinja2 (6)
  3. salt/salt/consumers/topic_embedly_extractor.service.jinja2 (6)
  4. salt/salt/consumers/topic_interesting_activity_updater.service.jinja2 (6)
  5. salt/salt/consumers/topic_metadata_generator.service.jinja2 (6)
  6. salt/salt/consumers/topic_youtube_scraper.service.jinja2 (6)
  7. salt/salt/redis/init.sls (12)
  8. salt/salt/redis/postgresql_redis_bridge.service.jinja2 (16)
  9. tildes/alembic/versions/4fb2c786c7a0_add_new_notify_triggers.py (181)
  10. tildes/consumers/comment_user_mentions_generator.py (22)
  11. tildes/consumers/site_icon_downloader.py (19)
  12. tildes/consumers/topic_embedly_extractor.py (25)
  13. tildes/consumers/topic_interesting_activity_updater.py (32)
  14. tildes/consumers/topic_metadata_generator.py (27)
  15. tildes/consumers/topic_youtube_scraper.py (25)
  16. tildes/scripts/postgresql_redis_bridge.py (70)
  17. tildes/sql/init/functions/event_stream.sql (9)
  18. tildes/sql/init/triggers/comment_labels/event_stream.sql (25)
  19. tildes/sql/init/triggers/comments/event_stream.sql (38)
  20. tildes/sql/init/triggers/scraper_results/event_stream.sql (23)
  21. tildes/sql/init/triggers/topics/event_stream.sql (33)
  22. tildes/tildes/lib/event_stream.py (132)

salt/salt/consumers/comment_user_mentions_generator.service.jinja2 (6)

@@ -1,9 +1,9 @@
{% from 'common.jinja2' import app_dir, bin_dir -%}
[Unit]
Description=Comment User Mention Generator (Queue Consumer)
Requires=rabbitmq-server.service
After=rabbitmq-server.service
PartOf=rabbitmq-server.service
Requires=redis.service
After=redis.service
PartOf=redis.service
[Service]
WorkingDirectory={{ app_dir }}/consumers

salt/salt/consumers/site_icon_downloader.service.jinja2 (6)

@@ -1,9 +1,9 @@
{% from 'common.jinja2' import app_dir, app_username, bin_dir -%}
[Unit]
Description=Site Icon Downloader (Queue Consumer)
Requires=rabbitmq-server.service
After=rabbitmq-server.service
PartOf=rabbitmq-server.service
Requires=redis.service
After=redis.service
PartOf=redis.service
[Service]
User={{ app_username }}

salt/salt/consumers/topic_embedly_extractor.service.jinja2 (6)

@@ -1,9 +1,9 @@
{% from 'common.jinja2' import app_dir, bin_dir -%}
[Unit]
Description=Topic Embedly Extractor (Queue Consumer)
Requires=rabbitmq-server.service
After=rabbitmq-server.service
PartOf=rabbitmq-server.service
Requires=redis.service
After=redis.service
PartOf=redis.service
[Service]
WorkingDirectory={{ app_dir }}/consumers

salt/salt/consumers/topic_interesting_activity_updater.service.jinja2 (6)

@@ -1,9 +1,9 @@
{% from 'common.jinja2' import app_dir, bin_dir -%}
[Unit]
Description=Topic Interesting Activity Updater (Queue Consumer)
Requires=rabbitmq-server.service
After=rabbitmq-server.service
PartOf=rabbitmq-server.service
Requires=redis.service
After=redis.service
PartOf=redis.service
[Service]
WorkingDirectory={{ app_dir }}/consumers

salt/salt/consumers/topic_metadata_generator.service.jinja2 (6)

@@ -1,9 +1,9 @@
{% from 'common.jinja2' import app_dir, bin_dir -%}
[Unit]
Description=Topic Metadata Generator (Queue Consumer)
Requires=rabbitmq-server.service
After=rabbitmq-server.service
PartOf=rabbitmq-server.service
Requires=redis.service
After=redis.service
PartOf=redis.service
[Service]
WorkingDirectory={{ app_dir }}/consumers

salt/salt/consumers/topic_youtube_scraper.service.jinja2 (6)

@@ -1,9 +1,9 @@
{% from 'common.jinja2' import app_dir, bin_dir -%}
[Unit]
Description=Topic Youtube Scraper (Queue Consumer)
Requires=rabbitmq-server.service
After=rabbitmq-server.service
PartOf=rabbitmq-server.service
Requires=redis.service
After=redis.service
PartOf=redis.service
[Service]
WorkingDirectory={{ app_dir }}/consumers

salt/salt/redis/init.sls (12)

@@ -116,3 +116,15 @@ redis.service:
- require:
- user: redis-user
- cmd: install-redis
/etc/systemd/system/postgresql_redis_bridge.service:
file.managed:
- source: salt://redis/postgresql_redis_bridge.service.jinja2
- template: jinja
- user: root
- group: root
- mode: 644
postgresql_redis_bridge.service:
service.running:
- enable: True

salt/salt/redis/postgresql_redis_bridge.service.jinja2 (16)

@@ -0,0 +1,16 @@
{% from 'common.jinja2' import app_dir, bin_dir -%}
[Unit]
Description=postgresql_redis_bridge - convert NOTIFY to Redis streams
Requires=redis.service
After=redis.service
PartOf=redis.service
[Service]
WorkingDirectory={{ app_dir }}/scripts
Environment="INI_FILE={{ app_dir }}/{{ pillar['ini_file'] }}"
ExecStart={{ bin_dir }}/python postgresql_redis_bridge.py
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target

tildes/alembic/versions/4fb2c786c7a0_add_new_notify_triggers.py (181)

@@ -0,0 +1,181 @@
"""Add new NOTIFY triggers
Revision ID: 4fb2c786c7a0
Revises: f4e1ef359307
Create Date: 2020-01-19 19:45:32.460821
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "4fb2c786c7a0"
down_revision = "f4e1ef359307"
branch_labels = None
depends_on = None
def upgrade():
op.execute(
"""
create or replace function add_to_event_stream(stream_name_pieces text[], fields text[]) returns void as $$
select pg_notify(
'postgresql_events',
array_to_string(stream_name_pieces, '.') || ':' || json_object(fields)
);
$$ language sql;
"""
)
# comments
op.execute(
"""
create or replace function comments_events_trigger() returns trigger as $$
declare
affected_row record := coalesce(NEW, OLD);
stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
-- in general, only the below declaration of payload_fields should be edited
payload_fields text[] := array[
'comment_id', affected_row.comment_id
]::text[];
begin
perform add_to_event_stream(stream_name_pieces, payload_fields);
return null;
end;
$$ language plpgsql;
create trigger comments_events_insert_delete
after insert or delete on comments
for each row
execute function comments_events_trigger();
create trigger comments_events_update_markdown
after update of markdown on comments
for each row
execute function comments_events_trigger('markdown');
create trigger comments_events_update_is_deleted
after update of is_deleted on comments
for each row
execute function comments_events_trigger('is_deleted');
create trigger comments_events_update_is_removed
after update of is_removed on comments
for each row
execute function comments_events_trigger('is_removed');
"""
)
# topics
op.execute(
"""
create or replace function topics_events_trigger() returns trigger as $$
declare
affected_row record := coalesce(NEW, OLD);
stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
-- in general, only the below declaration of payload_fields should be edited
payload_fields text[] := array[
'topic_id', affected_row.topic_id
]::text[];
begin
perform add_to_event_stream(stream_name_pieces, payload_fields);
return null;
end;
$$ language plpgsql;
create trigger topics_events_insert_delete
after insert or delete on topics
for each row
execute function topics_events_trigger();
create trigger topics_events_update_markdown
after update of markdown on topics
for each row
execute function topics_events_trigger('markdown');
create trigger topics_events_update_link
after update of link on topics
for each row
execute function topics_events_trigger('link');
"""
)
# comment_labels
op.execute(
"""
create or replace function comment_labels_events_trigger() returns trigger as $$
declare
affected_row record := coalesce(NEW, OLD);
stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
-- in general, only the below declaration of payload_fields should be edited
payload_fields text[] := array[
'comment_id', affected_row.comment_id,
'user_id', affected_row.user_id,
'label', affected_row.label
]::text[];
begin
perform add_to_event_stream(stream_name_pieces, payload_fields);
return null;
end;
$$ language plpgsql;
create trigger comment_labels_events_insert_delete
after insert or delete on comment_labels
for each row
execute function comment_labels_events_trigger();
"""
)
# scraper_results
op.execute(
"""
create or replace function scraper_results_events_trigger() returns trigger as $$
declare
affected_row record := coalesce(NEW, OLD);
stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
-- in general, only the below declaration of payload_fields should be edited
payload_fields text[] := array[
'result_id', affected_row.result_id
]::text[];
begin
perform add_to_event_stream(stream_name_pieces, payload_fields);
return null;
end;
$$ language plpgsql;
create trigger scraper_results_events_insert_delete
after insert or delete on scraper_results
for each row
execute function scraper_results_events_trigger();
"""
)
def downgrade():
op.execute("drop trigger scraper_results_events_insert_delete on scraper_results")
op.execute("drop function scraper_results_events_trigger")
op.execute("drop trigger comment_labels_events_insert_delete on comment_labels")
op.execute("drop function comment_labels_events_trigger")
op.execute("drop trigger topics_events_update_link on topics")
op.execute("drop trigger topics_events_update_markdown on topics")
op.execute("drop trigger topics_events_insert_delete on topics")
op.execute("drop function topics_events_trigger")
op.execute("drop trigger comments_events_update_is_removed on comments")
op.execute("drop trigger comments_events_update_is_deleted on comments")
op.execute("drop trigger comments_events_update_markdown on comments")
op.execute("drop trigger comments_events_insert_delete on comments")
op.execute("drop function comments_events_trigger")
op.execute("drop function add_to_event_stream")

tildes/consumers/comment_user_mentions_generator.py (22)

@@ -3,20 +3,18 @@
"""Consumer that generates user mentions for comments."""
from amqpy import Message
from tildes.lib.amqp import PgsqlQueueConsumer
from tildes.lib.event_stream import EventStreamConsumer, Message
from tildes.models.comment import Comment, CommentNotification
class CommentUserMentionGenerator(PgsqlQueueConsumer):
class CommentUserMentionGenerator(EventStreamConsumer):
"""Consumer that generates user mentions for comments."""
def run(self, msg: Message) -> None:
"""Process a delivered message."""
def process_message(self, message: Message) -> None:
"""Process a message from the stream."""
comment = (
self.db_session.query(Comment)
.filter_by(comment_id=msg.body["comment_id"])
.filter_by(comment_id=message.fields["comment_id"])
.one()
)
@@ -28,10 +26,10 @@ class CommentUserMentionGenerator(PgsqlQueueConsumer):
self.db_session, comment
)
if msg.delivery_info["routing_key"] == "comment.created":
if message.stream == "comments.insert":
for user_mention in new_mentions:
self.db_session.add(user_mention)
elif msg.delivery_info["routing_key"] == "comment.edited":
elif message.stream == "comments.update.markdown":
to_delete, to_add = CommentNotification.prevent_duplicate_notifications(
self.db_session, comment, new_mentions
)
@@ -45,6 +43,6 @@ class CommentUserMentionGenerator(PgsqlQueueConsumer):
if __name__ == "__main__":
CommentUserMentionGenerator(
queue_name="comment_user_mentions_generator.q",
routing_keys=["comment.created", "comment.edited"],
).consume_queue()
"comment_user_mentions_generator",
source_streams=["comments.insert", "comments.update.markdown"],
).consume_streams()

tildes/consumers/site_icon_downloader.py (19)

@@ -9,33 +9,32 @@ from typing import Optional, Sequence
import publicsuffix
import requests
from amqpy import Message
from PIL import Image
from tildes.enums import ScraperType
from tildes.lib.amqp import PgsqlQueueConsumer
from tildes.lib.event_stream import EventStreamConsumer, Message
from tildes.lib.url import get_domain_from_url
from tildes.models.scraper import ScraperResult
class SiteIconDownloader(PgsqlQueueConsumer):
class SiteIconDownloader(EventStreamConsumer):
"""Consumer that generates content_metadata for topics."""
ICON_FOLDER = "/opt/tildes/static/images/site-icons"
def __init__(self, queue_name: str, routing_keys: Sequence[str]):
def __init__(self, consumer_group: str, source_streams: Sequence[str]):
"""Initialize the consumer, including the public suffix list."""
super().__init__(queue_name, routing_keys)
super().__init__(consumer_group, source_streams)
# download the public suffix list (would be good to add caching here)
psl_file = publicsuffix.fetch()
self.public_suffix_list = publicsuffix.PublicSuffixList(psl_file)
def run(self, msg: Message) -> None:
"""Process a delivered message."""
def process_message(self, message: Message) -> None:
"""Process a message from the stream."""
result = (
self.db_session.query(ScraperResult)
.filter_by(result_id=msg.body["result_id"])
.filter_by(result_id=message.fields["result_id"])
.one()
)
@@ -97,5 +96,5 @@ class SiteIconDownloader(PgsqlQueueConsumer):
if __name__ == "__main__":
SiteIconDownloader(
queue_name="site_icon_downloader.q", routing_keys=["scraper_result.created"]
).consume_queue()
"site_icon_downloader", source_streams=["scraper_results.insert"]
).consume_streams()

tildes/consumers/topic_embedly_extractor.py (25)

@@ -7,15 +7,14 @@ import os
from datetime import timedelta
from typing import Sequence
from amqpy import Message
from pyramid.paster import get_appsettings
from requests.exceptions import HTTPError, Timeout
from sqlalchemy import cast, desc, func
from sqlalchemy.dialects.postgresql import JSONB
from tildes.enums import ScraperType
from tildes.lib.amqp import PgsqlQueueConsumer
from tildes.lib.datetime import utc_now
from tildes.lib.event_stream import EventStreamConsumer, Message
from tildes.models.scraper import ScraperResult
from tildes.models.topic import Topic
from tildes.scrapers import EmbedlyScraper
@@ -25,19 +24,23 @@ from tildes.scrapers import EmbedlyScraper
RESCRAPE_DELAY = timedelta(hours=24)
class TopicEmbedlyExtractor(PgsqlQueueConsumer):
class TopicEmbedlyExtractor(EventStreamConsumer):
"""Consumer that fetches data from Embedly's Extract API for link topics."""
def __init__(self, api_key: str, queue_name: str, routing_keys: Sequence[str]):
def __init__(
self, api_key: str, consumer_group: str, source_streams: Sequence[str]
):
"""Initialize the consumer, including creating a scraper instance."""
super().__init__(queue_name, routing_keys)
super().__init__(consumer_group, source_streams)
self.scraper = EmbedlyScraper(api_key)
def run(self, msg: Message) -> None:
"""Process a delivered message."""
def process_message(self, message: Message) -> None:
"""Process a message from the stream."""
topic = (
self.db_session.query(Topic).filter_by(topic_id=msg.body["topic_id"]).one()
self.db_session.query(Topic)
.filter_by(topic_id=message.fields["topic_id"])
.one()
)
if not topic.is_link_type:
@@ -95,6 +98,6 @@ if __name__ == "__main__":
TopicEmbedlyExtractor(
embedly_api_key,
queue_name="topic_embedly_extractor.q",
routing_keys=["topic.created", "topic.link_edited"],
).consume_queue()
"topic_embedly_extractor",
source_streams=["topics.insert", "topics.update.link"],
).consume_streams()

tildes/consumers/topic_interesting_activity_updater.py (32)

@@ -6,21 +6,19 @@
from datetime import datetime
from typing import Optional
from amqpy import Message
from tildes.enums import CommentTreeSortOption
from tildes.lib.amqp import PgsqlQueueConsumer
from tildes.lib.event_stream import EventStreamConsumer, Message
from tildes.models.comment import Comment, CommentInTree, CommentTree
class TopicInterestingActivityUpdater(PgsqlQueueConsumer):
class TopicInterestingActivityUpdater(EventStreamConsumer):
"""Consumer that updates topics' last_interesting_activity_time."""
def run(self, msg: Message) -> None:
"""Process a delivered message."""
def process_message(self, message: Message) -> None:
"""Process a message from the stream."""
trigger_comment = (
self.db_session.query(Comment)
.filter_by(comment_id=msg.body["comment_id"])
.filter_by(comment_id=message.fields["comment_id"])
.one()
)
@@ -82,15 +80,13 @@ class TopicInterestingActivityUpdater(PgsqlQueueConsumer):
if __name__ == "__main__":
TopicInterestingActivityUpdater(
queue_name="topic_interesting_activity_updater.q",
routing_keys=[
"comment.created",
"comment.deleted",
"comment.edited",
"comment.removed",
"comment.undeleted",
"comment.unremoved",
"comment_label.created",
"comment_label.deleted",
"topic_interesting_activity_updater",
source_streams=[
"comments.insert",
"comments.update.is_deleted",
"comments.update.markdown",
"comments.update.is_removed",
"comment_labels.insert",
"comment_labels.delete",
],
).consume_queue()
).consume_streams()

tildes/consumers/topic_metadata_generator.py (27)

@@ -6,31 +6,32 @@
from typing import Any, Dict, Sequence
import publicsuffix
from amqpy import Message
from sqlalchemy import cast, func
from sqlalchemy.dialects.postgresql import JSONB
from tildes.lib.amqp import PgsqlQueueConsumer
from tildes.lib.event_stream import EventStreamConsumer, Message
from tildes.lib.string import extract_text_from_html, truncate_string, word_count
from tildes.lib.url import get_domain_from_url
from tildes.models.topic import Topic
class TopicMetadataGenerator(PgsqlQueueConsumer):
class TopicMetadataGenerator(EventStreamConsumer):
"""Consumer that generates content_metadata for topics."""
def __init__(self, queue_name: str, routing_keys: Sequence[str]):
def __init__(self, consumer_group: str, source_streams: Sequence[str]):
"""Initialize the consumer, including the public suffix list."""
super().__init__(queue_name, routing_keys)
super().__init__(consumer_group, source_streams)
# download the public suffix list (would be good to add caching here)
psl_file = publicsuffix.fetch()
self.public_suffix_list = publicsuffix.PublicSuffixList(psl_file)
def run(self, msg: Message) -> None:
"""Process a delivered message."""
def process_message(self, message: Message) -> None:
"""Process a message from the stream."""
topic = (
self.db_session.query(Topic).filter_by(topic_id=msg.body["topic_id"]).one()
self.db_session.query(Topic)
.filter_by(topic_id=message.fields["topic_id"])
.one()
)
if topic.is_deleted:
@@ -76,6 +77,10 @@ class TopicMetadataGenerator(PgsqlQueueConsumer):
if __name__ == "__main__":
TopicMetadataGenerator(
queue_name="topic_metadata_generator.q",
routing_keys=["topic.created", "topic.edited", "topic.link_edited"],
).consume_queue()
"topic_metadata_generator",
source_streams=[
"topics.insert",
"topics.update.markdown",
"topics.update.link",
],
).consume_streams()

tildes/consumers/topic_youtube_scraper.py (25)

@@ -7,15 +7,14 @@ import os
from datetime import timedelta
from typing import Sequence
from amqpy import Message
from pyramid.paster import get_appsettings
from requests.exceptions import HTTPError, Timeout
from sqlalchemy import cast, desc, func
from sqlalchemy.dialects.postgresql import JSONB
from tildes.enums import ScraperType
from tildes.lib.amqp import PgsqlQueueConsumer
from tildes.lib.datetime import utc_now
from tildes.lib.event_stream import EventStreamConsumer, Message
from tildes.models.scraper import ScraperResult
from tildes.models.topic import Topic
from tildes.scrapers import ScraperError, YoutubeScraper
@@ -25,19 +24,23 @@ from tildes.scrapers import ScraperError, YoutubeScraper
RESCRAPE_DELAY = timedelta(hours=24)
class TopicYoutubeScraper(PgsqlQueueConsumer):
class TopicYoutubeScraper(EventStreamConsumer):
"""Consumer that fetches data from YouTube's data API for relevant link topics."""
def __init__(self, api_key: str, queue_name: str, routing_keys: Sequence[str]):
def __init__(
self, api_key: str, consumer_group: str, source_streams: Sequence[str]
):
"""Initialize the consumer, including creating a scraper instance."""
super().__init__(queue_name, routing_keys)
super().__init__(consumer_group, source_streams)
self.scraper = YoutubeScraper(api_key)
def run(self, msg: Message) -> None:
"""Process a delivered message."""
def process_message(self, message: Message) -> None:
"""Process a message from the stream."""
topic = (
self.db_session.query(Topic).filter_by(topic_id=msg.body["topic_id"]).one()
self.db_session.query(Topic)
.filter_by(topic_id=message.fields["topic_id"])
.one()
)
if not topic.is_link_type:
@@ -95,6 +98,6 @@ if __name__ == "__main__":
TopicYoutubeScraper(
youtube_api_key,
queue_name="topic_youtube_scraper.q",
routing_keys=["topic.created", "topic.link_edited"],
).consume_queue()
consumer_group="topic_youtube_scraper",
source_streams=["topics.insert", "topics.update.link"],
).consume_streams()

tildes/scripts/postgresql_redis_bridge.py (70)

@@ -0,0 +1,70 @@
# Copyright (c) 2020 Tildes contributors <code@tildes.net>
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Script that converts NOTIFY events on a PostgreSQL channel to Redis stream entries.
Should be kept running at all times as a service.
"""
import json
import os
from configparser import ConfigParser
from select import select
from redis import Redis
from sqlalchemy.engine.url import make_url
import psycopg2
from tildes.lib.event_stream import REDIS_KEY_PREFIX
NOTIFY_CHANNEL = "postgresql_events"
def postgresql_redis_bridge(config_path: str) -> None:
"""Listen for NOTIFY events and add them to Redis streams."""
config = ConfigParser()
config.read(config_path)
redis = Redis(unix_socket_path=config.get("app:main", "redis.unix_socket_path"))
postgresql_url = make_url(config.get("app:main", "sqlalchemy.url"))
postgresql = psycopg2.connect(
user=postgresql_url.username, dbname=postgresql_url.database
)
postgresql.autocommit = True
with postgresql.cursor() as cursor:
cursor.execute(f"listen {NOTIFY_CHANNEL}")
while True:
# block until a NOTIFY comes through on the channel
select([postgresql], [], [])
# fetch any notifications without needing to execute a query
postgresql.poll()
# add each NOTIFY to the specified stream(s), using a Redis pipeline to avoid
# round trips when there are multiple sent by the same PostgreSQL transaction
with redis.pipeline(transaction=False) as pipe:
while postgresql.notifies:
notify = postgresql.notifies.pop(0)
# the payload format should be "<destination stream name>:<json dict>"
try:
stream_name, fields_json = notify.payload.split(":", maxsplit=1)
except ValueError:
continue
try:
fields = json.loads(fields_json)
except json.decoder.JSONDecodeError:
continue
pipe.xadd(f"{REDIS_KEY_PREFIX}{stream_name}", fields)
pipe.execute()
if __name__ == "__main__":
postgresql_redis_bridge(os.environ["INI_FILE"])

tildes/sql/init/functions/event_stream.sql (9)

@@ -0,0 +1,9 @@
-- Copyright (c) 2020 Tildes contributors <code@tildes.net>
-- SPDX-License-Identifier: AGPL-3.0-or-later
create or replace function add_to_event_stream(stream_name_pieces text[], fields text[]) returns void as $$
select pg_notify(
'postgresql_events',
array_to_string(stream_name_pieces, '.') || ':' || json_object(fields)
);
$$ language sql;

tildes/sql/init/triggers/comment_labels/event_stream.sql (25)

@@ -0,0 +1,25 @@
-- Copyright (c) 2020 Tildes contributors <code@tildes.net>
-- SPDX-License-Identifier: AGPL-3.0-or-later
create or replace function comment_labels_events_trigger() returns trigger as $$
declare
affected_row record := coalesce(NEW, OLD);
stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
-- in general, only the below declaration of payload_fields should be edited
payload_fields text[] := array[
'comment_id', affected_row.comment_id,
'user_id', affected_row.user_id,
'label', affected_row.label
]::text[];
begin
perform add_to_event_stream(stream_name_pieces, payload_fields);
return null;
end;
$$ language plpgsql;
create trigger comment_labels_events_insert_delete
after insert or delete on comment_labels
for each row
execute function comment_labels_events_trigger();

tildes/sql/init/triggers/comments/event_stream.sql (38)

@@ -0,0 +1,38 @@
-- Copyright (c) 2020 Tildes contributors <code@tildes.net>
-- SPDX-License-Identifier: AGPL-3.0-or-later
create or replace function comments_events_trigger() returns trigger as $$
declare
affected_row record := coalesce(NEW, OLD);
stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
-- in general, only the below declaration of payload_fields should be edited
payload_fields text[] := array[
'comment_id', affected_row.comment_id
]::text[];
begin
perform add_to_event_stream(stream_name_pieces, payload_fields);
return null;
end;
$$ language plpgsql;
create trigger comments_events_insert_delete
after insert or delete on comments
for each row
execute function comments_events_trigger();
create trigger comments_events_update_markdown
after update of markdown on comments
for each row
execute function comments_events_trigger('markdown');
create trigger comments_events_update_is_deleted
after update of is_deleted on comments
for each row
execute function comments_events_trigger('is_deleted');
create trigger comments_events_update_is_removed
after update of is_removed on comments
for each row
execute function comments_events_trigger('is_removed');

tildes/sql/init/triggers/scraper_results/event_stream.sql (23)

@@ -0,0 +1,23 @@
-- Copyright (c) 2020 Tildes contributors <code@tildes.net>
-- SPDX-License-Identifier: AGPL-3.0-or-later
create or replace function scraper_results_events_trigger() returns trigger as $$
declare
affected_row record := coalesce(NEW, OLD);
stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
-- in general, only the below declaration of payload_fields should be edited
payload_fields text[] := array[
'result_id', affected_row.result_id
]::text[];
begin
perform add_to_event_stream(stream_name_pieces, payload_fields);
return null;
end;
$$ language plpgsql;
create trigger scraper_results_events_insert_delete
after insert or delete on scraper_results
for each row
execute function scraper_results_events_trigger();

tildes/sql/init/triggers/topics/event_stream.sql (33)

@@ -0,0 +1,33 @@
-- Copyright (c) 2020 Tildes contributors <code@tildes.net>
-- SPDX-License-Identifier: AGPL-3.0-or-later
create or replace function topics_events_trigger() returns trigger as $$
declare
affected_row record := coalesce(NEW, OLD);
stream_name_pieces text[] := array[TG_TABLE_NAME, lower(TG_OP)]::text[] || TG_ARGV;
-- in general, only the below declaration of payload_fields should be edited
payload_fields text[] := array[
'topic_id', affected_row.topic_id
]::text[];
begin
perform add_to_event_stream(stream_name_pieces, payload_fields);
return null;
end;
$$ language plpgsql;
create trigger topics_events_insert_delete
after insert or delete on topics
for each row
execute function topics_events_trigger();
create trigger topics_events_update_markdown
after update of markdown on topics
for each row
execute function topics_events_trigger('markdown');
create trigger topics_events_update_link
after update of link on topics
for each row
execute function topics_events_trigger('link');

tildes/tildes/lib/event_stream.py (132)

@@ -0,0 +1,132 @@
# Copyright (c) 2020 Tildes contributors <code@tildes.net>
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Contains classes related to handling the Redis-based event streams."""
import os
from abc import abstractmethod
from configparser import ConfigParser
from typing import Any, Dict, List, Sequence
from redis import Redis, ResponseError
from tildes.lib.database import get_session_from_config
REDIS_KEY_PREFIX = "event_stream:"
class Message:
"""Represents a single message taken from a stream."""
def __init__(
self, redis: Redis, stream: str, message_id: str, fields: Dict[str, str]
):
"""Initialize a new message from a Redis stream."""
self.redis = redis
self.stream = stream
self.message_id = message_id
self.fields = fields
def ack(self, consumer_group: str) -> None:
"""Acknowledge the message, removing it from the pending entries list."""
self.redis.xack(
f"{REDIS_KEY_PREFIX}{self.stream}", consumer_group, self.message_id
)
class EventStreamConsumer:
"""Base class for consumers of events retrieved from a stream in Redis.
This class is intended to be used in a completely "stand-alone" manner, such as
inside a script being run separately as a background job. As such, it also includes
connecting to Redis, creating the consumer group and the relevant streams, and
(optionally) connecting to the database to be able to fetch and modify data as
necessary. It relies on the environment variable INI_FILE being set.
"""
def __init__(
self, consumer_group: str, source_streams: Sequence[str], uses_db: bool = True,
):
"""Initialize a new consumer, creating consumer groups and streams if needed."""
ini_file_path = os.environ["INI_FILE"]
config = ConfigParser()
config.read(ini_file_path)
self.redis = Redis(
unix_socket_path=config.get("app:main", "redis.unix_socket_path")
)
self.consumer_group = consumer_group
self.source_streams = [
f"{REDIS_KEY_PREFIX}{stream}" for stream in source_streams
]
# hardcoded for now, will need to change for multiple consumers in same group
self.name = f"{consumer_group}-1"
# create all the consumer groups and streams (if necessary)
for stream in self.source_streams:
try:
self.redis.xgroup_create(stream, consumer_group, mkstream=True)
except ResponseError as error:
# if the consumer group already exists, a BUSYGROUP error will be
# returned, so we want to ignore that one but raise anything else
if not str(error).startswith("BUSYGROUP"):
raise
if uses_db:
self.db_session = get_session_from_config(ini_file_path)
else:
self.db_session = None
def consume_streams(self) -> None:
"""Process messages from the streams indefinitely."""
while True:
# Get any messages from the source streams that haven't already been
# delivered to a consumer in this group - will fetch a maximum of one
# message from each stream, and block indefinitely if none are available
response = self.redis.xreadgroup(
self.consumer_group,
self.name,
{stream: ">" for stream in self.source_streams},
count=1,
block=0,
)
messages = self._xreadgroup_response_to_messages(response)
for message in messages:
self.process_message(message)
# after processing finishes, commit the transaction and ack the message
if self.db_session:
self.db_session.commit()
message.ack(self.consumer_group)
def _xreadgroup_response_to_messages(self, response: Any) -> List[Message]:
"""Convert a response from XREADGROUP to a list of Messages."""
messages = []
# responses come back in an ugly format, a list of (one for each stream):
# [b'<stream name>', [(b'<entry id>', {<entry fields, all bytestrings>})]]
for stream_response in response:
stream_name = stream_response[0].decode("utf-8")
for entry in stream_response[1]:
message = Message(
self.redis,
stream_name[len(REDIS_KEY_PREFIX) :],
message_id=entry[0].decode("utf-8"),
fields={
key.decode("utf-8"): value.decode("utf-8")
for key, value in entry[1].items()
},
)
messages.append(message)
return messages
@abstractmethod
def process_message(self, message: Message) -> None:
"""Process a message from the stream (subclasses must implement)."""
pass
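
Taken together, a consumer built on the new base class is declared with a consumer group name and a list of source streams, then enters consume_streams(). The following is a hypothetical example built on that API; ExampleConsumer and its behavior are illustrative, not part of the commit.

from tildes.lib.event_stream import EventStreamConsumer, Message


class ExampleConsumer(EventStreamConsumer):
    """Hypothetical consumer that reacts to newly inserted comments."""

    def process_message(self, message: Message) -> None:
        # message.stream is the un-prefixed stream name ("comments.insert"),
        # message.fields is the payload dict sent by the trigger
        print(f"saw {message.stream} for comment {message.fields['comment_id']}")


if __name__ == "__main__":
    ExampleConsumer(
        "example_consumer", source_streams=["comments.insert"], uses_db=False
    ).consume_streams()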