From 2dacf1ea0ebb2be077ffb2e103d82c2efe64d84b Mon Sep 17 00:00:00 2001 From: justin Date: Sat, 30 May 2026 14:46:40 -0500 Subject: [PATCH] FMCSA enrichment: OOS orders bulk download + authority/insurance API lookup - Downloads 389K OOS orders from Socrata, merges into fmcsa_carriers - Batch enriches authority status + insurance filing via FMCSA API - Adds columns: oos_active, authority_status, insurance_*_on_file, etc. - Rate limited to 1 req/sec for API calls - Prioritizes campaign-eligible for-hire carriers Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/workers/fmcsa_enrichment.py | 253 ++++++++++++++++++++++++++++ 1 file changed, 253 insertions(+) create mode 100644 scripts/workers/fmcsa_enrichment.py diff --git a/scripts/workers/fmcsa_enrichment.py b/scripts/workers/fmcsa_enrichment.py new file mode 100644 index 0000000..1c26441 --- /dev/null +++ b/scripts/workers/fmcsa_enrichment.py @@ -0,0 +1,253 @@ +#!/usr/bin/env python3 +""" +Enrich fmcsa_carriers with additional FMCSA datasets. + +Downloads and merges: + 1. OOS Orders (p2mt-9ige) — 389K records, carriers under out-of-service orders + 2. FMCSA QCMobile API — batch lookup for insurance/authority/BOC-3 status + (sampled, not all 2M — focus on campaign-eligible carriers) + +Usage: + python3 -m scripts.workers.fmcsa_enrichment --oos # download OOS orders + python3 -m scripts.workers.fmcsa_enrichment --authority # sample authority/insurance via API + python3 -m scripts.workers.fmcsa_enrichment --all # everything + python3 -m scripts.workers.fmcsa_enrichment --dry-run # count only +""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +import sys +import time +import urllib.request +from datetime import datetime + +import psycopg2 +from psycopg2.extras import execute_values + +LOG = logging.getLogger("workers.fmcsa_enrichment") +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s", stream=sys.stdout) + +DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest") +FMCSA_API_KEY = os.environ.get("FMCSA_API_KEY", "") +SOCRATA_BASE = "https://data.transportation.gov/resource" +FMCSA_API_BASE = "https://mobile.fmcsa.dot.gov/qc/services/carriers" + + +def ensure_columns(conn): + """Add enrichment columns if they don't exist.""" + cur = conn.cursor() + columns = [ + "oos_active BOOLEAN DEFAULT FALSE", + "oos_date DATE", + "oos_reason TEXT", + "authority_status TEXT", + "common_authority TEXT", + "contract_authority TEXT", + "broker_authority TEXT", + "insurance_bipd_required BOOLEAN", + "insurance_bipd_on_file BOOLEAN", + "insurance_cargo_required BOOLEAN", + "insurance_cargo_on_file BOOLEAN", + "insurance_bond_required BOOLEAN", + "insurance_bond_on_file BOOLEAN", + "enriched_at TIMESTAMPTZ", + ] + for col in columns: + col_name = col.split()[0] + try: + cur.execute(f"ALTER TABLE fmcsa_carriers ADD COLUMN IF NOT EXISTS {col}") + except Exception: + conn.rollback() + conn.commit() + LOG.info("Enrichment columns ensured") + + +def download_oos(dry_run=False): + """Download OOS Orders dataset and merge into fmcsa_carriers.""" + LOG.info("Downloading OOS Orders...") + + offset = 0 + batch_size = 50000 + total = 0 + records = [] + + while True: + url = f"{SOCRATA_BASE}/p2mt-9ige.json?$limit={batch_size}&$offset={offset}&$order=:id" + LOG.info(" Fetching offset=%d ...", offset) + try: + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + data = json.loads(urllib.request.urlopen(req, timeout=30).read()) + except Exception as e: + LOG.error(" API error: %s", e) + break + + if not data: + break + + for rec in data: + dot = rec.get("dot_number", "").strip() + if not dot: + continue + records.append(( + rec.get("status", "").upper() == "ACTIVE", + rec.get("oos_date", "").strip()[:10] or None, + rec.get("oos_reason", "").strip() or None, + dot, + )) + + total += len(data) + offset += len(data) + + if len(data) < batch_size: + break + + LOG.info("Downloaded %d OOS records, %d with DOT#", total, len(records)) + + if dry_run or not records: + return + + conn = psycopg2.connect(DATABASE_URL) + ensure_columns(conn) + cur = conn.cursor() + + # Batch update + chunk_size = 5000 + updated = 0 + for i in range(0, len(records), chunk_size): + chunk = records[i:i+chunk_size] + for active, oos_date, reason, dot in chunk: + cur.execute(""" + UPDATE fmcsa_carriers SET + oos_active = %s, + oos_date = %s, + oos_reason = %s + WHERE dot_number = %s + """, (active, oos_date, reason, dot)) + conn.commit() + updated += len(chunk) + LOG.info(" Updated %d / %d", updated, len(records)) + + conn.close() + LOG.info("OOS enrichment complete: %d carriers updated", updated) + + +def enrich_authority_batch(limit=10000, dry_run=False): + """Enrich campaign-eligible carriers with authority/insurance via FMCSA API. + + Rate limited to ~1 req/sec to avoid API throttling. + """ + if not FMCSA_API_KEY: + LOG.error("FMCSA_API_KEY not set") + return + + conn = psycopg2.connect(DATABASE_URL) + ensure_columns(conn) + cur = conn.cursor() + + # Get carriers that haven't been enriched yet, prioritize campaign-eligible + cur.execute(""" + SELECT dot_number FROM fmcsa_carriers + WHERE campaign_eligible = TRUE + AND enriched_at IS NULL + AND authorized_for_hire = TRUE + ORDER BY deficiency_count DESC NULLS LAST + LIMIT %s + """, (limit,)) + + dots = [r[0] for r in cur.fetchall()] + LOG.info("Enriching %d carriers via FMCSA API...", len(dots)) + + if dry_run: + conn.close() + return + + enriched = 0 + errors = 0 + + for dot in dots: + try: + url = f"{FMCSA_API_BASE}/{dot}?webKey={FMCSA_API_KEY}" + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + resp = urllib.request.urlopen(req, timeout=10) + data = json.loads(resp.read()) + + carrier = data.get("content", {}).get("carrier", {}) + if not carrier: + continue + + allowed = carrier.get("allowedToOperate", "") + common = carrier.get("commonAuthorityStatus", "") + contract = carrier.get("contractAuthorityStatus", "") + broker = carrier.get("brokerAuthorityStatus", "") + + bipd_req = carrier.get("bipdInsuranceRequired") == "Y" + bipd_on = bool(carrier.get("bipdInsuranceOnFile")) and carrier.get("bipdInsuranceOnFile") != "0" + cargo_req = carrier.get("cargoInsuranceRequired") == "Y" + cargo_on = bool(carrier.get("cargoInsuranceOnFile")) and carrier.get("cargoInsuranceOnFile") != "0" + bond_req = carrier.get("bondInsuranceRequired") == "Y" + bond_on = bool(carrier.get("bondInsuranceOnFile")) and carrier.get("bondInsuranceOnFile") != "0" + + cur.execute(""" + UPDATE fmcsa_carriers SET + authority_status = %s, + common_authority = %s, + contract_authority = %s, + broker_authority = %s, + insurance_bipd_required = %s, + insurance_bipd_on_file = %s, + insurance_cargo_required = %s, + insurance_cargo_on_file = %s, + insurance_bond_required = %s, + insurance_bond_on_file = %s, + enriched_at = NOW() + WHERE dot_number = %s + """, ( + allowed, common, contract, broker, + bipd_req, bipd_on, cargo_req, cargo_on, bond_req, bond_on, + dot, + )) + enriched += 1 + + if enriched % 100 == 0: + conn.commit() + LOG.info(" Enriched %d / %d (errors: %d)", enriched, len(dots), errors) + + # Rate limit: ~1 req/sec + time.sleep(0.5) + + except Exception as e: + errors += 1 + if errors % 50 == 0: + LOG.warning(" %d errors so far, last: %s", errors, e) + time.sleep(1) + + conn.commit() + conn.close() + LOG.info("Authority enrichment complete: %d enriched, %d errors", enriched, errors) + + +def main(): + parser = argparse.ArgumentParser(description="Enrich FMCSA carrier data") + parser.add_argument("--oos", action="store_true", help="Download OOS orders") + parser.add_argument("--authority", action="store_true", help="Enrich authority/insurance via API") + parser.add_argument("--all", action="store_true", help="Run all enrichments") + parser.add_argument("--limit", type=int, default=10000, help="Limit for API enrichment") + parser.add_argument("--dry-run", action="store_true", help="Count only") + args = parser.parse_args() + + if args.all or args.oos: + download_oos(dry_run=args.dry_run) + + if args.all or args.authority: + enrich_authority_batch(limit=args.limit, dry_run=args.dry_run) + + if not args.all and not args.oos and not args.authority: + print("Specify --oos, --authority, or --all") + + +if __name__ == "__main__": + main()