new-site/scripts/build_trucking_campaigns.py
justin e3f439221a fix(trucking-email): kill recurring @TrackLink 404 at the source-clone boundary
Root cause of the order-CTA 404s recurring after the prior live fix: the
builder clones email bodies from STORED Listmonk source campaigns (ids
186/188/271-274/309/310/469/473), not from the edited source files. Those
stored bodies still carried @TrackLink on the per-subscriber order CTA, so
every nightly build re-registered a single static /order/<slug>&utm... link
(no '?') that 404s for every recipient. This morning's 3,000 real sends AND
the owner spot-check both went out with dead order links.

Two durable guards:
1. get_base_campaign() now strips @TrackLink from any cloned body (with a
   warning), so a stale/re-edited source campaign can never reach recipients
   broken again. Human clicks are already attributed via Umami.
2. The owner test-send now builds the CTA via lp_link_with_coupon(dot=...)
   (leading '?') instead of build_lp_link() (bare path).

Also fixed live: stripped @TrackLink from the 10 stored source campaign
bodies; rewrote the 12 already-registered broken links. Backups in listmonk:
pw_source_tracklink_bak_20260623 + pw_links_tracklink_bak_20260623.
2026-06-23 15:02:05 -05:00

1584 lines
72 KiB
Python

#!/usr/bin/env python3
"""Daily trucking campaign builder.
Runs nightly (3 AM EST) to create the next day's 8 Listmonk campaigns:
- 4 TZ regions x MCS-150 overdue (2,000 carriers each, 4 AM / 5 AM / 6 AM / 7 AM EST)
- 4 TZ regions x Inactive USDOT (1,000 carriers each, same schedule)
Selection criteria:
- email verified or catch-all (usable)
- mcs150_parsed > 2 years ago (for MCS-150 campaign)
- oos_active IS TRUE (for inactive USDOT campaign)
- listmonk_sent_at IS NULL (not yet sent)
- ordered by mcs150_parsed ASC (most overdue first)
Usage:
python3 scripts/build_trucking_campaigns.py
python3 scripts/build_trucking_campaigns.py --dry-run
python3 scripts/build_trucking_campaigns.py --date 2026-06-02
"""
from __future__ import annotations
import argparse
import base64
import hashlib
import json
import logging
import os
import sys
import urllib.request
import urllib.parse
import math
import time
from datetime import date, datetime, timedelta, timezone
import psycopg2
# This module supports two invocation styles:
#   python -m scripts.build_trucking_campaigns
#   python3 scripts/build_trucking_campaigns.py
# Executing the file directly only puts scripts/ itself on sys.path, so the
# repository root (the parent of scripts/) is prepended to make the
# `scripts.*` imports below resolve either way.
ROOT = os.path.abspath(
    os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir)
)
if ROOT not in sys.path:
    sys.path.insert(0, ROOT)
from scripts._email_exclusions import BLOCKED_EMAIL_DOMAINS
from scripts._email_plaintext import html_to_text
# Module-wide logger; the catalog, coupon and catch-all guardrail messages in
# this file are all emitted through it.
LOG = logging.getLogger("build_trucking_campaigns")
# ── Listmonk ──────────────────────────────────────────────────────────────────
LISTMONK_URL = os.getenv("LISTMONK_URL", "http://listmonk:9000")
LISTMONK_USER = os.getenv("LISTMONK_USER", "api")
# The API password MUST come from the environment. It used to fall back to a
# real credential hard-coded right here, which exposes it to anyone who can
# read the repository or its history (that password should be rotated). With
# LISTMONK_PASS unset, requests now carry an empty password instead of a
# baked-in secret.
LISTMONK_PASS = os.getenv("LISTMONK_PASS", "")
# Pre-encoded HTTP Basic credentials; lm_api() sends them on every request.
_LM_AUTH = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode()
# ── Source campaign IDs ────────────────────────────────────────────────────────
CAMPAIGN_MCS150_ID = 186  # "MCS-150 Overdue — $1,000/Day Fine Risk"
CAMPAIGN_INACTIVE_ID = 188  # "Inactive USDOT — Reactivate Before You Get Pulled Over"
# Public site for landing-page links injected into Listmonk subscriber attribs.
# A trailing slash is stripped so f"{SITE_DOMAIN}/order/..." never yields "//".
SITE_DOMAIN = os.getenv("SITE_DOMAIN", "https://performancewest.net").rstrip("/")
# ── Deficiency-flag segments (Phase 5) ──────────────────────────────────────
# Each segment targets carriers by a `deficiency_flags` value (the TEXT[] column
# written by fmcsa_deficiency_flagger.py) and links to its order landing page.
#
# Per-segment keys:
#   label       human-readable segment name.
#   flag_sql    SQL predicate over the deficiency_flags array. The `%%` in the
#               LIKE patterns is presumably there because the predicate is
#               spliced into a query executed with psycopg2 parameters, where a
#               literal `%` must be written `%%` -- NOTE(review): confirm at the
#               call site.
#   lp_slug     default /order/<slug> landing page; lp_slug_for() applies the
#               per-state weight-tax and CA emissions overrides on top of it.
#   source_env  env var that holds the Listmonk template campaign id to clone.
#   limit       presumably the per-run recipient cap for the segment -- TODO
#               confirm against the code that reads it.
#
# `source_id` is the Listmonk template campaign to clone; set via env once the
# template exists (see CAMPAIGN_SOURCE_ENV). Segments without a configured
# source_id are reported by --list-segments but skipped by the scheduled run so
# the nightly job never breaks on an unconfigured template.
#
# `flag_sql` is a predicate over the deficiency_flags array.
DEFICIENCY_SEGMENTS = {
    "for_hire_boc3": {
        "label": "For-Hire BOC-3 + UCR",
        "flag_sql": "'for_hire_carrier' = ANY(deficiency_flags)",
        "lp_slug": "boc3-filing",
        "source_env": "CAMPAIGN_FOR_HIRE_ID",
        "limit": 1500,
    },
    "irp_ifta": {
        "label": "IRP / IFTA Registration",
        "flag_sql": "'interstate_needs_irp_ifta' = ANY(deficiency_flags)",
        "lp_slug": "state-trucking-bundle",
        "source_env": "CAMPAIGN_IRP_IFTA_ID",
        "limit": 1500,
    },
    "intrastate_authority": {
        "label": "Intrastate Operating Authority",
        # any intrastate_authority_<state> flag
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'intrastate_authority_%%')",
        "lp_slug": "intrastate-authority",
        "source_env": "CAMPAIGN_INTRASTATE_ID",
        "limit": 1000,
    },
    "state_weight_tax": {
        "label": "State Weight-Distance Tax",
        # any state_weight_tax_<state> flag
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'state_weight_tax_%%')",
        # per-state LP resolved per-row in build_lp_link()
        "lp_slug": "state-trucking-bundle",
        "source_env": "CAMPAIGN_WEIGHT_TAX_ID",
        "limit": 1000,
    },
    "state_emissions": {
        "label": "State Clean-Truck / Emissions",
        # any state_emissions_<state> flag; CA is routed to ca-mcp-carb by
        # lp_slug_for()/build_lp_link()
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'state_emissions_%%')",
        "lp_slug": "state-emissions",
        "source_env": "CAMPAIGN_EMISSIONS_ID",
        "limit": 1000,
    },
    "hazmat": {
        "label": "Hazmat PHMSA Registration",
        "flag_sql": "'hazmat_carrier' = ANY(deficiency_flags)",
        "lp_slug": "hazmat-phmsa",
        "source_env": "CAMPAIGN_HAZMAT_ID",
        "limit": 500,
    },
}
# Per-state weight-tax LP overrides (CA emissions goes to ca-mcp-carb).
# Consulted by lp_slug_for()/build_lp_link() only when campaign_type is
# "state_weight_tax"; states not listed keep the segment's default lp_slug.
_WEIGHT_TAX_LP = {
    "OR": "or-weight-mile-tax", "NY": "ny-hut-registration",
    "KY": "ky-kyu-registration", "NM": "nm-weight-distance",
    "CT": "ct-highway-use-fee",
}
# ── Authoritative service prices (single source of truth) ───────────────────
# Prices quoted in the coupon email have to equal what checkout charges, so
# they are read from the very catalog the API itself uses
# (api/src/service-catalog.ts). The production host has python3 but no node,
# so the TS source is parsed directly and the result cached per process; each
# parsed entry looks like {price_cents, discountable}. If the file can't be
# read (e.g. it is missing from an image) the price helpers fall back to
# percent-only copy instead of guessing a number.
# SERVICE_CATALOG_TS overrides the default location under the repo root.
_CATALOG_PATH = os.environ.get(
    "SERVICE_CATALOG_TS",
    os.path.join(ROOT, "api", "src", "service-catalog.ts"),
)
# Parsed catalog; stays None until _load_service_catalog() runs once.
_CATALOG_CACHE: dict | None = None
def _load_service_catalog() -> dict:
    """Parse the TS service catalog into ``{slug: {price_cents, discountable}}``.

    Reads ``_CATALOG_PATH``, locates the ``export const COMPLIANCE_SERVICES ...
    = { ... };`` object literal and extracts, for every ``"slug": { ... }``
    entry, its ``price_cents`` plus an optional ``discountable: false`` marker.
    ``discountable`` defaults to True, matching the TS object's own semantics
    (only the exceptions are marked). Entries without ``price_cents`` are
    skipped.

    Best effort by design: any read/parse failure, or a file in which no priced
    service can be found, logs a warning and yields an empty catalog so the
    coupon copy degrades to percent-only instead of crashing the build. The
    result (including an empty one) is cached for the life of the process.
    """
    global _CATALOG_CACHE
    if _CATALOG_CACHE is not None:
        return _CATALOG_CACHE
    import re as _re

    catalog: dict = {}
    try:
        # `with` closes the handle (the old bare open().read() leaked it).
        with open(_CATALOG_PATH, encoding="utf-8") as fh:
            ts = fh.read()
        m = _re.search(r"export const COMPLIANCE_SERVICES[^=]*=\s*\{(.*)\n\};", ts, _re.S)
        body = m.group(1) if m else ""
        for em in _re.finditer(r'"([a-z0-9\-]+)":\s*\{(.*?)\}', body, _re.S):
            slug, inner = em.group(1), em.group(2)
            pm = _re.search(r"price_cents:\s*(\d+)", inner)
            if not pm:
                continue
            catalog[slug] = {
                "price_cents": int(pm.group(1)),
                "discountable": _re.search(r"discountable:\s*false", inner) is None,
            }
    except Exception as exc:  # noqa: BLE001 - best effort, never crash the build
        LOG.warning("[coupon] could not load service catalog (%s); coupon copy "
                    "will be percent-only", exc)
    else:
        if not catalog:
            # The file was readable but the layout didn't match (renamed
            # export, changed formatting...): say so instead of silently
            # dropping every price.
            LOG.warning("[coupon] no priced services found in %s; coupon copy "
                        "will be percent-only", _CATALOG_PATH)
    _CATALOG_CACHE = catalog
    return catalog
