-- api/migrations/078_fmcsa_carriers.sql
-- FMCSA Motor Carrier Census Data
--
-- One row per carrier, keyed by its DOT number. Rows are loaded by the
-- census downloader worker via INSERT ... ON CONFLICT (dot_number) DO UPDATE.
CREATE TABLE IF NOT EXISTS fmcsa_carriers (
    id                          SERIAL PRIMARY KEY,
    -- UNIQUE creates the index used both for lookups and as the upsert
    -- conflict target; no separate index on dot_number is needed.
    dot_number                  TEXT NOT NULL UNIQUE,
    legal_name                  TEXT NOT NULL,
    dba_name                    TEXT,
    carrier_operation           TEXT,                   -- C=carrier, B=broker, etc.
    authorized_for_hire         BOOLEAN DEFAULT FALSE,
    private_property            BOOLEAN DEFAULT FALSE,
    private_passenger_business  BOOLEAN DEFAULT FALSE,
    exempt_for_hire             BOOLEAN DEFAULT FALSE,
    hm_flag                     BOOLEAN DEFAULT FALSE,

    -- Contact
    email_address               TEXT,
    telephone                   TEXT,

    -- Physical address
    phy_street                  TEXT,
    phy_city                    TEXT,
    phy_state                   TEXT,
    phy_zip                     TEXT,
    phy_country                 TEXT,

    -- Mailing address
    mailing_street              TEXT,
    mailing_city                TEXT,
    mailing_state               TEXT,
    mailing_zip                 TEXT,
    mailing_country             TEXT,

    -- Fleet info
    nbr_power_unit              INTEGER,
    driver_total                INTEGER,
    -- BIGINT: fleet-wide annual mileage for very large carriers may exceed
    -- the 32-bit INTEGER maximum (2,147,483,647); a single out-of-range
    -- value would abort the whole multi-row insert statement.
    mcs150_mileage              BIGINT,
    mcs150_mileage_year         INTEGER,
    recent_mileage              BIGINT,
    recent_mileage_year         INTEGER,

    -- Compliance dates
    mcs150_date                 TEXT,                   -- raw date string from FMCSA
    mcs150_parsed               DATE,                   -- parsed for querying
    add_date                    TEXT,                   -- registration date

    -- State info
    oic_state                   TEXT,                   -- state of principal office

    -- Metadata
    created_at                  TIMESTAMPTZ DEFAULT NOW(),
    updated_at                  TIMESTAMPTZ DEFAULT NOW()
);

