mirror of https://gitlab.com/tildes/tildes.git
Add the Embedly Extract scraper and consumer
This adds a consumer (in prod only) that uses Embedly's Extract API to scrape the links from all new link topics and stores some of the data in the topic's content_metadata column.

merge-requests/37/head
Deimos
6 years ago
11 changed files with 293 additions and 0 deletions
 14  salt/salt/consumers/init.sls
 16  salt/salt/consumers/topic_embedly_extractor.service.jinja2
  1  tildes/alembic/env.py
 52  tildes/alembic/versions/09cfb27cc90e_add_scraper_results_table.py
 99  tildes/consumers/topic_embedly_extractor.py
  1  tildes/production.ini.example
  6  tildes/tildes/enums.py
  3  tildes/tildes/models/scraper/__init__.py
 37  tildes/tildes/models/scraper/scraper_result.py
  3  tildes/tildes/scrapers/__init__.py
 61  tildes/tildes/scrapers/embedly_scraper.py
salt/salt/consumers/topic_embedly_extractor.service.jinja2
@@ -0,0 +1,16 @@
{% from 'common.jinja2' import app_dir, bin_dir -%}
[Unit]
Description=Topic Embedly Extractor (Queue Consumer)
Requires=rabbitmq-server.service
After=rabbitmq-server.service
PartOf=rabbitmq-server.service

[Service]
WorkingDirectory={{ app_dir }}/consumers
Environment="INI_FILE={{ app_dir }}/{{ pillar['ini_file'] }}"
ExecStart={{ bin_dir }}/python topic_embedly_extractor.py
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
tildes/alembic/versions/09cfb27cc90e_add_scraper_results_table.py
@@ -0,0 +1,52 @@
"""Add scraper_results table

Revision ID: 09cfb27cc90e
Revises: 04fd898de0db
Create Date: 2018-09-09 21:22:32.769786

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "09cfb27cc90e"
down_revision = "04fd898de0db"
branch_labels = None
depends_on = None


def upgrade():
    op.create_table(
        "scraper_results",
        sa.Column("result_id", sa.Integer(), nullable=False),
        sa.Column("url", sa.Text(), nullable=False),
        sa.Column(
            "scraper_type",
            postgresql.ENUM("EMBEDLY", name="scrapertype"),
            nullable=False,
        ),
        sa.Column(
            "scrape_time",
            sa.TIMESTAMP(timezone=True),
            server_default=sa.text("NOW()"),
            nullable=False,
        ),
        sa.Column("data", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
        sa.PrimaryKeyConstraint("result_id", name=op.f("pk_scraper_results")),
    )
    op.create_index(
        op.f("ix_scraper_results_scrape_time"),
        "scraper_results",
        ["scrape_time"],
        unique=False,
    )
    op.create_index(
        op.f("ix_scraper_results_url"), "scraper_results", ["url"], unique=False
    )


def downgrade():
    op.drop_index(op.f("ix_scraper_results_url"), table_name="scraper_results")
    op.drop_index(op.f("ix_scraper_results_scrape_time"), table_name="scraper_results")
    op.drop_table("scraper_results")
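
The six-line diff to tildes/tildes/enums.py isn't expanded on this page, but the migration's postgresql.ENUM("EMBEDLY", name="scrapertype") column and the ENUM(ScraperType) column in the model further down imply a ScraperType enum along these lines. This is a hedged reconstruction, not the commit's verbatim code:

# Hypothetical sketch of the ScraperType enum; the member names must stay in
# sync with the values baked into the "scrapertype" Postgres ENUM above.
import enum


class ScraperType(enum.Enum):
    """Enum for the types of scrapers."""

    EMBEDLY = enum.auto()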
tildes/consumers/topic_embedly_extractor.py
@@ -0,0 +1,99 @@
# Copyright (c) 2018 Tildes contributors <code@tildes.net>
# SPDX-License-Identifier: AGPL-3.0-or-later

"""Consumer that fetches data from Embedly's Extract API for link topics."""

from datetime import timedelta
import os
from typing import Sequence

from amqpy import Message
from pyramid.paster import bootstrap
from requests.exceptions import HTTPError
from sqlalchemy import cast, desc, func
from sqlalchemy.dialects.postgresql import JSONB

from tildes.enums import ScraperType
from tildes.lib.amqp import PgsqlQueueConsumer
from tildes.lib.datetime import utc_now
from tildes.models.scraper import ScraperResult
from tildes.models.topic import Topic
from tildes.scrapers import EmbedlyScraper


# don't rescrape the same url inside this time period
RESCRAPE_DELAY = timedelta(hours=24)


class TopicEmbedlyExtractor(PgsqlQueueConsumer):
    """Consumer that fetches data from Embedly's Extract API for link topics."""

    def __init__(
        self, api_key: str, queue_name: str, routing_keys: Sequence[str]
    ) -> None:
        """Initialize the consumer, including creating a scraper instance."""
        super().__init__(queue_name, routing_keys)

        self.scraper = EmbedlyScraper(api_key)

    def run(self, msg: Message) -> None:
        """Process a delivered message."""
        topic = (
            self.db_session.query(Topic).filter_by(topic_id=msg.body["topic_id"]).one()
        )

        if not topic.is_link_type:
            return

        # see if we already have a recent scrape result from the same url
        result = (
            self.db_session.query(ScraperResult)
            .filter(
                ScraperResult.url == topic.link,
                ScraperResult.scraper_type == ScraperType.EMBEDLY,
                ScraperResult.scrape_time > utc_now() - RESCRAPE_DELAY,
            )
            .order_by(desc(ScraperResult.scrape_time))
            .first()
        )

        # if not, scrape the url and store the result
        if not result:
            try:
                result = self.scraper.scrape_url(topic.link)
            except HTTPError:
                return

            self.db_session.add(result)

        new_metadata = EmbedlyScraper.get_metadata_from_result(result)

        if new_metadata:
            # update the topic's content_metadata in a way that won't wipe out any
            # existing values, and can handle the column being null
            (
                self.db_session.query(Topic)
                .filter(Topic.topic_id == topic.topic_id)
                .update(
                    {
                        "content_metadata": func.coalesce(
                            Topic.content_metadata, cast({}, JSONB)
                        ).op("||")(new_metadata)
                    },
                    synchronize_session=False,
                )
            )


if __name__ == "__main__":
    # pylint: disable=invalid-name
    env = bootstrap(os.environ["INI_FILE"])
    embedly_api_key = env["registry"].settings.get("api_keys.embedly")
    if not embedly_api_key:
        raise RuntimeError("No embedly API key available in INI file")

    TopicEmbedlyExtractor(
        embedly_api_key,
        queue_name="topic_embedly_extractor.q",
        routing_keys=["topic.created"],
    ).consume_queue()
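
The update at the end of run() leans on two Postgres jsonb behaviors: the || operator merges two jsonb objects (the right side wins on key conflicts), and coalescing a NULL content_metadata to an empty object first keeps the merge from returning NULL. A self-contained sketch of just that expression, using a hypothetical topics table rather than the commit's real Topic model:

# Minimal sketch (not part of the commit) of the JSONB merge expression;
# "topics" here is a hypothetical stand-in table.
from sqlalchemy import Column, Integer, MetaData, Table, cast, func
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import JSONB

topics = Table(
    "topics",
    MetaData(),
    Column("topic_id", Integer),
    Column("content_metadata", JSONB),
)

# coalesce a possibly-NULL column to '{}' so the || merge never returns NULL
merge = func.coalesce(topics.c.content_metadata, cast({}, JSONB)).op("||")(
    {"title": "Example"}
)
print(merge.compile(dialect=postgresql.dialect()))
# roughly: coalesce(topics.content_metadata, CAST(%(param_1)s AS JSONB)) || %(param_2)s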
tildes/tildes/models/scraper/__init__.py
@@ -0,0 +1,3 @@
"""Contains models related to scrapers."""

from .scraper_result import ScraperResult
tildes/tildes/models/scraper/scraper_result.py
@@ -0,0 +1,37 @@
# Copyright (c) 2018 Tildes contributors <code@tildes.net>
# SPDX-License-Identifier: AGPL-3.0-or-later

"""Contains the ScraperResult class."""

from datetime import datetime
from typing import Any

from sqlalchemy import Column, Integer, Text, TIMESTAMP
from sqlalchemy.dialects.postgresql import ENUM, JSONB
from sqlalchemy.sql.expression import text

from tildes.enums import ScraperType
from tildes.models import DatabaseModel


class ScraperResult(DatabaseModel):
    """Model for the result from a scraper processing a url."""

    __tablename__ = "scraper_results"

    result_id: int = Column(Integer, primary_key=True)
    url: str = Column(Text, nullable=False, index=True)
    scraper_type: ScraperType = Column(ENUM(ScraperType), nullable=False)
    scrape_time: datetime = Column(
        TIMESTAMP(timezone=True),
        nullable=False,
        index=True,
        server_default=text("NOW()"),
    )
    data: Any = Column(JSONB)

    def __init__(self, url: str, scraper_type: ScraperType, data: Any) -> None:
        """Create a new ScraperResult."""
        self.url = url
        self.scraper_type = scraper_type
        self.data = data
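
A quick usage sketch for the model (the session, url, and payload below are placeholders, not anything from the commit). Note that scrape_time is never assigned in Python; the NOW() server default from the migration fills it in when the row is inserted:

# Hedged sketch: db_session is assumed to be an already-configured
# SQLAlchemy session; url and data are made-up values.
from tildes.enums import ScraperType
from tildes.models.scraper import ScraperResult

result = ScraperResult(
    "https://example.com/article",
    ScraperType.EMBEDLY,
    {"title": "Example", "content": "<p>hello</p>"},
)
db_session.add(result)
db_session.flush()  # the database assigns result_id and scrape_time (NOW())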
tildes/tildes/scrapers/__init__.py
@@ -0,0 +1,3 @@
"""Contains scrapers."""

from .embedly_scraper import EmbedlyScraper
tildes/tildes/scrapers/embedly_scraper.py
@@ -0,0 +1,61 @@
# Copyright (c) 2018 Tildes contributors <code@tildes.net>
# SPDX-License-Identifier: AGPL-3.0-or-later

"""Contains the EmbedlyScraper class."""

from typing import Any, Dict

import requests

from tildes.enums import ScraperType
from tildes.lib.string import extract_text_from_html, word_count
from tildes.models.scraper import ScraperResult


class EmbedlyScraper:
    """Scraper that uses Embedly's "Extract" API."""

    def __init__(self, api_key: str) -> None:
        """Create a new scraper using the specified Embedly API key."""
        self.api_key = api_key

    def scrape_url(self, url: str) -> ScraperResult:
        """Scrape a url and return the result."""
        params: Dict[str, Any] = {"key": self.api_key, "format": "json", "url": url}

        response = requests.get("https://api.embedly.com/1/extract", params=params)
        response.raise_for_status()

        return ScraperResult(url, ScraperType.EMBEDLY, response.json())

    @staticmethod
    def get_metadata_from_result(result: ScraperResult) -> Dict[str, Any]:
        """Get the metadata that we're interested in out of a scrape result."""
        # pylint: disable=too-many-branches
        if result.scraper_type != ScraperType.EMBEDLY:
            raise ValueError("Can't process a result from a different scraper.")

        metadata = {}

        if result.data.get("title"):
            metadata["title"] = result.data["title"]

        if result.data.get("description"):
            metadata["description"] = result.data["description"]

        content = result.data.get("content")
        if content:
            metadata["word_count"] = word_count(extract_text_from_html(content))

        if result.data.get("published"):
            # the field's value is in milliseconds, store it in seconds instead
            metadata["published"] = result.data["published"] // 1000

        authors = result.data.get("authors")
        if authors:
            try:
                metadata["authors"] = [author["name"] for author in authors]
            except KeyError:
                pass

        return metadata
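
Putting the scraper's two halves together, a minimal usage sketch (the API key and url are placeholders; a real call needs a valid Embedly key and network access, and raise_for_status() raises requests.exceptions.HTTPError on failure, which the consumer above swallows):

# Hedged sketch, not part of the commit; key and url are placeholders.
from tildes.scrapers import EmbedlyScraper

scraper = EmbedlyScraper("embedly-api-key")
result = scraper.scrape_url("https://example.com/article")
metadata = EmbedlyScraper.get_metadata_from_result(result)
# metadata contains only the fields the site stores: title, description,
# word_count, published (unix seconds), and authors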