diff --git a/scripts/workers/fmcsa_deficiency_flagger.py b/scripts/workers/fmcsa_deficiency_flagger.py new file mode 100644 index 0000000..b87d553 --- /dev/null +++ b/scripts/workers/fmcsa_deficiency_flagger.py @@ -0,0 +1,316 @@ +#!/usr/bin/env python3 +""" +Flag FMCSA carriers with compliance deficiencies for email campaigns. + +Analyzes the fmcsa_carriers census data and flags each carrier with: + - MCS-150 overdue (biennial, >2 years) + - MCS-150 severely overdue (>4 years) + - For-hire without likely BOC-3 (operating for hire) + - No email (can't contact — skip) + - Fleet size category + +Stores flags in fmcsa_carriers table columns for campaign targeting. +Creates Listmonk subscriber lists for email outreach. + +Usage: + python3 -m scripts.workers.fmcsa_deficiency_flagger # flag all + python3 -m scripts.workers.fmcsa_deficiency_flagger --dry-run # count only + python3 -m scripts.workers.fmcsa_deficiency_flagger --listmonk # also populate Listmonk +""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +import sys +import urllib.request +import base64 +from datetime import datetime, timedelta + +import psycopg2 +from psycopg2.extras import execute_values + +LOG = logging.getLogger("workers.fmcsa_flagger") +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s %(message)s", + stream=sys.stdout, +) + +DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest") +LISTMONK_API = os.environ.get("LISTMONK_URL", "http://localhost:9100") +LISTMONK_USER = os.environ.get("LISTMONK_USER", "admin") +LISTMONK_PASS = os.environ.get("LISTMONK_PASSWORD", "") + +TWO_YEARS_AGO = datetime.now() - timedelta(days=730) +FOUR_YEARS_AGO = datetime.now() - timedelta(days=1460) + + +def flag_carriers(dry_run: bool = False) -> dict: + """Analyze carriers and flag deficiencies.""" + conn = psycopg2.connect(DATABASE_URL) + cur = conn.cursor() + + # Add flag columns if they don't exist + if not dry_run: + for col in [ + "deficiency_flags TEXT[]", + "deficiency_count INTEGER DEFAULT 0", + "deficiency_severity TEXT", # minor, major, critical + "issues_summary TEXT", + "campaign_eligible BOOLEAN DEFAULT FALSE", + ]: + col_name = col.split()[0] + try: + cur.execute(f"ALTER TABLE fmcsa_carriers ADD COLUMN IF NOT EXISTS {col}") + except Exception: + conn.rollback() + conn.commit() + + # Query all carriers with email + LOG.info("Querying carriers...") + cur.execute(""" + SELECT dot_number, legal_name, email_address, telephone, + phy_city, phy_state, mcs150_parsed, mcs150_date, + nbr_power_unit, driver_total, authorized_for_hire, + hm_flag, carrier_operation + FROM fmcsa_carriers + WHERE email_address IS NOT NULL AND email_address != '' + """) + + rows = cur.fetchall() + LOG.info("Found %d carriers with email", len(rows)) + + stats = { + "total": len(rows), + "mcs150_overdue": 0, + "mcs150_severe": 0, + "for_hire": 0, + "hazmat": 0, + "flagged": 0, + "campaign_eligible": 0, + } + + updates = [] + + for row in rows: + (dot, name, email, phone, city, state, mcs150_parsed, mcs150_raw, + trucks, drivers, for_hire, hazmat, carrier_op) = row + + flags = [] + issues = [] + severity = "none" + + # MCS-150 check + if mcs150_parsed: + if mcs150_parsed < FOUR_YEARS_AGO.date(): + flags.append("mcs150_severely_overdue") + issues.append(f"MCS-150 severely overdue (last filed {mcs150_parsed})") + severity = "critical" + stats["mcs150_severe"] += 1 + stats["mcs150_overdue"] += 1 + elif mcs150_parsed < TWO_YEARS_AGO.date(): + flags.append("mcs150_overdue") + issues.append(f"MCS-150 overdue (last filed {mcs150_parsed})") + severity = "major" if severity == "none" else severity + stats["mcs150_overdue"] += 1 + else: + # No date — can't determine, flag as unknown + flags.append("mcs150_unknown") + issues.append("MCS-150 filing date unknown — verify at FMCSA Portal") + severity = "minor" if severity == "none" else severity + + # For-hire carrier — needs BOC-3, UCR, insurance + if for_hire: + flags.append("for_hire_carrier") + stats["for_hire"] += 1 + + # Hazmat — needs PHMSA registration + if hazmat: + flags.append("hazmat_carrier") + issues.append("Hazmat carrier — verify PHMSA registration is current") + severity = "major" if severity in ("none", "minor") else severity + stats["hazmat"] += 1 + + # Determine campaign eligibility + deficiency_count = len([f for f in flags if f not in ("for_hire_carrier",)]) + campaign_eligible = deficiency_count > 0 and email + + if campaign_eligible: + stats["campaign_eligible"] += 1 + + if flags: + stats["flagged"] += 1 + + if not dry_run: + updates.append(( + flags if flags else None, + deficiency_count, + severity, + "; ".join(issues) if issues else None, + campaign_eligible, + dot, + )) + + if not dry_run and updates: + LOG.info("Updating %d carrier records...", len(updates)) + # Batch update in chunks + chunk_size = 5000 + for i in range(0, len(updates), chunk_size): + chunk = updates[i:i+chunk_size] + for u in chunk: + cur.execute(""" + UPDATE fmcsa_carriers SET + deficiency_flags = %s, + deficiency_count = %s, + deficiency_severity = %s, + issues_summary = %s, + campaign_eligible = %s + WHERE dot_number = %s + """, u) + conn.commit() + LOG.info(" Updated %d / %d", min(i + chunk_size, len(updates)), len(updates)) + + conn.close() + return stats + + +def populate_listmonk(list_name: str = "FMCSA MCS-150 Overdue Carriers") -> int: + """Create/populate a Listmonk list with campaign-eligible carriers.""" + conn = psycopg2.connect(DATABASE_URL) + cur = conn.cursor() + + # Get campaign-eligible carriers + cur.execute(""" + SELECT dot_number, legal_name, email_address, telephone, + phy_city, phy_state, mcs150_parsed, mcs150_date, + nbr_power_unit, driver_total, authorized_for_hire, + deficiency_count, deficiency_severity, issues_summary + FROM fmcsa_carriers + WHERE campaign_eligible = TRUE + AND deficiency_count > 0 + ORDER BY deficiency_severity DESC, deficiency_count DESC + """) + + rows = cur.fetchall() + conn.close() + LOG.info("Found %d campaign-eligible carriers for Listmonk", len(rows)) + + if not rows: + return 0 + + # Get or create Listmonk list + auth = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode() + headers = { + "Content-Type": "application/json", + "Authorization": f"Basic {auth}", + } + + # Check if list exists + try: + req = urllib.request.Request( + f"{LISTMONK_API}/api/lists", + headers=headers, + ) + resp = json.loads(urllib.request.urlopen(req).read()) + existing = [l for l in resp.get("data", {}).get("results", []) if l["name"] == list_name] + if existing: + list_id = existing[0]["id"] + LOG.info("Using existing Listmonk list: %s (ID %d)", list_name, list_id) + else: + # Create new list + data = json.dumps({"name": list_name, "type": "public", "optin": "single"}).encode() + req = urllib.request.Request( + f"{LISTMONK_API}/api/lists", + data=data, method="POST", headers=headers, + ) + resp = json.loads(urllib.request.urlopen(req).read()) + list_id = resp["data"]["id"] + LOG.info("Created Listmonk list: %s (ID %d)", list_name, list_id) + except Exception as e: + LOG.error("Failed to get/create Listmonk list: %s", e) + return 0 + + # Import subscribers in batches + imported = 0 + batch_size = 500 + + for i in range(0, len(rows), batch_size): + batch = rows[i:i+batch_size] + subscribers = [] + + for row in batch: + (dot, name, email, phone, city, state, mcs150_parsed, mcs150_raw, + trucks, drivers, for_hire, def_count, def_severity, issues) = row + + # Build issues HTML for email template + issues_list = (issues or "").split("; ") + issues_html = "" + + subscribers.append({ + "email": email.lower().strip(), + "name": name or "", + "status": "enabled", + "lists": [list_id], + "attribs": { + "company": name or "", + "dot_number": dot, + "phone": phone or "", + "city": city or "", + "state": state or "", + "mcs150_date": str(mcs150_parsed) if mcs150_parsed else "", + "trucks": trucks or 0, + "drivers": drivers or 0, + "for_hire": for_hire or False, + "deficiency_count": def_count or 0, + "severity": def_severity or "", + "issues_html": issues_html, + }, + }) + + # Batch import + try: + data = json.dumps({ + "mode": "subscribe", + "subscribers": subscribers, + "lists": [list_id], + "overwrite": True, + }).encode() + req = urllib.request.Request( + f"{LISTMONK_API}/api/import/subscribers", + data=data, method="POST", headers=headers, + ) + urllib.request.urlopen(req) + imported += len(batch) + LOG.info(" Imported %d / %d", imported, len(rows)) + except Exception as e: + LOG.error(" Import batch failed: %s", e) + + LOG.info("Done. Imported %d subscribers to list '%s'", imported, list_name) + return imported + + +def main(): + parser = argparse.ArgumentParser(description="Flag FMCSA carrier deficiencies") + parser.add_argument("--dry-run", action="store_true", help="Count only, don't update") + parser.add_argument("--listmonk", action="store_true", help="Also populate Listmonk list") + args = parser.parse_args() + + stats = flag_carriers(dry_run=args.dry_run) + + LOG.info("=== Flagging Results ===") + for k, v in stats.items(): + LOG.info(" %s: %s", k, v) + + if args.listmonk and not args.dry_run: + populate_listmonk() + + +if __name__ == "__main__": + main()