mirror of https://gitlab.com/tildes/tildes.git
Browse Source
Eliminate RabbitMQ
Eliminate RabbitMQ
This removes RabbitMQ as well as everything else attached to it: Erlang; the Prometheus collector; the pg-amqp-bridge and all PostgreSQL functions and triggers; and the amqpy Python package and the Tildes code that used it. Note that this commit does not actually uninstall or delete any of these packages or services, so if you have a running instance that you want to keep (instead of re-provisioning from scratch), you will need to manually remove them if you want them completely gone.merge-requests/88/merge
Deimos
5 years ago
21 changed files with 240 additions and 408 deletions
-
14salt/salt/prometheus/exporters/prometheus_rabbitmq_exporter.service
-
28salt/salt/prometheus/exporters/rabbitmq_exporter.sls
-
4salt/salt/prometheus/prometheus.yml.jinja2
-
1salt/salt/rabbitmq/definitions.json
-
82salt/salt/rabbitmq/init.sls
-
14salt/salt/rabbitmq/pg-amqp-bridge.service
-
5salt/salt/rabbitmq/rabbitmq.config
-
2salt/salt/top.sls
-
238tildes/alembic/versions/cc12ea6c616d_drop_rabbitmq_functions_triggers.py
-
3tildes/requirements-dev.txt
-
1tildes/requirements.in
-
3tildes/requirements.txt
-
6tildes/sql/init/functions/rabbitmq.sql
-
36tildes/sql/init/triggers/comment_labels/rabbitmq.sql
-
62tildes/sql/init/triggers/comments/rabbitmq.sql
-
27tildes/sql/init/triggers/scraper_results/rabbitmq.sql
-
41tildes/sql/init/triggers/topics/rabbitmq.sql
-
74tildes/tildes/lib/amqp.py
-
2tildes/tildes/models/comment/comment.py
-
1tildes/tildes/models/comment/comment_label.py
-
4tildes/tildes/models/topic/topic.py
@ -1,14 +0,0 @@ |
|||
[Unit] |
|||
Description=Prometheus RabbitMQ Exporter |
|||
After=syslog.target network.target |
|||
|
|||
[Service] |
|||
Type=simple |
|||
RemainAfterExit=no |
|||
WorkingDirectory=/opt/prometheus_rabbitmq_exporter |
|||
User=prometheus |
|||
Group=prometheus |
|||
ExecStart=/opt/prometheus_rabbitmq_exporter/rabbitmq_exporter |
|||
|
|||
[Install] |
|||
WantedBy=multi-user.target |
@ -1,28 +0,0 @@ |
|||
# Download/extract and set up the rabbitmq exporter |
|||
include: |
|||
- prometheus.user |
|||
|
|||
unpack-rabbitmq-exporter: |
|||
archive.extracted: |
|||
- name: /opt/prometheus_rabbitmq_exporter |
|||
- source: |
|||
- salt://prometheus/exporters/rabbitmq_exporter-1.0.0-WIP.linux-amd64.tar.gz |
|||
- https://github.com/kbudde/rabbitmq_exporter/releases/download/v1.0-wip1/rabbitmq_exporter-1.0.0-WIP.linux-amd64.tar.gz |
|||
- source_hash: sha256=d478dcf72d8a5175a4f3ea6b8e0356f64e2fcdb7b65bb5bfc0bd161d896abc4a |
|||
- if_missing: /opt/prometheus_rabbitmq_exporter |
|||
- user: prometheus |
|||
- group: prometheus |
|||
- options: --strip-components=1 |
|||
- enforce_toplevel: False |
|||
|
|||
/etc/systemd/system/prometheus_rabbitmq_exporter.service: |
|||
file.managed: |
|||
- source: salt://prometheus/exporters/prometheus_rabbitmq_exporter.service |
|||
- user: root |
|||
- group: root |
|||
- mode: 644 |
|||
|
|||
prometheus-rabbitmq-exporter-service: |
|||
service.running: |
|||
- name: prometheus_rabbitmq_exporter |
|||
- enable: True |
@ -1 +0,0 @@ |
|||
{"exchanges":[{"name":"pgsql_events","vhost":"/","type":"topic","durable":true,"auto_delete":false,"internal":false,"arguments":{}}]} |
@ -1,82 +0,0 @@ |
|||
erlang: |
|||
pkgrepo.managed: |
|||
- name: deb http://packages.erlang-solutions.com/ubuntu/ xenial contrib |
|||
- dist: xenial |
|||
- file: /etc/apt/sources.list.d/erlang.list |
|||
- key_url: https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc |
|||
- require_in: |
|||
- pkg: rabbitmq-server |
|||
file.managed: |
|||
- name: /etc/apt/preferences.d/erlang |
|||
- mode: 755 |
|||
- contents: | |
|||
Package: erlang* |
|||
Pin: version 1:20.3-1 |
|||
Pin-Priority: 1000 |
|||
- require_in: |
|||
- pkg: rabbitmq-server |
|||
|
|||
rabbitmq: |
|||
pkgrepo.managed: |
|||
- name: deb http://www.rabbitmq.com/debian/ testing main |
|||
- dist: testing |
|||
- file: /etc/apt/sources.list.d/rabbitmq.list |
|||
- key_url: https://www.rabbitmq.com/rabbitmq-release-signing-key.asc |
|||
- require_in: |
|||
- pkg: rabbitmq-server |
|||
pkg.installed: |
|||
- name: rabbitmq-server |
|||
- refresh: True |
|||
|
|||
rabbitmq-server.service: |
|||
service.running: |
|||
- enable: True |
|||
- watch: |
|||
- file: /etc/rabbitmq/rabbitmq.config |
|||
- file: /etc/rabbitmq/definitions.json |
|||
|
|||
rabbitmq-management: |
|||
cmd.run: |
|||
- name: rabbitmq-plugins enable rabbitmq_management |
|||
- unless: 'rabbitmq-plugins list | grep \\[E.*rabbitmq_management' |
|||
|
|||
/usr/local/bin/rabbitmqadmin: |
|||
cmd.run: |
|||
- name: wget http://localhost:15672/cli/rabbitmqadmin -O /usr/local/bin/rabbitmqadmin |
|||
- creates: /usr/local/bin/rabbitmqadmin |
|||
file.managed: |
|||
- mode: 755 |
|||
|
|||
/etc/rabbitmq/rabbitmq.config: |
|||
file.managed: |
|||
- source: salt://rabbitmq/rabbitmq.config |
|||
- group: rabbitmq |
|||
- mode: 644 |
|||
|
|||
/etc/rabbitmq/definitions.json: |
|||
file.managed: |
|||
- source: salt://rabbitmq/definitions.json |
|||
- group: rabbitmq |
|||
- mode: 644 |
|||
|
|||
install-pg-amqp-bridge: |
|||
archive.extracted: |
|||
- name: /usr/local/bin |
|||
- source: |
|||
- https://github.com/subzerocloud/pg-amqp-bridge/releases/download/0.0.5/pg-amqp-bridge-0.0.5-x86_64-unknown-linux-gnu.tar.gz |
|||
- source_hash: sha256=8194c3307fe7954a0ef1ba66d2f51e14647756d0f87ddd468ef0dc3fbc8476fe |
|||
- unless: ls /usr/local/bin/pg-amqp-bridge |
|||
- enforce_toplevel: False |
|||
|
|||
/etc/systemd/system/pg-amqp-bridge.service: |
|||
file.managed: |
|||
- source: salt://rabbitmq/pg-amqp-bridge.service |
|||
- user: root |
|||
- group: root |
|||
- mode: 644 |
|||
- require_in: |
|||
- pg-amqp-bridge.service |
|||
|
|||
pg-amqp-bridge.service: |
|||
service.running: |
|||
- enable: True |
@ -1,14 +0,0 @@ |
|||
[Unit] |
|||
Description=pg-amqp-bridge - send pgsql NOTIFY to rabbitmq |
|||
Requires=rabbitmq-server.service |
|||
After=rabbitmq-server.service |
|||
PartOf=rabbitmq-server.service |
|||
|
|||
[Service] |
|||
Environment="POSTGRESQL_URI=postgres://tildes@%2Frun%2Fpostgresql/tildes" "AMQP_URI=amqp://localhost//" "BRIDGE_CHANNELS=pgsql_events:pgsql_events" "DELIVERY_MODE=PERSISTENT" |
|||
ExecStart=/usr/local/bin/pg-amqp-bridge |
|||
Restart=always |
|||
RestartSec=5 |
|||
|
|||
[Install] |
|||
WantedBy=multi-user.target |
@ -1,5 +0,0 @@ |
|||
[ |
|||
{rabbitmq_management, [ |
|||
{load_definitions, "/etc/rabbitmq/definitions.json"} |
|||
]} |
|||
]. |
@ -0,0 +1,238 @@ |
|||
"""Drop rabbitmq functions/triggers |
|||
|
|||
Revision ID: cc12ea6c616d |
|||
Revises: 4fb2c786c7a0 |
|||
Create Date: 2020-01-19 21:29:41.337253 |
|||
|
|||
""" |
|||
from alembic import op |
|||
import sqlalchemy as sa |
|||
|
|||
|
|||
# revision identifiers, used by Alembic. |
|||
revision = "cc12ea6c616d" |
|||
down_revision = "4fb2c786c7a0" |
|||
branch_labels = None |
|||
depends_on = None |
|||
|
|||
|
|||
def upgrade(): |
|||
# scraper_results |
|||
op.execute( |
|||
"drop trigger send_rabbitmq_message_for_scraper_result_insert on scraper_results" |
|||
) |
|||
op.execute("drop function send_rabbitmq_message_for_scraper_result") |
|||
|
|||
# comment_labels |
|||
op.execute( |
|||
"drop trigger send_rabbitmq_message_for_comment_label_delete on comment_labels" |
|||
) |
|||
op.execute( |
|||
"drop trigger send_rabbitmq_message_for_comment_label_insert on comment_labels" |
|||
) |
|||
op.execute("drop function send_rabbitmq_message_for_comment_label") |
|||
|
|||
# topics |
|||
op.execute("drop trigger send_rabbitmq_message_for_topic_link_edit on topics") |
|||
op.execute("drop trigger send_rabbitmq_message_for_topic_edit on topics") |
|||
op.execute("drop trigger send_rabbitmq_message_for_topic_insert on topics") |
|||
op.execute("drop function send_rabbitmq_message_for_topic") |
|||
|
|||
# comments |
|||
op.execute("drop trigger send_rabbitmq_message_for_comment_unremove on comments") |
|||
op.execute("drop trigger send_rabbitmq_message_for_comment_remove on comments") |
|||
op.execute("drop trigger send_rabbitmq_message_for_comment_undelete on comments") |
|||
op.execute("drop trigger send_rabbitmq_message_for_comment_delete on comments") |
|||
op.execute("drop trigger send_rabbitmq_message_for_comment_edit on comments") |
|||
op.execute("drop trigger send_rabbitmq_message_for_comment_insert on comments") |
|||
op.execute("drop function send_rabbitmq_message_for_comment") |
|||
|
|||
op.execute("drop function send_rabbitmq_message") |
|||
|
|||
|
|||
def downgrade(): |
|||
op.execute( |
|||
""" |
|||
CREATE OR REPLACE FUNCTION send_rabbitmq_message(routing_key TEXT, message TEXT) RETURNS VOID AS $$ |
|||
SELECT pg_notify('pgsql_events', routing_key || '|' || message); |
|||
$$ LANGUAGE SQL; |
|||
""" |
|||
) |
|||
|
|||
# comments |
|||
op.execute( |
|||
""" |
|||
CREATE OR REPLACE FUNCTION send_rabbitmq_message_for_comment() RETURNS TRIGGER AS $$ |
|||
DECLARE |
|||
affected_row RECORD; |
|||
payload TEXT; |
|||
BEGIN |
|||
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN |
|||
affected_row := NEW; |
|||
ELSIF (TG_OP = 'DELETE') THEN |
|||
affected_row := OLD; |
|||
END IF; |
|||
|
|||
payload := json_build_object('comment_id', affected_row.comment_id, 'event_type', TG_OP); |
|||
|
|||
PERFORM send_rabbitmq_message('comment.' || TG_ARGV[0], payload); |
|||
|
|||
RETURN NULL; |
|||
END; |
|||
$$ LANGUAGE plpgsql; |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_insert |
|||
AFTER INSERT ON comments |
|||
FOR EACH ROW |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment('created'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_edit |
|||
AFTER UPDATE ON comments |
|||
FOR EACH ROW |
|||
WHEN (OLD.markdown IS DISTINCT FROM NEW.markdown) |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment('edited'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_delete |
|||
AFTER UPDATE ON comments |
|||
FOR EACH ROW |
|||
WHEN (OLD.is_deleted = false AND NEW.is_deleted = true) |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment('deleted'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_undelete |
|||
AFTER UPDATE ON comments |
|||
FOR EACH ROW |
|||
WHEN (OLD.is_deleted = true AND NEW.is_deleted = false) |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment('undeleted'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_remove |
|||
AFTER UPDATE ON comments |
|||
FOR EACH ROW |
|||
WHEN (OLD.is_removed = false AND NEW.is_removed = true) |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment('removed'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_unremove |
|||
AFTER UPDATE ON comments |
|||
FOR EACH ROW |
|||
WHEN (OLD.is_removed = true AND NEW.is_removed = false) |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment('unremoved'); |
|||
""" |
|||
) |
|||
|
|||
# topics |
|||
op.execute( |
|||
""" |
|||
CREATE OR REPLACE FUNCTION send_rabbitmq_message_for_topic() RETURNS TRIGGER AS $$ |
|||
DECLARE |
|||
affected_row RECORD; |
|||
payload TEXT; |
|||
BEGIN |
|||
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN |
|||
affected_row := NEW; |
|||
ELSIF (TG_OP = 'DELETE') THEN |
|||
affected_row := OLD; |
|||
END IF; |
|||
|
|||
payload := json_build_object('topic_id', affected_row.topic_id); |
|||
|
|||
PERFORM send_rabbitmq_message('topic.' || TG_ARGV[0], payload); |
|||
|
|||
RETURN NULL; |
|||
END; |
|||
$$ LANGUAGE plpgsql; |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_topic_insert |
|||
AFTER INSERT ON topics |
|||
FOR EACH ROW |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_topic('created'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_topic_edit |
|||
AFTER UPDATE ON topics |
|||
FOR EACH ROW |
|||
WHEN (OLD.markdown IS DISTINCT FROM NEW.markdown) |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_topic('edited'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_topic_link_edit |
|||
AFTER UPDATE ON topics |
|||
FOR EACH ROW |
|||
WHEN (OLD.link IS DISTINCT FROM NEW.link) |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_topic('link_edited'); |
|||
""" |
|||
) |
|||
|
|||
# comment_labels |
|||
op.execute( |
|||
""" |
|||
CREATE OR REPLACE FUNCTION send_rabbitmq_message_for_comment_label() RETURNS TRIGGER AS $$ |
|||
DECLARE |
|||
affected_row RECORD; |
|||
payload TEXT; |
|||
BEGIN |
|||
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN |
|||
affected_row := NEW; |
|||
ELSIF (TG_OP = 'DELETE') THEN |
|||
affected_row := OLD; |
|||
END IF; |
|||
|
|||
payload := json_build_object( |
|||
'comment_id', affected_row.comment_id, |
|||
'label', affected_row.label, |
|||
'user_id', affected_row.user_id); |
|||
|
|||
PERFORM send_rabbitmq_message('comment_label.' || TG_ARGV[0], payload); |
|||
|
|||
RETURN NULL; |
|||
END; |
|||
$$ LANGUAGE plpgsql; |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_label_insert |
|||
AFTER INSERT ON comment_labels |
|||
FOR EACH ROW |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment_label('created'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_label_delete |
|||
AFTER DELETE ON comment_labels |
|||
FOR EACH ROW |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment_label('deleted'); |
|||
""" |
|||
) |
|||
|
|||
# scraper_results |
|||
op.execute( |
|||
""" |
|||
CREATE OR REPLACE FUNCTION send_rabbitmq_message_for_scraper_result() RETURNS TRIGGER AS $$ |
|||
DECLARE |
|||
affected_row RECORD; |
|||
payload TEXT; |
|||
BEGIN |
|||
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN |
|||
affected_row := NEW; |
|||
ELSIF (TG_OP = 'DELETE') THEN |
|||
affected_row := OLD; |
|||
END IF; |
|||
|
|||
payload := json_build_object('result_id', affected_row.result_id); |
|||
|
|||
PERFORM send_rabbitmq_message('scraper_result.' || TG_ARGV[0], payload); |
|||
|
|||
RETURN NULL; |
|||
END; |
|||
$$ LANGUAGE plpgsql; |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_scraper_result_insert |
|||
AFTER INSERT ON scraper_results |
|||
FOR EACH ROW |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_scraper_result('created'); |
|||
""" |
|||
) |
@ -1,6 +0,0 @@ |
|||
-- Copyright (c) 2018 Tildes contributors <code@tildes.net> |
|||
-- SPDX-License-Identifier: AGPL-3.0-or-later |
|||
|
|||
CREATE OR REPLACE FUNCTION send_rabbitmq_message(routing_key TEXT, message TEXT) RETURNS VOID AS $$ |
|||
SELECT pg_notify('pgsql_events', routing_key || '|' || message); |
|||
$$ LANGUAGE SQL; |
@ -1,36 +0,0 @@ |
|||
-- Copyright (c) 2019 Tildes contributors <code@tildes.net> |
|||
-- SPDX-License-Identifier: AGPL-3.0-or-later |
|||
|
|||
CREATE OR REPLACE FUNCTION send_rabbitmq_message_for_comment_label() RETURNS TRIGGER AS $$ |
|||
DECLARE |
|||
affected_row RECORD; |
|||
payload TEXT; |
|||
BEGIN |
|||
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN |
|||
affected_row := NEW; |
|||
ELSIF (TG_OP = 'DELETE') THEN |
|||
affected_row := OLD; |
|||
END IF; |
|||
|
|||
payload := json_build_object( |
|||
'comment_id', affected_row.comment_id, |
|||
'label', affected_row.label, |
|||
'user_id', affected_row.user_id); |
|||
|
|||
PERFORM send_rabbitmq_message('comment_label.' || TG_ARGV[0], payload); |
|||
|
|||
RETURN NULL; |
|||
END; |
|||
$$ LANGUAGE plpgsql; |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_label_insert |
|||
AFTER INSERT ON comment_labels |
|||
FOR EACH ROW |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment_label('created'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_label_delete |
|||
AFTER DELETE ON comment_labels |
|||
FOR EACH ROW |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment_label('deleted'); |
@ -1,62 +0,0 @@ |
|||
-- Copyright (c) 2018 Tildes contributors <code@tildes.net> |
|||
-- SPDX-License-Identifier: AGPL-3.0-or-later |
|||
|
|||
CREATE OR REPLACE FUNCTION send_rabbitmq_message_for_comment() RETURNS TRIGGER AS $$ |
|||
DECLARE |
|||
affected_row RECORD; |
|||
payload TEXT; |
|||
BEGIN |
|||
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN |
|||
affected_row := NEW; |
|||
ELSIF (TG_OP = 'DELETE') THEN |
|||
affected_row := OLD; |
|||
END IF; |
|||
|
|||
payload := json_build_object('comment_id', affected_row.comment_id, 'event_type', TG_OP); |
|||
|
|||
PERFORM send_rabbitmq_message('comment.' || TG_ARGV[0], payload); |
|||
|
|||
RETURN NULL; |
|||
END; |
|||
$$ LANGUAGE plpgsql; |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_insert |
|||
AFTER INSERT ON comments |
|||
FOR EACH ROW |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment('created'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_edit |
|||
AFTER UPDATE ON comments |
|||
FOR EACH ROW |
|||
WHEN (OLD.markdown IS DISTINCT FROM NEW.markdown) |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment('edited'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_delete |
|||
AFTER UPDATE ON comments |
|||
FOR EACH ROW |
|||
WHEN (OLD.is_deleted = false AND NEW.is_deleted = true) |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment('deleted'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_undelete |
|||
AFTER UPDATE ON comments |
|||
FOR EACH ROW |
|||
WHEN (OLD.is_deleted = true AND NEW.is_deleted = false) |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment('undeleted'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_remove |
|||
AFTER UPDATE ON comments |
|||
FOR EACH ROW |
|||
WHEN (OLD.is_removed = false AND NEW.is_removed = true) |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment('removed'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_comment_unremove |
|||
AFTER UPDATE ON comments |
|||
FOR EACH ROW |
|||
WHEN (OLD.is_removed = true AND NEW.is_removed = false) |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_comment('unremoved'); |
@ -1,27 +0,0 @@ |
|||
-- Copyright (c) 2018 Tildes contributors <code@tildes.net> |
|||
-- SPDX-License-Identifier: AGPL-3.0-or-later |
|||
|
|||
CREATE OR REPLACE FUNCTION send_rabbitmq_message_for_scraper_result() RETURNS TRIGGER AS $$ |
|||
DECLARE |
|||
affected_row RECORD; |
|||
payload TEXT; |
|||
BEGIN |
|||
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN |
|||
affected_row := NEW; |
|||
ELSIF (TG_OP = 'DELETE') THEN |
|||
affected_row := OLD; |
|||
END IF; |
|||
|
|||
payload := json_build_object('result_id', affected_row.result_id); |
|||
|
|||
PERFORM send_rabbitmq_message('scraper_result.' || TG_ARGV[0], payload); |
|||
|
|||
RETURN NULL; |
|||
END; |
|||
$$ LANGUAGE plpgsql; |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_scraper_result_insert |
|||
AFTER INSERT ON scraper_results |
|||
FOR EACH ROW |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_scraper_result('created'); |
@ -1,41 +0,0 @@ |
|||
-- Copyright (c) 2018 Tildes contributors <code@tildes.net> |
|||
-- SPDX-License-Identifier: AGPL-3.0-or-later |
|||
|
|||
CREATE OR REPLACE FUNCTION send_rabbitmq_message_for_topic() RETURNS TRIGGER AS $$ |
|||
DECLARE |
|||
affected_row RECORD; |
|||
payload TEXT; |
|||
BEGIN |
|||
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN |
|||
affected_row := NEW; |
|||
ELSIF (TG_OP = 'DELETE') THEN |
|||
affected_row := OLD; |
|||
END IF; |
|||
|
|||
payload := json_build_object('topic_id', affected_row.topic_id); |
|||
|
|||
PERFORM send_rabbitmq_message('topic.' || TG_ARGV[0], payload); |
|||
|
|||
RETURN NULL; |
|||
END; |
|||
$$ LANGUAGE plpgsql; |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_topic_insert |
|||
AFTER INSERT ON topics |
|||
FOR EACH ROW |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_topic('created'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_topic_edit |
|||
AFTER UPDATE ON topics |
|||
FOR EACH ROW |
|||
WHEN (OLD.markdown IS DISTINCT FROM NEW.markdown) |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_topic('edited'); |
|||
|
|||
|
|||
CREATE TRIGGER send_rabbitmq_message_for_topic_link_edit |
|||
AFTER UPDATE ON topics |
|||
FOR EACH ROW |
|||
WHEN (OLD.link IS DISTINCT FROM NEW.link) |
|||
EXECUTE PROCEDURE send_rabbitmq_message_for_topic('link_edited'); |
@ -1,74 +0,0 @@ |
|||
# Copyright (c) 2018 Tildes contributors <code@tildes.net> |
|||
# SPDX-License-Identifier: AGPL-3.0-or-later |
|||
|
|||
"""Contains classes related to handling AMQP (rabbitmq) messages.""" |
|||
|
|||
import json |
|||
import os |
|||
from abc import abstractmethod |
|||
from typing import Sequence |
|||
|
|||
from amqpy import AbstractConsumer, Connection, Message |
|||
|
|||
from tildes.lib.database import get_session_from_config |
|||
|
|||
|
|||
class PgsqlQueueConsumer(AbstractConsumer): |
|||
"""Base class for consumers of events sent from PostgreSQL via rabbitmq. |
|||
|
|||
This class is intended to be used in a completely "stand-alone" manner, such as |
|||
inside a script being run separately as a background job. As such, it also includes |
|||
connecting to rabbitmq, declaring the underlying queue and bindings, and |
|||
(optionally) connecting to the database to be able to fetch and modify data as |
|||
necessary. It relies on the environment variable INI_FILE being set. |
|||
|
|||
Note that all messages received by these consumers are expected to be in JSON |
|||
format. |
|||
""" |
|||
|
|||
PGSQL_EXCHANGE_NAME = "pgsql_events" |
|||
|
|||
def __init__( |
|||
self, queue_name: str, routing_keys: Sequence[str], uses_db: bool = True |
|||
): |
|||
"""Initialize a new queue, bindings, and consumer for it.""" |
|||
self.connection = Connection() |
|||
self.channel = self.connection.channel() |
|||
|
|||
self.channel.queue_declare(queue_name, durable=True, auto_delete=False) |
|||
|
|||
for routing_key in routing_keys: |
|||
self.channel.queue_bind( |
|||
queue_name, exchange=self.PGSQL_EXCHANGE_NAME, routing_key=routing_key |
|||
) |
|||
|
|||
if uses_db: |
|||
self.db_session = get_session_from_config(os.environ["INI_FILE"]) |
|||
else: |
|||
self.db_session = None |
|||
|
|||
super().__init__(self.channel, queue_name) |
|||
|
|||
def consume_queue(self) -> None: |
|||
"""Declare the consumer and consume messages indefinitely.""" |
|||
self.declare() |
|||
self.connection.loop() |
|||
|
|||
@abstractmethod |
|||
def run(self, msg: Message) -> None: |
|||
"""Process a delivered message (subclasses must implement).""" |
|||
pass |
|||
|
|||
def start(self, msg: Message) -> None: |
|||
"""Setup/teardown for message-processing (wraps run()).""" |
|||
# decode the msg body from JSON |
|||
msg.body = json.loads(msg.body) |
|||
|
|||
# process the message, will call run() |
|||
super().start(msg) |
|||
|
|||
# after processing is finished, commit the transaction and ack the msg |
|||
if self.db_session: |
|||
self.db_session.commit() |
|||
|
|||
msg.ack() |
Write
Preview
Loading…
Cancel
Save
Reference in new issue