FMCSA enrichment: OOS orders bulk download + authority/insurance API lookup
- Downloads 389K OOS orders from Socrata, merges into fmcsa_carriers - Batch enriches authority status + insurance filing via FMCSA API - Adds columns: oos_active, authority_status, insurance_*_on_file, etc. - Rate limited to 1 req/sec for API calls - Prioritizes campaign-eligible for-hire carriers Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
6f3ad1b686
commit
2dacf1ea0e
1 changed files with 253 additions and 0 deletions
253
scripts/workers/fmcsa_enrichment.py
Normal file
253
scripts/workers/fmcsa_enrichment.py
Normal file
|
|
@ -0,0 +1,253 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Enrich fmcsa_carriers with additional FMCSA datasets.
|
||||
|
||||
Downloads and merges:
|
||||
1. OOS Orders (p2mt-9ige) — 389K records, carriers under out-of-service orders
|
||||
2. FMCSA QCMobile API — batch lookup for insurance/authority/BOC-3 status
|
||||
(sampled, not all 2M — focus on campaign-eligible carriers)
|
||||
|
||||
Usage:
|
||||
python3 -m scripts.workers.fmcsa_enrichment --oos # download OOS orders
|
||||
python3 -m scripts.workers.fmcsa_enrichment --authority # sample authority/insurance via API
|
||||
python3 -m scripts.workers.fmcsa_enrichment --all # everything
|
||||
python3 -m scripts.workers.fmcsa_enrichment --dry-run # count only
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
from datetime import datetime
|
||||
|
||||
import psycopg2
|
||||
from psycopg2.extras import execute_values
|
||||
|
||||
LOG = logging.getLogger("workers.fmcsa_enrichment")
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s", stream=sys.stdout)
|
||||
|
||||
DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest")
|
||||
FMCSA_API_KEY = os.environ.get("FMCSA_API_KEY", "")
|
||||
SOCRATA_BASE = "https://data.transportation.gov/resource"
|
||||
FMCSA_API_BASE = "https://mobile.fmcsa.dot.gov/qc/services/carriers"
|
||||
|
||||
|
||||
def ensure_columns(conn):
|
||||
"""Add enrichment columns if they don't exist."""
|
||||
cur = conn.cursor()
|
||||
columns = [
|
||||
"oos_active BOOLEAN DEFAULT FALSE",
|
||||
"oos_date DATE",
|
||||
"oos_reason TEXT",
|
||||
"authority_status TEXT",
|
||||
"common_authority TEXT",
|
||||
"contract_authority TEXT",
|
||||
"broker_authority TEXT",
|
||||
"insurance_bipd_required BOOLEAN",
|
||||
"insurance_bipd_on_file BOOLEAN",
|
||||
"insurance_cargo_required BOOLEAN",
|
||||
"insurance_cargo_on_file BOOLEAN",
|
||||
"insurance_bond_required BOOLEAN",
|
||||
"insurance_bond_on_file BOOLEAN",
|
||||
"enriched_at TIMESTAMPTZ",
|
||||
]
|
||||
for col in columns:
|
||||
col_name = col.split()[0]
|
||||
try:
|
||||
cur.execute(f"ALTER TABLE fmcsa_carriers ADD COLUMN IF NOT EXISTS {col}")
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
conn.commit()
|
||||
LOG.info("Enrichment columns ensured")
|
||||
|
||||
|
||||
def download_oos(dry_run=False):
|
||||
"""Download OOS Orders dataset and merge into fmcsa_carriers."""
|
||||
LOG.info("Downloading OOS Orders...")
|
||||
|
||||
offset = 0
|
||||
batch_size = 50000
|
||||
total = 0
|
||||
records = []
|
||||
|
||||
while True:
|
||||
url = f"{SOCRATA_BASE}/p2mt-9ige.json?$limit={batch_size}&$offset={offset}&$order=:id"
|
||||
LOG.info(" Fetching offset=%d ...", offset)
|
||||
try:
|
||||
req = urllib.request.Request(url, headers={"Accept": "application/json"})
|
||||
data = json.loads(urllib.request.urlopen(req, timeout=30).read())
|
||||
except Exception as e:
|
||||
LOG.error(" API error: %s", e)
|
||||
break
|
||||
|
||||
if not data:
|
||||
break
|
||||
|
||||
for rec in data:
|
||||
dot = rec.get("dot_number", "").strip()
|
||||
if not dot:
|
||||
continue
|
||||
records.append((
|
||||
rec.get("status", "").upper() == "ACTIVE",
|
||||
rec.get("oos_date", "").strip()[:10] or None,
|
||||
rec.get("oos_reason", "").strip() or None,
|
||||
dot,
|
||||
))
|
||||
|
||||
total += len(data)
|
||||
offset += len(data)
|
||||
|
||||
if len(data) < batch_size:
|
||||
break
|
||||
|
||||
LOG.info("Downloaded %d OOS records, %d with DOT#", total, len(records))
|
||||
|
||||
if dry_run or not records:
|
||||
return
|
||||
|
||||
conn = psycopg2.connect(DATABASE_URL)
|
||||
ensure_columns(conn)
|
||||
cur = conn.cursor()
|
||||
|
||||
# Batch update
|
||||
chunk_size = 5000
|
||||
updated = 0
|
||||
for i in range(0, len(records), chunk_size):
|
||||
chunk = records[i:i+chunk_size]
|
||||
for active, oos_date, reason, dot in chunk:
|
||||
cur.execute("""
|
||||
UPDATE fmcsa_carriers SET
|
||||
oos_active = %s,
|
||||
oos_date = %s,
|
||||
oos_reason = %s
|
||||
WHERE dot_number = %s
|
||||
""", (active, oos_date, reason, dot))
|
||||
conn.commit()
|
||||
updated += len(chunk)
|
||||
LOG.info(" Updated %d / %d", updated, len(records))
|
||||
|
||||
conn.close()
|
||||
LOG.info("OOS enrichment complete: %d carriers updated", updated)
|
||||
|
||||
|
||||
def enrich_authority_batch(limit=10000, dry_run=False):
|
||||
"""Enrich campaign-eligible carriers with authority/insurance via FMCSA API.
|
||||
|
||||
Rate limited to ~1 req/sec to avoid API throttling.
|
||||
"""
|
||||
if not FMCSA_API_KEY:
|
||||
LOG.error("FMCSA_API_KEY not set")
|
||||
return
|
||||
|
||||
conn = psycopg2.connect(DATABASE_URL)
|
||||
ensure_columns(conn)
|
||||
cur = conn.cursor()
|
||||
|
||||
# Get carriers that haven't been enriched yet, prioritize campaign-eligible
|
||||
cur.execute("""
|
||||
SELECT dot_number FROM fmcsa_carriers
|
||||
WHERE campaign_eligible = TRUE
|
||||
AND enriched_at IS NULL
|
||||
AND authorized_for_hire = TRUE
|
||||
ORDER BY deficiency_count DESC NULLS LAST
|
||||
LIMIT %s
|
||||
""", (limit,))
|
||||
|
||||
dots = [r[0] for r in cur.fetchall()]
|
||||
LOG.info("Enriching %d carriers via FMCSA API...", len(dots))
|
||||
|
||||
if dry_run:
|
||||
conn.close()
|
||||
return
|
||||
|
||||
enriched = 0
|
||||
errors = 0
|
||||
|
||||
for dot in dots:
|
||||
try:
|
||||
url = f"{FMCSA_API_BASE}/{dot}?webKey={FMCSA_API_KEY}"
|
||||
req = urllib.request.Request(url, headers={"Accept": "application/json"})
|
||||
resp = urllib.request.urlopen(req, timeout=10)
|
||||
data = json.loads(resp.read())
|
||||
|
||||
carrier = data.get("content", {}).get("carrier", {})
|
||||
if not carrier:
|
||||
continue
|
||||
|
||||
allowed = carrier.get("allowedToOperate", "")
|
||||
common = carrier.get("commonAuthorityStatus", "")
|
||||
contract = carrier.get("contractAuthorityStatus", "")
|
||||
broker = carrier.get("brokerAuthorityStatus", "")
|
||||
|
||||
bipd_req = carrier.get("bipdInsuranceRequired") == "Y"
|
||||
bipd_on = bool(carrier.get("bipdInsuranceOnFile")) and carrier.get("bipdInsuranceOnFile") != "0"
|
||||
cargo_req = carrier.get("cargoInsuranceRequired") == "Y"
|
||||
cargo_on = bool(carrier.get("cargoInsuranceOnFile")) and carrier.get("cargoInsuranceOnFile") != "0"
|
||||
bond_req = carrier.get("bondInsuranceRequired") == "Y"
|
||||
bond_on = bool(carrier.get("bondInsuranceOnFile")) and carrier.get("bondInsuranceOnFile") != "0"
|
||||
|
||||
cur.execute("""
|
||||
UPDATE fmcsa_carriers SET
|
||||
authority_status = %s,
|
||||
common_authority = %s,
|
||||
contract_authority = %s,
|
||||
broker_authority = %s,
|
||||
insurance_bipd_required = %s,
|
||||
insurance_bipd_on_file = %s,
|
||||
insurance_cargo_required = %s,
|
||||
insurance_cargo_on_file = %s,
|
||||
insurance_bond_required = %s,
|
||||
insurance_bond_on_file = %s,
|
||||
enriched_at = NOW()
|
||||
WHERE dot_number = %s
|
||||
""", (
|
||||
allowed, common, contract, broker,
|
||||
bipd_req, bipd_on, cargo_req, cargo_on, bond_req, bond_on,
|
||||
dot,
|
||||
))
|
||||
enriched += 1
|
||||
|
||||
if enriched % 100 == 0:
|
||||
conn.commit()
|
||||
LOG.info(" Enriched %d / %d (errors: %d)", enriched, len(dots), errors)
|
||||
|
||||
# Rate limit: ~1 req/sec
|
||||
time.sleep(0.5)
|
||||
|
||||
except Exception as e:
|
||||
errors += 1
|
||||
if errors % 50 == 0:
|
||||
LOG.warning(" %d errors so far, last: %s", errors, e)
|
||||
time.sleep(1)
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
LOG.info("Authority enrichment complete: %d enriched, %d errors", enriched, errors)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Enrich FMCSA carrier data")
|
||||
parser.add_argument("--oos", action="store_true", help="Download OOS orders")
|
||||
parser.add_argument("--authority", action="store_true", help="Enrich authority/insurance via API")
|
||||
parser.add_argument("--all", action="store_true", help="Run all enrichments")
|
||||
parser.add_argument("--limit", type=int, default=10000, help="Limit for API enrichment")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Count only")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.all or args.oos:
|
||||
download_oos(dry_run=args.dry_run)
|
||||
|
||||
if args.all or args.authority:
|
||||
enrich_authority_batch(limit=args.limit, dry_run=args.dry_run)
|
||||
|
||||
if not args.all and not args.oos and not args.authority:
|
||||
print("Specify --oos, --authority, or --all")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue