new-site/scripts/workers/email_verifier.py
justin 97f6a08183 Bind email verifier to secondary IP (.72) for SMTP probes
Campaign emails send from .71 via Postfix (now explicitly bound).
Verification RCPT TO probes go from .72 to protect sending reputation.
Configurable via VERIFY_SOURCE_IP env var.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-30 14:52:22 -05:00

269 lines
8.9 KiB
Python

#!/usr/bin/env python3
"""
Self-hosted email verification for campaign lists.
Three-step verification:
1. Syntax check (regex)
2. MX record lookup (domain has mail server?)
3. SMTP RCPT TO handshake (mailbox exists?)
Usage:
python3 -m scripts.workers.email_verifier --table fmcsa_carriers --limit 1000
python3 -m scripts.workers.email_verifier --table fmcsa_carriers --dry-run
python3 -m scripts.workers.email_verifier --email test@example.com # single check
Results stored in email_verified (boolean) and email_verify_result (text) columns.
"""
from __future__ import annotations
import argparse
import dns.resolver
import logging
import os
import re
import smtplib
import socket
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import psycopg2
LOG = logging.getLogger("workers.email_verifier")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
stream=sys.stdout,
)
DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest")
OUR_DOMAIN = "performancewest.net"
OUR_EMAIL = f"verify@{OUR_DOMAIN}"
# Bind SMTP probes to secondary IP so campaign sending IP (.71) stays clean
VERIFY_SOURCE_IP = os.environ.get("VERIFY_SOURCE_IP", "207.174.124.72")
# Regex for basic email validation
EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$")
# Domains known to accept all (catch-all) — can't verify individual addresses
CATCH_ALL_DOMAINS = {
"gmail.com", "googlemail.com",
"outlook.com", "hotmail.com", "live.com", "msn.com",
"yahoo.com", "ymail.com", "aol.com",
"icloud.com", "me.com", "mac.com",
"protonmail.com", "proton.me",
}
# Cache MX lookups to avoid repeated DNS queries
_mx_cache: dict[str, list[str] | None] = {}
# Cache catch-all detection per domain
_catchall_cache: dict[str, bool] = {}
def get_mx_hosts(domain: str) -> list[str] | None:
"""Get MX hosts for a domain, cached."""
if domain in _mx_cache:
return _mx_cache[domain]
try:
answers = dns.resolver.resolve(domain, "MX")
hosts = sorted(
[(r.preference, str(r.exchange).rstrip(".")) for r in answers],
key=lambda x: x[0],
)
result = [h[1] for h in hosts]
_mx_cache[domain] = result
return result
except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN,
dns.resolver.NoNameservers, dns.exception.Timeout):
_mx_cache[domain] = None
return None
except Exception:
_mx_cache[domain] = None
return None
def verify_email(email: str) -> tuple[bool, str]:
"""
Verify a single email address.
Returns (is_valid, reason).
"""
email = email.strip().lower()
# Step 1: Syntax
if not EMAIL_RE.match(email):
return False, "invalid_syntax"
domain = email.split("@")[1]
# Step 2: MX lookup
mx_hosts = get_mx_hosts(domain)
if not mx_hosts:
return False, "no_mx_records"
# Step 3: Known catch-all domains — can't verify, assume valid
if domain in CATCH_ALL_DOMAINS:
return True, "catch_all_domain"
# Step 4: SMTP handshake + catch-all detection
for mx_host in mx_hosts[:2]: # Try first 2 MX servers
try:
with smtplib.SMTP(timeout=10, source_address=(VERIFY_SOURCE_IP, 0)) as smtp:
smtp.connect(mx_host, 25)
smtp.helo(OUR_DOMAIN)
code, _ = smtp.mail(OUR_EMAIL)
if code != 250:
continue
# Step 4a: Check if domain is catch-all by sending a random address
if domain not in _catchall_cache:
import random, string
random_user = "".join(random.choices(string.ascii_lowercase, k=20))
probe_code, _ = smtp.rcpt(f"{random_user}@{domain}")
_catchall_cache[domain] = probe_code == 250
if _catchall_cache[domain]:
LOG.info("Catch-all detected: %s (accepts %s@%s)", domain, random_user, domain)
# Reset for real check
smtp.rset()
smtp.mail(OUR_EMAIL)
if _catchall_cache.get(domain):
# Catch-all — accepts everything, can't verify individual address
smtp.quit()
return True, "catch_all_detected"
# Step 4b: Check the actual email
code, msg = smtp.rcpt(email)
smtp.quit()
if code == 250:
return True, "smtp_valid"
elif code == 550 or code == 551 or code == 553:
return False, f"smtp_rejected_{code}"
elif code == 452 or code == 421:
# Temp error — can't determine, assume valid
return True, "smtp_temp_error"
else:
return True, f"smtp_unknown_{code}"
except smtplib.SMTPServerDisconnected:
continue
except smtplib.SMTPConnectError:
continue
except socket.timeout:
continue
except ConnectionRefusedError:
continue
except OSError:
continue
except Exception as e:
LOG.debug("SMTP error for %s via %s: %s", email, mx_host, e)
continue
# Couldn't connect to any MX — domain exists but server unreachable
return True, "mx_unreachable"
def verify_table(table: str, limit: int | None = None, dry_run: bool = False) -> dict:
"""Verify emails in a database table."""
conn = psycopg2.connect(DATABASE_URL)
cur = conn.cursor()
# Add columns if needed
if not dry_run:
for col in ["email_verified BOOLEAN", "email_verify_result TEXT"]:
try:
cur.execute(f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {col}")
except Exception:
conn.rollback()
conn.commit()
# Get unverified emails
limit_clause = f"LIMIT {limit}" if limit else ""
cur.execute(f"""
SELECT dot_number, email_address FROM {table}
WHERE email_address IS NOT NULL
AND email_address != ''
AND (email_verified IS NULL)
{limit_clause}
""")
rows = cur.fetchall()
LOG.info("Found %d unverified emails to check", len(rows))
stats = {"total": len(rows), "valid": 0, "invalid": 0, "catch_all": 0, "error": 0}
# Verify in batches with threading (but not too aggressive)
results = []
def check_one(row):
dot, email = row
is_valid, reason = verify_email(email)
return dot, email, is_valid, reason
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(check_one, row): row for row in rows}
done = 0
for future in as_completed(futures):
dot, email, is_valid, reason = future.result()
results.append((is_valid, reason, dot))
if is_valid:
if reason == "catch_all_domain":
stats["catch_all"] += 1
stats["valid"] += 1
else:
stats["invalid"] += 1
done += 1
if done % 500 == 0:
LOG.info(" Verified %d / %d (valid: %d, invalid: %d)",
done, len(rows), stats["valid"], stats["invalid"])
# Update database
if not dry_run and results:
LOG.info("Updating %d verification results...", len(results))
for i in range(0, len(results), 1000):
chunk = results[i:i+1000]
for is_valid, reason, dot in chunk:
cur.execute(f"""
UPDATE {table} SET
email_verified = %s,
email_verify_result = %s
WHERE dot_number = %s
""", (is_valid, reason, dot))
conn.commit()
LOG.info(" Updated %d / %d", min(i + 1000, len(results)), len(results))
conn.close()
return stats
def main():
parser = argparse.ArgumentParser(description="Verify email addresses")
parser.add_argument("--table", default="fmcsa_carriers", help="Table to verify")
parser.add_argument("--limit", type=int, default=None, help="Limit records")
parser.add_argument("--dry-run", action="store_true", help="Check but don't save")
parser.add_argument("--email", type=str, help="Verify a single email")
args = parser.parse_args()
if args.email:
is_valid, reason = verify_email(args.email)
print(f"{args.email}: {'VALID' if is_valid else 'INVALID'} ({reason})")
return
stats = verify_table(args.table, limit=args.limit, dry_run=args.dry_run)
LOG.info("=== Verification Results ===")
for k, v in stats.items():
LOG.info(" %s: %s", k, v)
if stats["total"] > 0:
valid_pct = stats["valid"] / stats["total"] * 100
LOG.info(" Valid rate: %.1f%%", valid_pct)
if __name__ == "__main__":
main()