new-site/scripts/populate_new_carrier_startup_campaign.py

379 lines
14 KiB
Python

#!/usr/bin/env python3
"""Populate a Listmonk audience for new carriers likely missing startup items.
Targets FMCSA carriers that look new/small and have no Performance West paid
order evidence for core startup services such as BOC-3, UCR, D&A, IRP/IFTA,
DOT full compliance, or new carrier bundle-like services.
This script is intentionally read-only against the application database. It does
not mark fmcsa_carriers.listmonk_sent_at because adding a carrier to an audience
is not the same as sending the campaign.
Usage:
DATABASE_URL=postgres://... python3 scripts/populate_new_carrier_startup_campaign.py --dry-run
DATABASE_URL=postgres://... python3 scripts/populate_new_carrier_startup_campaign.py --limit 500
"""
from __future__ import annotations
import argparse
import base64
import html
import json
import logging
import os
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from typing import Any
import psycopg2
import psycopg2.extras
LOG = logging.getLogger("populate_new_carrier_startup_campaign")

# Listmonk connection settings. Credentials come from the environment only:
# LISTMONK_PASS deliberately has no in-source default so no secret is committed.
# NOTE(review): a real-looking password used to be hard-coded here as the
# default value — treat it as leaked and rotate it in Listmonk.
LISTMONK_URL = os.getenv("LISTMONK_URL", "https://lists.performancewest.net").rstrip("/")
LISTMONK_USER = os.getenv("LISTMONK_USER", "api")
LISTMONK_PASS = os.getenv("LISTMONK_PASS", "")
LIST_NAME = os.getenv("NEW_CARRIER_LIST_NAME", "FMCSA Trucking - New Carrier Missing Startup Items")
DATABASE_URL = os.getenv("DATABASE_URL", "")

# Recipient e-mail domains excluded from the audience.
BLOCKED_EMAIL_DOMAINS = {"aol.com", "yahoo.com", "ymail.com", "rocketmail.com"}

# compliance_orders.service_slug values treated as "startup services": a paid
# order for any of these excludes the carrier from the audience.
STARTUP_SERVICE_SLUGS = {
    "boc3-filing",
    "ucr-registration",
    "dot-registration",
    "mc-authority",
    "dot-drug-alcohol",
    "dot-audit-prep",
    "dot-full-compliance",
    "irp-registration",
    "ifta-application",
    "ifta-quarterly",
    "state-trucking-bundle",
    "ein-application",
    "entity-upgrade-bundle",
    "registered-agent",
    "virtual-mailbox",
}

# Precomputed HTTP Basic credentials for the Listmonk API.
AUTH = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode()
@dataclass
class CarrierCandidate:
    """One FMCSA carrier selected for the new-carrier startup audience.

    Built from an ``fmcsa_carriers`` result row by ``fetch_candidates``; its
    fields become the Listmonk subscriber's e-mail, name and attributes.
    """

    dot_number: str  # USDOT number, stringified from the DB row
    email: str  # raw email_address; lower-cased/stripped only when sent to Listmonk
    legal_name: str  # carrier legal name, falling back to the e-mail when blank
    phone: str | None  # from the telephone column
    city: str | None  # from phy_city
    state: str | None  # from phy_state
    add_date: str | None  # FMCSA add_date as text ("" when the row had none)
    trucks: int | None  # from nbr_power_unit
    drivers: int | None  # from driver_total
    authorized_for_hire: bool | None
    missing_items: list[str]  # human-readable startup items from infer_missing_items
    startup_score: int  # lead score from infer_missing_items (capped at 100)
