Add tests to metadata/token handling and post notifications in background thread (#224)

This commit is contained in:
holger krekel 2024-03-08 02:56:33 +01:00 committed by GitHub
parent 8baee557ee
commit 1e229ad2de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 228 additions and 53 deletions

View File

@ -1,75 +1,112 @@
import pwd import pwd
from queue import Queue
from threading import Thread
from socketserver import ( from socketserver import (
UnixStreamServer, UnixStreamServer,
StreamRequestHandler, StreamRequestHandler,
ThreadingMixIn, ThreadingMixIn,
) )
from .config import read_config, Config from .config import read_config
import sys import sys
import logging import logging
import os import os
import requests import requests
def handle_dovecot_protocol(rfile, wfile, tokens, requests_session, config: Config): DICTPROXY_LOOKUP_CHAR = "L"
DICTPROXY_SET_CHAR = "S"
DICTPROXY_BEGIN_TRANSACTION_CHAR = "B"
DICTPROXY_COMMIT_TRANSACTION_CHAR = "C"
DICTPROXY_TRANSACTION_CHARS = "SBC"
class Notifier:
def __init__(self):
self.guid2token = {}
self.to_notify_queue = Queue()
def set_token(self, guid, token):
self.guid2token[guid] = token
def new_message_for_guid(self, guid):
self.to_notify_queue.put(guid)
def thread_run_loop(self):
requests_session = requests.Session()
while 1:
self.thread_run_one(requests_session)
def thread_run_one(self, requests_session):
guid = self.to_notify_queue.get()
token = self.guid2token.get(guid)
if token:
response = requests_session.post(
"https://notifications.delta.chat/notify",
data=token,
timeout=60,
)
if response.status_code == 410:
# 410 Gone status code
# means the token is no longer valid.
del self.guid2token[guid]
def handle_dovecot_protocol(rfile, wfile, notifier):
# HELLO message, ignored. # HELLO message, ignored.
msg = rfile.readline().strip().decode() msg = rfile.readline().strip().decode()
transactions = {} transactions = {}
while True: while True:
msg = rfile.readline().strip().decode() msg = rfile.readline().strip().decode()
if not msg: if not msg:
break break
short_command = msg[0] res = handle_dovecot_request(msg, transactions, notifier)
if short_command == "L": if res:
wfile.write(b"N\n") wfile.write(res.encode("ascii"))
elif short_command == "S": wfile.flush()
# See header of
# <https://github.com/dovecot/core/blob/5e7965632395793d9355eb906b173bf28d2a10ca/src/lib-storage/mailbox-attribute.h>
# for the documentation on the structure of the key.
# Request GETMETADATA "INBOX" /private/chatmail
# results in a query for
# priv/dd72550f05eadc65542a1200cac67ad7/chatmail
#
# Request GETMETADATA "" /private/chatmail
# results in
# priv/dd72550f05eadc65542a1200cac67ad7/vendor/vendor.dovecot/pvt/server/chatmail
parts = msg[1:].split("\t") def handle_dovecot_request(msg, transactions, notifier):
transaction_id = parts[0] # see https://doc.dovecot.org/3.0/developer_manual/design/dict_protocol/
keyname = parts[1].split("/") short_command = msg[0]
value = parts[2] if len(parts) > 2 else "" parts = msg[1:].split("\t")
if keyname[0] == "priv" and keyname[2] == "devicetoken": if short_command == DICTPROXY_LOOKUP_CHAR:
tokens[keyname[1]] = value return "N\n"
elif keyname[0] == "priv" and keyname[2] == "messagenew":
guid = keyname[1]
token = tokens.get(guid)
if token:
response = requests_session.post(
"https://notifications.delta.chat/notify",
data=token,
timeout=60,
)
if response.status_code == 410:
# 410 Gone status code
# means the token is no longer valid.
del tokens[guid]
else:
# Transaction failed.
transactions[transaction_id] = b"F\n"
elif short_command == "B":
# Begin transaction.
transaction_id = msg[1:].split("\t")[0]
transactions[transaction_id] = b"O\n"
elif short_command == "C":
# Commit transaction.
transaction_id = msg[1:].split("\t")[0]
wfile.write(transactions.pop(transaction_id, b"N\n"))
wfile.flush() if short_command not in (DICTPROXY_TRANSACTION_CHARS):
return
transaction_id = parts[0]
if short_command == DICTPROXY_BEGIN_TRANSACTION_CHAR:
transactions[transaction_id] = "O\n"
elif short_command == DICTPROXY_COMMIT_TRANSACTION_CHAR:
# returns whether it failed or succeeded.
return transactions.pop(transaction_id, "N\n")
elif short_command == DICTPROXY_SET_CHAR:
# See header of
# <https://github.com/dovecot/core/blob/5e7965632395793d9355eb906b173bf28d2a10ca/src/lib-storage/mailbox-attribute.h>
# for the documentation on the structure of the key.
# Request GETMETADATA "INBOX" /private/chatmail
# results in a query for
# priv/dd72550f05eadc65542a1200cac67ad7/chatmail
#
# Request GETMETADATA "" /private/chatmail
# results in
# priv/dd72550f05eadc65542a1200cac67ad7/vendor/vendor.dovecot/pvt/server/chatmail
keyname = parts[1].split("/")
value = parts[2] if len(parts) > 2 else ""
if keyname[0] == "priv" and keyname[2] == "devicetoken":
notifier.set_token(keyname[1], value)
elif keyname[0] == "priv" and keyname[2] == "messagenew":
notifier.new_message_for_guid(keyname[1])
else:
# Transaction failed.
transactions[transaction_id] = "F\n"
class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer): class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer):
@ -79,16 +116,15 @@ class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer):
def main(): def main():
socket, username, config = sys.argv[1:] socket, username, config = sys.argv[1:]
passwd_entry = pwd.getpwnam(username) passwd_entry = pwd.getpwnam(username)
# XXX config is not currently used
config = read_config(config) config = read_config(config)
tokens = {} notifier = Notifier()
requests_session = requests.Session()
class Handler(StreamRequestHandler): class Handler(StreamRequestHandler):
def handle(self): def handle(self):
try: try:
handle_dovecot_protocol( handle_dovecot_protocol(self.rfile, self.wfile, notifier)
self.rfile, self.wfile, tokens, requests_session, config
)
except Exception: except Exception:
logging.exception("Exception in the handler") logging.exception("Exception in the handler")
raise raise
@ -98,6 +134,13 @@ def main():
except FileNotFoundError: except FileNotFoundError:
pass pass
# start notifier thread for signalling new messages to
# Delta Chat notification server
t = Thread(target=notifier.thread_run_loop)
t.setDaemon(True)
t.start()
with ThreadedUnixStreamServer(socket, Handler) as server: with ThreadedUnixStreamServer(socket, Handler) as server:
os.chown(socket, uid=passwd_entry.pw_uid, gid=passwd_entry.pw_gid) os.chown(socket, uid=passwd_entry.pw_uid, gid=passwd_entry.pw_gid)
try: try:

