Before checking the real address, the verifier issues RCPT TO for a random 20-character address at the same domain. If the server accepts it (250), the domain is catch-all and verifying individual mailboxes is meaningless. The result is cached per domain. Domains on the existing known catch-all list (gmail, outlook, etc.) still bypass the SMTP check entirely. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
267 lines
8.7 KiB
Python
267 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Self-hosted email verification for campaign lists.
|
|
|
|
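Verification steps:
1. Syntax check (regex)
2. MX record lookup (domain has mail server?)
3. Known catch-all providers (gmail, outlook, ...) are accepted without an SMTP check
4. SMTP RCPT TO handshake (mailbox exists?), preceded by a random-address
   probe that detects catch-all domains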
|
|
|
|
Usage:
|
|
python3 -m scripts.workers.email_verifier --table fmcsa_carriers --limit 1000
|
|
python3 -m scripts.workers.email_verifier --table fmcsa_carriers --dry-run
|
|
python3 -m scripts.workers.email_verifier --email test@example.com # single check
|
|
|
|
Results stored in email_verified (boolean) and email_verify_result (text) columns.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import dns.resolver
|
|
import logging
|
|
import os
|
|
import re
|
|
import smtplib
|
|
import socket
|
|
import sys
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
import psycopg2
|
|
|
|
# Send INFO-and-above records from every logger to stdout with a timestamped,
# logger-name-tagged format.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
)

# Module-wide logger used by all functions in this file.
LOG = logging.getLogger("workers.email_verifier")
|
|
|
|
DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest")
|
|
OUR_DOMAIN = "performancewest.net"
|
|
OUR_EMAIL = f"verify@{OUR_DOMAIN}"
|
|
|
|
# Regex for basic email validation
|
|
EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$")
|
|
|
|
# Domains known to accept all (catch-all) — can't verify individual addresses
|
|
CATCH_ALL_DOMAINS = {
|
|
"gmail.com", "googlemail.com",
|
|
"outlook.com", "hotmail.com", "live.com", "msn.com",
|
|
"yahoo.com", "ymail.com", "aol.com",
|
|
"icloud.com", "me.com", "mac.com",
|
|
"protonmail.com", "proton.me",
|
|
}
|
|
|
|
# Cache MX lookups to avoid repeated DNS queries
|
|
_mx_cache: dict[str, list[str] | None] = {}
|
|
# Cache catch-all detection per domain
|
|
_catchall_cache: dict[str, bool] = {}
|
|
|
|
|
|
def get_mx_hosts(domain: str) -> list[str] | None:
    """Return the MX hostnames for ``domain``, lowest preference value first.

    Results (including failures) are memoised in ``_mx_cache`` so each domain
    is resolved at most once per process.

    Args:
        domain: The domain part of an email address.

    Returns:
        A non-empty list of MX hostnames without the trailing dot, or ``None``
        when the lookup fails or the domain publishes no usable mail
        exchanger.
    """
    if domain in _mx_cache:
        return _mx_cache[domain]

    result: list[str] | None = None
    try:
        answers = dns.resolver.resolve(domain, "MX")
        ranked = sorted(
            ((r.preference, str(r.exchange).rstrip(".")) for r in answers),
            key=lambda pair: pair[0],
        )
        # A "null MX" record (RFC 7505) has the root name "." as its exchange,
        # which becomes "" after stripping the dot. Connecting to "" would
        # target the local machine, so such entries are dropped; a domain with
        # nothing left is reported as having no mail server.
        result = [host for _, host in ranked if host] or None
    except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN,
            dns.resolver.NoNameservers, dns.exception.Timeout):
        # Expected lookup failures: the domain has no resolvable MX records.
        result = None
    except Exception as exc:
        # Best effort: anything unexpected is also treated as "no MX", but
        # leave a trace instead of swallowing it silently.
        LOG.debug("MX lookup for %s failed: %s", domain, exc)
        result = None

    _mx_cache[domain] = result
    return result
|
|
|
|
|
|
def _classify_rcpt_reply(code: int) -> tuple[bool, str]:
    """Map the SMTP reply code of ``RCPT TO`` to an ``(is_valid, reason)`` pair."""
    if code == 250:
        return True, "smtp_valid"
    if code in (550, 551, 553):
        return False, f"smtp_rejected_{code}"
    if code in (421, 452):
        # Temp error — can't determine, assume valid
        return True, "smtp_temp_error"
    return True, f"smtp_unknown_{code}"


def _probe_mailbox(mx_host: str, email: str, domain: str) -> tuple[bool, str] | None:
    """Run the SMTP handshake for ``email`` against a single MX host.

    Returns the ``(is_valid, reason)`` verdict, or ``None`` when this host
    could not produce one (connection problem, or ``MAIL FROM`` refused) and
    the caller should try the next MX host.
    """
    import random
    import string

    verdict: tuple[bool, str] | None = None
    try:
        with smtplib.SMTP(timeout=10) as smtp:
            smtp.connect(mx_host, 25)
            smtp.helo(OUR_DOMAIN)
            code, _ = smtp.mail(OUR_EMAIL)
            if code != 250:
                return None

            # Catch-all detection: ask once per domain whether the server
            # accepts a random, almost certainly non-existent mailbox.
            if domain not in _catchall_cache:
                random_user = "".join(random.choices(string.ascii_lowercase, k=20))
                probe_code, _ = smtp.rcpt(f"{random_user}@{domain}")
                _catchall_cache[domain] = probe_code == 250
                if _catchall_cache[domain]:
                    LOG.info("Catch-all detected: %s (accepts %s@%s)", domain, random_user, domain)
                # Start a fresh mail transaction for the real check.
                smtp.rset()
                smtp.mail(OUR_EMAIL)

            if _catchall_cache.get(domain):
                # Catch-all — accepts everything, can't verify individual address
                verdict = (True, "catch_all_detected")
            else:
                code, _ = smtp.rcpt(email)
                verdict = _classify_rcpt_reply(code)
            # No explicit quit(): leaving the ``with`` block sends QUIT. The
            # verdict is kept in a variable so that a server hanging up during
            # that closing exchange cannot discard an answer already received.
    except Exception as exc:
        # Best effort: smtplib and socket errors (all OSError subclasses) and
        # anything unexpected just mean this host gave no additional answer.
        LOG.debug("SMTP error for %s via %s: %s", email, mx_host, exc)
    return verdict


def verify_email(email: str) -> tuple[bool, str]:
    """Verify a single email address.

    The address is normalised (stripped, lower-cased) and then checked in
    order: syntax, MX lookup, known catch-all provider list, and finally an
    SMTP ``RCPT TO`` handshake (with catch-all probing) against at most the
    first two MX hosts.

    Args:
        email: The address to verify.

    Returns:
        ``(is_valid, reason)`` where ``reason`` is one of ``invalid_syntax``,
        ``no_mx_records``, ``catch_all_domain``, ``catch_all_detected``,
        ``smtp_valid``, ``smtp_rejected_<code>``, ``smtp_temp_error``,
        ``smtp_unknown_<code>`` or ``mx_unreachable``. Inconclusive outcomes
        are reported as valid.
    """
    email = email.strip().lower()

    # Step 1: Syntax
    if not EMAIL_RE.match(email):
        return False, "invalid_syntax"

    # The regex guarantees exactly one "@".
    domain = email.rpartition("@")[2]

    # Step 2: MX lookup
    mx_hosts = get_mx_hosts(domain)
    if not mx_hosts:
        return False, "no_mx_records"

    # Step 3: Known catch-all domains — can't verify, assume valid
    if domain in CATCH_ALL_DOMAINS:
        return True, "catch_all_domain"

    # Step 4: SMTP handshake + catch-all detection (first 2 MX servers only)
    for mx_host in mx_hosts[:2]:
        verdict = _probe_mailbox(mx_host, email, domain)
        if verdict is not None:
            return verdict

    # Couldn't get an answer from any MX — domain exists but server unreachable
    return True, "mx_unreachable"
|
|
|
|
|
|
def verify_table(table: str, limit: int | None = None, dry_run: bool = False) -> dict:
    """Verify the not-yet-checked email addresses stored in a database table.

    Rows are selected where ``email_address`` is non-empty and
    ``email_verified`` is NULL, verified concurrently with ``verify_email``,
    and (unless ``dry_run``) written back to the ``email_verified`` and
    ``email_verify_result`` columns keyed by ``dot_number``.

    Args:
        table: Name of the table to process. NOTE: it is interpolated into
            the SQL text unescaped, so it must come from a trusted source.
        limit: Maximum number of rows to check; ``None`` (or 0) means no limit.
        dry_run: When true, neither the columns are added nor results saved.

    Returns:
        Counters with the keys ``total``, ``valid``, ``invalid``,
        ``catch_all`` (a subset of ``valid``) and ``error`` (rows whose
        verification raised and were therefore left unverified).
    """
    conn = psycopg2.connect(DATABASE_URL)
    try:
        cur = conn.cursor()

        # Add columns if needed
        if not dry_run:
            for col in ("email_verified BOOLEAN", "email_verify_result TEXT"):
                try:
                    cur.execute(f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {col}")
                except Exception as exc:
                    LOG.warning("Could not add column %r to %s: %s", col, table, exc)
                    conn.rollback()
            conn.commit()

        # Get unverified emails
        limit_clause = f"LIMIT {limit}" if limit else ""
        cur.execute(f"""
            SELECT dot_number, email_address FROM {table}
            WHERE email_address IS NOT NULL
            AND email_address != ''
            AND (email_verified IS NULL)
            {limit_clause}
        """)
        rows = cur.fetchall()
        LOG.info("Found %d unverified emails to check", len(rows))

        stats = {"total": len(rows), "valid": 0, "invalid": 0, "catch_all": 0, "error": 0}
        results = []

        # Verify with a small thread pool (but not too aggressive)
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = {
                executor.submit(verify_email, email): (dot, email)
                for dot, email in rows
            }
            for done, future in enumerate(as_completed(futures), start=1):
                dot, email = futures[future]
                try:
                    is_valid, reason = future.result()
                except Exception:
                    # One bad row must not abort the run and throw away every
                    # result gathered so far; the row stays unverified.
                    LOG.exception("Verification failed for %s (dot_number=%s)", email, dot)
                    stats["error"] += 1
                else:
                    results.append((is_valid, reason, dot))
                    if is_valid:
                        # Both the static provider list and the live probe
                        # report catch-all domains.
                        if reason in ("catch_all_domain", "catch_all_detected"):
                            stats["catch_all"] += 1
                        stats["valid"] += 1
                    else:
                        stats["invalid"] += 1

                if done % 500 == 0:
                    LOG.info(" Verified %d / %d (valid: %d, invalid: %d)",
                             done, len(rows), stats["valid"], stats["invalid"])

        # Update database, committing after every chunk of 1000 rows
        if not dry_run and results:
            LOG.info("Updating %d verification results...", len(results))
            update_sql = f"""
                UPDATE {table} SET
                email_verified = %s,
                email_verify_result = %s
                WHERE dot_number = %s
            """
            for i in range(0, len(results), 1000):
                cur.executemany(update_sql, results[i:i + 1000])
                conn.commit()
                LOG.info(" Updated %d / %d", min(i + 1000, len(results)), len(results))

        return stats
    finally:
        # Release the connection even when a query or a worker fails.
        conn.close()
|
|
|
|
|
|
def main():
    """Command-line entry point.

    With ``--email`` a single address is verified and the verdict printed;
    otherwise the table named by ``--table`` is processed and the resulting
    counters are logged.
    """
    cli = argparse.ArgumentParser(description="Verify email addresses")
    cli.add_argument("--table", default="fmcsa_carriers", help="Table to verify")
    cli.add_argument("--limit", type=int, default=None, help="Limit records")
    cli.add_argument("--dry-run", action="store_true", help="Check but don't save")
    cli.add_argument("--email", type=str, help="Verify a single email")
    opts = cli.parse_args()

    if not opts.email:
        # Table mode: verify rows, then log a summary of the counters.
        summary = verify_table(opts.table, limit=opts.limit, dry_run=opts.dry_run)

        LOG.info("=== Verification Results ===")
        for name in summary:
            LOG.info(" %s: %s", name, summary[name])

        total = summary["total"]
        if total > 0:
            LOG.info(" Valid rate: %.1f%%", summary["valid"] / total * 100)
        return

    # Single-address mode.
    ok, why = verify_email(opts.email)
    label = "VALID" if ok else "INVALID"
    print(f"{opts.email}: {label} ({why})")


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|