#!/usr/bin/env python3 """ Flag FMCSA carriers with compliance deficiencies for email campaigns. Analyzes the fmcsa_carriers census data and flags each carrier with: - MCS-150 overdue (biennial, >2 years) - MCS-150 severely overdue (>4 years) - For-hire without likely BOC-3 (operating for hire) - No email (can't contact — skip) - Fleet size category Stores flags in fmcsa_carriers table columns for campaign targeting. Creates Listmonk subscriber lists for email outreach. Usage: python3 -m scripts.workers.fmcsa_deficiency_flagger # flag all python3 -m scripts.workers.fmcsa_deficiency_flagger --dry-run # count only python3 -m scripts.workers.fmcsa_deficiency_flagger --listmonk # also populate Listmonk """ from __future__ import annotations import argparse import json import logging import os import sys import urllib.request import base64 from datetime import datetime, timedelta import psycopg2 from psycopg2.extras import execute_values LOG = logging.getLogger("workers.fmcsa_flagger") logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s", stream=sys.stdout, ) DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest") LISTMONK_API = os.environ.get("LISTMONK_URL", "http://localhost:9100") LISTMONK_USER = os.environ.get("LISTMONK_USER", "admin") LISTMONK_PASS = os.environ.get("LISTMONK_PASSWORD", "") TWO_YEARS_AGO = datetime.now() - timedelta(days=730) FOUR_YEARS_AGO = datetime.now() - timedelta(days=1460) def flag_carriers(dry_run: bool = False) -> dict: """Analyze carriers and flag deficiencies.""" conn = psycopg2.connect(DATABASE_URL) cur = conn.cursor() # Add flag columns if they don't exist if not dry_run: for col in [ "deficiency_flags TEXT[]", "deficiency_count INTEGER DEFAULT 0", "deficiency_severity TEXT", # minor, major, critical "issues_summary TEXT", "campaign_eligible BOOLEAN DEFAULT FALSE", ]: col_name = col.split()[0] try: cur.execute(f"ALTER TABLE fmcsa_carriers ADD COLUMN IF NOT EXISTS {col}") except Exception: conn.rollback() conn.commit() # Load state trucking requirements for state-level flags state_reqs = {} try: cur.execute("SELECT state_code, weight_distance_tax, state_carrier_permit, " "emissions_program, intrastate_authority_required, " "weight_distance_tax_name, state_carrier_permit_name, " "emissions_program_name FROM state_trucking_requirements") for sr in cur.fetchall(): state_reqs[sr[0]] = { "weight_tax": sr[1], "carrier_permit": sr[2], "emissions": sr[3], "intrastate": sr[4], "weight_tax_name": sr[5], "permit_name": sr[6], "emissions_name": sr[7], } LOG.info("Loaded state requirements for %d states", len(state_reqs)) except Exception: conn.rollback() LOG.warning("state_trucking_requirements table not found — skipping state flags") # Query all carriers with email LOG.info("Querying carriers...") cur.execute(""" SELECT dot_number, legal_name, email_address, telephone, phy_city, phy_state, mcs150_parsed, mcs150_date, nbr_power_unit, driver_total, authorized_for_hire, hm_flag, carrier_operation FROM fmcsa_carriers WHERE email_address IS NOT NULL AND email_address != '' """) rows = cur.fetchall() LOG.info("Found %d carriers with email", len(rows)) stats = { "total": len(rows), "mcs150_overdue": 0, "mcs150_severe": 0, "for_hire": 0, "hazmat": 0, "flagged": 0, "campaign_eligible": 0, } updates = [] for row in rows: (dot, name, email, phone, city, state, mcs150_parsed, mcs150_raw, trucks, drivers, for_hire, hazmat, carrier_op) = row flags = [] issues = [] severity = "none" # MCS-150 check if mcs150_parsed: if mcs150_parsed < FOUR_YEARS_AGO.date(): # Skip severely overdue (4+ years) — likely abandoned, spam trap risk flags.append("mcs150_severely_overdue") stats["mcs150_severe"] += 1 stats["mcs150_overdue"] += 1 # Mark as NOT campaign eligible — too old severity = "stale" elif mcs150_parsed < TWO_YEARS_AGO.date(): flags.append("mcs150_overdue") issues.append(f"MCS-150 biennial update is overdue — last filed {mcs150_parsed}. Required every 2 years. Fines up to $1,000/day.") severity = "major" if severity == "none" else severity stats["mcs150_overdue"] += 1 else: # No date — skip for campaign (can't verify recency) flags.append("mcs150_unknown") severity = "stale" # For-hire carrier — needs BOC-3, UCR, insurance if for_hire: flags.append("for_hire_carrier") issues.append("For-hire carrier — BOC-3, UCR registration, and liability insurance required") severity = "major" if severity == "none" else severity stats["for_hire"] += 1 # Hazmat — needs PHMSA registration if hazmat: flags.append("hazmat_carrier") issues.append("Hazmat carrier — verify PHMSA registration is current") severity = "major" if severity in ("none", "minor") else severity stats["hazmat"] += 1 # Zero trucks/drivers — possibly inactive or data stale if (trucks is not None and trucks == 0) or (drivers is not None and drivers == 0): flags.append("zero_fleet") issues.append("Zero trucks or drivers on file — update your MCS-150 if still operating") stats.setdefault("zero_fleet", 0) stats["zero_fleet"] += 1 # State-level flags based on carrier's physical state # carrier_operation: A = Interstate, B = Intrastate Hazmat, C = Intrastate Non-Hazmat is_interstate = (carrier_op or "").upper() == "A" sr = state_reqs.get(state) if state else None if sr: # Interstate carriers need IRP and IFTA if is_interstate: flags.append("interstate_needs_irp_ifta") issues.append("Interstate carrier — IRP (apportioned registration) and IFTA (fuel tax) required") stats.setdefault("interstate_irp_ifta", 0) stats["interstate_irp_ifta"] += 1 # Weight-distance tax states (OR, NY, KY, NM, CT) if sr["weight_tax"]: flags.append(f"state_weight_tax_{state}") issues.append(f"{sr['weight_tax_name'] or 'Weight-distance tax'} required in {state}") stats.setdefault("state_weight_tax", 0) stats["state_weight_tax"] += 1 # State carrier permit (CA MCP, etc.) if sr["carrier_permit"]: flags.append(f"state_permit_{state}") issues.append(f"{sr['permit_name'] or 'State carrier permit'} required in {state}") stats.setdefault("state_carrier_permit", 0) stats["state_carrier_permit"] += 1 # Emissions (CA CARB, ACT states) if sr["emissions"]: flags.append(f"state_emissions_{state}") issues.append(f"{sr['emissions_name'] or 'Emissions compliance'} required in {state}") stats.setdefault("state_emissions", 0) stats["state_emissions"] += 1 # Intrastate authority (29 states) if sr["intrastate"] and for_hire: flags.append(f"intrastate_authority_{state}") issues.append(f"For-hire intrastate carriers in {state} need state operating authority") stats.setdefault("intrastate_authority", 0) stats["intrastate_authority"] += 1 # New entrant (registered in last 18 months) — needs safety audit prep # add_date is stored as text like "01-JAN-25" # We can't easily parse all formats here, so use mcs150_parsed as a proxy # for recent registration if the carrier has very few inspections # Determine campaign eligibility — any real issue makes them eligible # For-hire + MCS-150 overdue = critical (they need everything) if for_hire and "mcs150_overdue" in flags: severity = "critical" # Exclude stale/severely overdue from campaign (spam trap risk) actionable_flags = [f for f in flags if f not in ("zero_fleet", "mcs150_severely_overdue", "mcs150_unknown")] deficiency_count = len(actionable_flags) campaign_eligible = deficiency_count > 0 and bool(email) and severity != "stale" if campaign_eligible: stats["campaign_eligible"] += 1 if flags: stats["flagged"] += 1 if not dry_run: updates.append(( flags if flags else None, deficiency_count, severity, "; ".join(issues) if issues else None, campaign_eligible, dot, )) if not dry_run and updates: LOG.info("Updating %d carrier records...", len(updates)) # Batch update in chunks chunk_size = 5000 for i in range(0, len(updates), chunk_size): chunk = updates[i:i+chunk_size] for u in chunk: cur.execute(""" UPDATE fmcsa_carriers SET deficiency_flags = %s, deficiency_count = %s, deficiency_severity = %s, issues_summary = %s, campaign_eligible = %s WHERE dot_number = %s """, u) conn.commit() LOG.info(" Updated %d / %d", min(i + chunk_size, len(updates)), len(updates)) conn.close() return stats def populate_listmonk(list_name: str = "FMCSA MCS-150 Overdue Carriers") -> int: """Create/populate a Listmonk list with campaign-eligible carriers.""" conn = psycopg2.connect(DATABASE_URL) cur = conn.cursor() # Get campaign-eligible carriers cur.execute(""" SELECT dot_number, legal_name, email_address, telephone, phy_city, phy_state, mcs150_parsed, mcs150_date, nbr_power_unit, driver_total, authorized_for_hire, deficiency_count, deficiency_severity, issues_summary FROM fmcsa_carriers WHERE campaign_eligible = TRUE AND deficiency_count > 0 ORDER BY deficiency_severity DESC, deficiency_count DESC """) rows = cur.fetchall() conn.close() LOG.info("Found %d campaign-eligible carriers for Listmonk", len(rows)) if not rows: return 0 # Get or create Listmonk list auth = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode() headers = { "Content-Type": "application/json", "Authorization": f"Basic {auth}", } # Check if list exists try: req = urllib.request.Request( f"{LISTMONK_API}/api/lists", headers=headers, ) resp = json.loads(urllib.request.urlopen(req).read()) existing = [l for l in resp.get("data", {}).get("results", []) if l["name"] == list_name] if existing: list_id = existing[0]["id"] LOG.info("Using existing Listmonk list: %s (ID %d)", list_name, list_id) else: # Create new list data = json.dumps({"name": list_name, "type": "public", "optin": "single"}).encode() req = urllib.request.Request( f"{LISTMONK_API}/api/lists", data=data, method="POST", headers=headers, ) resp = json.loads(urllib.request.urlopen(req).read()) list_id = resp["data"]["id"] LOG.info("Created Listmonk list: %s (ID %d)", list_name, list_id) except Exception as e: LOG.error("Failed to get/create Listmonk list: %s", e) return 0 # Import subscribers in batches imported = 0 batch_size = 500 for i in range(0, len(rows), batch_size): batch = rows[i:i+batch_size] subscribers = [] for row in batch: (dot, name, email, phone, city, state, mcs150_parsed, mcs150_raw, trucks, drivers, for_hire, def_count, def_severity, issues) = row # Build issues HTML for email template issues_list = (issues or "").split("; ") issues_html = "" subscribers.append({ "email": email.lower().strip(), "name": name or "", "status": "enabled", "lists": [list_id], "attribs": { "company": name or "", "dot_number": dot, "phone": phone or "", "city": city or "", "state": state or "", "mcs150_date": str(mcs150_parsed) if mcs150_parsed else "", "trucks": trucks or 0, "drivers": drivers or 0, "for_hire": for_hire or False, "deficiency_count": def_count or 0, "severity": def_severity or "", "issues_html": issues_html, }, }) # Batch import try: data = json.dumps({ "mode": "subscribe", "subscribers": subscribers, "lists": [list_id], "overwrite": True, }).encode() req = urllib.request.Request( f"{LISTMONK_API}/api/import/subscribers", data=data, method="POST", headers=headers, ) urllib.request.urlopen(req) imported += len(batch) LOG.info(" Imported %d / %d", imported, len(rows)) except Exception as e: LOG.error(" Import batch failed: %s", e) LOG.info("Done. Imported %d subscribers to list '%s'", imported, list_name) return imported def populate_state_lists() -> None: """Create state-targeted Listmonk lists for campaign segmentation.""" STATE_LISTS = { "CA": "FMCSA State - California (MCP/CARB)", "OR": "FMCSA State - Oregon (Weight-Mile Tax)", "NY": "FMCSA State - New York (HUT)", "KY": "FMCSA State - Kentucky (KYU)", "NM": "FMCSA State - New Mexico (Weight-Distance)", "CT": "FMCSA State - Connecticut (Highway Use Fee)", "TX": "FMCSA State - Texas", "FL": "FMCSA State - Florida", } conn = psycopg2.connect(DATABASE_URL) cur = conn.cursor() auth = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode() headers = {"Content-Type": "application/json", "Authorization": f"Basic {auth}"} for state_code, list_name in STATE_LISTS.items(): cur.execute(""" SELECT dot_number, legal_name, email_address, telephone, phy_city, phy_state, issues_summary, deficiency_count FROM fmcsa_carriers WHERE campaign_eligible = TRUE AND phy_state = %s AND email_address IS NOT NULL AND email_address != '' """, (state_code,)) rows = cur.fetchall() if not rows: LOG.info("No eligible carriers in %s — skipping", state_code) continue LOG.info("Found %d eligible carriers in %s", len(rows), state_code) # Get or create list try: req = urllib.request.Request(f"{LISTMONK_API}/api/lists", headers=headers) resp = json.loads(urllib.request.urlopen(req).read()) existing = [l for l in resp.get("data", {}).get("results", []) if l["name"] == list_name] if existing: list_id = existing[0]["id"] else: data = json.dumps({"name": list_name, "type": "public", "optin": "single"}).encode() req = urllib.request.Request(f"{LISTMONK_API}/api/lists", data=data, method="POST", headers=headers) resp = json.loads(urllib.request.urlopen(req).read()) list_id = resp["data"]["id"] LOG.info("Created list: %s (ID %d)", list_name, list_id) except Exception as e: LOG.error("Failed to create list %s: %s", list_name, e) continue # Import subscribers individually (bulk import API returns 400) imported = 0 failed = 0 for dot, name, email, phone, city, st, issues, def_count in rows: issues_list = (issues or "").split("; ") issues_html = "".join(f"
  • {iss}
  • " for iss in issues_list if iss) sub_data = { "email": email.lower().strip(), "name": name or email.split("@")[0], "status": "enabled", "lists": [list_id], "preconfirm_subscriptions": True, "attribs": { "company": name or "", "dot_number": dot, "phone": phone or "", "city": city or "", "state": st or "", "deficiency_count": def_count or 0, "issues_html": f"", }, } try: data = json.dumps(sub_data).encode() req = urllib.request.Request(f"{LISTMONK_API}/api/subscribers", data=data, method="POST", headers=headers) urllib.request.urlopen(req) imported += 1 except urllib.error.HTTPError as e: if e.code == 409: # Already exists — add to list try: # Look up existing subscriber and add to list search_req = urllib.request.Request( f"{LISTMONK_API}/api/subscribers?query=subscribers.email='{email.lower().strip()}'", headers=headers) search_resp = json.loads(urllib.request.urlopen(search_req).read()) results = search_resp.get("data", {}).get("results", []) if results: sub_id = results[0]["id"] existing_lists = [l["id"] for l in results[0].get("lists", [])] if list_id not in existing_lists: existing_lists.append(list_id) update_data = json.dumps({"lists": existing_lists, "attribs": sub_data["attribs"]}).encode() update_req = urllib.request.Request( f"{LISTMONK_API}/api/subscribers/{sub_id}", data=update_data, method="PUT", headers=headers) urllib.request.urlopen(update_req) imported += 1 except Exception: failed += 1 else: failed += 1 except Exception: failed += 1 if (imported + failed) % 200 == 0 and (imported + failed) > 0: LOG.info(" %s: %d imported, %d failed of %d", state_code, imported, failed, len(rows)) LOG.info(" %s: imported %d, failed %d to list '%s'", state_code, imported, failed, list_name) conn.close() def main(): parser = argparse.ArgumentParser(description="Flag FMCSA carrier deficiencies") parser.add_argument("--dry-run", action="store_true", help="Count only, don't update") parser.add_argument("--listmonk", action="store_true", help="Also populate Listmonk list") parser.add_argument("--state-lists", action="store_true", help="Also populate state-targeted lists") args = parser.parse_args() stats = flag_carriers(dry_run=args.dry_run) LOG.info("=== Flagging Results ===") for k, v in stats.items(): LOG.info(" %s: %s", k, v) if args.listmonk and not args.dry_run: populate_listmonk() if args.state_lists and not args.dry_run: populate_state_lists() if __name__ == "__main__": main()