#!/usr/bin/env python3
"""Verify the emails in a CSV (MX + SMTP RCPT) on the NON-sending IP (.72).

Self-contained (no DB deps): the verify logic mirrors
scripts/workers/email_verifier.verify_email (catch-all detection, MX cache,
source-IP bind to .72) but works on a CSV instead of a DB table.

Reads a CSV with an `email` column, writes:
  <out>_verified.csv   rows that passed (smtp_valid / catch_all_domain /
                       catch_all_detected / mx_unreachable / smtp_temp_error /
                       smtp_unknown_<code> / error_<ExceptionName>)
  <out>_rejected.csv   rows that failed hard (invalid_syntax / no_mx_records /
                       smtp_rejected_<code>) or carry no email at all (missing)

Both outputs repeat the input columns and add `verify_ok` (Y/N) and
`verify_reason`.

NEVER binds to a sending IP. Probes go out 207.174.124.72 unless the
VERIFY_SOURCE_IP environment variable overrides it.

The change that introduced this script also git-ignores `data/hc_warmup*.csv`,
so inputs/outputs using that prefix stay out of version control.

Usage:
  python3 scripts/verify_csv_emails.py --in data/hc_warmup_week1.csv \
      --out data/hc_warmup_week1 [--workers 15]
"""
from __future__ import annotations

import argparse
import csv
import os
import random
import re
import smtplib
import socket
import string
import sys
import time
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed

try:
    import dns.resolver
except ImportError:  # dnspython missing: main() reports this and exits
    dns = None

OUR_DOMAIN = "performancewest.net"
OUR_EMAIL = f"verify@{OUR_DOMAIN}"
VERIFY_SOURCE_IP = os.environ.get("VERIFY_SOURCE_IP", "207.174.124.72")
EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$")

# Domains that are never SMTP-probed; an address there passes as soon as the
# domain resolves to a mail host.
CATCH_ALL_DOMAINS = {"gmail.com", "googlemail.com", "outlook.com", "hotmail.com",
                     "yahoo.com", "aol.com", "icloud.com"}

# Per-process caches keyed by domain. They are filled from the worker threads
# without a lock; NOTE(review): a concurrent miss looks like it only costs a
# duplicate lookup/probe -- confirm before relying on it.
_mx_cache: dict[str, list[str] | None] = {}
_catchall_cache: dict[str, bool] = {}

PASS_REASONS = {"smtp_valid", "catch_all_domain", "catch_all_detected",
                "mx_unreachable", "smtp_temp_error"}

_RESULT_COLUMNS = ("verify_ok", "verify_reason")


def is_pass(reason: str) -> bool:
    """Return True when *reason* means the row is kept in the verified file.

    Besides the fixed PASS_REASONS, every ``smtp_unknown_*`` and ``error_*``
    reason passes: only explicit hard failures drop a row.
    """
    return reason in PASS_REASONS or reason.startswith(("smtp_unknown_", "error_"))


def _normalize_mx_names(names: list[str]) -> list[str]:
    """Strip the trailing root dot from MX names and drop empty ones.

    An RFC 7505 "null MX" has the exchange ``.``, which becomes an empty
    string once the dot is stripped; it means "accepts no mail" and must not
    be treated as a host to connect to.
    """
    stripped = (name.rstrip(".") for name in names)
    return [name for name in stripped if name]


def get_mx_hosts(domain: str) -> list[str] | None:
    """Return the mail hosts for *domain* in preference order, or None.

    Results (including failures) are cached in ``_mx_cache``. When the MX
    lookup raises, the domain itself is used if it has an A record.

    Raises:
        RuntimeError: dnspython is not installed and the domain is not cached.
    """
    if domain in _mx_cache:
        return _mx_cache[domain]
    if dns is None:
        raise RuntimeError("dnspython is required (pip install dnspython)")

    hosts: list[str] | None
    try:
        answers = dns.resolver.resolve(domain, "MX", lifetime=8)
        ranked = sorted(answers, key=lambda rr: rr.preference)
        hosts = _normalize_mx_names([str(rr.exchange) for rr in ranked]) or None
    except Exception:
        # fall back to A record (some domains accept mail on the apex)
        try:
            dns.resolver.resolve(domain, "A", lifetime=6)
            hosts = [domain]
        except Exception:
            hosts = None
    _mx_cache[domain] = hosts
    return hosts


