Reputation is tracked per receiving mail operator (Microsoft 365, Google Workspace, Proofpoint, etc.), not per recipient domain -- so warmup can safely send far more total volume if it's spread across many MX operators and throttled per-operator. The verifier now classifies each domain's (already-cached) MX into a provider label and writes an mx_provider column, so the warmup importer can cap sends per operator per day. NPPES institutional sample distribution: Microsoft 33%, Google 11%, Proofpoint ~16%, long tail across dozens of others.
184 lines
7.6 KiB
Python
184 lines
7.6 KiB
Python
#!/usr/bin/env python3
|
|
"""Verify the emails in a CSV (MX + SMTP RCPT) on the NON-sending IP (.72).
|
|
|
|
Self-contained (no DB deps): the verify logic mirrors
|
|
scripts/workers/email_verifier.verify_email (catch-all detection, MX cache,
|
|
source-IP bind to .72) but works on a CSV instead of a DB table.
|
|
|
|
Reads a CSV with an `email` column, writes:
|
|
<out>_verified.csv rows that passed (valid / catch-all / mx_unreachable / temp)
|
|
<out>_rejected.csv rows that failed hard (no_mx / smtp_rejected / bad_syntax)
|
|
|
|
NEVER binds to a sending IP. Probes go out 207.174.124.72.
|
|
|
|
Usage:
|
|
python3 scripts/verify_csv_emails.py --in data/hc_warmup_week1.csv --out data/hc_warmup_week1 [--workers 15]
|
|
"""
|
|
from __future__ import annotations
|
|
import argparse, csv, os, random, re, smtplib, socket, string, sys, time
|
|
from collections import Counter
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import dns.resolver
|
|
|
|
# Identity presented to remote servers in HELO and MAIL FROM while probing.
OUR_DOMAIN = "performancewest.net"
OUR_EMAIL = "verify@" + OUR_DOMAIN

# Local address the probe sockets bind to; the environment may override it.
VERIFY_SOURCE_IP = os.environ.get("VERIFY_SOURCE_IP", "207.174.124.72")

# Syntax gate applied before any DNS or SMTP work is attempted.
EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$")

# Domains reported as "catch_all_domain" without an SMTP probe.
CATCH_ALL_DOMAINS = {
    "gmail.com",
    "googlemail.com",
    "outlook.com",
    "hotmail.com",
    "yahoo.com",
    "aol.com",
    "icloud.com",
}

# Per-run memo tables, both keyed by recipient domain:
#   _mx_cache        -> ordered MX host list, or None when nothing was found
#   _catchall_cache  -> True when a random mailbox was accepted by the domain
_mx_cache: dict[str, list[str] | None] = {}
_catchall_cache: dict[str, bool] = {}
|
|
|
|
# Exact verify reasons that keep a row in the "verified" (sendable) output.
PASS_REASONS = {
    "smtp_valid",
    "catch_all_domain",
    "catch_all_detected",
    "mx_unreachable",
    "smtp_temp_error",
}


def is_pass(reason: str) -> bool:
    """Tell whether a verify reason counts as sendable.

    A reason passes when it is one of PASS_REASONS, or when it carries one of
    the open-ended prefixes ``smtp_unknown_`` / ``error_`` (inconclusive
    outcomes are kept rather than dropped).
    """
    if reason in PASS_REASONS:
        return True
    inconclusive_prefixes = ("smtp_unknown_", "error_")
    return reason.startswith(inconclusive_prefixes)
|
|
|
|
|
|
# Ordered classification rules: (label, domain suffixes, keywords). The first
# rule with any matching MX host wins. Suffixes match whole DNS labels only;
# keywords are distinctive tokens allowed to match anywhere in the host name.
_MX_PROVIDER_RULES = (
    ("microsoft", ("outlook.com",), ("protection.outlook", "office365")),
    ("google", ("google.com", "googlemail.com"), ("aspmx.l.google",)),
    ("proofpoint", ("pphosted.com", "ppe-hosted.com"), ("proofpoint",)),
    ("mimecast", (), ("mimecast",)),
    ("cisco", ("iphmx.com", "cisco.com"), ()),
    ("barracuda", (), ("barracuda",)),
    ("broadcom", (), ("messagelabs", "symantec", "broadcom")),
    ("godaddy", ("secureserver.net",), ()),
    ("zoho", (), ("zoho",)),
    ("rackspace", ("emailsrvr.com",), ("rackspace",)),
    ("openxchange", ("hostedemail.com", "oxcs.net", "ox.com"), ()),
)


