diff --git a/scripts/workers/email_verifier.py b/scripts/workers/email_verifier.py new file mode 100644 index 0000000..d426eec --- /dev/null +++ b/scripts/workers/email_verifier.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python3 +""" +Self-hosted email verification for campaign lists. + +Three-step verification: + 1. Syntax check (regex) + 2. MX record lookup (domain has mail server?) + 3. SMTP RCPT TO handshake (mailbox exists?) + +Usage: + python3 -m scripts.workers.email_verifier --table fmcsa_carriers --limit 1000 + python3 -m scripts.workers.email_verifier --table fmcsa_carriers --dry-run + python3 -m scripts.workers.email_verifier --email test@example.com # single check + +Results stored in email_verified (boolean) and email_verify_result (text) columns. +""" + +from __future__ import annotations + +import argparse +import dns.resolver +import logging +import os +import re +import smtplib +import socket +import sys +import time +from concurrent.futures import ThreadPoolExecutor, as_completed + +import psycopg2 + +LOG = logging.getLogger("workers.email_verifier") +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s %(message)s", + stream=sys.stdout, +) + +DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest") +OUR_DOMAIN = "performancewest.net" +OUR_EMAIL = f"verify@{OUR_DOMAIN}" + +# Regex for basic email validation +EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$") + +# Domains known to accept all (catch-all) — can't verify individual addresses +CATCH_ALL_DOMAINS = { + "gmail.com", "googlemail.com", + "outlook.com", "hotmail.com", "live.com", "msn.com", + "yahoo.com", "ymail.com", "aol.com", + "icloud.com", "me.com", "mac.com", + "protonmail.com", "proton.me", +} + +# Cache MX lookups to avoid repeated DNS queries +_mx_cache: dict[str, list[str] | None] = {} + + +def get_mx_hosts(domain: str) -> list[str] | None: + """Get MX hosts for a domain, cached.""" + if domain in _mx_cache: + return _mx_cache[domain] + + try: + answers = dns.resolver.resolve(domain, "MX") + hosts = sorted( + [(r.preference, str(r.exchange).rstrip(".")) for r in answers], + key=lambda x: x[0], + ) + result = [h[1] for h in hosts] + _mx_cache[domain] = result + return result + except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, + dns.resolver.NoNameservers, dns.exception.Timeout): + _mx_cache[domain] = None + return None + except Exception: + _mx_cache[domain] = None + return None + + +def verify_email(email: str) -> tuple[bool, str]: + """ + Verify a single email address. + + Returns (is_valid, reason). + """ + email = email.strip().lower() + + # Step 1: Syntax + if not EMAIL_RE.match(email): + return False, "invalid_syntax" + + domain = email.split("@")[1] + + # Step 2: MX lookup + mx_hosts = get_mx_hosts(domain) + if not mx_hosts: + return False, "no_mx_records" + + # Step 3: Catch-all domains — can't verify, assume valid + if domain in CATCH_ALL_DOMAINS: + return True, "catch_all_domain" + + # Step 4: SMTP handshake + for mx_host in mx_hosts[:2]: # Try first 2 MX servers + try: + with smtplib.SMTP(timeout=10) as smtp: + smtp.connect(mx_host, 25) + smtp.helo(OUR_DOMAIN) + code, _ = smtp.mail(OUR_EMAIL) + if code != 250: + continue + code, msg = smtp.rcpt(email) + smtp.quit() + + if code == 250: + return True, "smtp_valid" + elif code == 550 or code == 551 or code == 553: + return False, f"smtp_rejected_{code}" + elif code == 452 or code == 421: + # Temp error — can't determine, assume valid + return True, "smtp_temp_error" + else: + return True, f"smtp_unknown_{code}" + + except smtplib.SMTPServerDisconnected: + continue + except smtplib.SMTPConnectError: + continue + except socket.timeout: + continue + except ConnectionRefusedError: + continue + except OSError: + continue + except Exception as e: + LOG.debug("SMTP error for %s via %s: %s", email, mx_host, e) + continue + + # Couldn't connect to any MX — domain exists but server unreachable + return True, "mx_unreachable" + + +def verify_table(table: str, limit: int | None = None, dry_run: bool = False) -> dict: + """Verify emails in a database table.""" + conn = psycopg2.connect(DATABASE_URL) + cur = conn.cursor() + + # Add columns if needed + if not dry_run: + for col in ["email_verified BOOLEAN", "email_verify_result TEXT"]: + try: + cur.execute(f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {col}") + except Exception: + conn.rollback() + conn.commit() + + # Get unverified emails + limit_clause = f"LIMIT {limit}" if limit else "" + cur.execute(f""" + SELECT dot_number, email_address FROM {table} + WHERE email_address IS NOT NULL + AND email_address != '' + AND (email_verified IS NULL) + {limit_clause} + """) + + rows = cur.fetchall() + LOG.info("Found %d unverified emails to check", len(rows)) + + stats = {"total": len(rows), "valid": 0, "invalid": 0, "catch_all": 0, "error": 0} + + # Verify in batches with threading (but not too aggressive) + results = [] + + def check_one(row): + dot, email = row + is_valid, reason = verify_email(email) + return dot, email, is_valid, reason + + with ThreadPoolExecutor(max_workers=5) as executor: + futures = {executor.submit(check_one, row): row for row in rows} + done = 0 + for future in as_completed(futures): + dot, email, is_valid, reason = future.result() + results.append((is_valid, reason, dot)) + + if is_valid: + if reason == "catch_all_domain": + stats["catch_all"] += 1 + stats["valid"] += 1 + else: + stats["invalid"] += 1 + + done += 1 + if done % 500 == 0: + LOG.info(" Verified %d / %d (valid: %d, invalid: %d)", + done, len(rows), stats["valid"], stats["invalid"]) + + # Update database + if not dry_run and results: + LOG.info("Updating %d verification results...", len(results)) + for i in range(0, len(results), 1000): + chunk = results[i:i+1000] + for is_valid, reason, dot in chunk: + cur.execute(f""" + UPDATE {table} SET + email_verified = %s, + email_verify_result = %s + WHERE dot_number = %s + """, (is_valid, reason, dot)) + conn.commit() + LOG.info(" Updated %d / %d", min(i + 1000, len(results)), len(results)) + + conn.close() + return stats + + +def main(): + parser = argparse.ArgumentParser(description="Verify email addresses") + parser.add_argument("--table", default="fmcsa_carriers", help="Table to verify") + parser.add_argument("--limit", type=int, default=None, help="Limit records") + parser.add_argument("--dry-run", action="store_true", help="Check but don't save") + parser.add_argument("--email", type=str, help="Verify a single email") + args = parser.parse_args() + + if args.email: + is_valid, reason = verify_email(args.email) + print(f"{args.email}: {'VALID' if is_valid else 'INVALID'} ({reason})") + return + + stats = verify_table(args.table, limit=args.limit, dry_run=args.dry_run) + + LOG.info("=== Verification Results ===") + for k, v in stats.items(): + LOG.info(" %s: %s", k, v) + + if stats["total"] > 0: + valid_pct = stats["valid"] / stats["total"] * 100 + LOG.info(" Valid rate: %.1f%%", valid_pct) + + +if __name__ == "__main__": + main() diff --git a/scripts/workers/fmcsa_deficiency_flagger.py b/scripts/workers/fmcsa_deficiency_flagger.py index 4f9b2a8..00cf740 100644 --- a/scripts/workers/fmcsa_deficiency_flagger.py +++ b/scripts/workers/fmcsa_deficiency_flagger.py @@ -106,21 +106,21 @@ def flag_carriers(dry_run: bool = False) -> dict: # MCS-150 check if mcs150_parsed: if mcs150_parsed < FOUR_YEARS_AGO.date(): + # Skip severely overdue (4+ years) — likely abandoned, spam trap risk flags.append("mcs150_severely_overdue") - issues.append(f"MCS-150 severely overdue (last filed {mcs150_parsed})") - severity = "critical" stats["mcs150_severe"] += 1 stats["mcs150_overdue"] += 1 + # Mark as NOT campaign eligible — too old + severity = "stale" elif mcs150_parsed < TWO_YEARS_AGO.date(): flags.append("mcs150_overdue") - issues.append(f"MCS-150 overdue (last filed {mcs150_parsed})") + issues.append(f"MCS-150 biennial update is overdue — last filed {mcs150_parsed}. Required every 2 years. Fines up to $1,000/day.") severity = "major" if severity == "none" else severity stats["mcs150_overdue"] += 1 else: - # No date — can't determine, flag as unknown + # No date — skip for campaign (can't verify recency) flags.append("mcs150_unknown") - issues.append("MCS-150 filing date unknown — verify at FMCSA Portal") - severity = "minor" if severity == "none" else severity + severity = "stale" # For-hire carrier — needs BOC-3, UCR, insurance if for_hire: @@ -153,9 +153,10 @@ def flag_carriers(dry_run: bool = False) -> dict: if for_hire and "mcs150_overdue" in flags: severity = "critical" - actionable_flags = [f for f in flags if f != "zero_fleet"] + # Exclude stale/severely overdue from campaign (spam trap risk) + actionable_flags = [f for f in flags if f not in ("zero_fleet", "mcs150_severely_overdue", "mcs150_unknown")] deficiency_count = len(actionable_flags) - campaign_eligible = deficiency_count > 0 and bool(email) + campaign_eligible = deficiency_count > 0 and bool(email) and severity != "stale" if campaign_eligible: stats["campaign_eligible"] += 1 diff --git a/site/src/lib/intake_manifest.ts b/site/src/lib/intake_manifest.ts index 6425586..fae6579 100644 --- a/site/src/lib/intake_manifest.ts +++ b/site/src/lib/intake_manifest.ts @@ -102,14 +102,16 @@ export const INTAKE_MANIFEST: Record = { ], // ── DOT / FMCSA Motor Carrier Services ────────────────────────── - "mcs150-update": ["entity", "review", "payment"], - "boc3-filing": ["entity", "review", "payment"], - "ucr-registration": ["entity", "review", "payment"], - "dot-registration": ["entity", "review", "payment"], - "mc-authority": ["entity", "review", "payment"], - "dot-drug-alcohol": ["entity", "review", "payment"], - "dot-audit-prep": ["entity", "review", "payment"], - "dot-full-compliance": ["entity", "review", "payment"], + // Review-only intake: DOT#, name, email already collected at checkout. + // Admin handles the filing using the info from the order. + "mcs150-update": ["review"], + "boc3-filing": ["review"], + "ucr-registration": ["review"], + "dot-registration": ["review"], + "mc-authority": ["review"], + "dot-drug-alcohol": ["review"], + "dot-audit-prep": ["review"], + "dot-full-compliance": ["review"], }; // Category-gated dynamic steps. The Wizard inserts these after the `category`