def _classify_rcpt(code: int) -> tuple[bool, str]:
    """Map the SMTP reply code of ``RCPT TO`` to an ``(ok, reason)`` verdict."""
    if code == 250:
        return True, "smtp_valid"
    if code in (550, 551, 553):
        return False, f"smtp_rejected_{code}"
    if code in (452, 421):
        return True, "smtp_temp_error"
    return True, f"smtp_unknown_{code}"


def _probe_mailbox(smtp: smtplib.SMTP, mx_host: str, email: str,
                   domain: str) -> tuple[bool, str] | None:
    """Run HELO / MAIL / RCPT against one MX host.

    Returns the verdict, or None when the host refused ``MAIL FROM`` so the
    caller should try the next MX. Connection errors propagate to the caller.
    """
    smtp.connect(mx_host, 25)
    smtp.helo(OUR_DOMAIN)
    code, _ = smtp.mail(OUR_EMAIL)
    if code != 250:
        return None

    if domain not in _catchall_cache:
        # Ask about a random mailbox first: a 250 for it means the server
        # accepts anything, so a 250 for the real address proves nothing.
        probe_local = "".join(random.choices(string.ascii_lowercase, k=20))
        probe_code, _ = smtp.rcpt(f"{probe_local}@{domain}")
        _catchall_cache[domain] = probe_code == 250
        smtp.rset()
        smtp.mail(OUR_EMAIL)
    if _catchall_cache.get(domain):
        return True, "catch_all_detected"

    code, _ = smtp.rcpt(email)
    return _classify_rcpt(code)


def _quit_quietly(smtp: smtplib.SMTP) -> None:
    """Send QUIT best-effort; a failure here must never discard a verdict."""
    try:
        smtp.quit()
    except (smtplib.SMTPException, socket.timeout, OSError):
        smtp.close()


def verify_email(email: str) -> tuple[bool, str]:
    """Check one address and return ``(ok, reason)``.

    Order of checks: syntax, MX lookup, skip-list of big providers, then an
    SMTP RCPT probe against at most the first two MX hosts, bound to
    VERIFY_SOURCE_IP. If no host yields a verdict the address is reported as
    ``mx_unreachable`` (which counts as a pass).
    """
    email = email.strip().lower()
    if not EMAIL_RE.match(email):
        return False, "invalid_syntax"
    domain = email.split("@")[1]
    mx_hosts = get_mx_hosts(domain)
    if not mx_hosts:
        return False, "no_mx_records"
    if domain in CATCH_ALL_DOMAINS:
        return True, "catch_all_domain"

    for mx_host in mx_hosts[:2]:
        smtp = None
        verdict = None
        try:
            smtp = smtplib.SMTP(timeout=12, source_address=(VERIFY_SOURCE_IP, 0))
            verdict = _probe_mailbox(smtp, mx_host, email, domain)
        except Exception:
            # Deliberate best-effort: any failure on this host (disconnect,
            # timeout, refused, protocol error) just moves on to the next MX.
            verdict = None
        finally:
            # QUIT happens after the verdict is known, so a server that hangs
            # up on QUIT can no longer turn a 550 into "mx_unreachable".
            if smtp is not None:
                _quit_quietly(smtp)
        if verdict is not None:
            return verdict
    return True, "mx_unreachable"


def _row_email(row: dict) -> str:
    """Return the row's email normalized for lookup ('' when absent/None)."""
    return (row.get("email") or "").strip().lower()


def _read_rows(path: str, limit: int | None) -> tuple[list[str], list[dict]]:
    """Read the CSV at *path*; return ``(header columns, rows[:limit])``."""
    with open(path, newline="") as fh:
        reader = csv.DictReader(fh)
        rows = list(reader)
        fieldnames = list(reader.fieldnames or [])
    if limit:
        rows = rows[:limit]
    return fieldnames, rows


