new-site/scripts/dmarc_report_parser.py
justin 8e5590b492 mail: DMARC aggregate-report parser + dedicated dmarc@ mailbox ingestion
Tool 2 of the deliverability monitoring pair (Tool 1 = mail_reputation_monitor).
DMARC rua reports from dozens of operators (Google, Yahoo, Comcast, Cox, Bell,
Mimecast, Cisco ESA, GMX, mail.com, ...) were landing in ops@ (dmarc@ was a distribution list),
burying real mail and never being parsed. Now ingested + queryable:

- dmarc@performancewest.net converted DL -> dedicated Carbonio mailbox; isolated
  IMAP creds in server .env, surfaced to workers in docker-compose.yml (mirrors
  OPS_IMAP_*). 29 historical reports moved ops@ -> dmarc@ via IMAP.
- scripts/dmarc_report_parser.py: IMAP fetch unseen -> decompress .gz/.zip/.xml
  (namespace-agnostic: classic + urn:ietf:params:xml:ns:dmarc-2.0 GMX/mail.com) ->
  parse aggregate XML -> upsert dmarc_report (keyed (org_name,report_id), no-op on
  re-parse) + dmarc_record per source IP. dmarc_pass = dkim_aligned OR spf_aligned.
  Marks \Seen. --dry-run/--all/--alert (7d per-IP summary + Telegram if one of OUR
  IPs <95% pass, or EXTERNAL IP sends >=20 failing msgs as us = spoofing under
  p=reject). psycopg2 imported lazily so --dry-run runs without the driver.
- api/migrations/102_dmarc_aggregate.sql: dmarc_report + dmarc_record tables.
- infra/cron/pw-dmarc-parser: 06:20 UTC daily --alert (after reputation, before scrub).
- docs/deliverability.md: DMARC section DONE; query examples.

Verified: `--dry-run --all` parses all 28 reports (the 29th message is a non-report
test probe), with 0 reports parsed as "unknown" after the namespace fix.
2026-06-19 08:50:20 -05:00

385 lines
16 KiB
Python

#!/usr/bin/env python3
"""DMARC aggregate (rua) report parser -- cross-operator SPF/DKIM alignment trend.
WHY
---
DMARC aggregate reports (RFC 7489) are the authoritative, cross-operator view of
who sends mail AS us and whether it passes SPF + DKIM *alignment*. Every major
receiver (Google, Yahoo, Comcast, Cox, Bell, Mimecast, Cisco ESA, GMX, mail.com,
Microsoft, ...) emails one zipped/gzipped XML per day to the rua address
(dmarc@performancewest.net). Reading dozens/day by hand is hopeless. This worker
turns them into queryable rows so we can SEE:
* our own senders (.94 bulk / .107 hcout / .71 transactional / .15 relay) all
passing alignment -- the whole point of this session's DKIM/subdomain fixes
(bulk signs d=send.performancewest.net selector "send"; root signs
d=performancewest.net selector "mail"); and
* any UNKNOWN IP sending as us and FAILING -- spoofing or a forgotten relay,
which is pure reputation poison under our p=reject policy.
WHAT IT DOES
------------
IMAP-fetches unread messages from dmarc@, finds the .zip/.gz/.xml attachment,
decompresses + parses the aggregate XML, and upserts one dmarc_report row
(keyed (org_name, report_id), so re-parsing is a no-op) plus one dmarc_record
row per <record>. Then marks the message \\Seen so the next run skips it.
Idempotent end-to-end: safe to run on a cron over the same INBOX.
USAGE
-----
# parse all UNSEEN reports in dmarc@ INBOX, store, mark seen
python3 -m scripts.dmarc_report_parser
# parse but don't write / don't mark (inspect)
python3 -m scripts.dmarc_report_parser --dry-run
# also parse already-seen messages (backfill the historical 29)
python3 -m scripts.dmarc_report_parser --all
# print a summary of recent alignment + Telegram alert on unknown-IP failures
python3 -m scripts.dmarc_report_parser --alert
Connection: IMAP via DMARC_IMAP_{HOST,PORT,USER,PASS}; DB via DATABASE_URL.
Telegram: TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID.
"""
from __future__ import annotations
import argparse
import email
import gzip
import io
import json
import os
import sys
import urllib.request
import zipfile
from datetime import datetime, timezone
from xml.etree import ElementTree as ET
import imaplib
# psycopg2 is imported lazily (only when actually writing to the DB) so that
# --dry-run works on hosts without the driver installed (e.g. the bare app host;
# the DB is only reachable from inside the workers container anyway).
# ── IMAP configuration ───────────────────────────────────────────────────────
# `os.getenv(name) or default` (rather than `os.getenv(name, default)`) so a
# variable that is *present but empty* falls back to the default instead of
# yielding "". That happens e.g. when docker-compose interpolates an unset
# `${VAR}`; without this, `int("")` would crash the port lookup at import time.
IMAP_HOST = os.getenv("DMARC_IMAP_HOST") or "mail.performancewest.net"
IMAP_PORT = int(os.getenv("DMARC_IMAP_PORT") or "993")
IMAP_USER = os.getenv("DMARC_IMAP_USER") or "dmarc@performancewest.net"
IMAP_PASS = os.getenv("DMARC_IMAP_PASS", "")  # no default: main() refuses to run without it
IMAP_FOLDER = os.getenv("DMARC_IMAP_FOLDER") or "INBOX"

