From 899b880e7fb9194e63c9f70290d8a152b5c4b179 Mon Sep 17 00:00:00 2001 From: justin Date: Wed, 17 Jun 2026 20:44:54 -0500 Subject: [PATCH] trucking: weekly FMCSA source refresh so new non-compliant carriers are caught The FMCSA census was a one-time snapshot (last loaded ~May 30) with NO refresh timer -- carriers newly falling out of MCS-150/UCR compliance were never picked up. New scripts/workers/fmcsa_source_refresh.py orchestrates the full pipeline (census download -> enrichment -> deficiency flag -> verify new emails -> MX-tag new) and runs weekly via cron pw-fmcsa-refresh (Sun 09:00 UTC), codified in the mail-pipeline Ansible role. Idempotent + incremental: the census upsert preserves email_verified / listmonk_sent_at / deficiency_flags, so existing carriers keep their send state and only census fields refresh; new DOTs flow into verification then campaigns. A carrier who refiled gets a fresh mcs150_parsed, so the builder's overdue WHERE clause stops targeting them automatically. Verify is capped per run (20k) so it never stalls on millions of rows. (Healthcare already auto-catches newly-revalidation-overdue providers within its 63k institutional pool via pw-hc-refresh Mon/Wed/Fri.) 
--- docs/trucking-system.md | 4 + .../roles/mail-pipeline/tasks/main.yml | 1 + infra/cron/pw-fmcsa-refresh | 12 ++ scripts/workers/fmcsa_source_refresh.py | 127 ++++++++++++++++++ 4 files changed, 144 insertions(+) create mode 100644 infra/cron/pw-fmcsa-refresh create mode 100644 scripts/workers/fmcsa_source_refresh.py diff --git a/docs/trucking-system.md b/docs/trucking-system.md index 3279cfc..1c54062 100644 --- a/docs/trucking-system.md +++ b/docs/trucking-system.md @@ -196,6 +196,10 @@ api/ scripts/workers/ fmcsa_census_downloader.py — Socrata API bulk download + fmcsa_source_refresh.py — WEEKLY refresh orchestrator (census -> + enrichment -> flag -> verify -> mx-tag); + cron pw-fmcsa-refresh (Sun 09:00 UTC), so + newly-non-compliant carriers are auto-caught fmcsa_deficiency_flagger.py — flag carriers + populate Listmonk lists services/mcs150_update.py — MCS-150 handler (admin todo) services/boc3_filing.py — BOC-3 handler (Playwright + admin fallback) diff --git a/infra/ansible/roles/mail-pipeline/tasks/main.yml b/infra/ansible/roles/mail-pipeline/tasks/main.yml index 0b478e0..c0b73d3 100644 --- a/infra/ansible/roles/mail-pipeline/tasks/main.yml +++ b/infra/ansible/roles/mail-pipeline/tasks/main.yml @@ -112,6 +112,7 @@ - pw-hc-campaign - pw-hc-nppes - pw-hc-refresh + - pw-fmcsa-refresh - pw-mta-warmup - pw-listmonk-rampcap - pw-hc-rampcap diff --git a/infra/cron/pw-fmcsa-refresh b/infra/cron/pw-fmcsa-refresh new file mode 100644 index 0000000..fc27430 --- /dev/null +++ b/infra/cron/pw-fmcsa-refresh @@ -0,0 +1,12 @@ +# Weekly FMCSA trucking-source refresh. Re-ingests the full FMCSA motor-carrier +# census from the live Socrata API and re-runs enrichment -> flagging -> +# verification -> MX-tagging, so the daily campaign builders automatically catch +# carriers who NEWLY fell out of compliance (e.g. an MCS-150 update that just +# lapsed) and drop carriers who have since refiled. 
Idempotent + incremental: +# the upsert preserves email_verified / listmonk_sent_at / deficiency_flags, so +# existing carriers keep their send state and only census fields refresh; new +# DOTs flow into verification then campaigns. The original census load was a +# one-time snapshot with no refresh timer -- this closes that gap. Runs Sunday +# 09:00 UTC (off-peak, well before Monday's 08:00 UTC trucking builder). +# Takes a while (full ~2M-row download + verify batch), so it runs off-peak. +0 9 * * 0 deploy cd /opt/performancewest && docker compose exec -T workers python3 -m scripts.workers.fmcsa_source_refresh >> /opt/performancewest/logs/pw-fmcsa-refresh.log 2>&1 diff --git a/scripts/workers/fmcsa_source_refresh.py b/scripts/workers/fmcsa_source_refresh.py new file mode 100644 index 0000000..9b8e831 --- /dev/null +++ b/scripts/workers/fmcsa_source_refresh.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +"""Weekly FMCSA trucking-source refresh. + +Re-ingests the FMCSA motor-carrier census from the live Socrata API and re-runs +the enrichment -> flagging -> verification -> MX-tagging pipeline so the trucking +campaigns automatically catch carriers who have NEWLY fallen out of compliance +(e.g. an MCS-150 biennial update that just lapsed) and drop carriers who have +since refiled. + +WHY THIS EXISTS: the original census load was a one-time snapshot (last loaded +~May 30 2026). Nothing re-ingested it on a timer, so the trucking audience went +stale -- new prospects falling out of compliance were never picked up. This is +the trucking analogue of the healthcare hc_data_refresh. + +SAFETY: every step is idempotent and incremental. + - census_downloader upserts ON CONFLICT (dot_number) and does NOT touch + email_verified / email_verify_result / listmonk_sent_at / deficiency_flags, + so existing carriers keep their send + verification state; only census + fields (legal_name, mcs150_date, fleet, address, ...) 
refresh, and brand-new + DOTs are inserted with NULL verify/sent state. + - enrichment refreshes OOS (out-of-service) order data. + - deficiency_flagger re-derives deficiency_flags / campaign_eligible from the + refreshed census. Run WITHOUT --listmonk: the daily campaign builders pull + their own audiences straight from fmcsa_carriers, so we never push the whole + eligible set into a Listmonk list here. + - email_verifier only checks rows with email_verified IS NULL (i.e. the newly + inserted carriers), capped per run so a refresh never stalls on millions of + rows. Catch-up across runs is fine. + - mx_tag_carriers only tags rows with mx_provider IS NULL (newly inserted). + +A carrier who refiled their MCS-150 gets a fresh mcs150_parsed, so the builder's +"overdue > 2 years" WHERE clause stops targeting them automatically -- no extra +suppression needed. + +Usage: + python3 -m scripts.workers.fmcsa_source_refresh # full weekly refresh + python3 -m scripts.workers.fmcsa_source_refresh --dry-run # plan only, no writes + python3 -m scripts.workers.fmcsa_source_refresh --skip-census # re-run downstream only + +Run inside the workers container (it has psycopg2 + DATABASE_URL): + docker compose exec -T workers python3 -m scripts.workers.fmcsa_source_refresh +""" + +from __future__ import annotations + +import argparse +import logging +import subprocess +import sys +import time + +LOG = logging.getLogger("workers.fmcsa_source_refresh") +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s %(message)s", + stream=sys.stdout, +) + +# How many newly-inserted carriers to email-verify + MX-tag per refresh run. +# The verifier does live SMTP/MX probing, so this bounds wall-clock; the rest +# catch up on subsequent runs (email_verified / mx_provider stay NULL until done). 
VERIFY_LIMIT = 20000  # default for --verify-limit (forwarded to email_verifier as --limit)
MX_DOMAIN_LIMIT = 8000  # forwarded to mx_tag_carriers.py as --limit-domains


