From 0b7a35a58e1a69af02e60e953aaa6d7dcec4db05 Mon Sep 17 00:00:00 2001
From: justin
Date: Sun, 31 May 2026 10:07:44 -0500
Subject: [PATCH] trucking campaigns: daily builder + MX verifier concurrency + tracking column

- build_trucking_campaigns.py: nightly script that creates 8 Listmonk campaigns per day (4 TZ x 2 types: MCS-150 overdue 2k/TZ, inactive USDOT 1k/TZ) at 4AM ET / 5AM ET (CT) / 6AM ET (MT) / 7AM ET (PT). Deduplicates via listmonk_sent_at column.
- migration 083: add listmonk_sent_at + listmonk_campaign_type to fmcsa_carriers
- email_verifier.py: bump max_workers from 5 to 20 for 4x faster throughput
- cron: daily pw-trucking-campaigns at 08:00 UTC (3 AM EST)

Co-Authored-By: Claude Sonnet 4.6
---
 .../083_fmcsa_campaign_tracking.sql           |  10 +
 .../roles/worker-crons/defaults/main.yml      |   9 +
 scripts/build_trucking_campaigns.py           | 336 ++++++++++++++++++
 scripts/workers/email_verifier.py             |   2 +-
 4 files changed, 356 insertions(+), 1 deletion(-)
 create mode 100644 api/migrations/083_fmcsa_campaign_tracking.sql
 create mode 100644 scripts/build_trucking_campaigns.py

diff --git a/api/migrations/083_fmcsa_campaign_tracking.sql b/api/migrations/083_fmcsa_campaign_tracking.sql
new file mode 100644
index 0000000..2a3bc83
--- /dev/null
+++ b/api/migrations/083_fmcsa_campaign_tracking.sql
@@ -0,0 +1,10 @@
+-- Track which fmcsa_carriers records have been added to Listmonk campaign lists,
+-- so the daily campaign builder never double-sends to the same carrier.
+
+ALTER TABLE fmcsa_carriers
+  ADD COLUMN IF NOT EXISTS listmonk_sent_at TIMESTAMPTZ,
+  ADD COLUMN IF NOT EXISTS listmonk_campaign_type TEXT;
+
+CREATE INDEX IF NOT EXISTS idx_fmcsa_carriers_listmonk_sent
+  ON fmcsa_carriers (listmonk_sent_at)
+  WHERE listmonk_sent_at IS NULL;
diff --git a/infra/ansible/roles/worker-crons/defaults/main.yml b/infra/ansible/roles/worker-crons/defaults/main.yml
index 332c08c..7b5ba53 100644
--- a/infra/ansible/roles/worker-crons/defaults/main.yml
+++ b/infra/ansible/roles/worker-crons/defaults/main.yml
@@ -199,3 +199,12 @@ worker_crons:
     module: scripts.workers.quarterly_499q_notify
     on_calendar: "*-*-* 13:00:00 UTC"
     persistent: true
