From 14d96e0a9b100e1d98681e6504b276e9e7669464 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Sat, 30 Mar 2024 00:27:59 +0100 Subject: [PATCH] snap somewhat working again --- chatmaild/src/chatmaild/metadata.py | 98 ++++++++++++------- .../src/chatmaild/tests/test_metadata.py | 89 +++++++++++------ 2 files changed, 121 insertions(+), 66 deletions(-) diff --git a/chatmaild/src/chatmaild/metadata.py b/chatmaild/src/chatmaild/metadata.py index 2a49bfd..3e5a273 100644 --- a/chatmaild/src/chatmaild/metadata.py +++ b/chatmaild/src/chatmaild/metadata.py @@ -1,6 +1,7 @@ +import time from pathlib import Path from threading import Thread -from queue import Queue +from queue import PriorityQueue from socketserver import ( UnixStreamServer, StreamRequestHandler, @@ -28,12 +29,16 @@ METADATA_TOKEN_KEY = "devicetoken" class Notifier: + CONNECTION_TIMEOUT = 60.0 # seconds + NOTIFICATION_RETRY_DELAY = 10.0 # seconds + MAX_NUMBER_OF_TRIES = 10 + def __init__(self, vmail_dir): self.vmail_dir = vmail_dir self.notification_dir = vmail_dir / "pending_notifications" if not self.notification_dir.exists(): self.notification_dir.mkdir() - self.notification_queue = Queue() + self.retry_queues = [PriorityQueue() for i in range(self.MAX_NUMBER_OF_TRIES)] def get_metadata_dict(self, addr): return FileDict(self.vmail_dir / addr / "metadata.json") @@ -58,38 +63,64 @@ class Notifier: return self.get_metadata_dict(addr).read().get(METADATA_TOKEN_KEY, []) def new_message_for_addr(self, addr): - self.notification_dir.joinpath(addr).touch() - self.notification_queue.put(addr) - - def thread_run_loop(self): - requests_session = requests.Session() - # on startup deliver all persisted notifications from last process run - self.notification_queue.put(None) - while 1: - self.thread_run_one(requests_session) - - def thread_run_one(self, requests_session): - addr = self.notification_queue.get() - if addr is None: - # startup, notify any "pending" notifications from last run - for addr_path in self.notification_dir.iterdir(): - if "@" in addr_path.name: - self.notify_tokens_for(requests_session, addr_path.name) - else: - self.notify_tokens_for(requests_session, addr) - - def notify_tokens_for(self, requests_session, addr): for token in self.get_tokens(addr): + self.notification_dir.joinpath(token).write_text(addr) + self.add_token_for_retry(token) + + def add_token_for_retry(self, token, numtries=0): + # backup exponentially with number of retries + when = time.time() + pow(self.NOTIFICATION_RETRY_DELAY, numtries) + self.retry_queues[numtries].put((when, token)) + + def start_notification_threads(self): + for token_path in self.notification_dir.iterdir(): + self.add_token_for_retry(token_path.name) + + for numtries in range(len(self.retry_queues)): + t = Thread(target=self.thread_retry_loop, args=(numtries,)) + t.setDaemon(True) + t.start() + + def thread_retry_loop(self, numtries): + requests_session = requests.Session() + while True: + retry_queue = self.retry_queues[numtries] + when, token = retry_queue.get() + wait_time = when - time.time() + if wait_time > 0: + time.sleep(wait_time) + self.notify_one(requests_session, token, numtries) + + def notify_one(self, requests_session, token, numtries=0): + try: response = requests_session.post( "https://notifications.delta.chat/notify", data=token, - timeout=60, + timeout=self.CONNECTION_TIMEOUT, ) - if response.status_code == 410: - # 410 Gone status code - # means the token is no longer valid. - self.remove_token(addr, token) - self.notification_dir.joinpath(addr).unlink(missing_ok=True) + except requests.exceptions.RequestException as e: + response = e + else: + if response.status_code in (200, 410): + token_path = self.notification_dir.joinpath(token) + if response.status_code == 410: + # 410 Gone: means the token is no longer valid. + try: + addr = token_path.read_text() + except FileNotFoundError: + logging.warning( + "could not determine address for token %r:", token + ) + return + self.remove_token(addr, token) + token_path.unlink(missing_ok=True) + return + + logging.warning("Notification request failed: %r", response) + if numtries < self.MAX_NUMBER_OF_TRIES: + self.add_token_for_retry(token, numtries=numtries + 1) + else: + logging.warning("giving up on token after %d tries: %r", numtries, token) def handle_dovecot_protocol(rfile, wfile, notifier): @@ -186,14 +217,7 @@ def main(): except FileNotFoundError: pass - # start notifier thread for signalling new messages to - # Delta Chat notification server - - t = Thread(target=notifier.thread_run_loop) - t.setDaemon(True) - t.start() - # let notifier thread run once for any pending notifications from last run - notifier.message_arrived_event.set() + notifier.start_notification_threads() with ThreadedUnixStreamServer(socket, Handler) as server: try: diff --git a/chatmaild/src/chatmaild/tests/test_metadata.py b/chatmaild/src/chatmaild/tests/test_metadata.py index 911f5a8..4e43d2f 100644 --- a/chatmaild/src/chatmaild/tests/test_metadata.py +++ b/chatmaild/src/chatmaild/tests/test_metadata.py @@ -1,5 +1,6 @@ import io import pytest +import requests from chatmaild.metadata import ( handle_dovecot_request, @@ -25,6 +26,11 @@ def testaddr2(): return "user2@example.org" +@pytest.fixture +def token(): + return "01234" + + def test_notifier_persistence(tmp_path, testaddr, testaddr2): notifier1 = Notifier(tmp_path) notifier2 = Notifier(tmp_path) @@ -57,7 +63,7 @@ def test_handle_dovecot_request_lookup_fails(notifier, testaddr): assert res == "N\n" -def test_handle_dovecot_request_happy_path(notifier, testaddr): +def test_handle_dovecot_request_happy_path(notifier, testaddr, token): transactions = {} # set device token in a transaction @@ -67,27 +73,27 @@ def test_handle_dovecot_request_happy_path(notifier, testaddr): assert not res and not notifier.get_tokens(testaddr) assert transactions == {tx: dict(addr=testaddr, res="O\n")} - msg = f"S{tx}\tpriv/guid00/devicetoken\t01234" + msg = f"S{tx}\tpriv/guid00/devicetoken\t{token}" res = handle_dovecot_request(msg, transactions, notifier) assert not res assert len(transactions) == 1 - assert notifier.get_tokens(testaddr) == ["01234"] + assert notifier.get_tokens(testaddr) == [token] msg = f"C{tx}" res = handle_dovecot_request(msg, transactions, notifier) assert res == "O\n" assert len(transactions) == 0 - assert notifier.get_tokens(testaddr) == ["01234"] + assert notifier.get_tokens(testaddr) == [token] # trigger notification for incoming message tx2 = "2222" assert handle_dovecot_request(f"B{tx2}\t{testaddr}", transactions, notifier) is None msg = f"S{tx2}\tpriv/guid00/messagenew" assert handle_dovecot_request(msg, transactions, notifier) is None - assert notifier.notification_queue.get() == testaddr + assert notifier.retry_queues[0].get()[1] == token assert handle_dovecot_request(f"C{tx2}", transactions, notifier) == "O\n" assert not transactions - assert notifier.notification_dir.joinpath(testaddr).exists() + assert notifier.notification_dir.joinpath(token).exists() def test_handle_dovecot_protocol_set_devicetoken(notifier): @@ -145,22 +151,25 @@ def test_handle_dovecot_protocol_iterate(notifier): assert wfile.getvalue() == b"\n" -def test_handle_dovecot_protocol_messagenew(notifier): - rfile = io.BytesIO( - b"\n".join( - [ - b"HELLO", - b"Btx01\tuser@example.org", - b"Stx01\tpriv/guid00/messagenew", - b"Ctx01", - ] - ) - ) - wfile = io.BytesIO() - handle_dovecot_protocol(rfile, wfile, notifier) - assert wfile.getvalue() == b"O\n" - addr = notifier.notification_queue.get() - assert notifier.notification_dir.joinpath(addr).exists() +def test_notifier_thread_firstrun(notifier, testaddr): + requests = [] + + class ReqMock: + def post(self, url, data, timeout): + requests.append((url, data, timeout)) + + class Result: + status_code = 200 + + return Result() + + notifier.add_token(testaddr, "01234") + notifier.new_message_for_addr(testaddr) + when, token = notifier.retry_queues[0].get() + notifier.notify_one(ReqMock(), token, numtries=0) + url, data, timeout = requests[0] + assert data == "01234" + assert notifier.get_tokens(testaddr) == ["01234"] def test_notifier_thread_run(notifier, testaddr): @@ -177,12 +186,27 @@ def test_notifier_thread_run(notifier, testaddr): notifier.add_token(testaddr, "01234") notifier.new_message_for_addr(testaddr) - notifier.thread_run_one(ReqMock()) + token = notifier.retry_queues[0].get()[1] + + notifier.notify_one(ReqMock(), token=token, numtries=0) url, data, timeout = requests[0] assert data == "01234" assert notifier.get_tokens(testaddr) == ["01234"] +def test_notifier_thread_connection_exceptions(notifier, testaddr): + class ReqMock: + def post(self, url, data, timeout): + raise requests.exceptions.RequestException("hello") + + notifier.add_token(testaddr, "01234") + notifier.new_message_for_addr(testaddr) + token = notifier.retry_queues[0].get()[1] + notifier.NOTIFICATION_RETRY_DELAY = 0.1 + notifier.notify_one(ReqMock(), token) + assert notifier.retry_queues[1].get()[1] == token + + def test_multi_device_notifier(notifier, testaddr): requests = [] @@ -198,11 +222,16 @@ def test_multi_device_notifier(notifier, testaddr): notifier.add_token(testaddr, "01234") notifier.add_token(testaddr, "56789") notifier.new_message_for_addr(testaddr) - notifier.thread_run_one(ReqMock()) + token1 = notifier.retry_queues[0].get()[1] + token2 = notifier.retry_queues[0].get()[1] + notifier.notify_one(ReqMock(), token1) + notifier.notify_one(ReqMock(), token2) + assert notifier.retry_queues[0].qsize() == 0 + assert notifier.retry_queues[1].qsize() == 0 url, data, timeout = requests[0] - assert data == "01234" + assert data == token1 url, data, timeout = requests[1] - assert data == "56789" + assert data == token2 assert notifier.get_tokens(testaddr) == ["01234", "56789"] @@ -219,12 +248,14 @@ def test_notifier_thread_run_gone_removes_token(notifier, testaddr): return Result() notifier.add_token(testaddr, "01234") - notifier.new_message_for_addr(testaddr) - assert notifier.get_tokens(testaddr) == ["01234"] notifier.add_token(testaddr, "45678") - notifier.thread_run_one(ReqMock()) + notifier.new_message_for_addr(testaddr) + for token in notifier.get_tokens(testaddr): + notifier.notify_one(ReqMock(), token, numtries=0) url, data, timeout = requests[0] assert data == "01234" url, data, timeout = requests[1] assert data == "45678" assert notifier.get_tokens(testaddr) == ["45678"] + assert notifier.retry_queues[0].qsize() == 2 + assert notifier.retry_queues[1].qsize() == 0