def run_step(name: str, argv: list[str], dry_run: bool) -> bool:
    """Run one pipeline step as a subprocess, logging and timing it.

    A failed step is logged but does NOT abort the rest: a transient Socrata or
    SMTP hiccup in (say) verification must not prevent flagging from running.
    That holds both for a non-zero exit status and for a child process that
    could not be started at all (OSError).

    Args:
        name: Human-readable step label used in log lines.
        argv: Full command line to execute (passed as a list, no shell).
        dry_run: When true, only log what would run and do not execute it.

    Returns:
        True if the step succeeded (or was skipped because of dry_run),
        False if it failed. This function never raises for a failed step.
    """
    pretty = " ".join(argv)
    if dry_run:
        LOG.info("[DRY-RUN] would run: %s (%s)", pretty, name)
        return True

    LOG.info("=== %s: %s", name, pretty)
    # Monotonic clock: elapsed time stays correct across wall-clock adjustments.
    t0 = time.monotonic()
    try:
        subprocess.run(argv, check=True)
    except subprocess.CalledProcessError as exc:
        LOG.error("--- %s FAILED (rc=%s, %.0fs) -- continuing",
                  name, exc.returncode, time.monotonic() - t0)
        return False
    except OSError as exc:
        # The child could not be spawned (missing/non-executable binary, ...).
        LOG.error("--- %s FAILED to start (%s, %.0fs) -- continuing",
                  name, exc, time.monotonic() - t0)
        return False
    LOG.info("--- %s OK (%.0fs)", name, time.monotonic() - t0)
    return True


