From 08f651dc1e7eafa75e870c75d744ec5630318ad2 Mon Sep 17 00:00:00 2001 From: justin Date: Fri, 19 Jun 2026 08:35:45 -0500 Subject: [PATCH] feat(deliverability): mail reputation monitor (SNDS-equivalent from postfix logs) Adds scripts/mail_reputation_monitor.py + migration 101 (mail_reputation_daily). Sender reputation is judged by the RECEIVING operator (Microsoft/Google/Yahoo/ Proofpoint), and the provider portals (SNDS/Postmaster/CFL) need a login and lag 24-48h. Our postfix logs already carry the ground truth in real time: every send records the receiving host + SMTP response, and the response classifies WHY: 250 -> accepted 451 4.7.500 -> throttled (Microsoft rate-limiting a cold IP) 550 5.7.x -> reject_reputation (spam/reputation) 550 5.1.1/5.4.1-> reject_recipient (dead mailbox / access denied = list hygiene) 550 ...SPAM -> reject_content (SpamAssassin) The parser classifies each egress delivery (out0x/hcout/relay) by (sending_ip, receiver, outcome, reason_code) and upserts ONE daily aggregate row per bucket (idempotent ON CONFLICT), so a nightly cron over the rotated log gives a queryable trend without re-parse double-counting. --alert prints a per-operator summary and Telegram-alerts on regressions (>=10% reputation rejects, or Microsoft >=70% throttled). Reads stdin ("-") so the host-owned /var/log/mail.log can be piped into the DB-connected workers container. Motivation: 2026-06-19 audit found ~80% of Microsoft sends were getting 451 4.7.500 throttles on the warming IPs -- this makes that trend visible as reputation recovers. --- api/migrations/101_mail_reputation_daily.sql | 39 ++ scripts/mail_reputation_monitor.py | 370 +++++++++++++++++++ 2 files changed, 409 insertions(+) create mode 100644 api/migrations/101_mail_reputation_daily.sql create mode 100644 scripts/mail_reputation_monitor.py diff --git a/api/migrations/101_mail_reputation_daily.sql b/api/migrations/101_mail_reputation_daily.sql new file mode 100644 index 0000000..e887141 --- /dev/null +++ b/api/migrations/101_mail_reputation_daily.sql @@ -0,0 +1,39 @@ +-- Daily mail-reputation snapshots parsed from the postfix logs. +-- +-- WHY: Sender reputation lives at the RECEIVING operator (Microsoft 365, Google, +-- Yahoo, ...), not at the recipient domain. The provider portals (SNDS, Postmaster, +-- Yahoo CFL) show this but each needs a login and lags 24-48h. Our own postfix logs +-- already contain the ground truth in real time: every send to a Microsoft tenant +-- returns 250 (accepted) / 451 4.7.500 (throttled, "server busy") / 5xx (rejected), +-- and the reject text tells us WHY (reputation 5.7.1, recipient unknown 5.1.1, etc.). +-- A 2026-06-19 audit found 80% of Microsoft sends were getting 451 4.7.500 throttles +-- on the warming IPs -- exactly the signal we need to watch ease as reputation builds. +-- +-- This table stores ONE row per (log_date, sending_ip, receiver_operator, outcome +-- class) so we can chart accepted% / deferred% / reject-reason mix per operator over +-- time without re-parsing logs (which rotate fast). Populated by +-- scripts/mail_reputation_monitor.py (idempotent upsert per day). + +CREATE TABLE IF NOT EXISTS mail_reputation_daily ( + id BIGSERIAL PRIMARY KEY, + log_date DATE NOT NULL, -- the calendar day of the log lines (server TZ) + sending_ip TEXT NOT NULL, -- our egress IP (207.174.124.94 / .107 / ...) + receiver TEXT NOT NULL, -- normalized receiving operator: microsoft|google|yahoo|proofpoint|other + outcome TEXT NOT NULL, -- accepted|throttled|reject_reputation|reject_recipient|reject_content|reject_other|deferred_other + reason_code TEXT, -- representative enhanced status / code (e.g. 4.7.500, 5.7.1, 5.1.1) + msg_count INTEGER NOT NULL DEFAULT 0, + sample_text TEXT, -- one representative raw response, for debugging + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (log_date, sending_ip, receiver, outcome, reason_code) +); + +CREATE INDEX IF NOT EXISTS idx_mail_rep_daily_date ON mail_reputation_daily (log_date); +CREATE INDEX IF NOT EXISTS idx_mail_rep_daily_receiver ON mail_reputation_daily (receiver, log_date); + +COMMENT ON TABLE mail_reputation_daily IS + 'Daily per-IP per-receiving-operator mail delivery outcome counts parsed from postfix logs (SNDS-equivalent reputation trend, no provider login needed). Populated by scripts/mail_reputation_monitor.py.'; +COMMENT ON COLUMN mail_reputation_daily.receiver IS + 'Normalized receiving operator: microsoft|google|yahoo|proofpoint|mimecast|other'; +COMMENT ON COLUMN mail_reputation_daily.outcome IS + 'accepted|throttled|reject_reputation|reject_recipient|reject_content|reject_other|deferred_other'; diff --git a/scripts/mail_reputation_monitor.py b/scripts/mail_reputation_monitor.py new file mode 100644 index 0000000..50c724f --- /dev/null +++ b/scripts/mail_reputation_monitor.py @@ -0,0 +1,370 @@ +#!/usr/bin/env python3 +"""Mail reputation monitor -- SNDS-equivalent trend from our own postfix logs. + +WHY +--- +Sender reputation is judged by the RECEIVING operator (Microsoft 365, Google, +Yahoo, Proofpoint, ...), not by recipient domain. The provider portals (Microsoft +SNDS, Google Postmaster, Yahoo CFL) show this, but each needs a login and lags +24-48h. Our postfix logs already contain the ground truth in real time: every +delivery attempt records the receiving host + the SMTP response, and the response +text tells us WHY a message was throttled/rejected: + + * 250 ... -> accepted + * 451 4.7.500 Server busy -> throttled (Microsoft rate-limiting a cold IP) + * 550 5.7.1 / 5.7.x reputation -> rejected for sender reputation/spam + * 550 5.4.1 / 5.1.1 -> recipient unknown / access denied (list hygiene) + * 550 ... classified as SPAM -> content rejection (SpamAssassin etc.) + +A 2026-06-19 audit found ~80% of Microsoft sends were getting 451 4.7.500 throttles +on the warming IPs. This script turns that into a daily, queryable trend so we can +SEE reputation recover (throttle% falling, accept% rising) without any provider login. + +WHAT IT DOES +------------ +Parses one or more postfix logs, keeps only OUR bulk/campaign egress (the +out0x/hcout/relay smtp clients), classifies every status=sent/deferred/bounced +line by (sending_ip, receiving operator, outcome class, reason code), and upserts +daily aggregate rows into `mail_reputation_daily`. Idempotent: re-running for the +same day overwrites that day's counts (ON CONFLICT upsert), so a cron can run it +nightly against the rotated log without double-counting. + +Optionally sends a Telegram summary / alert (see --alert). + +USAGE +----- + # parse the current + rotated logs, upsert daily snapshots + sudo python3 -m scripts.mail_reputation_monitor /var/log/mail.log /var/log/mail.log.1 + + # also print a human summary of the most recent day and alert on regressions + sudo python3 -m scripts.mail_reputation_monitor --alert /var/log/mail.log + + # dry-run (parse + print, no DB writes) + sudo python3 -m scripts.mail_reputation_monitor --dry-run /var/log/mail.log + +Connection: DATABASE_URL (the app DB). Telegram: TELEGRAM_BOT_TOKEN/TELEGRAM_CHAT_ID. +""" +from __future__ import annotations + +import argparse +import gzip +import json +import os +import re +import sys +import urllib.request +from collections import defaultdict +from datetime import datetime + +import psycopg2 + +# ── Which postfix smtp clients are OUR outbound campaign/transactional egress ── +# Postfix logs these as postfix//smtp[pid] where is the +# master.cf service name: out05-09 = trucking/main pool; hcout0-9 = healthcare; +# rehab0x = (disabled) rehab pool; plain "smtp" = transactional via co.carrierone. +EGRESS_CLIENT_RE = re.compile(r"postfix/(out0[0-9]/smtp|hcout[0-9]/smtp|rehab0[0-9]/smtp|smtp)\[") + +# Pull the structured fields out of a postfix smtp delivery line. +DATE_RE = re.compile(r"^([A-Z][a-z]{2})\s+(\d{1,2})\s") +STATUS_RE = re.compile(r"status=(sent|deferred|bounced)") +RELAY_RE = re.compile(r"relay=([^,\s]+)") +TO_RE = re.compile(r"to=<([^>]*)>") +DSN_RE = re.compile(r"dsn=(\d\.\d+\.\d+)") +# Microsoft puts "from [207.174.124.94]" in throttle messages; otherwise we tag +# the IP from the queue-id<->IP mapping is unavailable, so we read it from the +# "from [ip]" hint when present, else leave as the stream's default below. +FROM_IP_RE = re.compile(r"from \[(\d{1,3}(?:\.\d{1,3}){3})\]") +# Enhanced status code inside the remote reply, e.g. "550 5.7.1" or "451 4.7.500". +REPLY_CODE_RE = re.compile(r"\b([45]\d\d)[ -](\d\.\d+\.\d+)\b") + +MONTHS = {m: i for i, m in enumerate( + ["Jan", "Feb", "Mar", "Apr", "May", "Jun", + "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"], start=1)} + +# Map a sending postfix client to its default egress IP (when the line itself +# doesn't echo "from [ip]"). out05-09 -> .94 main pool; hcout* -> .107 HC stream. +def default_ip_for_client(line: str) -> str: + if re.search(r"postfix/out0[0-9]/smtp\[", line): + return "207.174.124.94" + if re.search(r"postfix/hcout[0-9]/smtp\[", line): + return "207.174.124.107" + if re.search(r"postfix/rehab0[0-9]/smtp\[", line): + return "207.174.124.9x" # rehab pool (disabled), kept for historical logs + return "207.174.124.71" # transactional default egress + + +def classify_receiver(relay: str, to_addr: str) -> str: + """Normalize the receiving operator from the relay hostname / recipient domain.""" + r = (relay or "").lower() + if "protection.outlook.com" in r or "outlook.com" in r or "hotmail" in r: + return "microsoft" + if "google.com" in r or "aspmx.l.google" in r or "googlemail" in r or "psmtp" in r: + return "google" + if "yahoodns.net" in r or "yahoo.com" in r or "aol.com" in r: + return "yahoo" + if "pphosted.com" in r or "proofpoint" in r: + return "proofpoint" + if "mimecast" in r: + return "mimecast" + # Fall back to the recipient domain for the consumer mailboxes. + d = to_addr.rsplit("@", 1)[-1].lower() if "@" in to_addr else "" + if d in ("outlook.com", "hotmail.com", "live.com", "msn.com"): + return "microsoft" + if d in ("gmail.com", "googlemail.com"): + return "google" + if d in ("yahoo.com", "aol.com", "ymail.com"): + return "yahoo" + return "other" + + +def classify_outcome(status: str, dsn: str, text: str) -> str: + """Bucket a delivery into an outcome class from status + dsn + reply text.""" + t = (text or "").lower() + if status == "sent": + return "accepted" + if status == "deferred": + # 4.7.500 server busy / throttling is the dominant Microsoft cold-IP signal. + if dsn == "4.7.500" or "server busy" in t or "4.7.500" in t: + return "throttled" + if dsn.startswith("4.7") or "rate" in t or "too many" in t or "throttl" in t: + return "throttled" + return "deferred_other" + # bounced (5xx) + if dsn in ("5.7.1",) or "5.7.1" in t or "reputation" in t or "spam" in t and "classified" not in t: + return "reject_reputation" + if "classified as spam" in t or "content" in t: + return "reject_content" + if dsn in ("5.1.1", "5.4.1", "5.1.0") or "user unknown" in t or "recipient address rejected" in t \ + or "does not exist" in t or "no mailbox" in t or "access denied" in t: + return "reject_recipient" + if dsn.startswith("5.7"): + return "reject_reputation" + return "reject_other" + + +def iter_lines(paths: list[str]): + """Yield lines from each path, transparently decompressing .gz. + + A path of "-" reads from stdin (so a cron can `sudo cat /var/log/mail.log | + docker compose exec -T workers python3 -m scripts.mail_reputation_monitor -`, + which is how we feed the host-owned log into the DB-connected container). + """ + for p in paths: + try: + if p == "-": + yield from sys.stdin + elif p.endswith(".gz"): + with gzip.open(p, "rt", errors="replace") as fh: + yield from fh + else: + with open(p, "rt", errors="replace") as fh: + yield from fh + except FileNotFoundError: + continue + except PermissionError: + print(f"WARN: cannot read {p} (need sudo?)", file=sys.stderr) + continue + + +def parse(paths: list[str], year: int) -> dict: + """Return {(date, ip, receiver, outcome, code): [count, sample_text]}.""" + agg: dict = defaultdict(lambda: [0, None]) + for line in iter_lines(paths): + if not EGRESS_CLIENT_RE.search(line): + continue + sm = STATUS_RE.search(line) + if not sm: + continue + status = sm.group(1) + dm = DATE_RE.match(line) + if not dm: + continue + mon, day = MONTHS.get(dm.group(1)), int(dm.group(2)) + if not mon: + continue + log_date = f"{year:04d}-{mon:02d}-{day:02d}" + + m = RELAY_RE.search(line) + relay = m.group(1) if m else "" + # Skip internal relays (localhost content filters, our own relay handoff). + if relay.startswith("127.0.0.1") or relay.startswith("co.carrierone.com"): + continue + to_m = TO_RE.search(line) + to_addr = to_m.group(1) if to_m else "" + # Skip internal/role recipients (system bounces, our own monitoring). + rd = to_addr.rsplit("@", 1)[-1].lower() if "@" in to_addr else "" + if rd in ("performancewest.net", "send.performancewest.net", "carrierone.com", + "perfwest.performancewest.net", "example.com", "example.org"): + continue + + dm2 = DSN_RE.search(line) + dsn = dm2.group(1) if dm2 else "" + ip_m = FROM_IP_RE.search(line) + ip = ip_m.group(1) if ip_m else default_ip_for_client(line) + + receiver = classify_receiver(relay, to_addr) + # Representative reply text = everything after "said:" or the status paren. + text = line.split("said:", 1)[-1].strip() if "said:" in line else line + outcome = classify_outcome(status, dsn, text) + + # Reason code: prefer dsn; else the code embedded in the reply text. + code = dsn + if not code: + cm = REPLY_CODE_RE.search(text) + code = cm.group(2) if cm else "" + + key = (log_date, ip, receiver, outcome, code) + agg[key][0] += 1 + if agg[key][1] is None: + agg[key][1] = text[:300].strip() + return agg + + +def upsert(conn, agg: dict) -> int: + n = 0 + with conn.cursor() as cur: + for (log_date, ip, receiver, outcome, code), (count, sample) in agg.items(): + cur.execute( + """ + INSERT INTO mail_reputation_daily + (log_date, sending_ip, receiver, outcome, reason_code, msg_count, sample_text) + VALUES (%s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (log_date, sending_ip, receiver, outcome, reason_code) + DO UPDATE SET msg_count = EXCLUDED.msg_count, + sample_text = EXCLUDED.sample_text, + updated_at = now() + """, + (log_date, ip, receiver, outcome, code or "", count, sample), + ) + n += 1 + conn.commit() + return n + + +def summarize(conn, log_date: str | None = None) -> tuple[str, list[str]]: + """Build a human summary of a day's reputation. Returns (text, problems).""" + with conn.cursor() as cur: + if log_date is None: + cur.execute("SELECT max(log_date) FROM mail_reputation_daily") + row = cur.fetchone() + log_date = str(row[0]) if row and row[0] else None + if not log_date: + return "No mail_reputation_daily data yet.", [] + cur.execute( + """ + SELECT receiver, + sum(msg_count) FILTER (WHERE outcome='accepted') AS accepted, + sum(msg_count) FILTER (WHERE outcome='throttled') AS throttled, + sum(msg_count) FILTER (WHERE outcome='reject_reputation') AS rej_rep, + sum(msg_count) FILTER (WHERE outcome='reject_recipient') AS rej_rcpt, + sum(msg_count) FILTER (WHERE outcome='reject_content') AS rej_content, + sum(msg_count) AS total + FROM mail_reputation_daily + WHERE log_date = %s + GROUP BY receiver + ORDER BY total DESC + """, + (log_date,), + ) + rows = cur.fetchall() + + lines = [f"Mail reputation {log_date} (by receiving operator):"] + problems: list[str] = [] + for receiver, acc, thr, rrep, rrcpt, rcont, total in rows: + acc, thr, rrep, rrcpt, rcont, total = ( + int(acc or 0), int(thr or 0), int(rrep or 0), + int(rrcpt or 0), int(rcont or 0), int(total or 0)) + if total == 0: + continue + acc_pct = round(100 * acc / total) + thr_pct = round(100 * thr / total) + rrep_pct = round(100 * rrep / total) + lines.append( + f" {receiver:<10} total={total:<6} accepted={acc_pct}% " + f"throttled={thr_pct}% reputation-reject={rrep_pct}% " + f"(rcpt={rrcpt} content={rcont})" + ) + # Reputation alerts (only on meaningful volume). + if total >= 100: + if rrep_pct >= 10: + problems.append(f"{receiver}: {rrep_pct}% reputation rejects ({rrep}/{total})") + if receiver == "microsoft" and thr_pct >= 70: + problems.append(f"microsoft: {thr_pct}% throttled (451 4.7.500) -- still cold") + return "\n".join(lines), problems + + +def send_telegram(text: str) -> bool: + bot = os.getenv("TELEGRAM_BOT_TOKEN", "") + chat = os.getenv("TELEGRAM_CHAT_ID", "") + if not bot or not chat: + return False + try: + data = json.dumps({"chat_id": chat, "text": text}).encode() + req = urllib.request.Request( + f"https://api.telegram.org/bot{bot}/sendMessage", + data=data, headers={"Content-Type": "application/json"}) + urllib.request.urlopen(req, timeout=10) + return True + except Exception as exc: # never fail the cron on a notify error + print(f"WARN: telegram notify failed: {exc}", file=sys.stderr) + return False + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("logs", nargs="*", + default=["/var/log/mail.log"], + help="postfix log paths (.gz ok). Default: /var/log/mail.log") + ap.add_argument("--dry-run", action="store_true", help="parse + print, no DB writes") + ap.add_argument("--alert", action="store_true", + help="print summary + send Telegram (summary always; alert if problems)") + ap.add_argument("--year", type=int, default=datetime.now().year, + help="year to stamp on syslog dates (default: current year)") + args = ap.parse_args() + + logs = args.logs or ["/var/log/mail.log"] + agg = parse(logs, args.year) + if not agg: + print("No egress delivery lines found in the given logs.") + return 0 + + if args.dry_run: + # Print the parsed aggregate without touching the DB. + by_day: dict = defaultdict(lambda: defaultdict(lambda: defaultdict(int))) + for (d, ip, recv, outcome, code), (cnt, _s) in agg.items(): + by_day[d][recv][outcome] += cnt + for d in sorted(by_day): + print(f"\n{d}:") + for recv in sorted(by_day[d]): + outs = by_day[d][recv] + tot = sum(outs.values()) + print(f" {recv:<10} total={tot:<6} " + + " ".join(f"{k}={v}" for k, v in sorted(outs.items()))) + print(f"\nDRY-RUN: parsed {len(agg)} aggregate buckets, no DB writes.") + return 0 + + url = os.getenv("DATABASE_URL", "") + if not url: + print("ERROR: DATABASE_URL not set", file=sys.stderr) + return 2 + conn = psycopg2.connect(url) + try: + n = upsert(conn, agg) + print(f"Upserted {n} mail_reputation_daily rows from {len(logs)} log file(s).") + if args.alert: + text, problems = summarize(conn) + print("\n" + text) + if problems: + msg = ("⚠️ Mail reputation alert\n\n" + text + + "\n\nIssues:\n" + "\n".join(f"- {p}" for p in problems)) + send_telegram(msg) + else: + # Silent-on-healthy by default; uncomment to get a daily green ping. + pass + finally: + conn.close() + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())