new-site/scripts/build_trucking_campaigns.py
justin fd3ceb3efc build_trucking_campaigns: add --max-per-segment / --only-segment / --send-hour for fresh-IP warmup sends
Lets us fire small, controlled batches (e.g. MCS-150 only, 100/tz, sent today)
while the new sending IPs warm up, instead of the full multi-segment schedule.
2026-06-02 12:30:08 -05:00

527 lines
22 KiB
Python

#!/usr/bin/env python3
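"""Daily trucking campaign builder.

Runs nightly (3 AM EST) to create the next day's 8 Listmonk campaigns:
  - 4 TZ regions x MCS-150 overdue (2,000 carriers each, 4 AM / 5 AM / 6 AM / 7 AM EST)
  - 4 TZ regions x Inactive USDOT (1,000 carriers each, same schedule)
Each deficiency-flag segment whose source-template env var is set adds one
more campaign per TZ region.

Selection criteria:
  - email verified or catch-all (usable)
  - email domain not in BLOCKED_EMAIL_DOMAINS
  - mcs150_parsed > 2 years ago (for MCS-150 campaign)
  - oos_active IS TRUE (for inactive USDOT campaign)
  - listmonk_sent_at IS NULL (not yet sent)
  - ordered by mcs150_parsed ASC (most overdue first)

Usage:
    python3 scripts/build_trucking_campaigns.py
    python3 scripts/build_trucking_campaigns.py --dry-run
    python3 scripts/build_trucking_campaigns.py --date 2026-06-02
    python3 scripts/build_trucking_campaigns.py --preview
    python3 scripts/build_trucking_campaigns.py --list-segments
    python3 scripts/build_trucking_campaigns.py --only-segment mcs150 --max-per-segment 100 --send-hour 18
"""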
from __future__ import annotations
import argparse
import base64
import json
import logging
import os
import sys
import urllib.request
import urllib.parse
from datetime import date, datetime, timedelta, timezone
import psycopg2
from scripts._email_exclusions import BLOCKED_EMAIL_DOMAINS
# Module-level logger; main() configures the root handler and format.
LOG = logging.getLogger("build_trucking_campaigns")

# ── Listmonk ──────────────────────────────────────────────────────────────────
# Connection settings for the Listmonk HTTP API. Each value is read from the
# environment variable of the same name, falling back to the default shown.
LISTMONK_URL = os.getenv("LISTMONK_URL", "http://listmonk:9000")
LISTMONK_USER = os.getenv("LISTMONK_USER", "api")
# NOTE(review): the fallback below is a real-looking API credential committed to
# source control. Rotate it and require LISTMONK_PASS from the environment
# (no default) once every deployment sets the variable.
LISTMONK_PASS = os.getenv("LISTMONK_PASS", "6X1rKPea61N4rZ1S65Hx5zvqzbCj30F6nvEe9oVGH_Y")
# Base64 of "user:password", used by lm_api() for the HTTP Basic auth header.
_LM_AUTH = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode()

# ── Source campaign IDs ────────────────────────────────────────────────────────
# Listmonk campaigns that run() fetches and clones for the two core segments.
CAMPAIGN_MCS150_ID = 186  # "MCS-150 Overdue — $1,000/Day Fine Risk"
CAMPAIGN_INACTIVE_ID = 188  # "Inactive USDOT — Reactivate Before You Get Pulled Over"

# Public site for landing-page links injected into Listmonk subscriber attribs.
SITE_DOMAIN = os.getenv("SITE_DOMAIN", "https://performancewest.com")
# ── Deficiency-flag segments (Phase 5) ──────────────────────────────────────
# Each segment targets carriers by a `deficiency_flags` value (the TEXT[] column
# written by fmcsa_deficiency_flagger.py) and links to its order landing page.
#
# Per-segment keys:
#   label      - human-readable name used in the Listmonk list/campaign names.
#   flag_sql   - SQL predicate over the deficiency_flags array. It is spliced
#                into the WHERE clause built by fetch_carriers(); that query is
#                executed with bound parameters, so a literal `%` is written
#                as `%%`.
#   lp_slug    - default order landing-page slug used by build_lp_link().
#   source_env - name of the env var holding the ID of the Listmonk template
#                campaign to clone. Segments whose env var is unset are
#                reported by --list-segments but skipped by the scheduled run
#                so the nightly job never breaks on an unconfigured template.
#   limit      - maximum number of carriers fetched per timezone region.
DEFICIENCY_SEGMENTS = {
    "for_hire_boc3": {
        "label": "For-Hire BOC-3 + UCR",
        "flag_sql": "'for_hire_carrier' = ANY(deficiency_flags)",
        "lp_slug": "boc3-filing",
        "source_env": "CAMPAIGN_FOR_HIRE_ID",
        "limit": 1500,
    },
    "irp_ifta": {
        "label": "IRP / IFTA Registration",
        "flag_sql": "'interstate_needs_irp_ifta' = ANY(deficiency_flags)",
        "lp_slug": "state-trucking-bundle",
        "source_env": "CAMPAIGN_IRP_IFTA_ID",
        "limit": 1500,
    },
    "intrastate_authority": {
        "label": "Intrastate Operating Authority",
        # any intrastate_authority_<state> flag
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'intrastate_authority_%%')",
        "lp_slug": "intrastate-authority",
        "source_env": "CAMPAIGN_INTRASTATE_ID",
        "limit": 1000,
    },
    "state_weight_tax": {
        "label": "State Weight-Distance Tax",
        # any state_weight_tax_<state> flag
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'state_weight_tax_%%')",
        # per-state LP resolved per-row in build_lp_link()
        "lp_slug": "state-trucking-bundle",
        "source_env": "CAMPAIGN_WEIGHT_TAX_ID",
        "limit": 1000,
    },
    "state_emissions": {
        "label": "State Clean-Truck / Emissions",
        # any state_emissions_<state> flag
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'state_emissions_%%')",
        "lp_slug": "state-emissions",
        "source_env": "CAMPAIGN_EMISSIONS_ID",
        "limit": 1000,
    },
    "hazmat": {
        "label": "Hazmat PHMSA Registration",
        "flag_sql": "'hazmat_carrier' = ANY(deficiency_flags)",
        "lp_slug": "hazmat-phmsa",
        "source_env": "CAMPAIGN_HAZMAT_ID",
        "limit": 500,
    },
}
# Per-state landing-page slugs for the "state_weight_tax" segment, keyed by the
# carrier's two-letter phy_state. States not listed here use the segment's
# default lp_slug. (The CA emissions override to "ca-mcp-carb" is not stored
# here; build_lp_link() handles it directly.)
_WEIGHT_TAX_LP = {
    "OR": "or-weight-mile-tax", "NY": "ny-hut-registration",
    "KY": "ky-kyu-registration", "NM": "nm-weight-distance",
    "CT": "ct-highway-use-fee",
}
def build_lp_link(campaign_type: str, phy_state: str | None) -> str:
    """Return the order landing-page URL for a (segment, state) pair.

    The slug defaults to the segment's ``lp_slug`` (or "dot-full-compliance"
    for campaign types that are not deficiency segments). Two segments have
    state-specific overrides: weight-tax states listed in ``_WEIGHT_TAX_LP``
    and California for the emissions segment.
    """
    segment = DEFICIENCY_SEGMENTS.get(campaign_type)
    if segment:
        page = segment["lp_slug"]
    else:
        page = "dot-full-compliance"

    if campaign_type == "state_weight_tax":
        # Fall back to the segment default when the state has no override.
        page = _WEIGHT_TAX_LP.get(phy_state, page)
    elif campaign_type == "state_emissions" and phy_state == "CA":
        page = "ca-mcp-carb"

    return f"{SITE_DOMAIN}/order/{page}"
# ── TZ config: tz_key -> (states, send_hour_utc) ─────────────────────────────
# 4AM EST = 09:00 UTC, each TZ +1hr so they get it at ~4AM local
# The hours are fixed UTC values, so while daylight-saving time is in effect
# the same campaigns land one hour later on the recipients' local clocks.
# Each state is assigned to exactly one region; together the four tuples cover
# the 50 states plus DC.
TIMEZONE_CONFIG = {
    "ET": {
        "states": ("CT","DC","DE","FL","GA","IN","KY","MA","MD","ME","MI",
                   "NH","NJ","NY","NC","OH","PA","RI","SC","TN","VA","VT","WV"),
        "send_hour_utc": 9,  # 4 AM EST
    },
    "CT": {
        "states": ("AL","AR","IA","IL","KS","LA","MN","MO","MS","ND",
                   "NE","OK","SD","TX","WI"),
        "send_hour_utc": 10,  # 5 AM EST = 4 AM CST
    },
    "MT": {
        "states": ("AZ","CO","ID","MT","NM","UT","WY"),
        "send_hour_utc": 11,  # 6 AM EST = 4 AM MST
    },
    "PT": {
        "states": ("AK","CA","HI","NV","OR","WA"),
        "send_hour_utc": 12,  # 7 AM EST = 4 AM PST
    },
}

# Owner email — test sends go here before each campaign is scheduled
TEST_EMAIL = os.getenv("CAMPAIGN_TEST_EMAIL", "carrierone@gmx.com")

# SQL predicate for "usable" email addresses (verified, or one of the accepted
# verification results). It is interpolated into the WHERE clause built by
# fetch_carriers().
USABLE_FILTER = (
    "(email_verified IS TRUE OR email_verify_result IN "
    "('smtp_valid','catch_all_domain','catch_all_detected'))"
)

# Connection string handed to psycopg2.connect(); an empty string when
# DATABASE_URL is not set.
DB_URL = os.getenv("DATABASE_URL", "")
def lm_api(path: str, data: dict | None = None, method: str | None = None):
    """Call the Listmonk HTTP API and return the decoded JSON response.

    Args:
        path: API path appended to ``{LISTMONK_URL}/api`` (query string included).
        data: Optional JSON-serialisable request body.
        method: Optional HTTP method override; when omitted urllib picks GET
            for body-less requests and POST when ``data`` is given.

    Returns:
        The parsed JSON document returned by Listmonk.

    Raises:
        RuntimeError: Listmonk answered with an HTTP error status. The message
            carries the status code and the first 300 characters of the body.
        Other urllib / JSON errors (connection failures, timeouts, invalid
        JSON) propagate unchanged.
    """
    headers = {"Content-Type": "application/json", "Authorization": f"Basic {_LM_AUTH}"}
    req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers)
    if data is not None:
        req.data = json.dumps(data).encode()
    if method:
        req.method = method
    try:
        # Context manager guarantees the HTTP response (and its socket) is
        # closed even when reading or JSON decoding fails.
        with urllib.request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        # errors="replace": a non-UTF-8 error page must not mask the real
        # HTTP failure with a UnicodeDecodeError.
        body = e.read().decode(errors="replace")[:300]
        raise RuntimeError(f"Listmonk {path} HTTP {e.code}: {body}") from e
def get_base_campaign(campaign_id: int) -> dict:
    """Fetch a Listmonk campaign by ID and return the ``data`` object of the response."""
    response = lm_api(f"/campaigns/{campaign_id}")
    return response["data"]
def create_list(name: str) -> int:
    """Create a private, single-opt-in Listmonk list and return its ID.

    The list is tagged "trucking" and "auto".
    """
    list_spec = {
        "name": name,
        "type": "private",
        "optin": "single",
        "tags": ["trucking", "auto"],
    }
    created = lm_api("/lists", list_spec, "POST")
    return created["data"]["id"]
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
    """Create/attach a single subscriber to a list. Returns True on success.

    Listmonk's POST /api/subscribers takes one subscriber with `lists` (array of
    list IDs) and preconfirm_subscriptions=true so they're immediately sendable.

    When the subscriber already exists, the existing record is looked up by
    email and attached to ``list_id`` instead. Its stored attribs are left
    untouched in that case.

    Args:
        list_id: Target Listmonk list ID.
        email: Subscriber email address.
        name: Display name; falls back to the email when empty.
        attribs: Subscriber attributes stored on a newly created subscriber.

    Returns:
        True when the subscriber was created or attached, False otherwise.
        Failures are logged and never raised (best-effort import).
    """
    try:
        lm_api("/subscribers", {
            "email": email,
            "name": name or email,
            "status": "enabled",
            "lists": [list_id],
            "attribs": attribs,
            "preconfirm_subscriptions": True,
        }, "POST")
        return True
    except Exception as exc:
        msg = str(exc)
        lowered = msg.lower()
        already_exists = (
            "already exists" in lowered or "conflict" in lowered or "HTTP 409" in msg
        )
        if not already_exists:
            LOG.warning("Subscriber %s not added to list %s: %s", email, list_id, msg)
            return False

    # Already exists — attach to this list instead of failing.
    # The email is embedded in an SQL expression evaluated by Listmonk, so
    # single quotes are doubled (apostrophes are legal in addresses). Both
    # sides are lowercased so a case difference between our address and the
    # stored one cannot hide the existing subscriber.
    safe_email = email.strip().replace("'", "''")
    lookup = f"LOWER(subscribers.email)=LOWER('{safe_email}')"
    try:
        subs = lm_api("/subscribers?query=" + urllib.parse.quote(lookup))
        results = subs.get("data", {}).get("results", [])
        if not results:
            LOG.warning("Subscriber %s reported as existing but lookup found none", email)
            return False
        lm_api("/subscribers/lists", {
            "ids": [results[0]["id"]],
            "action": "add",
            "target_list_ids": [list_id],
            "status": "confirmed",
        }, "PUT")
        return True
    except Exception as exc:
        LOG.warning("Could not attach existing subscriber %s to list %s: %s",
                    email, list_id, exc)
        return False
def import_subscribers(list_id: int, subscribers: list[dict]) -> int:
    """Attach each subscriber to ``list_id`` in turn and return how many succeeded.

    Every entry needs an "email" key; "name" and "attribs" are optional and
    default to an empty string and an empty dict.
    """
    return sum(
        1
        for entry in subscribers
        if add_subscriber(
            list_id,
            entry["email"],
            entry.get("name", ""),
            entry.get("attribs", {}),
        )
    )
def create_and_schedule_campaign(
    base: dict,
    list_id: int,
    name: str,
    send_at_utc: datetime,
    schedule: bool = True,
) -> int:
    """Clone ``base`` into a new Listmonk campaign targeting ``list_id``.

    Subject, sender, content, template, tags and messenger are copied from the
    source campaign. With ``schedule`` true the campaign gets ``send_at_utc``
    as its send time (formatted with a fixed +00:00 offset) and is switched to
    the "scheduled" status; otherwise it stays a draft.

    Returns:
        The ID of the newly created campaign.
    """
    campaign = {
        "name": name,
        "subject": base["subject"],
        "lists": [list_id],
        "from_email": base["from_email"],
        "type": "regular",
        "content_type": base["content_type"],
        "body": base["body"],
        "altbody": base.get("altbody"),
        "template_id": base["template_id"],
        "tags": base.get("tags") or [],
        "messenger": base.get("messenger") or "email",
    }

    if not schedule:
        # Preview mode: create the campaign and leave it as a draft.
        return lm_api("/campaigns", campaign, "POST")["data"]["id"]

    campaign["send_at"] = send_at_utc.strftime("%Y-%m-%dT%H:%M:%S+00:00")
    new_id = lm_api("/campaigns", campaign, "POST")["data"]["id"]
    lm_api(f"/campaigns/{new_id}/status", {"status": "scheduled"}, "PUT")
    return new_id
def send_test(base: dict, campaign_id: int, sample_row: tuple, label: str, tz: str) -> None:
    """Send one test email so the owner can approve before the real blast goes out.

    The company, DOT-number and state placeholders in the body (and the
    DOT-number placeholder in the subject) are filled from ``sample_row``, a
    (dot_number, email, legal_name, phy_state) tuple, with sample values used
    for empty fields. A failed test send is logged as a warning, not raised.
    """
    dot, _email, name, state = sample_row

    substitutions = (
        ("{{ .Subscriber.Attribs.company }}", name or "Sample Carrier LLC"),
        ("{{ .Subscriber.Attribs.dot_number }}", dot or "0000000"),
        ("{{ .Subscriber.Attribs.state }}", state or "TX"),
    )
    rendered = base["body"]
    for placeholder, value in substitutions:
        rendered = rendered.replace(placeholder, value)
    # NOTE: leave {{ UnsubscribeURL }} alone — Listmonk renders it into a real,
    # working per-subscriber unsubscribe link (even on test sends). Overwriting it
    # produced a dead /unsubscribe link with no subscriber identity.

    subject = base["subject"].replace("{{ .Subscriber.Attribs.dot_number }}", dot or "0000000")

    # Listmonk /test needs `lists` as an array of IDs (base["lists"] is objects)
    source_lists = base.get("lists", [])
    list_ids = [entry["id"] for entry in source_lists if isinstance(entry, dict)]
    if not list_ids:
        list_ids = [1]

    test_request = {
        "name": base.get("name", "Test"),
        "subject": subject,
        "lists": list_ids,
        "from_email": base["from_email"],
        "type": "regular",
        "content_type": base["content_type"],
        "body": rendered,
        "altbody": base.get("altbody"),
        "template_id": base["template_id"],
        "tags": base.get("tags") or [],
        "messenger": base.get("messenger") or "email",
        "subscribers": [TEST_EMAIL],
    }
    try:
        lm_api(f"/campaigns/{campaign_id}/test", test_request, "POST")
    except Exception as exc:
        LOG.warning("[%s/%s] Test send failed: %s", tz, label, exc)
    else:
        LOG.info("[%s/%s] Test sent to %s (sample: %s DOT#%s)", tz, label, TEST_EMAIL, name, dot)
def fetch_carriers(
    conn,
    tz_states: tuple,
    campaign_type: str,
    limit: int,
) -> list[tuple]:
    """Return (dot_number, email_address, legal_name, phy_state) rows.

    Selects not-yet-sent carriers in ``tz_states`` with a usable email whose
    domain is not blocked, filtered by the segment predicate for
    ``campaign_type`` and ordered by ``mcs150_parsed`` ascending (NULLs last).

    Args:
        conn: Open psycopg2 connection.
        tz_states: Two-letter state codes of the timezone region.
        campaign_type: "mcs150", "inactive", or a DEFICIENCY_SEGMENTS key.
        limit: Maximum number of rows to return.

    Returns:
        Up to ``limit`` rows; an empty list when ``tz_states`` is empty or
        ``limit`` is not positive.

    Raises:
        ValueError: ``campaign_type`` is not a known segment.
    """
    if campaign_type == "mcs150":
        type_filter = "mcs150_parsed < NOW() - INTERVAL '2 years'"
    elif campaign_type == "inactive":
        type_filter = "oos_active IS TRUE"
    elif campaign_type in DEFICIENCY_SEGMENTS:
        # Flag-based segment: dedupe against already-sent and require the flag.
        type_filter = DEFICIENCY_SEGMENTS[campaign_type]["flag_sql"]
    else:
        raise ValueError(f"unknown campaign_type {campaign_type!r}")

    # An empty state list would render "IN ()" (a syntax error) and Postgres
    # rejects a negative LIMIT; both cases can only yield an empty audience.
    if not tz_states or (limit is not None and limit <= 0):
        return []

    states_placeholder = ",".join(["%s"] * len(tz_states))
    query = f"""
        SELECT dot_number, email_address, legal_name, phy_state
        FROM fmcsa_carriers
        WHERE {type_filter}
        AND {USABLE_FILTER}
        AND listmonk_sent_at IS NULL
        AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
        AND phy_state IN ({states_placeholder})
        ORDER BY mcs150_parsed ASC NULLS LAST
        LIMIT %s
    """
    params = [list(BLOCKED_EMAIL_DOMAINS), *tz_states, limit]
    # The cursor context manager closes the cursor on exit (it does not commit
    # or roll back the transaction).
    with conn.cursor() as cur:
        cur.execute(query, params)
        return cur.fetchall()
def mark_sent(conn, dot_numbers: list[str], campaign_type: str) -> None:
    """Stamp the given carriers as sent and commit.

    Sets ``listmonk_sent_at`` to the current time and records ``campaign_type``
    in ``listmonk_campaign_type`` for every carrier whose ``dot_number`` is in
    ``dot_numbers``.
    """
    update_sql = """
        UPDATE fmcsa_carriers
        SET listmonk_sent_at = NOW(),
        listmonk_campaign_type = %s
        WHERE dot_number = ANY(%s::text[])
    """
    cursor = conn.cursor()
    cursor.execute(update_sql, (campaign_type, dot_numbers))
    conn.commit()
def list_segments(send_date: date) -> None:
    """Report deduped audience size per deficiency segment (no writes).

    Verifiable Phase-5 metric: each segment selects a nonzero, deduped audience
    and points at a valid order LP. Source-template configuration is shown so we
    know which segments are ready to schedule.

    The audience of a segment is the sum over all timezone regions of the rows
    fetch_carriers() returns, each capped at the segment's ``limit``.

    Args:
        send_date: Send date shown in the report header (display only).
    """
    conn = psycopg2.connect(DB_URL)
    try:
        print(f"\nDeficiency-flag segments (send date {send_date.isoformat()}):\n")
        print(f"{'segment':<22}{'audience':>10} {'source':<10} landing page")
        print("-" * 78)
        grand_total = 0
        for ctype, seg in DEFICIENCY_SEGMENTS.items():
            total = sum(
                len(fetch_carriers(conn, tz_cfg["states"], ctype, seg["limit"]))
                for tz_cfg in TIMEZONE_CONFIG.values()
            )
            grand_total += total
            src = os.getenv(seg["source_env"])
            src_str = src if src else "UNSET"
            # No state given: report the segment's default landing page.
            lp = build_lp_link(ctype, None)
            print(f"{ctype:<22}{total:>10} {src_str:<10} {lp}")
        print("-" * 78)
        print(f"{'TOTAL':<22}{grand_total:>10}")
        print("\n(UNSET source = template not yet created; segment is skipped by the "
              "scheduled run until its CAMPAIGN_*_ID env is set.)\n")
    finally:
        # Release the connection even when a query or print fails.
        conn.close()
def _carrier_attribs(campaign_type: str, row: tuple) -> dict:
    """Build the Listmonk subscriber attribs for one fetch_carriers() row."""
    dot_number, _email, legal_name, phy_state = row
    return {
        "dot_number": dot_number,
        "company": legal_name or "",
        "state": phy_state or "",
        "lp_link": build_lp_link(campaign_type, phy_state),
    }


def run(send_date: date, dry_run: bool = False, preview: bool = False,
        max_per_segment: int | None = None, only_segments: set[str] | None = None,
        send_hour_override: int | None = None, send_minute: int = 0) -> None:
    """Build one Listmonk list + campaign per (timezone region, segment).

    Args:
        send_date: Calendar date (UTC) on which the campaigns are scheduled.
        dry_run: Log what would be created; skip all Listmonk and DB writes.
        preview: Use one sample carrier per campaign, subscribe only the owner
            address, leave campaigns as drafts and do not mark carriers sent.
        max_per_segment: Optional cap applied to every segment's per-region limit.
        only_segments: Optional set of segment keys to restrict the run to.
        send_hour_override: UTC hour used for every region instead of the
            per-region ``send_hour_utc``.
        send_minute: UTC minute of the scheduled send time.
    """
    conn = psycopg2.connect(DB_URL)
    # try/finally: the connection must be released on every exit path,
    # including Listmonk or database errors raised mid-run.
    try:
        base_mcs150 = get_base_campaign(CAMPAIGN_MCS150_ID)
        base_inactive = get_base_campaign(CAMPAIGN_INACTIVE_ID)
        campaign_specs = [
            ("mcs150", base_mcs150, 2000, "MCS-150 Overdue"),
            ("inactive", base_inactive, 1000, "Inactive USDOT"),
        ]

        # Add deficiency-flag segments whose Listmonk source template is configured.
        for ctype, seg in DEFICIENCY_SEGMENTS.items():
            src = os.getenv(seg["source_env"])
            if not src:
                LOG.info("[segment] %s skipped — %s not set (template not created)",
                         ctype, seg["source_env"])
                continue
            try:
                base = get_base_campaign(int(src))
            except Exception as exc:  # noqa: BLE001
                LOG.warning("[segment] %s skipped — source %s unusable: %s",
                            ctype, src, exc)
                continue
            campaign_specs.append((ctype, base, seg["limit"], seg["label"]))

        # Optional warmup controls: restrict to specific segments and/or cap the
        # per-segment audience so a fresh-IP day-0 send stays small.
        if only_segments:
            campaign_specs = [s for s in campaign_specs if s[0] in only_segments]
            if not campaign_specs:
                LOG.error("No campaign specs match --only-segment %s", sorted(only_segments))
                return
        if max_per_segment is not None:
            campaign_specs = [
                (ct, base, min(lim, max_per_segment), label)
                for (ct, base, lim, label) in campaign_specs
            ]

        if preview:
            LOG.info("PREVIEW MODE — 1 sample carrier per campaign, drafts only, "
                     "test sends to %s, no real schedule, no mark-sent.", TEST_EMAIL)

        for tz, tz_cfg in TIMEZONE_CONFIG.items():
            states = tz_cfg["states"]
            send_hour = send_hour_override if send_hour_override is not None else tz_cfg["send_hour_utc"]
            send_at = datetime(
                send_date.year, send_date.month, send_date.day,
                send_hour, send_minute, 0, tzinfo=timezone.utc
            )
            for campaign_type, base, limit, label in campaign_specs:
                fetch_limit = 1 if preview else limit
                rows = fetch_carriers(conn, states, campaign_type, fetch_limit)
                if not rows:
                    LOG.warning("[%s/%s] No usable records — skipping", tz, campaign_type)
                    continue
                actual = len(rows)
                tag = "PREVIEW " if preview else ""
                list_name = f"{tag}Trucking {label} {tz} {send_date.isoformat()}"
                campaign_name = f"{tag}{label} - {tz} - {send_date.strftime('%b %d %Y')}"
                LOG.info("[%s/%s] %d carriers -> %s (send %s UTC)",
                         tz, campaign_type, actual, campaign_name,
                         send_at.strftime("%Y-%m-%d %H:%M"))
                if dry_run:
                    LOG.info(" DRY RUN — skipping Listmonk + DB writes")
                    continue

                # Build subscriber list. In preview, only the owner's address (with the
                # sample carrier's attribs) so the real audience is never touched.
                if preview:
                    r0 = rows[0]
                    subscribers = [{
                        "email": TEST_EMAIL,
                        "name": r0[2] or "Sample Carrier",
                        "attribs": _carrier_attribs(campaign_type, r0),
                    }]
                else:
                    subscribers = [
                        {
                            "email": row[1],
                            "name": row[2] or row[1],
                            "attribs": _carrier_attribs(campaign_type, row),
                        }
                        for row in rows
                    ]

                # Create list + add subscribers
                list_id = create_list(list_name)
                added = import_subscribers(list_id, subscribers)
                LOG.info("[%s/%s] List %d: %d/%d subscribers added", tz, campaign_type, list_id, added, len(subscribers))
                if added == 0:
                    LOG.error("[%s/%s] BUG: 0 subscribers added to list %d — skipping", tz, campaign_type, list_id)
                    continue

                # Create campaign (draft in preview, scheduled in real run)
                cid = create_and_schedule_campaign(base, list_id, campaign_name, send_at, schedule=not preview)
                LOG.info("[%s/%s] Campaign %d created (%s)", tz, campaign_type, cid,
                         "draft/preview" if preview else f"scheduled {send_at.isoformat()}")

                # Send a test to the owner so they can spot-check the rendered email
                send_test(base, cid, rows[0], label, tz)

                # Mark carriers as sent only on a real run.
                # NOTE(review): every fetched row is marked, including carriers
                # whose subscriber import failed (added < len(rows)); confirm
                # this is intended so failing addresses are not retried nightly.
                if not preview:
                    mark_sent(conn, [row[0] for row in rows], campaign_type)
    finally:
        conn.close()
    LOG.info("Done.")
def main():
    """Parse the command line and run the campaign builder.

    Without --date the send date defaults to tomorrow (local date).
    --list-segments prints the segment report and exits without running the
    build. Out-of-range or malformed option values are rejected with a usage
    error before any database or Listmonk connection is made.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )
    parser = argparse.ArgumentParser(description="Build daily trucking campaigns in Listmonk")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be created, no writes")
    parser.add_argument("--preview", action="store_true", help="1 sample carrier/campaign, drafts only, test sends to owner, no real schedule/mark-sent")
    parser.add_argument("--list-segments", action="store_true", help="Report deduped audience size per deficiency segment, then exit (no writes)")
    parser.add_argument("--date", type=str, help="Target send date YYYY-MM-DD (default: tomorrow)")
    parser.add_argument("--max-per-segment", type=int, default=None,
                        help="Cap audience per (segment x timezone). Use for fresh-IP warmup small sends.")
    parser.add_argument("--only-segment", action="append", default=None, metavar="SEG",
                        help="Restrict to specific segment(s), e.g. mcs150, inactive, for_hire_boc3. Repeatable.")
    parser.add_argument("--send-hour", type=int, default=None,
                        help="Override send hour (UTC) for ALL timezones instead of per-tz defaults.")
    # The minute is applied to every campaign whether or not --send-hour is given.
    parser.add_argument("--send-minute", type=int, default=0,
                        help="Send minute (UTC) for every campaign; typically paired with --send-hour. Default 0.")
    args = parser.parse_args()

    # Validate up front: a bad value would otherwise surface as a datetime or
    # SQL error only after the database connection has been opened.
    if args.send_hour is not None and not 0 <= args.send_hour <= 23:
        parser.error("--send-hour must be between 0 and 23")
    if not 0 <= args.send_minute <= 59:
        parser.error("--send-minute must be between 0 and 59")
    if args.max_per_segment is not None and args.max_per_segment < 1:
        parser.error("--max-per-segment must be a positive integer")

    if args.date:
        try:
            send_date = date.fromisoformat(args.date)
        except ValueError:
            # parser.error() exits, so send_date is always bound below.
            parser.error(f"--date must be YYYY-MM-DD, got {args.date!r}")
    else:
        send_date = date.today() + timedelta(days=1)

    if args.list_segments:
        list_segments(send_date)
        return

    only = set(args.only_segment) if args.only_segment else None
    LOG.info("Building campaigns for send date %s (dry_run=%s, preview=%s, "
             "max_per_segment=%s, only=%s, send_hour=%s)",
             send_date, args.dry_run, args.preview, args.max_per_segment,
             sorted(only) if only else None, args.send_hour)
    run(send_date, dry_run=args.dry_run, preview=args.preview,
        max_per_segment=args.max_per_segment, only_segments=only,
        send_hour_override=args.send_hour, send_minute=args.send_minute)
if __name__ == "__main__":
main()