157 lines
6.2 KiB
Python
157 lines
6.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Verify the emails in a CSV (MX + SMTP RCPT) on the NON-sending IP (.72).
|
|
|
|
Self-contained (no DB deps): the verify logic mirrors
|
|
scripts/workers/email_verifier.verify_email (catch-all detection, MX cache,
|
|
source-IP bind to .72) but works on a CSV instead of a DB table.
|
|
|
|
Reads a CSV with an `email` column, writes:
<out>_verified.csv rows that passed (smtp_valid / catch-all / mx_unreachable / smtp_temp_error / unknown SMTP code / worker error)
<out>_rejected.csv rows that failed hard (no_mx_records / smtp_rejected_<code> / invalid_syntax / missing email)
|
|
|
|
NEVER binds to a sending IP. Probes go out 207.174.124.72.
|
|
|
|
Usage:
|
|
python3 scripts/verify_csv_emails.py --in data/hc_warmup_week1.csv --out data/hc_warmup_week1 [--workers 15]
|
|
"""
|
|
from __future__ import annotations
|
|
import argparse, csv, os, random, re, smtplib, socket, string, sys, time
|
|
from collections import Counter
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import dns.resolver
|
|
|
|
# Identity presented to remote mail servers: OUR_DOMAIN is the HELO name and
# OUR_EMAIL is the MAIL FROM address used for every probe.
OUR_DOMAIN = "performancewest.net"
OUR_EMAIL = f"verify@{OUR_DOMAIN}"

# Local address every SMTP probe is bound to; can be overridden through the
# VERIFY_SOURCE_IP environment variable.
VERIFY_SOURCE_IP = os.environ.get("VERIFY_SOURCE_IP", "207.174.124.72")

# Syntax gate applied before any DNS or SMTP work is attempted.
EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$")

# Domains that are accepted without an RCPT probe once they resolve.
# NOTE(review): presumably because these providers do not give a reliable
# per-mailbox answer to RCPT TO -- confirm.
CATCH_ALL_DOMAINS = {
    "gmail.com",
    "googlemail.com",
    "outlook.com",
    "hotmail.com",
    "yahoo.com",
    "aol.com",
    "icloud.com",
}

# Per-run memo tables, keyed by domain.
#   _mx_cache:       resolved mail hosts, or None when the domain has none.
#   _catchall_cache: True when a random local part was accepted with 250.
_mx_cache: dict[str, list[str] | None] = {}
_catchall_cache: dict[str, bool] = {}

# Exact reason codes that keep a row in the "verified" output. Prefix-based
# reasons are matched separately by is_pass().
PASS_REASONS = {
    "smtp_valid",
    "catch_all_domain",
    "catch_all_detected",
    "mx_unreachable",
    "smtp_temp_error",
}
|
|
|
|
|
|
def is_pass(reason: str) -> bool:
    """Tell whether a verification reason code keeps the row as sendable.

    A reason passes when it is one of the exact codes in ``PASS_REASONS`` or
    when it carries the ``smtp_unknown_`` or ``error_`` prefix; every other
    reason is treated as a hard failure.
    """
    if reason in PASS_REASONS:
        return True
    # Unrecognised SMTP reply codes and worker exceptions are given the
    # benefit of the doubt rather than being dropped.
    return reason.startswith("smtp_unknown_") or reason.startswith("error_")
|
|
|
|
|
|
def get_mx_hosts(domain: str):
    """Return the mail hosts for *domain*, lowest MX preference first.

    The outcome, including a negative one, is stored in the module-level
    ``_mx_cache`` so a domain is resolved at most once per run.

    Lookup order:
      1. MX records (8 s lifetime), sorted by ascending preference.
      2. If the MX lookup raises for any reason, an A lookup (6 s lifetime);
         when that succeeds the domain itself is returned as the only host.

    A "null MX" record (RFC 7505: exchange ".") announces that the domain
    accepts no mail. After the trailing dot is stripped its exchange is an
    empty string, so empty names are discarded; a domain left with no
    usable exchange is reported as having no mail hosts instead of being
    handed to the SMTP probe with an empty host name.

    Args:
        domain: Domain part of the address being verified.

    Returns:
        A non-empty list of host names, or ``None`` when the domain has no
        usable mail host.
    """
    if domain in _mx_cache:
        return _mx_cache[domain]
    try:
        answers = dns.resolver.resolve(domain, "MX", lifetime=8)
        by_preference = sorted(answers, key=lambda rr: rr.preference)
        exchanges = (str(rr.exchange).rstrip(".") for rr in by_preference)
        # Drop empty names produced by a null MX (".").
        hosts = [host for host in exchanges if host]
        _mx_cache[domain] = hosts or None
    except Exception:
        # Fall back to the A record (some domains accept mail on the apex).
        try:
            dns.resolver.resolve(domain, "A", lifetime=6)
            _mx_cache[domain] = [domain]
        except Exception:
            _mx_cache[domain] = None
    return _mx_cache[domain]
|
|
|
|
|
|
def _rcpt_verdict(code: int):
    """Translate the reply code of the final RCPT TO into (ok, reason)."""
    if code == 250:
        return True, "smtp_valid"
    if code in (550, 551, 553):
        return False, f"smtp_rejected_{code}"
    if code in (421, 452):
        return True, "smtp_temp_error"
    return True, f"smtp_unknown_{code}"


def verify_email(email: str):
    """Check one address by syntax, DNS and an SMTP RCPT probe.

    Steps, in order:
      1. Normalise (strip + lowercase) and match against ``EMAIL_RE``.
      2. Resolve mail hosts with ``get_mx_hosts``.
      3. Accept domains listed in ``CATCH_ALL_DOMAINS`` without probing.
      4. For at most the first two mail hosts, open an SMTP session bound to
         ``VERIFY_SOURCE_IP`` (12 s timeout), send HELO and MAIL FROM, probe
         a random 20-letter local part once per domain to detect catch-all
         behaviour, then issue RCPT TO for the real address.

    Any exception raised while talking to a host moves on to the next host;
    if no host yields a verdict the address is reported as
    ``mx_unreachable``.

    Args:
        email: Address to verify.

    Returns:
        A ``(ok, reason)`` tuple where ``reason`` is a short reason code.
    """
    address = email.strip().lower()
    if EMAIL_RE.match(address) is None:
        return False, "invalid_syntax"

    _, _, domain = address.partition("@")
    mx_hosts = get_mx_hosts(domain)
    if not mx_hosts:
        return False, "no_mx_records"
    if domain in CATCH_ALL_DOMAINS:
        return True, "catch_all_domain"

    for mx_host in mx_hosts[:2]:
        try:
            with smtplib.SMTP(timeout=12, source_address=(VERIFY_SOURCE_IP, 0)) as smtp:
                smtp.connect(mx_host, 25)
                smtp.helo(OUR_DOMAIN)
                mail_code, _ = smtp.mail(OUR_EMAIL)
                if mail_code != 250:
                    # This host refused our sender; try the next one.
                    continue

                if domain not in _catchall_cache:
                    # A server that accepts a random mailbox accepts anything.
                    probe_local = "".join(random.choices(string.ascii_lowercase, k=20))
                    probe_code, _ = smtp.rcpt(f"{probe_local}@{domain}")
                    _catchall_cache[domain] = probe_code == 250
                    # Start a fresh envelope for the real recipient.
                    smtp.rset()
                    smtp.mail(OUR_EMAIL)

                if _catchall_cache.get(domain):
                    smtp.quit()
                    return True, "catch_all_detected"

                rcpt_code, _ = smtp.rcpt(address)
                smtp.quit()
                return _rcpt_verdict(rcpt_code)
        except Exception:
            # Connection, timeout and protocol failures (and anything else)
            # are all handled the same way: move on to the next host.
            continue
    return True, "mx_unreachable"
|
|
|
|
|
|
def _normalized_email(row) -> str:
    """Return the row's email stripped and lowercased, or "" when absent."""
    # csv.DictReader fills cells missing from a short row with None, so the
    # key can be present with a None value; `or ""` covers both cases.
    return (row.get("email") or "").strip().lower()


