diff --git a/salt/salt/consumers/init.sls b/salt/salt/consumers/init.sls index a97a251..4e26637 100644 --- a/salt/salt/consumers/init.sls +++ b/salt/salt/consumers/init.sls @@ -21,3 +21,17 @@ consumer-topic_metadata_generator.service: consumer-comment_user_mentions_generator.service: service.running: - enable: True + +{% if grains['id'] == 'prod' %} +/etc/systemd/system/consumer-topic_embedly_extractor.service: + file.managed: + - source: salt://consumers/topic_embedly_extractor.service.jinja2 + - template: jinja + - user: root + - group: root + - mode: 644 + +consumer-topic_embedly_extractor.service: + service.running: + - enable: True +{% endif %} diff --git a/salt/salt/consumers/topic_embedly_extractor.service.jinja2 b/salt/salt/consumers/topic_embedly_extractor.service.jinja2 new file mode 100644 index 0000000..47cb0e0 --- /dev/null +++ b/salt/salt/consumers/topic_embedly_extractor.service.jinja2 @@ -0,0 +1,16 @@ +{% from 'common.jinja2' import app_dir, bin_dir -%} +[Unit] +Description=Topic Embedly Extractor (Queue Consumer) +Requires=rabbitmq-server.service +After=rabbitmq-server.service +PartOf=rabbitmq-server.service + +[Service] +WorkingDirectory={{ app_dir }}/consumers +Environment="INI_FILE={{ app_dir }}/{{ pillar['ini_file'] }}" +ExecStart={{ bin_dir }}/python topic_embedly_extractor.py +Restart=always +RestartSec=5 + +[Install] +WantedBy=multi-user.target diff --git a/tildes/alembic/env.py b/tildes/alembic/env.py index 050df0c..7d56167 100644 --- a/tildes/alembic/env.py +++ b/tildes/alembic/env.py @@ -16,6 +16,7 @@ from tildes.models.comment import Comment, CommentNotification, CommentTag, Comm from tildes.models.group import Group, GroupSubscription from tildes.models.log import Log from tildes.models.message import MessageConversation, MessageReply +from tildes.models.scraper import ScraperResult from tildes.models.topic import Topic, TopicVisit, TopicVote from tildes.models.user import User, UserGroupSettings, UserInviteCode diff --git a/tildes/alembic/versions/09cfb27cc90e_add_scraper_results_table.py b/tildes/alembic/versions/09cfb27cc90e_add_scraper_results_table.py new file mode 100644 index 0000000..f1ed77d --- /dev/null +++ b/tildes/alembic/versions/09cfb27cc90e_add_scraper_results_table.py @@ -0,0 +1,52 @@ +"""Add scraper_results table + +Revision ID: 09cfb27cc90e +Revises: 04fd898de0db +Create Date: 2018-09-09 21:22:32.769786 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "09cfb27cc90e" +down_revision = "04fd898de0db" +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + "scraper_results", + sa.Column("result_id", sa.Integer(), nullable=False), + sa.Column("url", sa.Text(), nullable=False), + sa.Column( + "scraper_type", + postgresql.ENUM("EMBEDLY", name="scrapertype"), + nullable=False, + ), + sa.Column( + "scrape_time", + sa.TIMESTAMP(timezone=True), + server_default=sa.text("NOW()"), + nullable=False, + ), + sa.Column("data", postgresql.JSONB(astext_type=sa.Text()), nullable=True), + sa.PrimaryKeyConstraint("result_id", name=op.f("pk_scraper_results")), + ) + op.create_index( + op.f("ix_scraper_results_scrape_time"), + "scraper_results", + ["scrape_time"], + unique=False, + ) + op.create_index( + op.f("ix_scraper_results_url"), "scraper_results", ["url"], unique=False + ) + + +def downgrade(): + op.drop_index(op.f("ix_scraper_results_url"), table_name="scraper_results") + op.drop_index(op.f("ix_scraper_results_scrape_time"), table_name="scraper_results") + op.drop_table("scraper_results") diff --git a/tildes/consumers/topic_embedly_extractor.py b/tildes/consumers/topic_embedly_extractor.py new file mode 100644 index 0000000..68ca723 --- /dev/null +++ b/tildes/consumers/topic_embedly_extractor.py @@ -0,0 +1,99 @@ +# Copyright (c) 2018 Tildes contributors +# SPDX-License-Identifier: AGPL-3.0-or-later + +"""Consumer that fetches data from Embedly's Extract API for link topics.""" + +from datetime import timedelta +import os +from typing import Sequence + +from amqpy import Message +from pyramid.paster import bootstrap +from requests.exceptions import HTTPError +from sqlalchemy import cast, desc, func +from sqlalchemy.dialects.postgresql import JSONB + +from tildes.enums import ScraperType +from tildes.lib.amqp import PgsqlQueueConsumer +from tildes.lib.datetime import utc_now +from tildes.models.scraper import ScraperResult +from tildes.models.topic import Topic +from tildes.scrapers import EmbedlyScraper + + +# don't rescrape the same url inside this time period +RESCRAPE_DELAY = timedelta(hours=24) + + +class TopicEmbedlyExtractor(PgsqlQueueConsumer): + """Consumer that fetches data from Embedly's Extract API for link topics.""" + + def __init__( + self, api_key: str, queue_name: str, routing_keys: Sequence[str] + ) -> None: + """Initialize the consumer, including creating a scraper instance.""" + super().__init__(queue_name, routing_keys) + + self.scraper = EmbedlyScraper(api_key) + + def run(self, msg: Message) -> None: + """Process a delivered message.""" + topic = ( + self.db_session.query(Topic).filter_by(topic_id=msg.body["topic_id"]).one() + ) + + if not topic.is_link_type: + return + + # see if we already have a recent scrape result from the same url + result = ( + self.db_session.query(ScraperResult) + .filter( + ScraperResult.url == topic.link, + ScraperResult.scraper_type == ScraperType.EMBEDLY, + ScraperResult.scrape_time > utc_now() - RESCRAPE_DELAY, + ) + .order_by(desc(ScraperResult.scrape_time)) + .first() + ) + + # if not, scrape the url and store the result + if not result: + try: + result = self.scraper.scrape_url(topic.link) + except HTTPError: + return + + self.db_session.add(result) + + new_metadata = EmbedlyScraper.get_metadata_from_result(result) + + if new_metadata: + # update the topic's content_metadata in a way that won't wipe out any + # existing values, and can handle the column being null + ( + self.db_session.query(Topic) + .filter(Topic.topic_id == topic.topic_id) + .update( + { + "content_metadata": func.coalesce( + Topic.content_metadata, cast({}, JSONB) + ).op("||")(new_metadata) + }, + synchronize_session=False, + ) + ) + + +if __name__ == "__main__": + # pylint: disable=invalid-name + env = bootstrap(os.environ["INI_FILE"]) + embedly_api_key = env["registry"].settings.get("api_keys.embedly") + if not embedly_api_key: + raise RuntimeError("No embedly API key available in INI file") + + TopicEmbedlyExtractor( + embedly_api_key, + queue_name="topic_embedly_extractor.q", + routing_keys=["topic.created"], + ).consume_queue() diff --git a/tildes/production.ini.example b/tildes/production.ini.example index ab61f13..9212449 100644 --- a/tildes/production.ini.example +++ b/tildes/production.ini.example @@ -29,6 +29,7 @@ webassets.cache = false webassets.manifest = json # API keys for external APIs +api_keys.embedly = embedlykeygoeshere api_keys.stripe = sk_live_ActualKeyShouldGoHere [server:main] diff --git a/tildes/tildes/enums.py b/tildes/tildes/enums.py index 66df640..bc294bb 100644 --- a/tildes/tildes/enums.py +++ b/tildes/tildes/enums.py @@ -65,6 +65,12 @@ class LogEventType(enum.Enum): TOPIC_UNREMOVE = enum.auto() +class ScraperType(enum.Enum): + """Enum for the types of scrapers available.""" + + EMBEDLY = enum.auto() + + class TopicSortOption(enum.Enum): """Enum for the different methods topics can be sorted by.""" diff --git a/tildes/tildes/models/scraper/__init__.py b/tildes/tildes/models/scraper/__init__.py new file mode 100644 index 0000000..a62438c --- /dev/null +++ b/tildes/tildes/models/scraper/__init__.py @@ -0,0 +1,3 @@ +"""Contains models related to scrapers.""" + +from .scraper_result import ScraperResult diff --git a/tildes/tildes/models/scraper/scraper_result.py b/tildes/tildes/models/scraper/scraper_result.py new file mode 100644 index 0000000..9d13ea2 --- /dev/null +++ b/tildes/tildes/models/scraper/scraper_result.py @@ -0,0 +1,37 @@ +# Copyright (c) 2018 Tildes contributors +# SPDX-License-Identifier: AGPL-3.0-or-later + +"""Contains the ScraperResult class.""" + +from datetime import datetime +from typing import Any + +from sqlalchemy import Column, Integer, Text, TIMESTAMP +from sqlalchemy.dialects.postgresql import ENUM, JSONB +from sqlalchemy.sql.expression import text + +from tildes.enums import ScraperType +from tildes.models import DatabaseModel + + +class ScraperResult(DatabaseModel): + """Model for the result from a scraper processing a url.""" + + __tablename__ = "scraper_results" + + result_id: int = Column(Integer, primary_key=True) + url: str = Column(Text, nullable=False, index=True) + scraper_type: ScraperType = Column(ENUM(ScraperType), nullable=False) + scrape_time: datetime = Column( + TIMESTAMP(timezone=True), + nullable=False, + index=True, + server_default=text("NOW()"), + ) + data: Any = Column(JSONB) + + def __init__(self, url: str, scraper_type: ScraperType, data: Any) -> None: + """Create a new ScraperResult.""" + self.url = url + self.scraper_type = scraper_type + self.data = data diff --git a/tildes/tildes/scrapers/__init__.py b/tildes/tildes/scrapers/__init__.py new file mode 100644 index 0000000..31ff25f --- /dev/null +++ b/tildes/tildes/scrapers/__init__.py @@ -0,0 +1,3 @@ +"""Contains scrapers.""" + +from .embedly_scraper import EmbedlyScraper diff --git a/tildes/tildes/scrapers/embedly_scraper.py b/tildes/tildes/scrapers/embedly_scraper.py new file mode 100644 index 0000000..c99985a --- /dev/null +++ b/tildes/tildes/scrapers/embedly_scraper.py @@ -0,0 +1,61 @@ +# Copyright (c) 2018 Tildes contributors +# SPDX-License-Identifier: AGPL-3.0-or-later + +"""Contains the EmbedlyScraper class.""" + +from typing import Any, Dict + +import requests + +from tildes.enums import ScraperType +from tildes.lib.string import extract_text_from_html, word_count +from tildes.models.scraper import ScraperResult + + +class EmbedlyScraper: + """Scraper that uses Embedly's "Extract" API.""" + + def __init__(self, api_key: str) -> None: + """Create a new scraper using the specified Embedly API key.""" + self.api_key = api_key + + def scrape_url(self, url: str) -> ScraperResult: + """Scrape a url and return the result.""" + params: Dict[str, Any] = {"key": self.api_key, "format": "json", "url": url} + + response = requests.get("https://api.embedly.com/1/extract", params=params) + response.raise_for_status() + + return ScraperResult(url, ScraperType.EMBEDLY, response.json()) + + @staticmethod + def get_metadata_from_result(result: ScraperResult) -> Dict[str, Any]: + """Get the metadata that we're interested in out of a scrape result.""" + # pylint: disable=too-many-branches + if result.scraper_type != ScraperType.EMBEDLY: + raise ValueError("Can't process a result from a different scraper.") + + metadata = {} + + if result.data.get("title"): + metadata["title"] = result.data["title"] + + if result.data.get("description"): + metadata["description"] = result.data["description"] + + content = result.data.get("content") + if content: + metadata["word_count"] = word_count(extract_text_from_html(content)) + + if result.data.get("published"): + # the field's value is in milliseconds, store it in seconds instead + metadata["published"] = result.data["published"] // 1000 + + authors = result.data.get("authors") + if authors: + try: + metadata["authors"] = [author["name"] for author in authors] + except KeyError: + pass + + return metadata