feat(deliverability): mail reputation monitor (SNDS-equivalent from postfix logs)

Adds scripts/mail_reputation_monitor.py + migration 101 (mail_reputation_daily).
Sender reputation is judged by the RECEIVING operator (Microsoft/Google/Yahoo/
Proofpoint), and the provider portals (SNDS/Postmaster/CFL) need a login and lag
24-48h. Our postfix logs already carry the ground truth in real time: every send
records the receiving host + SMTP response, and the response classifies WHY:
  250            -> accepted
  451 4.7.500    -> throttled (Microsoft rate-limiting a cold IP)
  550 5.7.x      -> reject_reputation (spam/reputation)
  550 5.1.1/5.4.1-> reject_recipient (dead mailbox / access denied = list hygiene)
  550 ...SPAM    -> reject_content (SpamAssassin)

The parser classifies each egress delivery (out0x/hcout/relay) by (sending_ip,
receiver, outcome, reason_code) and upserts ONE daily aggregate row per bucket
(idempotent ON CONFLICT), so a nightly cron over the rotated log gives a queryable
trend without re-parse double-counting. --alert prints a per-operator summary and
Telegram-alerts on regressions (>=10% reputation rejects, or Microsoft >=70%
throttled). Reads stdin ("-") so the host-owned /var/log/mail.log can be piped
into the DB-connected workers container.

Motivation: 2026-06-19 audit found ~80% of Microsoft sends were getting 451 4.7.500
throttles on the warming IPs -- this makes that trend visible as reputation recovers.
This commit is contained in:
justin 2026-06-19 08:35:45 -05:00
parent bd7ba23841
commit 08f651dc1e
2 changed files with 409 additions and 0 deletions

View file

@ -0,0 +1,39 @@
-- Daily mail-reputation snapshots parsed from the postfix logs.
--
-- WHY: Sender reputation lives at the RECEIVING operator (Microsoft 365, Google,
-- Yahoo, ...), not at the recipient domain. The provider portals (SNDS, Postmaster,
-- Yahoo CFL) show this but each needs a login and lags 24-48h. Our own postfix logs
-- already contain the ground truth in real time: every send to a Microsoft tenant
-- returns 250 (accepted) / 451 4.7.500 (throttled, "server busy") / 5xx (rejected),
-- and the reject text tells us WHY (reputation 5.7.1, recipient unknown 5.1.1, etc.).
-- A 2026-06-19 audit found 80% of Microsoft sends were getting 451 4.7.500 throttles
-- on the warming IPs -- exactly the signal we need to watch ease as reputation builds.
--
-- This table stores ONE row per (log_date, sending_ip, receiver_operator, outcome
-- class) so we can chart accepted% / deferred% / reject-reason mix per operator over
-- time without re-parsing logs (which rotate fast). Populated by
-- scripts/mail_reputation_monitor.py (idempotent upsert per day).
CREATE TABLE IF NOT EXISTS mail_reputation_daily (
id BIGSERIAL PRIMARY KEY,
log_date DATE NOT NULL, -- the calendar day of the log lines (server TZ)
sending_ip TEXT NOT NULL, -- our egress IP (207.174.124.94 / .107 / ...)
receiver TEXT NOT NULL, -- normalized receiving operator: microsoft|google|yahoo|proofpoint|other
outcome TEXT NOT NULL, -- accepted|throttled|reject_reputation|reject_recipient|reject_content|reject_other|deferred_other
reason_code TEXT, -- representative enhanced status / code (e.g. 4.7.500, 5.7.1, 5.1.1)
msg_count INTEGER NOT NULL DEFAULT 0,
sample_text TEXT, -- one representative raw response, for debugging
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (log_date, sending_ip, receiver, outcome, reason_code)
);
CREATE INDEX IF NOT EXISTS idx_mail_rep_daily_date ON mail_reputation_daily (log_date);
CREATE INDEX IF NOT EXISTS idx_mail_rep_daily_receiver ON mail_reputation_daily (receiver, log_date);
COMMENT ON TABLE mail_reputation_daily IS
'Daily per-IP per-receiving-operator mail delivery outcome counts parsed from postfix logs (SNDS-equivalent reputation trend, no provider login needed). Populated by scripts/mail_reputation_monitor.py.';
COMMENT ON COLUMN mail_reputation_daily.receiver IS
'Normalized receiving operator: microsoft|google|yahoo|proofpoint|mimecast|other';
COMMENT ON COLUMN mail_reputation_daily.outcome IS
'accepted|throttled|reject_reputation|reject_recipient|reject_content|reject_other|deferred_other';

