diff --git a/salt/salt/consumers/init.sls b/salt/salt/consumers/init.sls
index 01b33a1..d1b4bd2 100644
--- a/salt/salt/consumers/init.sls
+++ b/salt/salt/consumers/init.sls
@@ -35,6 +35,18 @@ consumer-topic_embedly_extractor.service:
   service.running:
     - enable: True
 
+/etc/systemd/system/consumer-topic_youtube_scraper.service:
+  file.managed:
+    - source: salt://consumers/topic_youtube_scraper.service.jinja2
+    - template: jinja
+    - user: root
+    - group: root
+    - mode: 644
+
+consumer-topic_youtube_scraper.service:
+  service.running:
+    - enable: True
+
 /etc/systemd/system/consumer-site_icon_downloader.service:
   file.managed:
     - source: salt://consumers/site_icon_downloader.service.jinja2
diff --git a/salt/salt/consumers/topic_youtube_scraper.service.jinja2 b/salt/salt/consumers/topic_youtube_scraper.service.jinja2
new file mode 100644
index 0000000..5768a36
--- /dev/null
+++ b/salt/salt/consumers/topic_youtube_scraper.service.jinja2
@@ -0,0 +1,16 @@
+{% from 'common.jinja2' import app_dir, bin_dir -%}
+[Unit]
+Description=Topic Youtube Scraper (Queue Consumer)
+Requires=rabbitmq-server.service
+After=rabbitmq-server.service
+PartOf=rabbitmq-server.service
+
+[Service]
+WorkingDirectory={{ app_dir }}/consumers
+Environment="INI_FILE={{ app_dir }}/{{ pillar['ini_file'] }}"
+ExecStart={{ bin_dir }}/python topic_youtube_scraper.py
+Restart=always
+RestartSec=5
+
+[Install]
+WantedBy=multi-user.target
diff --git a/tildes/alembic/versions/61f43e57679a_add_youtube_scraper_result.py b/tildes/alembic/versions/61f43e57679a_add_youtube_scraper_result.py
new file mode 100644
index 0000000..8448021
--- /dev/null
+++ b/tildes/alembic/versions/61f43e57679a_add_youtube_scraper_result.py
@@ -0,0 +1,35 @@
+"""Add youtube scraper result
+
+Revision ID: 61f43e57679a
+Revises: a0e0b6206146
+Create Date: 2019-01-26 20:02:27.642583
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "61f43e57679a" +down_revision = "a0e0b6206146" +branch_labels = None +depends_on = None + + +def upgrade(): + # ALTER TYPE doesn't work from inside a transaction, disable it + connection = None + if not op.get_context().as_sql: + connection = op.get_bind() + connection.execution_options(isolation_level="AUTOCOMMIT") + + op.execute("ALTER TYPE scrapertype ADD VALUE IF NOT EXISTS 'YOUTUBE'") + + # re-activate the transaction for any future migrations + if connection is not None: + connection.execution_options(isolation_level="READ_COMMITTED") + + +def downgrade(): + # can't remove from enums, do nothing + pass diff --git a/tildes/consumers/topic_youtube_scraper.py b/tildes/consumers/topic_youtube_scraper.py new file mode 100644 index 0000000..d140eb6 --- /dev/null +++ b/tildes/consumers/topic_youtube_scraper.py @@ -0,0 +1,100 @@ +# Copyright (c) 2019 Tildes contributors +# SPDX-License-Identifier: AGPL-3.0-or-later + +"""Consumer that fetches data from YouTube's data API for relevant link topics.""" + +from datetime import timedelta +import os +from typing import Sequence + +from amqpy import Message +from pyramid.paster import get_appsettings +from requests.exceptions import HTTPError, Timeout +from sqlalchemy import cast, desc, func +from sqlalchemy.dialects.postgresql import JSONB + +from tildes.enums import ScraperType +from tildes.lib.amqp import PgsqlQueueConsumer +from tildes.lib.datetime import utc_now +from tildes.models.scraper import ScraperResult +from tildes.models.topic import Topic +from tildes.scrapers import YoutubeScraper + + +# don't rescrape the same url inside this time period +RESCRAPE_DELAY = timedelta(hours=24) + + +class TopicYoutubeScraper(PgsqlQueueConsumer): + """Consumer that fetches data from YouTube's data API for relevant link topics.""" + + def __init__(self, api_key: str, queue_name: str, routing_keys: Sequence[str]): + """Initialize the consumer, including creating a scraper instance.""" + super().__init__(queue_name, routing_keys) + + self.scraper = YoutubeScraper(api_key) + + def run(self, msg: Message) -> None: + """Process a delivered message.""" + topic = ( + self.db_session.query(Topic).filter_by(topic_id=msg.body["topic_id"]).one() + ) + + if not topic.is_link_type: + return + + if not self.scraper.is_applicable(topic.link): + return + + # see if we already have a recent scrape result from the same url + result = ( + self.db_session.query(ScraperResult) + .filter( + ScraperResult.url == topic.link, + ScraperResult.scraper_type == ScraperType.YOUTUBE, + ScraperResult.scrape_time > utc_now() - RESCRAPE_DELAY, + ) + .order_by(desc(ScraperResult.scrape_time)) + .first() + ) + + # if not, scrape the url and store the result + if not result: + try: + result = self.scraper.scrape_url(topic.link) + except (HTTPError, Timeout): + return + + self.db_session.add(result) + + new_metadata = YoutubeScraper.get_metadata_from_result(result) + + if new_metadata: + # update the topic's content_metadata in a way that won't wipe out any + # existing values, and can handle the column being null + ( + self.db_session.query(Topic) + .filter(Topic.topic_id == topic.topic_id) + .update( + { + "content_metadata": func.coalesce( + Topic.content_metadata, cast({}, JSONB) + ).op("||")(new_metadata) + }, + synchronize_session=False, + ) + ) + + +if __name__ == "__main__": + # pylint: disable=invalid-name + settings = get_appsettings(os.environ["INI_FILE"]) + youtube_api_key = settings.get("api_keys.youtube") + if not youtube_api_key: + raise RuntimeError("No YouTube 
+
+    TopicYoutubeScraper(
+        youtube_api_key,
+        queue_name="topic_youtube_scraper.q",
+        routing_keys=["topic.created"],
+    ).consume_queue()
diff --git a/tildes/production.ini.example b/tildes/production.ini.example
index e805a9d..78d2e7a 100644
--- a/tildes/production.ini.example
+++ b/tildes/production.ini.example
@@ -35,6 +35,7 @@ webassets.manifest = json
 # API keys for external APIs
 api_keys.embedly = embedlykeygoeshere
 api_keys.stripe = sk_live_ActualKeyShouldGoHere
+api_keys.youtube = youtubekeygoeshere
 
 [server:main]
 use = egg:gunicorn#main
diff --git a/tildes/tildes/enums.py b/tildes/tildes/enums.py
index 6c8ec1e..d5c693c 100644
--- a/tildes/tildes/enums.py
+++ b/tildes/tildes/enums.py
@@ -88,6 +88,7 @@ class ScraperType(enum.Enum):
     """Enum for the types of scrapers available."""
 
     EMBEDLY = enum.auto()
+    YOUTUBE = enum.auto()
 
 
 class TopicSortOption(enum.Enum):
diff --git a/tildes/tildes/scrapers/__init__.py b/tildes/tildes/scrapers/__init__.py
index 31ff25f..f1f4d07 100644
--- a/tildes/tildes/scrapers/__init__.py
+++ b/tildes/tildes/scrapers/__init__.py
@@ -1,3 +1,4 @@
 """Contains scrapers."""
 
 from .embedly_scraper import EmbedlyScraper
+from .youtube_scraper import YoutubeScraper
diff --git a/tildes/tildes/scrapers/youtube_scraper.py b/tildes/tildes/scrapers/youtube_scraper.py
new file mode 100644
index 0000000..c36adab
--- /dev/null
+++ b/tildes/tildes/scrapers/youtube_scraper.py
@@ -0,0 +1,123 @@
+# Copyright (c) 2019 Tildes contributors
+# SPDX-License-Identifier: AGPL-3.0-or-later
+
+"""Contains the YoutubeScraper class."""
+
+from datetime import timedelta
+import re
+from typing import Any, Dict
+from urllib.parse import parse_qs, urlparse
+
+from dateutil import parser
+import requests
+
+from tildes.enums import ScraperType
+from tildes.models.scraper import ScraperResult
+
+
+# Only parses the subset of ISO8601 durations that YouTube uses
+# fmt: off
+YOUTUBE_DURATION_REGEX = re.compile(
+    "P"
+    r"(?:(?P<days>\d+)D)?"
+    "T"
+    r"(?:(?P<hours>\d+)H)?"
+    r"(?:(?P<minutes>\d+)M)?"
+    r"(?:(?P<seconds>\d+)S)?"
+)
+# fmt: on
+
+
+class YoutubeScraper:
+    """Scraper that uses the YouTube Data API."""
+
+    def __init__(self, api_key: str):
+        """Create a new scraper using the specified YouTube API key."""
+        self.api_key = api_key
+
+    def is_applicable(self, url: str) -> bool:
+        """Return whether this scraper is applicable to a particular url."""
+        parsed_url = urlparse(url)
+
+        if parsed_url.hostname not in ("www.youtube.com", "youtube.com"):
+            return False
+
+        if parsed_url.path != "/watch":
+            return False
+
+        return True
+
+    def scrape_url(self, url: str) -> ScraperResult:
+        """Scrape a url and return the result."""
+        parsed_url = urlparse(url)
+        query_params = parse_qs(parsed_url.query)
+        video_id = query_params.get("v", [""])[0]  # parse_qs values are lists
+
+        if not video_id:
+            raise ValueError("Invalid url, no video ID found.")
+
+        params: Dict[str, Any] = {
+            "key": self.api_key,
+            "id": video_id,
+            "part": "snippet,contentDetails,statistics",
+        }
+
+        response = requests.get(
+            "https://www.googleapis.com/youtube/v3/videos", params=params, timeout=5
+        )
+        response.raise_for_status()
+
+        return ScraperResult(url, ScraperType.YOUTUBE, response.json()["items"][0])
+
+    @staticmethod
+    def get_metadata_from_result(result: ScraperResult) -> Dict[str, Any]:
+        """Get the metadata that we're interested in out of a scrape result."""
+        if result.scraper_type != ScraperType.YOUTUBE:
+            raise ValueError("Can't process a result from a different scraper.")
+
+        metadata = {}
+
+        snippet = result.data.get("snippet", {})
+
+        if snippet.get("title"):
+            metadata["title"] = snippet["title"]
+
+        if snippet.get("description"):
+            metadata["description"] = snippet["description"]
+
+        if snippet.get("publishedAt"):
+            published = parser.parse(snippet["publishedAt"], ignoretz=True)
+            metadata["published"] = int(published.timestamp())
+
+        if snippet.get("channelTitle"):
+            metadata["authors"] = [snippet["channelTitle"]]
+
+        content_details = result.data.get("contentDetails", {})
+
+        if content_details.get("duration"):
+            match = YOUTUBE_DURATION_REGEX.match(content_details["duration"])
+            if not match:
+                raise ValueError("Unable to parse duration")
+
+            duration_components = {}
+
+            # convert None to zero and all strings to integers
+            for key, value in match.groupdict().items():
+                if value is None:
+                    duration_components[key] = 0
+                else:
+                    duration_components[key] = int(value)
+
+            delta = timedelta(
+                days=duration_components["days"],
+                hours=duration_components["hours"],
+                minutes=duration_components["minutes"],
+                seconds=duration_components["seconds"],
+            )
+
+            # drop the hours portion ("0:") when it's zero (lstrip strips too much)
+            duration = re.sub("^0:", "", str(delta))
+
+            metadata["duration"] = duration
+
+        return metadata
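
Note on the duration handling: a quick standalone sanity check of the regex and the prefix strip above. The inputs "PT1H2M10S" and 45 seconds are made-up sample values for illustration, not real API responses.

```python
from datetime import timedelta
import re

# same pattern as YOUTUBE_DURATION_REGEX in youtube_scraper.py
YOUTUBE_DURATION_REGEX = re.compile(
    "P"
    r"(?:(?P<days>\d+)D)?"
    "T"
    r"(?:(?P<hours>\d+)H)?"
    r"(?:(?P<minutes>\d+)M)?"
    r"(?:(?P<seconds>\d+)S)?"
)

match = YOUTUBE_DURATION_REGEX.match("PT1H2M10S")
components = {
    key: int(value) if value else 0 for key, value in match.groupdict().items()
}
delta = timedelta(**components)
print(re.sub("^0:", "", str(delta)))  # -> "1:02:10" (hours nonzero, unchanged)

# the regex-based strip keeps the minutes for short videos, where
# str(delta).lstrip("0:") would have reduced "0:00:45" to just "45"
print(re.sub("^0:", "", str(timedelta(seconds=45))))  # -> "00:45"
```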