# Our own egress IPs (so the summary can flag UNKNOWN senders failing alignment).
OUR_IPS = {
    "207.174.124.71",   # transactional (app server)
    "207.174.124.94",   # bulk / trucking (out05)
    "207.174.124.107",  # healthcare (hcout1)
    "207.174.124.15",   # co.carrierone.com relay (Carbonio)
}
# ── attachment extraction ─────────────────────────────────────────────────────
def extract_xml(payload: bytes, filename: str,
                max_bytes: int = 50 * 1024 * 1024) -> bytes | None:
    """Decompress a DMARC report attachment to raw XML bytes.

    Dispatches on the (case-insensitive) filename extension:

    * ``.gz``  -> gunzip the payload;
    * ``.zip`` -> the first member named ``*.xml`` (or the first member if
      none is), since a report archive normally holds exactly one XML;
    * ``.xml`` -> the payload as-is.

    The dmarc@ mailbox accepts mail from anyone, so attachments are untrusted.
    Output is capped at *max_bytes* so a tiny gzip/zip "bomb" cannot exhaust
    the worker's memory.

    Returns the XML bytes, or None when the extension is unrecognized, the
    archive is empty/corrupt, or the XML would exceed *max_bytes*. Problems
    are reported on stderr instead of raised, so one bad attachment can't
    crash the run.
    """
    fn = (filename or "").lower()
    try:
        if fn.endswith(".gz"):
            # Stream-decompress and stop one byte past the cap; unlike
            # gzip.decompress() this never materializes an unbounded buffer.
            with gzip.GzipFile(fileobj=io.BytesIO(payload)) as gz:
                data = gz.read(max_bytes + 1)
        elif fn.endswith(".zip"):
            with zipfile.ZipFile(io.BytesIO(payload)) as zf:
                names = zf.namelist()
                if not names:
                    print(f"WARN: empty zip attachment {filename}", file=sys.stderr)
                    return None
                member = next((n for n in names if n.lower().endswith(".xml")), names[0])
                with zf.open(member) as fh:
                    data = fh.read(max_bytes + 1)
        elif fn.endswith(".xml"):
            data = payload
        else:
            return None
    except Exception as exc:  # corrupt attachment -- skip, don't crash the run
        print(f"WARN: could not decompress {filename}: {exc}", file=sys.stderr)
        return None
    if len(data) > max_bytes:
        print(f"WARN: {filename} exceeds {max_bytes} bytes of XML; skipped", file=sys.stderr)
        return None
    return data
def find_report_xml(msg: email.message.Message) -> bytes | None:
"""Walk an email and return the first decoded DMARC XML attachment."""
for part in msg.walk():
fn = part.get_filename()
if not fn:
# Some operators send application/gzip without a filename param.
ct = part.get_content_type()
if ct in ("application/gzip", "application/x-gzip"):
fn = "report.gz"
elif ct == "application/zip":
fn = "report.zip"
else:
continue
payload = part.get_payload(decode=True)
if not payload:
continue
xml = extract_xml(payload, fn)
if xml:
return xml
return None
# ── XML parsing ───────────────────────────────────────────────────────────────
def _text(node, path, default=""):
    """Return the stripped text of ``node.find(path)``, or *default*.

    Tolerates a None *node* and a missing or empty element, so callers can
    chain lookups through optional report sections without guarding each level.
    """
    el = node.find(path) if node is not None else None
    return el.text.strip() if el is not None and el.text else default


