#!/usr/bin/env python3 """ Enrich fmcsa_carriers with additional FMCSA datasets. Downloads and merges: 1. OOS Orders (p2mt-9ige) — 389K records, carriers under out-of-service orders 2. FMCSA QCMobile API — batch lookup for insurance/authority/BOC-3 status (sampled, not all 2M — focus on campaign-eligible carriers) Usage: python3 -m scripts.workers.fmcsa_enrichment --oos # download OOS orders python3 -m scripts.workers.fmcsa_enrichment --authority # sample authority/insurance via API python3 -m scripts.workers.fmcsa_enrichment --all # everything python3 -m scripts.workers.fmcsa_enrichment --dry-run # count only """ from __future__ import annotations import argparse import json import logging import os import sys import time import urllib.request from datetime import datetime import psycopg2 from psycopg2.extras import execute_values LOG = logging.getLogger("workers.fmcsa_enrichment") logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s", stream=sys.stdout) DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest") FMCSA_API_KEY = os.environ.get("FMCSA_API_KEY", "") SOCRATA_BASE = "https://data.transportation.gov/resource" FMCSA_API_BASE = "https://mobile.fmcsa.dot.gov/qc/services/carriers" def ensure_columns(conn): """Add enrichment columns if they don't exist.""" cur = conn.cursor() columns = [ "oos_active BOOLEAN DEFAULT FALSE", "oos_date DATE", "oos_reason TEXT", "authority_status TEXT", "common_authority TEXT", "contract_authority TEXT", "broker_authority TEXT", "insurance_bipd_required BOOLEAN", "insurance_bipd_on_file BOOLEAN", "insurance_cargo_required BOOLEAN", "insurance_cargo_on_file BOOLEAN", "insurance_bond_required BOOLEAN", "insurance_bond_on_file BOOLEAN", "enriched_at TIMESTAMPTZ", ] for col in columns: col_name = col.split()[0] try: cur.execute(f"ALTER TABLE fmcsa_carriers ADD COLUMN IF NOT EXISTS {col}") except Exception: conn.rollback() conn.commit() LOG.info("Enrichment columns ensured") def download_oos(dry_run=False): """Download OOS Orders dataset and merge into fmcsa_carriers.""" LOG.info("Downloading OOS Orders...") offset = 0 batch_size = 50000 total = 0 records = [] while True: url = f"{SOCRATA_BASE}/p2mt-9ige.json?$limit={batch_size}&$offset={offset}&$order=:id" LOG.info(" Fetching offset=%d ...", offset) try: req = urllib.request.Request(url, headers={"Accept": "application/json"}) data = json.loads(urllib.request.urlopen(req, timeout=30).read()) except Exception as e: LOG.error(" API error: %s", e) break if not data: break for rec in data: dot = rec.get("dot_number", "").strip() if not dot: continue records.append(( rec.get("status", "").upper() == "ACTIVE", rec.get("oos_date", "").strip()[:10] or None, rec.get("oos_reason", "").strip() or None, dot, )) total += len(data) offset += len(data) if len(data) < batch_size: break LOG.info("Downloaded %d OOS records, %d with DOT#", total, len(records)) if dry_run or not records: return conn = psycopg2.connect(DATABASE_URL) ensure_columns(conn) cur = conn.cursor() # Batch update chunk_size = 5000 updated = 0 for i in range(0, len(records), chunk_size): chunk = records[i:i+chunk_size] for active, oos_date, reason, dot in chunk: cur.execute(""" UPDATE fmcsa_carriers SET oos_active = %s, oos_date = %s, oos_reason = %s WHERE dot_number = %s """, (active, oos_date, reason, dot)) conn.commit() updated += len(chunk) LOG.info(" Updated %d / %d", updated, len(records)) conn.close() LOG.info("OOS enrichment complete: %d carriers updated", updated) def enrich_authority_batch(limit=10000, dry_run=False): """Enrich campaign-eligible carriers with authority/insurance via FMCSA API. Rate limited to ~1 req/sec to avoid API throttling. """ if not FMCSA_API_KEY: LOG.error("FMCSA_API_KEY not set") return conn = psycopg2.connect(DATABASE_URL) ensure_columns(conn) cur = conn.cursor() # Get carriers that haven't been enriched yet, prioritize campaign-eligible cur.execute(""" SELECT dot_number FROM fmcsa_carriers WHERE campaign_eligible = TRUE AND enriched_at IS NULL AND authorized_for_hire = TRUE ORDER BY deficiency_count DESC NULLS LAST LIMIT %s """, (limit,)) dots = [r[0] for r in cur.fetchall()] LOG.info("Enriching %d carriers via FMCSA API...", len(dots)) if dry_run: conn.close() return enriched = 0 errors = 0 for dot in dots: try: url = f"{FMCSA_API_BASE}/{dot}?webKey={FMCSA_API_KEY}" req = urllib.request.Request(url, headers={"Accept": "application/json"}) resp = urllib.request.urlopen(req, timeout=10) data = json.loads(resp.read()) carrier = data.get("content", {}).get("carrier", {}) if not carrier: continue allowed = carrier.get("allowedToOperate", "") common = carrier.get("commonAuthorityStatus", "") contract = carrier.get("contractAuthorityStatus", "") broker = carrier.get("brokerAuthorityStatus", "") bipd_req = carrier.get("bipdInsuranceRequired") == "Y" bipd_on = bool(carrier.get("bipdInsuranceOnFile")) and carrier.get("bipdInsuranceOnFile") != "0" cargo_req = carrier.get("cargoInsuranceRequired") == "Y" cargo_on = bool(carrier.get("cargoInsuranceOnFile")) and carrier.get("cargoInsuranceOnFile") != "0" bond_req = carrier.get("bondInsuranceRequired") == "Y" bond_on = bool(carrier.get("bondInsuranceOnFile")) and carrier.get("bondInsuranceOnFile") != "0" cur.execute(""" UPDATE fmcsa_carriers SET authority_status = %s, common_authority = %s, contract_authority = %s, broker_authority = %s, insurance_bipd_required = %s, insurance_bipd_on_file = %s, insurance_cargo_required = %s, insurance_cargo_on_file = %s, insurance_bond_required = %s, insurance_bond_on_file = %s, enriched_at = NOW() WHERE dot_number = %s """, ( allowed, common, contract, broker, bipd_req, bipd_on, cargo_req, cargo_on, bond_req, bond_on, dot, )) enriched += 1 if enriched % 100 == 0: conn.commit() LOG.info(" Enriched %d / %d (errors: %d)", enriched, len(dots), errors) # Rate limit: ~1 req/sec time.sleep(0.5) except Exception as e: errors += 1 if errors % 50 == 0: LOG.warning(" %d errors so far, last: %s", errors, e) time.sleep(1) conn.commit() conn.close() LOG.info("Authority enrichment complete: %d enriched, %d errors", enriched, errors) def main(): parser = argparse.ArgumentParser(description="Enrich FMCSA carrier data") parser.add_argument("--oos", action="store_true", help="Download OOS orders") parser.add_argument("--authority", action="store_true", help="Enrich authority/insurance via API") parser.add_argument("--all", action="store_true", help="Run all enrichments") parser.add_argument("--limit", type=int, default=10000, help="Limit for API enrichment") parser.add_argument("--dry-run", action="store_true", help="Count only") args = parser.parse_args() if args.all or args.oos: download_oos(dry_run=args.dry_run) if args.all or args.authority: enrich_authority_batch(limit=args.limit, dry_run=args.dry_run) if not args.all and not args.oos and not args.authority: print("Specify --oos, --authority, or --all") if __name__ == "__main__": main()