Add FMCSA motor carrier census table and Socrata data downloader

New vertical: FMCSA/DOT motor carrier compliance services.
- Migration 078: fmcsa_carriers table with 31 fields (DOT#, name,
  email, phone, address, fleet size, MCS-150 date, carrier type)
- Downloader: Socrata API ingest for 2M+ carriers with upsert
- Data source: data.transportation.gov (free, public)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
justin 2026-05-28 21:05:46 -05:00
parent 27a1a1f7ed
commit dfee4fc6c0
2 changed files with 291 additions and 0 deletions

View file

@ -0,0 +1,234 @@
#!/usr/bin/env python3
"""
fmcsa_census_downloader.py Download FMCSA motor carrier census data via Socrata API.
Data source: https://data.transportation.gov/resource/kjg3-diqy.json
~2M carriers with email, phone, address, fleet size, MCS-150 filing date.
Usage:
python3 -m scripts.workers.fmcsa_census_downloader # full download
python3 -m scripts.workers.fmcsa_census_downloader --limit 1000 # test with 1K records
python3 -m scripts.workers.fmcsa_census_downloader --dry-run # parse but don't save
Environment:
DATABASE_URL PostgreSQL connection string
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
import urllib.request
from datetime import datetime
import psycopg2
from psycopg2.extras import execute_values
LOG = logging.getLogger("workers.fmcsa_census")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
stream=sys.stdout,
)
DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest")
API_BASE = "https://data.transportation.gov/resource/kjg3-diqy.json"
BATCH_SIZE = 50000
def parse_fmcsa_date(date_str: str | None) -> str | None:
"""Parse FMCSA date like '01-JAN-22' or '20-OCT-23' into YYYY-MM-DD."""
if not date_str or not date_str.strip():
return None
try:
dt = datetime.strptime(date_str.strip(), "%d-%b-%y")
# 2-digit year: 00-68 -> 2000s, 69-99 -> 1900s
if dt.year > 2068:
dt = dt.replace(year=dt.year - 100)
return dt.strftime("%Y-%m-%d")
except ValueError:
pass
try:
dt = datetime.strptime(date_str.strip(), "%d-%b-%Y")
return dt.strftime("%Y-%m-%d")
except ValueError:
pass
return None
def safe_int(val) -> int | None:
"""Convert value to int, handling strings and None."""
if val is None:
return None
try:
return int(val)
except (ValueError, TypeError):
return None
def safe_bool(val) -> bool:
"""Convert value to bool."""
if isinstance(val, bool):
return val
if isinstance(val, str):
return val.lower() in ("true", "1", "yes", "y")
return bool(val)
def download_all(limit: int | None = None, dry_run: bool = False) -> int:
"""Download all FMCSA carriers via Socrata API."""
offset = 0
total_processed = 0
total_inserted = 0
conn = None
if not dry_run:
conn = psycopg2.connect(DATABASE_URL)
conn.autocommit = False
effective_limit = limit or 999_999_999
while total_processed < effective_limit:
batch_limit = min(BATCH_SIZE, effective_limit - total_processed)
url = f"{API_BASE}?$limit={batch_limit}&$offset={offset}&$order=:id"
LOG.info("Fetching offset=%d limit=%d ...", offset, batch_limit)
try:
req = urllib.request.Request(url, headers={
"Accept": "application/json",
"User-Agent": "PerformanceWest-FMCSA-Ingest/1.0",
})
with urllib.request.urlopen(req, timeout=120) as resp:
data = json.loads(resp.read())
except Exception as e:
LOG.error("API error at offset %d: %s", offset, e)
break
if not data:
LOG.info("No more data at offset %d", offset)
break
rows = []
for rec in data:
dot = rec.get("dot_number", "").strip()
name = rec.get("legal_name", "").strip()
if not dot or not name:
continue
rows.append((
dot,
name,
rec.get("dba_name", "").strip() or None,
rec.get("carrier_operation", "").strip() or None,
safe_bool(rec.get("authorized_for_hire")),
safe_bool(rec.get("private_property")),
safe_bool(rec.get("private_passenger_business")),
safe_bool(rec.get("exempt_for_hire")),
safe_bool(rec.get("hm_flag")),
rec.get("email_address", "").strip() or None,
rec.get("telephone", "").strip() or None,
rec.get("phy_street", "").strip() or None,
rec.get("phy_city", "").strip() or None,
rec.get("phy_state", "").strip() or None,
rec.get("phy_zip", "").strip() or None,
rec.get("phy_country", "").strip() or None,
rec.get("mailing_street", "").strip() or None,
rec.get("mailing_city", "").strip() or None,
rec.get("mailing_state", "").strip() or None,
rec.get("mailing_zip", "").strip() or None,
rec.get("mailing_country", "").strip() or None,
safe_int(rec.get("nbr_power_unit")),
safe_int(rec.get("driver_total")),
safe_int(rec.get("mcs150_mileage")),
safe_int(rec.get("mcs150_mileage_year")),
safe_int(rec.get("recent_mileage")),
safe_int(rec.get("recent_mileage_year")),
rec.get("mcs150_date", "").strip() or None,
parse_fmcsa_date(rec.get("mcs150_date")),
rec.get("add_date", "").strip() or None,
rec.get("oic_state", "").strip() or None,
))
if rows and not dry_run and conn:
cur = conn.cursor()
execute_values(cur, """
INSERT INTO fmcsa_carriers (
dot_number, legal_name, dba_name, carrier_operation,
authorized_for_hire, private_property, private_passenger_business,
exempt_for_hire, hm_flag,
email_address, telephone,
phy_street, phy_city, phy_state, phy_zip, phy_country,
mailing_street, mailing_city, mailing_state, mailing_zip, mailing_country,
nbr_power_unit, driver_total,
mcs150_mileage, mcs150_mileage_year, recent_mileage, recent_mileage_year,
mcs150_date, mcs150_parsed, add_date, oic_state
) VALUES %s
ON CONFLICT (dot_number) DO UPDATE SET
legal_name = EXCLUDED.legal_name,
dba_name = EXCLUDED.dba_name,
carrier_operation = EXCLUDED.carrier_operation,
authorized_for_hire = EXCLUDED.authorized_for_hire,
private_property = EXCLUDED.private_property,
private_passenger_business = EXCLUDED.private_passenger_business,
exempt_for_hire = EXCLUDED.exempt_for_hire,
hm_flag = EXCLUDED.hm_flag,
email_address = EXCLUDED.email_address,
telephone = EXCLUDED.telephone,
phy_street = EXCLUDED.phy_street,
phy_city = EXCLUDED.phy_city,
phy_state = EXCLUDED.phy_state,
phy_zip = EXCLUDED.phy_zip,
phy_country = EXCLUDED.phy_country,
mailing_street = EXCLUDED.mailing_street,
mailing_city = EXCLUDED.mailing_city,
mailing_state = EXCLUDED.mailing_state,
mailing_zip = EXCLUDED.mailing_zip,
mailing_country = EXCLUDED.mailing_country,
nbr_power_unit = EXCLUDED.nbr_power_unit,
driver_total = EXCLUDED.driver_total,
mcs150_mileage = EXCLUDED.mcs150_mileage,
mcs150_mileage_year = EXCLUDED.mcs150_mileage_year,
recent_mileage = EXCLUDED.recent_mileage,
recent_mileage_year = EXCLUDED.recent_mileage_year,
mcs150_date = EXCLUDED.mcs150_date,
mcs150_parsed = EXCLUDED.mcs150_parsed,
add_date = EXCLUDED.add_date,
oic_state = EXCLUDED.oic_state,
updated_at = NOW()
""", rows)
conn.commit()
total_inserted += len(rows)
total_processed += len(data)
offset += len(data)
LOG.info(
" Processed: %d (batch: %d, inserted: %d)",
total_processed, len(data), total_inserted,
)
if len(data) < batch_limit:
LOG.info("Last batch (got %d < %d)", len(data), batch_limit)
break
if conn:
conn.close()
LOG.info("Done. Total processed: %d, inserted/updated: %d", total_processed, total_inserted)
return total_inserted
def main():
parser = argparse.ArgumentParser(description="Download FMCSA carrier census data")
parser.add_argument("--limit", type=int, default=None, help="Limit number of records")
parser.add_argument("--dry-run", action="store_true", help="Parse but don't save to DB")
args = parser.parse_args()
download_all(limit=args.limit, dry_run=args.dry_run)
if __name__ == "__main__":
main()