new-site/scripts/workers/fmcsa_enrichment.py
justin 2dacf1ea0e FMCSA enrichment: OOS orders bulk download + authority/insurance API lookup
- Downloads 389K OOS orders from Socrata, merges into fmcsa_carriers
- Batch enriches authority status + insurance filing via FMCSA API
- Adds columns: oos_active, authority_status, insurance_*_on_file, etc.
- Rate limited to 1 req/sec for API calls
- Prioritizes campaign-eligible for-hire carriers

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-30 14:46:40 -05:00

253 lines
8.5 KiB
Python

#!/usr/bin/env python3
"""
Enrich fmcsa_carriers with additional FMCSA datasets.
Downloads and merges:
1. OOS Orders (p2mt-9ige) — 389K records, carriers under out-of-service orders
2. FMCSA QCMobile API — batch lookup for insurance/authority/BOC-3 status
(sampled, not all 2M — focus on campaign-eligible carriers)
Usage:
python3 -m scripts.workers.fmcsa_enrichment --oos # download OOS orders
python3 -m scripts.workers.fmcsa_enrichment --authority # sample authority/insurance via API
python3 -m scripts.workers.fmcsa_enrichment --all # everything
python3 -m scripts.workers.fmcsa_enrichment --dry-run # count only
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
import time
import urllib.request
from datetime import datetime
import psycopg2
from psycopg2.extras import execute_values
LOG = logging.getLogger("workers.fmcsa_enrichment")
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s", stream=sys.stdout)
DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest")
FMCSA_API_KEY = os.environ.get("FMCSA_API_KEY", "")
SOCRATA_BASE = "https://data.transportation.gov/resource"
FMCSA_API_BASE = "https://mobile.fmcsa.dot.gov/qc/services/carriers"
def ensure_columns(conn):
"""Add enrichment columns if they don't exist."""
cur = conn.cursor()
columns = [
"oos_active BOOLEAN DEFAULT FALSE",
"oos_date DATE",
"oos_reason TEXT",
"authority_status TEXT",
"common_authority TEXT",
"contract_authority TEXT",
"broker_authority TEXT",
"insurance_bipd_required BOOLEAN",
"insurance_bipd_on_file BOOLEAN",
"insurance_cargo_required BOOLEAN",
"insurance_cargo_on_file BOOLEAN",
"insurance_bond_required BOOLEAN",
"insurance_bond_on_file BOOLEAN",
"enriched_at TIMESTAMPTZ",
]
for col in columns:
col_name = col.split()[0]
try:
cur.execute(f"ALTER TABLE fmcsa_carriers ADD COLUMN IF NOT EXISTS {col}")
except Exception:
conn.rollback()
conn.commit()
LOG.info("Enrichment columns ensured")
def download_oos(dry_run=False):
"""Download OOS Orders dataset and merge into fmcsa_carriers."""
LOG.info("Downloading OOS Orders...")
offset = 0
batch_size = 50000
total = 0
records = []
while True:
url = f"{SOCRATA_BASE}/p2mt-9ige.json?$limit={batch_size}&$offset={offset}&$order=:id"
LOG.info(" Fetching offset=%d ...", offset)
try:
req = urllib.request.Request(url, headers={"Accept": "application/json"})
data = json.loads(urllib.request.urlopen(req, timeout=30).read())
except Exception as e:
LOG.error(" API error: %s", e)
break
if not data:
break
for rec in data:
dot = rec.get("dot_number", "").strip()
if not dot:
continue
records.append((
rec.get("status", "").upper() == "ACTIVE",
rec.get("oos_date", "").strip()[:10] or None,
rec.get("oos_reason", "").strip() or None,
dot,
))
total += len(data)
offset += len(data)
if len(data) < batch_size:
break
LOG.info("Downloaded %d OOS records, %d with DOT#", total, len(records))
if dry_run or not records:
return
conn = psycopg2.connect(DATABASE_URL)
ensure_columns(conn)
cur = conn.cursor()
# Batch update
chunk_size = 5000
updated = 0
for i in range(0, len(records), chunk_size):
chunk = records[i:i+chunk_size]
for active, oos_date, reason, dot in chunk:
cur.execute("""
UPDATE fmcsa_carriers SET
oos_active = %s,
oos_date = %s,
oos_reason = %s
WHERE dot_number = %s
""", (active, oos_date, reason, dot))
conn.commit()
updated += len(chunk)
LOG.info(" Updated %d / %d", updated, len(records))
conn.close()
LOG.info("OOS enrichment complete: %d carriers updated", updated)
def enrich_authority_batch(limit=10000, dry_run=False):
"""Enrich campaign-eligible carriers with authority/insurance via FMCSA API.
Rate limited to ~1 req/sec to avoid API throttling.
"""
if not FMCSA_API_KEY:
LOG.error("FMCSA_API_KEY not set")
return
conn = psycopg2.connect(DATABASE_URL)
ensure_columns(conn)
cur = conn.cursor()
# Get carriers that haven't been enriched yet, prioritize campaign-eligible
cur.execute("""
SELECT dot_number FROM fmcsa_carriers
WHERE campaign_eligible = TRUE
AND enriched_at IS NULL
AND authorized_for_hire = TRUE
ORDER BY deficiency_count DESC NULLS LAST
LIMIT %s
""", (limit,))
dots = [r[0] for r in cur.fetchall()]
LOG.info("Enriching %d carriers via FMCSA API...", len(dots))
if dry_run:
conn.close()
return
enriched = 0
errors = 0
for dot in dots:
try:
url = f"{FMCSA_API_BASE}/{dot}?webKey={FMCSA_API_KEY}"
req = urllib.request.Request(url, headers={"Accept": "application/json"})
resp = urllib.request.urlopen(req, timeout=10)
data = json.loads(resp.read())
carrier = data.get("content", {}).get("carrier", {})
if not carrier:
continue
allowed = carrier.get("allowedToOperate", "")
common = carrier.get("commonAuthorityStatus", "")
contract = carrier.get("contractAuthorityStatus", "")
broker = carrier.get("brokerAuthorityStatus", "")
bipd_req = carrier.get("bipdInsuranceRequired") == "Y"
bipd_on = bool(carrier.get("bipdInsuranceOnFile")) and carrier.get("bipdInsuranceOnFile") != "0"
cargo_req = carrier.get("cargoInsuranceRequired") == "Y"
cargo_on = bool(carrier.get("cargoInsuranceOnFile")) and carrier.get("cargoInsuranceOnFile") != "0"
bond_req = carrier.get("bondInsuranceRequired") == "Y"
bond_on = bool(carrier.get("bondInsuranceOnFile")) and carrier.get("bondInsuranceOnFile") != "0"
cur.execute("""
UPDATE fmcsa_carriers SET
authority_status = %s,
common_authority = %s,
contract_authority = %s,
broker_authority = %s,
insurance_bipd_required = %s,
insurance_bipd_on_file = %s,
insurance_cargo_required = %s,
insurance_cargo_on_file = %s,
insurance_bond_required = %s,
insurance_bond_on_file = %s,
enriched_at = NOW()
WHERE dot_number = %s
""", (
allowed, common, contract, broker,
bipd_req, bipd_on, cargo_req, cargo_on, bond_req, bond_on,
dot,
))
enriched += 1
if enriched % 100 == 0:
conn.commit()
LOG.info(" Enriched %d / %d (errors: %d)", enriched, len(dots), errors)
# Rate limit: ~1 req/sec
time.sleep(0.5)
except Exception as e:
errors += 1
if errors % 50 == 0:
LOG.warning(" %d errors so far, last: %s", errors, e)
time.sleep(1)
conn.commit()
conn.close()
LOG.info("Authority enrichment complete: %d enriched, %d errors", enriched, errors)
def main():
parser = argparse.ArgumentParser(description="Enrich FMCSA carrier data")
parser.add_argument("--oos", action="store_true", help="Download OOS orders")
parser.add_argument("--authority", action="store_true", help="Enrich authority/insurance via API")
parser.add_argument("--all", action="store_true", help="Run all enrichments")
parser.add_argument("--limit", type=int, default=10000, help="Limit for API enrichment")
parser.add_argument("--dry-run", action="store_true", help="Count only")
args = parser.parse_args()
if args.all or args.oos:
download_oos(dry_run=args.dry_run)
if args.all or args.authority:
enrich_authority_batch(limit=args.limit, dry_run=args.dry_run)
if not args.all and not args.oos and not args.authority:
print("Specify --oos, --authority, or --all")
if __name__ == "__main__":
main()