#!/usr/bin/env python3
"""Mail reputation monitor -- SNDS-equivalent trend from our own postfix logs.

WHY
---
Sender reputation is judged by the RECEIVING operator (Microsoft 365, Google,
Yahoo, Proofpoint, ...), not by recipient domain. The provider portals
(Microsoft SNDS, Google Postmaster, Yahoo CFL) show this, but each needs a
login and lags 24-48h. Our postfix logs already contain the ground truth in
real time: every delivery attempt records the receiving host + the SMTP
response, and the response text tells us WHY a message was throttled/rejected:

  * 250 ...                        -> accepted
  * 451 4.7.500 Server busy        -> throttled (Microsoft rate-limiting a cold IP)
  * 550 5.7.1 / 5.7.x reputation   -> rejected for sender reputation/spam
  * 550 5.4.1 / 5.1.1              -> recipient unknown / access denied (list hygiene)
  * 550 ... classified as SPAM     -> content rejection (SpamAssassin etc.)

A 2026-06-19 audit found ~80% of Microsoft sends were getting 451 4.7.500
throttles on the warming IPs. This script turns that into a daily, queryable
trend so we can SEE reputation recover (throttle% falling, accept% rising)
without any provider login.

WHAT IT DOES
------------
Parses one or more postfix logs, keeps only OUR outbound egress (the out0x /
hcout / rehab0x pool smtp clients plus the plain transactional smtp client),
classifies every status=sent/deferred/bounced line by (sending_ip, receiving
operator, outcome class, reason code), and upserts daily aggregate rows into
`mail_reputation_daily`.

Idempotent: re-running for the same day overwrites that day's counts
(ON CONFLICT upsert), so a cron can run it nightly against the rotated log
without double-counting. Each bucket is overwritten, not added to, so pass
every log file that contains a given day in the same run -- a run over only
part of a day's logs replaces that day's counts with the partial figures.

With --alert, also prints a summary of the most recent day and sends a
Telegram alert when that summary finds problems.

USAGE
-----
  # parse the current + rotated logs, upsert daily snapshots
  sudo python3 -m scripts.mail_reputation_monitor /var/log/mail.log /var/log/mail.log.1

  # also print a human summary of the most recent day and alert on regressions
  sudo python3 -m scripts.mail_reputation_monitor --alert /var/log/mail.log

  # dry-run (parse + print, no DB writes)
  sudo python3 -m scripts.mail_reputation_monitor --dry-run /var/log/mail.log

Connection: DATABASE_URL (the app DB). Telegram: TELEGRAM_BOT_TOKEN/TELEGRAM_CHAT_ID.
"""
from __future__ import annotations

import argparse
import gzip
import json
import os
import re
import sys
import urllib.request
from collections import defaultdict
from datetime import datetime

import psycopg2

# ── Which postfix smtp clients are OUR outbound campaign/transactional egress ──
# Postfix logs these as postfix/<service>/smtp[pid], where <service> is the
# master.cf service name: out05-09 = trucking/main pool (the regex accepts
# out00-09); hcout0-9 = healthcare; rehab0x = (disabled) rehab pool; plain
# "smtp" = transactional via co.carrierone.
EGRESS_CLIENT_RE = re.compile(r"postfix/(out0[0-9]/smtp|hcout[0-9]/smtp|rehab0[0-9]/smtp|smtp)\[")

# Pull the structured fields out of a postfix smtp delivery line.
# DATE_RE matches the classic syslog "Mon DD" prefix, which carries no year.
DATE_RE = re.compile(r"^([A-Z][a-z]{2})\s+(\d{1,2})\s")
STATUS_RE = re.compile(r"status=(sent|deferred|bounced)")
RELAY_RE = re.compile(r"relay=([^,\s]+)")
TO_RE = re.compile(r"to=<([^>]*)>")
DSN_RE = re.compile(r"dsn=(\d\.\d+\.\d+)")
# Microsoft echoes the sending IP as "from [207.174.124.94]" in its throttle
# replies. A single log line does not give us a queue-id -> egress-IP mapping,
# so we read the IP from that "from [ip]" hint when present and otherwise fall
# back to the sending client's default egress IP (default_ip_for_client).
FROM_IP_RE = re.compile(r"from \[(\d{1,3}(?:\.\d{1,3}){3})\]")