def _epoch_to_dt(s):
    """Convert a Unix-epoch string to an aware UTC datetime, or None.

    Returns None for empty or non-numeric input (ValueError/TypeError), and for
    values outside the platform's representable range (OverflowError/OSError).
    Without that, a single malformed report could crash the whole run.
    """
    try:
        return datetime.fromtimestamp(int(s), tz=timezone.utc)
    except (ValueError, TypeError, OverflowError, OSError):
        return None


def _strip_ns(root: ET.Element) -> ET.Element:
    """Strip XML namespaces in-place so find() works regardless of schema.

    Most operators emit the classic no-namespace aggregate XML, but some (GMX,
    mail.com) use the RFC-registered urn:ietf:params:xml:ns:dmarc-2.0 namespace,
    which would make namespace-naive find('record') miss everything. Rewriting
    each tag to its local name normalizes both forms.
    """
    for el in root.iter():
        if isinstance(el.tag, str) and "}" in el.tag:
            el.tag = el.tag.rsplit("}", 1)[1]
    return root


def parse_report(xml: bytes) -> tuple[dict, list[dict]] | None:
    """Parse one DMARC aggregate XML into (report_header, [records]).

    *report_header* carries the report_metadata / policy_published fields
    under the column names store() inserts. Each record dict carries one
    <record>'s row, identifiers and first dkim/spf auth result.
    ``dmarc_pass`` is True when policy_evaluated dkim OR spf is "pass"
    (compared case-insensitively; the raw values are kept as-is).

    A report without a <report_id> gets a stable synthetic id of the form
    "unknown-<sha256 prefix of the XML>". Re-parsing the same document yields
    the same id (so the upsert stays idempotent), while different ID-less
    reports no longer collide on a shared "unknown" key and get dropped.

    Returns None (after a stderr warning) if *xml* declares a DTD/entities or
    is not well-formed XML.
    """
    # Attachments come from arbitrary senders. Aggregate reports never need a
    # DTD, and DTD entity declarations are the vehicle for expansion attacks
    # ("billion laughs"), so refuse them rather than rely on the linked expat's
    # own limits.
    if b"<!DOCTYPE" in xml or b"<!ENTITY" in xml:
        print("WARN: refusing report XML that declares a DTD/entities", file=sys.stderr)
        return None
    try:
        root = _strip_ns(ET.fromstring(xml))
    except ET.ParseError as exc:
        print(f"WARN: XML parse error: {exc}", file=sys.stderr)
        return None
    meta = root.find("report_metadata")
    pol = root.find("policy_published")
    dr = meta.find("date_range") if meta is not None else None
    pct_raw = _text(pol, "pct")
    report_id = _text(meta, "report_id")
    if not report_id:
        import hashlib  # local: only needed for this rare fallback
        report_id = "unknown-" + hashlib.sha256(xml).hexdigest()[:32]
    header = {
        "org_name": _text(meta, "org_name") or "unknown",
        "org_email": _text(meta, "email"),
        "report_id": report_id,
        "date_begin": _epoch_to_dt(_text(dr, "begin")),
        "date_end": _epoch_to_dt(_text(dr, "end")),
        "policy_domain": _text(pol, "domain"),
        "policy_p": _text(pol, "p"),
        "policy_sp": _text(pol, "sp"),
        "policy_adkim": _text(pol, "adkim"),
        "policy_aspf": _text(pol, "aspf"),
        "policy_pct": int(pct_raw) if pct_raw.isdigit() else None,
    }
    records = []
    for rec in root.findall("record"):
        row = rec.find("row")
        pe = row.find("policy_evaluated") if row is not None else None
        ident = rec.find("identifiers")
        auth = rec.find("auth_results")
        dkim = auth.find("dkim") if auth is not None else None
        spf = auth.find("spf") if auth is not None else None
        cnt_raw = _text(row, "count")
        dkim_aligned = _text(pe, "dkim")
        spf_aligned = _text(pe, "spf")
        records.append({
            "source_ip": _text(row, "source_ip"),
            "msg_count": int(cnt_raw) if cnt_raw.isdigit() else 0,
            "disposition": _text(pe, "disposition"),
            "dkim_aligned": dkim_aligned,
            "spf_aligned": spf_aligned,
            # DMARC passes if EITHER aligned mechanism passes.
            "dmarc_pass": "pass" in (dkim_aligned.lower(), spf_aligned.lower()),
            "header_from": _text(ident, "header_from"),
            "envelope_from": _text(ident, "envelope_from"),
            "dkim_domain": _text(dkim, "domain"),
            "dkim_selector": _text(dkim, "selector"),
            "dkim_result": _text(dkim, "result"),
            "spf_domain": _text(spf, "domain"),
            "spf_result": _text(spf, "result"),
        })
    return header, records
