From 459ffcabd648a24e2a868f875c8bc1299c2441c7 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Fri, 29 Mar 2024 22:45:56 +0100 Subject: [PATCH] better preserve notification order, using a queue again --- chatmaild/src/chatmaild/metadata.py | 47 +++++++++++-------- .../src/chatmaild/tests/test_metadata.py | 6 +-- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/chatmaild/src/chatmaild/metadata.py b/chatmaild/src/chatmaild/metadata.py index 45fa8cb..2a49bfd 100644 --- a/chatmaild/src/chatmaild/metadata.py +++ b/chatmaild/src/chatmaild/metadata.py @@ -1,5 +1,6 @@ from pathlib import Path -from threading import Thread, Event +from threading import Thread +from queue import Queue from socketserver import ( UnixStreamServer, StreamRequestHandler, @@ -32,7 +33,7 @@ class Notifier: self.notification_dir = vmail_dir / "pending_notifications" if not self.notification_dir.exists(): self.notification_dir.mkdir() - self.message_arrived_event = Event() + self.notification_queue = Queue() def get_metadata_dict(self, addr): return FileDict(self.vmail_dir / addr / "metadata.json") @@ -58,31 +59,37 @@ class Notifier: def new_message_for_addr(self, addr): self.notification_dir.joinpath(addr).touch() - self.message_arrived_event.set() + self.notification_queue.put(addr) def thread_run_loop(self): requests_session = requests.Session() + # on startup deliver all persisted notifications from last process run + self.notification_queue.put(None) while 1: - self.message_arrived_event.wait() - self.message_arrived_event.clear() self.thread_run_one(requests_session) def thread_run_one(self, requests_session): - for addr_path in self.notification_dir.iterdir(): - addr = addr_path.name - if "@" not in addr: - continue - for token in self.get_tokens(addr): - response = requests_session.post( - "https://notifications.delta.chat/notify", - data=token, - timeout=60, - ) - if response.status_code == 410: - # 410 Gone status code - # means the token is no longer valid. - self.remove_token(addr, token) - addr_path.unlink() + addr = self.notification_queue.get() + if addr is None: + # startup, notify any "pending" notifications from last run + for addr_path in self.notification_dir.iterdir(): + if "@" in addr_path.name: + self.notify_tokens_for(requests_session, addr_path.name) + else: + self.notify_tokens_for(requests_session, addr) + + def notify_tokens_for(self, requests_session, addr): + for token in self.get_tokens(addr): + response = requests_session.post( + "https://notifications.delta.chat/notify", + data=token, + timeout=60, + ) + if response.status_code == 410: + # 410 Gone status code + # means the token is no longer valid. + self.remove_token(addr, token) + self.notification_dir.joinpath(addr).unlink(missing_ok=True) def handle_dovecot_protocol(rfile, wfile, notifier): diff --git a/chatmaild/src/chatmaild/tests/test_metadata.py b/chatmaild/src/chatmaild/tests/test_metadata.py index 5954ef1..911f5a8 100644 --- a/chatmaild/src/chatmaild/tests/test_metadata.py +++ b/chatmaild/src/chatmaild/tests/test_metadata.py @@ -84,7 +84,7 @@ def test_handle_dovecot_request_happy_path(notifier, testaddr): assert handle_dovecot_request(f"B{tx2}\t{testaddr}", transactions, notifier) is None msg = f"S{tx2}\tpriv/guid00/messagenew" assert handle_dovecot_request(msg, transactions, notifier) is None - assert notifier.message_arrived_event.is_set() + assert notifier.notification_queue.get() == testaddr assert handle_dovecot_request(f"C{tx2}", transactions, notifier) == "O\n" assert not transactions assert notifier.notification_dir.joinpath(testaddr).exists() @@ -159,8 +159,8 @@ def test_handle_dovecot_protocol_messagenew(notifier): wfile = io.BytesIO() handle_dovecot_protocol(rfile, wfile, notifier) assert wfile.getvalue() == b"O\n" - assert notifier.message_arrived_event.is_set() - assert notifier.notification_dir.joinpath("user@example.org").exists() + addr = notifier.notification_queue.get() + assert notifier.notification_dir.joinpath(addr).exists() def test_notifier_thread_run(notifier, testaddr):