def mx_provider(domain: str) -> str:
    """Classify a domain's MX into the receiving-infrastructure operator, so the
    warmup can throttle per MX operator (reputation is tracked per receiving
    system, not per recipient domain).

    Reads only the verifier's MX cache (``_mx_cache``) -- no extra DNS -- so
    the domain must already have been looked up for a real label to come back.

    Args:
        domain: recipient domain, as used for the cache key.

    Returns:
        ``"no_mx"`` when the cache holds no hosts for the domain, a known
        operator label (``"microsoft"``, ``"google"``, ...) when a rule
        matches, otherwise ``"mx:<last two labels of the primary MX host>"``.
    """
    hosts = _mx_cache.get(domain)
    if not hosts:
        return "no_mx"

    # Normalise once: DNS names are case-insensitive and may end with a dot.
    names = [host.rstrip(".").lower() for host in hosts]

    for label, suffixes, keywords in _MX_PROVIDER_RULES:
        for name in names:
            # Suffixes are compared on label boundaries, so "ox.com" no longer
            # matches "dropbox.com" and "cisco.com" no longer matches a host
            # that merely contains "...francisco...".
            if any(name == sfx or name.endswith("." + sfx) for sfx in suffixes):
                return label
            if any(word in name for word in keywords):
                return label

    # Collapse to the last two labels of the primary MX for the long tail.
    # NOTE(review): this is not a public-suffix-aware root, so hosts under
    # two-level suffixes (e.g. "*.co.uk") share one "mx:co.uk" bucket.
    labels = names[0].split(".")
    return "mx:" + (".".join(labels[-2:]) if len(labels) >= 2 else names[0])
|
|
|
|
|
|
def get_mx_hosts(domain: str) -> list[str] | None:
    """Resolve (and memoise) the mail hosts for a domain.

    The result is stored in ``_mx_cache`` so each domain is resolved at most
    once per run and can be re-read later without DNS traffic.

    Args:
        domain: recipient domain to resolve.

    Returns:
        MX host names ordered by ascending preference, ``[domain]`` when the
        MX lookup fails but the domain has an A record, or ``None`` when no
        usable mail host exists.
    """
    if domain in _mx_cache:
        return _mx_cache[domain]
    try:
        answers = dns.resolver.resolve(domain, "MX", lifetime=8)
        ranked = sorted(answers, key=lambda rr: rr.preference)
        # A "null MX" (RFC 7505, exchange ".") strips down to an empty name.
        # It declares that the domain accepts no mail, so it must not be kept
        # as a host to probe -- dropping it makes such domains resolve to None.
        hosts = [name for name in (str(rr.exchange).rstrip(".") for rr in ranked) if name]
        _mx_cache[domain] = hosts or None
    except Exception:
        # fall back to A record (some domains accept mail on the apex)
        try:
            dns.resolver.resolve(domain, "A", lifetime=6)
            _mx_cache[domain] = [domain]
        except Exception:
            _mx_cache[domain] = None
    return _mx_cache[domain]
|
|
|
|
|
|
def verify_email(email: str):
    """Check one address with a syntax test, an MX lookup and an SMTP RCPT probe.

    The SMTP session binds to VERIFY_SOURCE_IP and tries at most the two
    most-preferred MX hosts. Before probing the real mailbox, a random mailbox
    is tried once per domain to detect catch-all servers; the outcome is
    memoised in ``_catchall_cache``.

    Args:
        email: address to verify; surrounding whitespace and case are ignored.

    Returns:
        ``(ok, reason)`` where reason is one of ``invalid_syntax``,
        ``no_mx_records``, ``catch_all_domain``, ``catch_all_detected``,
        ``smtp_valid``, ``smtp_rejected_<code>``, ``smtp_temp_error``,
        ``smtp_unknown_<code>`` or ``mx_unreachable``.
    """
    email = email.strip().lower()
    if not EMAIL_RE.match(email):
        return False, "invalid_syntax"
    domain = email.split("@")[1]
    mx_hosts = get_mx_hosts(domain)
    if not mx_hosts:
        return False, "no_mx_records"
    if domain in CATCH_ALL_DOMAINS:
        return True, "catch_all_domain"
    # Once a domain is known to accept any mailbox, further sessions cannot
    # learn anything new -- answer from the cache instead of reconnecting.
    if _catchall_cache.get(domain):
        return True, "catch_all_detected"

    for mx_host in mx_hosts[:2]:
        try:
            # The context manager sends QUIT and closes the socket on every
            # exit path, and tolerates a server that has already hung up. An
            # explicit quit() here could raise after RCPT had been answered
            # and throw the verdict away.
            with smtplib.SMTP(timeout=12, source_address=(VERIFY_SOURCE_IP, 0)) as smtp:
                smtp.connect(mx_host, 25)
                smtp.helo(OUR_DOMAIN)
                code, _ = smtp.mail(OUR_EMAIL)
                if code != 250:
                    continue
                if domain not in _catchall_cache:
                    probe_local = "".join(random.choices(string.ascii_lowercase, k=20))
                    probe_code, _ = smtp.rcpt(f"{probe_local}@{domain}")
                    _catchall_cache[domain] = (probe_code == 250)
                    # Start a fresh envelope so the real RCPT is judged alone.
                    smtp.rset()
                    smtp.mail(OUR_EMAIL)
                if _catchall_cache.get(domain):
                    return True, "catch_all_detected"
                code, _ = smtp.rcpt(email)
        except (smtplib.SMTPException, OSError):
            # Network or protocol failure on this host: try the next MX.
            # Anything else is a programming error and is left to propagate.
            continue

        if code == 250:
            return True, "smtp_valid"
        if code in (550, 551, 553):
            return False, f"smtp_rejected_{code}"
        if code in (452, 421):
            return True, "smtp_temp_error"
        return True, f"smtp_unknown_{code}"
    return True, "mx_unreachable"
