new-site/scripts/build_trucking_campaigns.py
justin 5e9aec40d1 trucking: same-day expiring coupon to drive immediate conversion
The sales we got came at $79 + a 24hr coupon; cutting MCS-150 to $39 flat
removed urgency and conversions did NOT improve (a permanent low price sets a
new anchor and lets people defer). Restore the higher anchor and let an
expiring discount create the now-or-lose-it decision.

- Restore MCS-150 anchor $39 -> $79 (catalog single source + regenerated).
- build_trucking_campaigns.py: mint ONE random 5-letter coupon per send-day
  (40% off, valid through 23:59:59 ET that day) into the existing discount_codes
  table; inject coupon_code/pct/expires + a ?code= LP link into every email.
  Idempotent per day; service-fee-only scope (gov/pass-through fees never cut).
- Listmonk MCS-150 (186) + Inactive USDOT (188) templates: lead with the
  struck-through anchor + sale price + code + 'expires tonight', and point the
  primary CTA at the order page (with code) instead of the 'free check' tool.
- OrderPriceBanner: validates ?code= via /api/v1/discount and shows
  was/now + expiry; Wizard forwards the code to order creation.
- Verified: code gen, expiry math, scope enforcement, discount API
  (40% off $79 = $47.40), site+api builds clean.
2026-06-13 20:43:47 -05:00

1003 lines
42 KiB
Python

#!/usr/bin/env python3
"""Daily trucking campaign builder.
Runs nightly (3 AM EST) to create the next day's 8 Listmonk campaigns:
- 4 TZ regions x MCS-150 overdue (2,000 carriers each, 4 AM / 5 AM / 6 AM / 7 AM EST)
- 4 TZ regions x Inactive USDOT (1,000 carriers each, same schedule)
Selection criteria:
- email verified or catch-all (usable)
- mcs150_parsed > 2 years ago (for MCS-150 campaign)
- oos_active IS TRUE (for inactive USDOT campaign)
- listmonk_sent_at IS NULL (not yet sent)
- ordered by mcs150_parsed ASC (most overdue first)
Usage:
python3 scripts/build_trucking_campaigns.py
python3 scripts/build_trucking_campaigns.py --dry-run
python3 scripts/build_trucking_campaigns.py --date 2026-06-02
"""
from __future__ import annotations
import argparse
import base64
import json
import logging
import os
import sys
import urllib.request
import urllib.parse
import math
import time
from datetime import date, datetime, timedelta, timezone
import psycopg2
# Allow both supported invocation styles:
# python -m scripts.build_trucking_campaigns
# python3 scripts/build_trucking_campaigns.py
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT not in sys.path:
sys.path.insert(0, ROOT)
from scripts._email_exclusions import BLOCKED_EMAIL_DOMAINS
LOG = logging.getLogger("build_trucking_campaigns")
# ── Listmonk ──────────────────────────────────────────────────────────────────
# Connection settings come from the environment; each has an in-source fallback.
LISTMONK_URL = os.getenv("LISTMONK_URL", "http://listmonk:9000")
LISTMONK_USER = os.getenv("LISTMONK_USER", "api")
# NOTE(review): this fallback is a literal credential committed to the repo.
# Consider rotating it and requiring LISTMONK_PASS from the environment instead.
LISTMONK_PASS = os.getenv("LISTMONK_PASS", "6X1rKPea61N4rZ1S65Hx5zvqzbCj30F6nvEe9oVGH_Y")
# Base64 of "user:pass"; lm_api() sends it as the HTTP Basic Authorization header.
_LM_AUTH = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode()
# ── Source campaign IDs ────────────────────────────────────────────────────────
# Listmonk campaigns that run() fetches and uses as the template for each send.
CAMPAIGN_MCS150_ID = 186 # "MCS-150 Overdue — $1,000/Day Fine Risk"
CAMPAIGN_INACTIVE_ID = 188 # "Inactive USDOT — Reactivate Before You Get Pulled Over"
# Public site for landing-page links injected into Listmonk subscriber attribs.
SITE_DOMAIN = os.getenv("SITE_DOMAIN", "https://performancewest.net")
# ── Deficiency-flag segments (Phase 5) ──────────────────────────────────────
# Each segment targets carriers by a `deficiency_flags` value (the TEXT[] column
# written by fmcsa_deficiency_flagger.py) and links to its order landing page.
#
# Keys of every segment entry:
#   label      - human-readable name used in Listmonk list/campaign names.
#   flag_sql   - SQL predicate over the deficiency_flags array. It is spliced
#                into queries that are executed with psycopg2 parameters, which
#                is why a literal LIKE wildcard is written as `%%`.
#   lp_slug    - default order-page slug (per-state overrides are applied in
#                build_lp_link() / lp_slug_for()).
#   source_env - name of the environment variable holding the Listmonk template
#                campaign ID to clone. Segments whose variable is unset are
#                reported by --list-segments but skipped by the scheduled run so
#                the nightly job never breaks on an unconfigured template.
#   limit      - maximum carriers selected per timezone region for the segment.
DEFICIENCY_SEGMENTS = {
    "for_hire_boc3": {
        "label": "For-Hire BOC-3 + UCR",
        "flag_sql": "'for_hire_carrier' = ANY(deficiency_flags)",
        "lp_slug": "boc3-filing",
        "source_env": "CAMPAIGN_FOR_HIRE_ID",
        "limit": 1500,
    },
    "irp_ifta": {
        "label": "IRP / IFTA Registration",
        "flag_sql": "'interstate_needs_irp_ifta' = ANY(deficiency_flags)",
        "lp_slug": "state-trucking-bundle",
        "source_env": "CAMPAIGN_IRP_IFTA_ID",
        "limit": 1500,
    },
    "intrastate_authority": {
        "label": "Intrastate Operating Authority",
        # any intrastate_authority_<state> flag
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'intrastate_authority_%%')",
        "lp_slug": "intrastate-authority",
        "source_env": "CAMPAIGN_INTRASTATE_ID",
        "limit": 1000,
    },
    "state_weight_tax": {
        "label": "State Weight-Distance Tax",
        # any state_weight_tax_<state> flag
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'state_weight_tax_%%')",
        # per-state LP resolved per-row in build_lp_link()
        "lp_slug": "state-trucking-bundle",
        "source_env": "CAMPAIGN_WEIGHT_TAX_ID",
        "limit": 1000,
    },
    "state_emissions": {
        "label": "State Clean-Truck / Emissions",
        # any state_emissions_<state> flag
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'state_emissions_%%')",
        "lp_slug": "state-emissions",
        "source_env": "CAMPAIGN_EMISSIONS_ID",
        "limit": 1000,
    },
    "hazmat": {
        "label": "Hazmat PHMSA Registration",
        "flag_sql": "'hazmat_carrier' = ANY(deficiency_flags)",
        "lp_slug": "hazmat-phmsa",
        "source_env": "CAMPAIGN_HAZMAT_ID",
        "limit": 500,
    },
}
# Per-state order-page slugs for the state_weight_tax segment, keyed by two-letter
# state code. States not listed here fall back to the segment's default lp_slug.
# (The CA emissions override to ca-mcp-carb is handled separately in
# build_lp_link() / lp_slug_for(), not through this table.)
_WEIGHT_TAX_LP = {
    "OR": "or-weight-mile-tax", "NY": "ny-hut-registration",
    "KY": "ky-kyu-registration", "NM": "nm-weight-distance",
    "CT": "ct-highway-use-fee",
}
def build_lp_link(campaign_type: str, phy_state: str | None) -> str:
    """Return the order landing-page URL for a (segment, state).

    The slug is resolved by lp_slug_for() so the URL and the discountable
    service slug can never drift apart: unknown campaign types fall back to
    "dot-full-compliance", state_weight_tax uses the per-state override table,
    and state_emissions in CA goes to "ca-mcp-carb".

    Args:
        campaign_type: segment key (e.g. "mcs150", "state_weight_tax").
        phy_state: two-letter state the offer applies to, or None.

    Returns:
        "<SITE_DOMAIN>/order/<slug>".
    """
    return f"{SITE_DOMAIN}/order/{lp_slug_for(campaign_type, phy_state)}"
