Browse Source

Add simple metrics to event stream consumer jobs

This adds some very simple metrics to all of the background jobs that
consume the event streams. Currently, the only "real" metric is a
counter tracking how many messages have been processed by that consumer,
but a lot of the value will come from being able to utilize the
automatic "up" metric provided by Prometheus to monitor and make sure
that all of the jobs are running.

I decided to use ports starting from 25010 for these jobs - this is
completely arbitrary, it's just a fairly large range of unassigned
ports, so shouldn't conflict with anything.

I'm not a fan of how much hard-coding is involved here for the different
ports and jobs in the Prometheus config, but it's also not a big deal.
merge-requests/110/head
Deimos 5 years ago
parent
commit
b011be34ef
  1. 27
      salt/salt/prometheus/prometheus.yml.jinja2
  2. 2
      tildes/consumers/comment_user_mentions_generator.py
  3. 2
      tildes/consumers/site_icon_downloader.py
  4. 2
      tildes/consumers/topic_embedly_extractor.py
  5. 2
      tildes/consumers/topic_interesting_activity_updater.py
  6. 2
      tildes/consumers/topic_metadata_generator.py
  7. 2
      tildes/consumers/topic_youtube_scraper.py
  8. 11
      tildes/tests/test_string.py
  9. 38
      tildes/tildes/lib/event_stream.py
  10. 13
      tildes/tildes/lib/string.py

27
salt/salt/prometheus/prometheus.yml.jinja2

@ -55,3 +55,30 @@ scrape_configs:
target_label: instance
- target_label: __address__
replacement: 127.0.0.1:9115 # The blackbox exporter's real hostname:port
# event stream consumers (background jobs)
- job_name: "consumer_comment_user_mentions_generator"
static_configs:
- targets: ['{{ pillar['site_hostname'] }}:25010']
- job_name: "consumer_topic_interesting_activity_updater"
static_configs:
- targets: ['{{ pillar['site_hostname'] }}:25013']
- job_name: "consumer_topic_metadata_generator"
static_configs:
- targets: ['{{ pillar['site_hostname'] }}:25014']
{% if grains["id"] == "prod" %}
- job_name: "consumer_site_icon_downloader"
static_configs:
- targets: ['{{ pillar['site_hostname'] }}:25011']
- job_name: "consumer_topic_embedly_extractor"
static_configs:
- targets: ['{{ pillar['site_hostname'] }}:25012']
- job_name: "consumer_topic_youtube_scraper"
static_configs:
- targets: ['{{ pillar['site_hostname'] }}:25015']
{% endif %}

2
tildes/consumers/comment_user_mentions_generator.py

