finally use persistent queue items with random file names, simplifying the flows

This commit is contained in:
holger krekel 2024-03-31 13:35:29 +02:00
parent 9da626dfc8
commit 4d6f520f18
2 changed files with 72 additions and 35 deletions

View File

@ -1,13 +1,13 @@
"""
This modules provides notification machinery for transmitting device tokens to
a central notification server which in turns contacts a phone's notification server
a central notification server which in turns contacts a phone provider's notification server
to trigger Delta Chat apps to retrieve messages and provide instant notifications to users.
The Notifier class arranges the queuing of tokens in separate PriorityQueues
from which NotifyThreads take and transmit them via HTTPS
to the `notifications.delta.chat` service
which in turns contacts a phone's providers's notification service
which in turn ewakes up the Delta Chat app on user devices.
which in turn contacts a phone's providers's notification service
which in turn wakes up the Delta Chat app on user devices.
The lack of proper HTTP2-support in Python lets us
use multiple threads and connections to the Rust-implemented `notifications.delta.chat`
which however uses HTTP2 and thus only a single connection to phone-notification providers.
@ -20,16 +20,44 @@ If a token exceeds MAX_NUMBER_OF_TRIES it is dropped with a log warning.
Note that tokens are completely opaque to the notification machinery here
and will in the future be encrypted foreclosing all ability to distinguish
which device token ultimately goes to which phone-provider notification service.
which device token ultimately goes to which phone-provider notification service,
or to understand the relation of "device tokens" and chatmail addresses.
The meaning and format of tokens is basically a matter of Delta-Chat Core and
the `notification.delta.chat` service.
"""
import time
import logging
from uuid import uuid4
from threading import Thread
from pathlib import Path
from queue import PriorityQueue
from dataclasses import dataclass
import requests
@dataclass
class PersistentQueueItem:
path: Path
addr: str
token: str
def delete(self):
self.path.unlink(missing_ok=True)
@classmethod
def create(cls, queue_dir, addr, token):
queue_id = uuid4().hex
path = queue_dir.joinpath(queue_id)
path.write_text(f"{addr}\n{token}")
return cls(path, addr, token)
@classmethod
def read_from_path(cls, path):
addr, token = path.read_text().split("\n", maxsplit=1)
return cls(path, addr, token)
class Notifier:
URL = "https://notifications.delta.chat/notify"
CONNECTION_TIMEOUT = 60.0 # seconds until http-request is given up
@ -43,28 +71,31 @@ class Notifier:
def new_message_for_addr(self, addr, metadata):
for token in metadata.get_tokens_for_addr(addr):
self.notification_dir.joinpath(token).write_text(addr)
self.add_token_for_retry(token)
queue_item = PersistentQueueItem.create(self.notification_dir, addr, token)
self.queue_for_retry(queue_item)
def requeue_persistent_pending_tokens(self):
for token_path in self.notification_dir.iterdir():
self.add_token_for_retry(token_path.name)
def requeue_persistent_queue_items(self):
for queue_path in self.notification_dir.iterdir():
queue_item = PersistentQueueItem.read_from_path(queue_path)
self.queue_for_retry(queue_item)
def add_token_for_retry(self, token, retry_num=0):
def queue_for_retry(self, queue_item, retry_num=0):
if retry_num >= self.MAX_NUMBER_OF_TRIES:
return False
queue_item.delete()
logging.warning("dropping after %d tries: %r", retry_num, queue_item.token)
return
when = time.time()
if retry_num > 0:
# backup exponentially with number of retries
when += pow(self.NOTIFICATION_RETRY_DELAY, retry_num)
self.retry_queues[retry_num].put((when, token))
return True
self.retry_queues[retry_num].put((when, queue_item))
def start_notification_threads(self, remove_token_from_addr):
self.requeue_persistent_pending_tokens()
self.requeue_persistent_queue_items()
threads = {}
for retry_num in range(len(self.retry_queues)):
# use 4 threads for first-try tokens and less for subsequent tries
num_threads = {0: 4}.get(retry_num, 2)
threads[retry_num] = []
for _ in range(num_threads):
@ -90,36 +121,28 @@ class NotifyThread(Thread):
pass
def retry_one(self, requests_session, sleep=time.sleep):
when, token = self.notifier.retry_queues[self.retry_num].get()
when, queue_item = self.notifier.retry_queues[self.retry_num].get()
if when is None:
return False
wait_time = when - time.time()
if wait_time > 0:
sleep(wait_time)
self.perform_request_to_notification_server(requests_session, token)
self.perform_request_to_notification_server(requests_session, queue_item)
return True
def perform_request_to_notification_server(self, requests_session, token):
token_path = self.notifier.notification_dir.joinpath(token)
def perform_request_to_notification_server(self, requests_session, queue_item):
timeout = self.notifier.CONNECTION_TIMEOUT
token = queue_item.token
try:
timeout = self.notifier.CONNECTION_TIMEOUT
res = requests_session.post(self.notifier.URL, data=token, timeout=timeout)
except requests.exceptions.RequestException as e:
res = e
else:
if res.status_code in (200, 410):
if res.status_code == 410:
# 410 Gone: means the token is no longer valid.
try:
addr = token_path.read_text()
except FileNotFoundError:
logging.warning("no address for token %r:", token)
return
self.remove_token_from_addr(addr, token)
token_path.unlink(missing_ok=True)
self.remove_token_from_addr(queue_item.addr, token)
queue_item.delete()
return
logging.warning("Notification request failed: %r", res)
if not self.notifier.add_token_for_retry(token, retry_num=self.retry_num + 1):
token_path.unlink(missing_ok=True)
logging.warning("dropping token after %d tries: %r", self.retry_num, token)
self.notifier.queue_for_retry(queue_item, retry_num=self.retry_num + 1)

