new-site/scripts/workers/fmcsa_deficiency_flagger.py
justin e67df3c4c3 DOT intake: review-only (no telecom entity step), email verifier,
updated flagger excluding 4+ year stale carriers

- Intake manifest: DOT services use ["review"] only, skipping the
  telecom entity step with FRN/USAC fields
- Flagger: excludes 4+ year overdue carriers from campaign (spam
  trap risk). 18,277 safe targets from 100K records.
- Email verifier: self-hosted MX + SMTP verification tool

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-29 00:29:28 -05:00

336 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Flag FMCSA carriers with compliance deficiencies for email campaigns.
Analyzes the fmcsa_carriers census data and flags each carrier with:
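- MCS-150 overdue (biennial update last filed more than 2 years ago)
- MCS-150 severely overdue (last filed more than 4 years ago) — flagged, but excluded from campaigns
- MCS-150 date unknown — flagged, but excluded from campaigns
- For-hire carrier (operating for hire; BOC-3, UCR registration, and insurance likely required)
- Hazmat carrier (PHMSA registration should be verified)
- No email (can't contact — skipped entirely)
- Fleet size: zero trucks or zero drivers on file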
Stores flags in fmcsa_carriers table columns for campaign targeting.
Creates Listmonk subscriber lists for email outreach.
Usage:
python3 -m scripts.workers.fmcsa_deficiency_flagger # flag all
python3 -m scripts.workers.fmcsa_deficiency_flagger --dry-run # count only
python3 -m scripts.workers.fmcsa_deficiency_flagger --listmonk # also populate Listmonk
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
import urllib.request
import base64
from datetime import datetime, timedelta
import psycopg2
from psycopg2.extras import execute_values
# Module logger; records are written to stdout so the worker's output can be
# captured by whatever runs it.
LOG = logging.getLogger("workers.fmcsa_flagger")
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    level=logging.INFO,
)

# Connection settings, each overridable through the environment.
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest")
LISTMONK_API = os.getenv("LISTMONK_URL", "http://localhost:9100")
LISTMONK_USER = os.getenv("LISTMONK_USER", "admin")
LISTMONK_PASS = os.getenv("LISTMONK_PASSWORD", "")

# MCS-150 recency cutoffs, fixed once at import time (naive local time).
# A year is approximated as 365 days.
TWO_YEARS_AGO = datetime.now() - timedelta(days=2 * 365)
FOUR_YEARS_AGO = datetime.now() - timedelta(days=4 * 365)
def _classify_carrier(mcs150_parsed, for_hire, hazmat, trucks, drivers,
                      two_year_cutoff, four_year_cutoff):
    """Derive deficiency flags, issue texts and severity for one carrier row.

    Args:
        mcs150_parsed: Last MCS-150 filing date, or a falsy value if unknown.
            Compared against the cutoffs with ``<``.
            NOTE(review): assumes the column yields ``datetime.date`` values —
            confirm against the table schema.
        for_hire: Truthy when the carrier is authorized for hire.
        hazmat: Truthy when the carrier hauls hazardous materials.
            NOTE(review): both flags are tested by truthiness only; if the
            columns hold text such as "N", that would count as true — confirm.
        trucks: Power-unit count, possibly ``None``.
        drivers: Driver count, possibly ``None``.
        two_year_cutoff: Filings older than this are "overdue".
        four_year_cutoff: Filings older than this are "severely overdue".

    Returns:
        ``(flags, issues, severity)`` where ``flags`` is a list of flag names,
        ``issues`` a list of human-readable sentences, and ``severity`` one of
        ``"none"``, ``"major"``, ``"critical"`` or ``"stale"``.
    """
    flags = []
    issues = []
    severity = "none"

    if mcs150_parsed:
        if mcs150_parsed < four_year_cutoff:
            # 4+ years without an update: likely abandoned, spam-trap risk.
            # No issue text is produced because these are never emailed.
            flags.append("mcs150_severely_overdue")
            severity = "stale"
        elif mcs150_parsed < two_year_cutoff:
            flags.append("mcs150_overdue")
            issues.append(f"MCS-150 biennial update is overdue — last filed {mcs150_parsed}. Required every 2 years. Fines up to $1,000/day.")
            severity = "major"
    else:
        # No filing date: recency cannot be verified, so treat as stale.
        flags.append("mcs150_unknown")
        severity = "stale"

    if for_hire:
        flags.append("for_hire_carrier")
        issues.append("For-hire carrier — BOC-3, UCR registration, and liability insurance required")
        if severity == "none":
            severity = "major"

    if hazmat:
        flags.append("hazmat_carrier")
        issues.append("Hazmat carrier — verify PHMSA registration is current")
        if severity in ("none", "minor"):
            severity = "major"

    # ``None == 0`` is False, so missing counts are not treated as zero.
    if trucks == 0 or drivers == 0:
        flags.append("zero_fleet")
        issues.append("Zero trucks or drivers on file — update your MCS-150 if still operating")

    # For-hire plus an overdue (2-4 year) MCS-150 is the worst actionable case.
    # Severely overdue carriers use a different flag and therefore stay "stale".
    if for_hire and "mcs150_overdue" in flags:
        severity = "critical"

    return flags, issues, severity


def flag_carriers(dry_run: bool = False) -> dict:
    """Analyze carriers that have an email address and flag deficiencies.

    Reads every row of ``fmcsa_carriers`` with a non-empty ``email_address``,
    classifies it, and (unless ``dry_run``) writes ``deficiency_flags``,
    ``deficiency_count``, ``deficiency_severity``, ``issues_summary`` and
    ``campaign_eligible`` back to the table, creating those columns first if
    they are missing.

    Args:
        dry_run: When true, only count; no schema change and no row updates.

    Returns:
        A dict of counters: ``total``, ``mcs150_overdue`` (includes severely
        overdue), ``mcs150_severe``, ``for_hire``, ``hazmat``, ``flagged``,
        ``campaign_eligible`` and ``zero_fleet``.

    Raises:
        psycopg2.Error: If connecting, querying or updating fails. The
            connection is closed before the exception propagates.
    """
    # Loop invariants: convert the module-level cutoffs to dates once.
    two_year_cutoff = TWO_YEARS_AGO.date()
    four_year_cutoff = FOUR_YEARS_AGO.date()
    # Flags that never make a carrier worth emailing on their own.
    non_actionable = ("zero_fleet", "mcs150_severely_overdue", "mcs150_unknown")
    # Which counters each flag bumps ("mcs150_unknown" has no counter).
    counters_for_flag = {
        "mcs150_severely_overdue": ("mcs150_severe", "mcs150_overdue"),
        "mcs150_overdue": ("mcs150_overdue",),
        "for_hire_carrier": ("for_hire",),
        "hazmat_carrier": ("hazmat",),
        "zero_fleet": ("zero_fleet",),
    }

    conn = psycopg2.connect(DATABASE_URL)
    try:
        cur = conn.cursor()

        # Add flag columns if they don't exist
        if not dry_run:
            for col in (
                "deficiency_flags TEXT[]",
                "deficiency_count INTEGER DEFAULT 0",
                "deficiency_severity TEXT",  # none, major, critical, stale
                "issues_summary TEXT",
                "campaign_eligible BOOLEAN DEFAULT FALSE",
            ):
                try:
                    cur.execute(f"ALTER TABLE fmcsa_carriers ADD COLUMN IF NOT EXISTS {col}")
                    # Commit per column so that a later failure's rollback
                    # cannot undo columns that were already added.
                    conn.commit()
                except psycopg2.Error as exc:
                    conn.rollback()
                    LOG.warning("Could not add column '%s': %s", col, exc)

        # Query all carriers with email
        LOG.info("Querying carriers...")
        cur.execute("""
            SELECT dot_number, legal_name, email_address, telephone,
                   phy_city, phy_state, mcs150_parsed, mcs150_date,
                   nbr_power_unit, driver_total, authorized_for_hire,
                   hm_flag, carrier_operation
            FROM fmcsa_carriers
            WHERE email_address IS NOT NULL AND email_address != ''
        """)
        rows = cur.fetchall()
        LOG.info("Found %d carriers with email", len(rows))

        stats = {
            "total": len(rows),
            "mcs150_overdue": 0,
            "mcs150_severe": 0,
            "for_hire": 0,
            "hazmat": 0,
            "flagged": 0,
            "campaign_eligible": 0,
            "zero_fleet": 0,
        }
        updates = []

        for row in rows:
            (dot, _name, email, _phone, _city, _state, mcs150_parsed, _mcs150_raw,
             trucks, drivers, for_hire, hazmat, _carrier_op) = row

            flags, issues, severity = _classify_carrier(
                mcs150_parsed, for_hire, hazmat, trucks, drivers,
                two_year_cutoff, four_year_cutoff,
            )
            for flag in flags:
                for key in counters_for_flag.get(flag, ()):
                    stats[key] += 1

            # Exclude stale/severely overdue from campaign (spam trap risk)
            deficiency_count = sum(1 for flag in flags if flag not in non_actionable)
            campaign_eligible = deficiency_count > 0 and bool(email) and severity != "stale"

            if campaign_eligible:
                stats["campaign_eligible"] += 1
            if flags:
                stats["flagged"] += 1

            if not dry_run:
                updates.append((
                    flags if flags else None,
                    deficiency_count,
                    severity,
                    "; ".join(issues) if issues else None,
                    campaign_eligible,
                    dot,
                ))

        if not dry_run and updates:
            LOG.info("Updating %d carrier records...", len(updates))
            update_sql = """
                UPDATE fmcsa_carriers SET
                    deficiency_flags = %s,
                    deficiency_count = %s,
                    deficiency_severity = %s,
                    issues_summary = %s,
                    campaign_eligible = %s
                WHERE dot_number = %s
            """
            # Commit in chunks so progress is durable and transactions stay small.
            chunk_size = 5000
            for i in range(0, len(updates), chunk_size):
                cur.executemany(update_sql, updates[i:i + chunk_size])
                conn.commit()
                LOG.info(" Updated %d / %d", min(i + chunk_size, len(updates)), len(updates))
    finally:
        conn.close()

    return stats
def _subscriber_payload(row, list_id):
    """Build one Listmonk subscriber dict from a campaign-eligible carrier row."""
    (dot, name, email, phone, city, state, mcs150_parsed, _mcs150_raw,
     trucks, drivers, for_hire, def_count, def_severity, issues) = row

    # Build issues HTML for email template. ``issues`` is the "; "-joined
    # summary written by the flagger; it is inserted without HTML escaping.
    items = "".join(f"<li>{iss}</li>" for iss in (issues or "").split("; ") if iss)
    issues_html = f"<ul style='margin:0;padding:0 0 0 16px'>{items}</ul>"

    return {
        "email": email.lower().strip(),
        "name": name or "",
        "status": "enabled",
        "lists": [list_id],
        "attribs": {
            "company": name or "",
            "dot_number": dot,
            "phone": phone or "",
            "city": city or "",
            "state": state or "",
            "mcs150_date": str(mcs150_parsed) if mcs150_parsed else "",
            "trucks": trucks or 0,
            "drivers": drivers or 0,
            "for_hire": for_hire or False,
            "deficiency_count": def_count or 0,
            "severity": def_severity or "",
            "issues_html": issues_html,
        },
    }


def populate_listmonk(list_name: str = "FMCSA MCS-150 Overdue Carriers") -> int:
    """Create/populate a Listmonk list with campaign-eligible carriers.

    Selects rows of ``fmcsa_carriers`` with ``campaign_eligible = TRUE`` and a
    positive ``deficiency_count``, finds (or creates) the Listmonk list named
    ``list_name``, and submits the carriers to it in batches of 500.

    Args:
        list_name: Name of the Listmonk list to reuse or create.

    Returns:
        The number of subscribers in batches that were submitted without
        error. ``0`` when there is nothing to send or the list could not be
        fetched/created. A failed batch is logged and skipped, not retried.

    Raises:
        psycopg2.Error: If the database query fails. Listmonk HTTP errors are
            logged rather than raised.
    """
    conn = psycopg2.connect(DATABASE_URL)
    try:
        cur = conn.cursor()
        # Get campaign-eligible carriers. Severity is text, so rank it
        # explicitly: a plain DESC sort is alphabetical and would put "major"
        # ahead of "critical".
        cur.execute("""
            SELECT dot_number, legal_name, email_address, telephone,
                   phy_city, phy_state, mcs150_parsed, mcs150_date,
                   nbr_power_unit, driver_total, authorized_for_hire,
                   deficiency_count, deficiency_severity, issues_summary
            FROM fmcsa_carriers
            WHERE campaign_eligible = TRUE
              AND deficiency_count > 0
            ORDER BY CASE deficiency_severity
                         WHEN 'critical' THEN 0
                         WHEN 'major' THEN 1
                         ELSE 2
                     END,
                     deficiency_count DESC
        """)
        rows = cur.fetchall()
    finally:
        conn.close()

    LOG.info("Found %d campaign-eligible carriers for Listmonk", len(rows))
    if not rows:
        return 0

    auth = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode()
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Basic {auth}",
    }
    # Without a timeout a stalled Listmonk instance would hang the worker.
    request_timeout = 120

    # Get or create Listmonk list
    try:
        # The lists endpoint is paginated; ask for everything so an existing
        # list beyond the first page is not missed (and then duplicated).
        req = urllib.request.Request(
            f"{LISTMONK_API}/api/lists?per_page=all",
            headers=headers,
        )
        with urllib.request.urlopen(req, timeout=request_timeout) as resp:
            listing = json.loads(resp.read())
        found = next(
            (entry for entry in listing.get("data", {}).get("results", [])
             if entry["name"] == list_name),
            None,
        )
        if found is not None:
            list_id = found["id"]
            LOG.info("Using existing Listmonk list: %s (ID %d)", list_name, list_id)
        else:
            # Create new list
            data = json.dumps({"name": list_name, "type": "public", "optin": "single"}).encode()
            req = urllib.request.Request(
                f"{LISTMONK_API}/api/lists",
                data=data, method="POST", headers=headers,
            )
            with urllib.request.urlopen(req, timeout=request_timeout) as resp:
                created = json.loads(resp.read())
            list_id = created["data"]["id"]
            LOG.info("Created Listmonk list: %s (ID %d)", list_name, list_id)
    except Exception as e:
        # Best-effort boundary: any HTTP, decoding or shape error aborts the
        # import but must not crash the worker.
        LOG.error("Failed to get/create Listmonk list: %s", e)
        return 0

    # Import subscribers in batches
    imported = 0
    batch_size = 500
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        subscribers = [_subscriber_payload(row, list_id) for row in batch]
        # NOTE(review): Listmonk's documented bulk import takes a multipart
        # upload (a `params` JSON field plus a CSV `file`), not a JSON body with
        # a `subscribers` array — confirm the deployed version accepts this
        # request before relying on the imported count.
        try:
            data = json.dumps({
                "mode": "subscribe",
                "subscribers": subscribers,
                "lists": [list_id],
                "overwrite": True,
            }).encode()
            req = urllib.request.Request(
                f"{LISTMONK_API}/api/import/subscribers",
                data=data, method="POST", headers=headers,
            )
            with urllib.request.urlopen(req, timeout=request_timeout):
                pass
            imported += len(batch)
            LOG.info(" Imported %d / %d", imported, len(rows))
        except Exception as e:
            LOG.error(" Import batch failed: %s", e)

    LOG.info("Done. Imported %d subscribers to list '%s'", imported, list_name)
    return imported
def main():
    """Command-line entry point.

    Runs the flagger, logs every counter it returns, and then populates
    Listmonk when ``--listmonk`` was given and ``--dry-run`` was not.
    """
    cli = argparse.ArgumentParser(description="Flag FMCSA carrier deficiencies")
    cli.add_argument("--dry-run", action="store_true", help="Count only, don't update")
    cli.add_argument("--listmonk", action="store_true", help="Also populate Listmonk list")
    opts = cli.parse_args()

    results = flag_carriers(dry_run=opts.dry_run)

    LOG.info("=== Flagging Results ===")
    for label in results:
        LOG.info(" %s: %s", label, results[label])

    # A dry run writes no flags, so there is nothing to push to Listmonk.
    if opts.dry_run or not opts.listmonk:
        return
    populate_listmonk()


if __name__ == "__main__":
    main()