def _verify_all(emails: list[str], workers: int) -> dict[str, tuple[bool, str]]:
    """Verify *emails* on a thread pool, printing progress every 100 results.

    An exception escaping ``verify_email`` is recorded as a passing
    ``error_<ExceptionName>`` reason rather than aborting the run.
    """
    results: dict[str, tuple[bool, str]] = {}
    started = time.time()
    done = 0
    with ThreadPoolExecutor(max_workers=workers) as pool:
        pending = {pool.submit(verify_email, addr): addr for addr in emails}
        for fut in as_completed(pending):
            addr = pending[fut]
            try:
                results[addr] = fut.result()
            except Exception as exc:
                results[addr] = (True, f"error_{type(exc).__name__}")
            done += 1
            if done % 100 == 0:
                rate = done / max(time.time() - started, 1e-6)
                print(f"  {done}/{len(emails)} ({rate:.1f}/s)")
    return results


def _write_results(rows: list[dict], fieldnames: list[str],
                   results: dict[str, tuple[bool, str]],
                   out_prefix: str) -> tuple[str, str, int, int, Counter]:
    """Split *rows* into ``<out_prefix>_verified.csv`` / ``_rejected.csv``.

    Returns ``(verified_path, rejected_path, kept, dropped, reason_counts)``.
    Rows whose email has no entry in *results* get the reason ``missing`` and
    are rejected. Cells beyond the header (stored by DictReader under the
    ``None`` key) are not written.
    """
    out_fields = list(fieldnames)
    for column in _RESULT_COLUMNS:
        # Re-running on a previous output must not duplicate these columns.
        if column not in out_fields:
            out_fields.append(column)

    vpath = f"{out_prefix}_verified.csv"
    rpath = f"{out_prefix}_rejected.csv"
    kept = dropped = 0
    reasons: Counter = Counter()
    with open(vpath, "w", newline="") as vf, open(rpath, "w", newline="") as rf:
        verified = csv.DictWriter(vf, fieldnames=out_fields, extrasaction="ignore")
        rejected = csv.DictWriter(rf, fieldnames=out_fields, extrasaction="ignore")
        verified.writeheader()
        rejected.writeheader()
        for row in rows:
            _, reason = results.get(_row_email(row), (False, "missing"))
            keep = is_pass(reason)
            reasons[reason] += 1
            out_row = {**row, "verify_ok": "Y" if keep else "N",
                       "verify_reason": reason}
            if keep:
                verified.writerow(out_row)
                kept += 1
            else:
                rejected.writerow(out_row)
                dropped += 1
    return vpath, rpath, kept, dropped, reasons


def main() -> None:
    """Command-line entry point: read the CSV, verify, write both outputs.

    Exits with status 1 when dnspython is missing, the CSV has no `email`
    column, or it has no data rows.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--in", dest="inp", required=True)
    ap.add_argument("--out", dest="out", required=True)
    ap.add_argument("--workers", type=int, default=12)
    ap.add_argument("--limit", type=int, default=None)
    args = ap.parse_args()

    if dns is None:
        print("ERROR: dnspython is required (pip install dnspython)", file=sys.stderr)
        sys.exit(1)

    fieldnames, rows = _read_rows(args.inp, args.limit)
    if "email" not in fieldnames:
        print("ERROR: CSV needs an 'email' column", file=sys.stderr)
        sys.exit(1)
    if not rows:
        print("ERROR: CSV has no data rows", file=sys.stderr)
        sys.exit(1)

    emails = sorted({_row_email(r) for r in rows if r.get("email")})
    print(f"rows={len(rows)} unique_emails={len(emails)} workers={args.workers}")
    print(f"verifying via NON-sending IP {VERIFY_SOURCE_IP} (MX + SMTP RCPT)...")

    t0 = time.time()
    results = _verify_all(emails, args.workers)
    vpath, rpath, kept, dropped, reasons = _write_results(
        rows, fieldnames, results, args.out)

    print(f"\n=== done in {time.time() - t0:.0f}s ===")
    print(f"  kept (sendable): {kept:>5} -> {vpath}")
    print(f"  dropped (hard):  {dropped:>5} -> {rpath}")
    print("  reasons:")
    for reason, count in reasons.most_common():
        print(f"    {reason:24} {count}")


if __name__ == "__main__":
    main()