#!/usr/bin/env python3
"""Weekly FMCSA trucking-source refresh.

Re-ingests the FMCSA motor-carrier census from the live Socrata API and re-runs
the enrichment -> flagging -> verification -> MX-tagging pipeline so the
trucking campaigns automatically catch carriers who have NEWLY fallen out of
compliance (e.g. an MCS-150 biennial update that just lapsed) and drop carriers
who have since refiled.

WHY THIS EXISTS: the original census load was a one-time snapshot (last loaded
~May 30 2026). Nothing re-ingested it on a timer, so the trucking audience went
stale -- prospects newly falling out of compliance were never picked up. This
is the trucking analogue of the healthcare hc_data_refresh.

SAFETY: every step is idempotent and incremental.
  - census_downloader upserts ON CONFLICT (dot_number) and does NOT touch
    email_verified / email_verify_result / listmonk_sent_at / deficiency_flags,
    so existing carriers keep their send + verification state; only census
    fields (legal_name, mcs150_date, fleet, address, ...) are refreshed, and
    brand-new DOTs are inserted with NULL verify/sent state.
  - enrichment refreshes OOS (out-of-service) order data.
  - deficiency_flagger re-derives deficiency_flags / campaign_eligible from the
    refreshed census. It is run WITHOUT --listmonk: the daily campaign builders
    pull their own audiences straight from fmcsa_carriers, so the whole
    eligible set is never pushed into a Listmonk list here.
  - email_verifier only checks rows with email_verified IS NULL (i.e. the newly
    inserted carriers), capped per run so a refresh never stalls on millions of
    rows; the backlog catches up over subsequent runs.
  - mx_tag_carriers only tags rows with mx_provider IS NULL (newly inserted).

A carrier who refiled their MCS-150 gets a fresh mcs150_parsed, so the
builder's "overdue > 2 years" WHERE clause stops targeting them automatically
-- no extra suppression needed.

Every step is attempted even if an earlier one fails; the process then exits
with status 1 if any step failed, so the scheduler can notice a broken refresh.

Usage:
    python3 -m scripts.workers.fmcsa_source_refresh                # full weekly refresh
    python3 -m scripts.workers.fmcsa_source_refresh --dry-run      # plan only, no writes
    python3 -m scripts.workers.fmcsa_source_refresh --skip-census  # re-run downstream only

Run inside the workers container (it has psycopg2 + DATABASE_URL):
    docker compose exec -T workers python3 -m scripts.workers.fmcsa_source_refresh
"""
from __future__ import annotations

import argparse
import logging
import subprocess
import sys
import time

LOG = logging.getLogger("workers.fmcsa_source_refresh")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    stream=sys.stdout,
)

# How many newly-inserted carriers to email-verify + MX-tag per refresh run.
# The verifier does live SMTP/MX probing, so this bounds wall-clock; the rest
# catch up on subsequent runs (email_verified / mx_provider stay NULL until done).
VERIFY_LIMIT = 20000
MX_DOMAIN_LIMIT = 8000


def run_step(name: str, argv: list[str], dry_run: bool) -> bool:
    """Run one pipeline step as a subprocess, logging and timing it.

    A failed step is logged but does NOT abort the rest: a transient Socrata
    or SMTP hiccup in (say) verification must not prevent flagging from running.

    Args:
        name: Human-readable step label used in the log lines.
        argv: Command to execute (run directly, no shell).
        dry_run: If true, only log the command that would run.

    Returns:
        True if the step succeeded (or was only planned because of
        ``dry_run``); False if it exited non-zero or could not be started.
    """
    pretty = " ".join(argv)
    if dry_run:
        LOG.info("[DRY-RUN] would run: %s (%s)", pretty, name)
        return True

    LOG.info("=== %s: %s", name, pretty)
    # monotonic(): elapsed time must not jump if the wall clock is adjusted.
    t0 = time.monotonic()
    try:
        subprocess.run(argv, check=True)
    except subprocess.CalledProcessError as exc:
        LOG.error("--- %s FAILED (rc=%s, %.0fs) -- continuing",
                  name, exc.returncode, time.monotonic() - t0)
        return False
    except OSError as exc:
        # The command could not even be launched (e.g. missing executable).
        # Treat it like any other step failure instead of aborting the run.
        LOG.error("--- %s FAILED to start (%s, %.0fs) -- continuing",
                  name, exc, time.monotonic() - t0)
        return False
    LOG.info("--- %s OK (%.0fs)", name, time.monotonic() - t0)
    return True


def _parse_args(argv: list[str] | None) -> argparse.Namespace:
    """Parse CLI flags; ``argv=None`` means ``sys.argv[1:]``."""
    ap = argparse.ArgumentParser(description="Weekly FMCSA source refresh")
    ap.add_argument("--dry-run", action="store_true",
                    help="Plan only, no writes")
    ap.add_argument("--skip-census", action="store_true",
                    help="Skip the census download; re-run downstream steps only")
    ap.add_argument("--verify-limit", type=int, default=VERIFY_LIMIT,
                    help="Max new carriers to email-verify this run")
    ap.add_argument("--mx-domain-limit", type=int, default=MX_DOMAIN_LIMIT,
                    help="Max domains to MX-tag this run (--limit-domains)")
    return ap.parse_args(argv)


def _plan_steps(args: argparse.Namespace) -> list[tuple[str, list[str]]]:
    """Return the ordered ``(name, argv)`` pipeline steps for this run."""
    py = [sys.executable, "-m"]
    steps: list[tuple[str, list[str]]] = []

    # 1. Re-download the full census (upsert; preserves verify/sent/flags state).
    if not args.skip_census:
        steps.append(("census download",
                      py + ["scripts.workers.fmcsa_census_downloader"]))

    steps.extend([
        # 2. Refresh enrichment (out-of-service orders etc.).
        ("enrichment (OOS)",
         py + ["scripts.workers.fmcsa_enrichment", "--oos"]),
        # 3. Re-derive deficiency flags from the refreshed census (NO --listmonk:
        #    the daily builders pull their own audiences from fmcsa_carriers).
        ("deficiency flagging",
         py + ["scripts.workers.fmcsa_deficiency_flagger"]),
        # 4. Verify newly-inserted emails (email_verified IS NULL), capped per run.
        ("email verification (new rows)",
         py + ["scripts.workers.email_verifier",
               "--table", "fmcsa_carriers",
               "--limit", str(args.verify_limit)]),
        # 5. MX-tag newly-inserted carriers (mx_provider IS NULL).
        #    NOTE(review): unlike the -m steps this is invoked by file path, so
        #    it resolves against the current working directory (presumably the
        #    repo root, as for the -m invocations) -- confirm.
        ("MX tagging (new rows)",
         [sys.executable, "scripts/mx_tag_carriers.py",
          "--limit-domains", str(args.mx_domain_limit)]),
    ])
    return steps


def main(argv: list[str] | None = None) -> int:
    """Run the weekly FMCSA refresh pipeline.

    Args:
        argv: CLI arguments; ``None`` (the default) reads ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 if every step succeeded (or this was a
        ``--dry-run``), 1 if any step failed. All steps are attempted either way.
    """
    args = _parse_args(argv)
    LOG.info("FMCSA source refresh starting (dry_run=%s)", args.dry_run)
    t0 = time.monotonic()

    failed: list[str] = []
    for name, cmd in _plan_steps(args):
        if not run_step(name, cmd, args.dry_run):
            failed.append(name)

    LOG.info("FMCSA source refresh done (%.0fs total)", time.monotonic() - t0)
    if failed:
        # Surface failures to the scheduler; otherwise a broken refresh goes
        # unnoticed and the audience silently goes stale again.
        LOG.error("FMCSA source refresh: %d step(s) failed: %s",
                  len(failed), ", ".join(failed))
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())