def lp_slug_for(campaign_type: str, phy_state: str | None = None) -> str:
    """Resolve the order-page slug (also the discountable service slug).

    Starts from the segment's configured `lp_slug` (or "dot-full-compliance"
    when the campaign type is not a deficiency segment) and then applies the
    per-state overrides for weight-tax and California emissions campaigns.
    """
    segment = DEFICIENCY_SEGMENTS.get(campaign_type)
    default_slug = "dot-full-compliance" if not segment else segment["lp_slug"]

    # The two override rules key on different campaign types, so at most one fires.
    if campaign_type == "state_emissions" and phy_state == "CA":
        return "ca-mcp-carb"
    if campaign_type == "state_weight_tax" and phy_state in _WEIGHT_TAX_LP:
        return _WEIGHT_TAX_LP[phy_state]
    return default_slug
# ── Daily same-day coupon ───────────────────────────────────────────────────
# Every send day gets ONE random 5-letter coupon at COUPON_PCT off, valid only
# through 23:59:59 of the send date (America/New_York). The code is written to
# the app's `discount_codes` table; the existing /api/v1/discount validator and
# checkout enforce expiry + the service-fee-only scope (pass-through government
# fees are never discounted). The code + prices are merged into the email so the
# recipient sees a real, expiring deal.
# Percentage off; overridable through the CAMPAIGN_COUPON_PCT environment variable.
COUPON_PCT = int(os.getenv("CAMPAIGN_COUPON_PCT", "40"))
# Eligible slugs = every discountable service a trucking campaign can link to.
# Pass-through-only slugs (boc3-filing $25 passthrough, etc.) are intentionally
# eligible too: the discount math only touches the service-fee portion, so a
# code scoped to them simply yields $0 off the passthrough and full off the fee.
# Stored as ONE comma-separated string; it is inserted verbatim as the
# `applies_to` value of the discount_codes row.
COUPON_SLUGS = (
    "mcs150-update,usdot-reactivation,dot-drug-alcohol,dot-full-compliance,"
    "ucr-registration,state-trucking-bundle,intrastate-authority,irp-registration,"
    "ifta-application,hazmat-phmsa,state-emissions,state-weight-tax,trucking-wrap-up,"
    "boc3-filing"
)
# NOTE(review): this is a fixed UTC-5 offset with no daylight-saving rule, so
# while New York observes EDT (UTC-4) a "23:59:59" built with it corresponds to
# 00:59:59 EDT of the following day.
_ET = timezone(timedelta(hours=-5)) # EST anchor; close enough for an end-of-day cutoff
_COUPON_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ" # no I/O to avoid confusion
def _random_coupon_code() -> str:
    """Return five letters drawn independently from _COUPON_ALPHABET.

    Uses the `secrets` module so the code is not predictable from earlier ones.
    """
    from secrets import choice

    letters = [choice(_COUPON_ALPHABET) for _ in range(5)]
    return "".join(letters)
def get_or_create_daily_coupon(conn, send_date: date) -> str:
    """Return the 5-letter coupon code for `send_date`, creating it if needed.

    Idempotent: a marker in `description` (campaign-daily:<date>) lets a re-run
    on the same day reuse the existing code instead of minting a duplicate.

    Args:
        conn: open DB-API connection to the database holding `discount_codes`.
        send_date: calendar day the coupon is valid for.

    Returns:
        The coupon code (existing or newly inserted).

    Raises:
        RuntimeError: no code could be inserted after 25 attempts. When an
            attempt failed with a database error, that error is chained as
            the cause so the real problem is visible in the traceback.
    """
    marker = f"campaign-daily:{send_date.isoformat()}"
    cur = conn.cursor()
    cur.execute("SELECT code FROM discount_codes WHERE description = %s LIMIT 1", (marker,))
    row = cur.fetchone()
    if row:
        return row[0]

    # Use the real America/New_York zone so the validity window tracks daylight
    # saving time (a fixed UTC-5 anchor ends the deal at 00:59:59 EDT in summer).
    # Fall back to the module's fixed EST anchor when zoneinfo or the system tz
    # database is not available.
    try:
        from zoneinfo import ZoneInfo

        eastern = ZoneInfo("America/New_York")
    except Exception:  # noqa: BLE001 - ImportError / ZoneInfoNotFoundError
        eastern = _ET

    # Valid from 00:00:00 through 23:59:59 Eastern wall-clock time of the send date.
    starts = datetime.combine(send_date, datetime.min.time(), tzinfo=eastern)
    expires = starts + timedelta(hours=23, minutes=59, seconds=59)

    # A code collision is absorbed by ON CONFLICT DO NOTHING (no row comes back
    # and we simply try another code), so an exception here is a genuine
    # database problem: roll back, remember it, and surface it if we give up.
    last_error: Exception | None = None
    for _ in range(25):
        code = _random_coupon_code()
        try:
            cur.execute(
                """
INSERT INTO discount_codes
(code, description, discount_type, discount_value, applies_to,
max_uses_per_email, active, starts_at, expires_at)
VALUES (%s, %s, 'percent', %s, %s, 1, TRUE, %s, %s)
ON CONFLICT (code) DO NOTHING
RETURNING code
""",
                (code, marker, COUPON_PCT, COUPON_SLUGS, starts, expires),
            )
            minted = cur.fetchone()
            if minted:
                conn.commit()
        except Exception as exc:  # noqa: BLE001
            conn.rollback()
            last_error = exc
            LOG.warning("[coupon] insert attempt failed: %s", exc)
            continue
        if minted:
            LOG.info("[coupon] daily code %s (%d%% off, expires %s ET)",
                     code, COUPON_PCT, expires.isoformat())
            return minted[0]
    raise RuntimeError("could not mint a unique daily coupon code") from last_error
