Listmonk applies campaign headers as `for hdr,val := range set { h.Add(hdr,val) }`
(internal/manager/manager.go v6.1.0): each map's KEY is the literal header name.
The trucking/CRTC/deficiency builders wrote {"name":"Reply-To","value":..} (and
{"key":..,"value":..}), which emits junk `name:`/`value:` headers and NO real
Reply-To, so replies fell back to the From address (noreply@send.performancewest.net)
instead of info@performancewest.net. The healthcare (HC) builder already used the correct
{"Reply-To": value} shape; match it everywhere. Verified against listmonk source.
Impact: outbound only; no customer replies were lost (noreply@ is a real mailbox),
but reply UX pointed at a no-reply address. Live campaign headers re-patched separately.
1554 lines
70 KiB
Python
1554 lines
70 KiB
Python
#!/usr/bin/env python3
|
|
"""Daily trucking campaign builder.
|
|
|
|
Runs nightly (3 AM EST) to create the next day's 8 Listmonk campaigns:
|
|
- 4 TZ regions x MCS-150 overdue (2,000 carriers each, 4 AM / 5 AM / 6 AM / 7 AM EST)
|
|
- 4 TZ regions x Inactive USDOT (1,000 carriers each, same schedule)
|
|
|
|
Selection criteria:
|
|
- email verified or catch-all (usable)
|
|
- mcs150_parsed > 2 years ago (for MCS-150 campaign)
|
|
- oos_active IS TRUE (for inactive USDOT campaign)
|
|
- listmonk_sent_at IS NULL (not yet sent)
|
|
- ordered by mcs150_parsed ASC (most overdue first)
|
|
|
|
Usage:
|
|
python3 scripts/build_trucking_campaigns.py
|
|
python3 scripts/build_trucking_campaigns.py --dry-run
|
|
python3 scripts/build_trucking_campaigns.py --date 2026-06-02
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import urllib.request
|
|
import urllib.parse
|
|
import math
|
|
import time
|
|
from datetime import date, datetime, timedelta, timezone
|
|
|
|
import psycopg2
|
|
|
|
# Put the repository root (the parent of scripts/) on sys.path so the
# `scripts.*` imports below resolve under either invocation style:
#     python -m scripts.build_trucking_campaigns
#     python3 scripts/build_trucking_campaigns.py
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT not in sys.path:
    sys.path[:0] = [ROOT]  # prepend, same effect as sys.path.insert(0, ROOT)
|
|
|
|
from scripts._email_exclusions import BLOCKED_EMAIL_DOMAINS
|
|
from scripts._email_plaintext import html_to_text
|
|
|
|
LOG = logging.getLogger("build_trucking_campaigns")


# ── Listmonk ──────────────────────────────────────────────────────────────────
# Listmonk API connection settings; each is overridable via the environment.
LISTMONK_URL = os.getenv("LISTMONK_URL", "http://listmonk:9000")
LISTMONK_USER = os.getenv("LISTMONK_USER", "api")
# SECURITY(review): a real-looking API password is hard-coded as the fallback
# default, so it is committed to version control. It should be rotated and
# supplied only through the LISTMONK_PASS env var (e.g. fail fast when unset).
# Left unchanged here so deployments that currently rely on the default keep
# working until the env var is provisioned everywhere.
LISTMONK_PASS = os.getenv("LISTMONK_PASS", "6X1rKPea61N4rZ1S65Hx5zvqzbCj30F6nvEe9oVGH_Y")
# base64("user:pass"), computed once at import -- presumably sent as an HTTP
# Basic `Authorization` header by the Listmonk API helpers (not visible here).
_LM_AUTH = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode()
|
|
|
|
# ── Source campaign IDs ────────────────────────────────────────────────────────
# Listmonk campaigns the daily MCS-150 / inactive-USDOT sends are built from.
CAMPAIGN_MCS150_ID = 186    # "MCS-150 Overdue — $1,000/Day Fine Risk"
CAMPAIGN_INACTIVE_ID = 188  # "Inactive USDOT — Reactivate Before You Get Pulled Over"

# Public site origin for the landing-page links merged into Listmonk subscriber
# attribs (override with SITE_DOMAIN).
SITE_DOMAIN = os.getenv(
    "SITE_DOMAIN",
    "https://performancewest.net",
)
|
|
|
|
# ── Deficiency-flag segments (Phase 5) ──────────────────────────────────────
# Each segment targets carriers by a `deficiency_flags` value (the TEXT[] column
# written by fmcsa_deficiency_flagger.py) and links to its order landing page.
#
# Keys per segment:
#   label       human-readable segment name.
#   flag_sql    SQL predicate over the deficiency_flags array. NOTE(review): the
#               LIKE patterns are written with `%%`, which suggests these
#               fragments are spliced into a query run with psycopg2 parameters
#               (where `%%` renders as a literal `%`) -- confirm before reusing a
#               fragment in a query that has no parameters.
#   lp_slug     default order landing-page slug; lp_slug_for() applies the
#               per-state overrides.
#   source_env  env var naming the Listmonk template campaign to clone; set it
#               once the template exists (see CAMPAIGN_SOURCE_ENV).
#   limit       per-segment carrier count (presumably a per-run cap; its use is
#               not visible here -- TODO confirm).
#
# Segments without a configured source id are reported by --list-segments but
# skipped by the scheduled run, so the nightly job never breaks on an
# unconfigured template.
DEFICIENCY_SEGMENTS = {
    "for_hire_boc3": {
        "label": "For-Hire BOC-3 + UCR",
        "flag_sql": "'for_hire_carrier' = ANY(deficiency_flags)",
        "lp_slug": "boc3-filing",
        "source_env": "CAMPAIGN_FOR_HIRE_ID",
        "limit": 1500,
    },
    "irp_ifta": {
        "label": "IRP / IFTA Registration",
        "flag_sql": "'interstate_needs_irp_ifta' = ANY(deficiency_flags)",
        "lp_slug": "state-trucking-bundle",
        "source_env": "CAMPAIGN_IRP_IFTA_ID",
        "limit": 1500,
    },
    "intrastate_authority": {
        "label": "Intrastate Operating Authority",
        # any intrastate_authority_<state> flag
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'intrastate_authority_%%')",
        "lp_slug": "intrastate-authority",
        "source_env": "CAMPAIGN_INTRASTATE_ID",
        "limit": 1000,
    },
    "state_weight_tax": {
        "label": "State Weight-Distance Tax",
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'state_weight_tax_%%')",
        # per-state LP resolved per-row in build_lp_link()
        "lp_slug": "state-trucking-bundle",
        "source_env": "CAMPAIGN_WEIGHT_TAX_ID",
        "limit": 1000,
    },
    "state_emissions": {
        "label": "State Clean-Truck / Emissions",
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'state_emissions_%%')",
        "lp_slug": "state-emissions",
        "source_env": "CAMPAIGN_EMISSIONS_ID",
        "limit": 1000,
    },
    "hazmat": {
        "label": "Hazmat PHMSA Registration",
        "flag_sql": "'hazmat_carrier' = ANY(deficiency_flags)",
        "lp_slug": "hazmat-phmsa",
        "source_env": "CAMPAIGN_HAZMAT_ID",
        "limit": 500,
    },
}
|
|
|
|
# Landing-page overrides for the state_weight_tax segment, keyed by the
# carrier's physical state. (CA emissions is overridden separately, to
# ca-mcp-carb.) States not listed keep the segment's default lp_slug.
_WEIGHT_TAX_LP = dict(
    OR="or-weight-mile-tax",
    NY="ny-hut-registration",
    KY="ky-kyu-registration",
    NM="nm-weight-distance",
    CT="ct-highway-use-fee",
)
|
|
|
|
|
|
# ── Authoritative service prices (single source of truth) ───────────────────
# Coupon-email prices MUST equal what checkout charges, so they are read from
# the same catalog the API uses (api/src/service-catalog.ts). The TS file is
# parsed directly -- the prod box has python3 but no node -- and the result is
# cached per process as {slug: {price_cents, discountable}}. When the catalog
# cannot be read (e.g. the file is absent from an image) the price helpers fall
# back to percent-only copy instead of guessing a number.
_CATALOG_PATH = os.getenv(
    "SERVICE_CATALOG_TS",
    os.path.join(ROOT, "api", "src", "service-catalog.ts"),
)
# Filled lazily by _load_service_catalog(); None means "not loaded yet".
_CATALOG_CACHE: dict | None = None
|
|
|
|
|
|
def _load_service_catalog() -> dict:
    """Parse api/src/service-catalog.ts -> {slug: {price_cents, discountable}}.

    The result is cached in the module-level ``_CATALOG_CACHE`` for the life of
    the process. An empty result is cached too, so a missing catalog is
    reported once rather than once per recipient.

    ``discountable`` defaults to True, because the catalog only marks the
    exceptions with ``discountable: false``; this matches the TS object's own
    semantics. Entries with no ``price_cents`` are skipped.

    Never raises. If the file cannot be read, or the ``COMPLIANCE_SERVICES``
    export is not found, a warning is logged and an empty catalog is returned.
    The price helpers then degrade to percent-only copy.
    """
    global _CATALOG_CACHE
    if _CATALOG_CACHE is not None:
        return _CATALOG_CACHE

    import re

    catalog: dict = {}
    try:
        # `with` so the handle is closed even if the read fails.
        with open(_CATALOG_PATH, encoding="utf-8") as fh:
            ts = fh.read()
        m = re.search(r"export const COMPLIANCE_SERVICES[^=]*=\s*\{(.*)\n\};", ts, re.S)
        if m is None:
            # Previously this silently produced an empty catalog.
            LOG.warning("[coupon] COMPLIANCE_SERVICES export not found in %s; "
                        "coupon copy will be percent-only", _CATALOG_PATH)
        else:
            # NOTE: the non-greedy `\{(.*?)\}` stops at the first `}`, so a
            # nested object inside an entry would truncate that entry's body.
            for em in re.finditer(r'"([a-z0-9\-]+)":\s*\{(.*?)\}', m.group(1), re.S):
                slug, inner = em.group(1), em.group(2)
                pm = re.search(r"price_cents:\s*(\d+)", inner)
                if not pm:
                    continue
                catalog[slug] = {
                    "price_cents": int(pm.group(1)),
                    "discountable": re.search(r"discountable:\s*false", inner) is None,
                }
    except Exception as exc:  # noqa: BLE001 -- best effort; never break the nightly run
        LOG.warning("[coupon] could not load service catalog (%s); coupon copy "
                    "will be percent-only", exc)
    _CATALOG_CACHE = catalog
    return catalog
|
|
|
|
|
|
# Which catalog service each main campaign's coupon copy quotes. This is the
# service the email body is *about*, which is not necessarily the slug its CTA
# opens. For example, the MCS-150 email discusses the $79 MCS-150 update while
# its button opens the $399 full-compliance bundle. Quoting the CTA slug would
# advertise a discount the landing page doesn't show, hence the explicit map.
PRICE_SLUG_BY_CAMPAIGN = dict(
    mcs150="mcs150-update",
    inactive="usdot-reactivation",
)
|
|
|
|
|
|
def price_slug_for(campaign_type: str, phy_state: str | None = None) -> str:
    """Return the catalog slug whose price a segment's coupon copy should quote.

    Main campaigns (MCS-150, reactivation) quote the service listed for them in
    PRICE_SLUG_BY_CAMPAIGN. Every other segment quotes the slug its CTA links
    to, as resolved (including per-state overrides) by lp_slug_for().
    """
    explicit = PRICE_SLUG_BY_CAMPAIGN.get(campaign_type)
    if explicit:
        return explicit
    return lp_slug_for(campaign_type, phy_state)
|
|
|
|
|
|
def discounted_price_attribs(campaign_type: str, phy_state: str | None,
                             coupon_pct: str | None) -> dict:
    """Per-recipient price merge fields, computed on the fly to match checkout.

    The discount applies to the SERVICE fee only:
    discount_cents = fee * pct / 100, rounded half-up to whole cents.
    Non-discountable services (e.g. boc3-filing, a $25 passthrough) get NO
    price discount.

    Returns:
        coupon_price_full  "$79"     (service list price)
        coupon_price_deal  "$47.40"  (after discount; e.g. 40% off $79)
        coupon_priceable   "1"/""    (whether a real discounted number is available)

    All three fields are blank in any of these cases: no coupon; a percent
    that isn't an integer in 1..100; no catalog entry; or a non-discountable
    service. The templates then fall back to percent-only copy and never print
    a false number.
    """
    blank = {"coupon_price_full": "", "coupon_price_deal": "", "coupon_priceable": ""}
    if not coupon_pct:
        return blank
    try:
        pct = int(coupon_pct)
    except (TypeError, ValueError):
        return blank
    # A 0% (or negative / >100%) "deal" would print a bogus or non-discounted
    # price next to deal copy, so treat it as no priceable discount.
    if not 0 < pct <= 100:
        return blank

    slug = price_slug_for(campaign_type, phy_state)
    entry = _load_service_catalog().get(slug)
    if not entry or not entry.get("discountable") or entry["price_cents"] <= 0:
        return blank

    full = entry["price_cents"]
    # Integer round-half-up. Python's round() rounds half to EVEN
    # (round(78.5) == 78), whereas the TypeScript API's usual Math.round rounds
    # .5 up, so round() could quote a price one cent off on exact-half cases.
    # NOTE(review): confirm the API's rounding function matches half-up.
    discount = (full * pct + 50) // 100
    deal = full - discount

    def _fmt(cents: int) -> str:
        # Whole dollars print without decimals ("$79"); otherwise two places.
        return f"${cents // 100}" if cents % 100 == 0 else f"${cents / 100:.2f}"

    return {
        "coupon_price_full": _fmt(full),
        "coupon_price_deal": _fmt(deal),
        "coupon_priceable": "1",
    }
|
|
|
|
|
|
def build_lp_link(campaign_type: str, phy_state: str | None) -> str:
    """Return the bare order landing-page URL for a (segment, state) pair.

    The slug, including the per-state weight-tax and CA emissions overrides,
    comes from lp_slug_for(). The link and the discountable-service slug used
    for pricing therefore cannot drift apart. This previously duplicated that
    logic. No query string is added here; lp_link_with_coupon() owns that.
    """
    return f"{SITE_DOMAIN}/order/{lp_slug_for(campaign_type, phy_state)}"
|
|
|
|
|
|
def lp_slug_for(campaign_type: str, phy_state: str | None = None) -> str:
    """Return the order-page slug (== the discountable service slug) for a segment.

    Lookup order:
    1. Start from the segment's ``lp_slug``, or "dot-full-compliance" for
       non-deficiency campaigns.
    2. state_weight_tax carriers in a state listed in _WEIGHT_TAX_LP get that
       state's page instead.
    3. CA carriers in the state_emissions segment get "ca-mcp-carb" instead.
    """
    segment = DEFICIENCY_SEGMENTS.get(campaign_type)
    default = segment["lp_slug"] if segment else "dot-full-compliance"
    if campaign_type == "state_weight_tax":
        return _WEIGHT_TAX_LP.get(phy_state, default)
    if campaign_type == "state_emissions" and phy_state == "CA":
        return "ca-mcp-carb"
    return default
|
|
|
|
|
|
# ── Daily same-day coupon ───────────────────────────────────────────────────
# Every send day gets ONE random 5-letter coupon at COUPON_PCT off. It is valid
# only through 23:59:59 of the send date (America/New_York; approximated by the
# fixed EST offset _ET below). The code is written to the app's
# `discount_codes` table. The existing /api/v1/discount validator and checkout
# enforce expiry and the service-fee-only scope; pass-through government fees
# are never discounted. The code and prices are merged into the email so the
# recipient sees a real, expiring deal.
#
# DISCOUNT TOGGLE: the daily coupon is DISABLED by default (Jun 2026). The
# discount sends were not being delivered (DKIM-broken window), so we are
# re-testing whether normal-price offers convert now that deliverability is
# fixed. With the coupon off:
#   - no code is minted and an empty coupon_code is merged;
#   - the templates' `{{ if .Subscriber.Attribs.coupon_code }}` guard falls
#     through to the normal-price `{{ else }}` branch;
#   - the landing-page links carry no `?code=`.
# Set CAMPAIGN_ENABLE_COUPON=1 (or true/yes, any case) to bring the daily deal
# back. Reversible; no template or DB changes needed.
# The value is normalized the same way as the CAMPAIGN_INCLUDE_CATCH_ALL
# override, so "TRUE" / " Yes " are honored instead of silently ignored.
COUPON_ENABLED = os.getenv("CAMPAIGN_ENABLE_COUPON", "0").strip().lower() in ("1", "true", "yes")
COUPON_PCT = int(os.getenv("CAMPAIGN_COUPON_PCT", "40"))
# A/B/C price test. Set CAMPAIGN_COUPON_AB_PCTS to a comma list of percents
# (e.g. "20,30,40"):
#   - Each carrier is deterministically bucketed into one arm by a stable hash
#     of their email, and gets that arm's own daily code.
#   - Each code stores its own percent in discount_codes, so the discount the
#     email advertises always matches what checkout applies.
#   - Redemptions are measurable per code (description marker
#     campaign-daily:<date>:<pct>).
#   - A percent of 0 is a FULL-PRICE control arm (e.g. "20,30,0"): no code is
#     minted and the carrier sees the normal price, but they are still
#     hash-bucketed so the control is measurable.
# Empty/unset = single-arm test at COUPON_PCT (legacy behavior). The split is
# even and stable per carrier, so a carrier always sees the same arm across
# re-sends (no arm-hopping). Non-numeric entries are ignored.
COUPON_AB_PCTS = tuple(
    int(p.strip())
    for p in os.getenv("CAMPAIGN_COUPON_AB_PCTS", "").split(",")
    if p.strip().isdigit()
)
# Eligible slugs = every service a trucking campaign CTA can link to. This
# includes the per-state landing pages lp_slug_for() routes weight-tax and CA
# emissions carriers to. discounted_price_attribs() quotes a deal price for
# those slugs, so they must be in the code's applies_to or checkout would
# reject a code the email promised.
# Pass-through-only slugs (boc3-filing $25 passthrough, etc.) are eligible
# too: the discount math only touches the service-fee portion, so a code
# scoped to them yields $0 off the passthrough and the full discount off the fee.
COUPON_SLUGS = (
    "mcs150-update,usdot-reactivation,dot-drug-alcohol,dot-full-compliance,"
    "ucr-registration,state-trucking-bundle,intrastate-authority,irp-registration,"
    "ifta-application,hazmat-phmsa,state-emissions,state-weight-tax,trucking-wrap-up,"
    "boc3-filing,"
    "or-weight-mile-tax,ny-hut-registration,ky-kyu-registration,nm-weight-distance,"
    "ct-highway-use-fee,ca-mcp-carb"
)
_ET = timezone(timedelta(hours=-5))  # EST anchor; close enough for an end-of-day cutoff
_COUPON_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ"  # no I/O to avoid confusion
|
|
|
|
# ── Per-MX-operator throttling (main pool) ──────────────────────────────────
# Sender reputation is tracked by the RECEIVING operator, not the recipient
# domain. The Jun 13-14 Gmail + Outlook block storm came from concentrating the
# warmup on Google/Microsoft-Workspace-hosted business domains. During warmup
# we EXCLUDE those big operators entirely and send to the long tail of
# small/self-hosted mail systems that don't bot-throttle. Once reputation is
# established, we cap per operator instead. mx_provider is populated by
# mx_tag_carriers.py.
# Set MAIN_SKIP_BIG_MX=0 (or false, any case) to stop excluding once truly
# warmed up. The value is normalized the same way as the
# CAMPAIGN_INCLUDE_CATCH_ALL override, so "False" no longer reads as true.
MAIN_SKIP_BIG_MX = os.getenv("MAIN_SKIP_BIG_MX", "1").strip().lower() not in ("0", "false", "")
# Operators to hold out during warmup (they aggressively throttle/blocklist).
# These are the "clean label" providers mx_tag_carriers.py recognizes by MX host.
BIG_MX_OPERATORS = ("google", "microsoft", "proofpoint", "mimecast",
                    "barracuda", "cisco", "broadcom")
# Consumer / aggressively-filtering mailbox operators that mx_tag_carriers.py
# labels with the "mx:" prefix (no clean label). They complaint-block and filter
# like the big operators, so they are held out of the warmup pool too.
# The literal-domain blocklist (BLOCKED_EMAIL_DOMAINS) already stops
# someone@yahoo.com / @icloud.com. It cannot see a CUSTOM domain whose MX
# points at one of these operators -- only the MX tag catches that. Examples:
#   - Yahoo Small Business / AOL (mx:yahoodns.net)
#   - Apple iCloud+ Custom Domain (mx:icloud.com)
#   - legacy consumer ISPs
# (Live 2026-06-20: mx:yahoodns.net alone = 283k sendable carriers.)
CONSUMER_MX_OPERATORS = (
    "mx:yahoodns.net", "mx:icloud.com", "mx:comcast.net", "mx:charter.net",
    "mx:centurylink.net", "mx:windstream.net", "mx:tds.net",
    "mx:earthlink-vadesecure.net",
)
# Everything held out of the warmup pool entirely until
# MAIN_BIG_MX_EXCLUDE_UNTIL_DAY, then re-introduced gradually via mx_daily_caps().
WARMUP_EXCLUDE_OPERATORS = BIG_MX_OPERATORS + CONSUMER_MX_OPERATORS
MAIN_WARMUP_START_FILE = os.getenv("MTA_WARMUP_START_FILE", "/etc/postfix/pw-warmup-start")
# How many days to EXCLUDE the big operators entirely. After the Jun 13-14
# block storm, reputation is NOT yet established despite a high calendar day
# count. So Google/Microsoft/etc. are held out until day 30 to let reputation
# recover on the long-tail operators first, then re-introduced gradually via
# mx_daily_caps().
MAIN_BIG_MX_EXCLUDE_UNTIL_DAY = int(os.getenv("MAIN_BIG_MX_EXCLUDE_UNTIL_DAY", "30"))
|
|
|
|
|
|
def main_warmup_day() -> int:
    """Return the number of whole days since the main pool's warmup began.

    MAIN_WARMUP_START_FILE holds a Unix epoch timestamp (seconds).

    Returns:
        - 0 when the stamp is in the future;
        - 99 ("treat as warmed up") when the file is missing or unreadable, or
          its contents are not an integer.
    """
    try:
        # `with` closes the file; previously the handle was left to the GC.
        with open(MAIN_WARMUP_START_FILE, encoding="utf-8") as fh:
            start = int(fh.read().strip())
    except (OSError, ValueError):
        return 99  # no usable stamp = treat as warmed up
    return max(0, int((time.time() - start) // 86400))
|
|
|
|
|
|
def mx_daily_caps(day: int) -> dict:
    """Per-operator daily NEW-recipient caps for warmup day ``day``.

    Big and consumer-MX operators (WARMUP_EXCLUDE_OPERATORS) share one cap,
    ramped on this schedule:
        day <= cutoff        0  (excluded; reputation recovery)
        day <= cutoff + 7    40
        day <= cutoff + 14   120
        otherwise            300
    ``cutoff`` is MAIN_BIG_MX_EXCLUDE_UNTIL_DAY.

    The ``"__default__"`` entry is the per-operator cap for every other
    (long-tail, small/self-hosted) operator that carries the warmup volume. It
    ramps 120 -> 150 -> 200 -> 250 on the same schedule.
    """
    cutoff = MAIN_BIG_MX_EXCLUDE_UNTIL_DAY
    # (last day of tier, inclusive; big/consumer cap; long-tail default cap)
    tiers = (
        (cutoff, 0, 120),
        (cutoff + 7, 40, 150),
        (cutoff + 14, 120, 200),
    )
    big, default = 300, 250
    for last_day, tier_big, tier_default in tiers:
        if day <= last_day:
            big, default = tier_big, tier_default
            break
    caps = dict.fromkeys(WARMUP_EXCLUDE_OPERATORS, big)
    caps["__default__"] = default
    return caps
|
|
|
|
|
|
def mx_throttled(rows: list[tuple], total_n: int, caps: dict, mx_idx: int) -> list[tuple]:
    """Select up to ``total_n`` rows, in input order, under per-operator caps.

    Each row's operator is ``row[mx_idx]``, stripped and lower-cased. A missing
    column or a NULL value both map to ``""``. An operator's limit is
    ``caps[operator]``, falling back to ``caps["__default__"]``, then to 50.
    Rows over their operator's limit are skipped, not deferred.
    """
    default_cap = caps.get("__default__", 50)
    counts: dict = {}
    picked: list = []
    for row in rows:
        if len(picked) >= total_n:
            break
        provider = ""
        if mx_idx < len(row):
            provider = (row[mx_idx] or "").strip().lower()
        seen = counts.get(provider, 0)
        if seen < caps.get(provider, default_cap):
            counts[provider] = seen + 1
            picked.append(row)
    return picked
|
|
|
|
|
|
def _random_coupon_code() -> str:
    """Return a random 5-character code drawn from _COUPON_ALPHABET.

    Uses the ``secrets`` CSPRNG so codes are not guessable from earlier ones.
    """
    import secrets
    letters = [secrets.choice(_COUPON_ALPHABET) for _ in range(5)]
    return "".join(letters)
|
|
|
|
|
|
def get_or_create_daily_coupon(conn, send_date: date, pct: int | None = None) -> str:
    """Return the 5-letter coupon code for `send_date` at `pct`% off, minting it
    if needed.

    Idempotent: a marker in `description` lets a re-run on the same day reuse
    the existing code instead of minting a duplicate.
    - Single-arm runs use the legacy marker `campaign-daily:<date>`.
    - A/B arms use `campaign-daily:<date>:<pct>`, so each percent gets its own
      stable, separately-countable code.

    The code is valid from 00:00:00 through 23:59:59 of the send date in the
    fixed-offset `_ET` zone.

    Raises:
        RuntimeError: a database error occurred while minting (the transaction
            is rolled back, and the original error is chained as __cause__), or
            25 random codes in a row all collided with existing codes.
    """
    pct = COUPON_PCT if pct is None else pct
    day = send_date.isoformat()
    # Keep the legacy marker for the single-arm (no A/B) case so historical
    # idempotency/lookups still work; A/B arms get a percent-suffixed marker.
    marker = f"campaign-daily:{day}" if not COUPON_AB_PCTS else f"campaign-daily:{day}:{pct}"

    starts = datetime.combine(send_date, datetime.min.time(), tzinfo=_ET)
    expires = starts + timedelta(hours=23, minutes=59, seconds=59)  # 23:59:59 ET

    with conn.cursor() as cur:
        cur.execute("SELECT code FROM discount_codes WHERE description = %s LIMIT 1", (marker,))
        row = cur.fetchone()
        if row:
            return row[0]

        # A code collision is absorbed by ON CONFLICT DO NOTHING (no row comes
        # back), so only that case is retried with a fresh code. Any other
        # error is real: previously it was swallowed and retried 25 times,
        # then reported as a generic failure with the cause lost.
        for _ in range(25):
            code = _random_coupon_code()
            try:
                cur.execute(
                    """
                    INSERT INTO discount_codes
                        (code, description, discount_type, discount_value, applies_to,
                         max_uses_per_email, active, starts_at, expires_at)
                    VALUES (%s, %s, 'percent', %s, %s, 1, TRUE, %s, %s)
                    ON CONFLICT (code) DO NOTHING
                    RETURNING code
                    """,
                    (code, marker, pct, COUPON_SLUGS, starts, expires),
                )
                inserted = cur.fetchone()
                if inserted:
                    conn.commit()
            except Exception as exc:
                conn.rollback()
                raise RuntimeError(
                    f"could not mint daily coupon code for {marker}: {exc}"
                ) from exc
            if inserted:
                LOG.info("[coupon] daily code %s (%d%% off, expires %s ET)",
                         code, pct, expires.isoformat())
                return inserted[0]
    raise RuntimeError("could not mint a unique daily coupon code")
|
|
|
|
|
|
def get_or_create_daily_coupons(conn, send_date: date) -> dict[int, str]:
    """Mint (or reuse) the coupon for every test arm on `send_date`.

    Returns a {percent: code} map.
    - No A/B test configured: a single arm, {COUPON_PCT: code}.
    - CAMPAIGN_COUPON_AB_PCTS="20,30,40": one code per percent, so recipients
      can be split across arms.

    A percent <= 0 is a FULL-PRICE control arm. Nothing is minted for it and
    its code is "", so carriers hashed into it see the normal price with no
    coupon, yet their arm is still deterministic (and thus measurable) from
    their email. Example: CAMPAIGN_COUPON_AB_PCTS="20,30,0".
    """
    arms = list(COUPON_AB_PCTS) or [COUPON_PCT]
    return {
        pct: (get_or_create_daily_coupon(conn, send_date, pct) if pct > 0 else "")
        for pct in arms
    }
|
|
|
|
|
|
def pick_coupon_for_email(email: str, daily_coupons: dict[int, str] | None) -> tuple[str, str]:
    """Assign a carrier to one coupon arm and return ``(code, pct_str)``.

    With several arms, the arm is picked by SHA-256 of the stripped,
    lower-cased email, modulo the number of arms (taken in ascending percent
    order). The split is even and *stable*: a carrier lands in the same arm on
    every re-send, so an A/B comparison isn't polluted by someone seeing 20%
    one day and 40% the next.

    Returns ``("", "")`` when coupons are off (None/empty map). It also returns
    ``("", "")`` when the carrier hashes into a full-price control arm (code
    ""), so the email renders the normal-price branch exactly like a no-coupon
    send. Re-hashing a converter's email still recovers their arm for
    attribution.
    """
    if not daily_coupons:
        return "", ""
    arms = sorted(daily_coupons)
    if len(arms) > 1:
        digest = hashlib.sha256((email or "").strip().lower().encode()).hexdigest()
        chosen = arms[int(digest, 16) % len(arms)]
    else:
        chosen = arms[0]
    code = daily_coupons[chosen]
    return (code, str(chosen)) if code else ("", "")
|
|
|
|
|
|
def coupon_attribs(coupon_code: str | None, coupon_pct: str | None = None) -> dict:
    """Return the same-day-deal merge fields; all blank when no coupon is active.

    During an A/B test, `coupon_pct` is passed per recipient so the advertised
    percent matches the arm's actual code. Single-arm sends omit it and fall
    back to the global COUPON_PCT.
    """
    fields = {"coupon_code": "", "coupon_pct": "", "coupon_expires": ""}
    if coupon_code:
        fields.update(
            coupon_code=coupon_code,
            coupon_pct=coupon_pct or str(COUPON_PCT),
            # Human-readable cutoff for the email body.
            coupon_expires="11:59 PM ET tonight",
        )
    return fields
|
|
|
|
|
|
def lp_link_with_coupon(campaign_type: str, phy_state: str | None,
                        coupon_code: str | None, dot: str | None = None) -> str:
    """Order landing-page URL for the email's `lp_link` attrib.

    ALWAYS ends in a `?`-started query, so every template can safely append
    its own params with a leading `&`. The query carries the carrier's DOT
    when known, plus the daily `code=` when a coupon is active.

    When there is neither a DOT nor a coupon, the result is
    `/order/<slug>?`, so `{{ lp_link }}&utm_source=...` still yields a valid
    URL. Previously a bare path was returned in that case, producing the
    `/order/slug&utm_source=...` 404 this helper exists to prevent.

    This removes a class of broken-CTA bugs:
    - A template writing `{{ lp_link }}&utm_source=...` on a bare path gave an
      invalid URL.
    - A template writing `{{ lp_link }}?dot=...` double-`?`d once a coupon
      added its own `?code=`.
    With the query owned here, both template styles converge on
    `{{ lp_link }}&utm_source=...`.

    Parameter values are URL-encoded.
    """
    params = []
    if dot:
        params.append(("dot", dot))
    if coupon_code:
        params.append(("code", coupon_code))
    return f"{build_lp_link(campaign_type, phy_state)}?{urllib.parse.urlencode(params)}"
|
|
|
|
# ── TZ config: tz_key -> {"states": (...), "send_hour_utc": int} ────────────
# Send hours are fixed UTC hours. 09:00 UTC is 4 AM EST, and each region to the
# west sends one hour later, so recipients get the mail at ~4 AM local standard
# time (an hour later while daylight saving time is in effect). Note that the
# "CT" key means Central Time, not Connecticut.
TIMEZONE_CONFIG = {
    "ET": dict(
        states=("CT", "DC", "DE", "FL", "GA", "IN", "KY", "MA", "MD", "ME", "MI",
                "NH", "NJ", "NY", "NC", "OH", "PA", "RI", "SC", "TN", "VA", "VT", "WV"),
        send_hour_utc=9,   # 4 AM EST
    ),
    "CT": dict(
        states=("AL", "AR", "IA", "IL", "KS", "LA", "MN", "MO", "MS", "ND",
                "NE", "OK", "SD", "TX", "WI"),
        send_hour_utc=10,  # 5 AM EST = 4 AM CST
    ),
    "MT": dict(
        states=("AZ", "CO", "ID", "MT", "NM", "UT", "WY"),
        send_hour_utc=11,  # 6 AM EST = 4 AM MST
    ),
    "PT": dict(
        states=("AK", "CA", "HI", "NV", "OR", "WA"),
        send_hour_utc=12,  # 7 AM EST = 4 AM PST
    ),
}
|
|
|
|
# Owner inbox: each campaign's test send goes here before it is scheduled.
TEST_EMAIL = os.getenv("CAMPAIGN_TEST_EMAIL", "carrierone@gmx.com")
REPLY_TO_EMAIL = os.getenv("CAMPAIGN_REPLY_TO", "info@performancewest.net")

# Listmonk adds campaign headers with
# `for hdr, val := range set { h.Add(hdr, val) }` (internal/manager/manager.go),
# so every map KEY is used verbatim as the header name. The right shape is
# therefore {"Reply-To": value}. A {"name": ..., "value": ...} map would emit
# junk "name:"/"value:" headers and no Reply-To at all. Replies would then fall
# back to the From address (noreply@send.performancewest.net). The healthcare
# builder uses this same shape.
REPLY_TO_HEADERS = [
    {"Reply-To": REPLY_TO_EMAIL},
]

# Bulk sender on the dedicated `send.` subdomain, which isolates bulk sending
# reputation from the root domain (kept clean for transactional / verification
# mail). The Reply-To above still routes replies to the root domain, so the
# customer-facing reply experience is unchanged. See docs/deliverability.md.
FROM_EMAIL = os.getenv(
    "CAMPAIGN_FROM",
    "Performance West <noreply@send.performancewest.net>",
)
|
|
|
|
# Verification results that are safe to SEND to.
#
# Selection keys ONLY off email_verify_result, never the email_verified
# boolean. The verifier optimistically sets email_verified=TRUE for
# 'mx_unreachable' (the domain exists but its mail server didn't answer the
# probe). Those addresses HARD-BOUNCE on a real send, which is what tanked
# deliverability (~47% bounce, half the list blocklisted). So 'mx_unreachable'
# and every error/reject result are excluded.
#
# Recovery mode (default while reputation is damaged) sends ONLY to:
#   - 'smtp_valid': an MX explicitly accepted the address at RCPT time;
#   - 'send_confirmed': proven deliverable by a real burner-domain verification
#     send (docs/campaign-deliverability-plan.md).
# That keeps bounces near zero while sender reputation is rebuilt.
# 'hard_bounced' is NEVER sendable.
#
# Catch-all domains accept any RCPT at SMTP time and may bounce silently later.
# They are the big growth pool but the risky one, so they are gated by an
# automatic rollout (catch_all_enabled):
#   - they are added once the IPs are warm AND the recent live bounce rate is
#     provably low;
#   - they auto-revert if bounces spike.
# CAMPAIGN_INCLUDE_CATCH_ALL=1/0 hard-overrides that decision.
BASE_SENDABLE_RESULTS = "smtp_valid send_confirmed".split()
CATCH_ALL_RESULTS = "catch_all_domain catch_all_detected".split()
|
|
|
|
# ── Catch-all auto-rollout tunables ─────────────────────────────────────────
# Warmup day from which catch-all MAY auto-enable; reputation is rebuilt on the
# clean smtp_valid pool first. This is independent of the big-MX axis:
# catch-all is dominated by long-tail business domains, and a catch-all
# address that also lands on Google/Microsoft is still held out by the big-MX
# exclusion until day 30.
CATCH_ALL_MIN_WARMUP_DAY = int(os.getenv("CAMPAIGN_CATCH_ALL_MIN_DAY", "21"))

# Bounce-rate ceiling, in percent, over a recent window. At or above it,
# catch-all stays OFF, and an already-on rollout auto-reverts. The window is
# deliberately SHORT so that:
#   - a historical disaster (e.g. the Jun-16 ~45% 7-day rate) can't block the
#     rollout forever;
#   - a fresh spike trips the guard quickly.
CATCH_ALL_MAX_BOUNCE_PCT = float(os.getenv("CAMPAIGN_CATCH_ALL_MAX_BOUNCE_PCT", "8"))
CATCH_ALL_BOUNCE_WINDOW_DAYS = int(os.getenv("CAMPAIGN_CATCH_ALL_BOUNCE_WINDOW_DAYS", "2"))

# Minimum sends in the window before the rate is trusted. Otherwise a tiny
# sample (9 sent / 1 bounce = 11%) would wrongly decide the outcome.
CATCH_ALL_BOUNCE_MIN_SENT = int(os.getenv("CAMPAIGN_CATCH_ALL_BOUNCE_MIN_SENT", "300"))
|
|
|
|
# Connection URL of the app (`performancewest`) database; _listmonk_db_url()
# derives the listmonk database URL from it.
DB_URL = os.getenv("DATABASE_URL", "")
# NOTE(review): same env var and default as MAIN_WARMUP_START_FILE; the two
# constants look like duplicates and could be merged.
WARMUP_START_FILE = os.getenv(
    "MTA_WARMUP_START_FILE",
    "/etc/postfix/pw-warmup-start",
)
|
|
|
|
|
|
def _listmonk_db_url() -> str:
    """Derive the listmonk DB URL from DATABASE_URL (same Postgres, different db).

    Bounce/sent counts live in the listmonk DB, while the campaign builder's
    DB_URL points at the `performancewest` app DB on the SAME Postgres server.

    Resolution order:
    1. LISTMONK_DATABASE_URL, if set, is returned verbatim.
    2. A URL-form DSN (``scheme://...``) gets its path replaced with
       ``/listmonk``. Host, credentials and query string (e.g.
       ``?sslmode=require``) are kept. The old ``rsplit("/")`` dropped the
       query string, and on a URL with no database path it cut into the
       authority, producing the wrong host.
    3. A key=value DSN has its ``dbname=`` value replaced with ``listmonk``.
    4. Anything else (including "") is returned unchanged.
    """
    override = os.getenv("LISTMONK_DATABASE_URL")
    if override:
        return override
    base = DB_URL or os.getenv("DATABASE_URL", "")
    if "://" in base:
        parts = urllib.parse.urlsplit(base)
        return urllib.parse.urlunsplit(parts._replace(path="/listmonk"))
    if "dbname=" in base:
        import re
        return re.sub(r"dbname=\S+", "dbname=listmonk", base)
    return base
|
|
|
|
|
|
def recent_bounce_rate(window_days: int) -> tuple[float | None, int, int]:
    """Live campaign bounce rate over the last `window_days`.

    Returns (rate_pct_or_None, sent, bounced). rate is None when sent == 0
    (no signal).

    Only campaigns that actually ran in the window ('finished' or 'running',
    by started_at, or created_at when not started) are counted. Bounces are
    joined on campaign_id (~99% populated for the real-time postfix source),
    so a long-past disaster cannot poison a short recent window.

    Fails closed: if the listmonk DB is unreachable OR the query itself fails,
    a warning is logged and (None, 0, 0) is returned. The caller then keeps
    catch-all OFF instead of crashing the nightly run. Previously only the
    connect failure was handled.
    """
    try:
        conn = psycopg2.connect(_listmonk_db_url())
    except Exception as exc:  # pragma: no cover - infra dependent
        LOG.warning("catch-all guardrail: cannot reach listmonk DB (%s); "
                    "treating bounce rate as UNKNOWN (fail-closed)", exc)
        return None, 0, 0
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT COALESCE(SUM(c.sent), 0),
                       COALESCE(SUM(b.n), 0)
                  FROM campaigns c
                  LEFT JOIN (
                        SELECT campaign_id, count(*) AS n
                          FROM bounces
                         WHERE campaign_id IS NOT NULL
                         GROUP BY campaign_id
                  ) b ON b.campaign_id = c.id
                 WHERE COALESCE(c.started_at, c.created_at)
                         > now() - make_interval(days => %s)
                   AND c.status IN ('finished', 'running')
                """,
                (window_days,),
            )
            sent, bounced = cur.fetchone()
        sent, bounced = int(sent), int(bounced)
    except Exception as exc:  # noqa: BLE001 -- guardrail must fail closed
        LOG.warning("catch-all guardrail: bounce-rate query failed (%s); "
                    "treating bounce rate as UNKNOWN (fail-closed)", exc)
        return None, 0, 0
    finally:
        conn.close()
    rate = (100.0 * bounced / sent) if sent else None
    return rate, sent, bounced
