
Add error-handling for event stream consumers

Previously, if an event stream consumer hit an error when processing a
message, it would crash and restart, and the message that caused the
error would be left in "pending" status for that consumer forever while
the consumer continued processing new messages.

This commit adds some more deliberate handling of messages that cause
errors:

* When a consumer starts, it will try to read pending messages first. In
  a case where an error was transient, this should mean that the message
  that previously caused a crash will be processed on retry.
* If a particular message causes the consumer to crash 3 times, it will
  be considered "dead" and moved out of the consumer's pending list into
  one specifically for dead messages. These dead queues can be monitored
  and inspected manually to look into failures, while the consumer can
  still continue processing new messages.
* After clearing or processing all pending messages, consumers go back
  to waiting for and processing new messages.
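
As a rough illustration of how one of those dead queues could be inspected: a "dead" message is simply a pending entry owned by the fake "<consumer group>-dead" consumer, so redis-py's XPENDING/XRANGE wrappers are enough to look at it. The sketch below is not part of the commit; the stream key, consumer group name, and connection settings are made-up placeholders.

# Rough sketch (not from this commit) of inspecting a dead queue with redis-py.
# The stream key, consumer group, and connection settings below are placeholders.
from redis import Redis

redis = Redis()  # connection details depend on the deployment

stream = "event_stream:topics"       # hypothetical stream key
consumer_group = "topic_consumer"    # hypothetical consumer group

# XPENDING, filtered to the fake "<group>-dead" consumer, lists only the messages
# that were claimed away after failing MAX_RETRIES_PER_MESSAGE (3) times
dead_entries = redis.xpending_range(
    stream,
    consumer_group,
    min="-",
    max="+",
    count=100,
    consumername=f"{consumer_group}-dead",
)

for entry in dead_entries:
    # XRANGE with the same ID as both endpoints fetches that message's data so the
    # failure can be investigated by hand
    message_id = entry["message_id"]
    data = redis.xrange(stream, min=message_id, max=message_id)
    print(message_id, entry["times_delivered"], data)

The count=100 here just mirrors the limit the commit's own _clear_dead_messages() call uses.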

merge-requests/89/head
Deimos, 5 years ago
parent commit 3e37c1ec7c

1 changed file: tildes/tildes/lib/event_stream.py (92 lines changed)

tildes/tildes/lib/event_stream.py

@@ -13,6 +13,7 @@ from redis import Redis, ResponseError
 from tildes.lib.database import get_session_from_config
 
 REDIS_KEY_PREFIX = "event_stream:"
+MAX_RETRIES_PER_MESSAGE = 3
 
 
 class Message:
@@ -45,7 +46,11 @@ class EventStreamConsumer:
     """
 
     def __init__(
-        self, consumer_group: str, source_streams: Sequence[str], uses_db: bool = True,
+        self,
+        consumer_group: str,
+        source_streams: Sequence[str],
+        uses_db: bool = True,
+        skip_pending: bool = False,
     ):
         """Initialize a new consumer, creating consumer groups and streams if needed."""
         ini_file_path = os.environ["INI_FILE"]
@@ -78,21 +83,23 @@ class EventStreamConsumer:
         else:
             self.db_session = None
 
+        # start by reading any already-pending messages by default
+        self.is_reading_pending = not skip_pending
+
     def consume_streams(self) -> None:
         """Process messages from the streams indefinitely."""
         while True:
-            # Get any messages from the source streams that haven't already been
-            # delivered to a consumer in this group - will fetch a maximum of one
-            # message from each stream, and block indefinitely if none are available
-            response = self.redis.xreadgroup(
-                self.consumer_group,
-                self.name,
-                {stream: ">" for stream in self.source_streams},
-                count=1,
-                block=0,
-            )
+            if self.is_reading_pending:
+                # clear out any persistently-failing messages first
+                self._clear_dead_messages()
 
-            messages = self._xreadgroup_response_to_messages(response)
+                messages = self._get_messages(pending=True)
+
+                # when no pending messages are left, switch to waiting for new ones
+                if not messages:
+                    self.is_reading_pending = False
+            else:
+                messages = self._get_messages(pending=False)
 
             for message in messages:
                 self.process_message(message)
@@ -103,6 +110,39 @@ class EventStreamConsumer:
 
                 message.ack(self.consumer_group)
 
+    def _clear_dead_messages(self) -> None:
+        """Clear any pending messages that have failed too many times.
+
+        If a message seems to be failing consistently, this will use XCLAIM to transfer
+        ownership to a fake "consumer" named <consumer group name>-dead. Pending
+        messages for -dead consumers should be monitored and inspected manually to
+        determine why they were unable to be processed.
+        """
+        # for each stream, use XPENDING to check for messages that have been delivered
+        # repeatedly (indicating that they're failing consistently)
+        for stream in self.source_streams:
+            response = self.redis.xpending_range(
+                stream,
+                self.consumer_group,
+                min="-",
+                max="+",
+                count=100,  # there shouldn't ever be more than one, but won't hurt
+                consumername=self.name,
+            )
+
+            for entry in response:
+                # if it hasn't had many attempts yet, leave it pending to try again
+                if entry["times_delivered"] < MAX_RETRIES_PER_MESSAGE:
+                    continue
+
+                self.redis.xclaim(
+                    stream,
+                    self.consumer_group,
+                    f"{self.consumer_group}-dead",
+                    min_idle_time=0,  # shouldn't have to worry about race condition
+                    message_ids=[entry["message_id"]],
+                )
+
     def _xreadgroup_response_to_messages(self, response: Any) -> List[Message]:
         """Convert a response from XREADGROUP to a list of Messages."""
         messages = []
@@ -126,6 +166,34 @@ class EventStreamConsumer:
 
         return messages
 
+    def _get_messages(self, pending: bool = False) -> List[Message]:
+        """Get any messages from the streams for this consumer.
+
+        This method will return at most one message from each of the source streams per
+        call.
+
+        If pending is True, the messages will be ones previously delivered to this
+        consumer but not acked.
+
+        If pending is False, messages will be ones that haven't been delivered to any
+        consumer in this group, and this method will block indefinitely until there are
+        messages available.
+        """
+        if pending:
+            message_id = "0"
+        else:
+            message_id = ">"
+
+        response = self.redis.xreadgroup(
+            self.consumer_group,
+            self.name,
+            {stream: message_id for stream in self.source_streams},
+            count=1,
+            block=0,
+        )
+
+        return self._xreadgroup_response_to_messages(response)
+
     @abstractmethod
     def process_message(self, message: Message) -> None:
         """Process a message from the stream (subclasses must implement)."""