View File

@ -10,6 +10,7 @@ from chatmaild.metadata import (
from chatmaild.notifier import (
Notifier,
NotifyThread,
PersistentQueueItem,
)
@ -123,10 +124,11 @@ def test_handle_dovecot_request_happy_path(notifier, metadata, testaddr, token):
)
msg = f"S{tx2}\tpriv/guid00/messagenew"
assert handle_dovecot_request(msg, transactions, notifier, metadata) is None
assert notifier.retry_queues[0].get()[1] == token
queue_item = notifier.retry_queues[0].get()[1]
assert queue_item.token == token
assert handle_dovecot_request(f"C{tx2}", transactions, notifier, metadata) == "O\n"
assert not transactions
assert notifier.notification_dir.joinpath(token).exists()
assert queue_item.path.exists()
def test_handle_dovecot_protocol_set_devicetoken(metadata, notifier):
@ -192,7 +194,7 @@ def test_notifier_thread_deletes_persistent_file(metadata, notifier, testaddr):
url, data, timeout = reqmock.requests[0]
assert data == "01234"
assert metadata.get_tokens_for_addr(testaddr) == ["01234"]
notifier.requeue_persistent_pending_tokens()
notifier.requeue_persistent_queue_items()
assert notifier.retry_queues[0].qsize() == 0
@ -218,8 +220,8 @@ def test_notifier_thread_connection_failures(
assert len(caplog.records) == 1
else:
assert len(caplog.records) == 2
assert "dropping token" in caplog.records[1].msg
notifier.requeue_persistent_pending_tokens()
assert "dropping" in caplog.records[1].msg
notifier.requeue_persistent_queue_items()
assert notifier.retry_queues[0].qsize() == 0
@ -262,3 +264,15 @@ def test_notifier_thread_run_gone_removes_token(metadata, notifier, testaddr):
assert metadata.get_tokens_for_addr(testaddr) == ["45678"]
assert notifier.retry_queues[0].qsize() == 0
assert notifier.retry_queues[1].qsize() == 0
def test_persistent_queue_items(tmp_path, testaddr, token):
queue_item = PersistentQueueItem.create(tmp_path, testaddr, token)
assert queue_item.addr == testaddr
assert queue_item.token == token
item2 = PersistentQueueItem.read_from_path(queue_item.path)
assert item2.addr == testaddr
assert item2.token == token
assert item2 == queue_item
item2.delete()
assert not item2.path.exists()