# The service whose price the coupon copy quotes. This is the specific service
# each campaign's body is *about*, which is NOT always the slug the CTA links to
# (e.g. the MCS-150 email talks about the $79 MCS-150 update but the button opens
# the $399 full-compliance bundle). Pricing from the wrong slug would advertise a
# discount the landing page doesn't show, so the price slug is explicit here.
PRICE_SLUG_BY_CAMPAIGN = {
"mcs150": "mcs150-update",
"inactive": "usdot-reactivation",
}
def price_slug_for(campaign_type: str, phy_state: str | None = None) -> str:
"""The catalog slug whose price the coupon copy should quote for a segment.
Main campaigns quote their specific service (MCS-150, reactivation); the
deficiency segments quote the same slug their CTA links to (resolved, incl.
per-state overrides, by lp_slug_for)."""
return PRICE_SLUG_BY_CAMPAIGN.get(campaign_type) or lp_slug_for(campaign_type, phy_state)
def discounted_price_attribs(campaign_type: str, phy_state: str | None,
coupon_pct: str | None) -> dict:
"""Per-recipient price merge fields, computed on the fly to match checkout.
Mirrors the server exactly: percent discount on the SERVICE fee only
(discount_cents = round(fee * pct / 100)); non-discountable services (e.g.
boc3-filing, a $25 passthrough) get NO price discount. Returns:
coupon_price_full "$79" (service list price)
coupon_price_deal "$47" (after discount)
coupon_priceable "1"/"" (whether a real discounted number is available)
Blank when no coupon, no catalog entry, or the service isn't discountable, so
the templates fall back to percent-only copy and never print a false number.
"""
blank = {"coupon_price_full": "", "coupon_price_deal": "", "coupon_priceable": ""}
if not coupon_pct:
return blank
try:
pct = int(coupon_pct)
except (TypeError, ValueError):
return blank
slug = price_slug_for(campaign_type, phy_state)
entry = _load_service_catalog().get(slug)
if not entry or not entry.get("discountable") or entry["price_cents"] <= 0:
return blank
full = entry["price_cents"]
discount = round(full * pct / 100) # same formula as the API
deal = full - discount
def _fmt(cents: int) -> str:
return f"${cents // 100}" if cents % 100 == 0 else f"${cents / 100:.2f}"
return {
"coupon_price_full": _fmt(full),
"coupon_price_deal": _fmt(deal),
"coupon_priceable": "1",
}
def build_lp_link(campaign_type: str, phy_state: str | None) -> str:
    """Return the bare order landing-page URL for a (segment, state).

    The slug, including the per-state weight-tax and CA emissions overrides, is
    resolved by lp_slug_for(). Before, this function kept its own copy of that
    logic, which could silently drift from the slug that price_slug_for()
    quotes. A trailing slash on SITE_DOMAIN is tolerated.

    The result carries no query string; lp_link_with_coupon() builds the
    query-string variant the email templates use.
    """
    return f"{SITE_DOMAIN.rstrip('/')}/order/{lp_slug_for(campaign_type, phy_state)}"
def lp_slug_for(campaign_type: str, phy_state: str | None = None) -> str:
"""The order-page slug (== the discountable service slug) for a segment."""
seg = DEFICIENCY_SEGMENTS.get(campaign_type)
slug = seg["lp_slug"] if seg else "dot-full-compliance"
if campaign_type == "state_weight_tax" and phy_state in _WEIGHT_TAX_LP:
slug = _WEIGHT_TAX_LP[phy_state]
if campaign_type == "state_emissions" and phy_state == "CA":
slug = "ca-mcp-carb"
return slug
# ── Daily same-day coupon ───────────────────────────────────────────────────
# Every send day gets ONE random 5-letter coupon at COUPON_PCT off, valid only
# through 23:59:59 of the send date in the fixed UTC-5 zone _ET (see below).
# The code is written to the app's `discount_codes` table; the existing
# /api/v1/discount validator and checkout enforce expiry + the
# service-fee-only scope (pass-through government fees are never discounted).
# The code + prices are merged into the email so the recipient sees a real,
# expiring deal.
#
# DISCOUNT TOGGLE: the daily coupon is DISABLED by default (Jun 2026). The
# discount sends were not being delivered (DKIM-broken window), so we are
# re-testing whether normal-price offers convert now that deliverability is
# fixed. With the coupon off, no code is minted and an empty coupon_code is
# merged -- the campaign templates' `{{ if .Subscriber.Attribs.coupon_code }}`
# guard automatically falls through to the normal-price `{{ else }}` branch and
# the landing-page links carry no `?code=`. Set CAMPAIGN_ENABLE_COUPON=1 to
# bring the daily deal back (matched case-insensitively: 1/true/yes/on).
# Reversible, no template or DB changes needed.
COUPON_ENABLED = (
    os.getenv("CAMPAIGN_ENABLE_COUPON", "0").strip().lower() in ("1", "true", "yes", "on")
)
COUPON_PCT = int(os.getenv("CAMPAIGN_COUPON_PCT", "40"))
# A/B/C price test: when set to a comma list of percents (e.g. "20,30,40") each
# carrier is deterministically bucketed by a stable hash of their email into one
# arm, getting that arm's own daily code. Because each code stores its own
# percent in discount_codes, the discount the email advertises always matches the
# discount checkout actually applies, and redemptions are measurable per code
# (description marker campaign-daily:<date>:<pct>). A percent of 0 is a valid
# FULL-PRICE control arm (e.g. "20,30,0"): no code is minted and the carrier sees
# the normal price, but they're still hash-bucketed so the control is measurable.
# Empty/unset = single-arm test at COUPON_PCT (legacy behavior). The split is even
# and stable per carrier, so a given carrier always sees the same arm across
# re-sends (no arm-hopping).
# Entries that aren't plain decimal integers are ignored. isdecimal() is used
# rather than isdigit(): isdigit() also accepts characters such as "²" that
# int() rejects, which would crash the module import.
COUPON_AB_PCTS = tuple(
    int(p.strip())
    for p in os.getenv("CAMPAIGN_COUPON_AB_PCTS", "").split(",")
    if p.strip().isdecimal()
)
# Eligible slugs = every discountable service a trucking campaign can link to.
# Pass-through-only slugs (boc3-filing $25 passthrough, etc.) are intentionally
# eligible too: the discount math only touches the service-fee portion, so a
# code scoped to them simply yields $0 off the passthrough and full off the fee.
COUPON_SLUGS = (
    "mcs150-update,usdot-reactivation,dot-drug-alcohol,dot-full-compliance,"
    "ucr-registration,state-trucking-bundle,intrastate-authority,irp-registration,"
    "ifta-application,hazmat-phmsa,state-emissions,state-weight-tax,trucking-wrap-up,"
    "boc3-filing"
)
# Fixed UTC-5 (EST) anchor for the coupon window. It is NOT DST-aware: while
# EDT is in effect the window runs 01:00 EDT on the send date through 00:59:59
# EDT the next day, which still covers the send-day morning deliveries.
_ET = timezone(timedelta(hours=-5))
_COUPON_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ"  # no I/O to avoid confusion
# ── Per-MX-operator throttling (main pool) ──────────────────────────────────
# Sender reputation is tracked by the RECEIVING operator, not the recipient
# domain. The Jun 13-14 Gmail + Outlook block storm came from concentrating the
# warmup on Google/Microsoft-Workspace-hosted business domains. During warmup we
# EXCLUDE those big operators entirely (send to the long tail of small/self-hosted
# mail systems that don't bot-throttle), then cap per-operator once reputation is
# established. mx_provider is populated by mx_tag_carriers.py.
# Set MAIN_SKIP_BIG_MX=0 to stop excluding once truly warmed up. The value is
# matched case-insensitively: "0", "false", "no", "off" and "" all mean OFF
# (previously e.g. "False" or "no" silently left the exclusion ON).
MAIN_SKIP_BIG_MX = (
    os.getenv("MAIN_SKIP_BIG_MX", "1").strip().lower()
    not in ("0", "false", "no", "off", "")
)
# Operators to hold out during warmup (they aggressively throttle/blocklist).
# These are the "clean label" providers mx_tag_carriers.py recognizes by MX host.
BIG_MX_OPERATORS = ("google", "microsoft", "proofpoint", "mimecast",
                    "barracuda", "cisco", "broadcom")
# Consumer / aggressively-filtering mailbox operators that mx_tag_carriers.py
# labels with the "mx:" prefix (no clean label). They complaint-block and filter
# like the big operators, so hold them out of the warmup pool too. The literal-
# domain blocklist (BLOCKED_EMAIL_DOMAINS) already stops someone@yahoo.com /
# @icloud.com, but a CUSTOM domain whose MX points at Yahoo Small Business / AOL
# (mx:yahoodns.net), Apple iCloud+ Custom Domain (mx:icloud.com), or a legacy
# consumer ISP is invisible to that string layer -- only the MX tag catches it.
# (Live 2026-06-20: mx:yahoodns.net alone = 283k sendable carriers.)
CONSUMER_MX_OPERATORS = (
    "mx:yahoodns.net", "mx:icloud.com", "mx:comcast.net", "mx:charter.net",
    "mx:centurylink.net", "mx:windstream.net", "mx:tds.net",
    "mx:earthlink-vadesecure.net",
)
# Everything held out of the warmup pool entirely until MAIN_BIG_MX_EXCLUDE_UNTIL_DAY,
# then re-introduced gradually via mx_daily_caps().
#
# MAIN_EXCLUDE_OPERATORS (comma-separated mx_provider labels) OVERRIDES this set
# when present. Use it to send to everything EXCEPT a specific operator without
# waiting for the calendar ramp -- e.g. after the Jun 2026 no-DKIM incident we
# re-send to the whole (now-signed) audience but still hold Google's consumer
# inboxes back while their reputation recovers:
#   MAIN_EXCLUDE_OPERATORS="google"
# An empty string ("MAIN_EXCLUDE_OPERATORS=") means exclude NOBODY by operator.
# Labels are stripped and lower-cased so they match mx_throttled()'s lookup.
_excl_override = os.getenv("MAIN_EXCLUDE_OPERATORS")
if _excl_override is None:
    WARMUP_EXCLUDE_OPERATORS = BIG_MX_OPERATORS + CONSUMER_MX_OPERATORS
else:
    WARMUP_EXCLUDE_OPERATORS = tuple(
        o.strip().lower() for o in _excl_override.split(",") if o.strip()
    )
