#!/usr/bin/env python3 """Daily trucking campaign builder. Runs nightly (3 AM EST) to create the next day's 8 Listmonk campaigns: - 4 TZ regions x MCS-150 overdue (2,000 carriers each, 4 AM / 5 AM / 6 AM / 7 AM EST) - 4 TZ regions x Inactive USDOT (1,000 carriers each, same schedule) Selection criteria: - email verified or catch-all (usable) - mcs150_parsed > 2 years ago (for MCS-150 campaign) - oos_active IS TRUE (for inactive USDOT campaign) - listmonk_sent_at IS NULL (not yet sent) - ordered by mcs150_parsed ASC (most overdue first) Usage: python3 scripts/build_trucking_campaigns.py python3 scripts/build_trucking_campaigns.py --dry-run python3 scripts/build_trucking_campaigns.py --date 2026-06-02 """ from __future__ import annotations import argparse import base64 import json import logging import os import sys import urllib.request import urllib.parse from datetime import date, datetime, timedelta, timezone import psycopg2 LOG = logging.getLogger("build_trucking_campaigns") # ── Listmonk ────────────────────────────────────────────────────────────────── LISTMONK_URL = os.getenv("LISTMONK_URL", "http://listmonk:9000") LISTMONK_USER = os.getenv("LISTMONK_USER", "api") LISTMONK_PASS = os.getenv("LISTMONK_PASS", "6X1rKPea61N4rZ1S65Hx5zvqzbCj30F6nvEe9oVGH_Y") _LM_AUTH = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode() # ── Source campaign IDs ──────────────────────────────────────────────────────── CAMPAIGN_MCS150_ID = 186 # "MCS-150 Overdue — $1,000/Day Fine Risk" CAMPAIGN_INACTIVE_ID = 188 # "Inactive USDOT — Reactivate Before You Get Pulled Over" # Public site for landing-page links injected into Listmonk subscriber attribs. SITE_DOMAIN = os.getenv("SITE_DOMAIN", "https://performancewest.com") # ── Deficiency-flag segments (Phase 5) ────────────────────────────────────── # Each segment targets carriers by a `deficiency_flags` value (the TEXT[] column # written by fmcsa_deficiency_flagger.py) and links to its order landing page. 
#
# Each segment's `source_env` names the environment variable holding the ID of
# the Listmonk template campaign to clone. While that variable is unset (the
# template has not been created yet) the segment is still reported by
# --list-segments but is skipped by the scheduled run, so the nightly job never
# breaks on an unconfigured template.
#
# `flag_sql` is a predicate over the deficiency_flags array. It is interpolated
# into a query executed with bound parameters, so a literal `%` is written `%%`.
# `limit` caps how many carriers are selected per timezone region.
_FLAG_PREFIX_SQL = (
    "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
    "WHERE f LIKE '{prefix}%%')"
)

DEFICIENCY_SEGMENTS = {
    "for_hire_boc3": dict(
        label="For-Hire BOC-3 + UCR",
        flag_sql="'for_hire_carrier' = ANY(deficiency_flags)",
        lp_slug="boc3-filing",
        source_env="CAMPAIGN_FOR_HIRE_ID",
        limit=1500,
    ),
    "irp_ifta": dict(
        label="IRP / IFTA Registration",
        flag_sql="'interstate_needs_irp_ifta' = ANY(deficiency_flags)",
        lp_slug="state-trucking-bundle",
        source_env="CAMPAIGN_IRP_IFTA_ID",
        limit=1500,
    ),
    "intrastate_authority": dict(
        label="Intrastate Operating Authority",
        # matches any intrastate_authority_<...> flag
        flag_sql=_FLAG_PREFIX_SQL.format(prefix="intrastate_authority_"),
        lp_slug="intrastate-authority",
        source_env="CAMPAIGN_INTRASTATE_ID",
        limit=1000,
    ),
    "state_weight_tax": dict(
        label="State Weight-Distance Tax",
        flag_sql=_FLAG_PREFIX_SQL.format(prefix="state_weight_tax_"),
        # default page; build_lp_link() picks a per-state page where one exists
        lp_slug="state-trucking-bundle",
        source_env="CAMPAIGN_WEIGHT_TAX_ID",
        limit=1000,
    ),
    "state_emissions": dict(
        label="State Clean-Truck / Emissions",
        flag_sql=_FLAG_PREFIX_SQL.format(prefix="state_emissions_"),
        lp_slug="state-emissions",
        source_env="CAMPAIGN_EMISSIONS_ID",
        limit=1000,
    ),
    "hazmat": dict(
        label="Hazmat PHMSA Registration",
        flag_sql="'hazmat_carrier' = ANY(deficiency_flags)",
        lp_slug="hazmat-phmsa",
        source_env="CAMPAIGN_HAZMAT_ID",
        limit=500,
    ),
}

