Adds scripts/mail_reputation_monitor.py + migration 101 (mail_reputation_daily).
Sender reputation is judged by the RECEIVING operator (Microsoft/Google/Yahoo/
Proofpoint), and the provider portals (SNDS/Postmaster/CFL) need a login and lag
24-48h. Our postfix logs already carry the ground truth in real time: every send
records the receiving host + SMTP response, and the response classifies WHY:
  250              -> accepted
  451 4.7.500      -> throttled (Microsoft rate-limiting a cold IP)
  550 5.7.x        -> reject_reputation (spam/reputation)
  550 5.1.1/5.4.1  -> reject_recipient (dead mailbox / access denied = list hygiene)
  550 ...SPAM      -> reject_content (SpamAssassin)
The parser classifies each egress delivery (the out0x/hcout0-9/rehab0x transports and
plain postfix/smtp) by (sending_ip, receiver, outcome, reason_code) and upserts ONE
daily aggregate row per bucket (idempotent ON CONFLICT), so a nightly cron over the
rotated log gives a queryable trend without re-parse double-counting. --alert prints a
per-operator summary and Telegram-alerts on regressions for operators with >=100
delivery attempts that day (>=10% reputation rejects, or Microsoft >=70% throttled).
Reads stdin ("-") so the host-owned /var/log/mail.log can be piped into the
DB-connected workers container.
Motivation: 2026-06-19 audit found ~80% of Microsoft sends were getting 451 4.7.500
throttles on the warming IPs -- this makes that trend visible as reputation recovers.
370 lines · 16 KiB · Python
#!/usr/bin/env python3
"""Mail reputation monitor -- SNDS-equivalent trend from our own postfix logs.

WHY
---
Sender reputation is judged by the RECEIVING operator (Microsoft 365, Google,
Yahoo, Proofpoint, ...), not by recipient domain. The provider portals (Microsoft
SNDS, Google Postmaster, Yahoo CFL) show this, but each needs a login and lags
24-48h. Our postfix logs already contain the ground truth in real time: every
delivery attempt records the receiving host + the SMTP response, and the response
text tells us WHY a message was throttled/rejected:

  * 250 ...                       -> accepted
  * 451 4.7.500 Server busy       -> throttled (Microsoft rate-limiting a cold IP)
  * 550 5.7.1 / 5.7.x reputation  -> rejected for sender reputation/spam
  * 550 5.4.1 / 5.1.1             -> recipient unknown / access denied (list hygiene)
  * 550 ... classified as SPAM    -> content rejection (SpamAssassin etc.)

A 2026-06-19 audit found ~80% of Microsoft sends were getting 451 4.7.500 throttles
on the warming IPs. This script turns that into a daily, queryable trend so we can
SEE reputation recover (throttle% falling, accept% rising) without any provider login.
|
|
|
|
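WHAT IT DOES
------------
Parses one or more postfix logs and keeps only OUR outbound egress (the
out0x/hcout0-9/rehab0x transport smtp clients plus plain postfix/smtp). It then
classifies every status=sent/deferred/bounced line by (sending_ip, receiving
operator, outcome class, reason code) and upserts daily aggregate rows into
`mail_reputation_daily`. Every status line is counted, so a message that is deferred
and retried contributes one count per attempt.

Idempotent: re-running for the same day overwrites that day's counts (ON CONFLICT
upsert), so a cron can run it nightly against the rotated log without
double-counting. Caveat: pass every log file that contains a given day in the same
run. A day split across mail.log.1 and mail.log but parsed from only one of them is
overwritten with that file's partial counts.

With --alert it also prints a summary of the most recent day and sends a Telegram
alert when that summary finds regressions.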
|
|
|
|
USAGE
-----
  # parse the current + rotated logs, upsert daily snapshots
  sudo python3 -m scripts.mail_reputation_monitor /var/log/mail.log /var/log/mail.log.1

  # also print a human summary of the most recent day and alert on regressions
  sudo python3 -m scripts.mail_reputation_monitor --alert /var/log/mail.log

  # dry-run (parse + print, no DB writes)
  sudo python3 -m scripts.mail_reputation_monitor --dry-run /var/log/mail.log

  # feed the host-owned log into the DB-connected workers container via stdin ("-")
  sudo cat /var/log/mail.log | docker compose exec -T workers python3 -m scripts.mail_reputation_monitor -

Connection: DATABASE_URL (the app DB). Telegram: TELEGRAM_BOT_TOKEN/TELEGRAM_CHAT_ID.
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import gzip
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import urllib.request
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
|
|
import psycopg2
|
|
|
|
# ── Which postfix smtp clients are OUR outbound campaign/transactional egress ──
# Postfix logs these as postfix/<transport>/smtp[pid], where <transport> is the
# master.cf service (syslog) name: out05-09 = trucking/main pool; hcout0-9 =
# healthcare; rehab0x = (disabled) rehab pool; plain "smtp" = transactional via
# co.carrierone. The trailing "\[" keeps postfix/smtpd (inbound) lines out.
EGRESS_CLIENT_RE = re.compile(
    r"postfix/(out0[0-9]/smtp|hcout[0-9]/smtp|rehab0[0-9]/smtp|smtp)\["
)

# Structured fields of a postfix smtp delivery line.
DATE_RE = re.compile(r"^([A-Z][a-z]{2})\s+(\d{1,2})\s")  # classic syslog "Jun 19 " prefix
STATUS_RE = re.compile(r"status=(sent|deferred|bounced)")
RELAY_RE = re.compile(r"relay=([^,\s]+)")  # e.g. "mx.host.example[1.2.3.4]:25"
TO_RE = re.compile(r"to=<([^>]*)>")
DSN_RE = re.compile(r"dsn=(\d\.\d+\.\d+)")

# A delivery line does not record which of our local IPs the message left from
# (no queue-id -> IP mapping is available here). Microsoft, however, echoes it in
# its throttle replies ("... try again later from [207.174.124.94]"), so read it
# from that hint when present; otherwise the caller falls back to the client's
# default egress IP.
FROM_IP_RE = re.compile(r"from \[(\d{1,3}(?:\.\d{1,3}){3})\]")

# Enhanced status code inside the remote reply, e.g. "550 5.7.1" or "451 4.7.500".
REPLY_CODE_RE = re.compile(r"\b([45]\d\d)[ -](\d\.\d+\.\d+)\b")

# Syslog month abbreviation -> month number (1-12). Spelled out rather than taken
# from calendar.month_abbr, which is locale-dependent.
MONTHS = {
    abbr: number
    for number, abbr in enumerate(
        "Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec".split(), start=1
    )
}
|
|
|
|
# Default egress IP per sending postfix client, used when the delivery line itself
# doesn't echo "from [ip]". Checked in order; the first matching client wins.
_CLIENT_DEFAULT_IPS = (
    (r"postfix/out0[0-9]/smtp\[", "207.174.124.94"),    # main pool
    (r"postfix/hcout[0-9]/smtp\[", "207.174.124.107"),  # healthcare stream
    # Rehab pool (disabled), kept for historical logs. NOTE(review): "9x" is a
    # placeholder, not a valid IPv4 literal -- if sending_ip is an inet column
    # (see migration 101) inserting it would fail; confirm.
    (r"postfix/rehab0[0-9]/smtp\[", "207.174.124.9x"),
)
# Anything else (plain postfix/smtp) is transactional mail.
_TRANSACTIONAL_DEFAULT_IP = "207.174.124.71"


def default_ip_for_client(line: str) -> str:
    """Return the default egress IP for the postfix client that logged *line*.

    Falls back to the transactional egress IP when no pool client matches.
    """
    for client_pattern, egress_ip in _CLIENT_DEFAULT_IPS:
        if re.search(client_pattern, line):
            return egress_ip
    return _TRANSACTIONAL_DEFAULT_IP
|
|
|
|
|
|
# Relay-hostname fingerprints per receiving operator, tried in this order.
_RELAY_FINGERPRINTS = (
    ("microsoft", ("protection.outlook.com", "outlook.com", "hotmail")),
    ("google", ("google.com", "aspmx.l.google", "googlemail", "psmtp")),
    ("yahoo", ("yahoodns.net", "yahoo.com", "aol.com")),
    ("proofpoint", ("pphosted.com", "proofpoint")),
    ("mimecast", ("mimecast",)),
)

# Consumer mailbox domains, consulted only when the relay host is unrecognised.
_CONSUMER_DOMAINS = {
    "outlook.com": "microsoft",
    "hotmail.com": "microsoft",
    "live.com": "microsoft",
    "msn.com": "microsoft",
    "gmail.com": "google",
    "googlemail.com": "google",
    "yahoo.com": "yahoo",
    "aol.com": "yahoo",
    "ymail.com": "yahoo",
}


def classify_receiver(relay: str, to_addr: str) -> str:
    """Normalize the receiving operator from the relay hostname / recipient domain.

    The relay host (case-insensitive substring match) takes precedence; otherwise
    the recipient's domain is looked up among well-known consumer mailboxes.
    Returns "other" when neither identifies an operator.
    """
    host = (relay or "").lower()
    for operator, needles in _RELAY_FINGERPRINTS:
        if any(needle in host for needle in needles):
            return operator

    domain = to_addr.rsplit("@", 1)[-1].lower() if "@" in to_addr else ""
    return _CONSUMER_DOMAINS.get(domain, "other")
|
|
|
|
|
|
def classify_outcome(status: str, dsn: str, text: str) -> str:
    """Bucket one delivery attempt into an outcome class.

    Args:
        status: postfix status word ("sent", "deferred", "bounced"). Anything other
            than "sent"/"deferred" is treated as a permanent (5xx) failure.
        dsn: enhanced status code from the ``dsn=`` field, e.g. "4.7.500"; may be
            "" or None when absent.
        text: remote reply / status text, matched case-insensitively; may be ""
            or None.

    Returns:
        One of "accepted", "throttled", "deferred_other", "reject_content",
        "reject_reputation", "reject_recipient", "reject_other".
    """
    t = (text or "").lower()
    dsn = dsn or ""

    if status == "sent":
        return "accepted"

    if status == "deferred":
        # 4.7.500 "server busy" is the dominant Microsoft cold-IP throttle signal;
        # other 4.7.x (policy) deferrals and rate/volume wording count as throttling
        # too. r"\brate" matches "rate", "rate limit", "rate-limited", "rates" but not
        # words that merely contain it ("generated", "separate"), which a plain
        # substring test misread as throttling.
        if (dsn == "4.7.500" or "server busy" in t or "4.7.500" in t
                or dsn.startswith("4.7") or re.search(r"\brate", t)
                or "too many" in t or "throttl" in t):
            return "throttled"
        return "deferred_other"

    # Permanent failures. Content verdicts go first: SpamAssassin-style rejects
    # usually carry 5.7.1 ("550 5.7.1 Message classified as SPAM"), and checking
    # reputation first made reject_content unreachable for them.
    if "classified as spam" in t or "content" in t:
        return "reject_content"
    # 5.7.x is the security/policy class (RFC 3463). It is checked before the
    # recipient keywords so e.g. Microsoft "550 5.7.606 Access denied, banned
    # sending IP" counts as a reputation block, not as list hygiene.
    if (dsn.startswith("5.7") or "5.7.1" in t or "reputation" in t
            or ("spam" in t and "classified" not in t)):
        return "reject_reputation"
    if (dsn in ("5.1.1", "5.4.1", "5.1.0") or "user unknown" in t
            or "recipient address rejected" in t or "does not exist" in t
            or "no mailbox" in t or "access denied" in t):
        return "reject_recipient"
    return "reject_other"
|
|
|
|
|
|
def iter_lines(paths: list[str]):
    """Yield text lines from each path in order, transparently decompressing .gz.

    A path of "-" reads from stdin (so a cron can `sudo cat /var/log/mail.log |
    docker compose exec -T workers python3 -m scripts.mail_reputation_monitor -`,
    which is how we feed the host-owned log into the DB-connected container).

    Every source is decoded as UTF-8 with errors="replace", so a stray non-UTF-8
    byte becomes U+FFFD instead of aborting the run. This covers stdin too, whose
    default text layer typically decodes strictly in a UTF-8 locale. Missing or
    unreadable files are skipped with a warning on stderr. Any other error
    propagates, so a corrupt archive fails the run instead of upserting partial
    counts over a good day.
    """
    for p in paths:
        try:
            if p == "-":
                raw = getattr(sys.stdin, "buffer", None)
                if raw is None:
                    # stdin was replaced by a text-only stream; read it as-is.
                    yield from sys.stdin
                else:
                    for chunk in raw:
                        yield chunk.decode("utf-8", errors="replace")
            elif p.endswith(".gz"):
                with gzip.open(p, "rt", encoding="utf-8", errors="replace") as fh:
                    yield from fh
            else:
                with open(p, "rt", encoding="utf-8", errors="replace") as fh:
                    yield from fh
        except FileNotFoundError:
            print(f"WARN: {p} not found, skipping", file=sys.stderr)
        except PermissionError:
            print(f"WARN: cannot read {p} (need sudo?)", file=sys.stderr)
|
|
|
|
|
|
# RFC 3339 / ISO 8601 line prefix ("2026-06-19T10:11:12.345+00:00 host ..."), the
# format rsyslog writes when high-precision timestamps are enabled.
_ISO_DATE_RE = re.compile(r"^(\d{4})-(\d{2})-(\d{2})[T ]")

# Recipient domains that are ours or placeholders (system bounces, our own
# monitoring); deliveries to them say nothing about how operators rate us.
_INTERNAL_RCPT_DOMAINS = frozenset({
    "performancewest.net", "send.performancewest.net", "carrierone.com",
    "perfwest.performancewest.net", "example.com", "example.org",
})


def _line_date(line: str, year: int) -> str | None:
    """Return the log line's date as "YYYY-MM-DD", or None if it has no usable date.

    Classic syslog stamps ("Jun 19 10:11:12") carry no year, so *year* is applied.
    The exception is a stamp that lands more than a day in the future: a log line
    cannot come from the future, so it predates a New Year rollover (e.g. Dec 31
    lines read from mail.log.1 on Jan 1) and is attributed to ``year - 1``.
    ISO stamps carry their own year and are used as-is. Impossible dates (e.g.
    Feb 29 in a non-leap year) yield None instead of an invalid date string.
    """
    iso = _ISO_DATE_RE.match(line)
    if iso:
        try:
            return datetime(*(int(g) for g in iso.groups())).date().isoformat()
        except ValueError:
            return None

    dm = DATE_RE.match(line)
    if not dm:
        return None
    mon = MONTHS.get(dm.group(1))
    if not mon:
        return None
    day = int(dm.group(2))
    try:
        stamped = datetime(year, mon, day)
        if (stamped - datetime.now()).days >= 1:
            stamped = datetime(year - 1, mon, day)
    except ValueError:
        return None
    return stamped.date().isoformat()


def _is_internal_relay(relay: str) -> bool:
    """True for hand-offs that never leave our infrastructure.

    Covers loopback content filters (with or without a hostname, e.g.
    "localhost[127.0.0.1]:10024") and our own co.carrierone.com relay.
    """
    return (relay.startswith(("127.0.0.1", "co.carrierone.com"))
            or "[127.0.0.1]" in relay or "[::1]" in relay)


def parse(paths: list[str], year: int) -> dict:
    """Aggregate egress delivery lines into daily reputation buckets.

    Args:
        paths: log paths handed to iter_lines() ("-" = stdin, ".gz" decompressed).
        year: year stamped onto classic syslog dates. A stamp landing in the
            future is moved to the previous year (New Year rollover).

    Returns:
        ``{(date, ip, receiver, outcome, code): [count, sample_text]}``. *count* is
        the number of matching status lines, and *sample_text* is the first reply
        text seen for that bucket, truncated to 300 characters.
    """
    agg: dict = defaultdict(lambda: [0, None])
    for line in iter_lines(paths):
        if not EGRESS_CLIENT_RE.search(line):
            continue
        sm = STATUS_RE.search(line)
        if not sm:
            continue
        log_date = _line_date(line, year)
        if log_date is None:
            continue

        rm = RELAY_RE.search(line)
        relay = rm.group(1) if rm else ""
        if _is_internal_relay(relay):
            continue

        tm = TO_RE.search(line)
        to_addr = tm.group(1) if tm else ""
        rcpt_domain = to_addr.rsplit("@", 1)[-1].lower() if "@" in to_addr else ""
        if rcpt_domain in _INTERNAL_RCPT_DOMAINS:
            continue

        dsn_m = DSN_RE.search(line)
        dsn = dsn_m.group(1) if dsn_m else ""
        ip_m = FROM_IP_RE.search(line)
        ip = ip_m.group(1) if ip_m else default_ip_for_client(line)
        receiver = classify_receiver(relay, to_addr)

        # Reply text is the remote server's words after "said:", else postfix's own
        # status text (the "(...)" following status=). It is never the whole line:
        # that leaked the recipient address into sample_text and let addresses or
        # hostnames trip the keyword classifier (e.g. "spam"/"content" in a domain).
        if "said:" in line:
            text = line.split("said:", 1)[1].strip()
        else:
            text = line[sm.end():].strip()
        outcome = classify_outcome(sm.group(1), dsn, text)

        # Reason code: prefer dsn; else the code embedded in the reply text.
        code = dsn
        if not code:
            cm = REPLY_CODE_RE.search(text)
            code = cm.group(2) if cm else ""

        bucket = agg[(log_date, ip, receiver, outcome, code)]
        bucket[0] += 1
        if bucket[1] is None:
            bucket[1] = text[:300].strip()
    return agg
|
|
|
|
|
|
_UPSERT_SQL = """
    INSERT INTO mail_reputation_daily
        (log_date, sending_ip, receiver, outcome, reason_code, msg_count, sample_text)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT (log_date, sending_ip, receiver, outcome, reason_code)
    DO UPDATE SET msg_count = EXCLUDED.msg_count,
                  sample_text = EXCLUDED.sample_text,
                  updated_at = now()
"""


def upsert(conn, agg: dict) -> int:
    """Write every parse() bucket into mail_reputation_daily, then commit once.

    Existing rows for the same (day, ip, receiver, outcome, code) key have their
    count and sample overwritten rather than added to, which is what makes a
    re-run idempotent. A missing reason code is stored as "".

    Returns:
        The number of rows written.
    """
    written = 0
    with conn.cursor() as cur:
        for bucket_key, (count, sample) in agg.items():
            log_date, ip, receiver, outcome, code = bucket_key
            cur.execute(
                _UPSERT_SQL,
                (log_date, ip, receiver, outcome, code or "", count, sample),
            )
            written += 1
    conn.commit()
    return written
|
|
|
|
|
|
def summarize(conn, log_date: str | None = None) -> tuple[str, list[str]]:
|
|
"""Build a human summary of a day's reputation. Returns (text, problems)."""
|
|
with conn.cursor() as cur:
|
|
if log_date is None:
|
|
cur.execute("SELECT max(log_date) FROM mail_reputation_daily")
|
|
row = cur.fetchone()
|
|
log_date = str(row[0]) if row and row[0] else None
|
|
if not log_date:
|
|
return "No mail_reputation_daily data yet.", []
|
|
cur.execute(
|
|
"""
|
|
SELECT receiver,
|
|
sum(msg_count) FILTER (WHERE outcome='accepted') AS accepted,
|
|
sum(msg_count) FILTER (WHERE outcome='throttled') AS throttled,
|
|
sum(msg_count) FILTER (WHERE outcome='reject_reputation') AS rej_rep,
|
|
sum(msg_count) FILTER (WHERE outcome='reject_recipient') AS rej_rcpt,
|
|
sum(msg_count) FILTER (WHERE outcome='reject_content') AS rej_content,
|
|
sum(msg_count) AS total
|
|
FROM mail_reputation_daily
|
|
WHERE log_date = %s
|
|
GROUP BY receiver
|
|
ORDER BY total DESC
|
|
""",
|
|
(log_date,),
|
|
)
|
|
rows = cur.fetchall()
|
|
|
|
lines = [f"Mail reputation {log_date} (by receiving operator):"]
|
|
problems: list[str] = []
|
|
for receiver, acc, thr, rrep, rrcpt, rcont, total in rows:
|
|
acc, thr, rrep, rrcpt, rcont, total = (
|
|
int(acc or 0), int(thr or 0), int(rrep or 0),
|
|
int(rrcpt or 0), int(rcont or 0), int(total or 0))
|
|
if total == 0:
|
|
continue
|
|
acc_pct = round(100 * acc / total)
|
|
thr_pct = round(100 * thr / total)
|
|
rrep_pct = round(100 * rrep / total)
|
|
lines.append(
|
|
f" {receiver:<10} total={total:<6} accepted={acc_pct}% "
|
|
f"throttled={thr_pct}% reputation-reject={rrep_pct}% "
|
|
f"(rcpt={rrcpt} content={rcont})"
|
|
)
|
|
# Reputation alerts (only on meaningful volume).
|
|
if total >= 100:
|
|
if rrep_pct >= 10:
|
|
problems.append(f"{receiver}: {rrep_pct}% reputation rejects ({rrep}/{total})")
|
|
if receiver == "microsoft" and thr_pct >= 70:
|
|
problems.append(f"microsoft: {thr_pct}% throttled (451 4.7.500) -- still cold")
|
|
return "\n".join(lines), problems
|
|
|
|
|
|
# Telegram Bot API sendMessage accepts at most 4096 characters of text.
_TELEGRAM_MAX_TEXT = 4096


def send_telegram(text: str) -> bool:
    """Post *text* to the configured Telegram chat; return True on success.

    Reads TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID from the environment and returns
    False without sending when either is unset or empty. Text longer than the
    API limit is truncated. Any send failure is reported on stderr and returns
    False, because a notify error must never fail the cron.
    """
    bot = os.getenv("TELEGRAM_BOT_TOKEN", "")
    chat = os.getenv("TELEGRAM_CHAT_ID", "")
    if not bot or not chat:
        return False
    try:
        payload = json.dumps(
            {"chat_id": chat, "text": text[:_TELEGRAM_MAX_TEXT]}).encode()
        req = urllib.request.Request(
            f"https://api.telegram.org/bot{bot}/sendMessage",
            data=payload, headers={"Content-Type": "application/json"})
        # Use the response as a context manager so the connection is closed
        # (the bare urlopen() call leaked it). Non-2xx replies raise HTTPError.
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception as exc:  # never fail the cron on a notify error
        print(f"WARN: telegram notify failed: {exc}", file=sys.stderr)
        return False
    return True
|
|
|
|
|
|
def _build_arg_parser() -> argparse.ArgumentParser:
    """Return the CLI parser (the module docstring doubles as --help text)."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("logs", nargs="*",
                        default=["/var/log/mail.log"],
                        help="postfix log paths (.gz ok). Default: /var/log/mail.log")
    parser.add_argument("--dry-run", action="store_true", help="parse + print, no DB writes")
    parser.add_argument("--alert", action="store_true",
                        help="print summary + send Telegram (summary always; alert if problems)")
    parser.add_argument("--year", type=int, default=datetime.now().year,
                        help="year to stamp on syslog dates (default: current year)")
    return parser


def _print_dry_run(agg: dict) -> None:
    """Print per-day, per-receiver outcome totals from a parse() aggregate."""
    by_day: dict = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
    for (day, _ip, receiver, outcome, _code), (count, _sample) in agg.items():
        by_day[day][receiver][outcome] += count

    for day, receivers in sorted(by_day.items()):
        print(f"\n{day}:")
        for receiver in sorted(receivers):
            outcomes = receivers[receiver]
            total = sum(outcomes.values())
            detail = " ".join(f"{k}={v}" for k, v in sorted(outcomes.items()))
            print(f" {receiver:<10} total={total:<6} " + detail)
    print(f"\nDRY-RUN: parsed {len(agg)} aggregate buckets, no DB writes.")


def _report_and_alert(conn) -> None:
    """Print the latest day's summary; send a Telegram alert only if it has problems."""
    text, problems = summarize(conn)
    print("\n" + text)
    if not problems:
        return  # silent on healthy days
    issues = "\n".join(f"- {p}" for p in problems)
    send_telegram("⚠️ Mail reputation alert\n\n" + text + "\n\nIssues:\n" + issues)


def main() -> int:
    """CLI entry point: parse logs, upsert daily rows, optionally summarize/alert.

    Returns the process exit status: 0 on success (including "nothing parsed" and
    dry runs), 2 when DATABASE_URL is missing.
    """
    args = _build_arg_parser().parse_args()
    logs = args.logs or ["/var/log/mail.log"]

    agg = parse(logs, args.year)
    if not agg:
        print("No egress delivery lines found in the given logs.")
        return 0

    if args.dry_run:
        _print_dry_run(agg)
        return 0

    url = os.getenv("DATABASE_URL", "")
    if not url:
        print("ERROR: DATABASE_URL not set", file=sys.stderr)
        return 2

    conn = psycopg2.connect(url)
    try:
        written = upsert(conn, agg)
        print(f"Upserted {written} mail_reputation_daily rows from {len(logs)} log file(s).")
        if args.alert:
            _report_and_alert(conn)
    finally:
        conn.close()
    return 0
|
|
|
|
|
|
# Script entry: propagate main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|