@ -10,6 +10,8 @@ from tildes.models.comment import Comment, CommentNotification
class CommentUserMentionGenerator(EventStreamConsumer):
"""Consumer that generates user mentions for comments."""
METRICS_PORT = 25010
def process_message(self, message: Message) -> None:
"""Process a message from the stream."""
comment = (

2
tildes/consumers/site_icon_downloader.py

@ -20,6 +20,8 @@ from tildes.models.scraper import ScraperResult
class SiteIconDownloader(EventStreamConsumer):
"""Consumer that generates content_metadata for topics."""
METRICS_PORT = 25011
ICON_FOLDER = "/opt/tildes/static/images/site-icons"
def __init__(self, consumer_group: str, source_streams: Sequence[str]):

2
tildes/consumers/topic_embedly_extractor.py

@ -27,6 +27,8 @@ RESCRAPE_DELAY = timedelta(hours=24)
class TopicEmbedlyExtractor(EventStreamConsumer):
"""Consumer that fetches data from Embedly's Extract API for link topics."""
METRICS_PORT = 25012
def __init__(
self, api_key: str, consumer_group: str, source_streams: Sequence[str]
):

2
tildes/consumers/topic_interesting_activity_updater.py

@ -14,6 +14,8 @@ from tildes.models.comment import Comment, CommentInTree, CommentTree
class TopicInterestingActivityUpdater(EventStreamConsumer):
"""Consumer that updates topics' last_interesting_activity_time."""
METRICS_PORT = 25013
def process_message(self, message: Message) -> None:
"""Process a message from the stream."""
trigger_comment = (

2
tildes/consumers/topic_metadata_generator.py

@ -19,6 +19,8 @@ from tildes.models.topic import Topic
class TopicMetadataGenerator(EventStreamConsumer):
"""Consumer that generates content_metadata for topics."""
METRICS_PORT = 25014
def __init__(self, consumer_group: str, source_streams: Sequence[str]):
"""Initialize the consumer, including the public suffix list."""
super().__init__(consumer_group, source_streams)

2
tildes/consumers/topic_youtube_scraper.py

@ -27,6 +27,8 @@ RESCRAPE_DELAY = timedelta(hours=24)
class TopicYoutubeScraper(EventStreamConsumer):
"""Consumer that fetches data from YouTube's data API for relevant link topics."""
METRICS_PORT = 25015
def __init__(
self, api_key: str, consumer_group: str, source_streams: Sequence[str]
):

11
tildes/tests/test_string.py

@ -2,6 +2,7 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
from tildes.lib.string import (
camelcase_to_snakecase,
convert_to_url_slug,
truncate_string,
truncate_string_at_char,
@ -141,3 +142,13 @@ def test_word_count_with_lots_of_punctuation():
"best not to count 100% on it; that's just foolish/risky."
)
assert word_count(string) == 31
def test_basic_camelcase_to_snakecase():
"""Ensure CamelCase->snake_case conversion works for a simple case."""
assert camelcase_to_snakecase("SomeClassName") == "some_class_name"
def test_camelcase_to_snakecase_with_acronym():
"""Ensure CamelCase->snake_case works as expected with an acronym."""
assert camelcase_to_snakecase("SomeHTTPThing") == "some_http_thing"

38
tildes/tildes/lib/event_stream.py

@ -8,9 +8,11 @@ from abc import abstractmethod
from configparser import ConfigParser
from typing import Any, Dict, List, Sequence
from prometheus_client import CollectorRegistry, Counter, start_http_server
from redis import Redis, ResponseError
from tildes.lib.database import get_session_from_config
from tildes.lib.string import camelcase_to_snakecase
REDIS_KEY_PREFIX = "event_stream:"
MAX_RETRIES_PER_MESSAGE = 3
@ -45,6 +47,8 @@ class EventStreamConsumer:
necessary. It relies on the environment variable INI_FILE being set.
"""
METRICS_PORT = None
def __init__(
self,
consumer_group: str,
@ -86,6 +90,36 @@ class EventStreamConsumer:
# start by reading any already-pending messages by default
self.is_reading_pending = not skip_pending
self._init_metrics()
@property
def _metrics_prefix(self) -> str:
"""Prefix string at the start of every metric name for this consumer."""
snakecase_name = camelcase_to_snakecase(self.__class__.__name__)
return f"tildes_consumer_{snakecase_name}"
def _init_metrics(self) -> None:
"""Initialize this consumer's metrics, registry, and launch HTTP server.
Requires class property METRICS_PORT to be set, otherwise it just sets
self.metrics to None (and will crash if the port is already in use).
"""
if not self.METRICS_PORT:
self.metrics = None
return
self.metrics_registry = CollectorRegistry()
self.metrics = {
"messages_counter": Counter(
f"{self._metrics_prefix}_messages_processed",
"Consumer Messages Processed",
registry=self.metrics_registry,
),
}
start_http_server(self.METRICS_PORT, registry=self.metrics_registry)
def consume_streams(self) -> None:
"""Process messages from the streams indefinitely."""
while True:
@ -110,6 +144,10 @@ class EventStreamConsumer:
message.ack(self.consumer_group)
if self.metrics:
counter = self.metrics["messages_counter"]
counter.inc()
def _clear_dead_messages(self) -> None:
"""Clear any pending messages that have failed too many times.

13
tildes/tildes/lib/string.py

@ -247,3 +247,16 @@ def extract_text_from_html(html: str, skip_tags: Optional[List[str]] = None) ->
# sanitize unicode, remove leading/trailing whitespace, etc.
return simplify_string(extracted_text)
def camelcase_to_snakecase(original: str) -> str:
"""Convert words in a string from CamelCase to snake_case.
Code adapted from the "inflection" library's underscore() method:
https://github.com/jpvanhal/inflection/blob/master/inflection.py
"""
converted = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", original)
converted = re.sub(r"([a-z\d])([A-Z])", r"\1_\2", converted)
converted = converted.replace("-", "_")
return converted.lower()
Loading…
Cancel
Save