chatmaild: move username/password length and passthrough_senders to chatmail.ini

This commit is contained in:
missytake 2023-12-11 20:31:36 +01:00
parent 156434b952
commit 0d0cc908c2
11 changed files with 116 additions and 54 deletions

View File

@ -13,9 +13,13 @@ class Config:
self.max_user_send_per_minute = int(params["max_user_send_per_minute"])
self.max_mailbox_size = params["max_mailbox_size"]
self.delete_mails_after = params["delete_mails_after"]
self.username_min_length = int(params["username_min_length"])
self.username_max_length = int(params["username_max_length"])
self.password_min_length = int(params["password_min_length"])
self.passthrough_senders = params["passthrough_senders"].split()
self.passthrough_recipients = params["passthrough_recipients"].split()
self.filtermail_smtp_port = int(params["filtermail_smtp_port"])
self.postfix_reinject_port = int(params["postfix_reinject_port"])
self.passthrough_recipients = params["passthrough_recipients"].split()
self.privacy_postal = params.get("privacy_postal")
self.privacy_mail = params.get("privacy_mail")
self.privacy_pdo = params.get("privacy_pdo")

View File

@ -12,6 +12,7 @@ from socketserver import (
import pwd
from .database import Database
from .config import read_config, Config
NOCREATE_FILE = "/etc/chatmail-nocreate"
@ -22,14 +23,17 @@ def encrypt_password(password: str):
return "{SHA512-CRYPT}" + passhash
def is_allowed_to_create(user, cleartext_password) -> bool:
def is_allowed_to_create(config: Config, user, cleartext_password) -> bool:
"""Return True if user and password are admissable."""
if os.path.exists(NOCREATE_FILE):
logging.warning(f"blocked account creation because {NOCREATE_FILE!r} exists.")
return False
if len(cleartext_password) < 9:
logging.warning("Password needs to be at least 9 characters long")
if len(cleartext_password) < config.password_min_length:
logging.warning(
"Password needs to be at least %s characters long",
config.password_min_length,
)
return False
parts = user.split("@")
@ -38,11 +42,15 @@ def is_allowed_to_create(user, cleartext_password) -> bool:
return False
localpart, domain = parts
if domain == "nine.testrun.org":
# nine.testrun.org policy, username has to be exactly nine chars
if len(localpart) != 9:
logging.warning(f"localpart {localpart!r} has not exactly nine chars")
return False
if (
len(localpart) > config.username_max_length
or len(localpart) < config.username_min_length
):
logging.warning(
"localpart %s has to be between %s and %s chars long"
% (localpart, config.username_min_length, config.username_max_length)
)
return False
return True
@ -60,7 +68,7 @@ def lookup_userdb(db, user):
return get_user_data(db, user)
def lookup_passdb(db, user, cleartext_password):
def lookup_passdb(db, config: Config, user, cleartext_password):
with db.write_transaction() as conn:
userdata = conn.get_user(user)
if userdata:
@ -72,7 +80,7 @@ def lookup_passdb(db, user, cleartext_password):
userdata["uid"] = "vmail"
userdata["gid"] = "vmail"
return userdata
if not is_allowed_to_create(user, cleartext_password):
if not is_allowed_to_create(config, user, cleartext_password):
return
encrypted_password = encrypt_password(cleartext_password)
@ -87,7 +95,7 @@ def lookup_passdb(db, user, cleartext_password):
)
def handle_dovecot_request(msg, db, mail_domain):
def handle_dovecot_request(msg, db, config: Config):
short_command = msg[0]
if short_command == "L": # LOOKUP
parts = msg[1:].split("\t")
@ -97,15 +105,15 @@ def handle_dovecot_request(msg, db, mail_domain):
res = ""
if namespace == "shared":
if type == "userdb":
if user.endswith(f"@{mail_domain}"):
if user.endswith(f"@{config.mail_domain}"):
res = lookup_userdb(db, user)
if res:
reply_command = "O"
else:
reply_command = "N"
elif type == "passdb":
if user.endswith(f"@{mail_domain}"):
res = lookup_passdb(db, user, cleartext_password=args[0])
if user.endswith(f"@{config.mail_domain}"):
res = lookup_passdb(db, config, user, cleartext_password=args[0])
if res:
reply_command = "O"
else:
@ -123,8 +131,7 @@ def main():
socket = sys.argv[1]
passwd_entry = pwd.getpwnam(sys.argv[2])
db = Database(sys.argv[3])
with open("/etc/mailname", "r") as fp:
mail_domain = fp.read().strip()
config = read_config(sys.argv[4])
class Handler(StreamRequestHandler):
def handle(self):
@ -133,7 +140,7 @@ def main():
msg = self.rfile.readline().strip().decode()
if not msg:
break
res = handle_dovecot_request(msg, db, mail_domain)
res = handle_dovecot_request(msg, db, config)
if res:
self.wfile.write(res.encode("ascii"))
self.wfile.flush()