-- Partial indexes keep the index small by skipping rows that can never match.
CREATE INDEX IF NOT EXISTS idx_fmcsa_email ON fmcsa_carriers(email_address) WHERE email_address IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_fmcsa_state ON fmcsa_carriers(phy_state);
CREATE INDEX IF NOT EXISTS idx_fmcsa_mcs150 ON fmcsa_carriers(mcs150_parsed);
CREATE INDEX IF NOT EXISTS idx_fmcsa_hire ON fmcsa_carriers(authorized_for_hire) WHERE authorized_for_hire = TRUE;
#!/usr/bin/env python3
"""
fmcsa_census_downloader.py — Download FMCSA motor carrier census data via Socrata API.

Data source: https://data.transportation.gov/resource/kjg3-diqy.json
~2M carriers with email, phone, address, fleet size, MCS-150 filing date.

Usage:
    python3 -m scripts.workers.fmcsa_census_downloader              # full download
    python3 -m scripts.workers.fmcsa_census_downloader --limit 1000 # test with 1K records
    python3 -m scripts.workers.fmcsa_census_downloader --dry-run    # parse but don't save

Environment:
    DATABASE_URL    PostgreSQL connection string
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import sys
import urllib.request
from datetime import datetime

# NOTE: psycopg2 is imported lazily (see _upsert_rows / download_all) so that
# --dry-run and the pure parsing helpers work without the database driver.

LOG = logging.getLogger("workers.fmcsa_census")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    stream=sys.stdout,
)

DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest")
API_BASE = "https://data.transportation.gov/resource/kjg3-diqy.json"
BATCH_SIZE = 50000

# API field names grouped by the conversion applied to them. The order here
# defines the column order of every row tuple built by _build_row().
_BOOL_FIELDS = (
    "authorized_for_hire",
    "private_property",
    "private_passenger_business",
    "exempt_for_hire",
    "hm_flag",
)
_CONTACT_FIELDS = (
    "email_address",
    "telephone",
    "phy_street",
    "phy_city",
    "phy_state",
    "phy_zip",
    "phy_country",
    "mailing_street",
    "mailing_city",
    "mailing_state",
    "mailing_zip",
    "mailing_country",
)
_INT_FIELDS = (
    "nbr_power_unit",
    "driver_total",
    "mcs150_mileage",
    "mcs150_mileage_year",
    "recent_mileage",
    "recent_mileage_year",
)

# Full column list, in the exact order _build_row() emits values.
_COLUMNS = (
    "dot_number",
    "legal_name",
    "dba_name",
    "carrier_operation",
    *_BOOL_FIELDS,
    *_CONTACT_FIELDS,
    *_INT_FIELDS,
    "mcs150_date",
    "mcs150_parsed",
    "add_date",
    "oic_state",
)

# Generated from _COLUMNS so the INSERT list, the SET list and the row layout
# cannot drift apart. Every column except the conflict key is overwritten.
_UPSERT_SQL = (
    "INSERT INTO fmcsa_carriers (" + ", ".join(_COLUMNS) + ") VALUES %s "
    "ON CONFLICT (dot_number) DO UPDATE SET "
    + ", ".join(f"{col} = EXCLUDED.{col}" for col in _COLUMNS[1:])
    + ", updated_at = NOW()"
)


def parse_fmcsa_date(date_str: str | None) -> str | None:
    """Parse an FMCSA date like '01-JAN-22' or '20-OCT-2023' into 'YYYY-MM-DD'.

    Two-digit years follow ``strptime``'s ``%y`` rule (00-68 -> 2000s,
    69-99 -> 1900s). Because a filing or registration date cannot lie in the
    future, a two-digit year that resolves to a future date is moved back one
    century.

    Returns None for missing, blank, non-string or unparseable input.
    """
    if not isinstance(date_str, str):
        return None
    text = date_str.strip()
    if not text:
        return None

    try:
        parsed = datetime.strptime(text, "%d-%b-%y")
    except ValueError:
        parsed = None
    if parsed is not None:
        if parsed > datetime.now():
            # Only years 20xx can be "future" here, and xx is never 00, so
            # year - 100 keeps the same leap-year status (no Feb-29 failure).
            parsed = parsed.replace(year=parsed.year - 100)
        return parsed.strftime("%Y-%m-%d")

    try:
        return datetime.strptime(text, "%d-%b-%Y").strftime("%Y-%m-%d")
    except ValueError:
        return None


def safe_int(val) -> int | None:
    """Convert *val* to int; return None when it is None or not convertible."""
    if val is None:
        return None
    try:
        return int(val)
    except (ValueError, TypeError):
        return None


def safe_bool(val) -> bool:
    """Convert *val* to bool.

    Strings are true only when they equal (ignoring case and surrounding
    whitespace) one of 'true', '1', 'yes', 'y'. Other values use ``bool()``.
    """
    if isinstance(val, bool):
        return val
    if isinstance(val, str):
        return val.strip().lower() in ("true", "1", "yes", "y")
    return bool(val)


def _clean_text(val) -> str | None:
    """Return *val* as stripped text, or None when it is missing or blank."""
    if val is None:
        return None
    text = str(val).strip()
    return text or None


def _build_row(rec) -> tuple | None:
    """Map one API record to a row tuple ordered like _COLUMNS.

    Returns None when the record is not a JSON object or lacks a DOT number
    or legal name (both are NOT NULL in the table).
    """
    if not isinstance(rec, dict):
        return None
    dot = _clean_text(rec.get("dot_number"))
    name = _clean_text(rec.get("legal_name"))
    if dot is None or name is None:
        return None

    raw_mcs150 = _clean_text(rec.get("mcs150_date"))
    return (
        dot,
        name,
        _clean_text(rec.get("dba_name")),
        _clean_text(rec.get("carrier_operation")),
        *(safe_bool(rec.get(field)) for field in _BOOL_FIELDS),
        *(_clean_text(rec.get(field)) for field in _CONTACT_FIELDS),
        *(safe_int(rec.get(field)) for field in _INT_FIELDS),
        raw_mcs150,
        parse_fmcsa_date(raw_mcs150),
        _clean_text(rec.get("add_date")),
        _clean_text(rec.get("oic_state")),
    )


def _fetch_batch(offset: int, batch_limit: int) -> list | None:
    """Fetch one page of records; return None (after logging) on any failure."""
    url = f"{API_BASE}?$limit={batch_limit}&$offset={offset}&$order=:id"
    LOG.info("Fetching offset=%d limit=%d ...", offset, batch_limit)
    req = urllib.request.Request(url, headers={
        "Accept": "application/json",
        "User-Agent": "PerformanceWest-FMCSA-Ingest/1.0",
    })
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            data = json.loads(resp.read())
    except Exception as e:
        # Deliberately broad: the download is best-effort, and network, HTTP
        # and JSON errors all mean "stop here and keep what was committed".
        LOG.error("API error at offset %d: %s", offset, e)
        return None
    if not isinstance(data, list):
        # e.g. a JSON error object instead of an array of records.
        LOG.error(
            "API error at offset %d: unexpected %s payload",
            offset, type(data).__name__,
        )
        return None
    return data


def _upsert_rows(conn, rows: list) -> None:
    """Upsert *rows* in one transaction; roll back and re-raise on failure."""
    from psycopg2.extras import execute_values

    try:
        with conn.cursor() as cur:
            execute_values(cur, _UPSERT_SQL, rows)
        conn.commit()
    except Exception:
        conn.rollback()
        raise


def download_all(limit: int | None = None, dry_run: bool = False) -> int:
    """Download FMCSA carriers via the Socrata API and upsert them.

    Args:
        limit: Maximum number of API records to process; None means no limit.
            A limit of 0 (or less) processes nothing.
        dry_run: When True, records are fetched and parsed but no database
            connection is opened and nothing is written.

    Returns:
        Number of rows inserted or updated (always 0 for a dry run).

    Raises:
        Whatever psycopg2 raises on connection or write failure. API failures
        are logged and end the download early without raising.
    """
    # `limit or ...` would treat an explicit 0 as "unlimited".
    effective_limit = 999_999_999 if limit is None else limit
    offset = 0
    total_processed = 0
    total_inserted = 0

    conn = None
    if not dry_run:
        import psycopg2

        conn = psycopg2.connect(DATABASE_URL)
        conn.autocommit = False

    try:
        while total_processed < effective_limit:
            batch_limit = min(BATCH_SIZE, effective_limit - total_processed)
            data = _fetch_batch(offset, batch_limit)
            if data is None:
                break
            if not data:
                LOG.info("No more data at offset %d", offset)
                break

            # Key by DOT number: PostgreSQL rejects an ON CONFLICT DO UPDATE
            # statement that would touch the same row twice, so keep only the
            # last record seen for each DOT number in this batch.
            unique_rows = {}
            for rec in data:
                row = _build_row(rec)
                if row is not None:
                    unique_rows[row[0]] = row
            rows = list(unique_rows.values())

            if rows and conn is not None:
                _upsert_rows(conn, rows)
                total_inserted += len(rows)

            total_processed += len(data)
            offset += len(data)

            LOG.info(
                " Processed: %d (batch: %d, inserted: %d)",
                total_processed, len(data), total_inserted,
            )

            if len(data) < batch_limit:
                LOG.info("Last batch (got %d < %d)", len(data), batch_limit)
                break
    finally:
        # Always release the connection, including when an upsert raises.
        if conn is not None:
            conn.close()

    LOG.info("Done. Total processed: %d, inserted/updated: %d", total_processed, total_inserted)
    return total_inserted


def main():
    """Parse command-line arguments and run the download."""
    parser = argparse.ArgumentParser(description="Download FMCSA carrier census data")
    parser.add_argument("--limit", type=int, default=None, help="Limit number of records")
    parser.add_argument("--dry-run", action="store_true", help="Parse but don't save to DB")
    args = parser.parse_args()

    download_all(limit=args.limit, dry_run=args.dry_run)


if __name__ == "__main__":
    main()