healthcare email: teal gradient header (matches site hero) + standalone CSV MX/SMTP verifier (binds .72 non-sending IP); gitignore PII warmup lists

This commit is contained in:
justin 2026-06-06 03:39:19 -05:00
parent 5129ebec5c
commit 29c7a421e9
8 changed files with 170 additions and 12 deletions

View file

@ -158,10 +158,10 @@ def render(seg_key: str, *, test: bool = False) -> tuple[str, str]:
<table role="presentation" class="pw-wrap" width="620" cellpadding="0" cellspacing="0" style="margin:0 auto;border-radius:10px;overflow:hidden;background:#fff;">
<!-- Header -->
<tr><td style="background:#0f172a;padding:24px 28px;">
<tr><td style="background-color:#0f766e;background:linear-gradient(135deg,#0f766e 0%,#14b8a6 100%);padding:26px 28px;">
<img src="{SITE}/images/logo.png" alt="Performance West" style="height:44px;margin-bottom:10px;display:block" />
<h1 style="color:#fff;margin:0;font-size:22px;font-weight:700;font-family:Inter,system-ui,sans-serif;">{s['alert']}</h1>
<p style="color:#94a3b8;margin:6px 0 0;font-size:13px;font-family:Inter,system-ui,sans-serif;">{s['subhead']}</p>
<p style="color:#ccfbf1;margin:6px 0 0;font-size:13px;font-family:Inter,system-ui,sans-serif;">{s['subhead']}</p>
</td></tr>
<!-- Body -->

View file