def lm_api(path: str, data: dict | None = None, method: str | None = None) -> dict:
    """Call the Listmonk REST API and return the decoded JSON body.

    Args:
        path: API path below ``/api`` (e.g. ``"/lists"``), including any query string.
        data: Optional JSON-serialisable payload sent as the request body.
        method: Optional HTTP method override. When omitted, urllib uses GET,
            or POST if ``data`` is given.

    Returns:
        The parsed JSON response, or ``{}`` for an empty body.

    Raises:
        RuntimeError: on an HTTP error status (the message includes the status
            code and up to 1000 chars of the body) or when the request fails at
            the network level (DNS, connection refused, timeout, reset).
    """
    headers = {
        "Authorization": f"Basic {AUTH}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    body = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(
        f"{LISTMONK_URL}/api{path}", data=body, headers=headers, method=method
    )
    try:
        # Context manager guarantees the HTTP response/socket is closed.
        with urllib.request.urlopen(req, timeout=45) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode(errors="replace")[:1000]
        raise RuntimeError(f"Listmonk {path} HTTP {exc.code}: {detail}") from exc
    except OSError as exc:
        # URLError (DNS/connection failures), socket timeouts and resets all
        # derive from OSError; surface them with the same exception type as
        # HTTP errors so callers handle one failure mode.
        reason = getattr(exc, "reason", exc)
        raise RuntimeError(f"Listmonk {path} request failed: {reason}") from exc
    return json.loads(raw or b"{}")
def get_or_create_list(name: str) -> int:
    """Return the ID of the Listmonk list called *name*, creating it if absent.

    Lists are matched by exact name across all lists. A newly created list is
    private, single opt-in and tagged ``trucking``/``new-carrier``/``auto``.
    """
    existing = lm_api("/lists?per_page=all").get("data", {}).get("results", [])
    match = next((entry for entry in existing if entry.get("name") == name), None)
    if match is not None:
        return int(match["id"])

    new_list = {
        "name": name,
        "type": "private",
        "optin": "single",
        "tags": ["trucking", "new-carrier", "auto"],
    }
    created = lm_api("/lists", new_list, "POST")
    return int(created["data"]["id"])
def _listmonk_sql_literal(value: str) -> str:
    """Return *value* as a single-quoted SQL literal for Listmonk's ``query`` filter."""
    # Listmonk's ``query`` parameter is a raw SQL expression, so an address such
    # as o'brien@example.com must have its quote doubled, not pasted verbatim.
    return "'" + value.replace("'", "''") + "'"


def _campaign_attribs(candidate: CarrierCandidate) -> dict[str, Any]:
    """Build the Listmonk subscriber attributes for *candidate*."""
    # Pre-rendered, HTML-escaped bullet list of the inferred missing items.
    issues_html = "<ul style='margin:0;padding:0 0 0 16px'>" + "".join(
        f"<li>{html.escape(item)}</li>" for item in candidate.missing_items
    ) + "</ul>"
    return {
        "company": candidate.legal_name,
        "dot_number": candidate.dot_number,
        "phone": candidate.phone or "",
        "city": candidate.city or "",
        "state": candidate.state or "",
        "add_date": candidate.add_date or "",
        "trucks": candidate.trucks or 0,
        "drivers": candidate.drivers or 0,
        "for_hire": bool(candidate.authorized_for_hire),
        "missing_items": candidate.missing_items,
        "missing_items_html": issues_html,
        "startup_score": candidate.startup_score,
    }


def _attach_existing_subscriber(
    list_id: int, email: str, candidate: CarrierCandidate, attribs: dict[str, Any]
) -> bool:
    """Attach the existing subscriber *email* to *list_id* and refresh its attributes.

    Returns False when no subscriber is found or it is blocklisted. Errors from
    the lookup or the list attach propagate to the caller; the attribute
    refresh is best-effort and only logged on failure.
    """
    query = urllib.parse.quote(f"LOWER(subscribers.email)={_listmonk_sql_literal(email)}")
    found = lm_api(f"/subscribers?query={query}").get("data", {}).get("results", [])
    if not found:
        return False
    subscriber = found[0]
    if subscriber.get("status") == "blocklisted":
        # Never pull a blocklisted address back into a campaign audience.
        LOG.info("skipping blocklisted subscriber %s", email)
        return False
    sub_id = int(subscriber["id"])

    # Listmonk's PUT /subscribers/{id} replaces the whole record: email and name
    # are validated and `lists` defines memberships. So resend the current
    # values and memberships, and merge the new attributes over existing ones,
    # rather than sending attribs alone. Done before the attach so the target
    # list membership cannot be dropped by the update.
    # NOTE(review): confirm list-sync semantics against the deployed Listmonk version.
    current_lists = sorted(
        {int(entry["id"]) for entry in subscriber.get("lists") or [] if "id" in entry}
    )
    profile = {
        "email": subscriber.get("email") or email,
        "name": subscriber.get("name") or candidate.legal_name or email,
        "status": subscriber.get("status") or "enabled",
        "lists": current_lists,
        "attribs": {**(subscriber.get("attribs") or {}), **attribs},
    }
    try:
        lm_api(f"/subscribers/{sub_id}", profile, "PUT")
    except Exception as exc:
        # Best-effort: a failed refresh must not block adding to the list.
        LOG.warning("attribute refresh failed for %s: %s", email, exc)

    lm_api(
        "/subscribers/lists",
        {"ids": [sub_id], "action": "add", "target_list_ids": [list_id], "status": "confirmed"},
        "PUT",
    )
    return True