# ── DB upsert ─────────────────────────────────────────────────────────────────
def store(conn, header: dict, records: list[dict]) -> bool:
    """Insert a report + its records in a single transaction.

    The dmarc_report row is keyed (org_name, report_id). If that key already
    exists, the insert is a no-op, nothing else is written, the (empty)
    transaction is committed and False is returned. Otherwise every record is
    inserted against the new report's primary key, committed, and True is
    returned.

    *records* is not modified: each row is copied with the report's primary
    key added, so callers can keep using the parsed dicts.

    Any error rolls the transaction back before re-raising, so *conn* is not
    left stuck in an aborted transaction.
    """
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO dmarc_report
                (org_name, org_email, report_id, date_begin, date_end,
                policy_domain, policy_p, policy_sp, policy_adkim, policy_aspf, policy_pct)
                VALUES (%(org_name)s, %(org_email)s, %(report_id)s, %(date_begin)s, %(date_end)s,
                %(policy_domain)s, %(policy_p)s, %(policy_sp)s, %(policy_adkim)s,
                %(policy_aspf)s, %(policy_pct)s)
                ON CONFLICT (org_name, report_id) DO NOTHING
                RETURNING id
                """,
                header,
            )
            row = cur.fetchone()  # None => key already present (DO NOTHING)
            if row is not None and records:
                report_pk = row[0]
                cur.executemany(
                    """
                    INSERT INTO dmarc_record
                    (report_id, source_ip, msg_count, disposition, dkim_aligned,
                    spf_aligned, dmarc_pass, header_from, envelope_from,
                    dkim_domain, dkim_selector, dkim_result, spf_domain, spf_result)
                    VALUES (%(report_id)s, %(source_ip)s, %(msg_count)s, %(disposition)s,
                    %(dkim_aligned)s, %(spf_aligned)s, %(dmarc_pass)s, %(header_from)s,
                    %(envelope_from)s, %(dkim_domain)s, %(dkim_selector)s,
                    %(dkim_result)s, %(spf_domain)s, %(spf_result)s)
                    """,
                    [{**rec, "report_id": report_pk} for rec in records],
                )
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    return row is not None
# ── summary + alerting ────────────────────────────────────────────────────────
def summarize(conn, days: int = 7, *, limit: int = 25,
              our_ips=None) -> tuple[str, list[str]]:
    """Build a human summary of recent DMARC alignment.

    Sums dmarc_record message counts per source IP over reports whose
    date_begin falls within the last *days* days.

    The text lists only the *limit* highest-volume IPs. Alerting, however, is
    evaluated over EVERY IP in the window, so a low-volume spoofer cannot slip
    past the alert just because legitimate senders push it out of the
    displayed top-N.

    Alert rules (each adds one entry to *problems*):

    * an IP in *our_ips* (default: OUR_IPS) with >= 20 msgs and a DMARC pass
      rate below 95% -- our own alignment is broken;
    * any other IP with >= 20 failing msgs -- possible spoofing.

    The pass percentage is floored (never rounded up), so "100%" means no
    failures at all and the displayed value agrees with the < 95% rule.

    Returns (text, problems).
    """
    ours_set = OUR_IPS if our_ips is None else frozenset(our_ips)
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT r.source_ip,
            sum(r.msg_count) AS total,
            sum(r.msg_count) FILTER (WHERE r.dmarc_pass) AS passed,
            sum(r.msg_count) FILTER (WHERE NOT r.dmarc_pass) AS failed
            FROM dmarc_record r
            JOIN dmarc_report rep ON rep.id = r.report_id
            WHERE rep.date_begin >= now() - make_interval(days => %s)
            GROUP BY r.source_ip
            ORDER BY total DESC, r.source_ip
            """,
            (days,),
        )
        rows = cur.fetchall()
    lines = [f"DMARC alignment, last {days}d (by source IP):"]
    problems: list[str] = []
    shown = hidden = 0
    for ip, total, passed, failed in rows:
        # FILTERed sums are NULL when no row matched.
        total, passed, failed = int(total or 0), int(passed or 0), int(failed or 0)
        if total == 0:
            continue
        # source_ip may be "" (parser default) or possibly NULL; keep the
        # fixed-width format below from choking on None.
        ip = ip or "(none)"
        pass_pct = 100 * passed // total
        is_ours = ip in ours_set
        if shown < limit:
            ours = "ours" if is_ours else "EXTERNAL"
            lines.append(f" {ip:<16} [{ours:<8}] total={total:<6} pass={pass_pct}% fail={failed}")
            shown += 1
        else:
            hidden += 1
        # Alerts: our IP failing alignment, OR an external IP sending as us at volume.
        if is_ours and pass_pct < 95 and total >= 20:
            problems.append(f"{ip} (ours): only {pass_pct}% DMARC pass ({failed}/{total} fail) -- alignment broken")
        if not is_ours and failed >= 20:
            problems.append(f"{ip} (EXTERNAL): {failed} failing msgs sending as us -- possible spoofing")
    if hidden:
        lines.append(f" ... {hidden} more IP(s) not shown")
    return "\n".join(lines), problems
