From 8e5590b49288f10ee3aaec9d02ef26b16bf79991 Mon Sep 17 00:00:00 2001 From: justin Date: Fri, 19 Jun 2026 08:50:20 -0500 Subject: [PATCH] mail: DMARC aggregate-report parser + dedicated dmarc@ mailbox ingestion Tool 2 of the deliverability monitoring pair (Tool 1 = mail_reputation_monitor). DMARC rua reports from dozens of operators (Google, Yahoo, Comcast, Cox, Bell, Mimecast, Cisco ESA, GMX, mail.com, ...) were landing in ops@ (dmarc@ was a DL), burying real mail and never parsed. Now ingested + queryable: - dmarc@performancewest.net converted DL -> dedicated Carbonio mailbox; isolated IMAP creds in server .env, surfaced to workers in docker-compose.yml (mirrors OPS_IMAP_*). 29 historical reports moved ops@ -> dmarc@ via IMAP. - scripts/dmarc_report_parser.py: IMAP fetch unseen -> decompress .gz/.zip/.xml (namespace-agnostic: classic + urn:ietf:params:xml:ns:dmarc-2.0 GMX/mail.com) -> parse aggregate XML -> upsert dmarc_report (keyed (org_name,report_id), no-op on re-parse) + dmarc_record per source IP. dmarc_pass = dkim_aligned OR spf_aligned. Marks \Seen. --dry-run/--all/--alert (7d per-IP summary + Telegram if one of OUR IPs <95% pass, or EXTERNAL IP sends >=20 failing msgs as us = spoofing under p=reject). psycopg2 imported lazily so --dry-run runs without the driver. - api/migrations/102_dmarc_aggregate.sql: dmarc_report + dmarc_record tables. - infra/cron/pw-dmarc-parser: 06:20 UTC daily --alert (after reputation, before scrub). - docs/deliverability.md: DMARC section DONE; query examples. Verified: dry-run --all parses all 28 reports (1 non-report test probe), 0 unknown after the namespace fix. 
--- api/migrations/102_dmarc_aggregate.sql | 66 +++++ docker-compose.yml | 5 + docs/deliverability.md | 44 ++- infra/cron/pw-dmarc-parser | 17 ++ scripts/dmarc_report_parser.py | 385 +++++++++++++++++++++++++ 5 files changed, 509 insertions(+), 8 deletions(-) create mode 100644 api/migrations/102_dmarc_aggregate.sql create mode 100644 infra/cron/pw-dmarc-parser create mode 100644 scripts/dmarc_report_parser.py diff --git a/api/migrations/102_dmarc_aggregate.sql b/api/migrations/102_dmarc_aggregate.sql new file mode 100644 index 0000000..d3f3054 --- /dev/null +++ b/api/migrations/102_dmarc_aggregate.sql @@ -0,0 +1,66 @@ +-- DMARC aggregate (rua) report ingestion. +-- +-- WHY: DMARC aggregate reports (RFC 7489) are the authoritative, cross-operator +-- view of who is sending mail AS us (header_from = performancewest.net / its +-- subdomains) and whether that mail passes SPF + DKIM alignment. Every major +-- receiver (Google, Yahoo, Comcast, Cox, Bell, Mimecast, Cisco ESA, GMX, mail.com, +-- ...) emails one zipped/gzipped XML per day to rua=mailto:dmarc@performancewest.net. +-- Reading them by hand is hopeless (dozens/day). This turns them into queryable +-- per-source-IP / per-domain SPF+DKIM+DMARC pass-fail trends so we can SEE: +-- * our own senders (.94 bulk / .107 hcout / .71 transactional / .15 relay) all +-- passing alignment (DKIM d=send. selector send, d=root selector mail) -- the +-- deliverability fixes this session were exactly about this; and +-- * any UNKNOWN IP sending as us that fails -- i.e. spoofing or a forgotten relay, +-- which is reputation poison under p=reject. +-- +-- Populated by scripts/dmarc_report_parser.py (IMAP fetch dmarc@ -> unzip -> parse +-- XML -> upsert). Idempotent: each report is keyed by (org_name, report_id) and +-- re-parsing the same report is a no-op (ON CONFLICT DO NOTHING). + +-- One row per aggregate report (the <report_metadata> + <policy_published> elements). 
+CREATE TABLE IF NOT EXISTS dmarc_report ( + id BIGSERIAL PRIMARY KEY, + org_name TEXT NOT NULL, -- reporting operator (google.com, yahoo.com, ...) + org_email TEXT, -- contact email from the report + report_id TEXT NOT NULL, -- operator's unique report id + date_begin TIMESTAMPTZ, -- report window start (from epoch) + date_end TIMESTAMPTZ, -- report window end + policy_domain TEXT, -- policy_published/domain + policy_p TEXT, -- published policy: none|quarantine|reject + policy_sp TEXT, -- subdomain policy + policy_adkim TEXT, -- DKIM alignment mode r|s + policy_aspf TEXT, -- SPF alignment mode r|s + policy_pct INTEGER, -- % of mail policy applies to + received_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (org_name, report_id) +); + +-- One row per <record> inside a report (a distinct source_ip + auth result combo). +CREATE TABLE IF NOT EXISTS dmarc_record ( + id BIGSERIAL PRIMARY KEY, + report_id BIGINT NOT NULL REFERENCES dmarc_report(id) ON DELETE CASCADE, + source_ip TEXT NOT NULL, -- the IP that sent the mail + msg_count INTEGER NOT NULL DEFAULT 0, -- messages from this IP in the window + disposition TEXT, -- DMARC disposition applied: none|quarantine|reject + dkim_aligned TEXT, -- policy_evaluated/dkim: pass|fail + spf_aligned TEXT, -- policy_evaluated/spf: pass|fail + dmarc_pass BOOLEAN, -- derived: dkim_aligned=pass OR spf_aligned=pass + header_from TEXT, -- identifiers/header_from + envelope_from TEXT, -- identifiers/envelope_from + dkim_domain TEXT, -- auth_results/dkim/domain + dkim_selector TEXT, -- auth_results/dkim/selector + dkim_result TEXT, -- auth_results/dkim/result (raw) + spf_domain TEXT, -- auth_results/spf/domain + spf_result TEXT -- auth_results/spf/result (raw) +); + +CREATE INDEX IF NOT EXISTS idx_dmarc_report_window ON dmarc_report (date_begin); +CREATE INDEX IF NOT EXISTS idx_dmarc_report_org ON dmarc_report (org_name); +CREATE INDEX IF NOT EXISTS idx_dmarc_record_report ON dmarc_record (report_id); +CREATE INDEX IF NOT EXISTS idx_dmarc_record_ip ON dmarc_record 
(source_ip); +CREATE INDEX IF NOT EXISTS idx_dmarc_record_fail ON dmarc_record (dmarc_pass) WHERE dmarc_pass = false; + +COMMENT ON TABLE dmarc_report IS + 'DMARC aggregate (rua) report headers. One row per operator report, keyed (org_name, report_id). Populated by scripts/dmarc_report_parser.py.'; +COMMENT ON TABLE dmarc_record IS + 'Per-source-IP rows inside each DMARC aggregate report: SPF/DKIM alignment + raw auth results. dmarc_pass = dkim_aligned=pass OR spf_aligned=pass.'; diff --git a/docker-compose.yml b/docker-compose.yml index 2d46c51..91e7f97 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -119,6 +119,11 @@ services: - OPS_IMAP_PORT=${OPS_IMAP_PORT:-993} - OPS_IMAP_USER=${OPS_IMAP_USER} - OPS_IMAP_PASS=${OPS_IMAP_PASS} + # DMARC aggregate-report ingestion mailbox (scripts.dmarc_report_parser) + - DMARC_IMAP_HOST=${DMARC_IMAP_HOST:-mail.performancewest.net} + - DMARC_IMAP_PORT=${DMARC_IMAP_PORT:-993} + - DMARC_IMAP_USER=${DMARC_IMAP_USER:-dmarc@performancewest.net} + - DMARC_IMAP_PASS=${DMARC_IMAP_PASS} - FROM_EMAIL=Performance West - CRYPTO_SWEEP_ADMIN_EMAIL=${ADMIN_EMAIL:-ops@performancewest.net} - USAC_USERNAME=${USAC_USERNAME} diff --git a/docs/deliverability.md b/docs/deliverability.md index 89b89e6..993b66c 100644 --- a/docs/deliverability.md +++ b/docs/deliverability.md @@ -224,14 +224,42 @@ all HE.net slaves + 8.8.8.8/1.1.1.1/9.9.9.9): - `send.performancewest.net` TXT `yahoo-verification-key=Ps5hGjVxXgeQcLcxr671YG0/RxzjjL0eqh6vfULubEo=` (added alongside the existing `send` SPF record; both TXT coexist). -### ✅ DMARC aggregate reports — mailbox FIXED 2026-06-19 (parser still TODO) -Gmail/Yahoo/Microsoft send daily per-IP auth+disposition XML to -`dmarc@performancewest.net` (DMARC record has `rua=mailto:dmarc@...`). **That -mailbox was REJECTING (5.1.1) until 2026-06-19 — we were silently losing every -report.** It's now a Carbonio DL -> ops@ (verified delivering). 
Next: add IMAP creds -for ops@ (or a dedicated dmarc mailbox) and build a small collector/parser worker to -chart per-IP/per-domain pass-fail without any provider login. Now actually worth -doing since the data finally arrives. +### ✅ DMARC aggregate reports — DONE 2026-06-19 (dedicated mailbox + parser) +Gmail/Yahoo/Microsoft + dozens of operators (Comcast, Cox, Bell, Mimecast, Cisco +ESA, GMX, mail.com, gosecure, ...) send daily per-IP auth+disposition XML to +`dmarc@performancewest.net` (DMARC record: `p=reject; rua=mailto:dmarc@; ruf=mailto:dmarc@; fo=1`). +**That mailbox was REJECTING (5.1.1) until 2026-06-19 — we silently lost every +report.** Now fully wired: + +1. **Dedicated mailbox.** `dmarc@performancewest.net` is its own Carbonio account + (was a DL -> ops@, which buried ops@ under report XML). Isolated IMAP credential + in the server `.env` (`DMARC_IMAP_{HOST,PORT,USER,PASS}`), surfaced to the workers + container in `docker-compose.yml` (mirrors the `OPS_IMAP_*` pattern). The 29 + historical reports that had landed in ops@ were moved over via IMAP. +2. **Parser worker.** `scripts/dmarc_report_parser.py` IMAP-fetches unseen messages, + decompresses the `.gz`/`.zip`/`.xml` attachment (namespace-agnostic — handles both + the classic and the `urn:ietf:params:xml:ns:dmarc-2.0` GMX/mail.com schema), parses + the aggregate XML, and upserts one `dmarc_report` row (keyed `(org_name, report_id)`, + so re-parsing is a no-op) + one `dmarc_record` row per source IP into the schema from + `api/migrations/102_dmarc_aggregate.sql`. `dmarc_pass = dkim_aligned=pass OR + spf_aligned=pass`. Marks each message `\Seen` so each run only handles new reports. + Flags: `--dry-run`, `--all` (backfill seen), `--alert` (7-day per-IP summary + + Telegram if one of OUR IPs drops below 95% pass, or an EXTERNAL IP sends >=20 failing + msgs as us = spoofing under `p=reject`). +3. **Cron.** `/etc/cron.d/pw-dmarc-parser` (tracked at `infra/cron/pw-dmarc-parser`) + runs `... 
workers python3 -m scripts.dmarc_report_parser --alert` daily at 06:20 UTC. + +Query examples once populated: +```sql +-- who sends as us, and are they aligning? (the payoff of the DKIM/subdomain fixes) +SELECT source_ip, sum(msg_count) total, + sum(msg_count) FILTER (WHERE dmarc_pass) pass, + round(100.0*sum(msg_count) FILTER (WHERE dmarc_pass)/sum(msg_count)) pass_pct +FROM dmarc_record r JOIN dmarc_report rep ON rep.id=r.report_id +WHERE rep.date_begin >= now()-interval '7 days' +GROUP BY source_ip ORDER BY total DESC; +-- any UNKNOWN IP failing alignment = spoofing/forgotten relay (reputation poison) +``` --- diff --git a/infra/cron/pw-dmarc-parser b/infra/cron/pw-dmarc-parser new file mode 100644 index 0000000..52e61db --- /dev/null +++ b/infra/cron/pw-dmarc-parser @@ -0,0 +1,17 @@ +# Nightly DMARC aggregate-report ingestion. Fetches the day's rua reports from the +# dedicated dmarc@performancewest.net mailbox (Google, Yahoo, Comcast, Cox, Bell, +# Mimecast, Cisco ESA, GMX, mail.com, Microsoft, ...), decompresses + parses the +# XML, and upserts per-source-IP SPF/DKIM/DMARC alignment into dmarc_report / +# dmarc_record. This is the authoritative cross-operator view of who sends mail AS +# us and whether it passes alignment -- the payoff of this session's DKIM/subdomain +# fixes -- and it flags any UNKNOWN IP sending as us (spoofing) under our p=reject. +# +# --alert prints the last-7d per-IP alignment summary and sends a Telegram warning +# if one of our own IPs drops below 95% DMARC pass, or an external IP sends >=20 +# failing messages as us. Marks processed messages \Seen so each run only handles +# new reports (idempotent; reports are also keyed (org_name, report_id) in the DB). +# +# The mailbox is IMAP-reachable from the network and the DB lives inside the docker +# network, so we run inside the workers container (which has DMARC_IMAP_* + DATABASE_URL +# from .env). Runs at 06:20 UTC (after 06:10 reputation, before 06:30 scrub). 
+20 6 * * * deploy cd /opt/performancewest && docker compose exec -T workers python3 -m scripts.dmarc_report_parser --alert >> /var/log/pw-dmarc-parser.log 2>&1 diff --git a/scripts/dmarc_report_parser.py b/scripts/dmarc_report_parser.py new file mode 100644 index 0000000..376c994 --- /dev/null +++ b/scripts/dmarc_report_parser.py @@ -0,0 +1,385 @@ +#!/usr/bin/env python3 +"""DMARC aggregate (rua) report parser -- cross-operator SPF/DKIM alignment trend. + +WHY +--- +DMARC aggregate reports (RFC 7489) are the authoritative, cross-operator view of +who sends mail AS us and whether it passes SPF + DKIM *alignment*. Every major +receiver (Google, Yahoo, Comcast, Cox, Bell, Mimecast, Cisco ESA, GMX, mail.com, +Microsoft, ...) emails one zipped/gzipped XML per day to the rua address +(dmarc@performancewest.net). Reading dozens/day by hand is hopeless. This worker +turns them into queryable rows so we can SEE: + + * our own senders (.94 bulk / .107 hcout / .71 transactional / .15 relay) all + passing alignment -- the whole point of this session's DKIM/subdomain fixes + (bulk signs d=send.performancewest.net selector "send"; root signs + d=performancewest.net selector "mail"); and + * any UNKNOWN IP sending as us and FAILING -- spoofing or a forgotten relay, + which is pure reputation poison under our p=reject policy. + +WHAT IT DOES +------------ +IMAP-fetches unread messages from dmarc@, finds the .zip/.gz/.xml attachment, +decompresses + parses the aggregate XML, and upserts one dmarc_report row +(keyed (org_name, report_id), so re-parsing is a no-op) plus one dmarc_record +row per <record>. Then marks the message \\Seen so the next run skips it. +Idempotent end-to-end: safe to run on a cron over the same INBOX. 
+ +USAGE +----- + # parse all UNSEEN reports in dmarc@ INBOX, store, mark seen + python3 -m scripts.dmarc_report_parser + + # parse but don't write / don't mark (inspect) + python3 -m scripts.dmarc_report_parser --dry-run + + # also parse already-seen messages (backfill the historical 29) + python3 -m scripts.dmarc_report_parser --all + + # print a summary of recent alignment + Telegram alert on unknown-IP failures + python3 -m scripts.dmarc_report_parser --alert + +Connection: IMAP via DMARC_IMAP_{HOST,PORT,USER,PASS}; DB via DATABASE_URL. +Telegram: TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID. +""" +from __future__ import annotations + +import argparse +import email +import gzip +import io +import json +import os +import sys +import urllib.request +import zipfile +from datetime import datetime, timezone +from xml.etree import ElementTree as ET + +import imaplib + +# psycopg2 is imported lazily (only when actually writing to the DB) so that +# --dry-run works on hosts without the driver installed (e.g. the bare app host; +# the DB is only reachable from inside the workers container anyway). + +# ── IMAP configuration ─────────────────────────────────────────────────────── +IMAP_HOST = os.getenv("DMARC_IMAP_HOST", "mail.performancewest.net") +IMAP_PORT = int(os.getenv("DMARC_IMAP_PORT", "993")) +IMAP_USER = os.getenv("DMARC_IMAP_USER", "dmarc@performancewest.net") +IMAP_PASS = os.getenv("DMARC_IMAP_PASS", "") +IMAP_FOLDER = os.getenv("DMARC_IMAP_FOLDER", "INBOX") + +# Our own egress IPs (so the summary can flag UNKNOWN senders failing alignment). 
# Egress IPs we operate. Any other source IP seen in a report is labelled
# EXTERNAL by summarize().
OUR_IPS = {
    "207.174.124.71",   # transactional (app server)
    "207.174.124.94",   # bulk / trucking (out05)
    "207.174.124.107",  # healthcare (hcout1)
    "207.174.124.15",   # co.carrierone.com relay (Carbonio)
}

# Hard ceiling on one decompressed report. Attachments arrive from arbitrary
# internet senders, so gzip/zip expansion must be bounded (decompression bomb).
MAX_REPORT_BYTES = 64 * 1024 * 1024

# Telegram rejects sendMessage texts longer than 4096 characters; stay below it.
TELEGRAM_MAX_CHARS = 4000

_GZIP_MAGIC = b"\x1f\x8b"
_ZIP_MAGIC = b"PK\x03\x04"

# Stand-in filenames for attachment parts that carry no filename parameter.
# RFC 7489 names application/gzip (compressed) and text/xml (uncompressed);
# the remaining entries are common aliases for the same payload kinds.
_FALLBACK_NAMES = {
    "application/gzip": "report.gz",
    "application/x-gzip": "report.gz",
    "application/zip": "report.zip",
    "application/x-zip-compressed": "report.zip",
    "application/xml": "report.xml",
    "text/xml": "report.xml",
}


# ── attachment extraction ─────────────────────────────────────────────────────
def _read_capped(stream, label) -> bytes | None:
    """Read *stream* fully, or return None if it exceeds MAX_REPORT_BYTES."""
    data = stream.read(MAX_REPORT_BYTES + 1)
    if len(data) > MAX_REPORT_BYTES:
        print(f"WARN: {label} expands past {MAX_REPORT_BYTES} bytes; skipped",
              file=sys.stderr)
        return None
    return data


def _looks_like_xml(payload: bytes) -> bool:
    """Return True if *payload* starts like an (aggregate-report) XML document."""
    head = payload[:512].lstrip(b"\xef\xbb\xbf \t\r\n")
    return head.startswith((b"<?xml", b"<feedback"))


def extract_xml(payload: bytes, filename: str) -> bytes | None:
    """Decompress a DMARC report attachment to raw XML bytes.

    The container format is detected from the payload's magic bytes first and
    from the filename extension second, so mislabelled or extension-less
    attachments are still handled. For zip archives the first ``.xml`` member
    is used (falling back to the first member).

    Returns None when the attachment is not a recognised report container, is
    corrupt, is an empty archive, or expands past MAX_REPORT_BYTES.
    """
    fn = (filename or "").lower()
    try:
        if payload.startswith(_GZIP_MAGIC) or fn.endswith(".gz"):
            with gzip.GzipFile(fileobj=io.BytesIO(payload)) as gz:
                return _read_capped(gz, filename)
        if payload.startswith(_ZIP_MAGIC) or fn.endswith(".zip"):
            with zipfile.ZipFile(io.BytesIO(payload)) as zf:
                names = zf.namelist()
                if not names:
                    print(f"WARN: {filename} is an empty zip archive",
                          file=sys.stderr)
                    return None
                member = next(
                    (n for n in names if n.lower().endswith(".xml")), names[0])
                with zf.open(member) as fh:
                    return _read_capped(fh, filename)
        if fn.endswith(".xml") or _looks_like_xml(payload):
            return payload
    except Exception as exc:  # corrupt attachment -- skip, don't crash the run
        print(f"WARN: could not decompress {filename}: {exc}", file=sys.stderr)
    return None


def find_report_xml(msg: email.message.Message) -> bytes | None:
    """Walk an email and return the first decoded DMARC XML attachment.

    Parts without a filename are only considered when their content type is
    one of the known report types in _FALLBACK_NAMES; everything else (plain
    text bodies, multipart containers) is skipped. Returns None when no part
    yields XML.
    """
    for part in msg.walk():
        if part.is_multipart():
            continue  # containers have no payload of their own
        fn = part.get_filename()
        if not fn:
            # Some operators send the report without a filename parameter.
            fn = _FALLBACK_NAMES.get(part.get_content_type())
            if fn is None:
                continue
        payload = part.get_payload(decode=True)
        if not payload:
            continue
        xml = extract_xml(payload, fn)
        if xml:
            return xml
    return None


# ── XML parsing ───────────────────────────────────────────────────────────────
def _text(node, path, default=""):
    """Return the stripped text of ``node.find(path)``, or *default*.

    *default* is returned when *node* is None, the child is missing, or the
    child has no text.
    """
    el = node.find(path) if node is not None else None
    return el.text.strip() if el is not None and el.text else default


def _epoch_to_dt(s):
    """Convert an epoch-seconds string to an aware UTC datetime, else None."""
    try:
        return datetime.fromtimestamp(int(s), tz=timezone.utc)
    except (ValueError, TypeError, OverflowError, OSError):
        # Non-numeric, missing, or out of the platform's representable range.
        return None


def _strip_ns(root: ET.Element) -> ET.Element:
    """Strip XML namespaces in-place so find() works regardless of schema.

    Most operators emit the classic no-namespace aggregate XML, but some (GMX,
    mail.com) use the RFC-registered urn:ietf:params:xml:ns:dmarc-2.0 namespace,
    which would make namespace-naive find('record') miss everything. Rewriting
    each tag to its local name normalizes both forms.
    """
    for el in root.iter():
        if isinstance(el.tag, str) and "}" in el.tag:
            el.tag = el.tag.rsplit("}", 1)[1]
    return root


def parse_report(xml: bytes) -> tuple[dict, list[dict]] | None:
    """Parse one DMARC aggregate XML into (report_header, [records]).

    Returns None (after a warning on stderr) when the bytes are not
    well-formed XML or the document has no <report_metadata> element, i.e. it
    is not an aggregate report. Missing individual fields become "" (text),
    0 (count) or None (dates, pct); a missing org_name/report_id becomes
    "unknown".
    """
    # NOTE(review): ElementTree is not hardened against malicious XML (entity
    # expansion); report senders are untrusted -- confirm the runtime's expat
    # version includes the expansion limits.
    try:
        root = _strip_ns(ET.fromstring(xml))
    except ET.ParseError as exc:
        print(f"WARN: XML parse error: {exc}", file=sys.stderr)
        return None

    meta = root.find("report_metadata")
    if meta is None:
        # Well-formed XML that is not a DMARC aggregate report. Storing it
        # would create a bogus ("unknown", "unknown") report row.
        print(f"WARN: XML root <{root.tag}> has no report_metadata; "
              "not a DMARC aggregate report", file=sys.stderr)
        return None
    pol = root.find("policy_published")
    dr = meta.find("date_range")

    pct_raw = _text(pol, "pct")
    header = {
        "org_name": _text(meta, "org_name") or "unknown",
        "org_email": _text(meta, "email"),
        "report_id": _text(meta, "report_id") or "unknown",
        "date_begin": _epoch_to_dt(_text(dr, "begin")),
        "date_end": _epoch_to_dt(_text(dr, "end")),
        "policy_domain": _text(pol, "domain"),
        "policy_p": _text(pol, "p"),
        "policy_sp": _text(pol, "sp"),
        "policy_adkim": _text(pol, "adkim"),
        "policy_aspf": _text(pol, "aspf"),
        "policy_pct": int(pct_raw) if pct_raw.isdigit() else None,
    }

    records = []
    for rec in root.findall("record"):
        row = rec.find("row")
        pe = row.find("policy_evaluated") if row is not None else None
        ident = rec.find("identifiers")
        auth = rec.find("auth_results")
        dkim = auth.find("dkim") if auth is not None else None
        spf = auth.find("spf") if auth is not None else None

        cnt_raw = _text(row, "count")
        dkim_aligned = _text(pe, "dkim")
        spf_aligned = _text(pe, "spf")
        records.append({
            "source_ip": _text(row, "source_ip"),
            "msg_count": int(cnt_raw) if cnt_raw.isdigit() else 0,
            "disposition": _text(pe, "disposition"),
            "dkim_aligned": dkim_aligned,
            "spf_aligned": spf_aligned,
            # DMARC passes when either mechanism is aligned.
            "dmarc_pass": (dkim_aligned == "pass") or (spf_aligned == "pass"),
            "header_from": _text(ident, "header_from"),
            "envelope_from": _text(ident, "envelope_from"),
            "dkim_domain": _text(dkim, "domain"),
            "dkim_selector": _text(dkim, "selector"),
            "dkim_result": _text(dkim, "result"),
            "spf_domain": _text(spf, "domain"),
            "spf_result": _text(spf, "result"),
        })
    return header, records


# ── DB upsert ─────────────────────────────────────────────────────────────────
def store(conn, header: dict, records: list[dict]) -> bool:
    """Insert a report + its records. Returns True if newly inserted.

    The report row and all of its record rows are written in one transaction.
    A report whose (org_name, report_id) already exists is left untouched and
    False is returned. On any error the transaction is rolled back, so the
    connection stays usable for the next report, and the error is re-raised.
    The caller's *records* dicts are not modified.
    """
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO dmarc_report
                  (org_name, org_email, report_id, date_begin, date_end,
                   policy_domain, policy_p, policy_sp, policy_adkim, policy_aspf, policy_pct)
                VALUES (%(org_name)s, %(org_email)s, %(report_id)s, %(date_begin)s, %(date_end)s,
                        %(policy_domain)s, %(policy_p)s, %(policy_sp)s, %(policy_adkim)s,
                        %(policy_aspf)s, %(policy_pct)s)
                ON CONFLICT (org_name, report_id) DO NOTHING
                RETURNING id
                """,
                header,
            )
            row = cur.fetchone()
            if row:
                report_pk = row[0]
                for rec in records:
                    cur.execute(
                        """
                        INSERT INTO dmarc_record
                          (report_id, source_ip, msg_count, disposition, dkim_aligned,
                           spf_aligned, dmarc_pass, header_from, envelope_from,
                           dkim_domain, dkim_selector, dkim_result, spf_domain, spf_result)
                        VALUES (%(report_id)s, %(source_ip)s, %(msg_count)s, %(disposition)s,
                                %(dkim_aligned)s, %(spf_aligned)s, %(dmarc_pass)s, %(header_from)s,
                                %(envelope_from)s, %(dkim_domain)s, %(dkim_selector)s,
                                %(dkim_result)s, %(spf_domain)s, %(spf_result)s)
                        """,
                        # Copy so the parsed record keeps no DB-specific key.
                        {**rec, "report_id": report_pk},
                    )
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    return bool(row)  # falsy row == ON CONFLICT hit == already ingested


# ── summary + alerting ────────────────────────────────────────────────────────
def summarize(conn, days: int = 7, max_rows: int = 25) -> tuple[str, list[str]]:
    """Build a human summary of recent alignment. Returns (text, problems).

    *days* is the look-back window on dmarc_report.date_begin. The text lists
    at most *max_rows* source IPs (busiest first), but *problems* is computed
    over every source IP in the window, so a low-volume spoofing source is
    not hidden by the display cap. A problem is raised when one of OUR_IPS
    with at least 20 messages is below 95% DMARC pass, or when any other IP
    has at least 20 failing messages.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT r.source_ip,
                   sum(r.msg_count)                                   AS total,
                   sum(r.msg_count) FILTER (WHERE r.dmarc_pass)       AS passed,
                   sum(r.msg_count) FILTER (WHERE NOT r.dmarc_pass)   AS failed
            FROM dmarc_record r
            JOIN dmarc_report rep ON rep.id = r.report_id
            WHERE rep.date_begin >= now() - make_interval(days => %s)
            GROUP BY r.source_ip
            ORDER BY total DESC
            """,
            (days,),
        )
        rows = cur.fetchall()

    lines = [f"DMARC alignment, last {days}d (by source IP):"]
    problems: list[str] = []
    shown = hidden = 0
    for ip, total, passed, failed in rows:
        total, passed, failed = int(total or 0), int(passed or 0), int(failed or 0)
        if total == 0:
            continue
        is_ours = ip in OUR_IPS
        if shown < max_rows:
            pass_pct = round(100 * passed / total)
            ours = "ours" if is_ours else "EXTERNAL"
            lines.append(f" {ip:<16} [{ours:<8}] total={total:<6} pass={pass_pct}% fail={failed}")
            shown += 1
        else:
            hidden += 1
        # Alerts: our IP failing alignment, OR an external IP sending as us at volume.
        if is_ours:
            # Integer comparison on the exact ratio: a rounded 94.6% would
            # read as 95% and slip past the threshold.
            if total >= 20 and passed * 100 < 95 * total:
                exact_pct = 100 * passed / total
                problems.append(f"{ip} (ours): only {exact_pct:.1f}% DMARC pass "
                                f"({failed}/{total} fail) -- alignment broken")
        elif failed >= 20:
            problems.append(f"{ip} (EXTERNAL): {failed} failing msgs sending as us -- possible spoofing")
    if hidden:
        lines.append(f" ... and {hidden} more source IP(s) not shown")
    return "\n".join(lines), problems


def send_telegram(text: str) -> bool:
    """Send *text* via the Telegram Bot API. Returns True on success.

    Returns False without sending when TELEGRAM_BOT_TOKEN or TELEGRAM_CHAT_ID
    is unset, and False (after a warning on stderr) on any delivery error.
    Text longer than TELEGRAM_MAX_CHARS is truncated so the API does not
    reject the whole alert.
    """
    bot = os.getenv("TELEGRAM_BOT_TOKEN", "")
    chat = os.getenv("TELEGRAM_CHAT_ID", "")
    if not bot or not chat:
        return False
    if len(text) > TELEGRAM_MAX_CHARS:
        text = text[:TELEGRAM_MAX_CHARS - 1] + "…"
    try:
        data = json.dumps({"chat_id": chat, "text": text}).encode()
        req = urllib.request.Request(
            f"https://api.telegram.org/bot{bot}/sendMessage",
            data=data, headers={"Content-Type": "application/json"})
        # Context manager closes the HTTP response (and its socket).
        with urllib.request.urlopen(req, timeout=10):
            pass
        return True
    except Exception as exc:  # never fail the cron on a notify error
        print(f"WARN: telegram notify failed: {exc}", file=sys.stderr)
        return False


# ── main ──────────────────────────────────────────────────────────────────────
def main() -> int:
    """CLI entry point: fetch, parse and store reports, then optionally alert.

    Returns 0 on success, 1 when at least one report could not be stored, and
    2 on a configuration error (missing DMARC_IMAP_PASS / DATABASE_URL, or the
    IMAP folder cannot be selected).
    """
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--dry-run", action="store_true",
                    help="parse + print, no DB writes, do not mark seen")
    ap.add_argument("--all", action="store_true",
                    help="process ALL messages (incl. already-seen); default is UNSEEN only")
    ap.add_argument("--alert", action="store_true",
                    help="after ingest, print recent-alignment summary + Telegram on problems")
    ap.add_argument("--summary-days", type=int, default=7,
                    help="window (days) for the --alert summary (default 7)")
    args = ap.parse_args()

    # Validate configuration before opening any connection so an early exit
    # cannot leave an IMAP session behind.
    if not IMAP_PASS:
        print("ERROR: DMARC_IMAP_PASS not set", file=sys.stderr)
        return 2
    db_url = os.getenv("DATABASE_URL", "")
    if not args.dry_run and not db_url:
        print("ERROR: DATABASE_URL not set", file=sys.stderr)
        return 2

    criteria = "ALL" if args.all else "UNSEEN"
    conn = None
    imap = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)
    try:
        imap.login(IMAP_USER, IMAP_PASS)
        # Read-only in dry-run: the server then cannot change any flag.
        typ, _ = imap.select(IMAP_FOLDER, readonly=args.dry_run)
        if typ != "OK":
            print(f"ERROR: cannot select IMAP folder {IMAP_FOLDER!r}", file=sys.stderr)
            return 2
        typ, data = imap.search(None, criteria)
        ids = data[0].split() if typ == "OK" and data and data[0] else []
        print(f"{IMAP_USER}: {len(ids)} message(s) to scan ({criteria}).")

        if not args.dry_run:
            import psycopg2  # lazy: only needed when actually writing
            conn = psycopg2.connect(db_url)

        ingested = skipped = no_xml = unparsed = failed = 0
        for mid in ids:
            # BODY.PEEK[] returns the full message WITHOUT the implicit \Seen
            # that RFC822/BODY[] sets; \Seen is set explicitly below, and only
            # once the report is safely handled.
            typ, mdata = imap.fetch(mid, "(BODY.PEEK[])")
            if typ != "OK" or not mdata or not isinstance(mdata[0], tuple):
                continue
            msg = email.message_from_bytes(mdata[0][1])
            xml = find_report_xml(msg)
            if not xml:
                no_xml += 1
                # Not a report (e.g. our test probe) -- mark seen so we skip it next time.
                if not args.dry_run and not args.all:
                    imap.store(mid, "+FLAGS", "\\Seen")
                continue
            parsed = parse_report(xml)
            if not parsed:
                # Left unseen on purpose: a later parser fix can pick it up.
                unparsed += 1
                continue
            header, records = parsed
            if args.dry_run:
                fails = sum(r["msg_count"] for r in records if not r["dmarc_pass"])
                print(f" {header['org_name']:<28} report={header['report_id'][:32]:<32} "
                      f"records={len(records)} fail_msgs={fails}")
                continue
            try:
                is_new = store(conn, header, records)
            except Exception as exc:  # one bad report must not abort the run
                failed += 1
                print(f"WARN: could not store report {header['org_name']}/"
                      f"{header['report_id']}: {exc}", file=sys.stderr)
                continue  # stays unseen, so the next run retries it
            if is_new:
                ingested += 1
            else:
                skipped += 1
            if not args.all:
                imap.store(mid, "+FLAGS", "\\Seen")

        if unparsed or failed:
            print(f"WARN: {unparsed} unparseable report(s), {failed} store failure(s); "
                  "left unseen for retry", file=sys.stderr)
        if args.dry_run:
            print(f"DRY-RUN: scanned {len(ids)}, non-report={no_xml}.")
        else:
            print(f"Ingested {ingested} new report(s); {skipped} already present; "
                  f"{no_xml} non-report message(s).")
            if args.alert:
                text, problems = summarize(conn, args.summary_days)
                print("\n" + text)
                if problems:
                    msg = ("⚠️ DMARC alignment alert\n\n" + text
                           + "\n\nIssues:\n" + "\n".join(f"- {p}" for p in problems))
                    send_telegram(msg)
        return 1 if failed else 0
    finally:
        if conn:
            conn.close()
        try:
            imap.logout()
        except (imaplib.IMAP4.error, OSError):
            pass  # connection already gone; nothing left to clean up


if __name__ == "__main__":
    raise SystemExit(main())