trucking: weekly FMCSA source refresh so new non-compliant carriers are caught
The FMCSA census was a one-time snapshot (last loaded ~May 30) with NO refresh timer -- carriers newly falling out of MCS-150/UCR compliance were never picked up.

New scripts/workers/fmcsa_source_refresh.py orchestrates the full pipeline (census download -> enrichment -> deficiency flag -> verify new emails -> MX-tag new) and runs weekly via cron pw-fmcsa-refresh (Sun 09:00 UTC), codified in the mail-pipeline Ansible role.

Idempotent + incremental: the census upsert preserves email_verified / listmonk_sent_at / deficiency_flags, so existing carriers keep their send state and only census fields refresh; new DOTs flow into verification then campaigns. A carrier who refiled gets a fresh mcs150_parsed, so the builder's overdue WHERE clause stops targeting them automatically. Verify is capped per run (20k) so it never stalls on millions of rows.

(Healthcare already auto-catches newly-revalidation-overdue providers within its 63k institutional pool via pw-hc-refresh Mon/Wed/Fri.)
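The state-preserving upsert described above can be illustrated with a minimal sketch. It is not the SQL used by fmcsa_census_downloader (that file is not part of this commit): the in-memory SQLite database stands in for the real Postgres table, the column set is trimmed to a few of the names mentioned above, and `make_db` / `upsert_census` are hypothetical helpers. It assumes the bundled SQLite is 3.24 or newer, which is when `ON CONFLICT ... DO UPDATE` became available.

```python
import sqlite3

# Hypothetical, simplified upsert: only census fields appear in the SET list,
# so send/verification columns on existing rows are left untouched.
UPSERT = """
INSERT INTO fmcsa_carriers (dot_number, legal_name, mcs150_date)
VALUES (:dot_number, :legal_name, :mcs150_date)
ON CONFLICT (dot_number) DO UPDATE SET
    legal_name  = excluded.legal_name,
    mcs150_date = excluded.mcs150_date
"""


def make_db() -> sqlite3.Connection:
    """Create an in-memory stand-in for the fmcsa_carriers table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """CREATE TABLE fmcsa_carriers (
               dot_number       INTEGER PRIMARY KEY,
               legal_name       TEXT,
               mcs150_date      TEXT,
               email_verified   INTEGER,   -- NULL until the verifier has run
               listmonk_sent_at TEXT       -- NULL until a campaign was sent
           )"""
    )
    return conn


def upsert_census(conn: sqlite3.Connection, rows: list[dict]) -> None:
    """Insert new DOTs and refresh census fields on existing ones."""
    conn.executemany(UPSERT, rows)
    conn.commit()


if __name__ == "__main__":
    conn = make_db()
    # First load, then simulate a carrier that was verified and emailed.
    upsert_census(conn, [{"dot_number": 1, "legal_name": "Old Name",
                          "mcs150_date": "2022-01-01"}])
    conn.execute("UPDATE fmcsa_carriers SET email_verified = 1, "
                 "listmonk_sent_at = '2026-06-01' WHERE dot_number = 1")
    # Weekly refresh: DOT 1 refiled, DOT 2 is brand new.
    upsert_census(conn, [
        {"dot_number": 1, "legal_name": "New Name", "mcs150_date": "2026-06-10"},
        {"dot_number": 2, "legal_name": "Fresh Carrier", "mcs150_date": "2023-03-01"},
    ])
    for row in conn.execute("SELECT * FROM fmcsa_carriers ORDER BY dot_number"):
        print(row)
```

In this sketch the existing carrier keeps its `email_verified` and `listmonk_sent_at` values while its census fields change, and the new DOT arrives with NULL in both state columns, which is what lets the verifier pick up only new rows.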
parent 4171f48736
commit 899b880e7f
4 changed files with 144 additions and 0 deletions
@@ -196,6 +196,10 @@ api/
 scripts/workers/
 fmcsa_census_downloader.py — Socrata API bulk download
+fmcsa_source_refresh.py — WEEKLY refresh orchestrator (census ->
+enrichment -> flag -> verify -> mx-tag);
+cron pw-fmcsa-refresh (Sun 09:00 UTC), so
+newly-non-compliant carriers are auto-caught
 fmcsa_deficiency_flagger.py — flag carriers + populate Listmonk lists
 services/mcs150_update.py — MCS-150 handler (admin todo)
 services/boc3_filing.py — BOC-3 handler (Playwright + admin fallback)

@@ -112,6 +112,7 @@
 - pw-hc-campaign
 - pw-hc-nppes
 - pw-hc-refresh
+- pw-fmcsa-refresh
 - pw-mta-warmup
 - pw-listmonk-rampcap
 - pw-hc-rampcap

infra/cron/pw-fmcsa-refresh (12 lines, Normal file)
@@ -0,0 +1,12 @@
+# Weekly FMCSA trucking-source refresh. Re-ingests the full FMCSA motor-carrier
+# census from the live Socrata API and re-runs enrichment -> flagging ->
+# verification -> MX-tagging, so the daily campaign builders automatically catch
+# carriers who NEWLY fell out of compliance (e.g. an MCS-150 update that just
+# lapsed) and drop carriers who have since refiled. Idempotent + incremental:
+# the upsert preserves email_verified / listmonk_sent_at / deficiency_flags, so
+# existing carriers keep their send state and only census fields refresh; new
+# DOTs flow into verification then campaigns. The original census load was a
+# one-time snapshot with no refresh timer -- this closes that gap. Runs Sunday
+# 09:00 UTC (off-peak, well before Monday's 08:00 UTC trucking builder).
+# Takes a while (full ~2M-row download + verify batch), so it runs off-peak.
+0 9 * * 0 deploy cd /opt/performancewest && docker compose exec -T workers python3 -m scripts.workers.fmcsa_source_refresh >> /opt/performancewest/logs/pw-fmcsa-refresh.log 2>&1

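As a sanity check on the schedule in the cron entry above, the following sketch computes the next Sunday 09:00 UTC run and the headroom before the Monday 08:00 UTC trucking builder mentioned in the comment. `next_weekly_run` is a hypothetical helper written for this illustration and is not part of the commit. Note that cron numbers Sunday as 0, while Python's `datetime.weekday()` numbers Monday as 0 and Sunday as 6.

```python
from datetime import datetime, timedelta, timezone

SUNDAY, MONDAY = 6, 0  # Python weekday numbering (cron's "0" is Sunday)


def next_weekly_run(now: datetime, weekday: int, hour: int) -> datetime:
    """Return the first `weekday` at `hour`:00 that is strictly after `now`."""
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    candidate += timedelta(days=(weekday - now.weekday()) % 7)
    if candidate <= now:
        # Same weekday but the slot has already passed: roll to next week.
        candidate += timedelta(days=7)
    return candidate


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    refresh = next_weekly_run(now, SUNDAY, 9)       # "0 9 * * 0"
    builder = next_weekly_run(refresh, MONDAY, 8)   # Monday 08:00 UTC builder
    print("next refresh:", refresh.isoformat())
    print("headroom before builder:", builder - refresh)
```

Under these assumptions the refresh has a 23-hour window to finish the download and the capped verify batch before the Monday builder reads fmcsa_carriers.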
scripts/workers/fmcsa_source_refresh.py (127 lines, Normal file)
@@ -0,0 +1,127 @@
+#!/usr/bin/env python3
+"""Weekly FMCSA trucking-source refresh.
+
+Re-ingests the FMCSA motor-carrier census from the live Socrata API and re-runs
+the enrichment -> flagging -> verification -> MX-tagging pipeline so the trucking
+campaigns automatically catch carriers who have NEWLY fallen out of compliance
+(e.g. an MCS-150 biennial update that just lapsed) and drop carriers who have
+since refiled.
+
+WHY THIS EXISTS: the original census load was a one-time snapshot (last loaded
+~May 30 2026). Nothing re-ingested it on a timer, so the trucking audience went
+stale -- new prospects falling out of compliance were never picked up. This is
+the trucking analogue of the healthcare hc_data_refresh.
+
+SAFETY: every step is idempotent and incremental.
+  - census_downloader upserts ON CONFLICT (dot_number) and does NOT touch
+    email_verified / email_verify_result / listmonk_sent_at / deficiency_flags,
+    so existing carriers keep their send + verification state; only census
+    fields (legal_name, mcs150_date, fleet, address, ...) refresh, and brand-new
+    DOTs are inserted with NULL verify/sent state.
+  - enrichment refreshes OOS (out-of-service) order data.
+  - deficiency_flagger re-derives deficiency_flags / campaign_eligible from the
+    refreshed census. Run WITHOUT --listmonk: the daily campaign builders pull
+    their own audiences straight from fmcsa_carriers, so we never push the whole
+    eligible set into a Listmonk list here.
+  - email_verifier only checks rows with email_verified IS NULL (i.e. the newly
+    inserted carriers), capped per run so a refresh never stalls on millions of
+    rows. Catch-up across runs is fine.
+  - mx_tag_carriers only tags rows with mx_provider IS NULL (newly inserted).
+
+A carrier who refiled their MCS-150 gets a fresh mcs150_parsed, so the builder's
+"overdue > 2 years" WHERE clause stops targeting them automatically -- no extra
+suppression needed.
+
+Usage:
+  python3 -m scripts.workers.fmcsa_source_refresh                 # full weekly refresh
+  python3 -m scripts.workers.fmcsa_source_refresh --dry-run       # plan only, no writes
+  python3 -m scripts.workers.fmcsa_source_refresh --skip-census   # re-run downstream only
+
+Run inside the workers container (it has psycopg2 + DATABASE_URL):
+  docker compose exec -T workers python3 -m scripts.workers.fmcsa_source_refresh
+"""
+
+from __future__ import annotations
+
+import argparse
+import logging
+import subprocess
+import sys
+import time
+
+LOG = logging.getLogger("workers.fmcsa_source_refresh")
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
+    stream=sys.stdout,
+)
+
+# How many newly-inserted carriers to email-verify + MX-tag per refresh run.
+# The verifier does live SMTP/MX probing, so this bounds wall-clock; the rest
+# catch up on subsequent runs (email_verified / mx_provider stay NULL until done).
+VERIFY_LIMIT = 20000
+MX_DOMAIN_LIMIT = 8000
+
+
+def run_step(name: str, argv: list[str], dry_run: bool) -> None:
+    """Run one pipeline step as a subprocess, logging + timing it.
+
+    A failed step is logged but does NOT abort the rest: a transient Socrata or
+    SMTP hiccup in (say) verification must not prevent flagging from running.
+    """
+    pretty = " ".join(argv)
+    if dry_run:
+        LOG.info("[DRY-RUN] would run: %s (%s)", pretty, name)
+        return
+    LOG.info("=== %s: %s", name, pretty)
+    t0 = time.time()
+    try:
+        subprocess.run(argv, check=True)
+        LOG.info("--- %s OK (%.0fs)", name, time.time() - t0)
+    except subprocess.CalledProcessError as exc:
+        LOG.error("--- %s FAILED (rc=%s, %.0fs) -- continuing", name, exc.returncode, time.time() - t0)
+
+
+def main() -> None:
+    ap = argparse.ArgumentParser(description="Weekly FMCSA source refresh")
+    ap.add_argument("--dry-run", action="store_true", help="Plan only, no writes")
+    ap.add_argument("--skip-census", action="store_true",
+                    help="Skip the census download; re-run downstream steps only")
+    ap.add_argument("--verify-limit", type=int, default=VERIFY_LIMIT,
+                    help="Max new carriers to email-verify this run")
+    args = ap.parse_args()
+
+    py = [sys.executable, "-m"]
+
+    LOG.info("FMCSA source refresh starting (dry_run=%s)", args.dry_run)
+    t0 = time.time()
+
+    # 1. Re-download the full census (upsert; preserves verify/sent/flags state).
+    if not args.skip_census:
+        run_step("census download",
+                 py + ["scripts.workers.fmcsa_census_downloader"], args.dry_run)
+
+    # 2. Refresh enrichment (out-of-service orders etc.).
+    run_step("enrichment (OOS)",
+             py + ["scripts.workers.fmcsa_enrichment", "--oos"], args.dry_run)
+
+    # 3. Re-derive deficiency flags from the refreshed census (NO --listmonk:
+    #    the daily builders pull their own audiences from fmcsa_carriers).
+    run_step("deficiency flagging",
+             py + ["scripts.workers.fmcsa_deficiency_flagger"], args.dry_run)
+
+    # 4. Verify newly-inserted emails (email_verified IS NULL), capped per run.
+    run_step("email verification (new rows)",
+             py + ["scripts.workers.email_verifier", "--table", "fmcsa_carriers",
+                   "--limit", str(args.verify_limit)], args.dry_run)
+
+    # 5. MX-tag newly-inserted carriers (mx_provider IS NULL).
+    run_step("MX tagging (new rows)",
+             [sys.executable, "scripts/mx_tag_carriers.py",
+              "--limit-domains", str(MX_DOMAIN_LIMIT)], args.dry_run)
+
+    LOG.info("FMCSA source refresh done (%.0fs total)", time.time() - t0)
+
+
+if __name__ == "__main__":
+    main()
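The continue-on-failure behaviour of run_step can be tried in isolation with the sketch below. It is a hypothetical variant, not the code in this commit: here `run_step` returns a bool and `run_pipeline` collects the names of failed steps, whereas the committed `run_step` returns None and only logs. The child processes are trivial `python -c` commands chosen for illustration.

```python
import subprocess
import sys


def run_step(name: str, argv: list[str]) -> bool:
    """Run one step; return True on exit code 0, False on a non-zero exit."""
    try:
        subprocess.run(argv, check=True)
        return True
    except subprocess.CalledProcessError as exc:
        print(f"--- {name} FAILED (rc={exc.returncode}) -- continuing")
        return False


def run_pipeline(steps: list[tuple[str, list[str]]]) -> list[str]:
    """Run every step in order without aborting; return the failed step names."""
    return [name for name, argv in steps if not run_step(name, argv)]


if __name__ == "__main__":
    steps = [
        ("ok step", [sys.executable, "-c", "pass"]),
        ("failing step", [sys.executable, "-c", "import sys; sys.exit(3)"]),
        ("later step", [sys.executable, "-c", "pass"]),
    ]
    failed = run_pipeline(steps)
    print("failed steps:", failed)
```

Returning the failed names is one possible way for a caller to surface partial failures (for example through the exit code) while still letting later steps run; whether that is wanted here is a design choice outside this commit.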