def add_subscriber(list_id: int, candidate: CarrierCandidate) -> bool:
    """Add *candidate* to Listmonk list *list_id*, or attach an existing subscriber.

    New addresses are created as enabled subscribers with a pre-confirmed
    subscription. If Listmonk reports the address already exists (409 /
    "already exists" / "conflict"), the existing subscriber is looked up
    case-insensitively, attached to the list and has its attributes refreshed.
    Blocklisted subscribers are skipped.

    Returns:
        True if the subscriber was created or attached, False otherwise.
        Failures are logged, never raised.
    """
    email = candidate.email.lower().strip()
    attribs = _campaign_attribs(candidate)
    payload = {
        "email": email,
        "name": candidate.legal_name or candidate.email,
        "status": "enabled",
        "lists": [list_id],
        "preconfirm_subscriptions": True,
        "attribs": attribs,
    }
    try:
        lm_api("/subscribers", payload, "POST")
        return True
    except Exception as exc:
        msg = str(exc).lower()
        is_duplicate = "http 409" in msg or "already exists" in msg or "conflict" in msg
        if not is_duplicate:
            LOG.warning("subscriber add failed for %s: %s", candidate.email, exc)
            return False

    try:
        return _attach_existing_subscriber(list_id, email, candidate, attribs)
    except Exception as inner:
        LOG.warning("subscriber attach failed for %s: %s", candidate.email, inner)
        return False
def table_exists(conn, table: str) -> bool:
    """Return True if a table named *table* exists in the ``public`` schema."""
    lookup_sql = """
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.tables
            WHERE table_schema = 'public' AND table_name = %s
        )
    """
    with conn.cursor() as cursor:
        cursor.execute(lookup_sql, (table,))
        first_row = cursor.fetchone()
        return bool(first_row[0])
def table_columns(conn, table: str) -> set[str]:
    """Return the column names of ``public.<table>`` (empty set if it has none)."""
    column_sql = """
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = 'public' AND table_name = %s
    """
    with conn.cursor() as cursor:
        cursor.execute(column_sql, (table,))
        fetched = cursor.fetchall()
        return set(record[0] for record in fetched)
def usable_email_filter(cols: set[str]) -> str:
    """Return a SQL predicate selecting carriers with a mailable e-mail address.

    Always requires a non-empty address with an '@' after the first character
    and a domain outside ``%(blocked_domains)s``. When verification columns
    exist, additionally:

    * ``email_verified`` + ``email_verify_result``: require a positive
      verification (verified flag or a valid/catch-all result);
    * only ``email_verify_result``: reject known-bad results, allowing NULL.
    """
    clauses = [
        "email_address IS NOT NULL AND email_address <> '' AND position('@' in email_address) > 1",
        "lower(split_part(email_address, '@', 2)) <> ALL(%(blocked_domains)s)",
    ]
    has_result = "email_verify_result" in cols
    if has_result and "email_verified" in cols:
        clauses.append(
            "(email_verified IS TRUE OR email_verify_result IN ('smtp_valid','catch_all_domain','catch_all_detected'))"
        )
    elif has_result:
        clauses.append(
            "(email_verify_result IS NULL OR email_verify_result NOT IN ('invalid','smtp_invalid','bad_mailbox','disposable'))"
        )
    return " AND ".join(clauses)