def coupon_attribs(coupon_code: str | None) -> dict:
"""Merge fields for the same-day deal, blank when no coupon is active."""
if not coupon_code:
return {"coupon_code": "", "coupon_pct": "", "coupon_expires": ""}
return {
"coupon_code": coupon_code,
"coupon_pct": str(COUPON_PCT),
# Human-readable cutoff for the email body.
"coupon_expires": "11:59 PM ET tonight",
}
def lp_link_with_coupon(campaign_type: str, phy_state: str | None,
                        coupon_code: str | None) -> str:
    """Landing-page URL from build_lp_link(), plus `code=<coupon>` when a coupon exists.

    The parameter is appended with "&" if the URL already carries a query
    string and with "?" otherwise, so the landing page can pre-apply the deal.
    """
    link = build_lp_link(campaign_type, phy_state)
    if not coupon_code:
        return link
    joiner = "?" if "?" not in link else "&"
    return f"{link}{joiner}code={coupon_code}"
# ── TZ config: tz_key -> (states, send_hour_utc) ─────────────────────────────
# 4AM EST = 09:00 UTC, each TZ +1hr so they get it at ~4AM local
# `states` is matched against fmcsa_carriers.phy_state; `send_hour_utc` is the
# fixed UTC hour used for the scheduled send (the local-time equivalents in the
# comments below are for standard time).
TIMEZONE_CONFIG = {
    "ET": {
        "states": ("CT","DC","DE","FL","GA","IN","KY","MA","MD","ME","MI",
                   "NH","NJ","NY","NC","OH","PA","RI","SC","TN","VA","VT","WV"),
        "send_hour_utc": 9, # 4 AM EST
    },
    "CT": {
        "states": ("AL","AR","IA","IL","KS","LA","MN","MO","MS","ND",
                   "NE","OK","SD","TX","WI"),
        "send_hour_utc": 10, # 5 AM EST = 4 AM CST
    },
    "MT": {
        "states": ("AZ","CO","ID","MT","NM","UT","WY"),
        "send_hour_utc": 11, # 6 AM EST = 4 AM MST
    },
    "PT": {
        "states": ("AK","CA","HI","NV","OR","WA"),
        "send_hour_utc": 12, # 7 AM EST = 4 AM PST
    },
}
# Owner email — run() sends one test email here for every campaign it creates.
TEST_EMAIL = os.getenv("CAMPAIGN_TEST_EMAIL", "carrierone@gmx.com")
# Reply-To applied to a campaign when the source template defines no headers.
REPLY_TO_EMAIL = os.getenv("CAMPAIGN_REPLY_TO", "info@performancewest.net")
REPLY_TO_HEADERS = [{"name": "Reply-To", "value": REPLY_TO_EMAIL}]
# SQL predicate: rows whose email is verified or whose verification result is
# one of the accepted values (SMTP-valid or catch-all).
USABLE_FILTER = (
    "(email_verified IS TRUE OR email_verify_result IN "
    "('smtp_valid','catch_all_domain','catch_all_detected'))"
)
# Postgres DSN passed to psycopg2.connect(); empty when DATABASE_URL is unset.
DB_URL = os.getenv("DATABASE_URL", "")
# File containing the warmup start as an integer Unix timestamp (see warmup_day()).
WARMUP_START_FILE = os.getenv("MTA_WARMUP_START_FILE", "/etc/postfix/pw-warmup-start")
def warmup_day() -> int | None:
"""Return days since MTA warmup start, or None if not configured/readable."""
try:
with open(WARMUP_START_FILE, "r", encoding="utf-8") as fh:
start = int(fh.read().strip())
return max(0, int((time.time() - start) // 86400))
except Exception:
return None
def warmup_daily_queue_cap(day: int | None = None) -> int | None:
"""Daily campaign-builder queue cap aligned with Listmonk rampcap targets.
This prevents the builder from scheduling 10k-20k recipients/day while
Listmonk is throttled to only 500-3,000/day, which would create a massive
backlog that later drains unpredictably. The numbers match the runbook's
intended daily warmup totals, not the theoretical 24-hour sliding-window max.
"""
if day is None:
day = warmup_day()
if day is None:
return None
if day <= 1:
return 500
if day <= 3:
return 1500
if day <= 6:
return 2500
return 3000
def lm_api(path: str, data: dict | None = None, method: str | None = None):
    """Call the Listmonk HTTP API and return the decoded JSON response.

    Args:
        path: API path below "/api", including any query string.
        data: optional JSON-serialisable request body.
        method: optional HTTP method override (e.g. "POST", "PUT").

    Returns:
        The parsed JSON body of the response.

    Raises:
        RuntimeError: Listmonk answered with an HTTP error status. The message
            carries the status code and the first 300 characters of the body,
            and the original HTTPError is chained as the cause.
    """
    headers = {"Content-Type": "application/json", "Authorization": f"Basic {_LM_AUTH}"}
    req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers)
    if data is not None:
        req.data = json.dumps(data).encode()
    if method:
        req.method = method
    try:
        # The context manager closes the connection even if read/parse fails.
        with urllib.request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        # errors="replace" keeps a non-UTF-8 error page from masking the HTTP error.
        body = e.read().decode("utf-8", errors="replace")[:300]
        raise RuntimeError(f"Listmonk {path} HTTP {e.code}: {body}") from e
# In-process memo of listmonk_sendable() verdicts, keyed by the lower-cased,
# stripped email address; values are (sendable, reason) tuples.
_LM_SUPPRESSION_CACHE: dict[str, tuple[bool, str]] = {}
def _lm_email_query(email: str) -> str:
"""Return a Listmonk subscriber query for an exact email address."""
# Listmonk's query language uses single-quoted strings. Email addresses should
# not contain quotes, but escape defensively so a bad source row cannot break
# the query or broaden it.
safe = email.lower().strip().replace("'", "''")
return f"email = '{safe}'"
def listmonk_sendable(email: str) -> tuple[bool, str]:
    """Return whether Listmonk will send to this address, with reason.

    SQL eligibility is not enough: an address may already exist in Listmonk as
    disabled, blocklisted, bounced, or unsubscribed. If we add those rows to a
    fresh campaign list, Listmonk filters them at send time, producing a smaller
    `to_send` count than the builder's daily warmup allocation. Check status at
    build time and fetch replacements so each campaign list is filled with
    actually-sendable subscribers.

    Definitive verdicts are memoised per normalised address. A failed lookup is
    reported as not sendable but is deliberately NOT memoised, so a transient
    Listmonk hiccup does not suppress the address for the rest of the process.

    Returns:
        (True, "new") when Listmonk has no such subscriber, (True, "enabled")
        for an enabled subscriber with no unsubscribed/unconfirmed list, and
        (False, <reason>) otherwise.
    """
    key = email.lower().strip()
    if not key:
        return False, "blank_email"
    cached = _LM_SUPPRESSION_CACHE.get(key)
    if cached is not None:
        return cached
    try:
        res = lm_api(
            "/subscribers?" + urllib.parse.urlencode({
                "query": _lm_email_query(key),
                "per_page": 1,
            })
        )
        results = res.get("data", {}).get("results", [])
        if not results:
            out = (True, "new")
        else:
            sub = results[0]
            status = (sub.get("status") or "").lower()
            if status != "enabled":
                out = (False, f"listmonk_status:{status or 'unknown'}")
            else:
                # An enabled subscriber can still be opted out on some list.
                bad_list_status = ""
                for lst in sub.get("lists") or []:
                    lstatus = (lst.get("subscription_status") or "").lower()
                    if lstatus in {"unsubscribed", "unconfirmed"}:
                        bad_list_status = lstatus
                        break
                if bad_list_status:
                    out = (False, f"list_subscription_status:{bad_list_status}")
                else:
                    out = (True, "enabled")
    except Exception as exc:  # noqa: BLE001
        # Treating a failed lookup as not sendable is safer than scheduling a
        # suppressed address and marking it sent in our source DB. The reason
        # string lets the caller's suppression log surface the problem.
        return False, f"lookup_error:{exc}"
    _LM_SUPPRESSION_CACHE[key] = out
    return out
def get_base_campaign(campaign_id: int) -> dict:
    """Fetch a Listmonk campaign by ID and return the `data` object of the response."""
    response = lm_api(f"/campaigns/{campaign_id}")
    return response["data"]
def create_list(name: str) -> int:
    """Create a private, single-opt-in Listmonk list and return its ID.

    The list is tagged "trucking" and "auto".
    """
    list_spec = {
        "name": name,
        "type": "private",
        "optin": "single",
        "tags": ["trucking", "auto"],
    }
    created = lm_api("/lists", list_spec, "POST")
    return created["data"]["id"]
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
    """Create/attach a single subscriber to a list. Returns True on success.

    Listmonk's POST /api/subscribers takes one subscriber with `lists` (array of
    list IDs) and preconfirm_subscriptions=true so they're immediately sendable.
    When the subscriber already exists, it is looked up by email and attached
    to the list instead, unless its Listmonk status is not "enabled".

    Args:
        list_id: target Listmonk list.
        email: subscriber address.
        name: display name; the email is used when empty.
        attribs: merge fields stored on a newly created subscriber.

    Returns:
        True when the subscriber was created or attached; False otherwise.
        Failures are logged rather than raised so one bad row does not abort
        the whole import.
    """
    try:
        lm_api("/subscribers", {
            "email": email,
            "name": name or email,
            "status": "enabled",
            "lists": [list_id],
            "attribs": attribs,
            "preconfirm_subscriptions": True,
        }, "POST")
        return True
    except Exception as exc:  # noqa: BLE001
        create_error = str(exc)

    lowered = create_error.lower()
    already_exists = (
        "already exists" in lowered or "conflict" in lowered or "HTTP 409" in create_error
    )
    if not already_exists:
        LOG.warning("Listmonk rejected new subscriber %s: %s", email, create_error)
        return False

    # Already exists — attach to this list instead of failing.
    try:
        # Use the shared query builder so quotes in the address are escaped
        # exactly as in listmonk_sendable().
        subs = lm_api(
            "/subscribers?"
            + urllib.parse.urlencode({
                "query": _lm_email_query(email),
                "per_page": 1,
            })
        )
        results = subs.get("data", {}).get("results", [])
        if not results:
            LOG.warning("Listmonk reported %s as existing but the lookup found no subscriber", email)
            return False
        existing = results[0]
        status = (existing.get("status") or "").lower()
        if status != "enabled":
            LOG.info("Listmonk suppressed existing subscriber %s status=%s", email, status or "unknown")
            return False
        lm_api("/subscribers/lists", {
            "ids": [existing["id"]],
            "action": "add",
            "target_list_ids": [list_id],
            "status": "confirmed",
        }, "PUT")
        return True
    except Exception as exc:  # noqa: BLE001
        LOG.warning("Could not attach existing subscriber %s to list %s: %s", email, list_id, exc)
        return False
def import_subscribers(list_id: int, subscribers: list[dict]) -> int:
    """Add each subscriber to the list individually and count the successes.

    Every entry needs an "email" key; "name" defaults to "" and "attribs" to {}.
    """
    return sum(
        1
        for entry in subscribers
        if add_subscriber(list_id, entry["email"], entry.get("name", ""), entry.get("attribs", {}))
    )
def create_and_schedule_campaign(
    base: dict,
    list_id: int,
    name: str,
    send_at_utc: datetime,
    schedule: bool = True,
) -> int:
    """Clone `base` into a new campaign for `list_id` and return the new ID.

    Subject, sender, content, template, tags, messenger and headers are copied
    from the source campaign (headers fall back to the Reply-To default). With
    `schedule` true the campaign gets `send_at` and is switched to "scheduled";
    otherwise it stays a draft (preview mode).
    """
    payload = {
        "name": name,
        "subject": base["subject"],
        "lists": [list_id],
        "from_email": base["from_email"],
        "type": "regular",
        "content_type": base["content_type"],
        "body": base["body"],
        "altbody": base.get("altbody"),
        "template_id": base["template_id"],
        "tags": base.get("tags") or [],
        "messenger": base.get("messenger") or "email",
        "headers": base.get("headers") or REPLY_TO_HEADERS,
    }
    if schedule:
        payload["send_at"] = send_at_utc.strftime("%Y-%m-%dT%H:%M:%S+00:00")

    new_id = lm_api("/campaigns", payload, "POST")["data"]["id"]
    if not schedule:
        # Preview mode: leave the campaign as a draft.
        return new_id
    lm_api(f"/campaigns/{new_id}/status", {"status": "scheduled"}, "PUT")
    return new_id
def send_test(base: dict, campaign_id: int, sample_row: tuple, label: str, tz: str,
              campaign_type: str) -> None:
    """Send one rendered sample of the campaign to TEST_EMAIL for a spot check.

    `sample_row` is (dot, email, name, phy_state, target_state); a legacy
    4-tuple without target_state is accepted and then phy_state is used for
    the landing-page link. A failed test send is logged, never raised.

    NOTE(review): the link substituted here comes from build_lp_link(), without
    a coupon code, and no coupon_* attribs are substituted. Confirm whether the
    test email should mirror the coupon fields real subscribers receive.
    """
    dot, name, phy_state = sample_row[0], sample_row[2], sample_row[3]
    target_state = phy_state if len(sample_row) <= 4 else sample_row[4]

    rendered_body = base["body"]
    # Real subscribers get these attribs merged by Listmonk; the test send
    # substitutes them by hand so the CTA link is not rendered as a bare
    # "?dot=..." that points nowhere. The link uses target_state (the state the
    # offer applies to), which for per-state programs can differ from the base state.
    # {{ UnsubscribeURL }} is intentionally left alone: Listmonk renders it into
    # a working per-subscriber link even on test sends.
    substitutions = (
        ("{{ .Subscriber.Attribs.company }}", name or "Sample Carrier LLC"),
        ("{{ .Subscriber.Attribs.dot_number }}", dot or "0000000"),
        ("{{ .Subscriber.Attribs.state }}", phy_state or "TX"),
        ("{{ .Subscriber.Attribs.lp_link }}", build_lp_link(campaign_type, target_state)),
    )
    for placeholder, value in substitutions:
        rendered_body = rendered_body.replace(placeholder, value)

    subject = base["subject"].replace("{{ .Subscriber.Attribs.dot_number }}", dot or "0000000")

    # Listmonk /test needs `lists` as an array of IDs (base["lists"] holds objects).
    source_list_ids = [entry["id"] for entry in base.get("lists", []) if isinstance(entry, dict)]
    if not source_list_ids:
        source_list_ids = [1]

    payload = {
        "name": base.get("name", "Test"),
        "subject": subject,
        "lists": source_list_ids,
        "from_email": base["from_email"],
        "type": "regular",
        "content_type": base["content_type"],
        "body": rendered_body,
        "altbody": base.get("altbody"),
        "template_id": base["template_id"],
        "tags": base.get("tags") or [],
        "messenger": base.get("messenger") or "email",
        "headers": base.get("headers") or REPLY_TO_HEADERS,
        "subscribers": [TEST_EMAIL],
    }
    try:
        lm_api(f"/campaigns/{campaign_id}/test", payload, "POST")
    except Exception as exc:
        LOG.warning("[%s/%s] Test send failed: %s", tz, label, exc)
    else:
        LOG.info("[%s/%s] Test sent to %s (sample: %s DOT#%s)", tz, label, TEST_EMAIL, name, dot)
def campaign_type_filter(campaign_type: str) -> str:
    """Return the SQL WHERE predicate that selects carriers for a campaign type.

    The two built-in types have fixed predicates; deficiency segments use their
    configured `flag_sql`.

    Raises:
        ValueError: the campaign type is neither built-in nor a known segment.
    """
    builtin_predicates = (
        ("mcs150", "mcs150_parsed < NOW() - INTERVAL '2 years'"),
        ("inactive", "oos_active IS TRUE"),
    )
    for type_name, predicate in builtin_predicates:
        if campaign_type == type_name:
            return predicate
    if campaign_type not in DEFICIENCY_SEGMENTS:
        raise ValueError(f"unknown campaign_type {campaign_type!r}")
    return DEFICIENCY_SEGMENTS[campaign_type]["flag_sql"]
def campaign_weight(campaign_type: str) -> float:
    """Relative share of the warmup budget a campaign type should receive.

    The generic notices are down-weighted (mcs150 0.75, inactive 0.65) and every
    other type gets 1.25: highly specific deficiency copy is more relevant and
    less repetitive from a mailbox provider's point of view, so it gets more of
    the early warmup volume.
    """
    generic_weights = (("mcs150", 0.75), ("inactive", 0.65))
    for type_name, weight in generic_weights:
        if campaign_type == type_name:
            return weight
    return 1.25
def count_carriers(conn, tz_states: tuple, campaign_type: str, limit: int | None = None) -> int:
    """Count not-yet-sent, usable carriers for a campaign type in the given states.

    The count is taken over at most `limit` rows (10,000,000 when no limit is
    given), so the result never exceeds the limit. Carriers on blocked email
    domains are excluded.
    """
    cursor = conn.cursor()
    state_slots = ",".join("%s" for _ in tz_states)
    predicate = campaign_type_filter(campaign_type)
    # Parameter order follows the placeholders: blocked domains, states, row limit.
    params = [list(BLOCKED_EMAIL_DOMAINS), *tz_states, limit or 10_000_000]
    cursor.execute(f"""
SELECT count(*)
FROM (
SELECT 1
FROM fmcsa_carriers
WHERE {predicate}
AND {USABLE_FILTER}
AND listmonk_sent_at IS NULL
AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
AND phy_state IN ({state_slots})
LIMIT %s
) s
""", params)
    first_row = cursor.fetchone()
    return int(first_row[0])
def allocate_daily_budget(candidates: list[dict], daily_cap: int | None) -> dict[tuple[str, str], int]:
"""Allocate daily cap across (timezone, campaign_type) by weighted audience.
The allocation is proportional to each slot's eligible lead count and campaign
specificity weight, capped by the slot's audience/limit. This avoids sending
the same number from a tiny hazmat segment and a massive MCS-150 segment while
still preserving body-text variety during warmup.
"""
positive = [c for c in candidates if c["audience"] > 0]
if daily_cap is None:
return {(c["tz"], c["campaign_type"]): c["audience"] for c in candidates}
remaining_cap = min(daily_cap, sum(c["audience"] for c in positive))
quotas = {(c["tz"], c["campaign_type"]): 0 for c in candidates}
remaining = { (c["tz"], c["campaign_type"]): c["audience"] for c in positive }
weighted = { (c["tz"], c["campaign_type"]): c["audience"] * c["weight"] for c in positive }
# Repeated largest-remainder passes handle slots that hit their audience cap.
while remaining_cap > 0 and remaining:
total_weight = sum(weighted[k] for k in remaining)
if total_weight <= 0:
break
raw = {k: remaining_cap * (weighted[k] / total_weight) for k in remaining}
assigned_this_pass = 0
remainders: list[tuple[float, tuple[str, str]]] = []
for k, val in raw.items():
add = min(remaining[k], int(math.floor(val)))
quotas[k] += add
remaining[k] -= add
assigned_this_pass += add
if remaining[k] > 0:
remainders.append((val - math.floor(val), k))
leftover = remaining_cap - assigned_this_pass
for _, k in sorted(remainders, reverse=True):
if leftover <= 0:
break
if remaining.get(k, 0) <= 0:
continue
quotas[k] += 1
remaining[k] -= 1
assigned_this_pass += 1
leftover -= 1
remaining_cap -= assigned_this_pass
remaining = {k: v for k, v in remaining.items() if v > 0}
if assigned_this_pass == 0:
break
return quotas
def fetch_carriers(
    conn,
    tz_states: tuple,
    campaign_type: str,
    limit: int,
    offset: int = 0,
) -> list[tuple]:
    """Return (dot_number, email_address, legal_name, phy_state, target_state) rows.

    target_state is the state the offer applies to. For most segments that is
    the carrier's base (phy_state). For per-state programs (state_weight_tax,
    state_emissions) the relevant state is encoded in the deficiency flag suffix
    (e.g. 'state_weight_tax_OR'), which can differ from the base state, so we
    pull it out of the flag so the CTA links to the correct state's order page.

    Rows are ordered most-overdue first with dot_number as a tie-breaker. The
    tie-breaker matters because callers page through the result with
    LIMIT/OFFSET: without a total order, rows sharing an mcs150_parsed value
    (or NULL) could be skipped or returned twice across pages.
    NOTE(review): assumes dot_number is unique per carrier — confirm against
    the table definition.

    Args:
        conn: open DB-API connection.
        tz_states: state codes to restrict phy_state to.
        campaign_type: built-in type or deficiency segment key.
        limit: maximum rows to return.
        offset: rows to skip, for paging.
    """
    cur = conn.cursor()
    states_placeholder = ",".join(["%s"] * len(tz_states))
    type_filter = campaign_type_filter(campaign_type)
    # target_state expression: pull the state suffix out of the matching flag
    # for per-state programs, otherwise fall back to the base (phy_state).
    if campaign_type in ("state_weight_tax", "state_emissions"):
        prefix = f"{campaign_type}_"
        target_state_sql = (
            "COALESCE((SELECT upper(substr(f, %s)) "
            "FROM unnest(deficiency_flags) f "
            f"WHERE f LIKE '{campaign_type}_%%' LIMIT 1), phy_state)"
        )
        # substr() is 1-based: start right after the "<campaign_type>_" prefix.
        target_state_params = [len(prefix) + 1]
    else:
        target_state_sql = "phy_state"
        target_state_params = []
    cur.execute(f"""
SELECT dot_number, email_address, legal_name, phy_state,
{target_state_sql} AS target_state
FROM fmcsa_carriers
WHERE {type_filter}
AND {USABLE_FILTER}
AND listmonk_sent_at IS NULL
AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
AND phy_state IN ({states_placeholder})
ORDER BY mcs150_parsed ASC NULLS LAST, dot_number ASC
LIMIT %s OFFSET %s
""", target_state_params + [list(BLOCKED_EMAIL_DOMAINS)] + list(tz_states) + [limit, offset])
    return cur.fetchall()
def select_sendable_carriers(
conn,
tz_states: tuple,
campaign_type: str,
quota: int,
max_scan: int | None = None,
) -> tuple[list[tuple], dict[str, int]]:
"""Fetch rows until `quota` Listmonk-sendable carriers are found.
Returns (rows, suppression_counts). Rows that are disabled/blocklisted/etc.
in Listmonk are skipped and not marked sent in Postgres.
"""
selected: list[tuple] = []
skipped: dict[str, int] = {}
seen_emails: set[str] = set()
offset = 0
# Warmup caps are small, but old audiences can contain many prior bounces or
# unsubscribes. Scan beyond the quota while still bounding worst-case API calls.
scan_limit = max_scan or max(quota * 8, quota + 500, 1000)
batch_size = min(max(quota * 2, 100), 1000)
while len(selected) < quota and offset < scan_limit:
batch = fetch_carriers(conn, tz_states, campaign_type, batch_size, offset=offset)
if not batch:
break
offset += len(batch)
for row in batch:
email = (row[1] or "").lower().strip()
if email in seen_emails:
skipped["duplicate_email"] = skipped.get("duplicate_email", 0) + 1
continue
seen_emails.add(email)
ok, reason = listmonk_sendable(email)
if not ok:
skipped[reason] = skipped.get(reason, 0) + 1
continue
selected.append(row)
if len(selected) >= quota:
break
if len(batch) < batch_size:
break
return selected, skipped
def mark_sent(conn, dot_numbers: list[str], campaign_type: str) -> None:
    """Stamp the given carriers as sent and commit.

    Sets listmonk_sent_at to NOW() and records the campaign type for every
    fmcsa_carriers row whose dot_number is in `dot_numbers`.
    """
    update_sql = """
UPDATE fmcsa_carriers
SET listmonk_sent_at = NOW(),
listmonk_campaign_type = %s
WHERE dot_number = ANY(%s::text[])
"""
    cursor = conn.cursor()
    cursor.execute(update_sql, (campaign_type, dot_numbers))
    conn.commit()
def list_segments(send_date: date) -> None:
    """Print the eligible audience size per deficiency segment (no writes).

    For every segment the audience is summed over all timezone regions, each
    capped at the segment's `limit`, and printed with the configured source
    template (or UNSET) and the default landing page, so it is clear which
    segments are ready to schedule.

    The rows are counted in SQL with count_carriers(), which applies the same
    eligibility filter and limit as the row fetch, and the connection is closed
    even when a query fails.

    Args:
        send_date: date shown in the report header.
    """
    conn = psycopg2.connect(DB_URL)
    try:
        print(f"\nDeficiency-flag segments (send date {send_date.isoformat()}):\n")
        print(f"{'segment':<22}{'audience':>10} {'source':<10} landing page")
        print("-" * 78)
        grand_total = 0
        for ctype, seg in DEFICIENCY_SEGMENTS.items():
            total = sum(
                count_carriers(conn, tz_cfg["states"], ctype, seg["limit"])
                for tz_cfg in TIMEZONE_CONFIG.values()
            )
            grand_total += total
            src = os.getenv(seg["source_env"])
            src_str = src if src else "UNSET"
            lp = build_lp_link(ctype, None)
            print(f"{ctype:<22}{total:>10} {src_str:<10} {lp}")
        print("-" * 78)
        print(f"{'TOTAL':<22}{grand_total:>10}")
        print("\n(UNSET source = template not yet created; segment is skipped by the "
              "scheduled run until its CAMPAIGN_*_ID env is set.)\n")
    finally:
        conn.close()
def run(send_date: date, dry_run: bool = False, preview: bool = False,
        max_per_segment: int | None = None, only_segments: set[str] | None = None,
        send_hour_override: int | None = None, send_minute: int = 0,
        stagger_minutes: int = 0, daily_cap: int | None = None,
        warmup_cap: bool = True) -> None:
    """Build the Listmonk lists and campaigns for `send_date`.

    For every timezone region and campaign spec this selects carriers, creates
    a list, imports the subscribers, creates the campaign (scheduled, or a
    draft in preview), sends a test to TEST_EMAIL and marks the carriers sent.

    Args:
        send_date: calendar day the campaigns are scheduled for.
        dry_run: log what would be created; skip all Listmonk and DB writes.
        preview: one sample carrier per campaign, delivered only to TEST_EMAIL;
            campaigns stay drafts and nothing is marked sent.
        max_per_segment: upper bound applied to every spec's per-timezone limit.
        only_segments: when given, only these campaign types are built.
        send_hour_override: UTC hour used for all regions instead of the
            per-region default.
        send_minute: UTC minute of the send time.
        stagger_minutes: minutes added per already-created campaign.
        daily_cap: total recipients this run may queue; when None it is derived
            from the warmup schedule unless `warmup_cap` is false or in preview.
        warmup_cap: allow deriving `daily_cap` from the warmup schedule.
    """
    # NOTE(review): the connection is closed only on the normal paths below; an
    # exception raised in between leaves it to be reclaimed at process exit.
    conn = psycopg2.connect(DB_URL)
    # Mint (or reuse) the same-day coupon for this send date so every campaign
    # in the run shares one expiring code. Preview/dry runs skip the write.
    daily_coupon = None
    if not dry_run and not preview:
        try:
            daily_coupon = get_or_create_daily_coupon(conn, send_date)
        except Exception as exc: # noqa: BLE001
            LOG.warning("[coupon] could not mint daily coupon: %s (sending without)", exc)
    base_mcs150 = get_base_campaign(CAMPAIGN_MCS150_ID)
    base_inactive = get_base_campaign(CAMPAIGN_INACTIVE_ID)
    # Each spec is (campaign_type, source campaign, per-timezone limit, label).
    campaign_specs = [
        ("mcs150", base_mcs150, 2000, "MCS-150 Overdue"),
        ("inactive", base_inactive, 1000, "Inactive USDOT"),
    ]
    # Add deficiency-flag segments whose Listmonk source template is configured.
    for ctype, seg in DEFICIENCY_SEGMENTS.items():
        src = os.getenv(seg["source_env"])
        if not src:
            LOG.info("[segment] %s skipped — %s not set (template not created)",
                     ctype, seg["source_env"])
            continue
        try:
            base = get_base_campaign(int(src))
        except Exception as exc: # noqa: BLE001
            LOG.warning("[segment] %s skipped — source %s unusable: %s",
                        ctype, src, exc)
            continue
        campaign_specs.append((ctype, base, seg["limit"], seg["label"]))
    # Optional warmup controls: restrict to specific segments and/or cap the
    # per-segment audience so a fresh-IP day-0 send stays small.
    if only_segments:
        campaign_specs = [s for s in campaign_specs if s[0] in only_segments]
        if not campaign_specs:
            LOG.error("No campaign specs match --only-segment %s", sorted(only_segments))
            conn.close()
            return
    if max_per_segment is not None:
        campaign_specs = [
            (ct, base, min(lim, max_per_segment), label)
            for (ct, base, lim, label) in campaign_specs
        ]
    # An explicit daily_cap wins; otherwise derive one from the warmup schedule.
    if daily_cap is None and warmup_cap and not preview:
        daily_cap = warmup_daily_queue_cap()
    if daily_cap is not None:
        LOG.info("Daily queue cap active: %d recipients total", daily_cap)
    if preview:
        LOG.info("PREVIEW MODE — 1 sample carrier per campaign, drafts only, "
                 "test sends to %s, no real schedule, no mark-sent.", TEST_EMAIL)
    # Pass 1: size every (timezone, campaign) slot so the cap can be allocated.
    candidates: list[dict] = []
    totals_by_campaign: dict[str, int] = {}
    for tz, tz_cfg in TIMEZONE_CONFIG.items():
        for campaign_type, _base, limit, label in campaign_specs:
            audience = 1 if preview else count_carriers(conn, tz_cfg["states"], campaign_type, limit)
            weight = campaign_weight(campaign_type)
            candidates.append({
                "tz": tz,
                "states": tz_cfg["states"],
                "campaign_type": campaign_type,
                "limit": limit,
                "label": label,
                "audience": audience,
                "weight": weight,
            })
            totals_by_campaign[campaign_type] = totals_by_campaign.get(campaign_type, 0) + audience
    quotas = allocate_daily_budget(candidates, None if preview else daily_cap)
    LOG.info("Eligible audience by SQL criterion: %s", ", ".join(
        f"{ct}={n}" for ct, n in sorted(totals_by_campaign.items())
    ))
    if daily_cap is not None and not preview:
        planned_by_campaign: dict[str, int] = {}
        for (tz, ctype), quota in quotas.items():
            planned_by_campaign[ctype] = planned_by_campaign.get(ctype, 0) + quota
        LOG.info("Planned allocation by criterion: %s (total=%d/%d)", ", ".join(
            f"{ct}={n}" for ct, n in sorted(planned_by_campaign.items())
        ), sum(planned_by_campaign.values()), daily_cap)
    # Pass 2: build one list + campaign per slot.
    scheduled_count = 0
    queued_recipients = 0
    for tz, tz_cfg in TIMEZONE_CONFIG.items():
        states = tz_cfg["states"]
        send_hour = send_hour_override if send_hour_override is not None else tz_cfg["send_hour_utc"]
        send_at = datetime(
            send_date.year, send_date.month, send_date.day,
            send_hour, send_minute, 0, tzinfo=timezone.utc
        )
        for campaign_type, base, limit, label in campaign_specs:
            if daily_cap is not None and queued_recipients >= daily_cap:
                LOG.info("[%s/%s] Daily cap reached (%d/%d) — skipping remaining slots",
                         tz, campaign_type, queued_recipients, daily_cap)
                continue
            # Each campaign created so far pushes the next one back by the stagger.
            effective_send_at = send_at + timedelta(minutes=stagger_minutes * scheduled_count)
            fetch_limit = 1 if preview else limit
            if daily_cap is not None and not preview:
                fetch_limit = min(fetch_limit, quotas.get((tz, campaign_type), 0))
                if fetch_limit <= 0:
                    LOG.info("[%s/%s] Allocation is 0 — skipping", tz, campaign_type)
                    continue
            if preview:
                rows = fetch_carriers(conn, states, campaign_type, fetch_limit)
                suppressed: dict[str, int] = {}
            else:
                rows, suppressed = select_sendable_carriers(conn, states, campaign_type, fetch_limit)
                if suppressed:
                    LOG.info("[%s/%s] Runtime Listmonk suppression skipped %d rows: %s",
                             tz, campaign_type, sum(suppressed.values()), ", ".join(
                                 f"{reason}={n}" for reason, n in sorted(suppressed.items())
                             ))
            if not rows:
                LOG.warning("[%s/%s] No usable records — skipping", tz, campaign_type)
                continue
            actual = len(rows)
            tag = "PREVIEW " if preview else ""
            list_name = f"{tag}Trucking {label} {tz} {send_date.isoformat()}"
            campaign_name = f"{tag}{label} - {tz} - {send_date.strftime('%b %d %Y')}"
            LOG.info("[%s/%s] %d sendable carriers -> %s (send %s UTC)",
                     tz, campaign_type, actual, campaign_name,
                     effective_send_at.strftime("%Y-%m-%d %H:%M"))
            if dry_run:
                LOG.info(" DRY RUN — skipping Listmonk + DB writes")
                scheduled_count += 1
                queued_recipients += actual
                continue
            # Build subscriber list. In preview, only the owner's address (with the
            # sample carrier's attribs) so the real audience is never touched.
            if preview:
                r0 = rows[0]
                subscribers = [{
                    "email": TEST_EMAIL,
                    "name": r0[2] or "Sample Carrier",
                    "attribs": {"dot_number": r0[0], "company": r0[2] or "", "state": r0[3] or "",
                                "lp_link": lp_link_with_coupon(campaign_type, r0[4], daily_coupon),
                                **coupon_attribs(daily_coupon)},
                }]
            else:
                subscribers = [
                    {
                        "email": row[1],
                        "name": row[2] or row[1],
                        "attribs": {"dot_number": row[0], "company": row[2] or "", "state": row[3] or "",
                                    "lp_link": lp_link_with_coupon(campaign_type, row[4], daily_coupon),
                                    **coupon_attribs(daily_coupon)},
                    }
                    for row in rows
                ]
            # Create list + add subscribers
            list_id = create_list(list_name)
            added = import_subscribers(list_id, subscribers)
            LOG.info("[%s/%s] List %d: %d/%d subscribers added", tz, campaign_type, list_id, added, len(subscribers))
            if added == 0:
                LOG.error("[%s/%s] BUG: 0 subscribers added to list %d — skipping", tz, campaign_type, list_id)
                continue
            # Create campaign (draft in preview, scheduled in real run)
            cid = create_and_schedule_campaign(base, list_id, campaign_name, effective_send_at, schedule=not preview)
            LOG.info("[%s/%s] Campaign %d created (%s)", tz, campaign_type, cid,
                     "draft/preview" if preview else f"scheduled {effective_send_at.isoformat()}")
            scheduled_count += 1
            queued_recipients += added
            # Send a test to the owner so they can spot-check the rendered email
            # NOTE(review): send_test() is not given daily_coupon, so the test
            # render may not show the coupon the audience receives — confirm.
            send_test(base, cid, rows[0], label, tz, campaign_type)
            # Mark carriers as sent only on a real run
            # NOTE(review): every selected row is marked, including any whose
            # import failed (added can be lower than len(rows)) — confirm intent.
            if not preview:
                mark_sent(conn, [row[0] for row in rows], campaign_type)
    conn.close()
    LOG.info("Done. queued_recipients=%d%s", queued_recipients,
             f" daily_cap={daily_cap}" if daily_cap is not None else "")
def main():
    """Parse command-line options and build campaigns for the chosen send date.

    The send date is ``--date`` when given, tomorrow when ``--tomorrow`` is
    set, and ``date.today()`` otherwise; the two flags are mutually exclusive.
    With ``--list-segments`` only ``list_segments`` is called for that date and
    the function returns. Otherwise the effective options are logged and
    forwarded to ``run``.

    Usage errors (conflicting flags, a malformed ``--date``) are reported via
    ``parser.error``, which prints the usage message and exits with status 2.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )
    parser = argparse.ArgumentParser(description="Build daily trucking campaigns in Listmonk")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be created, no writes")
    parser.add_argument("--preview", action="store_true", help="1 sample carrier/campaign, drafts only, test sends to owner, no real schedule/mark-sent")
    parser.add_argument("--list-segments", action="store_true", help="Report deduped audience size per deficiency segment, then exit (no writes)")
    parser.add_argument("--date", type=str, help="Target send date YYYY-MM-DD (default: today)")
    parser.add_argument("--tomorrow", action="store_true",
                        help="Schedule for tomorrow instead of today. Use only for manual pre-builds.")
    parser.add_argument("--max-per-segment", type=int, default=None,
                        help="Cap audience per (segment x timezone). Use for fresh-IP warmup small sends.")
    parser.add_argument("--only-segment", action="append", default=None, metavar="SEG",
                        help="Restrict to specific segment(s), e.g. mcs150, inactive, for_hire_boc3. Repeatable.")
    parser.add_argument("--send-hour", type=int, default=None,
                        help="Override send hour (UTC) for ALL timezones instead of per-tz defaults.")
    parser.add_argument("--send-minute", type=int, default=0,
                        help="Send minute (UTC), used with --send-hour. Default 0.")
    parser.add_argument("--stagger-minutes", type=int, default=0,
                        help="Add this many minutes between each created campaign. Useful for catchup sends.")
    parser.add_argument("--daily-cap", type=int, default=None,
                        help="Hard cap on recipients queued by this run. Default: warmup-derived cap when available.")
    parser.add_argument("--no-warmup-cap", action="store_true",
                        help="Disable automatic daily queue cap from /etc/postfix/pw-warmup-start.")
    args = parser.parse_args()

    if args.date and args.tomorrow:
        parser.error("--date and --tomorrow are mutually exclusive")

    if args.date:
        # A malformed date is an operator mistake: report it as a usage error
        # (exit status 2) rather than letting the ValueError surface as a
        # traceback. parser.error() never returns, so send_date is always bound.
        try:
            send_date = date.fromisoformat(args.date)
        except ValueError as exc:
            parser.error(f"invalid --date {args.date!r}: {exc}")
    elif args.tomorrow:
        send_date = date.today() + timedelta(days=1)
    else:
        send_date = date.today()

    if args.list_segments:
        list_segments(send_date)
        return

    # argparse collects repeated --only-segment values in a list; dedupe them.
    only = set(args.only_segment) if args.only_segment else None
    LOG.info("Building campaigns for send date %s (dry_run=%s, preview=%s, "
             "max_per_segment=%s, only=%s, send_hour=%s, stagger_minutes=%s, daily_cap=%s, warmup_cap=%s)",
             send_date, args.dry_run, args.preview, args.max_per_segment,
             sorted(only) if only else None, args.send_hour, args.stagger_minutes,
             args.daily_cap, not args.no_warmup_cap)
    run(send_date, dry_run=args.dry_run, preview=args.preview,
        max_per_segment=args.max_per_segment, only_segments=only,
        send_hour_override=args.send_hour, send_minute=args.send_minute,
        stagger_minutes=args.stagger_minutes, daily_cap=args.daily_cap,
        warmup_cap=not args.no_warmup_cap)
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()