def send_telegram(text: str) -> bool:
    """Send *text* to the configured Telegram chat; return True on success.

    Reads TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID from the environment and
    returns False without sending if either is unset. Best-effort: any send
    error is logged to stderr and reported as False, never raised, so a
    notification failure can't fail the cron job.

    Telegram's sendMessage rejects text over 4096 characters. Over-long text
    is therefore truncated, with a marker, instead of the whole alert being
    lost. The cut is made a little below 4096 because Telegram may count
    length differently from Python's code points -- NOTE(review): confirm.
    """
    bot = os.getenv("TELEGRAM_BOT_TOKEN", "")
    chat = os.getenv("TELEGRAM_CHAT_ID", "")
    if not bot or not chat:
        return False
    max_chars = 4000
    if len(text) > max_chars:
        marker = "\n...(truncated)"
        text = text[: max_chars - len(marker)] + marker
    try:
        data = json.dumps({"chat_id": chat, "text": text}).encode()
        req = urllib.request.Request(
            f"https://api.telegram.org/bot{bot}/sendMessage",
            data=data, headers={"Content-Type": "application/json"})
        # Context manager closes the HTTP response/socket (previously leaked).
        with urllib.request.urlopen(req, timeout=10):
            pass
        return True
    except Exception as exc:  # never fail the cron on a notify error
        print(f"WARN: telegram notify failed: {exc}", file=sys.stderr)
        return False
# ── main ──────────────────────────────────────────────────────────────────────
def _parse_args(argv=None) -> argparse.Namespace:
    """Build the CLI parser and parse *argv* (default: sys.argv[1:])."""
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--dry-run", action="store_true",
                    help="parse + print, no DB writes, do not mark seen")
    ap.add_argument("--all", action="store_true",
                    help="process ALL messages (incl. already-seen); default is UNSEEN only")
    ap.add_argument("--alert", action="store_true",
                    help="after ingest, print recent-alignment summary + Telegram on problems")
    ap.add_argument("--summary-days", type=int, default=7,
                    help="window (days) for the --alert summary (default 7)")
    return ap.parse_args(argv)