|
|
|
|
|
|
def main():
    """Command-line entry point: verify every address in a CSV and split the rows.

    Arguments:
        --in       input CSV path; needs an ``email`` column.
        --out      output prefix for ``<out>_verified.csv`` / ``<out>_rejected.csv``.
        --workers  size of the verification thread pool (default 12).
        --limit    only process the first N rows (default: all rows).

    Each unique address is verified once on the thread pool. Every input row
    is then written to exactly one of the two output files with three extra
    columns: ``verify_ok`` (Y/N), ``verify_reason`` and ``mx_provider``.
    Exits with status 1 when the input has no rows or no ``email`` column.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--in", dest="inp", required=True)
    ap.add_argument("--out", dest="out", required=True)
    ap.add_argument("--workers", type=int, default=12)
    ap.add_argument("--limit", type=int, default=None)
    args = ap.parse_args()

    # The with-block closes the input handle; newline="" is what the csv
    # module asks for so quoted fields containing line breaks parse correctly.
    with open(args.inp, newline="") as src:
        reader = csv.DictReader(src)
        rows = list(reader)
        header = list(reader.fieldnames or [])
    if args.limit:
        rows = rows[: args.limit]
    if not rows or "email" not in header:
        print("ERROR: CSV needs an 'email' column", file=sys.stderr); sys.exit(1)

    emails = sorted({r["email"].strip().lower() for r in rows if r.get("email")})
    print(f"rows={len(rows)} unique_emails={len(emails)} workers={args.workers}")
    print(f"verifying via NON-sending IP {VERIFY_SOURCE_IP} (MX + SMTP RCPT)...")

    results: dict[str, tuple[bool, str]] = {}
    t0 = time.time()
    done = 0
    with ThreadPoolExecutor(max_workers=args.workers) as ex:
        futs = {ex.submit(verify_email, e): e for e in emails}
        for fut in as_completed(futs):
            e = futs[fut]
            try:
                results[e] = fut.result()
            except Exception as exc:
                # An unexpected failure is inconclusive, not a hard rejection.
                results[e] = (True, f"error_{type(exc).__name__}")
            done += 1
            if done % 100 == 0:
                print(f" {done}/{len(emails)} ({done/max(time.time()-t0,1e-6):.1f}/s)")

    fieldnames = header + ["verify_ok", "verify_reason", "mx_provider"]
    vpath, rpath = f"{args.out}_verified.csv", f"{args.out}_rejected.csv"
    vcnt = rcnt = 0
    reasons = Counter()
    with open(vpath, "w", newline="") as vf, open(rpath, "w", newline="") as rf:
        # extrasaction="ignore": a ragged row with more cells than the header
        # must not abort the write phase after all probing has been done.
        vw = csv.DictWriter(vf, fieldnames=fieldnames, extrasaction="ignore")
        rw = csv.DictWriter(rf, fieldnames=fieldnames, extrasaction="ignore")
        vw.writeheader()
        rw.writeheader()
        for r in rows:
            # DictReader stores None for cells missing from a short row, so
            # the dict.get default alone does not protect .strip().
            e = (r.get("email") or "").strip().lower()
            _, reason = results.get(e, (True, "missing"))
            keep = is_pass(reason)
            reasons[reason] += 1
            dom = e.rsplit("@", 1)[-1] if "@" in e else ""
            out_row = {**r, "verify_ok": "Y" if keep else "N", "verify_reason": reason,
                       "mx_provider": mx_provider(dom)}
            if keep:
                vw.writerow(out_row)
                vcnt += 1
            else:
                rw.writerow(out_row)
                rcnt += 1

    print(f"\n=== done in {time.time()-t0:.0f}s ===")
    print(f" kept (sendable): {vcnt:>5} -> {vpath}")
    print(f" dropped (hard): {rcnt:>5} -> {rpath}")
    print(" reasons:")
    for reason, c in reasons.most_common():
        print(f" {reason:24} {c}")


if __name__ == "__main__":
    main()
|