def startup_date_filter(cols: set[str], days: int) -> str:
    """Return a SQL predicate keeping carriers whose FMCSA ``add_date`` is recent.

    ``add_date`` is parsed from three layouts: ``YYYY-MM-DD`` (optionally
    followed by more text), ``M/D/YYYY`` and ``DD-MON-YY`` (upper-case month).
    Values in any other layout become NULL and fail the predicate. The window
    size is bound at execution time through the ``%(recent_days)s`` query
    parameter; ``days`` itself is not used in the function body.

    NOTE(review): assumes ``add_date`` is a text column (the ``~`` regex match
    and ``to_date()`` need text) — TODO confirm. A value that matches a pattern
    but is not a real date (e.g. ``2024-02-30``) makes PostgreSQL raise during
    the cast and aborts the whole query.

    Raises:
        RuntimeError: if *cols* does not contain ``add_date``.
    """
    # Use FMCSA's carrier add_date only. Do not fall back to our row created_at,
    # because historical FMCSA carriers can be newly imported into our database.
    if "add_date" not in cols:
        raise RuntimeError("fmcsa_carriers.add_date is required for new-carrier targeting")
    # "\\d" in this non-raw string reaches SQL as the regex class \d.
    return """
        (
            CASE
                WHEN add_date ~ '^\\d{4}-\\d{2}-\\d{2}' THEN add_date::date
                WHEN add_date ~ '^\\d{1,2}/\\d{1,2}/\\d{4}$' THEN to_date(add_date, 'MM/DD/YYYY')
                WHEN add_date ~ '^\\d{2}-[A-Z]{3}-\\d{2}$' THEN to_date(add_date, 'DD-MON-YY')
                ELSE NULL
            END
        ) >= CURRENT_DATE - (%(recent_days)s::int * INTERVAL '1 day')
    """
def has_order_exclusion_clause(order_cols: set[str]) -> str:
    """Return a SQL predicate excluding carriers that already paid for a startup service.

    Args:
        order_cols: Column names of ``compliance_orders``. An empty set means the
            table does not exist, in which case nothing is excluded (``"TRUE"``).

    The predicate expects the outer query to alias ``fmcsa_carriers`` as ``c``
    and the ``%(startup_slugs)s`` parameter to be bound. A carrier is excluded
    when a paid order for one of those slugs matches it by:

    * customer e-mail (case-insensitive), when ``customer_email`` exists; or
    * its DOT number appearing as a whole number anywhere in the intake JSON
      text, when ``intake_data`` exists. The text search tolerates historical
      orders that used different key names for the DOT number.

    Raises:
        RuntimeError: if the table exists but lacks ``payment_status`` /
            ``service_slug``, or has neither match column.
    """
    if not order_cols:
        return "TRUE"

    missing = {"payment_status", "service_slug"} - order_cols
    if missing:
        raise RuntimeError(
            "compliance_orders is missing required column(s): " + ", ".join(sorted(missing))
        )

    matchers = []
    if "customer_email" in order_cols:
        matchers.append("lower(co.customer_email) = lower(c.email_address)")
    if "intake_data" in order_cols:
        # Digit boundaries stop DOT 12345 from matching 123456, or a phone, ZIP
        # or amount that merely contains those digits. NULLIF turns an empty DOT
        # number into NULL, so it cannot build a match-anything pattern.
        matchers.append(
            "co.intake_data::text ~ ('(^|[^0-9])' || NULLIF(c.dot_number::text, '') || '([^0-9]|$)')"
        )
    if not matchers:
        raise RuntimeError(
            "compliance_orders has neither customer_email nor intake_data to match carriers on"
        )

    match_sql = "\n            OR ".join(matchers)
    return f"""
    NOT EXISTS (
        SELECT 1
        FROM compliance_orders co
        WHERE co.payment_status = 'paid'
          AND co.service_slug = ANY(%(startup_slugs)s)
          AND (
            {match_sql}
          )
    )
    """