def _ingest(imap, ids, conn, args) -> tuple[int, int, int, int]:
    """Fetch, parse and (unless --dry-run) store each message in *ids*.

    Returns (ingested, skipped, no_xml, unparsable) counts.
    """
    ingested = skipped = no_xml = unparsable = 0
    for mid in ids:
        # BODY.PEEK[] instead of RFC822: a non-PEEK fetch implicitly sets
        # \Seen on the server. That would (a) break --dry-run's "do not mark
        # seen" promise and (b) silently retire messages whose XML fails to
        # parse, so they would never be retried.
        typ, mdata = imap.fetch(mid, "(BODY.PEEK[])")
        if typ != "OK" or not mdata or not isinstance(mdata[0], tuple):
            continue
        msg = email.message_from_bytes(mdata[0][1])
        xml = find_report_xml(msg)
        if not xml:
            no_xml += 1
            # Not a report (e.g. our test probe) -- mark seen so we skip it next time.
            if not args.dry_run and not args.all:
                imap.store(mid, "+FLAGS", "\\Seen")
            continue
        parsed = parse_report(xml)
        if not parsed:
            # Deliberately left unseen so it is retried and stays visible.
            unparsable += 1
            continue
        header, records = parsed
        if args.dry_run:
            fails = sum(r["msg_count"] for r in records if not r["dmarc_pass"])
            print(f" {header['org_name']:<28} report={header['report_id'][:32]:<32} "
                  f"records={len(records)} fail_msgs={fails}")
            continue
        if store(conn, header, records):
            ingested += 1
        else:
            skipped += 1
        if not args.all:
            imap.store(mid, "+FLAGS", "\\Seen")
    return ingested, skipped, no_xml, unparsable


def _alert(conn, days: int) -> None:
    """Print the alignment summary and push it to Telegram if problems exist."""
    text, problems = summarize(conn, days)
    print("\n" + text)
    if problems:
        msg = ("⚠️ DMARC alignment alert\n\n" + text +
               "\n\nIssues:\n" + "\n".join(f"- {p}" for p in problems))
        send_telegram(msg)


def main() -> int:
    """CLI entry point: ingest DMARC reports from IMAP, then optionally alert.

    Exit status: 0 on success; 2 when required configuration is missing or
    the IMAP folder cannot be selected.
    """
    args = _parse_args()
    if not IMAP_PASS:
        print("ERROR: DMARC_IMAP_PASS not set", file=sys.stderr)
        return 2
    # Validate DB config *before* opening IMAP so a config error can't leave a
    # logged-in session behind.
    db_url = ""
    if not args.dry_run:
        db_url = os.getenv("DATABASE_URL", "")
        if not db_url:
            print("ERROR: DATABASE_URL not set", file=sys.stderr)
            return 2
    mode = "ALL" if args.all else "UNSEEN"
    imap = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)
    conn = None
    try:
        imap.login(IMAP_USER, IMAP_PASS)
        # Dry runs open the folder read-only (EXAMINE) so no flag can change.
        typ, _ = imap.select(IMAP_FOLDER, readonly=args.dry_run)
        if typ != "OK":
            print(f"ERROR: cannot select IMAP folder {IMAP_FOLDER!r}", file=sys.stderr)
            return 2
        typ, data = imap.search(None, mode)
        ids = data[0].split() if typ == "OK" and data and data[0] else []
        print(f"{IMAP_USER}: {len(ids)} message(s) to scan ({mode}).")
        if db_url:
            import psycopg2  # lazy: only needed when actually writing
            conn = psycopg2.connect(db_url)
        ingested, skipped, no_xml, unparsable = _ingest(imap, ids, conn, args)
        if args.dry_run:
            print(f"DRY-RUN: scanned {len(ids)}, non-report={no_xml}.")
        else:
            print(f"Ingested {ingested} new report(s); {skipped} already present; "
                  f"{no_xml} non-report message(s).")
        if unparsable:
            print(f"WARN: {unparsable} report(s) had unparsable XML; left unseen for retry.",
                  file=sys.stderr)
        if args.alert:
            if conn is None:
                # The summary needs the DB, which --dry-run never connects to.
                print("NOTE: --alert skipped in --dry-run (no database connection).",
                      file=sys.stderr)
            else:
                _alert(conn, args.summary_days)
    finally:
        if conn is not None:
            conn.close()
        try:
            imap.logout()
        except (imaplib.IMAP4.error, OSError) as exc:
            # Best-effort: don't let a logout failure mask an earlier error.
            print(f"WARN: IMAP logout failed: {exc}", file=sys.stderr)
    return 0
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())