#!/usr/bin/env python3 """Populate a Listmonk audience for new carriers likely missing startup items. Targets FMCSA carriers that look new/small and have no Performance West paid order evidence for core startup services such as BOC-3, UCR, D&A, IRP/IFTA, DOT full compliance, or new carrier bundle-like services. This script is intentionally read-only against the application database. It does not mark fmcsa_carriers.listmonk_sent_at because adding a carrier to an audience is not the same as sending the campaign. Usage: DATABASE_URL=postgres://... python3 scripts/populate_new_carrier_startup_campaign.py --dry-run DATABASE_URL=postgres://... python3 scripts/populate_new_carrier_startup_campaign.py --limit 500 """ from __future__ import annotations import argparse import base64 import html import json import logging import os import urllib.error import urllib.parse import urllib.request from dataclasses import dataclass from datetime import date, datetime, timedelta from typing import Any import psycopg2 import psycopg2.extras from scripts._email_exclusions import BLOCKED_EMAIL_DOMAINS LOG = logging.getLogger("populate_new_carrier_startup_campaign") LISTMONK_URL = os.getenv("LISTMONK_URL", "https://lists.performancewest.net").rstrip("/") LISTMONK_USER = os.getenv("LISTMONK_USER", "api") LISTMONK_PASS = os.getenv("LISTMONK_PASS", "6X1rKPea61N4rZ1S65Hx5zvqzbCj30F6nvEe9oVGH_Y") LIST_NAME = os.getenv("NEW_CARRIER_LIST_NAME", "FMCSA Trucking - New Carrier Missing Startup Items") DATABASE_URL = os.getenv("DATABASE_URL", "") STARTUP_SERVICE_SLUGS = { "boc3-filing", "ucr-registration", "dot-registration", "mc-authority", "dot-drug-alcohol", "dot-audit-prep", "dot-full-compliance", "irp-registration", "ifta-application", "ifta-quarterly", "state-trucking-bundle", "ein-application", "entity-upgrade-bundle", "registered-agent", "virtual-mailbox", } AUTH = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode() @dataclass class CarrierCandidate: dot_number: str email: str legal_name: str phone: str | None city: str | None state: str | None add_date: str | None trucks: int | None drivers: int | None authorized_for_hire: bool | None missing_items: list[str] startup_score: int def lm_api(path: str, data: dict | None = None, method: str | None = None) -> dict: headers = { "Authorization": f"Basic {AUTH}", "Content-Type": "application/json", "Accept": "application/json", } req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers) if data is not None: req.data = json.dumps(data).encode() if method: req.method = method try: raw = urllib.request.urlopen(req, timeout=45).read() return json.loads(raw or b"{}") except urllib.error.HTTPError as exc: body = exc.read().decode(errors="replace")[:1000] raise RuntimeError(f"Listmonk {path} HTTP {exc.code}: {body}") from exc def get_or_create_list(name: str) -> int: resp = lm_api("/lists?per_page=all") for row in resp.get("data", {}).get("results", []): if row.get("name") == name: return int(row["id"]) resp = lm_api( "/lists", {"name": name, "type": "private", "optin": "single", "tags": ["trucking", "new-carrier", "auto"]}, "POST", ) return int(resp["data"]["id"]) def add_subscriber(list_id: int, candidate: CarrierCandidate) -> bool: issues_html = "" attribs = { "company": candidate.legal_name, "dot_number": candidate.dot_number, "phone": candidate.phone or "", "city": candidate.city or "", "state": candidate.state or "", "add_date": candidate.add_date or "", "trucks": candidate.trucks or 0, "drivers": candidate.drivers or 0, "for_hire": bool(candidate.authorized_for_hire), "missing_items": candidate.missing_items, "missing_items_html": issues_html, "startup_score": candidate.startup_score, } payload = { "email": candidate.email.lower().strip(), "name": candidate.legal_name or candidate.email, "status": "enabled", "lists": [list_id], "preconfirm_subscriptions": True, "attribs": attribs, } try: lm_api("/subscribers", payload, "POST") return True except Exception as exc: msg = str(exc) if "HTTP 409" not in msg and "already exists" not in msg.lower() and "conflict" not in msg.lower(): LOG.warning("subscriber add failed for %s: %s", candidate.email, exc) return False try: query = urllib.parse.quote(f"subscribers.email='{candidate.email.lower().strip()}'") results = lm_api(f"/subscribers?query={query}").get("data", {}).get("results", []) if not results: return False sub_id = int(results[0]["id"]) lm_api( "/subscribers/lists", {"ids": [sub_id], "action": "add", "target_list_ids": [list_id], "status": "confirmed"}, "PUT", ) # Preserve fresh campaign attributes where the API supports subscriber update. try: lm_api(f"/subscribers/{sub_id}", {"attribs": attribs}, "PUT") except Exception: pass return True except Exception as inner: LOG.warning("subscriber attach failed for %s: %s", candidate.email, inner) return False def table_exists(conn, table: str) -> bool: with conn.cursor() as cur: cur.execute( """ SELECT EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = %s ) """, (table,), ) return bool(cur.fetchone()[0]) def table_columns(conn, table: str) -> set[str]: with conn.cursor() as cur: cur.execute( """ SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name = %s """, (table,), ) return {r[0] for r in cur.fetchall()} def usable_email_filter(cols: set[str]) -> str: base = "email_address IS NOT NULL AND email_address <> '' AND position('@' in email_address) > 1" domain_filter = "lower(split_part(email_address, '@', 2)) <> ALL(%(blocked_domains)s)" if "email_verified" in cols and "email_verify_result" in cols: return f"{base} AND {domain_filter} AND (email_verified IS TRUE OR email_verify_result IN ('smtp_valid','catch_all_domain','catch_all_detected'))" if "email_verify_result" in cols: return f"{base} AND {domain_filter} AND (email_verify_result IS NULL OR email_verify_result NOT IN ('invalid','smtp_invalid','bad_mailbox','disposable'))" return f"{base} AND {domain_filter}" def startup_date_filter(cols: set[str], days: int) -> str: # Use FMCSA's carrier add_date only. Do not fall back to our row created_at, # because historical FMCSA carriers can be newly imported into our database. if "add_date" not in cols: raise RuntimeError("fmcsa_carriers.add_date is required for new-carrier targeting") return """ ( CASE WHEN add_date ~ '^\\d{4}-\\d{2}-\\d{2}' THEN add_date::date WHEN add_date ~ '^\\d{1,2}/\\d{1,2}/\\d{4}$' THEN to_date(add_date, 'MM/DD/YYYY') WHEN add_date ~ '^\\d{2}-[A-Z]{3}-\\d{2}$' THEN to_date(add_date, 'DD-MON-YY') ELSE NULL END ) >= CURRENT_DATE - (%(recent_days)s::int * INTERVAL '1 day') """ def has_order_exclusion_clause(order_cols: set[str]) -> str: if not order_cols: return "TRUE" # Match by customer email and, when present in intake JSON, by DOT number. # The JSON text fallback catches varying intake field names without assuming # every historical order used the same key. return """ NOT EXISTS ( SELECT 1 FROM compliance_orders co WHERE co.payment_status = 'paid' AND co.service_slug = ANY(%(startup_slugs)s) AND ( lower(co.customer_email) = lower(c.email_address) OR co.intake_data::text ILIKE '%%' || c.dot_number || '%%' ) ) """ def infer_missing_items(row: dict[str, Any]) -> tuple[list[str], int]: missing = ["BOC-3 process agent filing", "UCR annual registration", "DOT drug & alcohol program"] trucks = row.get("nbr_power_unit") or 0 drivers = row.get("driver_total") or 0 for_hire = bool(row.get("authorized_for_hire")) if trucks > 0: missing.append("IRP/IFTA review if operating interstate") if for_hire: missing.append("MC operating authority / insurance filing review") if drivers > 0: missing.append("New entrant safety audit preparation") score = 50 if trucks <= 3: score += 15 if drivers <= 5: score += 10 if for_hire: score += 10 if row.get("add_date"): score += 10 return missing, min(score, 100) def fetch_candidates(conn, limit: int, recent_days: int) -> list[CarrierCandidate]: if not table_exists(conn, "fmcsa_carriers"): raise RuntimeError("required table fmcsa_carriers does not exist; run migrations/import FMCSA data first") fmcsa_cols = table_columns(conn, "fmcsa_carriers") order_cols = table_columns(conn, "compliance_orders") if table_exists(conn, "compliance_orders") else set() filters = [ usable_email_filter(fmcsa_cols), startup_date_filter(fmcsa_cols, recent_days), "COALESCE(nbr_power_unit, 0) BETWEEN 0 AND 3", "COALESCE(driver_total, 0) BETWEEN 0 AND 5", has_order_exclusion_clause(order_cols), ] if "listmonk_campaign_type" in fmcsa_cols: filters.append("COALESCE(listmonk_campaign_type, '') <> 'new_carrier_startup'") sql = f""" SELECT dot_number, email_address, legal_name, telephone, phy_city, phy_state, add_date, nbr_power_unit, driver_total, authorized_for_hire FROM fmcsa_carriers c WHERE {' AND '.join(filters)} ORDER BY CASE WHEN add_date ~ '^\\d{{4}}-\\d{{2}}-\\d{{2}}' THEN add_date::date WHEN add_date ~ '^\\d{{1,2}}/\\d{{1,2}}/\\d{{4}}$' THEN to_date(add_date, 'MM/DD/YYYY') WHEN add_date ~ '^\\d{{2}}-[A-Z]{{3}}-\\d{{2}}$' THEN to_date(add_date, 'DD-MON-YY') ELSE NULL END DESC NULLS LAST, created_at DESC NULLS LAST LIMIT %(limit)s """ with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( sql, { "blocked_domains": list(BLOCKED_EMAIL_DOMAINS), "recent_days": recent_days, "startup_slugs": list(STARTUP_SERVICE_SLUGS), "limit": limit, }, ) rows = cur.fetchall() candidates = [] for row in rows: missing, score = infer_missing_items(row) candidates.append( CarrierCandidate( dot_number=str(row["dot_number"]), email=row["email_address"], legal_name=row["legal_name"] or row["email_address"], phone=row.get("telephone"), city=row.get("phy_city"), state=row.get("phy_state"), add_date=str(row.get("add_date") or ""), trucks=row.get("nbr_power_unit"), drivers=row.get("driver_total"), authorized_for_hire=row.get("authorized_for_hire"), missing_items=missing, startup_score=score, ) ) return candidates def main() -> None: parser = argparse.ArgumentParser(description="Populate New Carrier Startup Listmonk audience") parser.add_argument("--dry-run", action="store_true", help="Only count/show sample candidates") parser.add_argument("--limit", type=int, default=500, help="Max candidates to import") parser.add_argument("--recent-days", type=int, default=180, help="FMCSA add/create recency window") parser.add_argument("--list-name", default=LIST_NAME) args = parser.parse_args() logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") if not DATABASE_URL: raise SystemExit("DATABASE_URL is required") conn = psycopg2.connect(DATABASE_URL) try: try: candidates = fetch_candidates(conn, args.limit, args.recent_days) except RuntimeError as exc: raise SystemExit(str(exc)) from exc finally: conn.close() LOG.info("Found %d new-carrier startup candidates", len(candidates)) for candidate in candidates[:10]: LOG.info( "sample DOT#%s %s <%s> %s trucks=%s drivers=%s missing=%s", candidate.dot_number, candidate.legal_name, candidate.email, candidate.state, candidate.trucks, candidate.drivers, "; ".join(candidate.missing_items), ) if args.dry_run: return list_id = get_or_create_list(args.list_name) LOG.info("Using Listmonk list %s (ID %d)", args.list_name, list_id) imported = 0 for candidate in candidates: if add_subscriber(list_id, candidate): imported += 1 if imported and imported % 100 == 0: LOG.info("Imported/attached %d/%d", imported, len(candidates)) LOG.info("Done. Imported/attached %d subscribers to %s", imported, args.list_name) if __name__ == "__main__": main()