From 4f4edb5f00eb9999ace019fd1cccf215f86573b1 Mon Sep 17 00:00:00 2001 From: justin Date: Mon, 1 Jun 2026 20:19:55 -0500 Subject: [PATCH] Add new carrier startup campaign targeting --- docs/trucking-marketing-send-plan.md | 135 ++++++ .../populate_new_carrier_startup_campaign.py | 383 ++++++++++++++++++ scripts/setup_trucking_campaigns.py | 3 +- 3 files changed, 520 insertions(+), 1 deletion(-) create mode 100644 docs/trucking-marketing-send-plan.md create mode 100644 scripts/populate_new_carrier_startup_campaign.py diff --git a/docs/trucking-marketing-send-plan.md b/docs/trucking-marketing-send-plan.md new file mode 100644 index 0000000..6ed8d93 --- /dev/null +++ b/docs/trucking-marketing-send-plan.md @@ -0,0 +1,135 @@ +# Trucking Marketing Send Plan + +Date: 2026-06-02 + +This plan ranks trucking campaign sends by fulfillment friction, data quality, and ability to deliver without asking customers for portal passwords. + +## Prioritized sends + +### 1. NY HUT setup + +**Why first:** New York supports a clean tax-professional authorization workflow through E-ZRep/TR-2000. This is one of the lowest-friction state filings. + +**Target data:** carriers based in NY or operating in NY, especially heavy vehicles likely over 18,000 lbs. + +**Message:** + +> Operating in New York? You may need HUT credentials before operating taxable vehicles on NY public highways. Performance West can prepare and file with your signed authorization. + +**Offer:** NY HUT registration, certificate/decal assistance, quarterly filing support. + +**Required fulfillment:** signed NY authorization/TR-2000, vehicle list, weights, USDOT, tax/business details, government fees. + +### 2. Connecticut Highway Use Fee + +**Why second:** CT requires electronic filing through myconneCT and supports practitioner/third-party/bulk-filer concepts. CSV vehicle import makes this operationally scalable. + +**Target data:** CT base/operating carriers, especially vehicles over 26,000 lbs. + +**Message:** + +> Connecticut HUF is electronic and recurring. We can help register, import vehicles, and file returns with your authorization. + +**Offer:** CT HUF setup and monthly filing support. + +**Required fulfillment:** signed authorization, myconneCT delegate/practitioner access or customer approval, vehicle/mileage records, government tax payment. + +### 3. DOT Drug & Alcohol Program + +**Why third:** Lowest filing friction. No state tax portal. Strong recurring/compliance service. + +**Target data:** carriers with drivers and likely CDL operations. FMCSA fields: `driver_total > 0`, `nbr_power_unit > 0`, for-hire/private property indicators. + +**Message:** + +> If you operate CDL vehicles, you need a DOT-compliant drug and alcohol testing program. + +**Offer:** annual D&A consortium enrollment, driver file review, clearinghouse support. + +**Required fulfillment:** driver/company intake, signed service authorization, provider/consortium enrollment. + +### 4. New Carrier Startup Bundle + +**Why fourth:** Broad appeal and strong cross-sell. Use data to avoid sending to carriers that already purchased or completed startup services. + +**Target data:** recently added DOT carriers with email, small fleet, no evidence in our orders/records of completed startup items. + +**Missing-item signals to infer from data:** + +- no Performance West order for new carrier bundle; +- no Performance West order for BOC-3; +- no Performance West order for UCR; +- no Performance West order for DOT Drug & Alcohol; +- no Performance West order for IRP/IFTA; +- no entity/LLC order where applicable; +- recent FMCSA `add_date` or `created_at`; +- small fleet: 1-3 power units or 1-5 drivers; +- active/usable email. + +**Message:** + +> New DOT number? Most new carriers miss at least one required filing: BOC-3, UCR, D&A, IRP/IFTA, insurance, or state permits. We can review your startup checklist and file missing items. + +**Offer:** New Carrier Starter Bundle, BOC-3, UCR, D&A, IRP/IFTA advisory, LLC/entity setup. + +**Required fulfillment:** startup intake, signed authorization, government fee pass-throughs, state-specific follow-up where needed. + +### 5. California MCP + +**Why fifth:** Good market, but medium friction due to CA portal/forms and insurance coordination. + +**Target data:** CA base/operating carriers. + +**Message:** + +> California carriers may need MCP, CA number, insurance filings, and emissions compliance. We can prepare and file with your authorization. + +**Offer:** CA MCP assistance, CA number/CHP guidance, insurance coordination, CARB/TRUCRS review. + +**Required fulfillment:** signed authorization, vehicle/fleet details, insurance agent, portal or form workflow. + +## Cautious / later sends + +These are valuable but should use conservative language until portal delegation is fully verified: + +- Oregon Weight-Mile +- Kentucky KYU +- New Mexico Weight-Distance +- Texas TxDMV / intrastate authority +- CA CARB/TRUCRS recovery +- IRP/IFTA across all base states + +Use wording: + +- “We prepare and file with your authorization.” +- “Some state portal approval may be required.” +- “Government fees and taxes are billed separately.” +- “Insurance filings may require your insurance agent.” + +## Implementation notes for New Carrier Startup targeting + +Create a Listmonk list named: + +`FMCSA Trucking - New Carrier Missing Startup Items` + +Populate with carriers matching: + +1. usable email; +2. not already sent the new-carrier campaign; +3. recent-ish FMCSA add date or recent DB creation; +4. small/new operator profile; +5. no internal order evidence for new carrier startup items. + +Subscriber attributes should include: + +- `company` +- `dot_number` +- `state` +- `city` +- `trucks` +- `drivers` +- `add_date` +- `missing_items_html` +- `startup_score` + +Do not mark globally `listmonk_sent_at` until the real campaign is sent. Use campaign-type-specific tracking where possible to avoid blocking unrelated campaigns. diff --git a/scripts/populate_new_carrier_startup_campaign.py b/scripts/populate_new_carrier_startup_campaign.py new file mode 100644 index 0000000..eceaac2 --- /dev/null +++ b/scripts/populate_new_carrier_startup_campaign.py @@ -0,0 +1,383 @@ +#!/usr/bin/env python3 +"""Populate a Listmonk audience for new carriers likely missing startup items. + +Targets FMCSA carriers that look new/small and have no Performance West paid +order evidence for core startup services such as BOC-3, UCR, D&A, IRP/IFTA, +DOT full compliance, or new carrier bundle-like services. + +This script is intentionally read-only against the application database. It does +not mark fmcsa_carriers.listmonk_sent_at because adding a carrier to an audience +is not the same as sending the campaign. + +Usage: + DATABASE_URL=postgres://... python3 scripts/populate_new_carrier_startup_campaign.py --dry-run + DATABASE_URL=postgres://... python3 scripts/populate_new_carrier_startup_campaign.py --limit 500 +""" +from __future__ import annotations + +import argparse +import base64 +import html +import json +import logging +import os +import urllib.error +import urllib.parse +import urllib.request +from dataclasses import dataclass +from datetime import date, datetime, timedelta +from typing import Any + +import psycopg2 +import psycopg2.extras + +LOG = logging.getLogger("populate_new_carrier_startup_campaign") + +LISTMONK_URL = os.getenv("LISTMONK_URL", "https://lists.performancewest.net").rstrip("/") +LISTMONK_USER = os.getenv("LISTMONK_USER", "api") +LISTMONK_PASS = os.getenv("LISTMONK_PASS", "6X1rKPea61N4rZ1S65Hx5zvqzbCj30F6nvEe9oVGH_Y") +LIST_NAME = os.getenv("NEW_CARRIER_LIST_NAME", "FMCSA Trucking - New Carrier Missing Startup Items") +DATABASE_URL = os.getenv("DATABASE_URL", "") + +BLOCKED_EMAIL_DOMAINS = {"aol.com", "yahoo.com", "ymail.com", "rocketmail.com"} +STARTUP_SERVICE_SLUGS = { + "boc3-filing", + "ucr-registration", + "dot-registration", + "mc-authority", + "dot-drug-alcohol", + "dot-audit-prep", + "dot-full-compliance", + "irp-registration", + "ifta-application", + "ifta-quarterly", + "state-trucking-bundle", + "ein-application", + "entity-upgrade-bundle", + "registered-agent", + "virtual-mailbox", +} + +AUTH = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode() + + +@dataclass +class CarrierCandidate: + dot_number: str + email: str + legal_name: str + phone: str | None + city: str | None + state: str | None + add_date: str | None + trucks: int | None + drivers: int | None + authorized_for_hire: bool | None + missing_items: list[str] + startup_score: int + + +def lm_api(path: str, data: dict | None = None, method: str | None = None) -> dict: + headers = { + "Authorization": f"Basic {AUTH}", + "Content-Type": "application/json", + "Accept": "application/json", + } + req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers) + if data is not None: + req.data = json.dumps(data).encode() + if method: + req.method = method + try: + raw = urllib.request.urlopen(req, timeout=45).read() + return json.loads(raw or b"{}") + except urllib.error.HTTPError as exc: + body = exc.read().decode(errors="replace")[:1000] + raise RuntimeError(f"Listmonk {path} HTTP {exc.code}: {body}") from exc + + +def get_or_create_list(name: str) -> int: + resp = lm_api("/lists?per_page=all") + for row in resp.get("data", {}).get("results", []): + if row.get("name") == name: + return int(row["id"]) + resp = lm_api( + "/lists", + {"name": name, "type": "private", "optin": "single", "tags": ["trucking", "new-carrier", "auto"]}, + "POST", + ) + return int(resp["data"]["id"]) + + +def add_subscriber(list_id: int, candidate: CarrierCandidate) -> bool: + issues_html = "" + attribs = { + "company": candidate.legal_name, + "dot_number": candidate.dot_number, + "phone": candidate.phone or "", + "city": candidate.city or "", + "state": candidate.state or "", + "add_date": candidate.add_date or "", + "trucks": candidate.trucks or 0, + "drivers": candidate.drivers or 0, + "for_hire": bool(candidate.authorized_for_hire), + "missing_items": candidate.missing_items, + "missing_items_html": issues_html, + "startup_score": candidate.startup_score, + } + payload = { + "email": candidate.email.lower().strip(), + "name": candidate.legal_name or candidate.email, + "status": "enabled", + "lists": [list_id], + "preconfirm_subscriptions": True, + "attribs": attribs, + } + try: + lm_api("/subscribers", payload, "POST") + return True + except Exception as exc: + msg = str(exc) + if "HTTP 409" not in msg and "already exists" not in msg.lower() and "conflict" not in msg.lower(): + LOG.warning("subscriber add failed for %s: %s", candidate.email, exc) + return False + try: + query = urllib.parse.quote(f"subscribers.email='{candidate.email.lower().strip()}'") + results = lm_api(f"/subscribers?query={query}").get("data", {}).get("results", []) + if not results: + return False + sub_id = int(results[0]["id"]) + lm_api( + "/subscribers/lists", + {"ids": [sub_id], "action": "add", "target_list_ids": [list_id], "status": "confirmed"}, + "PUT", + ) + # Preserve fresh campaign attributes where the API supports subscriber update. + try: + lm_api(f"/subscribers/{sub_id}", {"attribs": attribs}, "PUT") + except Exception: + pass + return True + except Exception as inner: + LOG.warning("subscriber attach failed for %s: %s", candidate.email, inner) + return False + + +def table_exists(conn, table: str) -> bool: + with conn.cursor() as cur: + cur.execute( + """ + SELECT EXISTS ( + SELECT 1 + FROM information_schema.tables + WHERE table_schema = 'public' AND table_name = %s + ) + """, + (table,), + ) + return bool(cur.fetchone()[0]) + + +def table_columns(conn, table: str) -> set[str]: + with conn.cursor() as cur: + cur.execute( + """ + SELECT column_name + FROM information_schema.columns + WHERE table_schema = 'public' AND table_name = %s + """, + (table,), + ) + return {r[0] for r in cur.fetchall()} + + +def usable_email_filter(cols: set[str]) -> str: + base = "email_address IS NOT NULL AND email_address <> '' AND position('@' in email_address) > 1" + domain_filter = "lower(split_part(email_address, '@', 2)) <> ALL(%(blocked_domains)s)" + if "email_verified" in cols and "email_verify_result" in cols: + return f"{base} AND {domain_filter} AND (email_verified IS TRUE OR email_verify_result IN ('smtp_valid','catch_all_domain','catch_all_detected'))" + if "email_verify_result" in cols: + return f"{base} AND {domain_filter} AND (email_verify_result IS NULL OR email_verify_result NOT IN ('invalid','smtp_invalid','bad_mailbox','disposable'))" + return f"{base} AND {domain_filter}" + + +def startup_date_filter(cols: set[str], days: int) -> str: + # FMCSA add_date is text in the seed migration. It is commonly YYYY-MM-DD or + # MM/DD/YYYY depending on import source, so handle both and fall back to row + # creation date if parsing fails. + parts = [] + if "add_date" in cols: + parts.append( + """ + CASE + WHEN add_date ~ '^\\d{4}-\\d{2}-\\d{2}' THEN add_date::date + WHEN add_date ~ '^\\d{1,2}/\\d{1,2}/\\d{4}$' THEN to_date(add_date, 'MM/DD/YYYY') + ELSE NULL + END >= CURRENT_DATE - (%(recent_days)s::int * INTERVAL '1 day') + """ + ) + if "created_at" in cols: + parts.append("created_at >= NOW() - (%(recent_days)s::int * INTERVAL '1 day')") + if not parts: + return "TRUE" + return "(" + " OR ".join(parts) + ")" + + +def has_order_exclusion_clause(order_cols: set[str]) -> str: + if not order_cols: + return "TRUE" + # Match by customer email and, when present in intake JSON, by DOT number. + # The JSON text fallback catches varying intake field names without assuming + # every historical order used the same key. + return """ + NOT EXISTS ( + SELECT 1 + FROM compliance_orders co + WHERE co.payment_status = 'paid' + AND co.service_slug = ANY(%(startup_slugs)s) + AND ( + lower(co.customer_email) = lower(c.email_address) + OR co.intake_data::text ILIKE '%%' || c.dot_number || '%%' + ) + ) + """ + + +def infer_missing_items(row: dict[str, Any]) -> tuple[list[str], int]: + missing = ["BOC-3 process agent filing", "UCR annual registration", "DOT drug & alcohol program"] + trucks = row.get("nbr_power_unit") or 0 + drivers = row.get("driver_total") or 0 + for_hire = bool(row.get("authorized_for_hire")) + if trucks > 0: + missing.append("IRP/IFTA review if operating interstate") + if for_hire: + missing.append("MC operating authority / insurance filing review") + if drivers > 0: + missing.append("New entrant safety audit preparation") + score = 50 + if trucks <= 3: + score += 15 + if drivers <= 5: + score += 10 + if for_hire: + score += 10 + if row.get("add_date"): + score += 10 + return missing, min(score, 100) + + +def fetch_candidates(conn, limit: int, recent_days: int) -> list[CarrierCandidate]: + if not table_exists(conn, "fmcsa_carriers"): + raise RuntimeError("required table fmcsa_carriers does not exist; run migrations/import FMCSA data first") + fmcsa_cols = table_columns(conn, "fmcsa_carriers") + order_cols = table_columns(conn, "compliance_orders") if table_exists(conn, "compliance_orders") else set() + filters = [ + usable_email_filter(fmcsa_cols), + startup_date_filter(fmcsa_cols, recent_days), + "COALESCE(nbr_power_unit, 0) BETWEEN 0 AND 3", + "COALESCE(driver_total, 0) BETWEEN 0 AND 5", + has_order_exclusion_clause(order_cols), + ] + if "listmonk_campaign_type" in fmcsa_cols: + filters.append("COALESCE(listmonk_campaign_type, '') <> 'new_carrier_startup'") + + sql = f""" + SELECT dot_number, email_address, legal_name, telephone, phy_city, phy_state, + add_date, nbr_power_unit, driver_total, authorized_for_hire + FROM fmcsa_carriers c + WHERE {' AND '.join(filters)} + ORDER BY + CASE + WHEN add_date ~ '^\\d{{4}}-\\d{{2}}-\\d{{2}}' THEN add_date::date + WHEN add_date ~ '^\\d{{1,2}}/\\d{{1,2}}/\\d{{4}}$' THEN to_date(add_date, 'MM/DD/YYYY') + ELSE NULL + END DESC NULLS LAST, + created_at DESC NULLS LAST + LIMIT %(limit)s + """ + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + cur.execute( + sql, + { + "blocked_domains": list(BLOCKED_EMAIL_DOMAINS), + "recent_days": recent_days, + "startup_slugs": list(STARTUP_SERVICE_SLUGS), + "limit": limit, + }, + ) + rows = cur.fetchall() + + candidates = [] + for row in rows: + missing, score = infer_missing_items(row) + candidates.append( + CarrierCandidate( + dot_number=str(row["dot_number"]), + email=row["email_address"], + legal_name=row["legal_name"] or row["email_address"], + phone=row.get("telephone"), + city=row.get("phy_city"), + state=row.get("phy_state"), + add_date=str(row.get("add_date") or ""), + trucks=row.get("nbr_power_unit"), + drivers=row.get("driver_total"), + authorized_for_hire=row.get("authorized_for_hire"), + missing_items=missing, + startup_score=score, + ) + ) + return candidates + + +def main() -> None: + parser = argparse.ArgumentParser(description="Populate New Carrier Startup Listmonk audience") + parser.add_argument("--dry-run", action="store_true", help="Only count/show sample candidates") + parser.add_argument("--limit", type=int, default=500, help="Max candidates to import") + parser.add_argument("--recent-days", type=int, default=180, help="FMCSA add/create recency window") + parser.add_argument("--list-name", default=LIST_NAME) + args = parser.parse_args() + + logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") + if not DATABASE_URL: + raise SystemExit("DATABASE_URL is required") + + conn = psycopg2.connect(DATABASE_URL) + try: + try: + candidates = fetch_candidates(conn, args.limit, args.recent_days) + except RuntimeError as exc: + raise SystemExit(str(exc)) from exc + finally: + conn.close() + + LOG.info("Found %d new-carrier startup candidates", len(candidates)) + for candidate in candidates[:10]: + LOG.info( + "sample DOT#%s %s <%s> %s trucks=%s drivers=%s missing=%s", + candidate.dot_number, + candidate.legal_name, + candidate.email, + candidate.state, + candidate.trucks, + candidate.drivers, + "; ".join(candidate.missing_items), + ) + + if args.dry_run: + return + + list_id = get_or_create_list(args.list_name) + LOG.info("Using Listmonk list %s (ID %d)", args.list_name, list_id) + imported = 0 + for candidate in candidates: + if add_subscriber(list_id, candidate): + imported += 1 + if imported and imported % 100 == 0: + LOG.info("Imported/attached %d/%d", imported, len(candidates)) + LOG.info("Done. Imported/attached %d subscribers to %s", imported, args.list_name) + + +if __name__ == "__main__": + main() diff --git a/scripts/setup_trucking_campaigns.py b/scripts/setup_trucking_campaigns.py index 9f0cbc5..8be583d 100644 --- a/scripts/setup_trucking_campaigns.py +++ b/scripts/setup_trucking_campaigns.py @@ -223,10 +223,11 @@ CAMPAIGNS = [ "cta_url": "https://performancewest.net/services/trucking/hazmat/", "cta_text": "Review Hazmat Compliance", }, { - "slug": "new-carrier-startup", "list": "FMCSA Trucking - New Carrier Prospects", + "slug": "new-carrier-startup", "list": "FMCSA Trucking - New Carrier Missing Startup Items", "name": "New Carrier Startup Checklist Campaign", "headline": "New Carrier Startup Checklist", "subject": "New DOT number? Startup checklist for {{ .Subscriber.Attribs.company }}", "requirements": [ + "Our startup checklist flagged these likely missing items for your carrier:
{{ Safe .Subscriber.Attribs.missing_items_html }}", "Startup filing sequence — LLC, EIN, USDOT, operating authority, BOC-3, UCR, D&A, insurance, IRP and IFTA can all stack up quickly.", "Missed renewals — new carriers often miss UCR, MCS-150 timing, state permits, or quarterly fuel tax obligations.", "Cash-flow friendly setup — bundling startup compliance prevents piecemeal filings and duplicate work.",