DNS: get DNS records with server-side dig

This commit is contained in:
missytake 2023-12-14 16:29:37 +01:00
parent 7ed59ea8bc
commit 0238437ce7
3 changed files with 30 additions and 79 deletions

View File

@ -399,7 +399,7 @@ def deploy_chatmail(config_path: Path) -> None:
# to use 127.0.0.1 as the resolver.
apt.packages(
name="Install unbound",
packages=["unbound", "unbound-anchor"],
packages=["unbound", "unbound-anchor", "dnsutils"],
)
server.shell(
name="Generate root keys for validating DNSSEC",

View File

@ -124,24 +124,11 @@ def dns_cmd(args, out):
dkim_entry = read_dkim_entries(out.shell_output(f"{ssh} -- opendkim-genzone -F"))
ipv6 = dns.get_ipv6()
reverse_ipv6 = dns.check_ptr_record(ipv6, args.config.mail_domain)
ipv4 = dns.get_ipv4()
print()
if not dns.check_ptr_record(ipv4, args.config.mail_domain):
print(
f"You should add a PTR/reverse DNS entry for {ipv4}, with the value: {args.config.mail_domain}"
)
print(
"You can do so at your hosting provider (maybe this isn't your DNS provider).\n"
)
if not dns.check_ptr_record(ipv6, args.config.mail_domain):
print(
f"You should add a PTR/reverse DNS entry for {ipv6}, with the value: {args.config.mail_domain}"
)
print(
"You can do so at your hosting provider (maybe this isn't your DNS provider).\n"
)
reverse_ipv4 = dns.check_ptr_record(ipv4, args.config.mail_domain)
to_print = []
with open(template, "r") as f:
zonefile = (
f.read()
@ -179,11 +166,11 @@ def dns_cmd(args, out):
to_print.append(line)
if " MX " in line:
domain, typ, prio, value = line.split()
current = dns.resolve_mx(domain[:-1])
if not current[0]:
current = dns.get(typ, domain[:-1])
if not current:
to_print.append(line)
elif current[1] != value:
print(line.replace(prio, str(current[0] + 1)))
elif current.split()[1] != value:
print(line.replace(prio, str(int(current[0]) + 1)))
if " SRV " in line:
domain, typ, prio, weight, port, value = line.split()
current = dns.get("SRV", domain[:-1])
@ -207,6 +194,7 @@ def dns_cmd(args, out):
current = f"( {current} )"
if current.replace(";", "\\;") != data:
to_print.append(dkim_entry)
if to_print:
to_print.insert(
0, "You should configure the following DNS entries at your provider:\n"
@ -218,6 +206,21 @@ def dns_cmd(args, out):
else:
out.green("Great! All your DNS entries are correct.")
if not reverse_ipv4:
print(
f"\nYou should add a PTR/reverse DNS entry for {ipv4}, with the value: {args.config.mail_domain}"
)
print(
"You can do so at your hosting provider (maybe this isn't your DNS provider)."
)
if not reverse_ipv6:
print(
f"\nYou should add a PTR/reverse DNS entry for {ipv6}, with the value: {args.config.mail_domain}"
)
print(
"You can do so at your hosting provider (maybe this isn't your DNS provider)."
)
def status_cmd(args, out):
"""Display status for online chatmail instance."""

View File

@ -1,22 +1,5 @@
import requests
from ipaddress import ip_address
from json.decoder import JSONDecodeError
resolvers = [
"https://dns.nextdns.io/dns-query",
"https://dns.google/resolve",
"https://cloudflare-dns.com/dns-query",
]
dns_types = {
"A": 1,
"AAAA": 28,
"CNAME": 5,
"MX": 15,
"SRV": 33,
"CAA": 257,
"TXT": 16,
"PTR": 12,
}
class DNS:
@ -35,47 +18,12 @@ class DNS:
def get(self, typ: str, domain: str) -> str:
"""Get a DNS entry"""
for url in resolvers:
r = self.session.get(
url,
params={"name": domain, "type": typ},
headers={"accept": "application/dns-json"},
)
try:
j = r.json()
except JSONDecodeError:
# ignore DNS resolvers which don't give us JSON
continue
if "Answer" in j:
for answer in j["Answer"]:
if answer["type"] == dns_types[typ]:
return answer["data"]
return ""
def resolve_mx(self, domain: str) -> (str, str):
"""Resolve an MX entry"""
for url in resolvers:
r = self.session.get(
url,
params={"name": domain, "type": "MX"},
headers={"accept": "application/dns-json"},
)
try:
j = r.json()
except JSONDecodeError:
# ignore DNS resolvers which don't give us JSON
continue
if "Answer" in j:
result = (0, None)
for answer in j["Answer"]:
if answer["type"] == dns_types["MX"]:
prio, server_name = answer["data"].split()
if int(prio) > result[0]:
result = (int(prio), server_name)
return result
return None, None
dig_result = self.out.shell_output(f"{self.ssh} -- dig {typ} {domain}")
line_num = 0
for line in dig_result.splitlines():
line_num += 1
if line.strip() == ";; ANSWER SECTION:":
return dig_result.splitlines()[line_num].split("\t")[-1]
def resolve(self, domain: str) -> str:
result = self.get("A", domain)