# Same env var (and default path) as WARMUP_START_FILE, read separately by
# main_warmup_day().
MAIN_WARMUP_START_FILE = os.getenv("MTA_WARMUP_START_FILE", "/etc/postfix/pw-warmup-start")
# How many days to EXCLUDE the big operators entirely. The Jun 13-14 block storm
# means reputation is NOT yet established despite a high calendar day count, so we
# hold Google/Microsoft/etc. out until day 30 to let reputation recover on the
# long-tail operators first, then re-introduce them gradually via mx_daily_caps.
MAIN_BIG_MX_EXCLUDE_UNTIL_DAY = int(os.getenv("MAIN_BIG_MX_EXCLUDE_UNTIL_DAY", "30"))
def main_warmup_day() -> int:
    """Whole days since the MTA warmup started, for the main-pool MX throttle.

    Reads the unix-timestamp stamp in MAIN_WARMUP_START_FILE; a start time in
    the future clamps to day 0. A missing, unreadable or non-integer stamp is
    treated as fully warmed up and returns 99 (unlike warmup_day(), which
    returns None in that case).
    """
    try:
        # `with` closes the stamp file (the old bare open().read() leaked it).
        with open(MAIN_WARMUP_START_FILE, encoding="utf-8") as fh:
            start = int(fh.read().strip())
    except (OSError, ValueError):
        return 99  # no stamp = treat as warmed up
    return max(0, int((time.time() - start) // 86400))
def mx_daily_caps(day: int) -> dict:
    """Per-operator daily NEW-recipient caps for warmup day `day`.

    Every operator in WARMUP_EXCLUDE_OPERATORS (big-label and consumer-MX
    alike) shares one ramping cap; '__default__' is the per-operator cap for
    the long tail of small/self-hosted systems that carry the warmup volume.

    With E = MAIN_BIG_MX_EXCLUDE_UNTIL_DAY:
        day <= E        -> excluded ops 0,   default 120  (reputation recovery)
        day <= E + 7    -> excluded ops 40,  default 150
        day <= E + 14   -> excluded ops 120, default 200
        later           -> excluded ops 300, default 250
    """
    ramp_end = MAIN_BIG_MX_EXCLUDE_UNTIL_DAY
    # (last day of the tier, held-out-operator cap, long-tail default cap)
    tiers = (
        (ramp_end, 0, 120),
        (ramp_end + 7, 40, 150),
        (ramp_end + 14, 120, 200),
    )
    held_out_cap, long_tail_cap = 300, 250
    for last_day, tier_held_out, tier_long_tail in tiers:
        if day <= last_day:
            held_out_cap, long_tail_cap = tier_held_out, tier_long_tail
            break
    limits = dict.fromkeys(WARMUP_EXCLUDE_OPERATORS, held_out_cap)
    limits["__default__"] = long_tail_cap
    return limits
def mx_throttled(rows: list[tuple], total_n: int, caps: dict, mx_idx: int) -> list[tuple]:
    """Select up to `total_n` rows while capping each receiving MX operator.

    The operator label is rows[i][mx_idx], stripped and lower-cased; a
    missing/None value or a row too short to have that column counts as the
    "" operator. Each operator is limited to caps[label], falling back to
    caps["__default__"] (50 if absent). Rows over their operator's cap are
    skipped, and input order is preserved in the result.
    """
    fallback_cap = caps.get("__default__", 50)
    taken: dict = {}
    picked: list[tuple] = []
    for row in rows:
        if len(picked) >= total_n:
            break
        operator = ""
        if mx_idx < len(row):
            operator = (row[mx_idx] or "").strip().lower()
        count = taken.get(operator, 0)
        if count < caps.get(operator, fallback_cap):
            taken[operator] = count + 1
            picked.append(row)
    return picked
def _random_coupon_code() -> str:
    """Return a fresh 5-character code drawn from _COUPON_ALPHABET.

    Uses the `secrets` module (a CSPRNG) so daily codes aren't guessable from
    earlier ones.
    """
    import secrets

    picks = [secrets.choice(_COUPON_ALPHABET) for _ in range(5)]
    return "".join(picks)
def get_or_create_daily_coupon(conn, send_date: date, pct: int | None = None) -> str:
    """Return the 5-letter coupon code for `send_date` at `pct`% off, minting it if needed.

    Idempotent: a marker in `description` lets a re-run on the same day reuse
    the existing code instead of minting a duplicate. Single-arm runs use the
    legacy marker `campaign-daily:<date>`; A/B arms use
    `campaign-daily:<date>:<pct>` so each percent gets its own stable,
    separately-countable code. A new code is valid from 00:00:00 through
    23:59:59 of the send date in _ET.

    Args:
        conn: open DB connection to the app database (holds `discount_codes`).
        send_date: the day the coupon is valid for.
        pct: percent off; defaults to COUPON_PCT.

    Raises:
        RuntimeError: no code could be inserted after 25 attempts. The last DB
            error (if any) is chained as the cause so the real problem shows
            up in the traceback instead of being swallowed.
    """
    pct = COUPON_PCT if pct is None else pct
    day = send_date.isoformat()
    # Keep the legacy marker for the single-arm (no A/B) case so historical
    # idempotency/lookups still work; A/B arms get a percent-suffixed marker.
    marker = f"campaign-daily:{day}" if not COUPON_AB_PCTS else f"campaign-daily:{day}:{pct}"
    starts = datetime.combine(send_date, datetime.min.time(), tzinfo=_ET)
    expires = starts + timedelta(hours=23, minutes=59, seconds=59)  # 23:59:59 ET
    last_exc = None
    # The context manager closes the cursor on every exit path.
    with conn.cursor() as cur:
        cur.execute("SELECT code FROM discount_codes WHERE description = %s LIMIT 1", (marker,))
        row = cur.fetchone()
        if row:
            return row[0]
        # ON CONFLICT DO NOTHING turns a (rare) code collision into "no row
        # returned", so the loop simply retries with a fresh code.
        for _ in range(25):
            code = _random_coupon_code()
            try:
                cur.execute(
                    """
                    INSERT INTO discount_codes
                        (code, description, discount_type, discount_value, applies_to,
                         max_uses_per_email, active, starts_at, expires_at)
                    VALUES (%s, %s, 'percent', %s, %s, 1, TRUE, %s, %s)
                    ON CONFLICT (code) DO NOTHING
                    RETURNING code
                    """,
                    (code, marker, pct, COUPON_SLUGS, starts, expires),
                )
                inserted = cur.fetchone()
                if inserted:
                    conn.commit()
                    LOG.info("[coupon] daily code %s (%d%% off, expires %s ET)",
                             code, pct, expires.isoformat())
                    return inserted[0]
            except Exception as exc:  # noqa: BLE001 - retry, but remember why
                conn.rollback()
                last_exc = exc
    raise RuntimeError("could not mint a unique daily coupon code") from last_exc
def get_or_create_daily_coupons(conn, send_date: date) -> dict[int, str]:
    """Mint (or reuse) every coupon arm for `send_date`; return {percent: code}.

    Without an A/B test this is the single arm {COUPON_PCT: code}; with
    CAMPAIGN_COUPON_AB_PCTS="20,30,40" there is one code per percent so
    recipients can be split across arms.

    A percent <= 0 is a FULL-PRICE control arm: nothing is minted and its value
    is "", so carriers hashed into it see the normal price and no coupon while
    still being deterministically (and so measurably) assigned by email hash,
    e.g. CAMPAIGN_COUPON_AB_PCTS="20,30,0".
    """
    arms = COUPON_AB_PCTS or (COUPON_PCT,)
    return {
        pct: (get_or_create_daily_coupon(conn, send_date, pct) if pct > 0 else "")
        for pct in arms
    }
def pick_coupon_for_email(email: str, daily_coupons: dict[int, str] | None) -> tuple[str, str]:
"""Deterministically assign a carrier to one coupon arm by a stable hash of
their email. Returns (code, pct_str); ("", "") when coupons are off OR when
the carrier is bucketed into a full-price control arm (pct 0, code "").
The hash makes the split even and *stable*: the same carrier always lands in
the same arm across re-sends, so an A/B comparison isn't polluted by a carrier
seeing 20% one day and 40% the next. A full-price (0%) arm is returned as
("", "") so the email renders the normal-price branch, identical to a no-coupon
send — but the carrier is still deterministically in that arm, so re-hashing
a converter's email recovers which arm they were in for attribution.
"""
if not daily_coupons:
return "", ""
pcts = sorted(daily_coupons.keys())
if len(pcts) == 1:
pct = pcts[0]
else:
h = hashlib.sha256((email or "").strip().lower().encode()).hexdigest()
pct = pcts[int(h, 16) % len(pcts)]
code = daily_coupons[pct]
if not code: # full-price control arm: no code, no deal
return "", ""
return code, str(pct)
def coupon_attribs(coupon_code: str | None, coupon_pct: str | None = None) -> dict:
"""Merge fields for the same-day deal, blank when no coupon is active.
`coupon_pct` is passed per-recipient during an A/B test so the advertised
percent matches the arm's actual code; it falls back to the global COUPON_PCT
for single-arm sends.
"""
if not coupon_code:
return {"coupon_code": "", "coupon_pct": "", "coupon_expires": ""}
return {
"coupon_code": coupon_code,
"coupon_pct": coupon_pct or str(COUPON_PCT),
# Human-readable cutoff for the email body.
"coupon_expires": "11:59 PM ET tonight",
}
def lp_link_with_coupon(campaign_type: str, phy_state: str | None,
                        coupon_code: str | None, dot: str | None = None) -> str:
    """Order landing-page URL for the email's `lp_link` attrib.

    The result ALWAYS contains a `?`-started query string: `dot=<DOT>` when a
    DOT is known, `code=<coupon>` when a coupon is active, and a bare trailing
    `?` when neither applies. Every template can therefore append its own
    params with a leading `&`:

        {{ lp_link }}&utm_source=...  ->  /order/<slug>?dot=123&utm_source=...
                                          /order/<slug>?&utm_source=...   (no params)

    Previously the no-DOT/no-coupon case returned the bare build_lp_link()
    path, so `{{ lp_link }}&utm_source=...` became `/order/<slug>&utm_source=...`,
    the invalid, 404ing URL this helper exists to prevent. Values are
    URL-encoded so an unexpected character can't corrupt the query.

    Args:
        campaign_type: campaign/segment key passed through to build_lp_link().
        phy_state: carrier's physical state, for per-state landing pages.
        coupon_code: this recipient's coupon code, or falsy for none.
        dot: carrier's USDOT number, or falsy to omit it.
    """
    params = []
    if dot:
        params.append(("dot", str(dot)))
    if coupon_code:
        params.append(("code", coupon_code))
    return f"{build_lp_link(campaign_type, phy_state)}?{urllib.parse.urlencode(params)}"
# ── TZ config: tz_key -> (states, send_hour_utc) ─────────────────────────────
# 4AM EST = 09:00 UTC, each TZ +1hr so they get it at ~4AM local
# Keys are time-zone regions: note that the "CT" key means Central Time and is
# unrelated to the Connecticut state code "CT" in the ET state list.
# send_hour_utc is a fixed UTC hour, so the "4 AM local" targets hold under
# standard time; while DST is in effect, regions that observe it receive mail
# one hour later local time.
TIMEZONE_CONFIG = {
    "ET": {
        "states": ("CT","DC","DE","FL","GA","IN","KY","MA","MD","ME","MI",
                   "NH","NJ","NY","NC","OH","PA","RI","SC","TN","VA","VT","WV"),
        "send_hour_utc": 9,  # 4 AM EST
    },
    "CT": {
        "states": ("AL","AR","IA","IL","KS","LA","MN","MO","MS","ND",
                   "NE","OK","SD","TX","WI"),
        "send_hour_utc": 10,  # 5 AM EST = 4 AM CST
    },
    "MT": {
        "states": ("AZ","CO","ID","MT","NM","UT","WY"),
        "send_hour_utc": 11,  # 6 AM EST = 4 AM MST
    },
    "PT": {
        "states": ("AK","CA","HI","NV","OR","WA"),
        "send_hour_utc": 12,  # 7 AM EST = 4 AM PST
    },
}
# Owner inbox: every campaign is test-sent here before it is scheduled.
TEST_EMAIL = os.environ.get("CAMPAIGN_TEST_EMAIL", "carrierone@gmx.com")
# Replies from recipients are routed to the root domain.
REPLY_TO_EMAIL = os.environ.get("CAMPAIGN_REPLY_TO", "info@performancewest.net")
# Listmonk adds campaign headers with
#   for hdr, val := range set { h.Add(hdr, val) }
# (internal/manager/manager.go), so each map KEY is used verbatim as the header
# name. The right shape is therefore {"Reply-To": value}. A
# {"name": ..., "value": ...} map would emit junk "name:"/"value:" headers and
# no real Reply-To, quietly sending replies to the From address
# (noreply@send.performancewest.net). This matches the healthcare builder.
REPLY_TO_HEADERS = [{"Reply-To": REPLY_TO_EMAIL}]
# Bulk From: mail leaves from the dedicated bulk subdomain so its sending
# reputation stays isolated from the root domain, which is kept clean for
# transactional / verification mail. Replies still reach the root domain via
# the Reply-To above, so the customer-facing reply flow is unchanged.
# See docs/deliverability.md.
FROM_EMAIL = os.environ.get(
    "CAMPAIGN_FROM", "Performance West <noreply@send.performancewest.net>"
)
# Verification results we are willing to SEND to. Only email_verify_result is
# consulted, never the email_verified boolean: the verifier optimistically sets
# email_verified=TRUE for 'mx_unreachable' (the domain exists but its mail
# server ignored the probe), and those addresses HARD-BOUNCE on a real send --
# the cause of the ≈47% bounce rate that got half the list blocklisted. So
# 'mx_unreachable' and every error/reject result stay excluded.
#
# Recovery mode (default while reputation is damaged) sends ONLY to
# 'smtp_valid' (an MX explicitly accepted the address at RCPT time) and
# 'send_confirmed' (proven deliverable by a real burner-domain verification
# send; see docs/campaign-deliverability-plan.md). That keeps bounces near zero
# while sender reputation rebuilds. 'hard_bounced' is NEVER sendable.
#
# Catch-all domains accept any RCPT during SMTP and may bounce silently later.
# They are the big-but-risky growth pool, so catch_all_enabled() adds them
# automatically only once the IPs are warm AND the recent live bounce rate is
# provably low, and drops them again if bounces spike.
# CAMPAIGN_INCLUDE_CATCH_ALL=1/0 hard-overrides that automatic decision.
BASE_SENDABLE_RESULTS = ["smtp_valid", "send_confirmed"]
CATCH_ALL_RESULTS = ["catch_all_domain", "catch_all_detected"]
# ── Catch-all auto-rollout tunables (each env-overridable) ──────────────────
# Earliest warmup day on which catch-all MAY auto-enable, so reputation is
# rebuilt on the clean smtp_valid pool first. This is independent of the big-MX
# axis: catch-all is dominated by long-tail business domains, and any catch-all
# address that also lands on Google/Microsoft is still held out by
# big_mx_exclude until day 30.
CATCH_ALL_MIN_WARMUP_DAY = int(os.environ.get("CAMPAIGN_CATCH_ALL_MIN_DAY", "21"))
# Bounce-rate ceiling (percent) over a recent window. At/above it catch-all
# stays OFF and an already-on rollout auto-reverts. The window is SHORT on
# purpose: an old disaster (e.g. the Jun-16 ~45% 7-day rate) must NOT block the
# rollout forever, while a fresh spike must trip it quickly.
CATCH_ALL_MAX_BOUNCE_PCT = float(os.environ.get("CAMPAIGN_CATCH_ALL_MAX_BOUNCE_PCT", "8"))
CATCH_ALL_BOUNCE_WINDOW_DAYS = int(os.environ.get("CAMPAIGN_CATCH_ALL_BOUNCE_WINDOW_DAYS", "2"))
# Minimum sent volume in the window before the rate is trusted; otherwise a
# tiny sample such as 9 sent / 1 bounce = 11% would wrongly decide the gate.
CATCH_ALL_BOUNCE_MIN_SENT = int(os.environ.get("CAMPAIGN_CATCH_ALL_BOUNCE_MIN_SENT", "300"))
# App database (`performancewest`) connection URL.
DB_URL = os.getenv("DATABASE_URL", "")
# Unix-timestamp stamp of the MTA warmup start; the same env var as
# MAIN_WARMUP_START_FILE, read here by warmup_day().
WARMUP_START_FILE = os.getenv("MTA_WARMUP_START_FILE", "/etc/postfix/pw-warmup-start")


def _listmonk_db_url() -> str:
    """Derive the listmonk DB URL from DATABASE_URL (same Postgres, different db).

    Bounce/sent counts live in the listmonk DB, while the builder's DB_URL
    points at the `performancewest` app DB on the SAME Postgres server.
    LISTMONK_DATABASE_URL, when set, wins outright.

    For a URL with a host part only the database path is swapped for
    `/listmonk`; credentials, host/port and any query string (e.g.
    `?sslmode=require`) are kept. Before, a plain `rsplit('/')` dropped the
    query string and, for a URL with no database path
    (`postgresql://host:5432`), replaced the host instead. Other shapes
    (e.g. `postgresql:///db` socket URLs) keep the old rsplit behavior, and
    strings without a `/` (key=value DSNs) are returned unchanged.
    """
    override = os.getenv("LISTMONK_DATABASE_URL")
    if override:
        return override
    base = DB_URL or os.getenv("DATABASE_URL", "")
    parts = urllib.parse.urlsplit(base)
    if parts.scheme and parts.netloc:
        return urllib.parse.urlunsplit(parts._replace(path="/listmonk"))
    if "/" in base:
        return base.rsplit("/", 1)[0] + "/listmonk"
    return base
def recent_bounce_rate(window_days: int) -> tuple[float | None, int, int]:
    """Live campaign bounce rate over the last `window_days` days.

    Returns ``(rate_pct_or_None, sent, bounced)``; rate is None when nothing
    was sent (no signal). Only campaigns that actually ran in the window
    ('finished'/'running') are counted, and bounces are joined on campaign_id
    (≈99% populated for the real-time postfix source), so a long-past disaster
    cannot poison a short recent window.

    Fail-closed: if the listmonk DB can't be reached OR the query itself fails
    (e.g. missing table or permissions), a warning is logged and
    ``(None, 0, 0)`` is returned, which catch_all_enabled() treats as
    insufficient signal. Previously a query error propagated out of the
    guardrail instead.
    """
    try:
        conn = psycopg2.connect(_listmonk_db_url())
    except Exception as exc:  # pragma: no cover - infra dependent
        LOG.warning("catch-all guardrail: cannot reach listmonk DB (%s); "
                    "treating bounce rate as UNKNOWN (fail-closed)", exc)
        return None, 0, 0
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT COALESCE(SUM(c.sent), 0),
                       COALESCE(SUM(b.n), 0)
                  FROM campaigns c
                  LEFT JOIN (
                        SELECT campaign_id, count(*) AS n
                          FROM bounces
                         WHERE campaign_id IS NOT NULL
                         GROUP BY campaign_id
                  ) b ON b.campaign_id = c.id
                 WHERE COALESCE(c.started_at, c.created_at)
                       > now() - make_interval(days => %s)
                   AND c.status IN ('finished', 'running')
                """,
                (window_days,),
            )
            sent, bounced = cur.fetchone()
    except psycopg2.Error as exc:
        LOG.warning("catch-all guardrail: bounce-rate query failed (%s); "
                    "treating bounce rate as UNKNOWN (fail-closed)", exc)
        return None, 0, 0
    finally:
        conn.close()
    # SUM() may come back as Decimal; normalize to plain ints.
    sent, bounced = int(sent), int(bounced)
    rate = (100.0 * bounced / sent) if sent else None
    return rate, sent, bounced
def catch_all_enabled() -> bool:
    """Decide whether catch-all domains are sendable on THIS run.

    Auto-rollout (no env needed):
      1. IPs warm enough     -> warmup_day() >= CATCH_ALL_MIN_WARMUP_DAY
      2. recent bounces low  -> over CATCH_ALL_BOUNCE_WINDOW_DAYS, with at
                                least CATCH_ALL_BOUNCE_MIN_SENT sent for a
                                trustworthy sample, the rate is BELOW
                                CATCH_ALL_MAX_BOUNCE_PCT.
    If bounces later spike above the ceiling this returns False again on the
    next run, so the rollout auto-reverts to the clean smtp_valid pool.

    CAMPAIGN_INCLUDE_CATCH_ALL hard-overrides the guardrail and is matched
    case-insensitively: '0' / 'false' / 'no' / 'off' / '' force OFF; any other
    value (e.g. '1' / 'true') forces ON, a manual decision that skips the
    guardrail. Before, 'no' and 'off' fell into the ON branch and silently
    bypassed the bounce guardrail.
    """
    override = os.getenv("CAMPAIGN_INCLUDE_CATCH_ALL")
    if override is not None:
        forced = override.strip().lower() not in ("0", "false", "no", "off", "")
        LOG.info("catch-all: forced %s via CAMPAIGN_INCLUDE_CATCH_ALL=%r",
                 "ON" if forced else "OFF", override)
        return forced
    day = warmup_day()
    if day is None or day < CATCH_ALL_MIN_WARMUP_DAY:
        LOG.info("catch-all: OFF (warmup day %s < min %s)",
                 day, CATCH_ALL_MIN_WARMUP_DAY)
        return False
    rate, sent, bounced = recent_bounce_rate(CATCH_ALL_BOUNCE_WINDOW_DAYS)
    if rate is None or sent < CATCH_ALL_BOUNCE_MIN_SENT:
        # Not enough recent signal to trust -> fail closed (stay on clean pool).
        LOG.info("catch-all: OFF (insufficient recent signal: %s sent < min %s "
                 "over %sd; need a low proven bounce rate first)",
                 sent, CATCH_ALL_BOUNCE_MIN_SENT, CATCH_ALL_BOUNCE_WINDOW_DAYS)
        return False
    if rate >= CATCH_ALL_MAX_BOUNCE_PCT:
        LOG.warning("catch-all: OFF (recent bounce rate %.2f%% >= ceiling %.2f%% "
                    "over %sd; %s sent / %s bounced) -- auto-reverting to clean pool",
                    rate, CATCH_ALL_MAX_BOUNCE_PCT, CATCH_ALL_BOUNCE_WINDOW_DAYS,
                    sent, bounced)
        return False
    LOG.info("catch-all: ON (warmup day %s >= %s; recent bounce %.2f%% < %.2f%% "
             "over %sd; %s sent / %s bounced)",
             day, CATCH_ALL_MIN_WARMUP_DAY, rate, CATCH_ALL_MAX_BOUNCE_PCT,
             CATCH_ALL_BOUNCE_WINDOW_DAYS, sent, bounced)
    return True
def usable_filter() -> str:
"""SQL predicate for email_verify_result values that are safe to send to.
Always includes the clean pool (smtp_valid + send_confirmed); adds catch-all
results only when catch_all_enabled() says so (warm IPs + low live bounces).
The decision is memoized so it is computed ONCE per build run (one DB probe,
one log line, and a consistent filter across every segment/timezone).
"""
global _USABLE_FILTER_CACHE
if _USABLE_FILTER_CACHE is None:
results = list(BASE_SENDABLE_RESULTS)
if catch_all_enabled():
results += CATCH_ALL_RESULTS
_USABLE_FILTER_CACHE = (
"email_verify_result IN (" + ", ".join(f"'{r}'" for r in results) + ")"
)
return _USABLE_FILTER_CACHE
_USABLE_FILTER_CACHE: str | None = None
def warmup_day() -> int | None:
"""Return days since MTA warmup start, or None if not configured/readable."""
try:
with open(WARMUP_START_FILE, "r", encoding="utf-8") as fh:
start = int(fh.read().strip())
return max(0, int((time.time() - start) // 86400))
except Exception:
return None
def warmup_daily_queue_cap(day: int | None = None) -> int | None:
"""Daily campaign-builder queue cap aligned with Listmonk rampcap targets.
This prevents the builder from scheduling 10k-20k recipients/day while
Listmonk is throttled to only 500-3,000/day, which would create a massive
backlog that later drains unpredictably. The numbers match the runbook's
intended daily warmup totals, not the theoretical 24-hour sliding-window max.
"""
if day is None:
day = warmup_day()
if day is None:
return None
if day <= 1:
return 500
if day <= 3:
return 1500
if day <= 6:
return 2500
return 3000
def lm_api(path: str, data: dict | None = None, method: str | None = None):
    """Call the Listmonk REST API and return the decoded JSON response.

    path: API path appended to ``{LISTMONK_URL}/api`` (e.g. "/campaigns/1").
    data: optional JSON-serializable payload sent as the request body.
    method: optional HTTP verb override.

    Raises RuntimeError (chained to the original HTTPError) on an HTTP error
    status. The message carries the status code and up to 300 characters of
    the error body. Other transport errors propagate unchanged.
    """
    headers = {"Content-Type": "application/json", "Authorization": f"Basic {_LM_AUTH}"}
    req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers)
    if data is not None:
        req.data = json.dumps(data).encode()
    if method:
        req.method = method
    try:
        # Close the response deterministically instead of leaving the socket
        # to the garbage collector (this runs thousands of times per build).
        with urllib.request.urlopen(req, timeout=30) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # Error bodies are not guaranteed to be UTF-8. A decode error here
        # would hide the real HTTP failure.
        body = e.read().decode("utf-8", errors="replace")[:300]
        raise RuntimeError(f"Listmonk {path} HTTP {e.code}: {body}") from e
    return json.loads(raw)
# Per-process memo of listmonk_sendable() verdicts, keyed by the lowercased,
# stripped email address -> (sendable, reason). Every non-blank lookup result
# (including lookup errors) is stored, so repeat addresses skip the API call.
_LM_SUPPRESSION_CACHE: dict[str, tuple[bool, str]] = {}
def _lm_email_query(email: str) -> str:
"""Return a Listmonk subscriber query for an exact email address."""
# Listmonk's query language uses single-quoted strings. Email addresses should
# not contain quotes, but escape defensively so a bad source row cannot break
# the query or broaden it.
safe = email.lower().strip().replace("'", "''")
return f"email = '{safe}'"
def listmonk_sendable(email: str) -> tuple[bool, str]:
    """Report whether Listmonk would actually deliver to ``email``, and why.

    Passing the SQL eligibility checks is not enough. The address may already
    exist in Listmonk with a subscriber status other than "enabled", or be
    unsubscribed/unconfirmed on one of its lists. Listmonk drops such
    subscribers at send time, which would leave a campaign below its warmup
    allocation. The builder therefore checks here and fetches replacements.

    Returns (sendable, reason). reason is one of "blank_email", "new",
    "enabled", "listmonk_status:<status>", "list_subscription_status:<status>"
    or "lookup_error:<exception>". Verdicts for non-blank addresses are
    memoized in _LM_SUPPRESSION_CACHE.
    """
    key = email.lower().strip()
    if not key:
        return False, "blank_email"
    if key in _LM_SUPPRESSION_CACHE:
        return _LM_SUPPRESSION_CACHE[key]
    try:
        params = urllib.parse.urlencode({
            "query": _lm_email_query(key),
            "per_page": 1,
        })
        found = lm_api("/subscribers?" + params).get("data", {}).get("results", [])
        if found:
            subscriber = found[0]
            status = (subscriber.get("status") or "").lower()
            if status == "enabled":
                # First list membership whose status blocks delivery, or "".
                membership_states = (
                    (membership.get("subscription_status") or "").lower()
                    for membership in subscriber.get("lists") or []
                )
                blocking = next(
                    (s for s in membership_states if s in {"unsubscribed", "unconfirmed"}),
                    "",
                )
                if blocking:
                    verdict = (False, f"list_subscription_status:{blocking}")
                else:
                    verdict = (True, "enabled")
            else:
                verdict = (False, f"listmonk_status:{status or 'unknown'}")
        else:
            verdict = (True, "new")
    except Exception as exc:  # noqa: BLE001
        # A failed lookup counts as not sendable. Underfilling a campaign is
        # safer than scheduling a suppressed address and marking it sent in
        # the source DB; the reason string surfaces the error in the logs.
        verdict = (False, f"lookup_error:{exc}")
    _LM_SUPPRESSION_CACHE[key] = verdict
    return verdict
def get_base_campaign(campaign_id: int) -> dict:
    """Fetch a stored Listmonk source campaign whose content is cloned.

    Returns the API's ``data`` object. Any ``@TrackLink`` marker is removed
    from both the HTML ``body`` and the plaintext ``altbody``, and each removal
    is logged as a warning.
    """
    data = lm_api(f"/campaigns/{campaign_id}")["data"]
    # Defensively strip Listmonk's @TrackLink marker from cloned content. Our CTAs
    # are PER-SUBSCRIBER (`{{ lp_link }}`, `?dot=...`), and @TrackLink registers a
    # SINGLE static URL per tracked link. That URL both 404s (it is captured
    # with the `{{ lp_link }}` token unrendered -> `/order/slug&utm...` with no
    # `?`) and collapses every recipient onto one carrier's redirect.
    # Real human clicks are already attributed via Umami's campaign-click event,
    # so dropping the marker loses nothing. This guard means a stale source
    # campaign re-edited with @TrackLink can never reach recipients broken again.
    # altbody is covered too: _altbody_for() reuses a stored altbody verbatim,
    # and Listmonk may apply link tracking there as well (NOTE(review): confirm
    # for the deployed Listmonk version).
    for field in ("body", "altbody"):
        text = data.get(field)
        if text and "@TrackLink" in text:
            data[field] = text.replace("@TrackLink", "")
            LOG.warning("[%s] stripped @TrackLink from cloned source %s id=%s",
                        "get_base_campaign", field, campaign_id)
    return data
def create_list(name: str) -> int:
    """Create a private, single opt-in Listmonk list for one campaign slot.

    The list is tagged "trucking" and "auto". Returns the new list's ID.
    """
    new_list = {
        "name": name,
        "type": "private",
        "optin": "single",
        "tags": ["trucking", "auto"],
    }
    response = lm_api("/lists", new_list, "POST")
    return response["data"]["id"]
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
    """Create/attach a single subscriber to a list. Returns True on success.

    Listmonk's POST /api/subscribers takes one subscriber with `lists` (array of
    list IDs) and preconfirm_subscriptions=true so they're immediately sendable.
    If the address already exists (conflict / HTTP 409), the existing subscriber
    is looked up instead. Its attribs are refreshed and it is attached to
    ``list_id`` as "confirmed", unless its Listmonk status is not "enabled",
    in which case False is returned. Every failure is logged and reported as
    False; nothing is raised.
    """
    try:
        lm_api("/subscribers", {
            "email": email,
            "name": name or email,
            "status": "enabled",
            "lists": [list_id],
            "attribs": attribs,
            "preconfirm_subscriptions": True,
        }, "POST")
        return True
    except Exception as exc:  # noqa: BLE001
        msg = str(exc)
    lowered = msg.lower()
    if not ("already exists" in lowered or "conflict" in lowered or "HTTP 409" in msg):
        LOG.warning("Listmonk add failed for %s: %s", email, msg)
        return False
    # Already exists -- attach to this list instead of failing.
    try:
        # Use the shared, quote-escaping query builder so an odd address cannot
        # break or broaden the lookup.
        subs = lm_api(
            "/subscribers?"
            + urllib.parse.urlencode({
                "query": _lm_email_query(email),
                "per_page": 1,
            })
        )
        results = subs.get("data", {}).get("results", [])
        if not results:
            LOG.warning("Listmonk reported %s as existing but the lookup found no subscriber", email)
            return False
        status = (results[0].get("status") or "").lower()
        if status != "enabled":
            LOG.info("Listmonk suppressed existing subscriber %s status=%s", email, status or "unknown")
            return False
        sub_id = results[0]["id"]
        # Refresh attribs so per-campaign merge fields (coupon code,
        # ifta_due_date, lp_link, etc.) are correct for THIS send --
        # otherwise a previously-imported carrier keeps stale attribs
        # and the new campaign renders blank fields.
        # NOTE(review): this PUT omits `lists`; confirm the deployed Listmonk
        # version does not treat that as "remove all existing subscriptions".
        try:
            merged = dict(results[0].get("attribs") or {})
            merged.update(attribs or {})
            lm_api(f"/subscribers/{sub_id}", {
                "email": email,
                "name": name or results[0].get("name") or email,
                "status": "enabled",
                "attribs": merged,
            }, "PUT")
        except Exception as exc:  # noqa: BLE001
            # Non-fatal: still attach to the list below, but make it visible --
            # this subscriber will render with stale attribs.
            LOG.warning("Listmonk attrib refresh failed for %s: %s", email, exc)
        lm_api("/subscribers/lists", {
            "ids": [sub_id],
            "action": "add",
            "target_list_ids": [list_id],
            "status": "confirmed",
        }, "PUT")
        return True
    except Exception as exc:  # noqa: BLE001
        LOG.warning("Listmonk attach of existing subscriber %s failed: %s", email, exc)
        return False
def import_subscribers(list_id: int, subscribers: list[dict]) -> int:
    """Attach each subscriber dict to ``list_id``, one API call at a time.

    Each dict needs an "email" key; "name" and "attribs" are optional and
    default to "" and {}. Returns how many add_subscriber() calls succeeded.
    """
    return sum(
        1
        for entry in subscribers
        if add_subscriber(list_id, entry["email"], entry.get("name", ""), entry.get("attribs", {}))
    )
def _altbody_for(base: dict, body: str | None = None) -> str:
"""Plaintext (text/plain) part for a campaign.
Listmonk only emits multipart/alternative when altbody is set; HTML-only
mail is a spam-score signal. The source/base campaigns have no altbody, so
derive one from the HTML body. `body` overrides base["body"] for test sends
where merge fields were already substituted.
"""
existing = (base.get("altbody") or "").strip()
if existing:
return existing
html = body if body is not None else base.get("body", "")
return html_to_text(html)
def create_and_schedule_campaign(
    base: dict,
    list_id: int,
    name: str,
    send_at_utc: datetime,
    schedule: bool = True,
) -> int:
    """Clone ``base`` into a new campaign for ``list_id`` and return its ID.

    The following are copied from ``base``:
      - subject, content type, body and template;
      - tags, messenger and headers (falling back to [], "email" and
        REPLY_TO_HEADERS).
    The altbody comes from _altbody_for(). When ``schedule`` is True, ``send_at``
    is set and the campaign is moved to "scheduled". The timestamp is written
    with a literal +00:00 offset, so pass a UTC time. Otherwise the campaign is
    left as a draft (preview mode).
    """
    fields = dict(
        name=name,
        subject=base["subject"],
        lists=[list_id],
        from_email=FROM_EMAIL,
        type="regular",
        content_type=base["content_type"],
        body=base["body"],
        altbody=_altbody_for(base),
        template_id=base["template_id"],
        tags=base.get("tags") or [],
        messenger=base.get("messenger") or "email",
        headers=base.get("headers") or REPLY_TO_HEADERS,
    )
    if schedule:
        fields["send_at"] = f"{send_at_utc:%Y-%m-%dT%H:%M:%S}+00:00"
    campaign_id = lm_api("/campaigns", fields, "POST")["data"]["id"]
    if not schedule:
        return campaign_id  # stays a draft for preview
    lm_api(f"/campaigns/{campaign_id}/status", {"status": "scheduled"}, "PUT")
    return campaign_id
def send_test(base: dict, campaign_id: int, sample_row: tuple, label: str, tz: str,
              campaign_type: str) -> None:
    """Send one test email so the owner can approve before the real blast goes out.

    Best-effort: any failure, whether building the preview body/subject or the
    Listmonk /test call itself, is logged as a warning and swallowed. run()
    calls this for a campaign that already exists, so an exception escaping
    here must not abort the build.

    sample_row is (dot, email, name, phy_state, target_state). Older callers
    may still pass a 4-tuple (dot, email, name, state); both are handled.
    """
    try:
        dot, name, phy_state = sample_row[0], sample_row[2], sample_row[3]
        target_state = sample_row[4] if len(sample_row) > 4 else phy_state
        # str() so a non-text DOT value cannot break str.replace below.
        dot_text = str(dot) if dot else "0000000"
        body = base["body"]
        body = body.replace("{{ .Subscriber.Attribs.company }}", name or "Sample Carrier LLC")
        body = body.replace("{{ .Subscriber.Attribs.dot_number }}", dot_text)
        body = body.replace("{{ .Subscriber.Attribs.state }}", phy_state or "TX")
        # Real subscribers get a populated lp_link attrib that already carries a
        # leading `?` query (the carrier's `?dot=`, plus `?code=` when a coupon is
        # on). The test send MUST mirror that. A bare path here would render the CTA
        # as `/order/slug&utm_source=...` (the template appends `&utm...`), which has
        # no `?` and 404s -- exactly the broken owner-spot-check link. Use the same
        # lp_link_with_coupon() the audience gets so the `?` is present.
        body = body.replace("{{ .Subscriber.Attribs.lp_link }}",
                            lp_link_with_coupon(campaign_type, target_state, None,
                                                dot=dot_text))
        # NOTE: leave {{ UnsubscribeURL }} alone — Listmonk renders it into a real,
        # working per-subscriber unsubscribe link (even on test sends). Overwriting it
        # produced a dead /unsubscribe link with no subscriber identity.
        subj = base["subject"].replace("{{ .Subscriber.Attribs.dot_number }}", dot_text)
        # Listmonk /test needs `lists` as an array of IDs (base["lists"] is objects)
        list_ids = [lst["id"] for lst in base.get("lists", []) if isinstance(lst, dict)] or [1]
        payload = {
            "name": base.get("name", "Test"), "subject": subj,
            "lists": list_ids, "from_email": FROM_EMAIL,
            "type": "regular", "content_type": base["content_type"],
            "body": body, "altbody": _altbody_for(base, body),
            "template_id": base["template_id"],
            "tags": base.get("tags") or [], "messenger": base.get("messenger") or "email",
            "headers": base.get("headers") or REPLY_TO_HEADERS,
            "subscribers": [TEST_EMAIL],
        }
        lm_api(f"/campaigns/{campaign_id}/test", payload, "POST")
    except Exception as exc:  # noqa: BLE001
        LOG.warning("[%s/%s] Test send failed: %s", tz, label, exc)
        return
    LOG.info("[%s/%s] Test sent to %s (sample: %s DOT#%s)", tz, label, TEST_EMAIL, name, dot)
def campaign_type_filter(campaign_type: str) -> str:
    """Return the SQL predicate that defines a campaign segment's audience.

    - "mcs150": mcs150_parsed older than two years.
    - "inactive": rows with oos_active set.
    - any DEFICIENCY_SEGMENTS key: that segment's configured "flag_sql".
    Raises ValueError for any other campaign type.
    """
    builtin_filters = {
        "mcs150": "mcs150_parsed < NOW() - INTERVAL '2 years'",
        "inactive": "oos_active IS TRUE",
    }
    if campaign_type in builtin_filters:
        return builtin_filters[campaign_type]
    if campaign_type not in DEFICIENCY_SEGMENTS:
        raise ValueError(f"unknown campaign_type {campaign_type!r}")
    return DEFICIENCY_SEGMENTS[campaign_type]["flag_sql"]
def campaign_weight(campaign_type: str) -> float:
    """Return the warmup allocation weight for a campaign type.

    Specific deficiency copy (every type other than "mcs150"/"inactive") gets
    the highest weight, 1.25. It is more relevant to recipients and less
    repetitive at the mailbox-provider/content-fingerprint level than the
    generic overdue (0.75) and inactive (0.65) notices.
    """
    generic_weights = {"mcs150": 0.75, "inactive": 0.65}
    return generic_weights.get(campaign_type, 1.25)
def count_carriers(conn, tz_states: tuple, campaign_type: str, limit: int | None = None) -> int:
cur = conn.cursor()
states_placeholder = ",".join(["%s"] * len(tz_states))
type_filter = campaign_type_filter(campaign_type)
cur.execute(f"""
SELECT count(*)
FROM (
SELECT 1
FROM fmcsa_carriers
WHERE {type_filter}
AND {usable_filter()}
AND listmonk_sent_at IS NULL
AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
AND phy_state IN ({states_placeholder})
LIMIT %s
) s
""", [list(BLOCKED_EMAIL_DOMAINS)] + list(tz_states) + [limit or 10_000_000])
return int(cur.fetchone()[0])
def allocate_daily_budget(candidates: list[dict], daily_cap: int | None) -> dict[tuple[str, str], int]:
    """Allocate daily cap across (timezone, campaign_type) by weighted audience.
    The allocation is proportional to each slot's eligible lead count and campaign
    specificity weight, capped by the slot's audience/limit. This avoids sending
    the same number from a tiny hazmat segment and a massive MCS-150 segment while
    still preserving body-text variety during warmup.

    candidates: dicts with at least "tz", "campaign_type", "audience" and
        "weight" keys; (tz, campaign_type) is the slot key.
    daily_cap: total recipients to hand out. None means no cap: every slot
        simply gets its full audience.

    Returns {slot key: quota} for every candidate. Slots with audience <= 0 get
    0 when a cap is set. The total handed out is at most
    min(daily_cap, sum of positive audiences).
    """
    positive = [c for c in candidates if c["audience"] > 0]
    if daily_cap is None:
        return {(c["tz"], c["campaign_type"]): c["audience"] for c in candidates}
    remaining_cap = min(daily_cap, sum(c["audience"] for c in positive))
    quotas = {(c["tz"], c["campaign_type"]): 0 for c in candidates}
    # Headroom left in each slot, and its share weight (audience * weight).
    remaining = { (c["tz"], c["campaign_type"]): c["audience"] for c in positive }
    weighted = { (c["tz"], c["campaign_type"]): c["audience"] * c["weight"] for c in positive }
    # Repeated largest-remainder passes handle slots that hit their audience cap.
    while remaining_cap > 0 and remaining:
        total_weight = sum(weighted[k] for k in remaining)
        if total_weight <= 0:
            break
        # Ideal (fractional) share of the still-unassigned cap for each open slot.
        raw = {k: remaining_cap * (weighted[k] / total_weight) for k in remaining}
        assigned_this_pass = 0
        remainders: list[tuple[float, tuple[str, str]]] = []
        for k, val in raw.items():
            # Whole part of the share, never more than the slot's headroom.
            add = min(remaining[k], int(math.floor(val)))
            quotas[k] += add
            remaining[k] -= add
            assigned_this_pass += add
            if remaining[k] > 0:
                remainders.append((val - math.floor(val), k))
        # Hand the units lost to flooring to the largest fractional parts
        # (ties resolved by the reverse tuple ordering of the slot key).
        leftover = remaining_cap - assigned_this_pass
        for _, k in sorted(remainders, reverse=True):
            if leftover <= 0:
                break
            if remaining.get(k, 0) <= 0:
                continue
            quotas[k] += 1
            remaining[k] -= 1
            assigned_this_pass += 1
            leftover -= 1
        remaining_cap -= assigned_this_pass
        # Full slots drop out; the next pass redistributes their unused share.
        remaining = {k: v for k, v in remaining.items() if v > 0}
        # No progress possible (e.g. every share floored to 0 with no remainders).
        if assigned_this_pass == 0:
            break
    return quotas
def fetch_carriers(
    conn,
    tz_states: tuple,
    campaign_type: str,
    limit: int,
    offset: int = 0,
) -> list[tuple]:
    """Return one page of candidate carriers for a (timezone, segment) slot.

    Each row is (dot_number, email_address, legal_name, phy_state,
    target_state, mx_provider), ordered by mcs150_parsed ascending (NULLs
    last) and paged with ``limit``/``offset``.

    target_state is the state the offer applies to. For most segments that is
    the carrier's base (phy_state). For per-state programs (state_weight_tax,
    state_emissions) the relevant state is encoded in the deficiency flag suffix
    (e.g. 'state_weight_tax_OR'), which can differ from the base state. It is
    pulled out of the flag so the CTA links to the correct state's order page.

    mx_provider is the receiving-operator tag set by mx_tag_carriers.py (NULL
    until tagged); select_sendable_carriers() throttles on it.
    """
    states_placeholder = ",".join(["%s"] * len(tz_states))
    type_filter = campaign_type_filter(campaign_type)
    # target_state expression: pull the state suffix out of the matching flag
    # for per-state programs, otherwise fall back to the base (phy_state).
    if campaign_type in ("state_weight_tax", "state_emissions"):
        prefix = f"{campaign_type}_"
        target_state_sql = (
            "COALESCE((SELECT upper(substr(f, %s)) "
            "FROM unnest(deficiency_flags) f "
            f"WHERE f LIKE '{campaign_type}_%%' LIMIT 1), phy_state)"
        )
        target_state_params = [len(prefix) + 1]
    else:
        target_state_sql = "phy_state"
        target_state_params = []
    # During warmup, exclude carriers on the big operators that throttle/blocklist
    # (Google, Microsoft, etc.) AND the consumer mailbox operators behind the
    # "mx:" prefix (Yahoo Small Business, iCloud custom domains, legacy ISPs) --
    # mx_provider is set by mx_tag_carriers.py. Untagged carriers (mx_provider IS
    # NULL) are kept: the per-MX throttle in the selector still bounds them, and
    # excluding NULLs would starve the pool until tagging completes.
    big_mx_exclude = ""
    if MAIN_SKIP_BIG_MX and main_warmup_day() <= MAIN_BIG_MX_EXCLUDE_UNTIL_DAY:
        ops = ",".join("'%s'" % o for o in WARMUP_EXCLUDE_OPERATORS)
        big_mx_exclude = f"AND (mx_provider IS NULL OR mx_provider NOT IN ({ops}))"
    # Bind order must match placeholder order: SELECT-list target_state params
    # first, then blocked domains, states, LIMIT and OFFSET.
    params = target_state_params + [list(BLOCKED_EMAIL_DOMAINS)] + list(tz_states) + [limit, offset]
    # Close the cursor once rows are fetched; this runs once per page.
    with conn.cursor() as cur:
        cur.execute(f"""
            SELECT dot_number, email_address, legal_name, phy_state,
                   {target_state_sql} AS target_state, mx_provider
            FROM fmcsa_carriers
            WHERE {type_filter}
              AND {usable_filter()}
              AND listmonk_sent_at IS NULL
              AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
              AND phy_state IN ({states_placeholder})
              {big_mx_exclude}
            ORDER BY mcs150_parsed ASC NULLS LAST
            LIMIT %s OFFSET %s
        """, params)
        return cur.fetchall()
def select_sendable_carriers(
    conn,
    tz_states: tuple,
    campaign_type: str,
    quota: int,
    max_scan: int | None = None,
) -> tuple[list[tuple], dict[str, int]]:
    """Fetch rows until `quota` Listmonk-sendable carriers are found.
    Returns (rows, suppression_counts). Rows that are disabled/blocklisted/etc.
    in Listmonk are skipped and not marked sent in Postgres.

    Rows come from fetch_carriers() in offset-based pages. Each row is checked
    in order:
      1. duplicate email within this call;
      2. the per-MX-operator cap (tagged rows) or the shared untagged cap;
      3. listmonk_sendable().
    suppression_counts maps each skip reason ("duplicate_email",
    "mx_cap:<operator>", "mx_cap:untagged", or a listmonk_sendable() reason) to
    how many rows it skipped.

    Scanning stops once `quota` rows are selected, a short/empty page shows the
    source is exhausted, or the offset reaches the scan limit. The scan limit is
    checked per page; a falsy max_scan falls back to
    max(quota * 8, quota + 500, 1000).
    """
    selected: list[tuple] = []
    skipped: dict[str, int] = {}
    seen_emails: set[str] = set()
    offset = 0
    # Per-MX throttle: cap how many of THIS run's quota go to each receiving
    # operator so we never concentrate on one (the Jun 13-14 Gmail/Outlook storm).
    # NOTE(review): per_op / untagged_used are local to this call, so the caps
    # apply per (timezone, segment) slot, not across the whole build run --
    # confirm that is intended.
    caps = mx_daily_caps(main_warmup_day())
    per_op: dict = {}
    default_cap = caps.get("__default__", 50)
    # Untagged (NULL mx_provider) safety cap. We can't exclude NULLs (the big-MX
    # exclusion is MX-based, so an untagged Google/Yahoo domain would slip through),
    # but we also shouldn't let a flood of freshly-imported, never-resolved domains
    # dominate a run -- some are big/consumer operators we'd otherwise hold out.
    # The pw-mx-tag cron drains the *sendable* untagged backlog fast (only ~3k
    # distinct verified domains as of 2026-06-20, < one 20k/day run), so this is a
    # between-runs safety net, not the primary gate. Generous enough to never starve
    # the pool in normal operation. Tunable via MAIN_UNTAGGED_MX_CAP.
    untagged_cap = int(os.getenv("MAIN_UNTAGGED_MX_CAP", str(max(quota, 200))))
    untagged_used = 0
    MX_IDX = 5  # mx_provider is the 6th column from fetch_carriers
    # Warmup caps are small, but old audiences can contain many prior bounces or
    # unsubscribes. Scan beyond the quota while still bounding worst-case API calls.
    scan_limit = max_scan or max(quota * 8, quota + 500, 1000)
    batch_size = min(max(quota * 2, 100), 1000)
    while len(selected) < quota and offset < scan_limit:
        batch = fetch_carriers(conn, tz_states, campaign_type, batch_size, offset=offset)
        if not batch:
            break
        offset += len(batch)
        for row in batch:
            email = (row[1] or "").lower().strip()
            if email in seen_emails:
                skipped["duplicate_email"] = skipped.get("duplicate_email", 0) + 1
                continue
            # Recorded before the cap/Listmonk checks, so a repeat of a capped
            # or suppressed address is counted as a duplicate.
            seen_emails.add(email)
            # Per-MX-operator cap (reputation is per receiving operator).
            # Tagged carriers are capped per operator; untagged (no mx_provider
            # yet) are bounded by a single shared safety cap (untagged_cap) instead
            # of being uncapped -- this stops a flood of unresolved domains (which
            # could include big/consumer operators) from dominating a run, without
            # starving the pool. The big-operator EXCLUSION in fetch_carriers keeps
            # KNOWN Google/MS/consumer-MX out; the pw-mx-tag cron keeps NULL small.
            prov = (row[MX_IDX] or "").strip().lower() if len(row) > MX_IDX else ""
            if prov:
                cap = caps.get(prov, default_cap)
                if per_op.get(prov, 0) >= cap:
                    skipped[f"mx_cap:{prov}"] = skipped.get(f"mx_cap:{prov}", 0) + 1
                    continue
            else:
                if untagged_used >= untagged_cap:
                    skipped["mx_cap:untagged"] = skipped.get("mx_cap:untagged", 0) + 1
                    continue
            # Listmonk lookup happens only for rows that passed the cheap caps.
            ok, reason = listmonk_sendable(email)
            if not ok:
                skipped[reason] = skipped.get(reason, 0) + 1
                continue
            # Cap counters are charged only for rows actually selected.
            if prov:
                per_op[prov] = per_op.get(prov, 0) + 1
            else:
                untagged_used += 1
            selected.append(row)
            if len(selected) >= quota:
                break
        # A short page means the query has no more rows to offer.
        if len(batch) < batch_size:
            break
    return selected, skipped
def mark_sent(conn, dot_numbers: list[str], campaign_type: str) -> None:
    """Stamp carriers as sent so later builds skip them, then commit.

    Sets listmonk_sent_at = NOW() and listmonk_campaign_type for every
    fmcsa_carriers row whose dot_number is in ``dot_numbers`` (bound as a
    text[] array). The cursor is closed even if the UPDATE fails; the
    connection is committed afterwards.
    """
    with conn.cursor() as cur:
        cur.execute("""
            UPDATE fmcsa_carriers
               SET listmonk_sent_at = NOW(),
                   listmonk_campaign_type = %s
             WHERE dot_number = ANY(%s::text[])
        """, (campaign_type, dot_numbers))
    conn.commit()
def list_segments(send_date: date) -> None:
    """Print the audience size per deficiency segment (read-only, no writes).

    Verifiable Phase-5 metric: each segment should select a nonzero audience
    and point at a valid order LP. For every DEFICIENCY_SEGMENTS entry this:
      - sums, over all TIMEZONE_CONFIG regions, the rows fetch_carriers()
        returns, so the segment's per-timezone ``limit`` and any warmup big-MX
        exclusion apply, and no cross-row email dedupe is done;
      - prints the source-campaign env value ("UNSET" when missing);
      - prints the build_lp_link() landing page.
    The DB connection is closed even if a query fails.
    """
    conn = psycopg2.connect(DB_URL)
    try:
        print(f"\nDeficiency-flag segments (send date {send_date.isoformat()}):\n")
        print(f"{'segment':<22}{'audience':>10}  {'source':<10} landing page")
        print("-" * 78)
        grand_total = 0
        for ctype, seg in DEFICIENCY_SEGMENTS.items():
            total = 0
            for tz_cfg in TIMEZONE_CONFIG.values():
                rows = fetch_carriers(conn, tz_cfg["states"], ctype, seg["limit"])
                total += len(rows)
            grand_total += total
            src = os.getenv(seg["source_env"])
            src_str = src if src else "UNSET"
            lp = build_lp_link(ctype, None)
            print(f"{ctype:<22}{total:>10}  {src_str:<10} {lp}")
        print("-" * 78)
        print(f"{'TOTAL':<22}{grand_total:>10}")
        print("\n(UNSET source = template not yet created; segment is skipped by the "
              "scheduled run until its CAMPAIGN_*_ID env is set.)\n")
    finally:
        conn.close()
def run(send_date: date, dry_run: bool = False, preview: bool = False,
        max_per_segment: int | None = None, only_segments: set[str] | None = None,
        send_hour_override: int | None = None, send_minute: int = 0,
        stagger_minutes: int = 0, daily_cap: int | None = None,
        warmup_cap: bool = True) -> None:
    """Build (and normally schedule) one day's trucking campaigns in Listmonk.

    Segments are MCS-150, inactive USDOT, and every DEFICIENCY_SEGMENTS entry
    whose source-campaign env var is set. For each TIMEZONE_CONFIG region x
    segment slot, the run:
      1. selects sendable carriers within the slot's allocation;
      2. creates a Listmonk list and a campaign cloned from the segment's
         source campaign;
      3. marks the carriers sent;
      4. sends the owner a test copy.

    send_date: day the campaigns go out.
    dry_run: log what would be created; no Listmonk or DB writes.
    preview: 1 sample carrier per campaign, owner-only list, draft
        campaigns, no mark-sent.
    max_per_segment / only_segments: warmup controls narrowing the audience.
    send_hour_override: one UTC send hour for all timezones. send_minute
        applies to either hour.
    stagger_minutes: spacing added between successive created campaigns.
    daily_cap: total recipient cap for the run. When None (warmup_cap True and
        not preview) it comes from warmup_daily_queue_cap().
    """
    conn = psycopg2.connect(DB_URL)
    # Mint (or reuse) the same-day coupon(s) for this send date so every campaign
    # in the run shares the same expiring code(s). Preview/dry runs skip the write,
    # and the daily deal is disabled by default (see COUPON_ENABLED) -- when off we
    # send at normal price (empty coupon_code -> template's no-deal branch). When
    # CAMPAIGN_COUPON_AB_PCTS is set we mint one code per percent arm and split
    # recipients across them by a stable hash of their email.
    daily_coupons: dict[int, str] | None = None
    if COUPON_ENABLED and not dry_run and not preview:
        try:
            daily_coupons = get_or_create_daily_coupons(conn, send_date)
            if COUPON_AB_PCTS:
                LOG.info("[coupon] A/B test arms: %s",
                         ", ".join(f"{p}%={c}" for p, c in sorted(daily_coupons.items())))
        except Exception as exc:  # noqa: BLE001
            LOG.warning("[coupon] could not mint daily coupon(s): %s (sending without)", exc)
    elif not COUPON_ENABLED:
        LOG.info("[coupon] disabled (CAMPAIGN_ENABLE_COUPON unset) — sending at normal price")
    base_mcs150 = get_base_campaign(CAMPAIGN_MCS150_ID)
    base_inactive = get_base_campaign(CAMPAIGN_INACTIVE_ID)
    campaign_specs = [
        ("mcs150", base_mcs150, 2000, "MCS-150 Overdue"),
        ("inactive", base_inactive, 1000, "Inactive USDOT"),
    ]
    # Add deficiency-flag segments whose Listmonk source template is configured.
    for ctype, seg in DEFICIENCY_SEGMENTS.items():
        src = os.getenv(seg["source_env"])
        if not src:
            LOG.info("[segment] %s skipped — %s not set (template not created)",
                     ctype, seg["source_env"])
            continue
        try:
            base = get_base_campaign(int(src))
        except Exception as exc:  # noqa: BLE001
            LOG.warning("[segment] %s skipped — source %s unusable: %s",
                        ctype, src, exc)
            continue
        campaign_specs.append((ctype, base, seg["limit"], seg["label"]))
    # Optional warmup controls: restrict to specific segments and/or cap the
    # per-segment audience so a fresh-IP day-0 send stays small.
    if only_segments:
        campaign_specs = [s for s in campaign_specs if s[0] in only_segments]
        if not campaign_specs:
            LOG.error("No campaign specs match --only-segment %s", sorted(only_segments))
            conn.close()
            return
    if max_per_segment is not None:
        campaign_specs = [
            (ct, base, min(lim, max_per_segment), label)
            for (ct, base, lim, label) in campaign_specs
        ]
    if daily_cap is None and warmup_cap and not preview:
        daily_cap = warmup_daily_queue_cap()
    if daily_cap is not None:
        LOG.info("Daily queue cap active: %d recipients total", daily_cap)
    if preview:
        LOG.info("PREVIEW MODE — 1 sample carrier per campaign, drafts only, "
                 "test sends to %s, no real schedule, no mark-sent.", TEST_EMAIL)
    candidates: list[dict] = []
    totals_by_campaign: dict[str, int] = {}
    for tz, tz_cfg in TIMEZONE_CONFIG.items():
        for campaign_type, _base, limit, label in campaign_specs:
            audience = 1 if preview else count_carriers(conn, tz_cfg["states"], campaign_type, limit)
            weight = campaign_weight(campaign_type)
            candidates.append({
                "tz": tz,
                "states": tz_cfg["states"],
                "campaign_type": campaign_type,
                "limit": limit,
                "label": label,
                "audience": audience,
                "weight": weight,
            })
            totals_by_campaign[campaign_type] = totals_by_campaign.get(campaign_type, 0) + audience
    quotas = allocate_daily_budget(candidates, None if preview else daily_cap)
    LOG.info("Eligible audience by SQL criterion: %s", ", ".join(
        f"{ct}={n}" for ct, n in sorted(totals_by_campaign.items())
    ))
    if daily_cap is not None and not preview:
        planned_by_campaign: dict[str, int] = {}
        for (_tz, ctype), quota in quotas.items():
            planned_by_campaign[ctype] = planned_by_campaign.get(ctype, 0) + quota
        LOG.info("Planned allocation by criterion: %s (total=%d/%d)", ", ".join(
            f"{ct}={n}" for ct, n in sorted(planned_by_campaign.items())
        ), sum(planned_by_campaign.values()), daily_cap)
    scheduled_count = 0
    queued_recipients = 0
    for tz, tz_cfg in TIMEZONE_CONFIG.items():
        states = tz_cfg["states"]
        send_hour = send_hour_override if send_hour_override is not None else tz_cfg["send_hour_utc"]
        send_at = datetime(
            send_date.year, send_date.month, send_date.day,
            send_hour, send_minute, 0, tzinfo=timezone.utc
        )
        for campaign_type, base, limit, label in campaign_specs:
            if daily_cap is not None and queued_recipients >= daily_cap:
                LOG.info("[%s/%s] Daily cap reached (%d/%d) — skipping remaining slots",
                         tz, campaign_type, queued_recipients, daily_cap)
                continue
            effective_send_at = send_at + timedelta(minutes=stagger_minutes * scheduled_count)
            fetch_limit = 1 if preview else limit
            if daily_cap is not None and not preview:
                fetch_limit = min(fetch_limit, quotas.get((tz, campaign_type), 0))
            if fetch_limit <= 0:
                LOG.info("[%s/%s] Allocation is 0 — skipping", tz, campaign_type)
                continue
            if preview:
                rows = fetch_carriers(conn, states, campaign_type, fetch_limit)
                suppressed: dict[str, int] = {}
            else:
                rows, suppressed = select_sendable_carriers(conn, states, campaign_type, fetch_limit)
            if suppressed:
                LOG.info("[%s/%s] Runtime Listmonk suppression skipped %d rows: %s",
                         tz, campaign_type, sum(suppressed.values()), ", ".join(
                             f"{reason}={n}" for reason, n in sorted(suppressed.items())
                         ))
            if not rows:
                LOG.warning("[%s/%s] No usable records — skipping", tz, campaign_type)
                continue
            actual = len(rows)
            tag = "PREVIEW " if preview else ""
            list_name = f"{tag}Trucking {label} {tz} {send_date.isoformat()}"
            campaign_name = f"{tag}{label} - {tz} - {send_date.strftime('%b %d %Y')}"
            LOG.info("[%s/%s] %d sendable carriers -> %s (send %s UTC)",
                     tz, campaign_type, actual, campaign_name,
                     effective_send_at.strftime("%Y-%m-%d %H:%M"))
            if dry_run:
                LOG.info("  DRY RUN — skipping Listmonk + DB writes")
                scheduled_count += 1
                queued_recipients += actual
                continue
            # Build subscriber list. In preview, only the owner's address (with the
            # sample carrier's attribs) so the real audience is never touched.
            if preview:
                r0 = rows[0]
                p_code, p_pct = pick_coupon_for_email(TEST_EMAIL, daily_coupons)
                subscribers = [{
                    "email": TEST_EMAIL,
                    "name": r0[2] or "Sample Carrier",
                    "attribs": {"dot_number": r0[0], "company": r0[2] or "", "state": r0[3] or "",
                                "lp_link": lp_link_with_coupon(campaign_type, r0[4], p_code, dot=str(r0[0])),
                                **coupon_attribs(p_code, p_pct),
                                **discounted_price_attribs(campaign_type, r0[4], p_pct)},
                }]
            else:
                subscribers = []
                for row in rows:
                    c_code, c_pct = pick_coupon_for_email(row[1], daily_coupons)
                    subscribers.append({
                        "email": row[1],
                        "name": row[2] or row[1],
                        "attribs": {"dot_number": row[0], "company": row[2] or "", "state": row[3] or "",
                                    "lp_link": lp_link_with_coupon(campaign_type, row[4], c_code, dot=str(row[0])),
                                    **coupon_attribs(c_code, c_pct),
                                    **discounted_price_attribs(campaign_type, row[4], c_pct)},
                    })
            # Create list + add subscribers
            list_id = create_list(list_name)
            added = import_subscribers(list_id, subscribers)
            LOG.info("[%s/%s] List %d: %d/%d subscribers added", tz, campaign_type, list_id, added, len(subscribers))
            if added == 0:
                LOG.error("[%s/%s] BUG: 0 subscribers added to list %d — skipping", tz, campaign_type, list_id)
                continue
            # Create campaign (draft in preview, scheduled in real run)
            cid = create_and_schedule_campaign(base, list_id, campaign_name, effective_send_at, schedule=not preview)
            LOG.info("[%s/%s] Campaign %d created (%s)", tz, campaign_type, cid,
                     "draft/preview" if preview else f"scheduled {effective_send_at.isoformat()}")
            scheduled_count += 1
            queued_recipients += added
            # Mark carriers as sent (real run only) BEFORE the owner test-send.
            # The campaign is already scheduled, so if the test-send ever raised,
            # skipping mark_sent would let the next nightly build select these
            # carriers again and mail them twice.
            # NOTE(review): every selected row is marked, including any whose
            # add_subscriber() failed (added < len(rows)); confirm intended.
            if not preview:
                mark_sent(conn, [row[0] for row in rows], campaign_type)
            # Send a test to the owner so they can spot-check the rendered email
            send_test(base, cid, rows[0], label, tz, campaign_type)
    conn.close()
    LOG.info("Done. queued_recipients=%d%s", queued_recipients,
             f" daily_cap={daily_cap}" if daily_cap is not None else "")
def main(argv: list[str] | None = None) -> None:
    """Parse command-line options and build (or preview) the trucking campaigns.

    Args:
        argv: Arguments to parse instead of ``sys.argv[1:]``. ``None`` (the
            default, used by the script entry point) reads the real command
            line; passing a list lets the CLI be driven programmatically.

    Invalid arguments (malformed ``--date``, out-of-range send hour/minute,
    negative counts or caps, ``--date`` together with ``--tomorrow``) make
    argparse print a usage error and exit with status 2 *before* any
    database or Listmonk work is attempted.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )

    def iso_date(value: str) -> date:
        # Parse YYYY-MM-DD for argparse. Raising ArgumentTypeError yields a
        # clean usage error instead of the raw ValueError traceback that a
        # post-parse date.fromisoformat() call would produce.
        try:
            return date.fromisoformat(value)
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"invalid date {value!r} (expected YYYY-MM-DD)") from None

    def bounded_int(lo: int, hi: int | None = None):
        """Return an argparse ``type=`` converter accepting ints in [lo, hi]."""
        def convert(value: str) -> int:
            try:
                number = int(value)
            except ValueError:
                raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
            if number < lo or (hi is not None and number > hi):
                span = f"{lo}..{hi}" if hi is not None else f">= {lo}"
                raise argparse.ArgumentTypeError(f"{number} out of range ({span})")
            return number
        return convert

    parser = argparse.ArgumentParser(description="Build daily trucking campaigns in Listmonk")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be created, no writes")
    parser.add_argument("--preview", action="store_true", help="1 sample carrier/campaign, drafts only, test sends to owner, no real schedule/mark-sent")
    parser.add_argument("--list-segments", action="store_true", help="Report deduped audience size per deficiency segment, then exit (no writes)")
    parser.add_argument("--date", type=iso_date, help="Target send date YYYY-MM-DD (default: today)")
    parser.add_argument("--tomorrow", action="store_true",
                        help="Schedule for tomorrow instead of today. Use only for manual pre-builds.")
    # Counts and caps only need to be non-negative. 0 is still accepted
    # because its meaning is decided by run().
    parser.add_argument("--max-per-segment", type=bounded_int(0), default=None,
                        help="Cap audience per (segment x timezone). Use for fresh-IP warmup small sends.")
    parser.add_argument("--only-segment", action="append", default=None, metavar="SEG",
                        help="Restrict to specific segment(s), e.g. mcs150, inactive, for_hire_boc3. Repeatable.")
    # The help text describes these as UTC clock fields, so bound them to
    # valid hour/minute values.
    parser.add_argument("--send-hour", type=bounded_int(0, 23), default=None,
                        help="Override send hour (UTC) for ALL timezones instead of per-tz defaults.")
    parser.add_argument("--send-minute", type=bounded_int(0, 59), default=0,
                        help="Send minute (UTC), used with --send-hour. Default 0.")
    # A negative stagger would schedule later campaigns *earlier*; reject it.
    parser.add_argument("--stagger-minutes", type=bounded_int(0), default=0,
                        help="Add this many minutes between each created campaign. Useful for catchup sends.")
    parser.add_argument("--daily-cap", type=bounded_int(0), default=None,
                        help="Hard cap on recipients queued by this run. Default: warmup-derived cap when available.")
    parser.add_argument("--no-warmup-cap", action="store_true",
                        help="Disable automatic daily queue cap from /etc/postfix/pw-warmup-start.")
    args = parser.parse_args(argv)

    if args.date is not None and args.tomorrow:
        parser.error("--date and --tomorrow are mutually exclusive")

    # args.date has already been converted to a datetime.date by iso_date.
    if args.date is not None:
        send_date = args.date
    elif args.tomorrow:
        send_date = date.today() + timedelta(days=1)
    else:
        send_date = date.today()

    if args.list_segments:
        list_segments(send_date)
        return

    only = set(args.only_segment) if args.only_segment else None
    LOG.info("Building campaigns for send date %s (dry_run=%s, preview=%s, "
             "max_per_segment=%s, only=%s, send_hour=%s, stagger_minutes=%s, daily_cap=%s, warmup_cap=%s)",
             send_date, args.dry_run, args.preview, args.max_per_segment,
             sorted(only) if only else None, args.send_hour, args.stagger_minutes,
             args.daily_cap, not args.no_warmup_cap)
    run(send_date, dry_run=args.dry_run, preview=args.preview,
        max_per_segment=args.max_per_segment, only_segments=only,
        send_hour_override=args.send_hour, send_minute=args.send_minute,
        stagger_minutes=args.stagger_minutes, daily_cap=args.daily_cap,
        warmup_cap=not args.no_warmup_cap)
if __name__ == "__main__":
    # Script entry point: main() returns None, so the process exits with status 0
    # unless argparse or an uncaught exception terminates it first.
    raise SystemExit(main())