FMCSA census carrier_operation is single-letter: A=Interstate, B=Intrastate Hazmat, C=Intrastate Non-Hazmat. Previous code searched for "interstate" in text which never matched. Now 22,089 interstate carriers will be properly flagged for IRP/IFTA. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
486 lines
19 KiB
Python
486 lines
19 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Flag FMCSA carriers with compliance deficiencies for email campaigns.
|
|
|
|
Analyzes the fmcsa_carriers census data and flags each carrier with:
|
|
- MCS-150 overdue (biennial, >2 years)
|
|
- MCS-150 severely overdue (>4 years)
|
|
- For-hire without likely BOC-3 (operating for hire)
|
|
- No email (can't contact — skip)
|
|
- Fleet size category
|
|
|
|
Stores flags in fmcsa_carriers table columns for campaign targeting.
|
|
Creates Listmonk subscriber lists for email outreach.
|
|
|
|
Usage:
|
|
python3 -m scripts.workers.fmcsa_deficiency_flagger # flag all
|
|
python3 -m scripts.workers.fmcsa_deficiency_flagger --dry-run # count only
|
|
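python3 -m scripts.workers.fmcsa_deficiency_flagger --listmonk # also populate Listmonk
python3 -m scripts.workers.fmcsa_deficiency_flagger --state-lists # also populate state-targeted Listmonk lists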
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import urllib.request
|
|
import base64
|
|
from datetime import datetime, timedelta
|
|
|
|
import psycopg2
|
|
from psycopg2.extras import execute_values
|
|
|
|
# Module logger; records go to stdout so the worker's output can be captured
# by whatever runs it.
LOG = logging.getLogger("workers.fmcsa_flagger")
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    level=logging.INFO,
)

# Connection settings, each overridable through the environment.
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest")
LISTMONK_API = os.getenv("LISTMONK_URL", "http://localhost:9100")
LISTMONK_USER = os.getenv("LISTMONK_USER", "admin")
LISTMONK_PASS = os.getenv("LISTMONK_PASSWORD", "")

# MCS-150 age cutoffs, fixed once at import time: 730 days (biennial filing
# overdue) and 1460 days (severely overdue).
TWO_YEARS_AGO = datetime.now() - timedelta(days=2 * 365)
FOUR_YEARS_AGO = datetime.now() - timedelta(days=4 * 365)
|
|
|
|
|
|
# Columns this worker maintains on fmcsa_carriers (column name + DDL type).
_FLAG_COLUMNS = (
    "deficiency_flags TEXT[]",
    "deficiency_count INTEGER DEFAULT 0",
    "deficiency_severity TEXT",  # values written below: none, major, critical, stale
    "issues_summary TEXT",
    "campaign_eligible BOOLEAN DEFAULT FALSE",
)

# Flags that are recorded but never make a carrier campaign-eligible.
_NON_ACTIONABLE_FLAGS = ("zero_fleet", "mcs150_severely_overdue", "mcs150_unknown")

_FLAG_UPDATE_SQL = """
    UPDATE fmcsa_carriers SET
        deficiency_flags = %s,
        deficiency_count = %s,
        deficiency_severity = %s,
        issues_summary = %s,
        campaign_eligible = %s
    WHERE dot_number = %s
"""


def _bump(stats: dict, key: str) -> None:
    """Increment counter *key* in *stats*, creating it on first use."""
    stats[key] = stats.get(key, 0) + 1


def _ensure_flag_columns(conn, cur) -> None:
    """Add the deficiency flag columns to fmcsa_carriers if they are missing."""
    for column_ddl in _FLAG_COLUMNS:
        try:
            cur.execute(f"ALTER TABLE fmcsa_carriers ADD COLUMN IF NOT EXISTS {column_ddl}")
            # Commit per column: a later failure + rollback must not undo
            # columns that were already added successfully.
            conn.commit()
        except psycopg2.Error as exc:
            conn.rollback()
            LOG.warning("Could not add column %s: %s", column_ddl.split()[0], exc)


def _load_state_requirements(conn, cur) -> dict:
    """Return state_trucking_requirements keyed by state code ({} on failure)."""
    state_reqs = {}
    try:
        cur.execute(
            "SELECT state_code, weight_distance_tax, state_carrier_permit, "
            "emissions_program, intrastate_authority_required, "
            "weight_distance_tax_name, state_carrier_permit_name, "
            "emissions_program_name FROM state_trucking_requirements"
        )
        for sr in cur.fetchall():
            state_reqs[sr[0]] = {
                "weight_tax": sr[1], "carrier_permit": sr[2],
                "emissions": sr[3], "intrastate": sr[4],
                "weight_tax_name": sr[5], "permit_name": sr[6],
                "emissions_name": sr[7],
            }
        LOG.info("Loaded state requirements for %d states", len(state_reqs))
    except psycopg2.Error as exc:
        # State flags are optional: clear the failed transaction and carry on.
        conn.rollback()
        LOG.warning("Could not load state_trucking_requirements (%s) — skipping state flags", exc)
        return {}
    return state_reqs


def _assess_carrier(row, state_reqs: dict, stats: dict, overdue_cutoff, severe_cutoff) -> tuple:
    """Evaluate one carrier row and return its UPDATE parameter tuple.

    *row* is one result row of the SELECT in ``flag_carriers``. *stats* is
    updated in place. The returned tuple is ordered to match
    ``_FLAG_UPDATE_SQL``: (flags, deficiency_count, severity, issues_summary,
    campaign_eligible, dot_number).
    """
    (dot, _name, email, _phone, _city, state, mcs150_parsed, _mcs150_raw,
     trucks, drivers, for_hire, hazmat, carrier_op) = row

    flags = []
    issues = []
    severity = "none"

    # MCS-150 recency
    if mcs150_parsed:
        if mcs150_parsed < severe_cutoff:
            # 4+ years stale — likely abandoned, spam trap risk: flagged but
            # marked "stale" so the carrier is kept out of campaigns.
            flags.append("mcs150_severely_overdue")
            _bump(stats, "mcs150_severe")
            _bump(stats, "mcs150_overdue")
            severity = "stale"
        elif mcs150_parsed < overdue_cutoff:
            flags.append("mcs150_overdue")
            issues.append(f"MCS-150 biennial update is overdue — last filed {mcs150_parsed}. Required every 2 years. Fines up to $1,000/day.")
            severity = "major"
            _bump(stats, "mcs150_overdue")
    else:
        # No filing date — recency cannot be verified, so treat as stale.
        flags.append("mcs150_unknown")
        severity = "stale"

    # For-hire carrier — needs BOC-3, UCR, insurance
    if for_hire:
        flags.append("for_hire_carrier")
        issues.append("For-hire carrier — BOC-3, UCR registration, and liability insurance required")
        if severity == "none":
            severity = "major"
        _bump(stats, "for_hire")

    # Hazmat — needs PHMSA registration
    if hazmat:
        flags.append("hazmat_carrier")
        issues.append("Hazmat carrier — verify PHMSA registration is current")
        if severity in ("none", "minor"):
            severity = "major"
        _bump(stats, "hazmat")

    # Zero trucks/drivers — possibly inactive or data stale (None never equals 0)
    if trucks == 0 or drivers == 0:
        flags.append("zero_fleet")
        issues.append("Zero trucks or drivers on file — update your MCS-150 if still operating")
        _bump(stats, "zero_fleet")

    # State-level flags based on the carrier's physical state.
    # carrier_operation: A = Interstate, B = Intrastate Hazmat, C = Intrastate Non-Hazmat
    is_interstate = (carrier_op or "").strip().upper() == "A"
    sr = state_reqs.get(state) if state else None
    if sr:
        # Interstate carriers need IRP and IFTA
        if is_interstate:
            flags.append("interstate_needs_irp_ifta")
            issues.append("Interstate carrier — IRP (apportioned registration) and IFTA (fuel tax) required")
            _bump(stats, "interstate_irp_ifta")

        # Weight-distance tax states
        if sr["weight_tax"]:
            flags.append(f"state_weight_tax_{state}")
            issues.append(f"{sr['weight_tax_name'] or 'Weight-distance tax'} required in {state}")
            _bump(stats, "state_weight_tax")

        # State carrier permit
        if sr["carrier_permit"]:
            flags.append(f"state_permit_{state}")
            issues.append(f"{sr['permit_name'] or 'State carrier permit'} required in {state}")
            _bump(stats, "state_carrier_permit")

        # Emissions program
        if sr["emissions"]:
            flags.append(f"state_emissions_{state}")
            issues.append(f"{sr['emissions_name'] or 'Emissions compliance'} required in {state}")
            _bump(stats, "state_emissions")

        # Intrastate operating authority (for-hire carriers only)
        if sr["intrastate"] and for_hire:
            flags.append(f"intrastate_authority_{state}")
            issues.append(f"For-hire intrastate carriers in {state} need state operating authority")
            _bump(stats, "intrastate_authority")

    # For-hire + MCS-150 overdue = critical (they need everything)
    if for_hire and "mcs150_overdue" in flags:
        severity = "critical"

    # Only actionable flags count; stale records are never campaign-eligible.
    deficiency_count = sum(1 for flag in flags if flag not in _NON_ACTIONABLE_FLAGS)
    campaign_eligible = deficiency_count > 0 and bool(email) and severity != "stale"

    if campaign_eligible:
        _bump(stats, "campaign_eligible")
    if flags:
        _bump(stats, "flagged")

    return (
        flags if flags else None,
        deficiency_count,
        severity,
        "; ".join(issues) if issues else None,
        campaign_eligible,
        dot,
    )


def flag_carriers(dry_run: bool = False) -> dict:
    """Analyze carriers that have an email address and flag deficiencies.

    Each carrier is checked for MCS-150 recency, for-hire and hazmat status,
    a zero fleet, and (when state_trucking_requirements can be read)
    state-level requirements for its physical state.

    Args:
        dry_run: When True, only the counts are computed; no columns are
            added and no rows are updated.

    Returns:
        A dict of counters: ``total``, ``mcs150_overdue``, ``mcs150_severe``,
        ``for_hire``, ``hazmat``, ``flagged``, ``campaign_eligible``, plus
        further keys (``zero_fleet``, ``interstate_irp_ifta``, ``state_*``,
        ``intrastate_authority``) that appear once first encountered.
    """
    conn = psycopg2.connect(DATABASE_URL)
    try:
        cur = conn.cursor()

        if not dry_run:
            _ensure_flag_columns(conn, cur)

        state_reqs = _load_state_requirements(conn, cur)

        LOG.info("Querying carriers...")
        cur.execute("""
            SELECT dot_number, legal_name, email_address, telephone,
                   phy_city, phy_state, mcs150_parsed, mcs150_date,
                   nbr_power_unit, driver_total, authorized_for_hire,
                   hm_flag, carrier_operation
            FROM fmcsa_carriers
            WHERE email_address IS NOT NULL AND email_address != ''
        """)
        rows = cur.fetchall()
        LOG.info("Found %d carriers with email", len(rows))

        stats = {
            "total": len(rows),
            "mcs150_overdue": 0,
            "mcs150_severe": 0,
            "for_hire": 0,
            "hazmat": 0,
            "flagged": 0,
            "campaign_eligible": 0,
        }

        # Loop-invariant cutoffs, converted to dates once.
        overdue_cutoff = TWO_YEARS_AGO.date()
        severe_cutoff = FOUR_YEARS_AGO.date()

        updates = []
        for row in rows:
            params = _assess_carrier(row, state_reqs, stats, overdue_cutoff, severe_cutoff)
            if not dry_run:
                updates.append(params)

        if updates:
            # Local import: only execute_values is imported at file level.
            from psycopg2.extras import execute_batch

            LOG.info("Updating %d carrier records...", len(updates))
            chunk_size = 5000
            for i in range(0, len(updates), chunk_size):
                # execute_batch sends many statements per round trip;
                # commit per chunk so progress survives a later failure.
                execute_batch(cur, _FLAG_UPDATE_SQL, updates[i:i + chunk_size], page_size=500)
                conn.commit()
                LOG.info(" Updated %d / %d", min(i + chunk_size, len(updates)), len(updates))
    finally:
        conn.close()

    return stats
|
|
|
|
|
|
# Upper bound for any single Listmonk HTTP call so a hung server cannot
# block the worker forever.
_LISTMONK_TIMEOUT_SECONDS = 60

# Number of subscribers sent per import request.
_LISTMONK_BATCH_SIZE = 500


def _listmonk_headers() -> dict:
    """Return JSON + HTTP Basic auth headers for the Listmonk API."""
    token = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode()
    return {"Content-Type": "application/json", "Authorization": f"Basic {token}"}


def _listmonk_request(path: str, headers: dict, payload: dict | None = None) -> bytes:
    """GET *path* (or POST *payload* as JSON) and return the raw response body."""
    data = json.dumps(payload).encode() if payload is not None else None
    req = urllib.request.Request(
        f"{LISTMONK_API}{path}",
        data=data,
        method="POST" if data is not None else "GET",
        headers=headers,
    )
    # The context manager closes the HTTP response even if read() fails.
    with urllib.request.urlopen(req, timeout=_LISTMONK_TIMEOUT_SECONDS) as resp:
        return resp.read()


def _get_or_create_list(list_name: str, headers: dict) -> tuple:
    """Return ``(list_id, created)`` for the Listmonk list named *list_name*."""
    # per_page=all: the listing is paginated, so without it a list beyond the
    # first page would be missed and a duplicate created on every run.
    listing = json.loads(_listmonk_request("/api/lists?per_page=all", headers))
    for entry in listing.get("data", {}).get("results", []):
        if entry["name"] == list_name:
            return entry["id"], False

    created = json.loads(_listmonk_request(
        "/api/lists", headers,
        {"name": list_name, "type": "public", "optin": "single"},
    ))
    return created["data"]["id"], True


def _issues_html(issues_summary) -> str:
    """Render a '; '-separated issues summary as an HTML <ul> for email templates."""
    import html  # local import: not part of the file-level import block

    items = "".join(
        # Escape &, <, > so database-sourced text cannot break the markup.
        f"<li>{html.escape(item, quote=False)}</li>"
        for item in (issues_summary or "").split("; ")
        if item
    )
    return f"<ul style='margin:0;padding:0 0 0 16px'>{items}</ul>"


def _import_subscribers(subscribers: list, list_id, headers: dict) -> None:
    """POST one batch of subscriber dicts to the Listmonk import endpoint."""
    # NOTE(review): Listmonk's documented bulk import appears to expect a
    # multipart upload (params + CSV file) — confirm the deployed version
    # accepts this JSON payload.
    _listmonk_request("/api/import/subscribers", headers, {
        "mode": "subscribe",
        "subscribers": subscribers,
        "lists": [list_id],
        "overwrite": True,
    })


def populate_listmonk(list_name: str = "FMCSA MCS-150 Overdue Carriers") -> int:
    """Create/populate a Listmonk list with campaign-eligible carriers.

    Args:
        list_name: Name of the Listmonk list to reuse or create.

    Returns:
        Number of subscribers in successfully posted batches; 0 when there
        are no eligible carriers or the list could not be fetched/created.
    """
    conn = psycopg2.connect(DATABASE_URL)
    try:
        cur = conn.cursor()
        cur.execute("""
            SELECT dot_number, legal_name, email_address, telephone,
                   phy_city, phy_state, mcs150_parsed, mcs150_date,
                   nbr_power_unit, driver_total, authorized_for_hire,
                   deficiency_count, deficiency_severity, issues_summary
            FROM fmcsa_carriers
            WHERE campaign_eligible = TRUE
              AND deficiency_count > 0
            ORDER BY deficiency_severity DESC, deficiency_count DESC
        """)
        rows = cur.fetchall()
    finally:
        conn.close()
    LOG.info("Found %d campaign-eligible carriers for Listmonk", len(rows))

    if not rows:
        return 0

    headers = _listmonk_headers()
    try:
        list_id, created = _get_or_create_list(list_name, headers)
    except Exception as e:  # best-effort boundary: report and give up
        LOG.error("Failed to get/create Listmonk list: %s", e)
        return 0
    if created:
        LOG.info("Created Listmonk list: %s (ID %d)", list_name, list_id)
    else:
        LOG.info("Using existing Listmonk list: %s (ID %d)", list_name, list_id)

    imported = 0
    for start in range(0, len(rows), _LISTMONK_BATCH_SIZE):
        subscribers = []
        for row in rows[start:start + _LISTMONK_BATCH_SIZE]:
            (dot, name, email, phone, city, state, mcs150_parsed, _mcs150_raw,
             trucks, drivers, for_hire, def_count, def_severity, issues) = row

            # The query does not filter on email, so guard against NULL/blank.
            if not email or not email.strip():
                continue

            subscribers.append({
                "email": email.lower().strip(),
                "name": name or "",
                "status": "enabled",
                "lists": [list_id],
                "attribs": {
                    "company": name or "",
                    "dot_number": dot,
                    "phone": phone or "",
                    "city": city or "",
                    "state": state or "",
                    "mcs150_date": str(mcs150_parsed) if mcs150_parsed else "",
                    "trucks": trucks or 0,
                    "drivers": drivers or 0,
                    "for_hire": for_hire or False,
                    "deficiency_count": def_count or 0,
                    "severity": def_severity or "",
                    "issues_html": _issues_html(issues),
                },
            })

        if not subscribers:
            continue

        try:
            _import_subscribers(subscribers, list_id, headers)
        except Exception as e:  # one failed batch must not abort the rest
            LOG.error(" Import batch failed: %s", e)
            continue
        imported += len(subscribers)
        LOG.info(" Imported %d / %d", imported, len(rows))

    LOG.info("Done. Imported %d subscribers to list '%s'", imported, list_name)
    return imported


def populate_state_lists() -> None:
    """Create state-targeted Listmonk lists for campaign segmentation.

    For each state in ``STATE_LISTS``, the campaign-eligible carriers with an
    email address in that state are imported into a list of the mapped name;
    states with no eligible carriers are skipped.
    """
    STATE_LISTS = {
        "CA": "FMCSA State - California (MCP/CARB)",
        "OR": "FMCSA State - Oregon (Weight-Mile Tax)",
        "NY": "FMCSA State - New York (HUT)",
        "KY": "FMCSA State - Kentucky (KYU)",
        "NM": "FMCSA State - New Mexico (Weight-Distance)",
        "CT": "FMCSA State - Connecticut (Highway Use Fee)",
        "TX": "FMCSA State - Texas",
        "FL": "FMCSA State - Florida",
    }

    headers = _listmonk_headers()
    conn = psycopg2.connect(DATABASE_URL)
    try:
        cur = conn.cursor()

        for state_code, list_name in STATE_LISTS.items():
            cur.execute("""
                SELECT dot_number, legal_name, email_address, telephone,
                       phy_city, phy_state, issues_summary, deficiency_count
                FROM fmcsa_carriers
                WHERE campaign_eligible = TRUE
                  AND phy_state = %s
                  AND email_address IS NOT NULL AND email_address != ''
            """, (state_code,))
            rows = cur.fetchall()
            if not rows:
                LOG.info("No eligible carriers in %s — skipping", state_code)
                continue

            LOG.info("Found %d eligible carriers in %s", len(rows), state_code)

            try:
                list_id, created = _get_or_create_list(list_name, headers)
            except Exception as e:  # skip this state, keep processing the others
                LOG.error("Failed to create list %s: %s", list_name, e)
                continue
            if created:
                LOG.info("Created list: %s (ID %d)", list_name, list_id)

            imported = 0
            for start in range(0, len(rows), _LISTMONK_BATCH_SIZE):
                batch = rows[start:start + _LISTMONK_BATCH_SIZE]
                subscribers = [
                    {
                        "email": email.lower().strip(),
                        "name": name or "",
                        "status": "enabled",
                        "lists": [list_id],
                        "attribs": {
                            "company": name or "", "dot_number": dot,
                            "phone": phone or "", "city": city or "",
                            "state": st or "", "deficiency_count": def_count or 0,
                            "issues_html": _issues_html(issues),
                        },
                    }
                    for dot, name, email, phone, city, st, issues, def_count in batch
                ]
                try:
                    _import_subscribers(subscribers, list_id, headers)
                except Exception as e:  # one failed batch must not abort the rest
                    LOG.error(" Batch import failed for %s: %s", state_code, e)
                    continue
                imported += len(batch)

            LOG.info(" %s: imported %d to list '%s'", state_code, imported, list_name)
    finally:
        conn.close()
|
|
|
|
|
|
def main():
    """Command-line entry point: flag carriers, then optionally fill Listmonk.

    Flagging always runs (honouring ``--dry-run``) and its counters are
    logged. The Listmonk steps requested with ``--listmonk`` and
    ``--state-lists`` run only when ``--dry-run`` is not set.
    """
    cli = argparse.ArgumentParser(description="Flag FMCSA carrier deficiencies")
    switches = (
        ("--dry-run", "Count only, don't update"),
        ("--listmonk", "Also populate Listmonk list"),
        ("--state-lists", "Also populate state-targeted lists"),
    )
    for switch, help_text in switches:
        cli.add_argument(switch, action="store_true", help=help_text)
    opts = cli.parse_args()

    results = flag_carriers(dry_run=opts.dry_run)

    LOG.info("=== Flagging Results ===")
    for counter, value in results.items():
        LOG.info(" %s: %s", counter, value)

    # A dry run never touches Listmonk, whatever else was requested.
    if opts.dry_run:
        return

    if opts.listmonk:
        populate_listmonk()

    if opts.state_lists:
        populate_state_lists()


if __name__ == "__main__":
    main()
|