@ -0,0 +1,157 @@
#!/usr/bin/env python3
"""Verify the emails in a CSV (MX + SMTP RCPT) on the NON-sending IP (.72).
Self-contained (no DB deps): the verify logic mirrors
scripts/workers/email_verifier.verify_email (catch-all detection, MX cache,
source-IP bind to .72) but works on a CSV instead of a DB table.
Reads a CSV with an `email` column, writes:
<out>_verified.csv rows that passed (valid / catch-all / mx_unreachable / temp)
<out>_rejected.csv rows that failed hard (no_mx / smtp_rejected / bad_syntax)
NEVER binds to a sending IP. Probes go out 207.174.124.72.
Usage:
python3 scripts/verify_csv_emails.py --in data/hc_warmup_week1.csv --out data/hc_warmup_week1 [--workers 15]
"""
from __future__ import annotations
import argparse, csv, os, random, re, smtplib, socket, string, sys, time
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
import dns.resolver
OUR_DOMAIN = "performancewest.net"
OUR_EMAIL = f"verify@{OUR_DOMAIN}"
VERIFY_SOURCE_IP = os.environ.get("VERIFY_SOURCE_IP", "207.174.124.72")
EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$")
CATCH_ALL_DOMAINS = {"gmail.com", "googlemail.com", "outlook.com", "hotmail.com",
"yahoo.com", "aol.com", "icloud.com"}
_mx_cache: dict[str, list[str] | None] = {}
_catchall_cache: dict[str, bool] = {}
PASS_REASONS = {"smtp_valid", "catch_all_domain", "catch_all_detected",
"mx_unreachable", "smtp_temp_error"}
def is_pass(reason: str) -> bool:
return reason in PASS_REASONS or reason.startswith(("smtp_unknown_", "error_"))
def get_mx_hosts(domain: str):
if domain in _mx_cache:
return _mx_cache[domain]
try:
answers = dns.resolver.resolve(domain, "MX", lifetime=8)
hosts = [str(r.exchange).rstrip(".") for r in sorted(answers, key=lambda x: x.preference)]
_mx_cache[domain] = hosts or None
except Exception:
# fall back to A record (some domains accept mail on the apex)
try:
dns.resolver.resolve(domain, "A", lifetime=6)
_mx_cache[domain] = [domain]
except Exception:
_mx_cache[domain] = None
return _mx_cache[domain]
def verify_email(email: str):
email = email.strip().lower()
if not EMAIL_RE.match(email):
return False, "invalid_syntax"
domain = email.split("@")[1]
mx_hosts = get_mx_hosts(domain)
if not mx_hosts:
return False, "no_mx_records"
if domain in CATCH_ALL_DOMAINS:
return True, "catch_all_domain"
for mx_host in mx_hosts[:2]:
try:
with smtplib.SMTP(timeout=12, source_address=(VERIFY_SOURCE_IP, 0)) as smtp:
smtp.connect(mx_host, 25)
smtp.helo(OUR_DOMAIN)
code, _ = smtp.mail(OUR_EMAIL)
if code != 250:
continue
if domain not in _catchall_cache:
ru = "".join(random.choices(string.ascii_lowercase, k=20))
pc, _ = smtp.rcpt(f"{ru}@{domain}")
_catchall_cache[domain] = (pc == 250)
smtp.rset(); smtp.mail(OUR_EMAIL)
if _catchall_cache.get(domain):
smtp.quit(); return True, "catch_all_detected"
code, _ = smtp.rcpt(email)
smtp.quit()
if code == 250:
return True, "smtp_valid"
if code in (550, 551, 553):
return False, f"smtp_rejected_{code}"
if code in (452, 421):
return True, "smtp_temp_error"
return True, f"smtp_unknown_{code}"
except (smtplib.SMTPServerDisconnected, smtplib.SMTPConnectError,
socket.timeout, ConnectionRefusedError, OSError):
continue
except Exception:
continue
return True, "mx_unreachable"
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--in", dest="inp", required=True)
ap.add_argument("--out", dest="out", required=True)
ap.add_argument("--workers", type=int, default=12)
ap.add_argument("--limit", type=int, default=None)
args = ap.parse_args()
rows = list(csv.DictReader(open(args.inp)))
if args.limit:
rows = rows[: args.limit]
if not rows or "email" not in rows[0]:
print("ERROR: CSV needs an 'email' column", file=sys.stderr); sys.exit(1)
emails = sorted({r["email"].strip().lower() for r in rows if r.get("email")})
print(f"rows={len(rows)} unique_emails={len(emails)} workers={args.workers}")
print(f"verifying via NON-sending IP {VERIFY_SOURCE_IP} (MX + SMTP RCPT)...")
results: dict[str, tuple[bool, str]] = {}
t0 = time.time(); done = 0
with ThreadPoolExecutor(max_workers=args.workers) as ex:
futs = {ex.submit(verify_email, e): e for e in emails}
for fut in as_completed(futs):
e = futs[fut]
try:
results[e] = fut.result()
except Exception as exc:
results[e] = (True, f"error_{type(exc).__name__}")
done += 1
if done % 100 == 0:
print(f" {done}/{len(emails)} ({done/max(time.time()-t0,1e-6):.1f}/s)")
fieldnames = list(rows[0].keys()) + ["verify_ok", "verify_reason"]
vpath, rpath = f"{args.out}_verified.csv", f"{args.out}_rejected.csv"
vcnt = rcnt = 0; reasons = Counter()
with open(vpath, "w", newline="") as vf, open(rpath, "w", newline="") as rf:
vw = csv.DictWriter(vf, fieldnames=fieldnames); vw.writeheader()
rw = csv.DictWriter(rf, fieldnames=fieldnames); rw.writeheader()
for r in rows:
e = r.get("email", "").strip().lower()
ok, reason = results.get(e, (True, "missing"))
keep = is_pass(reason); reasons[reason] += 1
r = {**r, "verify_ok": "Y" if keep else "N", "verify_reason": reason}
(vw if keep else rw).writerow(r)
vcnt += keep; rcnt += (not keep)
print(f"\n=== done in {time.time()-t0:.0f}s ===")
print(f" kept (sendable): {vcnt:>5} -> {vpath}")
print(f" dropped (hard): {rcnt:>5} -> {rpath}")
print(" reasons:")
for reason, c in reasons.most_common():
print(f" {reason:24} {c}")
if __name__ == "__main__":
main()