def main():
    """CLI entry point: verify every address in a CSV and split the rows.

    Command-line options:
        --in       input CSV path (must have an ``email`` column)
        --out      output path prefix
        --workers  thread-pool size (default 12)
        --limit    only process the first N rows

    Every unique, non-blank, normalised address is verified once in a thread
    pool. Each input row is then written, with ``verify_ok`` and
    ``verify_reason`` columns appended, to ``<out>_verified.csv`` when
    ``is_pass`` accepts its reason and to ``<out>_rejected.csv`` otherwise.
    Rows without an email get the reason ``missing`` and are rejected. A
    summary of counts per reason is printed at the end.

    Exits with status 1 when the CSV has no rows or no ``email`` column.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--in", dest="inp", required=True)
    ap.add_argument("--out", dest="out", required=True)
    ap.add_argument("--workers", type=int, default=12)
    ap.add_argument("--limit", type=int, default=None)
    args = ap.parse_args()

    # newline="" is what the csv module requires for correct handling of
    # embedded newlines; the with-block closes the handle deterministically.
    with open(args.inp, newline="") as src:
        rows = list(csv.DictReader(src))
    if args.limit:
        rows = rows[: args.limit]
    if not rows or "email" not in rows[0]:
        print("ERROR: CSV needs an 'email' column", file=sys.stderr)
        sys.exit(1)

    # Blank and whitespace-only cells are skipped here so that such rows are
    # consistently reported as "missing" below.
    emails = sorted({e for e in map(_normalized_email, rows) if e})
    print(f"rows={len(rows)} unique_emails={len(emails)} workers={args.workers}")
    print(f"verifying via NON-sending IP {VERIFY_SOURCE_IP} (MX + SMTP RCPT)...")

    results: dict[str, tuple[bool, str]] = {}
    t0 = time.time()
    done = 0
    with ThreadPoolExecutor(max_workers=args.workers) as ex:
        futs = {ex.submit(verify_email, e): e for e in emails}
        for fut in as_completed(futs):
            e = futs[fut]
            try:
                results[e] = fut.result()
            except Exception as exc:
                # A crashed worker must not drop the address: "error_*"
                # reasons are accepted by is_pass().
                results[e] = (True, f"error_{type(exc).__name__}")
            done += 1
            if done % 100 == 0:
                print(f" {done}/{len(emails)} ({done/max(time.time()-t0,1e-6):.1f}/s)")

    fieldnames = list(rows[0].keys()) + ["verify_ok", "verify_reason"]
    vpath, rpath = f"{args.out}_verified.csv", f"{args.out}_rejected.csv"
    vcnt = rcnt = 0
    reasons = Counter()
    with open(vpath, "w", newline="") as vf, open(rpath, "w", newline="") as rf:
        vw = csv.DictWriter(vf, fieldnames=fieldnames)
        vw.writeheader()
        rw = csv.DictWriter(rf, fieldnames=fieldnames)
        rw.writeheader()
        for r in rows:
            e = _normalized_email(r)
            # Only the reason decides the routing; the boolean returned by
            # verify_email is not consulted.
            _ok, reason = results.get(e, (False, "missing"))
            keep = is_pass(reason)
            reasons[reason] += 1
            out_row = {**r, "verify_ok": "Y" if keep else "N", "verify_reason": reason}
            (vw if keep else rw).writerow(out_row)
            if keep:
                vcnt += 1
            else:
                rcnt += 1

    print(f"\n=== done in {time.time()-t0:.0f}s ===")
    print(f" kept (sendable): {vcnt:>5} -> {vpath}")
    print(f" dropped (hard): {rcnt:>5} -> {rpath}")
    print(" reasons:")
    for reason, c in reasons.most_common():
        print(f" {reason:24} {c}")
|
|
|
|
|
|
# Run the verifier only when this file is executed as a script, so importing
# the module has no side effects beyond defining its names.
if __name__ == "__main__":
    main()
|