View File

@ -2,7 +2,7 @@
Description=Chatmail dict authentication proxy for dovecot
[Service]
ExecStart={execpath} /run/dovecot/doveauth.socket vmail /home/vmail/passdb.sqlite
ExecStart={execpath} /run/dovecot/doveauth.socket vmail /home/vmail/passdb.sqlite {config_path}
Restart=always
RestartSec=30

View File

@ -111,6 +111,9 @@ class BeforeQueueHandler:
if not mail_encrypted and check_mdn(message, envelope):
return
if envelope.mail_from in self.config.passthrough_senders:
return
passthrough_recipients = self.config.passthrough_recipients
envelope_from_domain = from_addr.split("@").pop()
for recipient in envelope.rcpt_tos:

View File

@ -20,8 +20,17 @@ max_mailbox_size = 100M
# time after which seen mails are deleted
delete_mails_after = 40d
# list of chatmail accounts which can send unencrypted mail
#passthrough_senders (not implemented yet)
# minimum length a username must have
username_min_length = 6
# maximum length a username must have
username_max_length = 20
# minimum length a password must have
password_min_length = 9
# list of chatmail accounts which can send outbound un-encrypted mail
passthrough_senders =
# list of e-mail recipients for which to accept outbound un-encrypted mails
passthrough_recipients =

View File

@ -1,23 +1,25 @@
#!/usr/bin/python3
#!/usr/local/lib/chatmaild/venv/bin/python3
""" CGI script for creating new accounts. """
import json
import random
mail_domain_path = "/etc/mailname"
from chatmaild.config import read_config, Config
CONFIG_PATH = "/usr/local/lib/chatmaild/chatmail.ini"
def create_newemail_dict(domain):
def create_newemail_dict(config: Config):
alphanumeric = "abcdefghijklmnopqrstuvwxyz1234567890"
user = "".join(random.choices(alphanumeric, k=9))
password = "".join(random.choices(alphanumeric, k=12))
return dict(email=f"{user}@{domain}", password=f"{password}")
user = "".join(random.choices(alphanumeric, k=config.username_min_length))
password = "".join(random.choices(alphanumeric, k=config.password_min_length + 3))
return dict(email=f"{user}@{config.mail_domain}", password=f"{password}")
def print_new_account():
domain = open(mail_domain_path).read().strip()
creds = create_newemail_dict(domain=domain)
config = read_config(CONFIG_PATH)
creds = create_newemail_dict(config)
print("Content-Type: application/json")
print("")

View File

@ -9,28 +9,31 @@ from chatmaild.doveauth import get_user_data, lookup_passdb, handle_dovecot_requ
from chatmaild.database import DBError
def test_basic(db):
lookup_passdb(db, "link2xt@c1.testrun.org", "Pieg9aeToe3eghuthe5u")
def test_basic(db, make_config):
config = make_config("c1.testrun.org")
lookup_passdb(db, config, "link2xt@c1.testrun.org", "Pieg9aeToe3eghuthe5u")
data = get_user_data(db, "link2xt@c1.testrun.org")
assert data
data2 = lookup_passdb(db, "link2xt@c1.testrun.org", "Pieg9aeToe3eghuthe5u")
data2 = lookup_passdb(db, config, "link2xt@c1.testrun.org", "Pieg9aeToe3eghuthe5u")
assert data == data2
def test_dont_overwrite_password_on_wrong_login(db):
def test_dont_overwrite_password_on_wrong_login(db, make_config):
"""Test that logging in with a different password doesn't create a new user"""
res = lookup_passdb(db, "newuser1@something.org", "kajdlkajsldk12l3kj1983")
config = make_config("something.org")
res = lookup_passdb(db, config, "newuser1@something.org", "kajdlkajsldk12l3kj1983")
assert res["password"]
res2 = lookup_passdb(db, "newuser1@something.org", "kajdlqweqwe")
res2 = lookup_passdb(db, config, "newuser1@something.org", "kajdlqweqwe")
# this function always returns a password hash, which is actually compared by dovecot.
assert res["password"] == res2["password"]
def test_nocreate_file(db, monkeypatch, tmpdir):
def test_nocreate_file(db, monkeypatch, tmpdir, make_config):
config = make_config("something.org")
p = tmpdir.join("nocreate")
p.write("")
monkeypatch.setattr(chatmaild.doveauth, "NOCREATE_FILE", str(p))
lookup_passdb(db, "newuser1@something.org", "zequ0Aimuchoodaechik")
lookup_passdb(db, config, "newuser1@something.org", "zequ0Aimuchoodaechik")
assert not get_user_data(db, "newuser1@something.org")
@ -45,12 +48,13 @@ def test_too_high_db_version(db):
db.ensure_tables()
def test_handle_dovecot_request(db):
def test_handle_dovecot_request(db, make_config):
config = make_config("c3.testrun.org")
msg = (
"Lshared/passdb/laksjdlaksjdlaksjdlk12j3l1k2j3123/"
"some42@c3.testrun.org\tsome42@c3.testrun.org"
)
res = handle_dovecot_request(msg, db, "c3.testrun.org")
res = handle_dovecot_request(msg, db, config)
assert res
assert res[0] == "O" and res.endswith("\n")
userdata = json.loads(res[1:].strip())
@ -59,16 +63,19 @@ def test_handle_dovecot_request(db):
assert userdata["password"].startswith("{SHA512-CRYPT}")
def test_50_concurrent_lookups_different_accounts(db, gencreds):
def test_50_concurrent_lookups_different_accounts(
db, gencreds, make_config, maildomain
):
num_threads = 50
req_per_thread = 5
results = queue.Queue()
config = make_config(maildomain)
def lookup(db):
for i in range(req_per_thread):
addr, password = gencreds()
try:
lookup_passdb(db, addr, password)
lookup_passdb(db, config, addr, password)
except Exception:
results.put(traceback.format_exc())
else:

View File

@ -127,3 +127,19 @@ def test_excempt_privacy(maildata, gencreds, handler):
content = msg.as_bytes()
assert "500" in handler.check_DATA(envelope=env2)
def test_passthrough_senders(gencreds, handler, maildata):
acc1 = gencreds()[0]
to_addr = "recipient@something.org"
handler.config.passthrough_senders = [acc1]
msg = maildata("plain.eml", acc1, to_addr)
class env:
mail_from = acc1
rcpt_tos = to_addr
content = msg.as_bytes()
# assert that None/no error is returned
assert not handler.check_DATA(envelope=env)

View File

@ -4,26 +4,26 @@ import chatmaild
from chatmaild.newemail import create_newemail_dict, print_new_account
def test_create_newemail_dict():
ac1 = create_newemail_dict(domain="example.org")
def test_create_newemail_dict(make_config):
config = make_config("example.org")
ac1 = create_newemail_dict(config)
assert "@" in ac1["email"]
assert len(ac1["password"]) >= 10
ac2 = create_newemail_dict(domain="example.org")
ac2 = create_newemail_dict(config)
assert ac1["email"] != ac2["email"]
assert ac1["password"] != ac2["password"]
def test_print_new_account(capsys, monkeypatch, maildomain, tmpdir):
p = tmpdir.join("mailname")
p.write(maildomain)
monkeypatch.setattr(chatmaild.newemail, "mail_domain_path", str(p))
def test_print_new_account(capsys, monkeypatch, maildomain, tmpdir, make_config):
config = make_config(maildomain)
monkeypatch.setattr(chatmaild.newemail, "CONFIG_PATH", str(config._inipath))
print_new_account()
out, err = capsys.readouterr()
lines = out.split("\n")
assert lines[0] == "Content-Type: application/json"
assert not lines[1]
dic = json.loads(lines[2])
assert dic["email"].endswith(f"@{maildomain}")
assert dic["email"].endswith(f"@{config.mail_domain}")
assert len(dic["password"]) >= 10

View File

@ -1,6 +1,16 @@
import requests
from cmdeploy.genqr import gen_qr_png_data
def test_gen_qr_png_data(maildomain):
data = gen_qr_png_data(maildomain)
assert data
def test_fastcgi_working(maildomain, chatmail_config):
url = f"https://{maildomain}/cgi-bin/newemail.py"
print(url)
res = requests.post(url)
assert maildomain in res.json().get("email")
assert len(res.json().get("password")) > chatmail_config.password_min_length

View File

@ -228,18 +228,22 @@ def imap_or_smtp(request):
@pytest.fixture
def gencreds(maildomain):
def gencreds(chatmail_config):
count = itertools.count()
next(count)
def gen(domain=None):
domain = domain if domain else maildomain
domain = domain if domain else chatmail_config.mail_domain
while 1:
num = next(count)
alphanumeric = "abcdefghijklmnopqrstuvwxyz1234567890"
user = "".join(random.choices(alphanumeric, k=10))
user = f"ac{num}_{user}"[:9]
password = "".join(random.choices(alphanumeric, k=12))
user = "".join(
random.choices(alphanumeric, k=chatmail_config.username_max_length)
)
user = f"ac{num}_{user}"[: chatmail_config.username_max_length]
password = "".join(
random.choices(alphanumeric, k=chatmail_config.password_min_length)
)
yield f"{user}@{domain}", f"{password}"
return lambda domain=None: next(gen(domain))