+
+  # Trucking campaign builder — 3 AM EST (08:00 UTC) daily.
+  # Creates 8 Listmonk campaigns for the next day's sends (4 TZ × MCS-150 + inactive USDOT).
+  # 2,000 carriers/TZ for MCS-150; 1,000 carriers/TZ for inactive USDOT.
+  - name: pw-trucking-campaigns
+    description: Build daily trucking Listmonk campaigns (MCS-150 overdue + inactive USDOT)
+    module: scripts.build_trucking_campaigns
+    on_calendar: "*-*-* 08:00:00 UTC"
+    persistent: true
diff --git a/scripts/build_trucking_campaigns.py b/scripts/build_trucking_campaigns.py
new file mode 100644
index 0000000..aa1be72
--- /dev/null
+++ b/scripts/build_trucking_campaigns.py
@@ -0,0 +1,336 @@
+#!/usr/bin/env python3
+"""Daily trucking campaign builder.
+
+Runs nightly (3 AM EST) to create the next day's 8 Listmonk campaigns:
+  - 4 TZ regions x MCS-150 overdue (2,000 carriers each, 4 AM / 5 AM / 6 AM / 7 AM EST)
+  - 4 TZ regions x Inactive USDOT (1,000 carriers each, same schedule)
+
+Selection criteria:
+  - email verified or catch-all (usable)
+  - mcs150_parsed > 2 years ago (for MCS-150 campaign)
+  - oos_active IS TRUE (for inactive USDOT campaign)
+  - listmonk_sent_at IS NULL (not yet sent)
+  - ordered by mcs150_parsed ASC (most overdue first)
+
+Environment:
+  - DATABASE_URL   Postgres connection string
+  - LISTMONK_URL   Listmonk base URL (default: http://listmonk:9000)
+  - LISTMONK_USER  Listmonk API user (default: api)
+  - LISTMONK_PASS  Listmonk API password (required; no default)
+
+Usage:
+    python3 scripts/build_trucking_campaigns.py
+    python3 scripts/build_trucking_campaigns.py --dry-run
+    python3 scripts/build_trucking_campaigns.py --date 2026-06-02
+"""
+from __future__ import annotations
+
+import argparse
+import base64
+import json
+import logging
+import os
+import sys
+import urllib.error
+import urllib.request
+from datetime import date, datetime, timedelta, timezone
+
+import psycopg2
+
+LOG = logging.getLogger("build_trucking_campaigns")
+
+# ── Listmonk ──────────────────────────────────────────────────────────────────
+LISTMONK_URL = os.getenv("LISTMONK_URL", "http://listmonk:9000")
+LISTMONK_USER = os.getenv("LISTMONK_USER", "api")
+# The password must come from the environment; never commit a default secret.
+LISTMONK_PASS = os.getenv("LISTMONK_PASS", "")
+_LM_AUTH = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode()
+
+# ── Source campaign IDs ────────────────────────────────────────────────────────
+CAMPAIGN_MCS150_ID = 186    # "MCS-150 Overdue — $1,000/Day Fine Risk"
+CAMPAIGN_INACTIVE_ID = 188  # "Inactive USDOT — Reactivate Before You Get Pulled Over"
+
+# ── TZ config: tz_key -> (states, send_hour_utc) ─────────────────────────────
+# 4AM EST = 09:00 UTC, each TZ +1hr so they get it at ~4AM local
+# NOTE: the hours are fixed in UTC, so while daylight saving time is in effect
+# local delivery lands one hour later (09:00 UTC = 5 AM EDT).
+TIMEZONE_CONFIG = {
+    "ET": {
+        "states": ("CT","DC","DE","FL","GA","IN","KY","MA","MD","ME","MI",
+                   "NH","NJ","NY","NC","OH","PA","RI","SC","TN","VA","VT","WV"),
+        "send_hour_utc": 9,   # 4 AM EST
+    },
+    "CT": {
+        "states": ("AL","AR","IA","IL","KS","LA","MN","MO","MS","ND",
+                   "NE","OK","SD","TX","WI"),
+        "send_hour_utc": 10,  # 5 AM EST = 4 AM CST
+    },
+    "MT": {
+        "states": ("AZ","CO","ID","MT","NM","UT","WY"),
+        "send_hour_utc": 11,  # 6 AM EST = 4 AM MST
+    },
+    "PT": {
+        "states": ("AK","CA","HI","NV","OR","WA"),
+        "send_hour_utc": 12,  # 7 AM EST = 4 AM PST
+    },
+}
+
+USABLE_FILTER = (
+    "(email_verified IS TRUE OR email_verify_result IN "
+    "('smtp_valid','catch_all_domain','catch_all_detected'))"
+)
+
+DB_URL = os.getenv("DATABASE_URL", "")
+
+
+def lm_api(path: str, data: dict | None = None, method: str | None = None):
+    """Call the Listmonk JSON API and return the decoded response.
+
+    Args:
+        path: API path below ``/api`` (for example ``/lists``).
+        data: Optional JSON-serialisable request body.
+        method: Optional HTTP method override.
+
+    Raises:
+        RuntimeError: LISTMONK_PASS is unset, or Listmonk answered with an
+            HTTP error (the message carries the status and a body excerpt).
+    """
+    if not LISTMONK_PASS:
+        raise RuntimeError("LISTMONK_PASS is not set; cannot authenticate to Listmonk")
+    headers = {"Content-Type": "application/json", "Authorization": f"Basic {_LM_AUTH}"}
+    req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers)
+    if data is not None:
+        req.data = json.dumps(data).encode()
+    if method:
+        req.method = method
+    try:
+        # Context manager closes the HTTP response even if decoding fails.
+        with urllib.request.urlopen(req, timeout=30) as resp:
+            return json.loads(resp.read())
+    except urllib.error.HTTPError as e:
+        body = e.read().decode(errors="replace")[:300]
+        raise RuntimeError(f"Listmonk {path} HTTP {e.code}: {body}") from e
+
+
+def get_base_campaign(campaign_id: int) -> dict:
+    """Fetch the template campaign whose content is cloned for each send."""
+    return lm_api(f"/campaigns/{campaign_id}")["data"]
+
+
+def create_list(name: str) -> int:
+    """Create a private, single opt-in Listmonk list and return its id."""
+    result = lm_api("/lists", {
+        "name": name,
+        "type": "private",
+        "optin": "single",
+        "tags": ["trucking", "auto"],
+    }, "POST")
+    return result["data"]["id"]
+
+
+def import_subscribers(list_id: int, subscribers: list[dict]) -> int:
+    """Create each subscriber in Listmonk, attached to ``list_id``.
+
+    ``POST /api/subscribers`` takes one subscriber per request and expects the
+    target list ids under the ``lists`` key, so records are sent one at a time.
+    A record Listmonk rejects is logged and skipped so that a single bad
+    address does not abort the batch; transport errors still propagate.
+
+    Returns:
+        The number of subscribers Listmonk accepted.
+    """
+    imported = 0
+    for sub in subscribers:
+        payload = {
+            **sub,
+            "lists": [list_id],
+            "status": "enabled",
+            "preconfirm_subscriptions": True,
+        }
+        try:
+            lm_api("/subscribers", payload, "POST")
+        except RuntimeError as exc:
+            LOG.warning("Subscriber %s not imported: %s", sub.get("email"), exc)
+            continue
+        imported += 1
+    return imported
+
+
+def create_and_schedule_campaign(
+    base: dict,
+    list_id: int,
+    name: str,
+    send_at_utc: datetime,
+) -> int:
+    """Clone ``base`` into a new campaign for ``list_id`` and schedule it.
+
+    Returns:
+        The id of the newly created campaign.
+    """
+    payload = {
+        "name": name,
+        "subject": base["subject"],
+        "lists": [list_id],
+        "from_email": base["from_email"],
+        "type": "regular",
+        "content_type": base["content_type"],
+        "body": base["body"],
+        "altbody": base.get("altbody"),
+        "template_id": base["template_id"],
+        "tags": base.get("tags") or [],
+        "messenger": base.get("messenger") or "email",
+        "send_at": send_at_utc.strftime("%Y-%m-%dT%H:%M:%S+00:00"),
+    }
+    result = lm_api("/campaigns", payload, "POST")
+    cid = result["data"]["id"]
+    # Schedule it
+    lm_api(f"/campaigns/{cid}/status", {"status": "scheduled"}, "PUT")
+    return cid
+
+
+def fetch_carriers(
+    conn,
+    tz_states: tuple,
+    campaign_type: str,
+    limit: int,
+) -> list[tuple]:
+    """Return (dot_number, email_address, legal_name, phy_state) rows.
+
+    Only unsent carriers with a usable email in ``tz_states`` are returned,
+    oldest ``mcs150_parsed`` first, capped at ``limit``.
+    """
+    states_placeholder = ",".join(["%s"] * len(tz_states))
+    if campaign_type == "mcs150":
+        type_filter = "mcs150_parsed < NOW() - INTERVAL '2 years'"
+    else:
+        type_filter = "oos_active IS TRUE"
+
+    # Only module constants and generated %s placeholders are interpolated
+    # into the SQL text; every value is passed as a bound parameter.
+    with conn.cursor() as cur:
+        cur.execute(f"""
+            SELECT dot_number, email_address, legal_name, phy_state
+            FROM fmcsa_carriers
+            WHERE {type_filter}
+              AND {USABLE_FILTER}
+              AND listmonk_sent_at IS NULL
+              AND phy_state IN ({states_placeholder})
+            ORDER BY mcs150_parsed ASC NULLS LAST
+            LIMIT %s
+        """, list(tz_states) + [limit])
+        return cur.fetchall()
+
+
+def mark_sent(conn, dot_numbers: list[str], campaign_type: str) -> None:
+    """Stamp the given carriers as sent so later runs skip them, then commit."""
+    with conn.cursor() as cur:
+        cur.execute("""
+            UPDATE fmcsa_carriers
+            SET listmonk_sent_at = NOW(),
+                listmonk_campaign_type = %s
+            WHERE dot_number = ANY(%s::text[])
+        """, (campaign_type, dot_numbers))
+    conn.commit()
+
+
+def _to_subscriber(row: tuple) -> dict:
+    """Map a fetch_carriers() row to a Listmonk subscriber payload."""
+    dot_number, email, legal_name, phy_state = row
+    return {
+        "email": email,
+        "name": legal_name or email,
+        "attribs": {
+            "dot_number": dot_number,
+            "company": legal_name or "",
+            "state": phy_state or "",
+        },
+    }
+
+
+def run(send_date: date, dry_run: bool = False) -> None:
+    """Build and schedule one campaign per timezone x campaign type.
+
+    For each combination: select unsent carriers, create a Listmonk list,
+    import the carriers, clone and schedule the base campaign, then stamp
+    the carriers as sent. With ``dry_run`` only the selection is logged.
+    """
+    conn = psycopg2.connect(DB_URL)
+    try:
+        base_mcs150 = get_base_campaign(CAMPAIGN_MCS150_ID)
+        base_inactive = get_base_campaign(CAMPAIGN_INACTIVE_ID)
+
+        campaign_specs = [
+            ("mcs150", base_mcs150, 2000, "MCS-150 Overdue"),
+            ("inactive", base_inactive, 1000, "Inactive USDOT"),
+        ]
+
+        for tz, tz_cfg in TIMEZONE_CONFIG.items():
+            states = tz_cfg["states"]
+            send_at = datetime(
+                send_date.year, send_date.month, send_date.day,
+                tz_cfg["send_hour_utc"], 0, 0, tzinfo=timezone.utc,
+            )
+
+            for campaign_type, base, limit, label in campaign_specs:
+                rows = fetch_carriers(conn, states, campaign_type, limit)
+                if not rows:
+                    LOG.warning("[%s/%s] No usable records — skipping", tz, campaign_type)
+                    continue
+
+                total = len(rows)
+                list_name = f"Trucking {label} {tz} {send_date.isoformat()}"
+                campaign_name = f"{label} - {tz} - {send_date.strftime('%b %d %Y')}"
+
+                LOG.info("[%s/%s] %d carriers -> %s (send %s UTC)",
+                         tz, campaign_type, total, campaign_name,
+                         send_at.strftime("%Y-%m-%d %H:%M"))
+
+                if dry_run:
+                    LOG.info("  DRY RUN — skipping Listmonk + DB writes")
+                    continue
+
+                list_id = create_list(list_name)
+                imported = import_subscribers(list_id, [_to_subscriber(r) for r in rows])
+                if imported < total:
+                    LOG.warning("[%s/%s] %d of %d subscribers were not imported",
+                                tz, campaign_type, total - imported, total)
+                if not imported:
+                    # An empty list would deliver nothing, yet marking the rows
+                    # would exclude these carriers from every later run.
+                    LOG.error("[%s/%s] No subscribers imported — campaign not created",
+                              tz, campaign_type)
+                    continue
+
+                cid = create_and_schedule_campaign(base, list_id, campaign_name, send_at)
+                LOG.info("[%s/%s] Campaign %d scheduled for %s UTC",
+                         tz, campaign_type, cid, send_at.isoformat())
+
+                # Stamp rows only once the campaign exists, so a failed run can be retried.
+                mark_sent(conn, [row[0] for row in rows], campaign_type)
+    finally:
+        # Release the connection even when Listmonk or Postgres raises mid-run.
+        conn.close()
+    LOG.info("Done.")
+
+
+def main():
+    """Parse CLI arguments and build campaigns for the requested send date."""
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
+    )
+    parser = argparse.ArgumentParser(description="Build daily trucking campaigns in Listmonk")
+    parser.add_argument("--dry-run", action="store_true", help="Show what would be created, no writes")
+    parser.add_argument("--date", type=date.fromisoformat, metavar="YYYY-MM-DD",
+                        help="Target send date YYYY-MM-DD (default: tomorrow)")
+    args = parser.parse_args()
+
+    # Send hours are expressed in UTC, so "tomorrow" means the next UTC day.
+    send_date = args.date or (datetime.now(timezone.utc).date() + timedelta(days=1))
+
+    LOG.info("Building campaigns for send date %s (dry_run=%s)", send_date, args.dry_run)
+    run(send_date, dry_run=args.dry_run)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/workers/email_verifier.py b/scripts/workers/email_verifier.py
index 3ff9392..f83933d 100644
--- a/scripts/workers/email_verifier.py
+++ b/scripts/workers/email_verifier.py
@@ -205,7 +205,7 @@ def verify_table(table: str, limit: int | None = None, dry_run: bool = False, wh
         is_valid, reason = verify_email(email)
         return dot, email, is_valid, reason
 
-    with ThreadPoolExecutor(max_workers=5) as executor:
+    with ThreadPoolExecutor(max_workers=20) as executor:
         futures = {executor.submit(check_one, row): row for row in rows}
         done = 0
         for future in as_completed(futures):