diff --git a/chatmaild/src/chatmaild/notifier.py b/chatmaild/src/chatmaild/notifier.py index 3741193..4de05fc 100644 --- a/chatmaild/src/chatmaild/notifier.py +++ b/chatmaild/src/chatmaild/notifier.py @@ -1,13 +1,13 @@ """ This modules provides notification machinery for transmitting device tokens to -a central notification server which in turns contacts a phone's notification server +a central notification server which in turns contacts a phone provider's notification server to trigger Delta Chat apps to retrieve messages and provide instant notifications to users. The Notifier class arranges the queuing of tokens in separate PriorityQueues from which NotifyThreads take and transmit them via HTTPS to the `notifications.delta.chat` service -which in turns contacts a phone's providers's notification service -which in turn ewakes up the Delta Chat app on user devices. +which in turn contacts a phone's providers's notification service +which in turn wakes up the Delta Chat app on user devices. The lack of proper HTTP2-support in Python lets us use multiple threads and connections to the Rust-implemented `notifications.delta.chat` which however uses HTTP2 and thus only a single connection to phone-notification providers. @@ -20,16 +20,44 @@ If a token exceeds MAX_NUMBER_OF_TRIES it is dropped with a log warning. Note that tokens are completely opaque to the notification machinery here and will in the future be encrypted foreclosing all ability to distinguish -which device token ultimately goes to which phone-provider notification service. +which device token ultimately goes to which phone-provider notification service, +or to understand the relation of "device tokens" and chatmail addresses. +The meaning and format of tokens is basically a matter of Delta-Chat Core and +the `notification.delta.chat` service. """ import time import logging +from uuid import uuid4 from threading import Thread +from pathlib import Path from queue import PriorityQueue +from dataclasses import dataclass import requests +@dataclass +class PersistentQueueItem: + path: Path + addr: str + token: str + + def delete(self): + self.path.unlink(missing_ok=True) + + @classmethod + def create(cls, queue_dir, addr, token): + queue_id = uuid4().hex + path = queue_dir.joinpath(queue_id) + path.write_text(f"{addr}\n{token}") + return cls(path, addr, token) + + @classmethod + def read_from_path(cls, path): + addr, token = path.read_text().split("\n", maxsplit=1) + return cls(path, addr, token) + + class Notifier: URL = "https://notifications.delta.chat/notify" CONNECTION_TIMEOUT = 60.0 # seconds until http-request is given up @@ -43,28 +71,31 @@ class Notifier: def new_message_for_addr(self, addr, metadata): for token in metadata.get_tokens_for_addr(addr): - self.notification_dir.joinpath(token).write_text(addr) - self.add_token_for_retry(token) + queue_item = PersistentQueueItem.create(self.notification_dir, addr, token) + self.queue_for_retry(queue_item) - def requeue_persistent_pending_tokens(self): - for token_path in self.notification_dir.iterdir(): - self.add_token_for_retry(token_path.name) + def requeue_persistent_queue_items(self): + for queue_path in self.notification_dir.iterdir(): + queue_item = PersistentQueueItem.read_from_path(queue_path) + self.queue_for_retry(queue_item) - def add_token_for_retry(self, token, retry_num=0): + def queue_for_retry(self, queue_item, retry_num=0): if retry_num >= self.MAX_NUMBER_OF_TRIES: - return False + queue_item.delete() + logging.warning("dropping after %d tries: %r", retry_num, queue_item.token) + return when = time.time() if retry_num > 0: # backup exponentially with number of retries when += pow(self.NOTIFICATION_RETRY_DELAY, retry_num) - self.retry_queues[retry_num].put((when, token)) - return True + self.retry_queues[retry_num].put((when, queue_item)) def start_notification_threads(self, remove_token_from_addr): - self.requeue_persistent_pending_tokens() + self.requeue_persistent_queue_items() threads = {} for retry_num in range(len(self.retry_queues)): + # use 4 threads for first-try tokens and less for subsequent tries num_threads = {0: 4}.get(retry_num, 2) threads[retry_num] = [] for _ in range(num_threads): @@ -90,36 +121,28 @@ class NotifyThread(Thread): pass def retry_one(self, requests_session, sleep=time.sleep): - when, token = self.notifier.retry_queues[self.retry_num].get() + when, queue_item = self.notifier.retry_queues[self.retry_num].get() if when is None: return False wait_time = when - time.time() if wait_time > 0: sleep(wait_time) - self.perform_request_to_notification_server(requests_session, token) + self.perform_request_to_notification_server(requests_session, queue_item) return True - def perform_request_to_notification_server(self, requests_session, token): - token_path = self.notifier.notification_dir.joinpath(token) + def perform_request_to_notification_server(self, requests_session, queue_item): + timeout = self.notifier.CONNECTION_TIMEOUT + token = queue_item.token try: - timeout = self.notifier.CONNECTION_TIMEOUT res = requests_session.post(self.notifier.URL, data=token, timeout=timeout) except requests.exceptions.RequestException as e: res = e else: if res.status_code in (200, 410): if res.status_code == 410: - # 410 Gone: means the token is no longer valid. - try: - addr = token_path.read_text() - except FileNotFoundError: - logging.warning("no address for token %r:", token) - return - self.remove_token_from_addr(addr, token) - token_path.unlink(missing_ok=True) + self.remove_token_from_addr(queue_item.addr, token) + queue_item.delete() return logging.warning("Notification request failed: %r", res) - if not self.notifier.add_token_for_retry(token, retry_num=self.retry_num + 1): - token_path.unlink(missing_ok=True) - logging.warning("dropping token after %d tries: %r", self.retry_num, token) + self.notifier.queue_for_retry(queue_item, retry_num=self.retry_num + 1) diff --git a/chatmaild/src/chatmaild/tests/test_metadata.py b/chatmaild/src/chatmaild/tests/test_metadata.py index 3a02a7a..96cf2a9 100644 --- a/chatmaild/src/chatmaild/tests/test_metadata.py +++ b/chatmaild/src/chatmaild/tests/test_metadata.py @@ -10,6 +10,7 @@ from chatmaild.metadata import ( from chatmaild.notifier import ( Notifier, NotifyThread, + PersistentQueueItem, ) @@ -123,10 +124,11 @@ def test_handle_dovecot_request_happy_path(notifier, metadata, testaddr, token): ) msg = f"S{tx2}\tpriv/guid00/messagenew" assert handle_dovecot_request(msg, transactions, notifier, metadata) is None - assert notifier.retry_queues[0].get()[1] == token + queue_item = notifier.retry_queues[0].get()[1] + assert queue_item.token == token assert handle_dovecot_request(f"C{tx2}", transactions, notifier, metadata) == "O\n" assert not transactions - assert notifier.notification_dir.joinpath(token).exists() + assert queue_item.path.exists() def test_handle_dovecot_protocol_set_devicetoken(metadata, notifier): @@ -192,7 +194,7 @@ def test_notifier_thread_deletes_persistent_file(metadata, notifier, testaddr): url, data, timeout = reqmock.requests[0] assert data == "01234" assert metadata.get_tokens_for_addr(testaddr) == ["01234"] - notifier.requeue_persistent_pending_tokens() + notifier.requeue_persistent_queue_items() assert notifier.retry_queues[0].qsize() == 0 @@ -218,8 +220,8 @@ def test_notifier_thread_connection_failures( assert len(caplog.records) == 1 else: assert len(caplog.records) == 2 - assert "dropping token" in caplog.records[1].msg - notifier.requeue_persistent_pending_tokens() + assert "dropping" in caplog.records[1].msg + notifier.requeue_persistent_queue_items() assert notifier.retry_queues[0].qsize() == 0 @@ -262,3 +264,15 @@ def test_notifier_thread_run_gone_removes_token(metadata, notifier, testaddr): assert metadata.get_tokens_for_addr(testaddr) == ["45678"] assert notifier.retry_queues[0].qsize() == 0 assert notifier.retry_queues[1].qsize() == 0 + + +def test_persistent_queue_items(tmp_path, testaddr, token): + queue_item = PersistentQueueItem.create(tmp_path, testaddr, token) + assert queue_item.addr == testaddr + assert queue_item.token == token + item2 = PersistentQueueItem.read_from_path(queue_item.path) + assert item2.addr == testaddr + assert item2.token == token + assert item2 == queue_item + item2.delete() + assert not item2.path.exists()