new-site/scripts/dmarc_report_parser.py
justin ae68edbc58 fix(monitoring): repair both dead mail-alert crons + de-noise DMARC digest
Three bugs the owner hit:
1. Per-operator reputation alert (06:10 cron, mail_reputation_monitor --alert)
   silently never ran: it redirected to /var/log/pw-mail-reputation.log but
   /var/log is root-only and that file was never pre-created, so the deploy
   user's >> redirect failed and the shell aborted before running the command. Repointed
   both mail-alert crons to deploy-writable /opt/performancewest/logs/.
2. IP reputation alert (20:00 cron) still referenced the removed rehab pool
   (.91-.93) and used 8.8.8.8 for Spamhaus (which returns the open-resolver
   error, not a real answer). Dropped the rehab section, relabeled to the two
   live IPs (.94/.107), and switched the DNSBL check to Control D (76.76.2.0)
   which returns real Spamhaus ZEN data. (It was correctly SILENT lately
   because delivery is healthy -- silent-on-healthy is by design.)
3. DMARC daily digest was pure noise: it alerted on ANY external IP with >=20
   failing msgs, but those are legit recipient-side forwarders/security
   gateways (inkyphishfence, cloud-sec-av, Proofpoint, Mimecast, ...) that
   re-send our mail and naturally break SPF/DKIM alignment -- benign under
   p=reject. Added PTR-based forwarder detection (FORWARDER_PTR_HINTS) so the
   digest tags them [fwd] and only alerts on (a) OUR IP <95% pass over >=20 msgs
   or (b) an UNKNOWN non-forwarder external IP with >=100 failing msgs = likely
   spoofing.
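
Sketch for item 2 (illustrative only, not the monitor's actual code): how a
Spamhaus ZEN lookup name is built and why an answer obtained through 8.8.8.8 is
not a real listing. The helper names zen_query_name and interpret_zen are
hypothetical, the error codes are the ones Spamhaus publishes as far as I know,
and the DNS query itself is left out because choosing a specific resolver needs
more than the stdlib.

```python
import ipaddress

# Answers in 127.255.255.0/24 signal a query problem, not a listing.
ZEN_ERRORS = {
    "127.255.255.252": "typo in DNSBL zone name",
    "127.255.255.254": "query came via a public/open resolver",
    "127.255.255.255": "excessive number of queries",
}


def zen_query_name(ip: str, zone: str = "zen.spamhaus.org") -> str:
    """Build the DNSBL lookup name: reversed IPv4 octets followed by the zone."""
    addr = ipaddress.IPv4Address(ip)  # raises ValueError on bad input
    return ".".join(reversed(str(addr).split("."))) + "." + zone


def interpret_zen(answers: list[str]) -> str:
    """Classify the A-record answers of a ZEN lookup.

    An empty list stands for NXDOMAIN, i.e. the IP is not listed.
    """
    if not answers:
        return "clean"
    errors = [ZEN_ERRORS[a] for a in answers if a in ZEN_ERRORS]
    if errors:
        return "error: " + "; ".join(errors)
    return "listed: " + ", ".join(sorted(answers))
```

Treating the error range as "unknown" rather than "listed" or "clean" is the
point: a check that goes through a public resolver can neither confirm nor
rule out a listing.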

Verified: all 4 currently-flagged external IPs now classify as forwarder=True.
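
Sketch of the item 3 alert rule as a pure function, handy for checking the
thresholds without a database. should_alert is a hypothetical name; the 95%,
20-message and 100-message thresholds are the ones used by summarize() in this
file, and the ours/forwarder flags are assumed to come from is_ours() and
is_known_forwarder().

```python
from __future__ import annotations


def should_alert(total: int, failed: int, ours: bool, forwarder: bool) -> str | None:
    """Return a short alert reason, or None when the source IP is not actionable."""
    if total <= 0:
        return None
    pass_pct = round(100 * (total - failed) / total)
    if ours:
        # Our own IP: alert on a real alignment break, ignoring tiny samples.
        if pass_pct < 95 and total >= 20:
            return f"ours: only {pass_pct}% pass"
        return None
    if forwarder:
        # Recognized forwarder / security gateway: failures are benign.
        return None
    if failed >= 100:
        # Unknown external IP failing at volume: possible spoofing.
        return f"external: {failed} failing msgs"
    return None
```

With these rules a forwarder that fails every message stays silent, while an
unknown external IP with the same counts raises an alert.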
2026-06-24 06:28:50 -05:00


#!/usr/bin/env python3
"""DMARC aggregate (rua) report parser -- cross-operator SPF/DKIM alignment trend.

WHY
---
DMARC aggregate reports (RFC 7489) are the authoritative, cross-operator view of
who sends mail AS us and whether it passes SPF + DKIM *alignment*. Every major
receiver (Google, Yahoo, Comcast, Cox, Bell, Mimecast, Cisco ESA, GMX, mail.com,
Microsoft, ...) emails one zipped/gzipped XML per day to the rua address
(dmarc@performancewest.net). Reading dozens/day by hand is hopeless. This worker
turns them into queryable rows so we can SEE:

  * our own senders (.94 bulk / .107 hcout / .71 transactional / .15 relay) all
    passing alignment -- the whole point of this session's DKIM/subdomain fixes
    (bulk signs d=send.performancewest.net selector "send"; root signs
    d=performancewest.net selector "mail"); and
  * any UNKNOWN IP sending as us and FAILING -- spoofing or a forgotten relay,
    which is pure reputation poison under our p=reject policy.

WHAT IT DOES
------------
IMAP-fetches unread messages from dmarc@, finds the .zip/.gz/.xml attachment,
decompresses + parses the aggregate XML, and upserts one dmarc_report row
(keyed (org_name, report_id), so re-parsing is a no-op) plus one dmarc_record
row per <record>. Then marks the message \\Seen so the next run skips it.
Idempotent end-to-end: safe to run on a cron over the same INBOX.

USAGE
-----
    # parse all UNSEEN reports in dmarc@ INBOX, store, mark seen
    python3 -m scripts.dmarc_report_parser

    # parse but don't write / don't mark (inspect)
    python3 -m scripts.dmarc_report_parser --dry-run

    # also parse already-seen messages (backfill the historical 29)
    python3 -m scripts.dmarc_report_parser --all

    # print a summary of recent alignment + Telegram alert on unknown-IP failures
    python3 -m scripts.dmarc_report_parser --alert

Connection: IMAP via DMARC_IMAP_{HOST,PORT,USER,PASS}; DB via DATABASE_URL.
Telegram: TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID.
"""
from __future__ import annotations
import argparse
import email
import gzip
import io
import ipaddress
import json
import os
import sys
import urllib.request
import zipfile
from datetime import datetime, timezone
from xml.etree import ElementTree as ET
import imaplib
# psycopg2 is imported lazily (only when actually writing to the DB) so that
# --dry-run works on hosts without the driver installed (e.g. the bare app host;
# the DB is only reachable from inside the workers container anyway).
# ── IMAP configuration ───────────────────────────────────────────────────────
IMAP_HOST = os.getenv("DMARC_IMAP_HOST", "mail.performancewest.net")
IMAP_PORT = int(os.getenv("DMARC_IMAP_PORT", "993"))
IMAP_USER = os.getenv("DMARC_IMAP_USER", "dmarc@performancewest.net")
IMAP_PASS = os.getenv("DMARC_IMAP_PASS", "")
IMAP_FOLDER = os.getenv("DMARC_IMAP_FOLDER", "INBOX")
# Our own egress space. The whole 207.174.124.0/24 is Performance West's block:
# the warmup rotation pool sends from .91-.109 (out0x), plus .71 transactional,
# .94 bulk, .107 hcout, .15 relay. Anything OUTSIDE this is a third party sending
# as us -- either a legit forwarder we authorized, or (if it fails) a spoofer that
# our p=reject policy is correctly rejecting.
OUR_NETS = [ipaddress.ip_network("207.174.124.0/24")]


def is_ours(ip: str) -> bool:
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return False
    return any(addr in net for net in OUR_NETS)


# Reverse-DNS substrings that identify a LEGIT forwarder / recipient-side mail
# security gateway. These re-send our mail from their own IP, which naturally
# breaks SPF/DKIM alignment -> the forwarded copy "fails" DMARC. That is benign
# (the ORIGINAL was already delivered+aligned; our p=reject only drops the
# forwarded duplicate). We must NOT alert on these or the digest is pure noise.
# Matched case-insensitively against the source IP's PTR record.
FORWARDER_PTR_HINTS = (
    "inkyphishfence", "cloud-sec-av", "proofpoint", "pphosted", "ppe-hosted",
    "mimecast", "barracuda", "messagelabs", "symanteccloud", "fireeyecloud",
    "trendmicro", "mailcontrol", "forcepoint", "cisco", "iphmx",  # Cisco ESA
    "mxlogic", "mailprotect", "emailsrvr", "godaddy", "secureserver",
    "outlook.com", "protection.outlook", "google.com", "googlemail",
    "amazonses", "sendgrid", "mailgun", "mcsv.net", "mailchimp",
    "fastmail", "messagingengine", "zoho", "mailroute", "spamh",
    "antispamcloud", "mailspamprotection", "fortimail", "sophos",
)

_ptr_cache: dict[str, str] = {}


def reverse_dns(ip: str) -> str:
    """Best-effort PTR lookup (cached). Empty string on failure."""
    if ip in _ptr_cache:
        return _ptr_cache[ip]
    ptr = ""
    try:
        import socket
        ptr = socket.gethostbyaddr(ip)[0].lower()
    except Exception:
        ptr = ""
    _ptr_cache[ip] = ptr
    return ptr


def is_known_forwarder(ip: str) -> bool:
    """True if the IP's PTR looks like a legit forwarder / security gateway, so
    DMARC failures from it are benign (forwarded mail, not spoofing)."""
    ptr = reverse_dns(ip)
    return any(h in ptr for h in FORWARDER_PTR_HINTS) if ptr else False


# ── attachment extraction ─────────────────────────────────────────────────────
def extract_xml(payload: bytes, filename: str) -> bytes | None:
    """Decompress a DMARC report attachment to raw XML bytes."""
    fn = (filename or "").lower()
    try:
        if fn.endswith(".gz"):
            return gzip.decompress(payload)
        if fn.endswith(".zip"):
            zf = zipfile.ZipFile(io.BytesIO(payload))
            # reports contain exactly one XML; take the first entry
            return zf.read(zf.namelist()[0])
        if fn.endswith(".xml"):
            return payload
    except Exception as exc:  # corrupt attachment -- skip, don't crash the run
        print(f"WARN: could not decompress {filename}: {exc}", file=sys.stderr)
    return None


def find_report_xml(msg: email.message.Message) -> bytes | None:
    """Walk an email and return the first decoded DMARC XML attachment."""
    for part in msg.walk():
        fn = part.get_filename()
        if not fn:
            # Some operators send application/gzip without a filename param.
            ct = part.get_content_type()
            if ct in ("application/gzip", "application/x-gzip"):
                fn = "report.gz"
            elif ct == "application/zip":
                fn = "report.zip"
            else:
                continue
        payload = part.get_payload(decode=True)
        if not payload:
            continue
        xml = extract_xml(payload, fn)
        if xml:
            return xml
    return None


# ── XML parsing ───────────────────────────────────────────────────────────────
def _text(node, path, default=""):
    el = node.find(path) if node is not None else None
    return el.text.strip() if el is not None and el.text else default


def _epoch_to_dt(s):
    try:
        return datetime.fromtimestamp(int(s), tz=timezone.utc)
    except (ValueError, TypeError):
        return None


def _strip_ns(root: ET.Element) -> ET.Element:
    """Strip XML namespaces in-place so find() works regardless of schema.

    Most operators emit the classic no-namespace aggregate XML, but some (GMX,
    mail.com) use the RFC-registered urn:ietf:params:xml:ns:dmarc-2.0 namespace,
    which would make namespace-naive find('record') miss everything. Rewriting
    each tag to its local name normalizes both forms.
    """
    for el in root.iter():
        if isinstance(el.tag, str) and "}" in el.tag:
            el.tag = el.tag.rsplit("}", 1)[1]
    return root


def parse_report(xml: bytes) -> tuple[dict, list[dict]] | None:
    """Parse one DMARC aggregate XML into (report_header, [records])."""
    try:
        root = _strip_ns(ET.fromstring(xml))
    except ET.ParseError as exc:
        print(f"WARN: XML parse error: {exc}", file=sys.stderr)
        return None

    meta = root.find("report_metadata")
    pol = root.find("policy_published")
    dr = meta.find("date_range") if meta is not None else None
    pct_raw = _text(pol, "pct")
    header = {
        "org_name": _text(meta, "org_name") or "unknown",
        "org_email": _text(meta, "email"),
        "report_id": _text(meta, "report_id") or "unknown",
        "date_begin": _epoch_to_dt(_text(dr, "begin")),
        "date_end": _epoch_to_dt(_text(dr, "end")),
        "policy_domain": _text(pol, "domain"),
        "policy_p": _text(pol, "p"),
        "policy_sp": _text(pol, "sp"),
        "policy_adkim": _text(pol, "adkim"),
        "policy_aspf": _text(pol, "aspf"),
        "policy_pct": int(pct_raw) if pct_raw.isdigit() else None,
    }

    records = []
    for rec in root.findall("record"):
        row = rec.find("row")
        pe = row.find("policy_evaluated") if row is not None else None
        ident = rec.find("identifiers")
        auth = rec.find("auth_results")
        dkim = auth.find("dkim") if auth is not None else None
        spf = auth.find("spf") if auth is not None else None
        cnt_raw = _text(row, "count")
        dkim_aligned = _text(pe, "dkim")
        spf_aligned = _text(pe, "spf")
        records.append({
            "source_ip": _text(row, "source_ip"),
            "msg_count": int(cnt_raw) if cnt_raw.isdigit() else 0,
            "disposition": _text(pe, "disposition"),
            "dkim_aligned": dkim_aligned,
            "spf_aligned": spf_aligned,
            "dmarc_pass": (dkim_aligned == "pass") or (spf_aligned == "pass"),
            "header_from": _text(ident, "header_from"),
            "envelope_from": _text(ident, "envelope_from"),
            "dkim_domain": _text(dkim, "domain"),
            "dkim_selector": _text(dkim, "selector"),
            "dkim_result": _text(dkim, "result"),
            "spf_domain": _text(spf, "domain"),
            "spf_result": _text(spf, "result"),
        })
    return header, records


# ── DB upsert ─────────────────────────────────────────────────────────────────
def store(conn, header: dict, records: list[dict]) -> bool:
    """Insert a report + its records. Returns True if newly inserted."""
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO dmarc_report
                (org_name, org_email, report_id, date_begin, date_end,
                 policy_domain, policy_p, policy_sp, policy_adkim, policy_aspf, policy_pct)
            VALUES (%(org_name)s, %(org_email)s, %(report_id)s, %(date_begin)s, %(date_end)s,
                    %(policy_domain)s, %(policy_p)s, %(policy_sp)s, %(policy_adkim)s,
                    %(policy_aspf)s, %(policy_pct)s)
            ON CONFLICT (org_name, report_id) DO NOTHING
            RETURNING id
            """,
            header,
        )
        row = cur.fetchone()
        if not row:
            conn.commit()
            return False  # already ingested
        report_pk = row[0]
        for rec in records:
            rec["report_id"] = report_pk
            cur.execute(
                """
                INSERT INTO dmarc_record
                    (report_id, source_ip, msg_count, disposition, dkim_aligned,
                     spf_aligned, dmarc_pass, header_from, envelope_from,
                     dkim_domain, dkim_selector, dkim_result, spf_domain, spf_result)
                VALUES (%(report_id)s, %(source_ip)s, %(msg_count)s, %(disposition)s,
                        %(dkim_aligned)s, %(spf_aligned)s, %(dmarc_pass)s, %(header_from)s,
                        %(envelope_from)s, %(dkim_domain)s, %(dkim_selector)s,
                        %(dkim_result)s, %(spf_domain)s, %(spf_result)s)
                """,
                rec,
            )
    conn.commit()
    return True


# ── summary + alerting ────────────────────────────────────────────────────────
def summarize(conn, days: int = 7) -> tuple[str, list[str]]:
    """Build a human summary of recent alignment. Returns (text, problems)."""
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT r.source_ip,
                   sum(r.msg_count) AS total,
                   sum(r.msg_count) FILTER (WHERE r.dmarc_pass) AS passed,
                   sum(r.msg_count) FILTER (WHERE NOT r.dmarc_pass) AS failed
            FROM dmarc_record r
            JOIN dmarc_report rep ON rep.id = r.report_id
            WHERE rep.date_begin >= now() - make_interval(days => %s)
            GROUP BY r.source_ip
            ORDER BY total DESC
            LIMIT 25
            """,
            (days,),
        )
        rows = cur.fetchall()

    lines = [f"DMARC alignment, last {days}d (by source IP):"]
    problems: list[str] = []
    for ip, total, passed, failed in rows:
        total, passed, failed = int(total or 0), int(passed or 0), int(failed or 0)
        if total == 0:
            continue
        pass_pct = round(100 * passed / total)
        ours = is_ours(ip)
        if ours:
            tag = "ours"
        elif failed > 0 and is_known_forwarder(ip):
            tag = "fwd"  # legit forwarder / security gateway -- failures benign
        else:
            tag = "EXTERNAL"
        lines.append(f" {ip:<16} [{tag:<8}] total={total:<6} pass={pass_pct}% fail={failed}")
        # Alert ONLY on genuinely actionable cases:
        #   1. OUR OWN IP failing alignment = a real auth/config break we must fix.
        #   2. An UNKNOWN external IP (not ours, not a recognized forwarder) sending
        #      as us at high volume = possible spoofing. Recognized forwarders
        #      (Proofpoint/Mimecast/Inky/etc. re-sending our mail) naturally fail
        #      SPF/DKIM alignment and are filtered out -- they were the digest noise.
        if ours and pass_pct < 95 and total >= 20:
            problems.append(f"{ip} (ours): only {pass_pct}% DMARC pass ({failed}/{total} fail) -- alignment broken")
        elif tag == "EXTERNAL" and failed >= 100:
            problems.append(f"{ip} (EXTERNAL, PTR={reverse_dns(ip) or 'none'}): {failed} failing msgs sending as us -- possible spoofing")
    return "\n".join(lines), problems


def send_telegram(text: str) -> bool:
    bot = os.getenv("TELEGRAM_BOT_TOKEN", "")
    chat = os.getenv("TELEGRAM_CHAT_ID", "")
    if not bot or not chat:
        return False
    try:
        data = json.dumps({"chat_id": chat, "text": text}).encode()
        req = urllib.request.Request(
            f"https://api.telegram.org/bot{bot}/sendMessage",
            data=data, headers={"Content-Type": "application/json"})
        urllib.request.urlopen(req, timeout=10)
        return True
    except Exception as exc:  # never fail the cron on a notify error
        print(f"WARN: telegram notify failed: {exc}", file=sys.stderr)
        return False


# ── main ──────────────────────────────────────────────────────────────────────
def main() -> int:
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--dry-run", action="store_true",
                    help="parse + print, no DB writes, do not mark seen")
    ap.add_argument("--all", action="store_true",
                    help="process ALL messages (incl. already-seen); default is UNSEEN only")
    ap.add_argument("--alert", action="store_true",
                    help="after ingest, print recent-alignment summary + Telegram on problems")
    ap.add_argument("--summary-days", type=int, default=7,
                    help="window (days) for the --alert summary (default 7)")
    args = ap.parse_args()

    if not IMAP_PASS:
        print("ERROR: DMARC_IMAP_PASS not set", file=sys.stderr)
        return 2
    # Validate the DB config BEFORE opening IMAP, so this early exit cannot
    # leave an IMAP session open.
    url = os.getenv("DATABASE_URL", "")
    if not args.dry_run and not url:
        print("ERROR: DATABASE_URL not set", file=sys.stderr)
        return 2

    imap = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)
    conn = None
    ingested = skipped = no_xml = 0
    try:
        # Everything after connecting runs inside try/finally so a failed login
        # or DB connect still logs out of IMAP.
        imap.login(IMAP_USER, IMAP_PASS)
        imap.select(IMAP_FOLDER)
        typ, data = imap.search(None, "ALL" if args.all else "UNSEEN")
        ids = data[0].split() if typ == "OK" and data and data[0] else []
        print(f"{IMAP_USER}: {len(ids)} message(s) to scan ({'ALL' if args.all else 'UNSEEN'}).")

        if not args.dry_run:
            import psycopg2  # lazy: only needed when actually writing
            conn = psycopg2.connect(url)

        for mid in ids:
            typ, mdata = imap.fetch(mid, "(RFC822)")
            if typ != "OK" or not mdata or not mdata[0]:
                continue
            msg = email.message_from_bytes(mdata[0][1])
            xml = find_report_xml(msg)
            if not xml:
                no_xml += 1
                # Not a report (e.g. our test probe) -- mark seen so we skip it next time.
                if not args.dry_run and not args.all:
                    imap.store(mid, "+FLAGS", "\\Seen")
                continue
            parsed = parse_report(xml)
            if not parsed:
                continue
            header, records = parsed
            if args.dry_run:
                fails = sum(r["msg_count"] for r in records if not r["dmarc_pass"])
                print(f" {header['org_name']:<28} report={header['report_id'][:32]:<32} "
                      f"records={len(records)} fail_msgs={fails}")
                continue
            if store(conn, header, records):
                ingested += 1
            else:
                skipped += 1
            if not args.all:
                imap.store(mid, "+FLAGS", "\\Seen")

        if args.dry_run:
            print(f"DRY-RUN: scanned {len(ids)}, non-report={no_xml}.")
        else:
            print(f"Ingested {ingested} new report(s); {skipped} already present; "
                  f"{no_xml} non-report message(s).")
            if args.alert:
                text, problems = summarize(conn, args.summary_days)
                print("\n" + text)
                if problems:
                    msg = ("⚠️ DMARC alignment alert\n\n" + text +
                           "\n\nIssues:\n" + "\n".join(f"- {p}" for p in problems))
                    send_telegram(msg)
    finally:
        if conn:
            conn.close()
        imap.logout()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())