# Per-state weight-tax LP overrides (the CA emissions override is in build_lp_link()).
_WEIGHT_TAX_LP = { "OR": "or-weight-mile-tax", "NY": "ny-hut-registration", "KY": "ky-kyu-registration", "NM": "nm-weight-distance", "CT": "ct-highway-use-fee", } def build_lp_link(campaign_type: str, phy_state: str | None) -> str: """Return the order landing-page URL for a (segment, state).""" seg = DEFICIENCY_SEGMENTS.get(campaign_type) slug = seg["lp_slug"] if seg else "dot-full-compliance" if campaign_type == "state_weight_tax" and phy_state in _WEIGHT_TAX_LP: slug = _WEIGHT_TAX_LP[phy_state] if campaign_type == "state_emissions" and phy_state == "CA": slug = "ca-mcp-carb" return f"{SITE_DOMAIN}/order/{slug}" # ── TZ config: tz_key -> (states, send_hour_utc) ───────────────────────────── # 4AM EST = 09:00 UTC, each TZ +1hr so they get it at ~4AM local TIMEZONE_CONFIG = { "ET": { "states": ("CT","DC","DE","FL","GA","IN","KY","MA","MD","ME","MI", "NH","NJ","NY","NC","OH","PA","RI","SC","TN","VA","VT","WV"), "send_hour_utc": 9, # 4 AM EST }, "CT": { "states": ("AL","AR","IA","IL","KS","LA","MN","MO","MS","ND", "NE","OK","SD","TX","WI"), "send_hour_utc": 10, # 5 AM EST = 4 AM CST }, "MT": { "states": ("AZ","CO","ID","MT","NM","UT","WY"), "send_hour_utc": 11, # 6 AM EST = 4 AM MST }, "PT": { "states": ("AK","CA","HI","NV","OR","WA"), "send_hour_utc": 12, # 7 AM EST = 4 AM PST }, } # Owner email — test sends go here before each campaign is scheduled TEST_EMAIL = os.getenv("CAMPAIGN_TEST_EMAIL", "carrierone@gmx.com") USABLE_FILTER = ( "(email_verified IS TRUE OR email_verify_result IN " "('smtp_valid','catch_all_domain','catch_all_detected'))" ) BLOCKED_EMAIL_DOMAINS = ( "aol.com", "yahoo.com", "ymail.com", "rocketmail.com", ) DB_URL = os.getenv("DATABASE_URL", "") def lm_api(path: str, data: dict | None = None, method: str | None = None): headers = {"Content-Type": "application/json", "Authorization": f"Basic {_LM_AUTH}"} req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers) if data is not None: req.data = json.dumps(data).encode() if method: 
req.method = method try: return json.loads(urllib.request.urlopen(req, timeout=30).read()) except urllib.error.HTTPError as e: body = e.read().decode()[:300] raise RuntimeError(f"Listmonk {path} HTTP {e.code}: {body}") def get_base_campaign(campaign_id: int) -> dict: return lm_api(f"/campaigns/{campaign_id}")["data"] def create_list(name: str) -> int: result = lm_api("/lists", { "name": name, "type": "private", "optin": "single", "tags": ["trucking", "auto"], }, "POST") return result["data"]["id"] def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool: """Create/attach a single subscriber to a list. Returns True on success. Listmonk's POST /api/subscribers takes one subscriber with `lists` (array of list IDs) and preconfirm_subscriptions=true so they're immediately sendable. """ try: lm_api("/subscribers", { "email": email, "name": name or email, "status": "enabled", "lists": [list_id], "attribs": attribs, "preconfirm_subscriptions": True, }, "POST") return True except Exception as exc: msg = str(exc) # Already exists — attach to this list instead of failing if "already exists" in msg.lower() or "conflict" in msg.lower() or "HTTP 409" in msg: try: subs = lm_api(f"/subscribers?query=" + urllib.parse.quote(f"subscribers.email='{email}'")) results = subs.get("data", {}).get("results", []) if results: lm_api("/subscribers/lists", { "ids": [results[0]["id"]], "action": "add", "target_list_ids": [list_id], "status": "confirmed", }, "PUT") return True except Exception: return False return False def import_subscribers(list_id: int, subscribers: list[dict]) -> int: """Add subscribers to a list one-by-one. 
Returns count successfully added.""" added = 0 for sub in subscribers: if add_subscriber(list_id, sub["email"], sub.get("name", ""), sub.get("attribs", {})): added += 1 return added def create_and_schedule_campaign( base: dict, list_id: int, name: str, send_at_utc: datetime, schedule: bool = True, ) -> int: payload = { "name": name, "subject": base["subject"], "lists": [list_id], "from_email": base["from_email"], "type": "regular", "content_type": base["content_type"], "body": base["body"], "altbody": base.get("altbody"), "template_id": base["template_id"], "tags": base.get("tags") or [], "messenger": base.get("messenger") or "email", } if schedule: payload["send_at"] = send_at_utc.strftime("%Y-%m-%dT%H:%M:%S+00:00") result = lm_api("/campaigns", payload, "POST") cid = result["data"]["id"] if schedule: lm_api(f"/campaigns/{cid}/status", {"status": "scheduled"}, "PUT") # else: leave as draft (preview mode) return cid def send_test(base: dict, campaign_id: int, sample_row: tuple, label: str, tz: str) -> None: """Send one test email so the owner can approve before the real blast goes out.""" dot, email, name, state = sample_row body = base["body"] body = body.replace("{{ .Subscriber.Attribs.company }}", name or "Sample Carrier LLC") body = body.replace("{{ .Subscriber.Attribs.dot_number }}", dot or "0000000") body = body.replace("{{ .Subscriber.Attribs.state }}", state or "TX") # NOTE: leave {{ UnsubscribeURL }} alone — Listmonk renders it into a real, # working per-subscriber unsubscribe link (even on test sends). Overwriting it # produced a dead /unsubscribe link with no subscriber identity. 
subj = base["subject"].replace("{{ .Subscriber.Attribs.dot_number }}", dot or "0000000") # Listmonk /test needs `lists` as an array of IDs (base["lists"] is objects) list_ids = [l["id"] for l in base.get("lists", []) if isinstance(l, dict)] or [1] payload = { "name": base.get("name", "Test"), "subject": subj, "lists": list_ids, "from_email": base["from_email"], "type": "regular", "content_type": base["content_type"], "body": body, "altbody": base.get("altbody"), "template_id": base["template_id"], "tags": base.get("tags") or [], "messenger": base.get("messenger") or "email", "subscribers": [TEST_EMAIL], } try: lm_api(f"/campaigns/{campaign_id}/test", payload, "POST") LOG.info("[%s/%s] Test sent to %s (sample: %s DOT#%s)", tz, label, TEST_EMAIL, name, dot) except Exception as exc: LOG.warning("[%s/%s] Test send failed: %s", tz, label, exc) def fetch_carriers( conn, tz_states: tuple, campaign_type: str, limit: int, ) -> list[tuple]: """Return (dot_number, email_address, legal_name, phy_state) rows.""" cur = conn.cursor() states_placeholder = ",".join(["%s"] * len(tz_states)) if campaign_type == "mcs150": type_filter = "mcs150_parsed < NOW() - INTERVAL '2 years'" elif campaign_type == "inactive": type_filter = "oos_active IS TRUE" elif campaign_type in DEFICIENCY_SEGMENTS: # Flag-based segment: dedupe against already-sent and require the flag. 
type_filter = DEFICIENCY_SEGMENTS[campaign_type]["flag_sql"] else: raise ValueError(f"unknown campaign_type {campaign_type!r}") cur.execute(f""" SELECT dot_number, email_address, legal_name, phy_state FROM fmcsa_carriers WHERE {type_filter} AND {USABLE_FILTER} AND listmonk_sent_at IS NULL AND lower(split_part(email_address, '@', 2)) <> ALL(%s) AND phy_state IN ({states_placeholder}) ORDER BY mcs150_parsed ASC NULLS LAST LIMIT %s """, [list(BLOCKED_EMAIL_DOMAINS)] + list(tz_states) + [limit]) return cur.fetchall() def mark_sent(conn, dot_numbers: list[str], campaign_type: str) -> None: cur = conn.cursor() cur.execute(""" UPDATE fmcsa_carriers SET listmonk_sent_at = NOW(), listmonk_campaign_type = %s WHERE dot_number = ANY(%s::text[]) """, (campaign_type, dot_numbers)) conn.commit() def list_segments(send_date: date) -> None: """Report deduped audience size per deficiency segment (no writes). Verifiable Phase-5 metric: each segment selects a nonzero, deduped audience and points at a valid order LP. Source-template configuration is shown so we know which segments are ready to schedule. 
""" conn = psycopg2.connect(DB_URL) print(f"\nDeficiency-flag segments (send date {send_date.isoformat()}):\n") print(f"{'segment':<22}{'audience':>10} {'source':<10} landing page") print("-" * 78) grand_total = 0 for ctype, seg in DEFICIENCY_SEGMENTS.items(): total = 0 for tz_cfg in TIMEZONE_CONFIG.values(): rows = fetch_carriers(conn, tz_cfg["states"], ctype, seg["limit"]) total += len(rows) grand_total += total src = os.getenv(seg["source_env"]) src_str = src if src else "UNSET" lp = build_lp_link(ctype, None) print(f"{ctype:<22}{total:>10} {src_str:<10} {lp}") print("-" * 78) print(f"{'TOTAL':<22}{grand_total:>10}") print("\n(UNSET source = template not yet created; segment is skipped by the " "scheduled run until its CAMPAIGN_*_ID env is set.)\n") conn.close() def run(send_date: date, dry_run: bool = False, preview: bool = False) -> None: conn = psycopg2.connect(DB_URL) base_mcs150 = get_base_campaign(CAMPAIGN_MCS150_ID) base_inactive = get_base_campaign(CAMPAIGN_INACTIVE_ID) campaign_specs = [ ("mcs150", base_mcs150, 2000, "MCS-150 Overdue"), ("inactive", base_inactive, 1000, "Inactive USDOT"), ] # Add deficiency-flag segments whose Listmonk source template is configured. 
for ctype, seg in DEFICIENCY_SEGMENTS.items(): src = os.getenv(seg["source_env"]) if not src: LOG.info("[segment] %s skipped — %s not set (template not created)", ctype, seg["source_env"]) continue try: base = get_base_campaign(int(src)) except Exception as exc: # noqa: BLE001 LOG.warning("[segment] %s skipped — source %s unusable: %s", ctype, src, exc) continue campaign_specs.append((ctype, base, seg["limit"], seg["label"])) if preview: LOG.info("PREVIEW MODE — 1 sample carrier per campaign, drafts only, " "test sends to %s, no real schedule, no mark-sent.", TEST_EMAIL) for tz, tz_cfg in TIMEZONE_CONFIG.items(): states = tz_cfg["states"] send_hour = tz_cfg["send_hour_utc"] send_at = datetime( send_date.year, send_date.month, send_date.day, send_hour, 0, 0, tzinfo=timezone.utc ) for campaign_type, base, limit, label in campaign_specs: fetch_limit = 1 if preview else limit rows = fetch_carriers(conn, states, campaign_type, fetch_limit) if not rows: LOG.warning("[%s/%s] No usable records — skipping", tz, campaign_type) continue actual = len(rows) tag = "PREVIEW " if preview else "" list_name = f"{tag}Trucking {label} {tz} {send_date.isoformat()}" campaign_name = f"{tag}{label} - {tz} - {send_date.strftime('%b %d %Y')}" LOG.info("[%s/%s] %d carriers -> %s (send %s UTC)", tz, campaign_type, actual, campaign_name, send_at.strftime("%Y-%m-%d %H:%M")) if dry_run: LOG.info(" DRY RUN — skipping Listmonk + DB writes") continue # Build subscriber list. In preview, only the owner's address (with the # sample carrier's attribs) so the real audience is never touched. 
if preview: r0 = rows[0] subscribers = [{ "email": TEST_EMAIL, "name": r0[2] or "Sample Carrier", "attribs": {"dot_number": r0[0], "company": r0[2] or "", "state": r0[3] or "", "lp_link": build_lp_link(campaign_type, r0[3])}, }] else: subscribers = [ { "email": row[1], "name": row[2] or row[1], "attribs": {"dot_number": row[0], "company": row[2] or "", "state": row[3] or "", "lp_link": build_lp_link(campaign_type, row[3])}, } for row in rows ] # Create list + add subscribers list_id = create_list(list_name) added = import_subscribers(list_id, subscribers) LOG.info("[%s/%s] List %d: %d/%d subscribers added", tz, campaign_type, list_id, added, len(subscribers)) if added == 0: LOG.error("[%s/%s] BUG: 0 subscribers added to list %d — skipping", tz, campaign_type, list_id) continue # Create campaign (draft in preview, scheduled in real run) cid = create_and_schedule_campaign(base, list_id, campaign_name, send_at, schedule=not preview) LOG.info("[%s/%s] Campaign %d created (%s)", tz, campaign_type, cid, "draft/preview" if preview else f"scheduled {send_at.isoformat()}") # Send a test to the owner so they can spot-check the rendered email send_test(base, cid, rows[0], label, tz) # Mark carriers as sent only on a real run if not preview: mark_sent(conn, [row[0] for row in rows], campaign_type) conn.close() LOG.info("Done.") def main(): logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s", ) parser = argparse.ArgumentParser(description="Build daily trucking campaigns in Listmonk") parser.add_argument("--dry-run", action="store_true", help="Show what would be created, no writes") parser.add_argument("--preview", action="store_true", help="1 sample carrier/campaign, drafts only, test sends to owner, no real schedule/mark-sent") parser.add_argument("--list-segments", action="store_true", help="Report deduped audience size per deficiency segment, then exit (no writes)") parser.add_argument("--date", type=str, help="Target send date 
YYYY-MM-DD (default: tomorrow)") args = parser.parse_args() if args.date: send_date = date.fromisoformat(args.date) else: send_date = date.today() + timedelta(days=1) if args.list_segments: list_segments(send_date) return LOG.info("Building campaigns for send date %s (dry_run=%s, preview=%s)", send_date, args.dry_run, args.preview) run(send_date, dry_run=args.dry_run, preview=args.preview) if __name__ == "__main__": main()