verify: tag each address with its MX provider for per-operator warmup throttling

Reputation is tracked per receiving mail operator (Microsoft 365, Google
Workspace, Proofpoint, etc.), not per recipient domain -- so warmup can safely
send far more total volume if it's spread across many MX operators and throttled
per-operator. The verifier now classifies each domain's (already-cached) MX into
a provider label and writes an mx_provider column, so the warmup importer can
cap sends per operator per day. NPPES institutional sample distribution:
Microsoft 33%, Google 11%, Proofpoint ~16%, long tail across dozens of others.
This commit is contained in:
justin 2026-06-12 20:06:44 -05:00
parent 51a287271f
commit 921cd1ce3c

View file

@ -39,6 +39,31 @@ def is_pass(reason: str) -> bool:
return reason in PASS_REASONS or reason.startswith(("smtp_unknown_", "error_")) return reason in PASS_REASONS or reason.startswith(("smtp_unknown_", "error_"))
def mx_provider(domain: str) -> str:
"""Classify a domain's MX into the receiving-infrastructure operator, so the
warmup can throttle per MX operator (reputation is tracked per receiving
system, not per recipient domain). Reuses the verifier's MX cache -- no
extra DNS. Returns a stable provider label."""
hosts = _mx_cache.get(domain)
if not hosts:
return "no_mx"
h = " ".join(hosts).lower()
if "protection.outlook" in h or "outlook.com" in h or "office365" in h: return "microsoft"
if "aspmx.l.google" in h or "googlemail" in h or "google.com" in h: return "google"
if "pphosted.com" in h or "ppe-hosted.com" in h or "proofpoint" in h: return "proofpoint"
if "mimecast" in h: return "mimecast"
if "iphmx.com" in h or "cisco" in h: return "cisco"
if "barracuda" in h: return "barracuda"
if "messagelabs" in h or "symantec" in h or "broadcom" in h: return "broadcom"
if "secureserver.net" in h: return "godaddy"
if "zoho" in h: return "zoho"
if "emailsrvr.com" in h or "rackspace" in h: return "rackspace"
if "hostedemail.com" in h or "oxcs.net" in h or "ox.com" in h: return "openxchange"
# collapse to the registrable MX root for the long tail.
root = hosts[0].rstrip(".").split(".")
return "mx:" + (".".join(root[-2:]) if len(root) >= 2 else hosts[0])
def get_mx_hosts(domain: str): def get_mx_hosts(domain: str):
if domain in _mx_cache: if domain in _mx_cache:
return _mx_cache[domain] return _mx_cache[domain]
@ -131,7 +156,7 @@ def main():
if done % 100 == 0: if done % 100 == 0:
print(f" {done}/{len(emails)} ({done/max(time.time()-t0,1e-6):.1f}/s)") print(f" {done}/{len(emails)} ({done/max(time.time()-t0,1e-6):.1f}/s)")
fieldnames = list(rows[0].keys()) + ["verify_ok", "verify_reason"] fieldnames = list(rows[0].keys()) + ["verify_ok", "verify_reason", "mx_provider"]
vpath, rpath = f"{args.out}_verified.csv", f"{args.out}_rejected.csv" vpath, rpath = f"{args.out}_verified.csv", f"{args.out}_rejected.csv"
vcnt = rcnt = 0; reasons = Counter() vcnt = rcnt = 0; reasons = Counter()
with open(vpath, "w", newline="") as vf, open(rpath, "w", newline="") as rf: with open(vpath, "w", newline="") as vf, open(rpath, "w", newline="") as rf:
@ -141,7 +166,9 @@ def main():
e = r.get("email", "").strip().lower() e = r.get("email", "").strip().lower()
ok, reason = results.get(e, (True, "missing")) ok, reason = results.get(e, (True, "missing"))
keep = is_pass(reason); reasons[reason] += 1 keep = is_pass(reason); reasons[reason] += 1
r = {**r, "verify_ok": "Y" if keep else "N", "verify_reason": reason} dom = e.rsplit("@", 1)[-1] if "@" in e else ""
r = {**r, "verify_ok": "Y" if keep else "N", "verify_reason": reason,
"mx_provider": mx_provider(dom)}
(vw if keep else rw).writerow(r) (vw if keep else rw).writerow(r)
vcnt += keep; rcnt += (not keep) vcnt += keep; rcnt += (not keep)