# Enhanced status code inside the remote reply, e.g. "550 5.7.1" or "451 4.7.500".
REPLY_CODE_RE = re.compile(r"\b([45]\d\d)[ -](\d\.\d+\.\d+)\b") MONTHS = {m: i for i, m in enumerate( ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"], start=1)} # Map a sending postfix client to its default egress IP (when the line itself # doesn't echo "from [ip]"). out05-09 -> .94 main pool; hcout* -> .107 HC stream. def default_ip_for_client(line: str) -> str: if re.search(r"postfix/out0[0-9]/smtp\[", line): return "207.174.124.94" if re.search(r"postfix/hcout[0-9]/smtp\[", line): return "207.174.124.107" if re.search(r"postfix/rehab0[0-9]/smtp\[", line): return "207.174.124.9x" # rehab pool (disabled), kept for historical logs return "207.174.124.71" # transactional default egress def classify_receiver(relay: str, to_addr: str) -> str: """Normalize the receiving operator from the relay hostname / recipient domain.""" r = (relay or "").lower() if "protection.outlook.com" in r or "outlook.com" in r or "hotmail" in r: return "microsoft" if "google.com" in r or "aspmx.l.google" in r or "googlemail" in r or "psmtp" in r: return "google" if "yahoodns.net" in r or "yahoo.com" in r or "aol.com" in r: return "yahoo" if "pphosted.com" in r or "proofpoint" in r: return "proofpoint" if "mimecast" in r: return "mimecast" # Fall back to the recipient domain for the consumer mailboxes. d = to_addr.rsplit("@", 1)[-1].lower() if "@" in to_addr else "" if d in ("outlook.com", "hotmail.com", "live.com", "msn.com"): return "microsoft" if d in ("gmail.com", "googlemail.com"): return "google" if d in ("yahoo.com", "aol.com", "ymail.com"): return "yahoo" return "other" def classify_outcome(status: str, dsn: str, text: str) -> str: """Bucket a delivery into an outcome class from status + dsn + reply text.""" t = (text or "").lower() if status == "sent": return "accepted" if status == "deferred": # 4.7.500 server busy / throttling is the dominant Microsoft cold-IP signal. 
if dsn == "4.7.500" or "server busy" in t or "4.7.500" in t: return "throttled" if dsn.startswith("4.7") or "rate" in t or "too many" in t or "throttl" in t: return "throttled" return "deferred_other" # bounced (5xx) if dsn in ("5.7.1",) or "5.7.1" in t or "reputation" in t or "spam" in t and "classified" not in t: return "reject_reputation" if "classified as spam" in t or "content" in t: return "reject_content" if dsn in ("5.1.1", "5.4.1", "5.1.0") or "user unknown" in t or "recipient address rejected" in t \ or "does not exist" in t or "no mailbox" in t or "access denied" in t: return "reject_recipient" if dsn.startswith("5.7"): return "reject_reputation" return "reject_other" def iter_lines(paths: list[str]): """Yield lines from each path, transparently decompressing .gz. A path of "-" reads from stdin (so a cron can `sudo cat /var/log/mail.log | docker compose exec -T workers python3 -m scripts.mail_reputation_monitor -`, which is how we feed the host-owned log into the DB-connected container). """ for p in paths: try: if p == "-": yield from sys.stdin elif p.endswith(".gz"): with gzip.open(p, "rt", errors="replace") as fh: yield from fh else: with open(p, "rt", errors="replace") as fh: yield from fh except FileNotFoundError: continue except PermissionError: print(f"WARN: cannot read {p} (need sudo?)", file=sys.stderr) continue def parse(paths: list[str], year: int) -> dict: """Return {(date, ip, receiver, outcome, code): [count, sample_text]}.""" agg: dict = defaultdict(lambda: [0, None]) for line in iter_lines(paths): if not EGRESS_CLIENT_RE.search(line): continue sm = STATUS_RE.search(line) if not sm: continue status = sm.group(1) dm = DATE_RE.match(line) if not dm: continue mon, day = MONTHS.get(dm.group(1)), int(dm.group(2)) if not mon: continue log_date = f"{year:04d}-{mon:02d}-{day:02d}" m = RELAY_RE.search(line) relay = m.group(1) if m else "" # Skip internal relays (localhost content filters, our own relay handoff). 
if relay.startswith("127.0.0.1") or relay.startswith("co.carrierone.com"): continue to_m = TO_RE.search(line) to_addr = to_m.group(1) if to_m else "" # Skip internal/role recipients (system bounces, our own monitoring). rd = to_addr.rsplit("@", 1)[-1].lower() if "@" in to_addr else "" if rd in ("performancewest.net", "send.performancewest.net", "carrierone.com", "perfwest.performancewest.net", "example.com", "example.org"): continue dm2 = DSN_RE.search(line) dsn = dm2.group(1) if dm2 else "" ip_m = FROM_IP_RE.search(line) ip = ip_m.group(1) if ip_m else default_ip_for_client(line) receiver = classify_receiver(relay, to_addr) # Representative reply text = everything after "said:" or the status paren. text = line.split("said:", 1)[-1].strip() if "said:" in line else line outcome = classify_outcome(status, dsn, text) # Reason code: prefer dsn; else the code embedded in the reply text. code = dsn if not code: cm = REPLY_CODE_RE.search(text) code = cm.group(2) if cm else "" key = (log_date, ip, receiver, outcome, code) agg[key][0] += 1 if agg[key][1] is None: agg[key][1] = text[:300].strip() return agg def upsert(conn, agg: dict) -> int: n = 0 with conn.cursor() as cur: for (log_date, ip, receiver, outcome, code), (count, sample) in agg.items(): cur.execute( """ INSERT INTO mail_reputation_daily (log_date, sending_ip, receiver, outcome, reason_code, msg_count, sample_text) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (log_date, sending_ip, receiver, outcome, reason_code) DO UPDATE SET msg_count = EXCLUDED.msg_count, sample_text = EXCLUDED.sample_text, updated_at = now() """, (log_date, ip, receiver, outcome, code or "", count, sample), ) n += 1 conn.commit() return n def summarize(conn, log_date: str | None = None) -> tuple[str, list[str]]: """Build a human summary of a day's reputation. 
Returns (text, problems).""" with conn.cursor() as cur: if log_date is None: cur.execute("SELECT max(log_date) FROM mail_reputation_daily") row = cur.fetchone() log_date = str(row[0]) if row and row[0] else None if not log_date: return "No mail_reputation_daily data yet.", [] cur.execute( """ SELECT receiver, sum(msg_count) FILTER (WHERE outcome='accepted') AS accepted, sum(msg_count) FILTER (WHERE outcome='throttled') AS throttled, sum(msg_count) FILTER (WHERE outcome='reject_reputation') AS rej_rep, sum(msg_count) FILTER (WHERE outcome='reject_recipient') AS rej_rcpt, sum(msg_count) FILTER (WHERE outcome='reject_content') AS rej_content, sum(msg_count) AS total FROM mail_reputation_daily WHERE log_date = %s GROUP BY receiver ORDER BY total DESC """, (log_date,), ) rows = cur.fetchall() lines = [f"Mail reputation {log_date} (by receiving operator):"] problems: list[str] = [] for receiver, acc, thr, rrep, rrcpt, rcont, total in rows: acc, thr, rrep, rrcpt, rcont, total = ( int(acc or 0), int(thr or 0), int(rrep or 0), int(rrcpt or 0), int(rcont or 0), int(total or 0)) if total == 0: continue acc_pct = round(100 * acc / total) thr_pct = round(100 * thr / total) rrep_pct = round(100 * rrep / total) lines.append( f" {receiver:<10} total={total:<6} accepted={acc_pct}% " f"throttled={thr_pct}% reputation-reject={rrep_pct}% " f"(rcpt={rrcpt} content={rcont})" ) # Reputation alerts (only on meaningful volume). 
if total >= 100: if rrep_pct >= 10: problems.append(f"{receiver}: {rrep_pct}% reputation rejects ({rrep}/{total})") if receiver == "microsoft" and thr_pct >= 70: problems.append(f"microsoft: {thr_pct}% throttled (451 4.7.500) -- still cold") return "\n".join(lines), problems def send_telegram(text: str) -> bool: bot = os.getenv("TELEGRAM_BOT_TOKEN", "") chat = os.getenv("TELEGRAM_CHAT_ID", "") if not bot or not chat: return False try: data = json.dumps({"chat_id": chat, "text": text}).encode() req = urllib.request.Request( f"https://api.telegram.org/bot{bot}/sendMessage", data=data, headers={"Content-Type": "application/json"}) urllib.request.urlopen(req, timeout=10) return True except Exception as exc: # never fail the cron on a notify error print(f"WARN: telegram notify failed: {exc}", file=sys.stderr) return False def main() -> int: ap = argparse.ArgumentParser(description=__doc__) ap.add_argument("logs", nargs="*", default=["/var/log/mail.log"], help="postfix log paths (.gz ok). Default: /var/log/mail.log") ap.add_argument("--dry-run", action="store_true", help="parse + print, no DB writes") ap.add_argument("--alert", action="store_true", help="print summary + send Telegram (summary always; alert if problems)") ap.add_argument("--year", type=int, default=datetime.now().year, help="year to stamp on syslog dates (default: current year)") args = ap.parse_args() logs = args.logs or ["/var/log/mail.log"] agg = parse(logs, args.year) if not agg: print("No egress delivery lines found in the given logs.") return 0 if args.dry_run: # Print the parsed aggregate without touching the DB. 
by_day: dict = defaultdict(lambda: defaultdict(lambda: defaultdict(int))) for (d, ip, recv, outcome, code), (cnt, _s) in agg.items(): by_day[d][recv][outcome] += cnt for d in sorted(by_day): print(f"\n{d}:") for recv in sorted(by_day[d]): outs = by_day[d][recv] tot = sum(outs.values()) print(f" {recv:<10} total={tot:<6} " + " ".join(f"{k}={v}" for k, v in sorted(outs.items()))) print(f"\nDRY-RUN: parsed {len(agg)} aggregate buckets, no DB writes.") return 0 url = os.getenv("DATABASE_URL", "") if not url: print("ERROR: DATABASE_URL not set", file=sys.stderr) return 2 conn = psycopg2.connect(url) try: n = upsert(conn, agg) print(f"Upserted {n} mail_reputation_daily rows from {len(logs)} log file(s).") if args.alert: text, problems = summarize(conn) print("\n" + text) if problems: msg = ("⚠️ Mail reputation alert\n\n" + text + "\n\nIssues:\n" + "\n".join(f"- {p}" for p in problems)) send_telegram(msg) else: # Silent-on-healthy by default; uncomment to get a daily green ping. pass finally: conn.close() return 0 if __name__ == "__main__": raise SystemExit(main())