various naming refinements

This commit is contained in:
holger krekel 2024-03-28 10:25:56 +01:00
parent 5d5e2b199c
commit 554c33423f
3 changed files with 89 additions and 80 deletions

View File

@ -6,7 +6,7 @@ from contextlib import contextmanager
class FileDict: class FileDict:
"""Concurrency-safe multi-reader-single-writer Persistent Dict.""" """Concurrency-safe multi-reader/single-writer persistent dict."""
def __init__(self, path): def __init__(self, path):
self.path = path self.path = path

View File

@ -16,13 +16,16 @@ import requests
from .filedict import FileDict from .filedict import FileDict
DICTPROXY_HELLO_CHAR = "H"
DICTPROXY_LOOKUP_CHAR = "L" DICTPROXY_LOOKUP_CHAR = "L"
DICTPROXY_ITERATE_CHAR = "I" DICTPROXY_ITERATE_CHAR = "I"
DICTPROXY_SET_CHAR = "S"
DICTPROXY_BEGIN_TRANSACTION_CHAR = "B" DICTPROXY_BEGIN_TRANSACTION_CHAR = "B"
DICTPROXY_SET_CHAR = "S"
DICTPROXY_COMMIT_TRANSACTION_CHAR = "C" DICTPROXY_COMMIT_TRANSACTION_CHAR = "C"
DICTPROXY_TRANSACTION_CHARS = "SBC" DICTPROXY_TRANSACTION_CHARS = "BSC"
# each SETMETADATA on this key appends to a list of unique device tokens
# which only ever get removed if the upstream indicates the token is invalid
METADATA_TOKEN_KEY = "devicetoken" METADATA_TOKEN_KEY = "devicetoken"
@ -31,22 +34,20 @@ class Notifier:
self.vmail_dir = vmail_dir self.vmail_dir = vmail_dir
self.to_notify_queue = Queue() self.to_notify_queue = Queue()
def get_metadata_dict(self, mbox): def get_metadata_dict(self, addr):
mbox_path = self.vmail_dir.joinpath(mbox) addr_path = self.vmail_dir.joinpath(addr)
if not mbox_path.exists(): return FileDict(addr_path / "metadata.marshalled")
mbox_path.mkdir()
return FileDict(mbox_path / "metadata.marshalled")
def add_token(self, mbox, token): def add_token(self, addr, token):
with self.get_metadata_dict(mbox).modify() as data: with self.get_metadata_dict(addr).modify() as data:
tokens = data.get(METADATA_TOKEN_KEY) tokens = data.get(METADATA_TOKEN_KEY)
if tokens is None: if tokens is None:
data[METADATA_TOKEN_KEY] = tokens = [] data[METADATA_TOKEN_KEY] = tokens = []
if token not in tokens: if token not in tokens:
tokens.append(token) tokens.append(token)
def remove_token(self, mbox, token): def remove_token(self, addr, token):
with self.get_metadata_dict(mbox).modify() as data: with self.get_metadata_dict(addr).modify() as data:
tokens = data.get(METADATA_TOKEN_KEY) tokens = data.get(METADATA_TOKEN_KEY)
if tokens: if tokens:
try: try:
@ -54,11 +55,11 @@ class Notifier:
except KeyError: except KeyError:
pass pass
def get_tokens(self, mbox): def get_tokens(self, addr):
return self.get_metadata_dict(mbox).read().get(METADATA_TOKEN_KEY, []) return self.get_metadata_dict(addr).read().get(METADATA_TOKEN_KEY, [])
def new_message_for_mbox(self, mbox): def new_message_for_addr(self, addr):
self.to_notify_queue.put(mbox) self.to_notify_queue.put(addr)
def thread_run_loop(self): def thread_run_loop(self):
requests_session = requests.Session() requests_session = requests.Session()
@ -66,8 +67,8 @@ class Notifier:
self.thread_run_one(requests_session) self.thread_run_one(requests_session)
def thread_run_one(self, requests_session): def thread_run_one(self, requests_session):
mbox = self.to_notify_queue.get() addr = self.to_notify_queue.get()
for token in self.get_tokens(mbox): for token in self.get_tokens(addr):
response = requests_session.post( response = requests_session.post(
"https://notifications.delta.chat/notify", "https://notifications.delta.chat/notify",
data=token, data=token,
@ -76,13 +77,10 @@ class Notifier:
if response.status_code == 410: if response.status_code == 410:
# 410 Gone status code # 410 Gone status code
# means the token is no longer valid. # means the token is no longer valid.
self.remove_token(mbox, token) self.remove_token(addr, token)
def handle_dovecot_protocol(rfile, wfile, notifier): def handle_dovecot_protocol(rfile, wfile, notifier):
# HELLO message, ignored.
msg = rfile.readline().strip().decode()
transactions = {} transactions = {}
while True: while True:
msg = rfile.readline().strip().decode() msg = rfile.readline().strip().decode()
@ -104,9 +102,9 @@ def handle_dovecot_request(msg, transactions, notifier):
keyparts = parts[0].split("/") keyparts = parts[0].split("/")
if keyparts[0] == "priv": if keyparts[0] == "priv":
keyname = keyparts[2] keyname = keyparts[2]
mbox = parts[1] addr = parts[1]
if keyname == METADATA_TOKEN_KEY: if keyname == METADATA_TOKEN_KEY:
res = " ".join(notifier.get_tokens(mbox)) res = " ".join(notifier.get_tokens(addr))
return f"O{res}\n" return f"O{res}\n"
logging.warning("lookup ignored: %r", msg) logging.warning("lookup ignored: %r", msg)
return "N\n" return "N\n"
@ -114,29 +112,35 @@ def handle_dovecot_request(msg, transactions, notifier):
# Empty line means ITER_FINISHED. # Empty line means ITER_FINISHED.
# If we don't return empty line Dovecot will timeout. # If we don't return empty line Dovecot will timeout.
return "\n" return "\n"
elif short_command == DICTPROXY_HELLO_CHAR:
return # no version checking
if short_command not in (DICTPROXY_TRANSACTION_CHARS): if short_command not in (DICTPROXY_TRANSACTION_CHARS):
logging.warning("unknown dictproxy request: %r", msg)
return return
transaction_id = parts[0] transaction_id = parts[0]
if short_command == DICTPROXY_BEGIN_TRANSACTION_CHAR: if short_command == DICTPROXY_BEGIN_TRANSACTION_CHAR:
mbox = parts[1] addr = parts[1]
transactions[transaction_id] = dict(mbox=mbox, res="O\n") transactions[transaction_id] = dict(addr=addr, res="O\n")
elif short_command == DICTPROXY_COMMIT_TRANSACTION_CHAR: elif short_command == DICTPROXY_COMMIT_TRANSACTION_CHAR:
# returns whether it failed or succeeded. # each set devicetoken operation persists directly
# and does not wait until a "commit" comes
# because our dovecot config does not involve
# multiple set-operations in a single commit
return transactions.pop(transaction_id)["res"] return transactions.pop(transaction_id)["res"]
elif short_command == DICTPROXY_SET_CHAR: elif short_command == DICTPROXY_SET_CHAR:
# For documentation on key structure see # For documentation on key structure see
# <https://github.com/dovecot/core/blob/5e7965632395793d9355eb906b173bf28d2a10ca/src/lib-storage/mailbox-attribute.h> # https://github.com/dovecot/core/blob/main/src/lib-storage/mailbox-attribute.h
keyname = parts[1].split("/") keyname = parts[1].split("/")
value = parts[2] if len(parts) > 2 else "" value = parts[2] if len(parts) > 2 else ""
mbox = transactions[transaction_id]["mbox"] addr = transactions[transaction_id]["addr"]
if keyname[0] == "priv" and keyname[2] == METADATA_TOKEN_KEY: if keyname[0] == "priv" and keyname[2] == METADATA_TOKEN_KEY:
notifier.add_token(mbox, value) notifier.add_token(addr, value)
elif keyname[0] == "priv" and keyname[2] == "messagenew": elif keyname[0] == "priv" and keyname[2] == "messagenew":
notifier.new_message_for_mbox(mbox) notifier.new_message_for_addr(addr)
else: else:
# Transaction failed. # Transaction failed.
transactions[transaction_id]["res"] = "F\n" transactions[transaction_id]["res"] = "F\n"

View File

@ -15,66 +15,71 @@ def notifier(tmp_path):
return Notifier(vmail_dir) return Notifier(vmail_dir)
def test_notifier_persistence(tmp_path): @pytest.fixture
vmail_dir = tmp_path def testaddr():
vmail_dir.joinpath("user1@example.org").mkdir() return "user.name@example.org"
vmail_dir.joinpath("user3@example.org").mkdir()
notifier1 = Notifier(vmail_dir)
notifier2 = Notifier(vmail_dir)
assert not notifier1.get_tokens("user1@example.org")
assert not notifier2.get_tokens("user1@example.org")
notifier1.add_token("user1@example.org", "01234")
notifier1.add_token("user3@example.org", "456")
assert notifier2.get_tokens("user1@example.org") == ["01234"]
assert notifier2.get_tokens("user3@example.org") == ["456"]
notifier2.remove_token("user1@example.org", "01234")
assert not notifier1.get_tokens("user1@example.org")
def test_notifier_delete_without_set(notifier): @pytest.fixture
notifier.remove_token("user@example.org", "123") def testaddr2():
assert not notifier.get_tokens("user@example.org") return "user2@example.org"
def test_handle_dovecot_request_lookup_fails(notifier): def test_notifier_persistence(tmp_path, testaddr, testaddr2):
res = handle_dovecot_request("Lpriv/123/chatmail\tuser@example.org", {}, notifier) notifier1 = Notifier(tmp_path)
notifier2 = Notifier(tmp_path)
assert not notifier1.get_tokens(testaddr)
assert not notifier2.get_tokens(testaddr)
notifier1.add_token(testaddr, "01234")
notifier1.add_token(testaddr2, "456")
assert notifier2.get_tokens(testaddr) == ["01234"]
assert notifier2.get_tokens(testaddr2) == ["456"]
notifier2.remove_token(testaddr, "01234")
assert not notifier1.get_tokens(testaddr)
assert notifier1.get_tokens(testaddr2) == ["456"]
def test_notifier_delete_without_set(notifier, testaddr):
notifier.remove_token(testaddr, "123")
assert not notifier.get_tokens(testaddr)
def test_handle_dovecot_request_lookup_fails(notifier, testaddr):
res = handle_dovecot_request(f"Lpriv/123/chatmail\t{testaddr}", {}, notifier)
assert res == "N\n" assert res == "N\n"
def test_handle_dovecot_request_happy_path(notifier): def test_handle_dovecot_request_happy_path(notifier, testaddr):
transactions = {} transactions = {}
# set device token in a transaction # set device token in a transaction
tx = "1111" tx = "1111"
msg = f"B{tx}\tuser@example.org" msg = f"B{tx}\t{testaddr}"
res = handle_dovecot_request(msg, transactions, notifier) res = handle_dovecot_request(msg, transactions, notifier)
assert not res and not notifier.get_tokens("user@example.org") assert not res and not notifier.get_tokens(testaddr)
assert transactions == {tx: dict(mbox="user@example.org", res="O\n")} assert transactions == {tx: dict(addr=testaddr, res="O\n")}
msg = f"S{tx}\tpriv/guid00/devicetoken\t01234" msg = f"S{tx}\tpriv/guid00/devicetoken\t01234"
res = handle_dovecot_request(msg, transactions, notifier) res = handle_dovecot_request(msg, transactions, notifier)
assert not res assert not res
assert len(transactions) == 1 assert len(transactions) == 1
assert notifier.get_tokens("user@example.org") == ["01234"] assert notifier.get_tokens(testaddr) == ["01234"]
msg = f"C{tx}" msg = f"C{tx}"
res = handle_dovecot_request(msg, transactions, notifier) res = handle_dovecot_request(msg, transactions, notifier)
assert res == "O\n" assert res == "O\n"
assert len(transactions) == 0 assert len(transactions) == 0
assert notifier.get_tokens("user@example.org") == ["01234"] assert notifier.get_tokens(testaddr) == ["01234"]
# trigger notification for incoming message # trigger notification for incoming message
assert ( tx2 = "2222"
handle_dovecot_request(f"B{tx}\tuser@example.org", transactions, notifier) assert handle_dovecot_request(f"B{tx2}\t{testaddr}", transactions, notifier) is None
is None msg = f"S{tx2}\tpriv/guid00/messagenew"
)
msg = f"S{tx}\tpriv/guid00/messagenew"
assert handle_dovecot_request(msg, transactions, notifier) is None assert handle_dovecot_request(msg, transactions, notifier) is None
assert notifier.to_notify_queue.get() == "user@example.org" assert notifier.to_notify_queue.get() == testaddr
assert notifier.to_notify_queue.qsize() == 0 assert notifier.to_notify_queue.qsize() == 0
assert handle_dovecot_request(f"C{tx}", transactions, notifier) == "O\n" assert handle_dovecot_request(f"C{tx2}", transactions, notifier) == "O\n"
assert not transactions assert not transactions
@ -151,7 +156,7 @@ def test_handle_dovecot_protocol_messagenew(notifier):
assert notifier.to_notify_queue.qsize() == 0 assert notifier.to_notify_queue.qsize() == 0
def test_notifier_thread_run(notifier): def test_notifier_thread_run(notifier, testaddr):
requests = [] requests = []
class ReqMock: class ReqMock:
@ -163,15 +168,15 @@ def test_notifier_thread_run(notifier):
return Result() return Result()
notifier.add_token("user@example.org", "01234") notifier.add_token(testaddr, "01234")
notifier.new_message_for_mbox("user@example.org") notifier.new_message_for_addr(testaddr)
notifier.thread_run_one(ReqMock()) notifier.thread_run_one(ReqMock())
url, data, timeout = requests[0] url, data, timeout = requests[0]
assert data == "01234" assert data == "01234"
assert notifier.get_tokens("user@example.org") == ["01234"] assert notifier.get_tokens(testaddr) == ["01234"]
def test_multi_device_notifier(notifier): def test_multi_device_notifier(notifier, testaddr):
requests = [] requests = []
class ReqMock: class ReqMock:
@ -183,18 +188,18 @@ def test_multi_device_notifier(notifier):
return Result() return Result()
notifier.add_token("user@example.org", "01234") notifier.add_token(testaddr, "01234")
notifier.add_token("user@example.org", "56789") notifier.add_token(testaddr, "56789")
notifier.new_message_for_mbox("user@example.org") notifier.new_message_for_addr(testaddr)
notifier.thread_run_one(ReqMock()) notifier.thread_run_one(ReqMock())
url, data, timeout = requests[0] url, data, timeout = requests[0]
assert data == "01234" assert data == "01234"
url, data, timeout = requests[1] url, data, timeout = requests[1]
assert data == "56789" assert data == "56789"
assert notifier.get_tokens("user@example.org") == ["01234", "56789"] assert notifier.get_tokens(testaddr) == ["01234", "56789"]
def test_notifier_thread_run_gone_removes_token(notifier): def test_notifier_thread_run_gone_removes_token(notifier, testaddr):
requests = [] requests = []
class ReqMock: class ReqMock:
@ -206,13 +211,13 @@ def test_notifier_thread_run_gone_removes_token(notifier):
return Result() return Result()
notifier.add_token("user@example.org", "01234") notifier.add_token(testaddr, "01234")
notifier.new_message_for_mbox("user@example.org") notifier.new_message_for_addr(testaddr)
assert notifier.get_tokens("user@example.org") == ["01234"] assert notifier.get_tokens(testaddr) == ["01234"]
notifier.add_token("user@example.org", "45678") notifier.add_token(testaddr, "45678")
notifier.thread_run_one(ReqMock()) notifier.thread_run_one(ReqMock())
url, data, timeout = requests[0] url, data, timeout = requests[0]
assert data == "01234" assert data == "01234"
url, data, timeout = requests[1] url, data, timeout = requests[1]
assert data == "45678" assert data == "45678"
assert notifier.get_tokens("user@example.org") == ["45678"] assert notifier.get_tokens(testaddr) == ["45678"]