diff --git a/chatmaild/src/chatmaild/notifier.py b/chatmaild/src/chatmaild/notifier.py index b23cbd3..2356f2f 100644 --- a/chatmaild/src/chatmaild/notifier.py +++ b/chatmaild/src/chatmaild/notifier.py @@ -14,7 +14,8 @@ If a token fails to cause a successful notification it is moved to a retry-number specific PriorityQueue which handles all tokens that failed a particular number of times and which are scheduled for retry using exponential back-off timing. -If a token exceeds MAX_NUMBER_OF_TRIES it is dropped with a log warning. +If a token notification would be scheduled more than DROP_DEADLINE seconds +after its first attempt, it is dropped with a log error. Note that tokens are completely opaque to the notification machinery here and will in the future be encrypted foreclosing all ability to distinguish @@ -25,6 +26,7 @@ the `notification.delta.chat` service. """ import time +import math import logging from uuid import uuid4 from threading import Thread @@ -38,38 +40,45 @@ import requests class PersistentQueueItem: path: Path addr: str + start_ts: float token: str def delete(self): self.path.unlink(missing_ok=True) @classmethod - def create(cls, queue_dir, addr, token): + def create(cls, queue_dir, addr, start_ts, token): queue_id = uuid4().hex path = queue_dir.joinpath(queue_id) - path.write_text(f"{addr}\n{token}") - return cls(path, addr, token) + path.write_text(f"{addr}\n{start_ts}\n{token}") + return cls(path, addr, start_ts, token) @classmethod def read_from_path(cls, path): - addr, token = path.read_text().split("\n", maxsplit=1) - return cls(path, addr, token) + addr, start_ts, token = path.read_text().split("\n", maxsplit=2) + return cls(path, addr, float(start_ts), token) class Notifier: URL = "https://notifications.delta.chat/notify" CONNECTION_TIMEOUT = 60.0 # seconds until http-request is given up - NOTIFICATION_RETRY_DELAY = 8.0 # seconds with exponential backoff - MAX_NUMBER_OF_TRIES = 6 - # exponential backoff means we try for 8^5 seconds, approximately 10 hours + BASE_DELAY = 8.0 # base seconds for exponential back-off delay + DROP_DEADLINE = 5 * 60 * 60 # drop notifications after 5 hours def __init__(self, notification_dir): self.notification_dir = notification_dir - self.retry_queues = [PriorityQueue() for _ in range(self.MAX_NUMBER_OF_TRIES)] + max_tries = int(math.log(self.DROP_DEADLINE, self.BASE_DELAY)) + 1 + self.retry_queues = [PriorityQueue() for _ in range(max_tries)] + + def compute_delay(self, retry_num): + return 0 if retry_num == 0 else pow(self.BASE_DELAY, retry_num) def new_message_for_addr(self, addr, metadata): + start_ts = time.time() for token in metadata.get_tokens_for_addr(addr): - queue_item = PersistentQueueItem.create(self.notification_dir, addr, token) + queue_item = PersistentQueueItem.create( + self.notification_dir, addr, start_ts, token + ) self.queue_for_retry(queue_item) def requeue_persistent_queue_items(self): @@ -78,15 +87,14 @@ class Notifier: self.queue_for_retry(queue_item) def queue_for_retry(self, queue_item, retry_num=0): - if retry_num >= self.MAX_NUMBER_OF_TRIES: + delay = self.compute_delay(retry_num) + when = time.time() + delay + deadline = queue_item.start_ts + self.DROP_DEADLINE + if retry_num >= len(self.retry_queues) or when > deadline: queue_item.delete() - logging.warning("dropping after %d tries: %r", retry_num, queue_item.token) + logging.error("notification exceeded deadline: %r", queue_item.token) return - when = time.time() - if retry_num > 0: - # back off exponentially with number of retries - when += pow(self.NOTIFICATION_RETRY_DELAY, retry_num) self.retry_queues[retry_num].put((when, queue_item)) def start_notification_threads(self, remove_token_from_addr): diff --git a/chatmaild/src/chatmaild/tests/test_metadata.py b/chatmaild/src/chatmaild/tests/test_metadata.py index 96cf2a9..c5f3347 100644 --- a/chatmaild/src/chatmaild/tests/test_metadata.py +++ b/chatmaild/src/chatmaild/tests/test_metadata.py @@ -206,7 +206,8 @@ def test_notifier_thread_connection_failures( metadata.add_token_to_addr(testaddr, "01234") notifier.new_message_for_addr(testaddr, metadata) notifier.NOTIFICATION_RETRY_DELAY = 5 - for i in range(notifier.MAX_NUMBER_OF_TRIES): + max_tries = len(notifier.retry_queues) + for i in range(max_tries): caplog.clear() reqmock = get_mocked_requests([status]) sleep_calls = [] @@ -215,12 +216,12 @@ def test_notifier_thread_connection_failures( assert "request failed" in caplog.records[0].msg if i > 0: assert len(sleep_calls) == 1 - if i + 1 < notifier.MAX_NUMBER_OF_TRIES: + if i + 1 < max_tries: assert notifier.retry_queues[i + 1].qsize() == 1 assert len(caplog.records) == 1 else: assert len(caplog.records) == 2 - assert "dropping" in caplog.records[1].msg + assert "deadline" in caplog.records[1].msg notifier.requeue_persistent_queue_items() assert notifier.retry_queues[0].qsize() == 0 @@ -267,11 +268,13 @@ def test_notifier_thread_run_gone_removes_token(metadata, notifier, testaddr): def test_persistent_queue_items(tmp_path, testaddr, token): - queue_item = PersistentQueueItem.create(tmp_path, testaddr, token) + queue_item = PersistentQueueItem.create(tmp_path, testaddr, 432.0, token) assert queue_item.addr == testaddr + assert queue_item.start_ts == 432.0 assert queue_item.token == token item2 = PersistentQueueItem.read_from_path(queue_item.path) assert item2.addr == testaddr + assert item2.start_ts == 432.0 assert item2.token == token assert item2 == queue_item item2.delete()