#!/usr/bin/env python3 """Verify the emails in a CSV (MX + SMTP RCPT) on the NON-sending IP (.72). Self-contained (no DB deps): the verify logic mirrors scripts/workers/email_verifier.verify_email (catch-all detection, MX cache, source-IP bind to .72) but works on a CSV instead of a DB table. Reads a CSV with an `email` column, writes: _verified.csv rows that passed (valid / catch-all / mx_unreachable / temp) _rejected.csv rows that failed hard (no_mx / smtp_rejected / bad_syntax) NEVER binds to a sending IP. Probes go out 207.174.124.72. Usage: python3 scripts/verify_csv_emails.py --in data/hc_warmup_week1.csv --out data/hc_warmup_week1 [--workers 15] """ from __future__ import annotations import argparse, csv, os, random, re, smtplib, socket, string, sys, time from collections import Counter from concurrent.futures import ThreadPoolExecutor, as_completed import dns.resolver OUR_DOMAIN = "performancewest.net" OUR_EMAIL = f"verify@{OUR_DOMAIN}" VERIFY_SOURCE_IP = os.environ.get("VERIFY_SOURCE_IP", "207.174.124.72") EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$") CATCH_ALL_DOMAINS = {"gmail.com", "googlemail.com", "outlook.com", "hotmail.com", "yahoo.com", "aol.com", "icloud.com"} _mx_cache: dict[str, list[str] | None] = {} _catchall_cache: dict[str, bool] = {} PASS_REASONS = {"smtp_valid", "catch_all_domain", "catch_all_detected", "mx_unreachable", "smtp_temp_error"} def is_pass(reason: str) -> bool: return reason in PASS_REASONS or reason.startswith(("smtp_unknown_", "error_")) def mx_provider(domain: str) -> str: """Classify a domain's MX into the receiving-infrastructure operator, so the warmup can throttle per MX operator (reputation is tracked per receiving system, not per recipient domain). Reuses the verifier's MX cache -- no extra DNS. Returns a stable provider label.""" hosts = _mx_cache.get(domain) if not hosts: return "no_mx" h = " ".join(hosts).lower() if "protection.outlook" in h or "outlook.com" in h or "office365" in h: return "microsoft" if "aspmx.l.google" in h or "googlemail" in h or "google.com" in h: return "google" if "pphosted.com" in h or "ppe-hosted.com" in h or "proofpoint" in h: return "proofpoint" if "mimecast" in h: return "mimecast" if "iphmx.com" in h or "cisco" in h: return "cisco" if "barracuda" in h: return "barracuda" if "messagelabs" in h or "symantec" in h or "broadcom" in h: return "broadcom" if "secureserver.net" in h: return "godaddy" if "zoho" in h: return "zoho" if "emailsrvr.com" in h or "rackspace" in h: return "rackspace" if "hostedemail.com" in h or "oxcs.net" in h or "ox.com" in h: return "openxchange" # collapse to the registrable MX root for the long tail. root = hosts[0].rstrip(".").split(".") return "mx:" + (".".join(root[-2:]) if len(root) >= 2 else hosts[0]) def get_mx_hosts(domain: str): if domain in _mx_cache: return _mx_cache[domain] try: answers = dns.resolver.resolve(domain, "MX", lifetime=8) hosts = [str(r.exchange).rstrip(".") for r in sorted(answers, key=lambda x: x.preference)] _mx_cache[domain] = hosts or None except Exception: # fall back to A record (some domains accept mail on the apex) try: dns.resolver.resolve(domain, "A", lifetime=6) _mx_cache[domain] = [domain] except Exception: _mx_cache[domain] = None return _mx_cache[domain] def verify_email(email: str): email = email.strip().lower() if not EMAIL_RE.match(email): return False, "invalid_syntax" domain = email.split("@")[1] mx_hosts = get_mx_hosts(domain) if not mx_hosts: return False, "no_mx_records" if domain in CATCH_ALL_DOMAINS: return True, "catch_all_domain" for mx_host in mx_hosts[:2]: try: with smtplib.SMTP(timeout=12, source_address=(VERIFY_SOURCE_IP, 0)) as smtp: smtp.connect(mx_host, 25) smtp.helo(OUR_DOMAIN) code, _ = smtp.mail(OUR_EMAIL) if code != 250: continue if domain not in _catchall_cache: ru = "".join(random.choices(string.ascii_lowercase, k=20)) pc, _ = smtp.rcpt(f"{ru}@{domain}") _catchall_cache[domain] = (pc == 250) smtp.rset(); smtp.mail(OUR_EMAIL) if _catchall_cache.get(domain): smtp.quit(); return True, "catch_all_detected" code, _ = smtp.rcpt(email) smtp.quit() if code == 250: return True, "smtp_valid" if code in (550, 551, 553): return False, f"smtp_rejected_{code}" if code in (452, 421): return True, "smtp_temp_error" return True, f"smtp_unknown_{code}" except (smtplib.SMTPServerDisconnected, smtplib.SMTPConnectError, socket.timeout, ConnectionRefusedError, OSError): continue except Exception: continue return True, "mx_unreachable" def main(): ap = argparse.ArgumentParser() ap.add_argument("--in", dest="inp", required=True) ap.add_argument("--out", dest="out", required=True) ap.add_argument("--workers", type=int, default=12) ap.add_argument("--limit", type=int, default=None) args = ap.parse_args() rows = list(csv.DictReader(open(args.inp))) if args.limit: rows = rows[: args.limit] if not rows or "email" not in rows[0]: print("ERROR: CSV needs an 'email' column", file=sys.stderr); sys.exit(1) emails = sorted({r["email"].strip().lower() for r in rows if r.get("email")}) print(f"rows={len(rows)} unique_emails={len(emails)} workers={args.workers}") print(f"verifying via NON-sending IP {VERIFY_SOURCE_IP} (MX + SMTP RCPT)...") results: dict[str, tuple[bool, str]] = {} t0 = time.time(); done = 0 with ThreadPoolExecutor(max_workers=args.workers) as ex: futs = {ex.submit(verify_email, e): e for e in emails} for fut in as_completed(futs): e = futs[fut] try: results[e] = fut.result() except Exception as exc: results[e] = (True, f"error_{type(exc).__name__}") done += 1 if done % 100 == 0: print(f" {done}/{len(emails)} ({done/max(time.time()-t0,1e-6):.1f}/s)") fieldnames = list(rows[0].keys()) + ["verify_ok", "verify_reason", "mx_provider"] vpath, rpath = f"{args.out}_verified.csv", f"{args.out}_rejected.csv" vcnt = rcnt = 0; reasons = Counter() with open(vpath, "w", newline="") as vf, open(rpath, "w", newline="") as rf: vw = csv.DictWriter(vf, fieldnames=fieldnames); vw.writeheader() rw = csv.DictWriter(rf, fieldnames=fieldnames); rw.writeheader() for r in rows: e = r.get("email", "").strip().lower() ok, reason = results.get(e, (True, "missing")) keep = is_pass(reason); reasons[reason] += 1 dom = e.rsplit("@", 1)[-1] if "@" in e else "" r = {**r, "verify_ok": "Y" if keep else "N", "verify_reason": reason, "mx_provider": mx_provider(dom)} (vw if keep else rw).writerow(r) vcnt += keep; rcnt += (not keep) print(f"\n=== done in {time.time()-t0:.0f}s ===") print(f" kept (sendable): {vcnt:>5} -> {vpath}") print(f" dropped (hard): {rcnt:>5} -> {rpath}") print(" reasons:") for reason, c in reasons.most_common(): print(f" {reason:24} {c}") if __name__ == "__main__": main()