def infer_missing_items(row: dict[str, Any]) -> tuple[list[str], int]:
    """Guess which startup items a carrier still needs and score the lead.

    Every carrier gets three baseline items (BOC-3, UCR, D&A program). Power
    units, for-hire authority and drivers each add one more review item. The
    score starts at 50 and gains points for a small fleet (<=3 trucks), a small
    driver count (<=5), for-hire authority and a non-empty add_date. It is
    capped at 100.

    Returns:
        ``(missing_items, startup_score)``.
    """
    power_units = row.get("nbr_power_unit") or 0
    driver_count = row.get("driver_total") or 0
    is_for_hire = bool(row.get("authorized_for_hire"))

    conditional_items = (
        (power_units > 0, "IRP/IFTA review if operating interstate"),
        (is_for_hire, "MC operating authority / insurance filing review"),
        (driver_count > 0, "New entrant safety audit preparation"),
    )
    items = ["BOC-3 process agent filing", "UCR annual registration", "DOT drug & alcohol program"]
    items.extend(label for applies, label in conditional_items if applies)

    bonuses = (
        (power_units <= 3, 15),
        (driver_count <= 5, 10),
        (is_for_hire, 10),
        (bool(row.get("add_date")), 10),
    )
    total = 50 + sum(points for earned, points in bonuses if earned)
    return items, min(total, 100)
def _row_to_candidate(row: dict[str, Any]) -> CarrierCandidate:
    """Convert one ``fmcsa_carriers`` result row into a scored CarrierCandidate."""
    missing, score = infer_missing_items(row)
    return CarrierCandidate(
        dot_number=str(row["dot_number"]),
        email=row["email_address"],
        legal_name=row["legal_name"] or row["email_address"],
        phone=row.get("telephone"),
        city=row.get("phy_city"),
        state=row.get("phy_state"),
        add_date=str(row.get("add_date") or ""),
        trucks=row.get("nbr_power_unit"),
        drivers=row.get("driver_total"),
        authorized_for_hire=row.get("authorized_for_hire"),
        missing_items=missing,
        startup_score=score,
    )


def _dedupe_by_email(candidates) -> list[CarrierCandidate]:
    """Keep only the first candidate per normalised (stripped, lower-cased) e-mail."""
    seen: set[str] = set()
    unique: list[CarrierCandidate] = []
    for candidate in candidates:
        key = candidate.email.strip().lower()
        if key in seen:
            continue
        seen.add(key)
        unique.append(candidate)
    return unique


def fetch_candidates(conn, limit: int, recent_days: int) -> list[CarrierCandidate]:
    """Select recently added, small FMCSA carriers without a paid startup order.

    Filters: a usable e-mail, FMCSA add_date within *recent_days*, 0-3 power
    units, 0-5 drivers, no matching paid startup order and, when the column
    exists, not already tagged for this campaign type. Rows are ordered
    newest add_date first.

    Several carriers often share one e-mail address. Only the newest row per
    address is kept, so one subscriber is not created twice and then
    overwritten with another carrier's attributes. *limit* applies to SQL rows
    before de-duplication, so fewer candidates than *limit* may be returned.

    Raises:
        RuntimeError: if ``fmcsa_carriers`` (or its ``add_date`` column) is
            missing, or ``compliance_orders`` lacks required columns.
    """
    if not table_exists(conn, "fmcsa_carriers"):
        raise RuntimeError("required table fmcsa_carriers does not exist; run migrations/import FMCSA data first")
    fmcsa_cols = table_columns(conn, "fmcsa_carriers")
    order_cols = table_columns(conn, "compliance_orders") if table_exists(conn, "compliance_orders") else set()
    filters = [
        usable_email_filter(fmcsa_cols),
        startup_date_filter(fmcsa_cols, recent_days),
        "COALESCE(nbr_power_unit, 0) BETWEEN 0 AND 3",
        "COALESCE(driver_total, 0) BETWEEN 0 AND 5",
        has_order_exclusion_clause(order_cols),
    ]
    if "listmonk_campaign_type" in fmcsa_cols:
        filters.append("COALESCE(listmonk_campaign_type, '') <> 'new_carrier_startup'")

    # created_at is only a tie-breaker; referencing it when absent would fail the query.
    tie_breaker = ",\n            created_at DESC NULLS LAST" if "created_at" in fmcsa_cols else ""
    # Doubled braces are literal regex quantifiers inside this f-string.
    sql = f"""
        SELECT dot_number, email_address, legal_name, telephone, phy_city, phy_state,
               add_date, nbr_power_unit, driver_total, authorized_for_hire
        FROM fmcsa_carriers c
        WHERE {' AND '.join(filters)}
        ORDER BY
            CASE
                WHEN add_date ~ '^\\d{{4}}-\\d{{2}}-\\d{{2}}' THEN add_date::date
                WHEN add_date ~ '^\\d{{1,2}}/\\d{{1,2}}/\\d{{4}}$' THEN to_date(add_date, 'MM/DD/YYYY')
                WHEN add_date ~ '^\\d{{2}}-[A-Z]{{3}}-\\d{{2}}$' THEN to_date(add_date, 'DD-MON-YY')
                ELSE NULL
            END DESC NULLS LAST{tie_breaker}
        LIMIT %(limit)s
    """
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(
            sql,
            {
                "blocked_domains": list(BLOCKED_EMAIL_DOMAINS),
                "recent_days": recent_days,
                "startup_slugs": list(STARTUP_SERVICE_SLUGS),
                "limit": limit,
            },
        )
        rows = cur.fetchall()

    # Rows arrive newest-first, so the first candidate per e-mail is the newest.
    candidates = _dedupe_by_email(_row_to_candidate(row) for row in rows)
    if len(candidates) < len(rows):
        LOG.info("Dropped %d rows sharing an e-mail with a newer carrier", len(rows) - len(candidates))
    return candidates