|
|
|
|
|
|
def catch_all_enabled() -> bool:
|
|
"""Decide whether catch-all domains are sendable on THIS run.
|
|
|
|
Auto-rollout (no env needed):
|
|
1. IPs warm enough -> warmup_day() >= CATCH_ALL_MIN_WARMUP_DAY
|
|
2. recent bounce rate low -> over CATCH_ALL_BOUNCE_WINDOW_DAYS, with at
|
|
least CATCH_ALL_BOUNCE_MIN_SENT sent for a
|
|
trustworthy sample, the rate is BELOW
|
|
CATCH_ALL_MAX_BOUNCE_PCT.
|
|
If bounces later spike above the ceiling, this returns False again on the
|
|
next run -> the rollout auto-reverts to the clean smtp_valid pool.
|
|
|
|
CAMPAIGN_INCLUDE_CATCH_ALL hard-overrides: '1'/'true' forces ON (manual
|
|
decision, skips guardrail), '0'/'false' forces OFF.
|
|
"""
|
|
override = os.getenv("CAMPAIGN_INCLUDE_CATCH_ALL")
|
|
if override is not None:
|
|
forced = override.strip().lower() not in ("0", "false", "")
|
|
LOG.info("catch-all: forced %s via CAMPAIGN_INCLUDE_CATCH_ALL=%r",
|
|
"ON" if forced else "OFF", override)
|
|
return forced
|
|
|
|
day = warmup_day()
|
|
if day is None or day < CATCH_ALL_MIN_WARMUP_DAY:
|
|
LOG.info("catch-all: OFF (warmup day %s < min %s)",
|
|
day, CATCH_ALL_MIN_WARMUP_DAY)
|
|
return False
|
|
|
|
rate, sent, bounced = recent_bounce_rate(CATCH_ALL_BOUNCE_WINDOW_DAYS)
|
|
if rate is None or sent < CATCH_ALL_BOUNCE_MIN_SENT:
|
|
# Not enough recent signal to trust -> fail closed (stay on clean pool).
|
|
LOG.info("catch-all: OFF (insufficient recent signal: %s sent < min %s "
|
|
"over %sd; need a low proven bounce rate first)",
|
|
sent, CATCH_ALL_BOUNCE_MIN_SENT, CATCH_ALL_BOUNCE_WINDOW_DAYS)
|
|
return False
|
|
if rate >= CATCH_ALL_MAX_BOUNCE_PCT:
|
|
LOG.warning("catch-all: OFF (recent bounce rate %.2f%% >= ceiling %.2f%% "
|
|
"over %sd; %s sent / %s bounced) -- auto-reverting to clean pool",
|
|
rate, CATCH_ALL_MAX_BOUNCE_PCT, CATCH_ALL_BOUNCE_WINDOW_DAYS,
|
|
sent, bounced)
|
|
return False
|
|
|
|
LOG.info("catch-all: ON (warmup day %s >= %s; recent bounce %.2f%% < %.2f%% "
|
|
"over %sd; %s sent / %s bounced)",
|
|
day, CATCH_ALL_MIN_WARMUP_DAY, rate, CATCH_ALL_MAX_BOUNCE_PCT,
|
|
CATCH_ALL_BOUNCE_WINDOW_DAYS, sent, bounced)
|
|
return True
|
|
|
|
|
|
def usable_filter() -> str:
|
|
"""SQL predicate for email_verify_result values that are safe to send to.
|
|
|
|
Always includes the clean pool (smtp_valid + send_confirmed); adds catch-all
|
|
results only when catch_all_enabled() says so (warm IPs + low live bounces).
|
|
The decision is memoized so it is computed ONCE per build run (one DB probe,
|
|
one log line, and a consistent filter across every segment/timezone).
|
|
"""
|
|
global _USABLE_FILTER_CACHE
|
|
if _USABLE_FILTER_CACHE is None:
|
|
results = list(BASE_SENDABLE_RESULTS)
|
|
if catch_all_enabled():
|
|
results += CATCH_ALL_RESULTS
|
|
_USABLE_FILTER_CACHE = (
|
|
"email_verify_result IN (" + ", ".join(f"'{r}'" for r in results) + ")"
|
|
)
|
|
return _USABLE_FILTER_CACHE
|
|
|
|
|
|
_USABLE_FILTER_CACHE: str | None = None
|
|
|
|
|
|
def warmup_day() -> int | None:
|
|
"""Return days since MTA warmup start, or None if not configured/readable."""
|
|
try:
|
|
with open(WARMUP_START_FILE, "r", encoding="utf-8") as fh:
|
|
start = int(fh.read().strip())
|
|
return max(0, int((time.time() - start) // 86400))
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def warmup_daily_queue_cap(day: int | None = None) -> int | None:
|
|
"""Daily campaign-builder queue cap aligned with Listmonk rampcap targets.
|
|
|
|
This prevents the builder from scheduling 10k-20k recipients/day while
|
|
Listmonk is throttled to only 500-3,000/day, which would create a massive
|
|
backlog that later drains unpredictably. The numbers match the runbook's
|
|
intended daily warmup totals, not the theoretical 24-hour sliding-window max.
|
|
"""
|
|
if day is None:
|
|
day = warmup_day()
|
|
if day is None:
|
|
return None
|
|
if day <= 1:
|
|
return 500
|
|
if day <= 3:
|
|
return 1500
|
|
if day <= 6:
|
|
return 2500
|
|
return 3000
|
|
|
|
|
|
def lm_api(path: str, data: dict | None = None, method: str | None = None):
|
|
headers = {"Content-Type": "application/json", "Authorization": f"Basic {_LM_AUTH}"}
|
|
req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers)
|
|
if data is not None:
|
|
req.data = json.dumps(data).encode()
|
|
if method:
|
|
req.method = method
|
|
try:
|
|
return json.loads(urllib.request.urlopen(req, timeout=30).read())
|
|
except urllib.error.HTTPError as e:
|
|
body = e.read().decode()[:300]
|
|
raise RuntimeError(f"Listmonk {path} HTTP {e.code}: {body}")
|
|
|
|
|
|
# Per-process memo of listmonk_sendable() verdicts, keyed by the lower-cased,
# stripped email address.
_LM_SUPPRESSION_CACHE: dict[str, tuple[bool, str]] = {}


def _lm_email_query(email: str) -> str:
    """Build a Listmonk subscriber query that matches exactly one email address.

    The address is lower-cased and stripped. Listmonk's query language quotes
    strings with single quotes, so every embedded quote is doubled; a bad
    source row therefore cannot close the literal early and broaden the match.
    """
    normalized = email.lower().strip()
    escaped = "''".join(normalized.split("'"))
    return "email = '" + escaped + "'"
|
|
|
|
|
|
def listmonk_sendable(email: str) -> tuple[bool, str]:
    """Return whether Listmonk will send to this address, with reason.

    SQL eligibility is not enough: an address may already exist in Listmonk as
    disabled, blocklisted, bounced, or unsubscribed. If we add those rows to a
    fresh campaign list, Listmonk filters them at send time, producing a smaller
    `to_send` count than the builder's daily warmup allocation. Check status at
    build time and fetch replacements so each campaign list is filled with
    actually-sendable subscribers.

    Reasons: "blank_email", "new" (not in Listmonk yet), "enabled",
    "listmonk_status:<status>", "list_subscription_status:<status>" or
    "lookup_error:<exc>". Definitive verdicts are memoized in
    _LM_SUPPRESSION_CACHE for the rest of the process; lookup errors are NOT
    cached, so a transient Listmonk hiccup does not suppress the address for
    every later segment/timezone of the run.
    """
    key = email.lower().strip()
    if not key:
        return False, "blank_email"
    if key in _LM_SUPPRESSION_CACHE:
        return _LM_SUPPRESSION_CACHE[key]
    try:
        res = lm_api(
            "/subscribers?" + urllib.parse.urlencode({
                "query": _lm_email_query(key),
                "per_page": 1,
            })
        )
        results = res.get("data", {}).get("results", [])
        if not results:
            out = (True, "new")
        else:
            sub = results[0]
            status = (sub.get("status") or "").lower()
            if status != "enabled":
                out = (False, f"listmonk_status:{status or 'unknown'}")
            else:
                bad_list_status = ""
                for lst in sub.get("lists") or []:
                    lstatus = (lst.get("subscription_status") or "").lower()
                    if lstatus in {"unsubscribed", "unconfirmed"}:
                        bad_list_status = lstatus
                        break
                if bad_list_status:
                    out = (False, f"list_subscription_status:{bad_list_status}")
                else:
                    out = (True, "enabled")
    except Exception as exc:  # noqa: BLE001
        # Do not silently underfill campaigns because Listmonk's lookup endpoint
        # hiccupped. Let the caller/logs surface the problem; treating it as not
        # sendable is safer than scheduling a suppressed address and marking it
        # sent in our source DB. Deliberately not cached (see docstring).
        return False, f"lookup_error:{exc}"
    _LM_SUPPRESSION_CACHE[key] = out
    return out
|
|
|
|
|
|
def get_base_campaign(campaign_id: int) -> dict:
    """Fetch a Listmonk campaign by id and return its ``data`` payload."""
    response = lm_api(f"/campaigns/{campaign_id}")
    return response["data"]
|
|
|
|
|
|
def create_list(name: str) -> int:
    """Create a private, single opt-in Listmonk list tagged trucking/auto.

    Returns the id Listmonk assigned to the new list.
    """
    new_list = {
        "name": name,
        "type": "private",
        "optin": "single",
        "tags": ["trucking", "auto"],
    }
    created = lm_api("/lists", new_list, "POST")
    return created["data"]["id"]
|
|
|
|
|
|
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
    """Create/attach a single subscriber to a list. Returns True on success.

    Listmonk's POST /api/subscribers takes one subscriber with `lists` (array of
    list IDs) and preconfirm_subscriptions=true so they're immediately sendable.
    If the address already exists (error mentions "already exists"/"conflict"
    or is HTTP 409), the existing subscriber is attached to `list_id` instead
    via _attach_existing_subscriber(). Failures are logged and reported as
    False; nothing is raised.
    """
    try:
        lm_api("/subscribers", {
            "email": email,
            "name": name or email,
            "status": "enabled",
            "lists": [list_id],
            "attribs": attribs,
            "preconfirm_subscriptions": True,
        }, "POST")
        return True
    except Exception as exc:  # noqa: BLE001
        msg = str(exc)
        # Already exists — attach to this list instead of failing
        if "already exists" in msg.lower() or "conflict" in msg.lower() or "HTTP 409" in msg:
            return _attach_existing_subscriber(list_id, email, name, attribs)
        LOG.warning("Listmonk could not create subscriber %s: %s", email, msg)
        return False


def _attach_existing_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
    """Attach an already-existing Listmonk subscriber to `list_id`.

    Refreshes the subscriber's attribs first (best effort), then adds the
    list membership as confirmed. Returns False (after logging) when the
    subscriber cannot be found, is not `enabled`, or the lookup/attach call
    fails.
    """
    try:
        subs = lm_api(
            "/subscribers?"
            + urllib.parse.urlencode({
                # Same escaped exact-match query that listmonk_sendable() uses.
                "query": _lm_email_query(email),
                "per_page": 1,
            })
        )
        results = subs.get("data", {}).get("results", [])
        if not results:
            LOG.warning("Listmonk reported %s as existing but the lookup found no subscriber", email)
            return False
        existing = results[0]
        status = (existing.get("status") or "").lower()
        if status != "enabled":
            LOG.info("Listmonk suppressed existing subscriber %s status=%s", email, status or "unknown")
            return False
        sub_id = existing["id"]
        # Refresh attribs so per-campaign merge fields (coupon code,
        # ifta_due_date, lp_link, etc.) are correct for THIS send --
        # otherwise a previously-imported carrier keeps stale attribs
        # and the new campaign renders blank fields.
        # NOTE(review): this PUT omits `lists`; confirm Listmonk's
        # PUT /api/subscribers/{id} leaves existing list memberships (and their
        # unsubscribe statuses) untouched when `lists` is absent.
        try:
            merged = dict(existing.get("attribs") or {})
            merged.update(attribs or {})
            lm_api(f"/subscribers/{sub_id}", {
                "email": email,
                "name": name or existing.get("name") or email,
                "status": "enabled",
                "attribs": merged,
            }, "PUT")
        except Exception as exc:  # noqa: BLE001
            # Non-fatal: still attach to the list below.
            LOG.warning("Listmonk attrib refresh failed for %s (attaching anyway): %s", email, exc)
        lm_api("/subscribers/lists", {
            "ids": [sub_id],
            "action": "add",
            "target_list_ids": [list_id],
            "status": "confirmed",
        }, "PUT")
        return True
    except Exception as exc:  # noqa: BLE001
        LOG.warning("Listmonk could not attach existing subscriber %s to list %s: %s",
                    email, list_id, exc)
        return False
|
|
|
|
|
|
def import_subscribers(list_id: int, subscribers: list[dict]) -> int:
    """Attach each subscriber dict to `list_id`, in order; return how many succeeded.

    Each dict needs an "email" key; "name" and "attribs" default to "" and {}.
    """
    return sum(
        1
        for entry in subscribers
        if add_subscriber(list_id, entry["email"], entry.get("name", ""), entry.get("attribs", {}))
    )
|
|
|
|
|
|
def _altbody_for(base: dict, body: str | None = None) -> str:
|
|
"""Plaintext (text/plain) part for a campaign.
|
|
|
|
Listmonk only emits multipart/alternative when altbody is set; HTML-only
|
|
mail is a spam-score signal. The source/base campaigns have no altbody, so
|
|
derive one from the HTML body. `body` overrides base["body"] for test sends
|
|
where merge fields were already substituted.
|
|
"""
|
|
existing = (base.get("altbody") or "").strip()
|
|
if existing:
|
|
return existing
|
|
html = body if body is not None else base.get("body", "")
|
|
return html_to_text(html)
|
|
|
|
|
|
def _campaign_headers(base: dict):
    """Return the custom headers to send with a campaign cloned from `base`.

    Listmonk applies each header map as ``{header_name: value}`` -- the map
    KEY is the literal header name. Older campaigns were stored with
    ``{"name": "Reply-To", "value": ...}`` or ``{"key": ..., "value": ...}``
    maps, which emit junk ``name:``/``value:`` headers and no real Reply-To;
    such entries are rewritten to the correct shape here. Falls back to
    REPLY_TO_HEADERS when `base` has no headers.
    """
    headers = base.get("headers") or REPLY_TO_HEADERS
    if not isinstance(headers, list):
        return headers
    fixed = []
    for entry in headers:
        if isinstance(entry, dict) and set(entry) in ({"name", "value"}, {"key", "value"}):
            header_name = entry.get("name", entry.get("key"))
            fixed.append({str(header_name): entry["value"]})
        else:
            fixed.append(entry)
    return fixed


def create_and_schedule_campaign(
    base: dict,
    list_id: int,
    name: str,
    send_at_utc: datetime,
    schedule: bool = True,
) -> int:
    """Clone `base` into a new regular campaign targeting `list_id`; return its id.

    Subject, body, content type, template, tags and messenger are copied from
    `base`; headers are normalized by _campaign_headers(). With
    `schedule=True` the campaign gets `send_at` and is moved to "scheduled";
    otherwise it stays a draft (preview mode). A timezone-aware `send_at_utc`
    is converted to UTC before being stamped with "+00:00"; a naive value is
    assumed to already be UTC.
    """
    payload = {
        "name": name,
        "subject": base["subject"],
        "lists": [list_id],
        "from_email": FROM_EMAIL,
        "type": "regular",
        "content_type": base["content_type"],
        "body": base["body"],
        "altbody": _altbody_for(base),
        "template_id": base["template_id"],
        "tags": base.get("tags") or [],
        "messenger": base.get("messenger") or "email",
        "headers": _campaign_headers(base),
    }
    if schedule:
        send_at = send_at_utc
        if send_at.tzinfo is not None:
            # The literal "+00:00" suffix below is only correct for UTC.
            send_at = send_at.astimezone(timezone.utc)
        payload["send_at"] = send_at.strftime("%Y-%m-%dT%H:%M:%S+00:00")
    result = lm_api("/campaigns", payload, "POST")
    cid = result["data"]["id"]
    if schedule:
        lm_api(f"/campaigns/{cid}/status", {"status": "scheduled"}, "PUT")
    # else: leave as draft (preview mode)
    return cid


def send_test(base: dict, campaign_id: int, sample_row: tuple, label: str, tz: str,
              campaign_type: str) -> None:
    """Send one test email so the owner can approve before the real blast goes out.

    Merge fields are substituted from `sample_row` and the test goes to
    TEST_EMAIL via Listmonk's /campaigns/{id}/test. Failures are logged, not
    raised.
    """
    # sample_row is (dot, email, name, phy_state, target_state). Older callers may
    # still pass a 4-tuple (dot, email, name, state); handle both.
    dot, name, phy_state = sample_row[0], sample_row[2], sample_row[3]
    target_state = sample_row[4] if len(sample_row) > 4 else phy_state
    state = phy_state
    body = base["body"]
    body = body.replace("{{ .Subscriber.Attribs.company }}", name or "Sample Carrier LLC")
    body = body.replace("{{ .Subscriber.Attribs.dot_number }}", dot or "0000000")
    body = body.replace("{{ .Subscriber.Attribs.state }}", state or "TX")
    # Real subscribers get a populated lp_link attrib; the test send must mirror
    # that or the CTA button (e.g. "Check My Emissions Status") renders as a bare
    # "?dot=..." that links to nowhere. Build the same link the audience gets,
    # using the target_state (the state the offer applies to, which for per-state
    # programs comes from the deficiency flag, not the base state).
    body = body.replace("{{ .Subscriber.Attribs.lp_link }}",
                        build_lp_link(campaign_type, target_state))
    # NOTE: leave {{ UnsubscribeURL }} alone — Listmonk renders it into a real,
    # working per-subscriber unsubscribe link (even on test sends). Overwriting it
    # produced a dead /unsubscribe link with no subscriber identity.
    subj = base["subject"].replace("{{ .Subscriber.Attribs.dot_number }}", dot or "0000000")
    # Listmonk /test needs `lists` as an array of IDs (base["lists"] is objects)
    list_ids = [lst["id"] for lst in base.get("lists", []) if isinstance(lst, dict)] or [1]
    payload = {
        "name": base.get("name", "Test"), "subject": subj,
        "lists": list_ids, "from_email": FROM_EMAIL,
        "type": "regular", "content_type": base["content_type"],
        "body": body, "altbody": _altbody_for(base, body),
        "template_id": base["template_id"],
        "tags": base.get("tags") or [], "messenger": base.get("messenger") or "email",
        "headers": _campaign_headers(base),
        "subscribers": [TEST_EMAIL],
    }
    try:
        lm_api(f"/campaigns/{campaign_id}/test", payload, "POST")
        LOG.info("[%s/%s] Test sent to %s (sample: %s DOT#%s)", tz, label, TEST_EMAIL, name, dot)
    except Exception as exc:
        LOG.warning("[%s/%s] Test send failed: %s", tz, label, exc)
|
|
|
|
|
|
def campaign_type_filter(campaign_type: str) -> str:
    """Return the SQL WHERE fragment selecting carriers for `campaign_type`.

    "mcs150" and "inactive" are built in; every other type must be a key of
    DEFICIENCY_SEGMENTS, whose "flag_sql" is returned.

    Raises:
        ValueError: for an unknown campaign type.
    """
    builtin = {
        "mcs150": "mcs150_parsed < NOW() - INTERVAL '2 years'",
        "inactive": "oos_active IS TRUE",
    }
    for known_type, predicate in builtin.items():
        if campaign_type == known_type:
            return predicate
    if campaign_type not in DEFICIENCY_SEGMENTS:
        raise ValueError(f"unknown campaign_type {campaign_type!r}")
    return DEFICIENCY_SEGMENTS[campaign_type]["flag_sql"]
|
|
|
|
|
|
def campaign_weight(campaign_type: str) -> float:
    """Warmup allocation weight for a campaign type.

    Specific deficiency segments (everything that is not "mcs150" or
    "inactive") get 1.25 so they win more early-warmup share than the generic
    MCS-150 overdue (0.75) and inactive USDOT (0.65) notices: the copy is more
    relevant and less repetitive at the mailbox-provider/content-fingerprint
    level.
    """
    for generic_type, weight in (("mcs150", 0.75), ("inactive", 0.65)):
        if campaign_type == generic_type:
            return weight
    return 1.25
|
|
|
|
|
|
def count_carriers(conn, tz_states: tuple, campaign_type: str, limit: int | None = None) -> int:
    """Count eligible, not-yet-sent carriers for one (timezone, campaign type) slot.

    Applies the campaign type filter, usable_filter(), the blocked-domain list
    and `tz_states`. `limit` bounds the count (the inner LIMIT lets Postgres
    stop scanning early); None means effectively unlimited (10,000,000), and
    0 counts nothing. An empty `tz_states` counts nothing as well.

    NOTE(review): unlike fetch_carriers(), this count does not apply the
    warmup big-MX exclusion, so while that exclusion is active the audience
    used for allocation can exceed what fetch_carriers() can return for the
    slot -- confirm whether the allocation should use the same filter.
    """
    type_filter = campaign_type_filter(campaign_type)
    if not tz_states:
        # "phy_state IN ()" is a SQL syntax error; no states -> no carriers.
        return 0
    states_placeholder = ",".join(["%s"] * len(tz_states))
    row_cap = 10_000_000 if limit is None else limit
    with conn.cursor() as cur:
        cur.execute(f"""
            SELECT count(*)
            FROM (
                SELECT 1
                FROM fmcsa_carriers
                WHERE {type_filter}
                  AND {usable_filter()}
                  AND listmonk_sent_at IS NULL
                  AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
                  AND phy_state IN ({states_placeholder})
                LIMIT %s
            ) s
        """, [list(BLOCKED_EMAIL_DOMAINS)] + list(tz_states) + [row_cap])
        return int(cur.fetchone()[0])
|
|
|
|
|
|
def allocate_daily_budget(candidates: list[dict], daily_cap: int | None) -> dict[tuple[str, str], int]:
|
|
"""Allocate daily cap across (timezone, campaign_type) by weighted audience.
|
|
|
|
The allocation is proportional to each slot's eligible lead count and campaign
|
|
specificity weight, capped by the slot's audience/limit. This avoids sending
|
|
the same number from a tiny hazmat segment and a massive MCS-150 segment while
|
|
still preserving body-text variety during warmup.
|
|
"""
|
|
positive = [c for c in candidates if c["audience"] > 0]
|
|
if daily_cap is None:
|
|
return {(c["tz"], c["campaign_type"]): c["audience"] for c in candidates}
|
|
remaining_cap = min(daily_cap, sum(c["audience"] for c in positive))
|
|
quotas = {(c["tz"], c["campaign_type"]): 0 for c in candidates}
|
|
remaining = { (c["tz"], c["campaign_type"]): c["audience"] for c in positive }
|
|
weighted = { (c["tz"], c["campaign_type"]): c["audience"] * c["weight"] for c in positive }
|
|
|
|
# Repeated largest-remainder passes handle slots that hit their audience cap.
|
|
while remaining_cap > 0 and remaining:
|
|
total_weight = sum(weighted[k] for k in remaining)
|
|
if total_weight <= 0:
|
|
break
|
|
raw = {k: remaining_cap * (weighted[k] / total_weight) for k in remaining}
|
|
assigned_this_pass = 0
|
|
remainders: list[tuple[float, tuple[str, str]]] = []
|
|
for k, val in raw.items():
|
|
add = min(remaining[k], int(math.floor(val)))
|
|
quotas[k] += add
|
|
remaining[k] -= add
|
|
assigned_this_pass += add
|
|
if remaining[k] > 0:
|
|
remainders.append((val - math.floor(val), k))
|
|
leftover = remaining_cap - assigned_this_pass
|
|
for _, k in sorted(remainders, reverse=True):
|
|
if leftover <= 0:
|
|
break
|
|
if remaining.get(k, 0) <= 0:
|
|
continue
|
|
quotas[k] += 1
|
|
remaining[k] -= 1
|
|
assigned_this_pass += 1
|
|
leftover -= 1
|
|
remaining_cap -= assigned_this_pass
|
|
remaining = {k: v for k, v in remaining.items() if v > 0}
|
|
if assigned_this_pass == 0:
|
|
break
|
|
return quotas
|
|
|
|
|
|
def fetch_carriers(
    conn,
    tz_states: tuple,
    campaign_type: str,
    limit: int,
    offset: int = 0,
) -> list[tuple]:
    """Return (dot_number, email_address, legal_name, phy_state, target_state,
    mx_provider) rows for one (timezone, campaign type) slot.

    target_state is the state the offer applies to. For most segments that is
    the carrier's base (phy_state). For per-state programs (state_weight_tax,
    state_emissions) the relevant state is encoded in the deficiency flag suffix
    (e.g. 'state_weight_tax_OR'), which can differ from the base state, so we
    pull it out of the flag so the CTA links to the correct state's order page.

    Rows are eligible for the campaign type, pass usable_filter(), have not
    been sent, are not on a blocked email domain, are based in one of
    `tz_states`, and -- while the warmup big-MX exclusion is active -- are
    not on an operator in WARMUP_EXCLUDE_OPERATORS. They are ordered most
    overdue first (mcs150_parsed ASC, NULLs last), with dot_number as a
    tie-breaker so successive LIMIT/OFFSET pages are deterministic.
    """
    type_filter = campaign_type_filter(campaign_type)
    if not tz_states:
        # "phy_state IN ()" is a SQL syntax error; no states -> no carriers.
        return []
    states_placeholder = ",".join(["%s"] * len(tz_states))

    # target_state expression: pull the state suffix out of the matching flag
    # for per-state programs, otherwise fall back to the base (phy_state).
    if campaign_type in ("state_weight_tax", "state_emissions"):
        prefix = f"{campaign_type}_"
        # Exact prefix match via left(); a LIKE pattern would treat every '_'
        # in the prefix as a single-character wildcard.
        # NOTE(review): LIMIT 1 without ORDER BY picks an arbitrary flag when a
        # carrier has several matching state flags -- confirm that's acceptable.
        target_state_sql = (
            "COALESCE((SELECT upper(substr(f, %s)) "
            "FROM unnest(deficiency_flags) f "
            "WHERE left(f, %s) = %s LIMIT 1), phy_state)"
        )
        target_state_params = [len(prefix) + 1, len(prefix), prefix]
    else:
        target_state_sql = "phy_state"
        target_state_params = []

    # During warmup, exclude carriers on the big operators that throttle/blocklist
    # (Google, Microsoft, etc.) AND the consumer mailbox operators behind the
    # "mx:" prefix (Yahoo Small Business, iCloud custom domains, legacy ISPs) --
    # mx_provider is set by mx_tag_carriers.py. Untagged carriers (mx_provider IS
    # NULL) are kept: the per-MX throttle in the selector still bounds them, and
    # excluding NULLs would starve the pool until tagging completes.
    # The operator list is bound as an array parameter (<> ALL) rather than
    # spliced into the SQL, which also keeps an empty list valid.
    big_mx_exclude = ""
    big_mx_params: list = []
    if MAIN_SKIP_BIG_MX and main_warmup_day() <= MAIN_BIG_MX_EXCLUDE_UNTIL_DAY:
        big_mx_exclude = "AND (mx_provider IS NULL OR mx_provider <> ALL(%s))"
        big_mx_params = [list(WARMUP_EXCLUDE_OPERATORS)]

    # Parameter order must follow placeholder order in the SQL text:
    # SELECT (target_state), blocked domains, states, big-MX list, LIMIT, OFFSET.
    params = (target_state_params + [list(BLOCKED_EMAIL_DOMAINS)] + list(tz_states)
              + big_mx_params + [limit, offset])
    with conn.cursor() as cur:
        cur.execute(f"""
            SELECT dot_number, email_address, legal_name, phy_state,
                   {target_state_sql} AS target_state, mx_provider
            FROM fmcsa_carriers
            WHERE {type_filter}
              AND {usable_filter()}
              AND listmonk_sent_at IS NULL
              AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
              AND phy_state IN ({states_placeholder})
              {big_mx_exclude}
            ORDER BY mcs150_parsed ASC NULLS LAST, dot_number ASC
            LIMIT %s OFFSET %s
        """, params)
        return cur.fetchall()
|
|
|
|
|
|
def select_sendable_carriers(
    conn,
    tz_states: tuple,
    campaign_type: str,
    quota: int,
    max_scan: int | None = None,
) -> tuple[list[tuple], dict[str, int]]:
    """Fetch rows until `quota` Listmonk-sendable carriers are found.

    Returns (rows, suppression_counts). Rows that are disabled/blocklisted/etc.
    in Listmonk are skipped and not marked sent in Postgres.

    Rows are pulled from fetch_carriers() in pages of `batch_size` and filtered
    in order by: duplicate email within this call, per-MX-operator cap (or the
    shared untagged cap), then listmonk_sendable(). suppression_counts maps a
    skip reason ("duplicate_email", "mx_cap:<operator>", "mx_cap:untagged", or
    a listmonk_sendable() reason) to how many rows it skipped. Scanning stops
    once `quota` rows are selected, the source is exhausted, or roughly
    `max_scan` rows (default max(quota*8, quota+500, 1000); 0 also means the
    default) have been read.
    """
    selected: list[tuple] = []
    skipped: dict[str, int] = {}
    seen_emails: set[str] = set()
    offset = 0
    # Per-MX throttle: cap how many of THIS run's quota go to each receiving
    # operator so we never concentrate on one (the Jun 13-14 Gmail/Outlook storm).
    # NOTE(review): `caps` comes from mx_daily_caps(), but per_op is local to this
    # call, i.e. one (timezone, segment) slot. run() calls this once per slot, so
    # an operator can receive up to cap x number-of-slots per day -- confirm the
    # caps are meant per slot rather than per day.
    caps = mx_daily_caps(main_warmup_day())
    per_op: dict = {}
    default_cap = caps.get("__default__", 50)
    # Untagged (NULL mx_provider) safety cap. We can't exclude NULLs (the big-MX
    # exclusion is MX-based, so an untagged Google/Yahoo domain would slip through),
    # but we also shouldn't let a flood of freshly-imported, never-resolved domains
    # dominate a run -- some are big/consumer operators we'd otherwise hold out.
    # The pw-mx-tag cron drains the *sendable* untagged backlog fast (only ~3k
    # distinct verified domains as of 2026-06-20, < one 20k/day run), so this is a
    # between-runs safety net, not the primary gate. Generous enough to never starve
    # the pool in normal operation. Tunable via MAIN_UNTAGGED_MX_CAP.
    # (A non-integer MAIN_UNTAGGED_MX_CAP raises ValueError here.)
    untagged_cap = int(os.getenv("MAIN_UNTAGGED_MX_CAP", str(max(quota, 200))))
    untagged_used = 0
    MX_IDX = 5  # mx_provider is the 6th column from fetch_carriers
    # Warmup caps are small, but old audiences can contain many prior bounces or
    # unsubscribes. Scan beyond the quota while still bounding worst-case API calls.
    scan_limit = max_scan or max(quota * 8, quota + 500, 1000)
    batch_size = min(max(quota * 2, 100), 1000)

    while len(selected) < quota and offset < scan_limit:
        batch = fetch_carriers(conn, tz_states, campaign_type, batch_size, offset=offset)
        if not batch:
            break
        # offset advances by rows read, so scan_limit bounds rows examined
        # (a final batch may overshoot it by up to batch_size - 1).
        offset += len(batch)
        for row in batch:
            email = (row[1] or "").lower().strip()
            if email in seen_emails:
                skipped["duplicate_email"] = skipped.get("duplicate_email", 0) + 1
                continue
            # Recorded before the cap/sendability checks, so a later row with the
            # same email counts as duplicate_email even if this one was skipped.
            seen_emails.add(email)
            # Per-MX-operator cap (reputation is per receiving operator).
            # Tagged carriers are capped per operator; untagged (no mx_provider
            # yet) are bounded by a single shared safety cap (untagged_cap) instead
            # of being uncapped -- this stops a flood of unresolved domains (which
            # could include big/consumer operators) from dominating a run, without
            # starving the pool. The big-operator EXCLUSION in fetch_carriers keeps
            # KNOWN Google/MS/consumer-MX out; the pw-mx-tag cron keeps NULL small.
            prov = (row[MX_IDX] or "").strip().lower() if len(row) > MX_IDX else ""
            if prov:
                cap = caps.get(prov, default_cap)
                if per_op.get(prov, 0) >= cap:
                    skipped[f"mx_cap:{prov}"] = skipped.get(f"mx_cap:{prov}", 0) + 1
                    continue
            else:
                if untagged_used >= untagged_cap:
                    skipped["mx_cap:untagged"] = skipped.get("mx_cap:untagged", 0) + 1
                    continue
            ok, reason = listmonk_sendable(email)
            if not ok:
                skipped[reason] = skipped.get(reason, 0) + 1
                continue
            # Cap counters only advance for rows actually selected.
            if prov:
                per_op[prov] = per_op.get(prov, 0) + 1
            else:
                untagged_used += 1
            selected.append(row)
            if len(selected) >= quota:
                break
        # A short page means fetch_carriers() has no more rows for this slot.
        if len(batch) < batch_size:
            break
    return selected, skipped
|
|
|
|
|
|
def mark_sent(conn, dot_numbers: list[str], campaign_type: str) -> None:
    """Stamp carriers as sent so later builds skip them, then commit.

    Sets listmonk_sent_at = NOW() and listmonk_campaign_type = `campaign_type`
    for every fmcsa_carriers row whose dot_number is in `dot_numbers` (any
    iterable; it is passed to Postgres as a text[] array). An empty input is a
    no-op: no query, no commit.
    """
    dot_list = list(dot_numbers)
    if not dot_list:
        return
    with conn.cursor() as cur:
        cur.execute("""
            UPDATE fmcsa_carriers
            SET listmonk_sent_at = NOW(),
                listmonk_campaign_type = %s
            WHERE dot_number = ANY(%s::text[])
        """, (campaign_type, dot_list))
    conn.commit()
|
|
|
|
|
|
def list_segments(send_date: date) -> None:
    """Print the audience size of every deficiency segment (no writes).

    For each entry of DEFICIENCY_SEGMENTS, the rows fetch_carriers() returns
    in every timezone (each capped at the segment's "limit") are counted and
    printed with the segment's source-template env value and landing page, so
    we know which segments are ready to schedule. `send_date` is only shown in
    the report header. Counts are raw row counts (no email de-duplication);
    TOTAL sums the segments, so a carrier flagged for several segments is
    counted once per segment. The DB connection is closed even if a query fails.
    """
    conn = psycopg2.connect(DB_URL)
    try:
        print(f"\nDeficiency-flag segments (send date {send_date.isoformat()}):\n")
        print(f"{'segment':<22}{'audience':>10}  {'source':<10}  landing page")
        print("-" * 78)
        grand_total = 0
        for ctype, seg in DEFICIENCY_SEGMENTS.items():
            total = 0
            for tz_cfg in TIMEZONE_CONFIG.values():
                rows = fetch_carriers(conn, tz_cfg["states"], ctype, seg["limit"])
                total += len(rows)
            grand_total += total
            src = os.getenv(seg["source_env"])
            src_str = src if src else "UNSET"
            lp = build_lp_link(ctype, None)
            print(f"{ctype:<22}{total:>10}  {src_str:<10}  {lp}")
        print("-" * 78)
        print(f"{'TOTAL':<22}{grand_total:>10}")
        print("\n(UNSET source = template not yet created; segment is skipped by the "
              "scheduled run until its CAMPAIGN_*_ID env is set.)\n")
    finally:
        conn.close()
|
|
|
|
|
|
def run(send_date: date, dry_run: bool = False, preview: bool = False,
        max_per_segment: int | None = None, only_segments: set[str] | None = None,
        send_hour_override: int | None = None, send_minute: int = 0,
        stagger_minutes: int = 0, daily_cap: int | None = None,
        warmup_cap: bool = True) -> None:
    """Build (and normally schedule) one Listmonk campaign per (timezone, segment).

    Args:
        send_date: date used for list/campaign names and the send_at timestamp.
        dry_run: select and log carriers, but skip Listmonk list/campaign
            creation and mark-sent DB writes.
        preview: 1 sample carrier per campaign; the list holds only TEST_EMAIL,
            campaigns stay drafts, and nothing is marked sent.
        max_per_segment: cap on each (segment x timezone) audience.
        only_segments: restrict to these campaign types.
        send_hour_override: UTC hour for all timezones (else per-tz default).
        send_minute: UTC minute of the send time.
        stagger_minutes: extra minutes added per already-created campaign.
        daily_cap: hard cap on recipients queued by this run; when None (and
            warmup_cap is set, outside preview) warmup_daily_queue_cap() is used.
        warmup_cap: allow the automatic warmup-derived cap.
    """
    conn = psycopg2.connect(DB_URL)

    # Mint (or reuse) the same-day coupon(s) for this send date so every campaign
    # in the run shares the same expiring code(s). Preview/dry runs skip the write,
    # and the daily deal is disabled by default (see COUPON_ENABLED) -- when off we
    # send at normal price (empty coupon_code -> template's no-deal branch). When
    # CAMPAIGN_COUPON_AB_PCTS is set we mint one code per percent arm and split
    # recipients across them by a stable hash of their email.
    daily_coupons: dict[int, str] | None = None
    if COUPON_ENABLED and not dry_run and not preview:
        try:
            daily_coupons = get_or_create_daily_coupons(conn, send_date)
            if COUPON_AB_PCTS:
                LOG.info("[coupon] A/B test arms: %s",
                         ", ".join(f"{p}%={c}" for p, c in sorted(daily_coupons.items())))
        except Exception as exc:  # noqa: BLE001
            LOG.warning("[coupon] could not mint daily coupon(s): %s (sending without)", exc)
    elif not COUPON_ENABLED:
        LOG.info("[coupon] disabled (CAMPAIGN_ENABLE_COUPON unset) — sending at normal price")

    base_mcs150 = get_base_campaign(CAMPAIGN_MCS150_ID)
    base_inactive = get_base_campaign(CAMPAIGN_INACTIVE_ID)

    # Each spec is (campaign_type, base campaign, per-slot limit, label).
    campaign_specs = [
        ("mcs150", base_mcs150, 2000, "MCS-150 Overdue"),
        ("inactive", base_inactive, 1000, "Inactive USDOT"),
    ]

    # Add deficiency-flag segments whose Listmonk source template is configured.
    for ctype, seg in DEFICIENCY_SEGMENTS.items():
        src = os.getenv(seg["source_env"])
        if not src:
            LOG.info("[segment] %s skipped — %s not set (template not created)",
                     ctype, seg["source_env"])
            continue
        try:
            base = get_base_campaign(int(src))
        except Exception as exc:  # noqa: BLE001
            LOG.warning("[segment] %s skipped — source %s unusable: %s",
                        ctype, src, exc)
            continue
        campaign_specs.append((ctype, base, seg["limit"], seg["label"]))

    # Optional warmup controls: restrict to specific segments and/or cap the
    # per-segment audience so a fresh-IP day-0 send stays small.
    if only_segments:
        campaign_specs = [s for s in campaign_specs if s[0] in only_segments]
        if not campaign_specs:
            LOG.error("No campaign specs match --only-segment %s", sorted(only_segments))
            conn.close()
            return
    if max_per_segment is not None:
        campaign_specs = [
            (ct, base, min(lim, max_per_segment), label)
            for (ct, base, lim, label) in campaign_specs
        ]

    if daily_cap is None and warmup_cap and not preview:
        daily_cap = warmup_daily_queue_cap()
    if daily_cap is not None:
        LOG.info("Daily queue cap active: %d recipients total", daily_cap)

    if preview:
        LOG.info("PREVIEW MODE — 1 sample carrier per campaign, drafts only, "
                 "test sends to %s, no real schedule, no mark-sent.", TEST_EMAIL)

    # Pass 1: size every (timezone, campaign_type) slot so the daily cap can be
    # split proportionally before anything is created.
    candidates: list[dict] = []
    totals_by_campaign: dict[str, int] = {}
    for tz, tz_cfg in TIMEZONE_CONFIG.items():
        for campaign_type, _base, limit, label in campaign_specs:
            audience = 1 if preview else count_carriers(conn, tz_cfg["states"], campaign_type, limit)
            weight = campaign_weight(campaign_type)
            candidates.append({
                "tz": tz,
                "states": tz_cfg["states"],
                "campaign_type": campaign_type,
                "limit": limit,
                "label": label,
                "audience": audience,
                "weight": weight,
            })
            totals_by_campaign[campaign_type] = totals_by_campaign.get(campaign_type, 0) + audience

    quotas = allocate_daily_budget(candidates, None if preview else daily_cap)
    LOG.info("Eligible audience by SQL criterion: %s", ", ".join(
        f"{ct}={n}" for ct, n in sorted(totals_by_campaign.items())
    ))
    if daily_cap is not None and not preview:
        planned_by_campaign: dict[str, int] = {}
        for (tz, ctype), quota in quotas.items():
            planned_by_campaign[ctype] = planned_by_campaign.get(ctype, 0) + quota
        LOG.info("Planned allocation by criterion: %s (total=%d/%d)", ", ".join(
            f"{ct}={n}" for ct, n in sorted(planned_by_campaign.items())
        ), sum(planned_by_campaign.values()), daily_cap)

    scheduled_count = 0
    queued_recipients = 0

    # Pass 2: select carriers, build a list + campaign per slot, schedule it.
    for tz, tz_cfg in TIMEZONE_CONFIG.items():
        states = tz_cfg["states"]
        send_hour = send_hour_override if send_hour_override is not None else tz_cfg["send_hour_utc"]
        send_at = datetime(
            send_date.year, send_date.month, send_date.day,
            send_hour, send_minute, 0, tzinfo=timezone.utc
        )

        for campaign_type, base, limit, label in campaign_specs:
            if daily_cap is not None and queued_recipients >= daily_cap:
                LOG.info("[%s/%s] Daily cap reached (%d/%d) — skipping remaining slots",
                         tz, campaign_type, queued_recipients, daily_cap)
                continue
            # Stagger counts campaigns created so far across ALL timezones.
            effective_send_at = send_at + timedelta(minutes=stagger_minutes * scheduled_count)
            fetch_limit = 1 if preview else limit
            if daily_cap is not None and not preview:
                fetch_limit = min(fetch_limit, quotas.get((tz, campaign_type), 0))
            if fetch_limit <= 0:
                LOG.info("[%s/%s] Allocation is 0 — skipping", tz, campaign_type)
                continue
            # Preview skips the per-address Listmonk suppression check.
            if preview:
                rows = fetch_carriers(conn, states, campaign_type, fetch_limit)
                suppressed: dict[str, int] = {}
            else:
                rows, suppressed = select_sendable_carriers(conn, states, campaign_type, fetch_limit)
            if suppressed:
                LOG.info("[%s/%s] Runtime Listmonk suppression skipped %d rows: %s",
                         tz, campaign_type, sum(suppressed.values()), ", ".join(
                             f"{reason}={n}" for reason, n in sorted(suppressed.items())
                         ))
            if not rows:
                LOG.warning("[%s/%s] No usable records — skipping", tz, campaign_type)
                continue

            actual = len(rows)
            tag = "PREVIEW " if preview else ""
            list_name = f"{tag}Trucking {label} {tz} {send_date.isoformat()}"
            campaign_name = f"{tag}{label} - {tz} - {send_date.strftime('%b %d %Y')}"

            LOG.info("[%s/%s] %d sendable carriers -> %s (send %s UTC)",
                     tz, campaign_type, actual, campaign_name,
                     effective_send_at.strftime("%Y-%m-%d %H:%M"))

            if dry_run:
                LOG.info("  DRY RUN — skipping Listmonk + DB writes")
                scheduled_count += 1
                queued_recipients += actual
                continue

            # Build subscriber list. In preview, only the owner's address (with the
            # sample carrier's attribs) so the real audience is never touched.
            # Row layout (from fetch_carriers): 0 dot_number, 1 email, 2 legal_name,
            # 3 phy_state, 4 target_state.
            if preview:
                r0 = rows[0]
                p_code, p_pct = pick_coupon_for_email(TEST_EMAIL, daily_coupons)
                subscribers = [{
                    "email": TEST_EMAIL,
                    "name": r0[2] or "Sample Carrier",
                    "attribs": {"dot_number": r0[0], "company": r0[2] or "", "state": r0[3] or "",
                                "lp_link": lp_link_with_coupon(campaign_type, r0[4], p_code, dot=str(r0[0])),
                                **coupon_attribs(p_code, p_pct),
                                **discounted_price_attribs(campaign_type, r0[4], p_pct)},
                }]
            else:
                subscribers = []
                for row in rows:
                    c_code, c_pct = pick_coupon_for_email(row[1], daily_coupons)
                    subscribers.append({
                        "email": row[1],
                        "name": row[2] or row[1],
                        "attribs": {"dot_number": row[0], "company": row[2] or "", "state": row[3] or "",
                                    "lp_link": lp_link_with_coupon(campaign_type, row[4], c_code, dot=str(row[0])),
                                    **coupon_attribs(c_code, c_pct),
                                    **discounted_price_attribs(campaign_type, row[4], c_pct)},
                    })

            # Create list + add subscribers
            list_id = create_list(list_name)
            added = import_subscribers(list_id, subscribers)
            LOG.info("[%s/%s] List %d: %d/%d subscribers added", tz, campaign_type, list_id, added, len(subscribers))
            if added == 0:
                LOG.error("[%s/%s] BUG: 0 subscribers added to list %d — skipping", tz, campaign_type, list_id)
                continue

            # Create campaign (draft in preview, scheduled in real run)
            cid = create_and_schedule_campaign(base, list_id, campaign_name, effective_send_at, schedule=not preview)
            LOG.info("[%s/%s] Campaign %d created (%s)", tz, campaign_type, cid,
                     "draft/preview" if preview else f"scheduled {effective_send_at.isoformat()}")
            scheduled_count += 1
            queued_recipients += added

            # Send a test to the owner so they can spot-check the rendered email
            # (this also happens on real runs: one test per created campaign).
            send_test(base, cid, rows[0], label, tz, campaign_type)

            # Mark carriers as sent only on a real run
            # NOTE(review): every selected row is marked, even when `added` is
            # smaller than len(rows); carriers whose subscriber add failed are
            # then never retried -- confirm this is intended.
            if not preview:
                mark_sent(conn, [row[0] for row in rows], campaign_type)

    conn.close()
    LOG.info("Done. queued_recipients=%d%s", queued_recipients,
             f" daily_cap={daily_cap}" if daily_cap is not None else "")
|
|
|
|
|
|
def _bounded_int(lo: int, hi: "int | None" = None):
    """Build an argparse ``type=`` converter accepting integers in ``[lo, hi]``.

    ``hi=None`` means there is no upper bound. Non-integer or out-of-range
    input raises ``argparse.ArgumentTypeError``, which argparse turns into a
    usage error (exit status 2) before any campaign work starts.
    """
    def _convert(text: str) -> int:
        try:
            value = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid integer: {text!r}") from None
        if value < lo or (hi is not None and value > hi):
            expected = f"between {lo} and {hi}" if hi is not None else f">= {lo}"
            raise argparse.ArgumentTypeError(f"must be {expected}, got {value}")
        return value

    return _convert


def _iso_date(text: str) -> date:
    """Parse a ``--date`` value; bad input becomes an argparse usage error."""
    try:
        return date.fromisoformat(text)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid date {text!r}, expected YYYY-MM-DD") from None


def _build_parser() -> argparse.ArgumentParser:
    """Return the CLI parser for this script, with value validation built in."""
    parser = argparse.ArgumentParser(description="Build daily trucking campaigns in Listmonk")
    parser.add_argument("--dry-run", action="store_true", help="Show what would be created, no writes")
    parser.add_argument("--preview", action="store_true", help="1 sample carrier/campaign, drafts only, test sends to owner, no real schedule/mark-sent")
    parser.add_argument("--list-segments", action="store_true", help="Report deduped audience size per deficiency segment, then exit (no writes)")

    # --date and --tomorrow both pick the send date; argparse enforces exclusivity.
    when = parser.add_mutually_exclusive_group()
    when.add_argument("--date", type=_iso_date, help="Target send date YYYY-MM-DD (default: today)")
    when.add_argument("--tomorrow", action="store_true",
                      help="Schedule for tomorrow instead of today. Use only for manual pre-builds.")

    # Caps must be positive / non-negative: a 0 or negative per-segment cap is
    # almost certainly a typo and could be read as "no cap" downstream.
    parser.add_argument("--max-per-segment", type=_bounded_int(1), default=None,
                        help="Cap audience per (segment x timezone). Use for fresh-IP warmup small sends.")
    parser.add_argument("--only-segment", action="append", default=None, metavar="SEG",
                        help="Restrict to specific segment(s), e.g. mcs150, inactive, for_hire_boc3. Repeatable.")
    parser.add_argument("--send-hour", type=_bounded_int(0, 23), default=None,
                        help="Override send hour (UTC) for ALL timezones instead of per-tz defaults.")
    parser.add_argument("--send-minute", type=_bounded_int(0, 59), default=0,
                        help="Send minute (UTC), used with --send-hour. Default 0.")
    parser.add_argument("--stagger-minutes", type=_bounded_int(0), default=0,
                        help="Add this many minutes between each created campaign. Useful for catchup sends.")
    parser.add_argument("--daily-cap", type=_bounded_int(0), default=None,
                        help="Hard cap on recipients queued by this run. Default: warmup-derived cap when available.")
    parser.add_argument("--no-warmup-cap", action="store_true",
                        help="Disable automatic daily queue cap from /etc/postfix/pw-warmup-start.")
    return parser


def main():
    """Parse CLI arguments, then build the day's campaigns or report segments.

    The send date is ``--date`` if given, tomorrow with ``--tomorrow``, and
    otherwise ``date.today()`` (the host's local date). With
    ``--list-segments`` it only calls ``list_segments`` and returns;
    otherwise every option is forwarded to ``run``. Invalid option values
    exit with argparse's usage error (status 2) before any work is done.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )
    parser = _build_parser()
    args = parser.parse_args()

    if args.date is not None:
        send_date = args.date
    elif args.tomorrow:
        send_date = date.today() + timedelta(days=1)
    else:
        send_date = date.today()

    if args.list_segments:
        list_segments(send_date)
        return

    # --send-minute is documented as applying only with --send-hour; say so
    # instead of silently dropping it.
    if args.send_hour is None and args.send_minute != 0:
        LOG.warning("--send-minute=%d ignored: it only applies together with --send-hour",
                    args.send_minute)

    only = set(args.only_segment) if args.only_segment else None
    LOG.info("Building campaigns for send date %s (dry_run=%s, preview=%s, "
             "max_per_segment=%s, only=%s, send_hour=%s, stagger_minutes=%s, daily_cap=%s, warmup_cap=%s)",
             send_date, args.dry_run, args.preview, args.max_per_segment,
             sorted(only) if only else None, args.send_hour, args.stagger_minutes,
             args.daily_cap, not args.no_warmup_cap)
    run(send_date, dry_run=args.dry_run, preview=args.preview,
        max_per_segment=args.max_per_segment, only_segments=only,
        send_hour_override=args.send_hour, send_minute=args.send_minute,
        stagger_minutes=args.stagger_minutes, daily_cap=args.daily_cap,
        warmup_cap=not args.no_warmup_cap)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the CLI only when this file is executed directly, not when imported.
    main()
|