From 3e37c1ec7c51f66b9aec5fdbc3d286e697de3dd4 Mon Sep 17 00:00:00 2001 From: Deimos Date: Fri, 24 Jan 2020 17:21:42 -0700 Subject: [PATCH] Add error-handling for event stream consumers Previously, if an event stream consumer hit an error when processing a message, it would crash and restart, and the message that caused the error would be left in "pending" status for that consumer forever while the consumer continued processing new messages. This commit adds some more deliberate handling of messages that cause errors: * When a consumer starts, it will try to read pending messages first. In a case where an error was transient, this should mean that the message that previously caused a crash will be processed on retry. * If a particular message causes the consumer to crash 3 times, it will be considered "dead" and moved out of the consumer's pending list into one specifically for dead messages. These dead queues can be monitored and inspected manually to look into failures, while the consumer can still continue processing new messages. * After clearing or processing all pending messages, consumers go back to waiting for and processing new messages. --- tildes/tildes/lib/event_stream.py | 92 +++++++++++++++++++++++++++---- 1 file changed, 80 insertions(+), 12 deletions(-) diff --git a/tildes/tildes/lib/event_stream.py b/tildes/tildes/lib/event_stream.py index 8375841..cf99607 100644 --- a/tildes/tildes/lib/event_stream.py +++ b/tildes/tildes/lib/event_stream.py @@ -13,6 +13,7 @@ from redis import Redis, ResponseError from tildes.lib.database import get_session_from_config REDIS_KEY_PREFIX = "event_stream:" +MAX_RETRIES_PER_MESSAGE = 3 class Message: @@ -45,7 +46,11 @@ class EventStreamConsumer: """ def __init__( - self, consumer_group: str, source_streams: Sequence[str], uses_db: bool = True, + self, + consumer_group: str, + source_streams: Sequence[str], + uses_db: bool = True, + skip_pending: bool = False, ): """Initialize a new consumer, creating consumer groups and streams if needed.""" ini_file_path = os.environ["INI_FILE"] @@ -78,21 +83,23 @@ class EventStreamConsumer: else: self.db_session = None + # start by reading any already-pending messages by default + self.is_reading_pending = not skip_pending + def consume_streams(self) -> None: """Process messages from the streams indefinitely.""" while True: - # Get any messages from the source streams that haven't already been - # delivered to a consumer in this group - will fetch a maximum of one - # message from each stream, and block indefinitely if none are available - response = self.redis.xreadgroup( - self.consumer_group, - self.name, - {stream: ">" for stream in self.source_streams}, - count=1, - block=0, - ) + if self.is_reading_pending: + # clear out any persistently-failing messages first + self._clear_dead_messages() - messages = self._xreadgroup_response_to_messages(response) + messages = self._get_messages(pending=True) + + # when no pending messages are left, switch to waiting for new ones + if not messages: + self.is_reading_pending = False + else: + messages = self._get_messages(pending=False) for message in messages: self.process_message(message) @@ -103,6 +110,39 @@ class EventStreamConsumer: message.ack(self.consumer_group) + def _clear_dead_messages(self) -> None: + """Clear any pending messages that have failed too many times. + + If a message seems to be failing consistently, this will use XCLAIM to transfer + ownership to a fake "consumer" named -dead. Pending + messages for -dead consumers should be monitored and inspected manually to + determine why they were unable to be processed. + """ + # for each stream, use XPENDING to check for messages that have been delivered + # repeatedly (indicating that they're failing consistently) + for stream in self.source_streams: + response = self.redis.xpending_range( + stream, + self.consumer_group, + min="-", + max="+", + count=100, # there shouldn't ever be more than one, but won't hurt + consumername=self.name, + ) + + for entry in response: + # if it hasn't had many attempts yet, leave it pending to try again + if entry["times_delivered"] < MAX_RETRIES_PER_MESSAGE: + continue + + self.redis.xclaim( + stream, + self.consumer_group, + f"{self.consumer_group}-dead", + min_idle_time=0, # shouldn't have to worry about race condition + message_ids=[entry["message_id"]], + ) + def _xreadgroup_response_to_messages(self, response: Any) -> List[Message]: """Convert a response from XREADGROUP to a list of Messages.""" messages = [] @@ -126,6 +166,34 @@ class EventStreamConsumer: return messages + def _get_messages(self, pending: bool = False) -> List[Message]: + """Get any messages from the streams for this consumer. + + This method will return at most one message from each of the source streams per + call. + + If pending is True, the messages will be ones previously delivered to this + consumer but not acked. + + If pending is False, messages will be ones that haven't been delivered to any + consumer in this group, and this method will block indefinitely until there are + messages available. + """ + if pending: + message_id = "0" + else: + message_id = ">" + + response = self.redis.xreadgroup( + self.consumer_group, + self.name, + {stream: message_id for stream in self.source_streams}, + count=1, + block=0, + ) + + return self._xreadgroup_response_to_messages(response) + @abstractmethod def process_message(self, message: Message) -> None: """Process a message from the stream (subclasses must implement)."""