View File

@ -0,0 +1,132 @@
import io
from chatmaild.metadata import (
handle_dovecot_request,
handle_dovecot_protocol,
Notifier,
)
def test_handle_dovecot_request_lookup_fails():
notifier = Notifier()
res = handle_dovecot_request("Lpriv/123/chatmail", {}, notifier)
assert res == "N\n"
def test_handle_dovecot_request_happy_path():
notifier = Notifier()
transactions = {}
# lookups return the same NOTFOUND result
res = handle_dovecot_request("Lpriv/123/chatmail", transactions, notifier)
assert res == "N\n"
assert not notifier.guid2token and not transactions
# set device token in a transaction
tx = "1111"
msg = f"B{tx}\tuser"
res = handle_dovecot_request(msg, transactions, notifier)
assert not res and not notifier.guid2token
assert transactions == {tx: "O\n"}
msg = f"S{tx}\tpriv/guid00/devicetoken\t01234"
res = handle_dovecot_request(msg, transactions, notifier)
assert not res
assert len(transactions) == 1
assert len(notifier.guid2token) == 1
assert notifier.guid2token["guid00"] == "01234"
msg = f"C{tx}"
res = handle_dovecot_request(msg, transactions, notifier)
assert res == "O\n"
assert len(transactions) == 0
assert notifier.guid2token["guid00"] == "01234"
# trigger notification for incoming message
assert handle_dovecot_request(f"B{tx}\tuser", transactions, notifier) is None
msg = f"S{tx}\tpriv/guid00/messagenew"
assert handle_dovecot_request(msg, transactions, notifier) is None
assert notifier.to_notify_queue.get() == "guid00"
assert notifier.to_notify_queue.qsize() == 0
assert handle_dovecot_request(f"C{tx}\tuser", transactions, notifier) == "O\n"
assert not transactions
def test_handle_dovecot_protocol_set_devicetoken():
rfile = io.BytesIO(
b"\n".join(
[
b"HELLO",
b"Btx00\tuser",
b"Stx00\tpriv/guid00/devicetoken\t01234",
b"Ctx00",
]
)
)
wfile = io.BytesIO()
notifier = Notifier()
handle_dovecot_protocol(rfile, wfile, notifier)
assert notifier.guid2token["guid00"] == "01234"
assert wfile.getvalue() == b"O\n"
def test_handle_dovecot_protocol_messagenew():
rfile = io.BytesIO(
b"\n".join(
[
b"HELLO",
b"Btx01\tuser",
b"Stx01\tpriv/guid00/messagenew",
b"Ctx01",
]
)
)
wfile = io.BytesIO()
notifier = Notifier()
handle_dovecot_protocol(rfile, wfile, notifier)
assert wfile.getvalue() == b"O\n"
assert notifier.to_notify_queue.get() == "guid00"
assert notifier.to_notify_queue.qsize() == 0
def test_notifier_thread_run():
requests = []
class ReqMock:
def post(self, url, data, timeout):
requests.append((url, data, timeout))
class Result:
status_code = 200
return Result()
notifier = Notifier()
notifier.set_token("guid00", "01234")
notifier.new_message_for_guid("guid00")
notifier.thread_run_one(ReqMock())
url, data, timeout = requests[0]
assert data == "01234"
assert len(notifier.guid2token) == 1
def test_notifier_thread_run_gone_removes_token():
requests = []
class ReqMock:
def post(self, url, data, timeout):
requests.append((url, data, timeout))
class Result:
status_code = 410
return Result()
notifier = Notifier()
notifier.set_token("guid00", "01234")
notifier.new_message_for_guid("guid00")
assert notifier.guid2token["guid00"] == "01234"
notifier.thread_run_one(ReqMock())
url, data, timeout = requests[0]
assert data == "01234"
assert len(notifier.guid2token) == 0