cmdeploy: show DNS info at begin and end of cmdeploy run

This commit is contained in:
missytake 2023-12-14 17:30:05 +01:00
parent d642224a73
commit 146def2f06
2 changed files with 173 additions and 152 deletions

View File

@ -3,7 +3,6 @@ Provides the `cmdeploy` entry point function,
along with command line option and subcommand parsing.
"""
import argparse
import datetime
import shutil
import subprocess
import importlib.resources
@ -15,7 +14,7 @@ from pathlib import Path
from termcolor import colored
from chatmaild.config import read_config, write_initial_config
from cmdeploy.dns import DNS
from cmdeploy.dns import show_dns, check_necessary_dns
#
@ -39,33 +38,12 @@ def init_cmd(args, out):
else:
write_initial_config(args.inipath, mail_domain)
out.green(f"created config file for {mail_domain} in {args.inipath}")
dns = DNS(out, mail_domain)
try:
ipaddress = dns.resolve(args.chatmail_domain)
mta_ipadress = dns.resolve("mta-sts." + args.chatmail_domain)
except subprocess.CalledProcessError:
ipaddress = None
mta_ipadress = None
entries = 0
to_print = ["Now you should add %dnsentry% at your DNS provider:\n"]
if not ipaddress:
entries += 1
to_print.append(f"\tA\t{args.chatmail_domain}.\t\t<your server's IPv4 address>")
if not mta_ipadress or mta_ipadress != ipaddress:
entries += 1
to_print.append(
f"\tCNAME\tmta-sts.{args.chatmail_domain}.\t{args.chatmail_domain}."
)
if entries == 1:
singular = "this entry"
elif entries == 2:
singular = "these entries"
else:
return
to_print[0] = to_print[0].replace("%dnsentry%", singular)
for line in to_print:
print(line)
print()
check_necessary_dns(
args,
out,
"\nNow you should add %dnsentry% at your DNS provider:\n",
mail_domain,
)
def run_cmd_options(parser):
@ -79,6 +57,14 @@ def run_cmd_options(parser):
def run_cmd(args, out):
"""Deploy chatmail services on the remote server."""
mail_domain = args.config.mail_domain
if not check_necessary_dns(
args,
out,
"\nmissing DNS entries, please add %dnsentry% at your DNS provider:\n",
mail_domain,
):
sys.exit(1)
env = os.environ.copy()
env["CHATMAIL_INI"] = args.inipath
@ -86,15 +72,8 @@ def run_cmd(args, out):
pyinf = "pyinfra --dry" if args.dry_run else "pyinfra"
cmd = f"{pyinf} --ssh-user root {args.config.mail_domain} {deploy_path}"
mail_domain = args.config.mail_domain
dns = DNS(out, mail_domain)
root_ip = dns.resolve(mail_domain)
mta_ip = dns.resolve(f"mta-sts.{mail_domain}")
if not root_ip or root_ip != mta_ip:
out.red("DNS entries missing. Show instructions with:\n")
print(f"\tcmdeploy init {mail_domain}\n")
sys.exit(1)
out.check_call(cmd, env=env)
show_dns(args, out)
def dns_cmd_options(parser):
@ -107,121 +86,7 @@ def dns_cmd_options(parser):
def dns_cmd(args, out):
"""Generate dns zone file."""
template = importlib.resources.files(__package__).joinpath("chatmail.zone.f")
mail_domain = args.config.mail_domain
ssh = f"ssh root@{mail_domain}"
dns = DNS(out, mail_domain)
def read_dkim_entries(entry):
lines = []
for line in entry.split("\n"):
if line.startswith(";") or not line.strip():
continue
line = line.replace("\t", " ")
lines.append(line)
return "\n".join(lines)
print("Checking your DKIM keys and DNS entries...")
acme_account_url = out.shell_output(f"{ssh} -- acmetool account-url")
dkim_entry = read_dkim_entries(out.shell_output(f"{ssh} -- opendkim-genzone -F"))
ipv6 = dns.get_ipv6()
reverse_ipv6 = dns.check_ptr_record(ipv6, args.config.mail_domain)
ipv4 = dns.get_ipv4()
reverse_ipv4 = dns.check_ptr_record(ipv4, args.config.mail_domain)
to_print = []
with open(template, "r") as f:
zonefile = (
f.read()
.format(
acme_account_url=acme_account_url,
email=f"root@{args.config.mail_domain}",
sts_id=datetime.datetime.now().strftime("%Y%m%d%H%M"),
chatmail_domain=args.config.mail_domain,
dkim_entry=dkim_entry,
ipv6=ipv6,
ipv4=ipv4,
)
.strip()
)
if args.zonefile:
with open(args.zonefile, "w+") as zf:
zf.write(zonefile)
print(f"DNS records successfully written to: {args.zonefile}")
return
started_dkim_parsing = False
for line in zonefile.splitlines():
line = line.format(
acme_account_url=acme_account_url,
email=f"root@{args.config.mail_domain}",
sts_id=datetime.datetime.now().strftime("%Y%m%d%H%M"),
chatmail_domain=args.config.mail_domain,
dkim_entry=dkim_entry,
ipv6=ipv6,
).strip()
for typ in ["A", "AAAA", "CNAME", "CAA"]:
if f" {typ} " in line:
domain, value = line.split(f" {typ} ")
current = dns.get(typ, domain.strip()[:-1])
if current != value.strip():
to_print.append(line)
if " MX " in line:
domain, typ, prio, value = line.split()
current = dns.get(typ, domain[:-1])
if not current:
to_print.append(line)
elif current.split()[1] != value:
print(line.replace(prio, str(int(current[0]) + 1)))
if " SRV " in line:
domain, typ, prio, weight, port, value = line.split()
current = dns.get("SRV", domain[:-1])
if current != f"{prio} {weight} {port} {value}":
to_print.append(line)
if " TXT " in line:
domain, value = line.split(" TXT ")
current = dns.get("TXT", domain.strip()[:-1])
if domain.startswith("_mta-sts."):
if current.split("id=")[0] == value.split("id=")[0]:
continue
if current != value:
to_print.append(line)
if " IN TXT ( " in line:
started_dkim_parsing = True
dkim_lines = [line]
if started_dkim_parsing and line.startswith('"'):
dkim_lines.append(" " + line)
domain, data = "\n".join(dkim_lines).split(" IN TXT ")
current = dns.get("TXT", domain.strip()[:-1]).replace('" "', '"\n "')
current = f"( {current} )"
if current.replace(";", "\\;") != data:
to_print.append(dkim_entry)
if to_print:
to_print.insert(
0, "You should configure the following DNS entries at your provider:\n"
)
to_print.append(
"\nIf you already configured the DNS entries, don't worry. It can take a while until they are public."
)
print("\n".join(to_print))
else:
out.green("Great! All your DNS entries are correct.")
if not reverse_ipv4:
print(
f"\nYou should add a PTR/reverse DNS entry for {ipv4}, with the value: {args.config.mail_domain}"
)
print(
"You can do so at your hosting provider (maybe this isn't your DNS provider)."
)
if not reverse_ipv6:
print(
f"\nYou should add a PTR/reverse DNS entry for {ipv6}, with the value: {args.config.mail_domain}"
)
print(
"You can do so at your hosting provider (maybe this isn't your DNS provider)."
)
show_dns(args, out)
def status_cmd(args, out):

View File

@ -1,4 +1,7 @@
import requests
import importlib
import subprocess
import datetime
from ipaddress import ip_address
@ -40,3 +43,156 @@ class DNS:
"""Check the PTR record for an IPv4 or IPv6 address."""
result = self.get("PTR", ip_address(ip).reverse_pointer)
return result[:-1] == mail_domain
def show_dns(args, out):
template = importlib.resources.files(__package__).joinpath("chatmail.zone.f")
mail_domain = args.config.mail_domain
ssh = f"ssh root@{mail_domain}"
dns = DNS(out, mail_domain)
def read_dkim_entries(entry):
lines = []
for line in entry.split("\n"):
if line.startswith(";") or not line.strip():
continue
line = line.replace("\t", " ")
lines.append(line)
return "\n".join(lines)
print("Checking your DKIM keys and DNS entries...")
acme_account_url = out.shell_output(f"{ssh} -- acmetool account-url")
dkim_entry = read_dkim_entries(out.shell_output(f"{ssh} -- opendkim-genzone -F"))
ipv6 = dns.get_ipv6()
reverse_ipv6 = dns.check_ptr_record(ipv6, args.config.mail_domain)
ipv4 = dns.get_ipv4()
reverse_ipv4 = dns.check_ptr_record(ipv4, args.config.mail_domain)
to_print = []
with open(template, "r") as f:
zonefile = (
f.read()
.format(
acme_account_url=acme_account_url,
email=f"root@{args.config.mail_domain}",
sts_id=datetime.datetime.now().strftime("%Y%m%d%H%M"),
chatmail_domain=args.config.mail_domain,
dkim_entry=dkim_entry,
ipv6=ipv6,
ipv4=ipv4,
)
.strip()
)
if args.zonefile:
with open(args.zonefile, "w+") as zf:
zf.write(zonefile)
print(f"DNS records successfully written to: {args.zonefile}")
return
started_dkim_parsing = False
for line in zonefile.splitlines():
line = line.format(
acme_account_url=acme_account_url,
email=f"root@{args.config.mail_domain}",
sts_id=datetime.datetime.now().strftime("%Y%m%d%H%M"),
chatmail_domain=args.config.mail_domain,
dkim_entry=dkim_entry,
ipv6=ipv6,
).strip()
for typ in ["A", "AAAA", "CNAME", "CAA"]:
if f" {typ} " in line:
domain, value = line.split(f" {typ} ")
current = dns.get(typ, domain.strip()[:-1])
if current != value.strip():
to_print.append(line)
if " MX " in line:
domain, typ, prio, value = line.split()
current = dns.get(typ, domain[:-1])
if not current:
to_print.append(line)
elif current.split()[1] != value:
print(line.replace(prio, str(int(current[0]) + 1)))
if " SRV " in line:
domain, typ, prio, weight, port, value = line.split()
current = dns.get("SRV", domain[:-1])
if current != f"{prio} {weight} {port} {value}":
to_print.append(line)
if " TXT " in line:
domain, value = line.split(" TXT ")
current = dns.get("TXT", domain.strip()[:-1])
if domain.startswith("_mta-sts."):
if current.split("id=")[0] == value.split("id=")[0]:
continue
if current != value:
to_print.append(line)
if " IN TXT ( " in line:
started_dkim_parsing = True
dkim_lines = [line]
if started_dkim_parsing and line.startswith('"'):
dkim_lines.append(" " + line)
domain, data = "\n".join(dkim_lines).split(" IN TXT ")
current = dns.get("TXT", domain.strip()[:-1]).replace('" "', '"\n "')
current = f"( {current} )"
if current.replace(";", "\\;") != data:
to_print.append(dkim_entry)
if to_print:
to_print.insert(
0, "You should configure the following DNS entries at your provider:\n"
)
to_print.append(
"\nIf you already configured the DNS entries, don't worry. It can take a while until they are public."
)
print("\n".join(to_print))
else:
out.green("Great! All your DNS entries are correct.")
if not reverse_ipv4:
print(
f"\nYou should add a PTR/reverse DNS entry for {ipv4}, with the value: {args.config.mail_domain}"
)
print(
"You can do so at your hosting provider (maybe this isn't your DNS provider)."
)
if not reverse_ipv6:
print(
f"\nYou should add a PTR/reverse DNS entry for {ipv6}, with the value: {args.config.mail_domain}"
)
print(
"You can do so at your hosting provider (maybe this isn't your DNS provider)."
)
def check_necessary_dns(args, out, msg, mail_domain):
"""Check whether $mail_domain and mta-sts.$mail_domain resolve.
:param args: the argparse args object
:param out: an object to control CLI output
:param msg: a string like: "Now you should add %dnsentry% at your DNS provider:\n"
:param mail_domain: the mail_domain of the chatmail server
"""
dns = DNS(out, mail_domain)
try:
ipaddress = dns.resolve(mail_domain)
mta_ipadress = dns.resolve("mta-sts." + mail_domain)
except subprocess.CalledProcessError:
ipaddress = None
mta_ipadress = None
entries = 0
to_print = [msg]
if not ipaddress:
entries += 1
to_print.append(f"\tA\t{mail_domain}.\t\t<your server's IPv4 address>")
if not mta_ipadress or mta_ipadress != ipaddress:
entries += 1
to_print.append(f"\tCNAME\tmta-sts.{mail_domain}.\t{mail_domain}.")
if entries == 1:
singular = "this entry"
elif entries == 2:
singular = "these entries"
else:
return True
to_print[0] = to_print[0].replace("%dnsentry%", singular)
for line in to_print:
print(line)
print()