def main() -> None:
    """Command-line entry point: select candidates and load them into Listmonk.

    Reads candidates from ``DATABASE_URL`` in a read-only session and logs up to
    ten samples. Unless ``--dry-run`` is given, it then creates or attaches each
    candidate as a subscriber of the target Listmonk list.

    Exits via ``SystemExit`` on invalid arguments, missing configuration, or a
    ``RuntimeError`` raised while selecting candidates.
    """
    parser = argparse.ArgumentParser(description="Populate New Carrier Startup Listmonk audience")
    parser.add_argument("--dry-run", action="store_true", help="Only count/show sample candidates")
    parser.add_argument("--limit", type=int, default=500, help="Max candidates to import")
    parser.add_argument("--recent-days", type=int, default=180, help="FMCSA add_date recency window in days")
    parser.add_argument("--list-name", default=LIST_NAME)
    args = parser.parse_args()
    # A negative LIMIT is a PostgreSQL error; a negative window silently selects nothing.
    if args.limit < 0:
        parser.error("--limit must be zero or greater")
    if args.recent_days < 0:
        parser.error("--recent-days must be zero or greater")
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    if not DATABASE_URL:
        raise SystemExit("DATABASE_URL is required")
    if not args.dry_run and not LISTMONK_PASS:
        # Fail before doing any database work rather than on the first Listmonk call.
        raise SystemExit("LISTMONK_PASS is required unless --dry-run is given")

    conn = psycopg2.connect(DATABASE_URL)
    try:
        # The script promises to be read-only against the app DB; enforce it.
        conn.set_session(readonly=True)
        candidates = fetch_candidates(conn, args.limit, args.recent_days)
    except RuntimeError as exc:
        raise SystemExit(str(exc)) from exc
    finally:
        conn.close()

    LOG.info("Found %d new-carrier startup candidates", len(candidates))
    for candidate in candidates[:10]:
        LOG.info(
            "sample DOT#%s %s <%s> %s trucks=%s drivers=%s missing=%s",
            candidate.dot_number,
            candidate.legal_name,
            candidate.email,
            candidate.state,
            candidate.trucks,
            candidate.drivers,
            "; ".join(candidate.missing_items),
        )
    if args.dry_run:
        return

    list_id = get_or_create_list(args.list_name)
    LOG.info("Using Listmonk list %s (ID %d)", args.list_name, list_id)
    imported = 0
    for processed, candidate in enumerate(candidates, start=1):
        if add_subscriber(list_id, candidate):
            imported += 1
        # Report progress on items processed. Keying on `imported` repeated the
        # same line after every failed add once it sat on a multiple of 100.
        if processed % 100 == 0:
            LOG.info("Processed %d/%d (imported/attached %d)", processed, len(candidates), imported)
    LOG.info(
        "Done. Imported/attached %d subscribers to %s (%d skipped or failed)",
        imported,
        args.list_name,
        len(candidates) - imported,
    )
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()