#!/usr/bin/env python3 """ fmcsa_census_downloader.py — Download FMCSA motor carrier census data via Socrata API. Data source: https://data.transportation.gov/resource/kjg3-diqy.json ~2M carriers with email, phone, address, fleet size, MCS-150 filing date. Usage: python3 -m scripts.workers.fmcsa_census_downloader # full download python3 -m scripts.workers.fmcsa_census_downloader --limit 1000 # test with 1K records python3 -m scripts.workers.fmcsa_census_downloader --dry-run # parse but don't save Environment: DATABASE_URL PostgreSQL connection string """ from __future__ import annotations import argparse import json import logging import os import sys import urllib.request from datetime import datetime import psycopg2 from psycopg2.extras import execute_values LOG = logging.getLogger("workers.fmcsa_census") logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s", stream=sys.stdout, ) DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest") API_BASE = "https://data.transportation.gov/resource/kjg3-diqy.json" BATCH_SIZE = 50000 def parse_fmcsa_date(date_str: str | None) -> str | None: """Parse FMCSA date like '01-JAN-22' or '20-OCT-23' into YYYY-MM-DD.""" if not date_str or not date_str.strip(): return None try: dt = datetime.strptime(date_str.strip(), "%d-%b-%y") # 2-digit year: 00-68 -> 2000s, 69-99 -> 1900s if dt.year > 2068: dt = dt.replace(year=dt.year - 100) return dt.strftime("%Y-%m-%d") except ValueError: pass try: dt = datetime.strptime(date_str.strip(), "%d-%b-%Y") return dt.strftime("%Y-%m-%d") except ValueError: pass return None def safe_int(val, max_val: int = 2_147_483_647) -> int | None: """Convert value to int, handling strings, None, and out-of-range values.""" if val is None: return None try: v = int(float(val)) # handle "1.234e7" scientific notation if v > max_val or v < -max_val: return None # out of range for PostgreSQL INTEGER return v except (ValueError, TypeError, OverflowError): return None def safe_bool(val) -> bool: """Convert value to bool.""" if isinstance(val, bool): return val if isinstance(val, str): return val.lower() in ("true", "1", "yes", "y") return bool(val) def download_all(limit: int | None = None, dry_run: bool = False) -> int: """Download all FMCSA carriers via Socrata API.""" offset = 0 total_processed = 0 total_inserted = 0 conn = None if not dry_run: conn = psycopg2.connect(DATABASE_URL) conn.autocommit = False effective_limit = limit or 999_999_999 while total_processed < effective_limit: batch_limit = min(BATCH_SIZE, effective_limit - total_processed) url = f"{API_BASE}?$limit={batch_limit}&$offset={offset}&$order=:id" LOG.info("Fetching offset=%d limit=%d ...", offset, batch_limit) try: req = urllib.request.Request(url, headers={ "Accept": "application/json", "User-Agent": "PerformanceWest-FMCSA-Ingest/1.0", }) with urllib.request.urlopen(req, timeout=120) as resp: data = json.loads(resp.read()) except Exception as e: LOG.error("API error at offset %d: %s", offset, e) break if not data: LOG.info("No more data at offset %d", offset) break rows = [] for rec in data: dot = rec.get("dot_number", "").strip() name = rec.get("legal_name", "").strip() if not dot or not name: continue rows.append(( dot, name, rec.get("dba_name", "").strip() or None, rec.get("carrier_operation", "").strip() or None, safe_bool(rec.get("authorized_for_hire")), safe_bool(rec.get("private_property")), safe_bool(rec.get("private_passenger_business")), safe_bool(rec.get("exempt_for_hire")), safe_bool(rec.get("hm_flag")), rec.get("email_address", "").strip() or None, rec.get("telephone", "").strip() or None, rec.get("phy_street", "").strip() or None, rec.get("phy_city", "").strip() or None, rec.get("phy_state", "").strip() or None, rec.get("phy_zip", "").strip() or None, rec.get("phy_country", "").strip() or None, rec.get("mailing_street", "").strip() or None, rec.get("mailing_city", "").strip() or None, rec.get("mailing_state", "").strip() or None, rec.get("mailing_zip", "").strip() or None, rec.get("mailing_country", "").strip() or None, safe_int(rec.get("nbr_power_unit")), safe_int(rec.get("driver_total")), safe_int(rec.get("mcs150_mileage")), safe_int(rec.get("mcs150_mileage_year")), safe_int(rec.get("recent_mileage")), safe_int(rec.get("recent_mileage_year")), rec.get("mcs150_date", "").strip() or None, parse_fmcsa_date(rec.get("mcs150_date")), rec.get("add_date", "").strip() or None, rec.get("oic_state", "").strip() or None, )) if rows and not dry_run and conn: cur = conn.cursor() execute_values(cur, """ INSERT INTO fmcsa_carriers ( dot_number, legal_name, dba_name, carrier_operation, authorized_for_hire, private_property, private_passenger_business, exempt_for_hire, hm_flag, email_address, telephone, phy_street, phy_city, phy_state, phy_zip, phy_country, mailing_street, mailing_city, mailing_state, mailing_zip, mailing_country, nbr_power_unit, driver_total, mcs150_mileage, mcs150_mileage_year, recent_mileage, recent_mileage_year, mcs150_date, mcs150_parsed, add_date, oic_state ) VALUES %s ON CONFLICT (dot_number) DO UPDATE SET legal_name = EXCLUDED.legal_name, dba_name = EXCLUDED.dba_name, carrier_operation = EXCLUDED.carrier_operation, authorized_for_hire = EXCLUDED.authorized_for_hire, private_property = EXCLUDED.private_property, private_passenger_business = EXCLUDED.private_passenger_business, exempt_for_hire = EXCLUDED.exempt_for_hire, hm_flag = EXCLUDED.hm_flag, email_address = EXCLUDED.email_address, telephone = EXCLUDED.telephone, phy_street = EXCLUDED.phy_street, phy_city = EXCLUDED.phy_city, phy_state = EXCLUDED.phy_state, phy_zip = EXCLUDED.phy_zip, phy_country = EXCLUDED.phy_country, mailing_street = EXCLUDED.mailing_street, mailing_city = EXCLUDED.mailing_city, mailing_state = EXCLUDED.mailing_state, mailing_zip = EXCLUDED.mailing_zip, mailing_country = EXCLUDED.mailing_country, nbr_power_unit = EXCLUDED.nbr_power_unit, driver_total = EXCLUDED.driver_total, mcs150_mileage = EXCLUDED.mcs150_mileage, mcs150_mileage_year = EXCLUDED.mcs150_mileage_year, recent_mileage = EXCLUDED.recent_mileage, recent_mileage_year = EXCLUDED.recent_mileage_year, mcs150_date = EXCLUDED.mcs150_date, mcs150_parsed = EXCLUDED.mcs150_parsed, add_date = EXCLUDED.add_date, oic_state = EXCLUDED.oic_state, updated_at = NOW() """, rows) conn.commit() total_inserted += len(rows) total_processed += len(data) offset += len(data) LOG.info( " Processed: %d (batch: %d, inserted: %d)", total_processed, len(data), total_inserted, ) if len(data) < batch_limit: LOG.info("Last batch (got %d < %d)", len(data), batch_limit) break if conn: conn.close() LOG.info("Done. Total processed: %d, inserted/updated: %d", total_processed, total_inserted) return total_inserted def main(): parser = argparse.ArgumentParser(description="Download FMCSA carrier census data") parser.add_argument("--limit", type=int, default=None, help="Limit number of records") parser.add_argument("--dry-run", action="store_true", help="Parse but don't save to DB") args = parser.parse_args() download_all(limit=args.limit, dry_run=args.dry_run) if __name__ == "__main__": main()