def main(argv: list[str] | None = None) -> int:
    """Parse CLI options and run the refresh pipeline steps in order.

    Every step is attempted even if an earlier one failed; failures are
    collected and reported at the end.

    Args:
        argv: Command-line arguments without the program name. Defaults to
            None, in which case argparse reads sys.argv[1:].

    Returns:
        Process exit status: 0 if every step succeeded (or dry-run), 1 if at
        least one step failed.
    """
    ap = argparse.ArgumentParser(description="Weekly FMCSA source refresh")
    ap.add_argument("--dry-run", action="store_true", help="Plan only, no writes")
    ap.add_argument("--skip-census", action="store_true",
                    help="Skip the census download; re-run downstream steps only")
    ap.add_argument("--verify-limit", type=int, default=VERIFY_LIMIT,
                    help="Max new carriers to email-verify this run")
    args = ap.parse_args(argv)

    py = [sys.executable, "-m"]
    steps: list[tuple[str, list[str]]] = []

    # 1. Re-download the full census (upsert; preserves verify/sent/flags state).
    if not args.skip_census:
        steps.append(("census download",
                      py + ["scripts.workers.fmcsa_census_downloader"]))

    # 2. Refresh enrichment (out-of-service orders etc.).
    steps.append(("enrichment (OOS)",
                  py + ["scripts.workers.fmcsa_enrichment", "--oos"]))

    # 3. Re-derive deficiency flags from the refreshed census (NO --listmonk:
    #    the daily builders pull their own audiences from fmcsa_carriers).
    steps.append(("deficiency flagging",
                  py + ["scripts.workers.fmcsa_deficiency_flagger"]))

    # 4. Verify newly-inserted emails (email_verified IS NULL), capped per run.
    steps.append(("email verification (new rows)",
                  py + ["scripts.workers.email_verifier", "--table", "fmcsa_carriers",
                        "--limit", str(args.verify_limit)]))

    # 5. MX-tag newly-inserted carriers (mx_provider IS NULL).
    steps.append(("MX tagging (new rows)",
                  [sys.executable, "scripts/mx_tag_carriers.py",
                   "--limit-domains", str(MX_DOMAIN_LIMIT)]))

    LOG.info("FMCSA source refresh starting (dry_run=%s)", args.dry_run)
    t0 = time.monotonic()

    failed: list[str] = []
    for step_name, step_argv in steps:
        if not run_step(step_name, step_argv, args.dry_run):
            failed.append(step_name)

    LOG.info("FMCSA source refresh done (%.0fs total)", time.monotonic() - t0)
    if failed:
        # Surface failures through the exit status so cron/monitoring can see them.
        LOG.error("%d of %d step(s) failed: %s",
                  len(failed), len(steps), ", ".join(failed))
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())