View file

@ -0,0 +1,370 @@
#!/usr/bin/env python3
"""Mail reputation monitor -- SNDS-equivalent trend from our own postfix logs.
WHY
---
Sender reputation is judged by the RECEIVING operator (Microsoft 365, Google,
Yahoo, Proofpoint, ...), not by recipient domain. The provider portals (Microsoft
SNDS, Google Postmaster, Yahoo CFL) show this, but each needs a login and lags
24-48h. Our postfix logs already contain the ground truth in real time: every
delivery attempt records the receiving host + the SMTP response, and the response
text tells us WHY a message was throttled/rejected:
* 250 ... -> accepted
* 451 4.7.500 Server busy -> throttled (Microsoft rate-limiting a cold IP)
* 550 5.7.1 / 5.7.x reputation -> rejected for sender reputation/spam
* 550 5.4.1 / 5.1.1 -> recipient unknown / access denied (list hygiene)
* 550 ... classified as SPAM -> content rejection (SpamAssassin etc.)
A 2026-06-19 audit found ~80% of Microsoft sends were getting 451 4.7.500 throttles
on the warming IPs. This script turns that into a daily, queryable trend so we can
SEE reputation recover (throttle% falling, accept% rising) without any provider login.
WHAT IT DOES
------------
Parses one or more postfix logs, keeps only OUR bulk/campaign egress (the
out0x/hcout/relay smtp clients), classifies every status=sent/deferred/bounced
line by (sending_ip, receiving operator, outcome class, reason code), and upserts
daily aggregate rows into `mail_reputation_daily`. Idempotent: re-running for the
same day overwrites that day's counts (ON CONFLICT upsert), so a cron can run it
nightly against the rotated log without double-counting.
Optionally sends a Telegram summary / alert (see --alert).
USAGE
-----
# parse the current + rotated logs, upsert daily snapshots
sudo python3 -m scripts.mail_reputation_monitor /var/log/mail.log /var/log/mail.log.1
# also print a human summary of the most recent day and alert on regressions
sudo python3 -m scripts.mail_reputation_monitor --alert /var/log/mail.log
# dry-run (parse + print, no DB writes)
sudo python3 -m scripts.mail_reputation_monitor --dry-run /var/log/mail.log
Connection: DATABASE_URL (the app DB). Telegram: TELEGRAM_BOT_TOKEN/TELEGRAM_CHAT_ID.
"""
from __future__ import annotations
import argparse
import gzip
import json
import os
import re
import sys
import urllib.request
from collections import defaultdict
from datetime import datetime
import psycopg2
# ── Which postfix smtp clients are OUR outbound campaign/transactional egress ──
# Postfix logs these as postfix/<transport>/smtp[pid] where <transport> is the
# master.cf service name: out05-09 = trucking/main pool; hcout0-9 = healthcare;
# rehab0x = (disabled) rehab pool; plain "smtp" = transactional via co.carrierone.
EGRESS_CLIENT_RE = re.compile(r"postfix/(out0[0-9]/smtp|hcout[0-9]/smtp|rehab0[0-9]/smtp|smtp)\[")
# Pull the structured fields out of a postfix smtp delivery line.
DATE_RE = re.compile(r"^([A-Z][a-z]{2})\s+(\d{1,2})\s")
STATUS_RE = re.compile(r"status=(sent|deferred|bounced)")
RELAY_RE = re.compile(r"relay=([^,\s]+)")
TO_RE = re.compile(r"to=<([^>]*)>")
DSN_RE = re.compile(r"dsn=(\d\.\d+\.\d+)")
# Microsoft puts "from [207.174.124.94]" in throttle messages; otherwise we tag
# the IP from the queue-id<->IP mapping is unavailable, so we read it from the
# "from [ip]" hint when present, else leave as the stream's default below.
FROM_IP_RE = re.compile(r"from \[(\d{1,3}(?:\.\d{1,3}){3})\]")
# Enhanced status code inside the remote reply, e.g. "550 5.7.1" or "451 4.7.500".
REPLY_CODE_RE = re.compile(r"\b([45]\d\d)[ -](\d\.\d+\.\d+)\b")
MONTHS = {m: i for i, m in enumerate(
["Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec"], start=1)}
# Map a sending postfix client to its default egress IP (when the line itself
# doesn't echo "from [ip]"). out05-09 -> .94 main pool; hcout* -> .107 HC stream.
def default_ip_for_client(line: str) -> str:
if re.search(r"postfix/out0[0-9]/smtp\[", line):
return "207.174.124.94"
if re.search(r"postfix/hcout[0-9]/smtp\[", line):
return "207.174.124.107"
if re.search(r"postfix/rehab0[0-9]/smtp\[", line):
return "207.174.124.9x" # rehab pool (disabled), kept for historical logs
return "207.174.124.71" # transactional default egress
def classify_receiver(relay: str, to_addr: str) -> str:
"""Normalize the receiving operator from the relay hostname / recipient domain."""
r = (relay or "").lower()
if "protection.outlook.com" in r or "outlook.com" in r or "hotmail" in r:
return "microsoft"
if "google.com" in r or "aspmx.l.google" in r or "googlemail" in r or "psmtp" in r:
return "google"
if "yahoodns.net" in r or "yahoo.com" in r or "aol.com" in r:
return "yahoo"
if "pphosted.com" in r or "proofpoint" in r:
return "proofpoint"
if "mimecast" in r:
return "mimecast"
# Fall back to the recipient domain for the consumer mailboxes.
d = to_addr.rsplit("@", 1)[-1].lower() if "@" in to_addr else ""
if d in ("outlook.com", "hotmail.com", "live.com", "msn.com"):
return "microsoft"
if d in ("gmail.com", "googlemail.com"):
return "google"
if d in ("yahoo.com", "aol.com", "ymail.com"):
return "yahoo"
return "other"
def classify_outcome(status: str, dsn: str, text: str) -> str:
"""Bucket a delivery into an outcome class from status + dsn + reply text."""
t = (text or "").lower()
if status == "sent":
return "accepted"
if status == "deferred":
# 4.7.500 server busy / throttling is the dominant Microsoft cold-IP signal.
if dsn == "4.7.500" or "server busy" in t or "4.7.500" in t:
return "throttled"
if dsn.startswith("4.7") or "rate" in t or "too many" in t or "throttl" in t:
return "throttled"
return "deferred_other"
# bounced (5xx)
if dsn in ("5.7.1",) or "5.7.1" in t or "reputation" in t or "spam" in t and "classified" not in t:
return "reject_reputation"
if "classified as spam" in t or "content" in t:
return "reject_content"
if dsn in ("5.1.1", "5.4.1", "5.1.0") or "user unknown" in t or "recipient address rejected" in t \
or "does not exist" in t or "no mailbox" in t or "access denied" in t:
return "reject_recipient"
if dsn.startswith("5.7"):
return "reject_reputation"
return "reject_other"
def iter_lines(paths: list[str]):
"""Yield lines from each path, transparently decompressing .gz.
A path of "-" reads from stdin (so a cron can `sudo cat /var/log/mail.log |
docker compose exec -T workers python3 -m scripts.mail_reputation_monitor -`,
which is how we feed the host-owned log into the DB-connected container).
"""
for p in paths:
try:
if p == "-":
yield from sys.stdin
elif p.endswith(".gz"):
with gzip.open(p, "rt", errors="replace") as fh:
yield from fh
else:
with open(p, "rt", errors="replace") as fh:
yield from fh
except FileNotFoundError:
continue
except PermissionError:
print(f"WARN: cannot read {p} (need sudo?)", file=sys.stderr)
continue
def parse(paths: list[str], year: int) -> dict:
"""Return {(date, ip, receiver, outcome, code): [count, sample_text]}."""
agg: dict = defaultdict(lambda: [0, None])
for line in iter_lines(paths):
if not EGRESS_CLIENT_RE.search(line):
continue
sm = STATUS_RE.search(line)
if not sm:
continue
status = sm.group(1)
dm = DATE_RE.match(line)
if not dm:
continue
mon, day = MONTHS.get(dm.group(1)), int(dm.group(2))
if not mon:
continue
log_date = f"{year:04d}-{mon:02d}-{day:02d}"
m = RELAY_RE.search(line)
relay = m.group(1) if m else ""
# Skip internal relays (localhost content filters, our own relay handoff).
if relay.startswith("127.0.0.1") or relay.startswith("co.carrierone.com"):
continue
to_m = TO_RE.search(line)
to_addr = to_m.group(1) if to_m else ""
# Skip internal/role recipients (system bounces, our own monitoring).
rd = to_addr.rsplit("@", 1)[-1].lower() if "@" in to_addr else ""
if rd in ("performancewest.net", "send.performancewest.net", "carrierone.com",
"perfwest.performancewest.net", "example.com", "example.org"):
continue
dm2 = DSN_RE.search(line)
dsn = dm2.group(1) if dm2 else ""
ip_m = FROM_IP_RE.search(line)
ip = ip_m.group(1) if ip_m else default_ip_for_client(line)
receiver = classify_receiver(relay, to_addr)
# Representative reply text = everything after "said:" or the status paren.
text = line.split("said:", 1)[-1].strip() if "said:" in line else line
outcome = classify_outcome(status, dsn, text)
# Reason code: prefer dsn; else the code embedded in the reply text.
code = dsn
if not code:
cm = REPLY_CODE_RE.search(text)
code = cm.group(2) if cm else ""
key = (log_date, ip, receiver, outcome, code)
agg[key][0] += 1
if agg[key][1] is None:
agg[key][1] = text[:300].strip()
return agg
def upsert(conn, agg: dict) -> int:
n = 0
with conn.cursor() as cur:
for (log_date, ip, receiver, outcome, code), (count, sample) in agg.items():
cur.execute(
"""
INSERT INTO mail_reputation_daily
(log_date, sending_ip, receiver, outcome, reason_code, msg_count, sample_text)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (log_date, sending_ip, receiver, outcome, reason_code)
DO UPDATE SET msg_count = EXCLUDED.msg_count,
sample_text = EXCLUDED.sample_text,
updated_at = now()
""",
(log_date, ip, receiver, outcome, code or "", count, sample),
)
n += 1
conn.commit()
return n
def summarize(conn, log_date: str | None = None) -> tuple[str, list[str]]:
"""Build a human summary of a day's reputation. Returns (text, problems)."""
with conn.cursor() as cur:
if log_date is None:
cur.execute("SELECT max(log_date) FROM mail_reputation_daily")
row = cur.fetchone()
log_date = str(row[0]) if row and row[0] else None
if not log_date:
return "No mail_reputation_daily data yet.", []
cur.execute(
"""
SELECT receiver,
sum(msg_count) FILTER (WHERE outcome='accepted') AS accepted,
sum(msg_count) FILTER (WHERE outcome='throttled') AS throttled,
sum(msg_count) FILTER (WHERE outcome='reject_reputation') AS rej_rep,
sum(msg_count) FILTER (WHERE outcome='reject_recipient') AS rej_rcpt,
sum(msg_count) FILTER (WHERE outcome='reject_content') AS rej_content,
sum(msg_count) AS total
FROM mail_reputation_daily
WHERE log_date = %s
GROUP BY receiver
ORDER BY total DESC
""",
(log_date,),
)
rows = cur.fetchall()
lines = [f"Mail reputation {log_date} (by receiving operator):"]
problems: list[str] = []
for receiver, acc, thr, rrep, rrcpt, rcont, total in rows:
acc, thr, rrep, rrcpt, rcont, total = (
int(acc or 0), int(thr or 0), int(rrep or 0),
int(rrcpt or 0), int(rcont or 0), int(total or 0))
if total == 0:
continue
acc_pct = round(100 * acc / total)
thr_pct = round(100 * thr / total)
rrep_pct = round(100 * rrep / total)
lines.append(
f" {receiver:<10} total={total:<6} accepted={acc_pct}% "
f"throttled={thr_pct}% reputation-reject={rrep_pct}% "
f"(rcpt={rrcpt} content={rcont})"
)
# Reputation alerts (only on meaningful volume).
if total >= 100:
if rrep_pct >= 10:
problems.append(f"{receiver}: {rrep_pct}% reputation rejects ({rrep}/{total})")
if receiver == "microsoft" and thr_pct >= 70:
problems.append(f"microsoft: {thr_pct}% throttled (451 4.7.500) -- still cold")
return "\n".join(lines), problems
def send_telegram(text: str) -> bool:
bot = os.getenv("TELEGRAM_BOT_TOKEN", "")
chat = os.getenv("TELEGRAM_CHAT_ID", "")
if not bot or not chat:
return False
try:
data = json.dumps({"chat_id": chat, "text": text}).encode()
req = urllib.request.Request(
f"https://api.telegram.org/bot{bot}/sendMessage",
data=data, headers={"Content-Type": "application/json"})
urllib.request.urlopen(req, timeout=10)
return True
except Exception as exc: # never fail the cron on a notify error
print(f"WARN: telegram notify failed: {exc}", file=sys.stderr)
return False
def main() -> int:
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("logs", nargs="*",
default=["/var/log/mail.log"],
help="postfix log paths (.gz ok). Default: /var/log/mail.log")
ap.add_argument("--dry-run", action="store_true", help="parse + print, no DB writes")
ap.add_argument("--alert", action="store_true",
help="print summary + send Telegram (summary always; alert if problems)")
ap.add_argument("--year", type=int, default=datetime.now().year,
help="year to stamp on syslog dates (default: current year)")
args = ap.parse_args()
logs = args.logs or ["/var/log/mail.log"]
agg = parse(logs, args.year)
if not agg:
print("No egress delivery lines found in the given logs.")
return 0
if args.dry_run:
# Print the parsed aggregate without touching the DB.
by_day: dict = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
for (d, ip, recv, outcome, code), (cnt, _s) in agg.items():
by_day[d][recv][outcome] += cnt
for d in sorted(by_day):
print(f"\n{d}:")
for recv in sorted(by_day[d]):
outs = by_day[d][recv]
tot = sum(outs.values())
print(f" {recv:<10} total={tot:<6} " +
" ".join(f"{k}={v}" for k, v in sorted(outs.items())))
print(f"\nDRY-RUN: parsed {len(agg)} aggregate buckets, no DB writes.")
return 0
url = os.getenv("DATABASE_URL", "")
if not url:
print("ERROR: DATABASE_URL not set", file=sys.stderr)
return 2
conn = psycopg2.connect(url)
try:
n = upsert(conn, agg)
print(f"Upserted {n} mail_reputation_daily rows from {len(logs)} log file(s).")
if args.alert:
text, problems = summarize(conn)
print("\n" + text)
if problems:
msg = ("⚠️ Mail reputation alert\n\n" + text +
"\n\nIssues:\n" + "\n".join(f"- {p}" for p in problems))
send_telegram(msg)
else:
# Silent-on-healthy by default; uncomment to get a daily green ping.
pass
finally:
conn.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())