#!/usr/bin/env python3 """Daily trucking campaign builder. Runs nightly (3 AM EST) to create the next day's 8 Listmonk campaigns: - 4 TZ regions x MCS-150 overdue (2,000 carriers each, 4 AM / 5 AM / 6 AM / 7 AM EST) - 4 TZ regions x Inactive USDOT (1,000 carriers each, same schedule) Selection criteria: - email verified or catch-all (usable) - mcs150_parsed > 2 years ago (for MCS-150 campaign) - oos_active IS TRUE (for inactive USDOT campaign) - listmonk_sent_at IS NULL (not yet sent) - ordered by mcs150_parsed ASC (most overdue first) Usage: python3 scripts/build_trucking_campaigns.py python3 scripts/build_trucking_campaigns.py --dry-run python3 scripts/build_trucking_campaigns.py --date 2026-06-02 """ from __future__ import annotations import argparse import base64 import hashlib import json import logging import os import sys import urllib.request import urllib.parse import math import time from datetime import date, datetime, timedelta, timezone import psycopg2 # Allow both supported invocation styles: # python -m scripts.build_trucking_campaigns # python3 scripts/build_trucking_campaigns.py ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT not in sys.path: sys.path.insert(0, ROOT) from scripts._email_exclusions import BLOCKED_EMAIL_DOMAINS from scripts._email_plaintext import html_to_text LOG = logging.getLogger("build_trucking_campaigns") # ── Listmonk ────────────────────────────────────────────────────────────────── LISTMONK_URL = os.getenv("LISTMONK_URL", "http://listmonk:9000") LISTMONK_USER = os.getenv("LISTMONK_USER", "api") LISTMONK_PASS = os.getenv("LISTMONK_PASS", "6X1rKPea61N4rZ1S65Hx5zvqzbCj30F6nvEe9oVGH_Y") _LM_AUTH = base64.b64encode(f"{LISTMONK_USER}:{LISTMONK_PASS}".encode()).decode() # ── Source campaign IDs ──────────────────────────────────────────────────────── CAMPAIGN_MCS150_ID = 186 # "MCS-150 Overdue — $1,000/Day Fine Risk" CAMPAIGN_INACTIVE_ID = 188 # "Inactive USDOT — Reactivate Before You Get Pulled Over" # 
# Public site for landing-page links injected into Listmonk subscriber attribs.
SITE_DOMAIN = os.getenv("SITE_DOMAIN", "https://performancewest.net")

# ── Deficiency-flag segments (Phase 5) ──────────────────────────────────────
# Each segment targets carriers by a `deficiency_flags` value (the TEXT[] column
# written by fmcsa_deficiency_flagger.py) and links to its order landing page.
#
# `source_id` is the Listmonk template campaign to clone; set via env once the
# template exists (see CAMPAIGN_SOURCE_ENV). Segments without a configured
# source_id are reported by --list-segments but skipped by the scheduled run so
# the nightly job never breaks on an unconfigured template.
#
# `flag_sql` is a predicate over the deficiency_flags array.
DEFICIENCY_SEGMENTS = {
    "for_hire_boc3": {
        "label": "For-Hire BOC-3 + UCR",
        "flag_sql": "'for_hire_carrier' = ANY(deficiency_flags)",
        "lp_slug": "boc3-filing",
        "source_env": "CAMPAIGN_FOR_HIRE_ID",
        "limit": 1500,
    },
    "irp_ifta": {
        "label": "IRP / IFTA Registration",
        "flag_sql": "'interstate_needs_irp_ifta' = ANY(deficiency_flags)",
        "lp_slug": "state-trucking-bundle",
        "source_env": "CAMPAIGN_IRP_IFTA_ID",
        "limit": 1500,
    },
    "intrastate_authority": {
        "label": "Intrastate Operating Authority",
        # any flag that starts with "intrastate_authority_"
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'intrastate_authority_%%')",
        "lp_slug": "intrastate-authority",
        "source_env": "CAMPAIGN_INTRASTATE_ID",
        "limit": 1000,
    },
    "state_weight_tax": {
        "label": "State Weight-Distance Tax",
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'state_weight_tax_%%')",
        # per-state LP resolved per-row in build_lp_link()
        "lp_slug": "state-trucking-bundle",
        "source_env": "CAMPAIGN_WEIGHT_TAX_ID",
        "limit": 1000,
    },
    "state_emissions": {
        "label": "State Clean-Truck / Emissions",
        "flag_sql": "EXISTS (SELECT 1 FROM unnest(deficiency_flags) f "
                    "WHERE f LIKE 'state_emissions_%%')",
        "lp_slug": "state-emissions",
        "source_env": "CAMPAIGN_EMISSIONS_ID",
        "limit": 1000,
    },
    "hazmat": {
        "label": "Hazmat PHMSA Registration",
        "flag_sql": "'hazmat_carrier' = ANY(deficiency_flags)",
        "lp_slug": "hazmat-phmsa",
        "source_env": "CAMPAIGN_HAZMAT_ID",
        "limit": 500,
    },
}

# Per-state weight-tax LP overrides (CA emissions goes to ca-mcp-carb).
_WEIGHT_TAX_LP = {
    "OR": "or-weight-mile-tax",
    "NY": "ny-hut-registration",
    "KY": "ky-kyu-registration",
    "NM": "nm-weight-distance",
    "CT": "ct-highway-use-fee",
}

# ── Authoritative service prices (single source of truth) ───────────────────
# The discounted prices shown in the coupon email MUST match what checkout
# actually charges, so we read them from the same catalog the API uses
# (api/src/service-catalog.ts). We parse it directly (the prod box has python3
# but not node) and cache per-process. Each entry: {price_cents, discountable}.
# If the catalog can't be read (file missing in an image), the price helpers
# degrade to percent-only copy rather than guessing a number.
_CATALOG_PATH = os.getenv(
    "SERVICE_CATALOG_TS", os.path.join(ROOT, "api", "src", "service-catalog.ts")
)
_CATALOG_CACHE: dict | None = None


def _load_service_catalog() -> dict:
    """Parse api/src/service-catalog.ts -> {slug: {price_cents, discountable}}.

    discountable defaults to True (the catalog only marks the exceptions with
    `discountable: false`), matching the TS object's own semantics.

    The result is cached for the life of the process, including the empty dict
    produced when the file cannot be read or parsed, so a failure is logged
    once and every later call returns the same (possibly empty) mapping.
    """
    global _CATALOG_CACHE
    if _CATALOG_CACHE is not None:
        return _CATALOG_CACHE
    catalog: dict = {}
    try:
        import re as _re

        # Context manager so the handle is closed even when the read raises.
        with open(_CATALOG_PATH, encoding="utf-8") as fh:
            ts = fh.read()
        m = _re.search(r"export const COMPLIANCE_SERVICES[^=]*=\s*\{(.*)\n\};", ts, _re.S)
        body = m.group(1) if m else ""
        # Each entry is matched non-greedily up to its first "}", so an entry
        # is expected to be a flat object literal.
        for em in _re.finditer(r'"([a-z0-9\-]+)":\s*\{(.*?)\}', body, _re.S):
            slug, inner = em.group(1), em.group(2)
            pm = _re.search(r"price_cents:\s*(\d+)", inner)
            if not pm:
                continue
            discountable = _re.search(r"discountable:\s*false", inner) is None
            catalog[slug] = {
                "price_cents": int(pm.group(1)),
                "discountable": discountable,
            }
    except Exception as exc:  # noqa: BLE001
        LOG.warning("[coupon] could not load service catalog (%s); coupon copy "
                    "will be percent-only", exc)
    _CATALOG_CACHE = catalog
    return catalog


# The service whose price the coupon copy quotes. This is the specific service
# each campaign's body is *about*, which is NOT always the slug the CTA links to
# (e.g. the MCS-150 email talks about the $79 MCS-150 update but the button opens
# the $399 full-compliance bundle). Pricing from the wrong slug would advertise a
# discount the landing page doesn't show, so the price slug is explicit here.
PRICE_SLUG_BY_CAMPAIGN = {
    "mcs150": "mcs150-update",
    "inactive": "usdot-reactivation",
}


def price_slug_for(campaign_type: str, phy_state: str | None = None) -> str:
    """The catalog slug whose price the coupon copy should quote for a segment.

    Main campaigns quote their specific service (MCS-150, reactivation); the
    deficiency segments quote the same slug their CTA links to (resolved, incl.
    per-state overrides, by lp_slug_for)."""
    return PRICE_SLUG_BY_CAMPAIGN.get(campaign_type) or lp_slug_for(campaign_type, phy_state)


def discounted_price_attribs(campaign_type: str, phy_state: str | None,
                             coupon_pct: str | None) -> dict:
    """Per-recipient price merge fields, computed on the fly to match checkout.

    Mirrors the server exactly: percent discount on the SERVICE fee only
    (discount_cents = round(fee * pct / 100)); non-discountable services (e.g.
    boc3-filing, a $25 passthrough) get NO price discount. Returns:
      coupon_price_full   "$79"   (service list price)
      coupon_price_deal   "$47"   (after discount)
      coupon_priceable    "1"/""  (whether a real discounted number is available)
    Blank when no coupon, no catalog entry, the service isn't discountable, or
    the percent is not a usable discount (non-numeric, <= 0 or > 100), so the
    templates fall back to percent-only copy and never print a false number.
    """
    blank = {"coupon_price_full": "", "coupon_price_deal": "", "coupon_priceable": ""}
    if not coupon_pct:
        return blank
    try:
        pct = int(coupon_pct)
    except (TypeError, ValueError):
        return blank
    # A 0% or negative "discount" is not a deal, and > 100% would yield a
    # negative price; neither may be printed as a real number.
    if not 0 < pct <= 100:
        return blank
    slug = price_slug_for(campaign_type, phy_state)
    entry = _load_service_catalog().get(slug)
    if not entry or not entry.get("discountable") or entry["price_cents"] <= 0:
        return blank
    full = entry["price_cents"]
    # NOTE(review): Python's round() rounds exact halves to even, so if the API
    # rounds halves up the two can differ by one cent on .5 results -- confirm.
    discount = round(full * pct / 100)  # same formula as the API
    deal = full - discount

    def _fmt(cents: int) -> str:
        # Whole dollars print without decimals ("$79"), otherwise two places.
        return f"${cents // 100}" if cents % 100 == 0 else f"${cents / 100:.2f}"

    return {
        "coupon_price_full": _fmt(full),
        "coupon_price_deal": _fmt(deal),
        "coupon_priceable": "1",
    }


def build_lp_link(campaign_type: str, phy_state: str | None) -> str:
    """Return the order landing-page URL for a (segment, state).

    The slug comes from lp_slug_for() so the URL and the price/coupon slug can
    never disagree about per-state overrides.
    """
    return f"{SITE_DOMAIN}/order/{lp_slug_for(campaign_type, phy_state)}"


def lp_slug_for(campaign_type: str, phy_state: str | None = None) -> str:
    """The order-page slug (== the discountable service slug) for a segment.

    Unknown campaign types (e.g. the main MCS-150 / inactive campaigns) fall
    back to "dot-full-compliance". Weight-tax and CA-emissions segments are
    redirected to their state-specific pages.
    """
    seg = DEFICIENCY_SEGMENTS.get(campaign_type)
    slug = seg["lp_slug"] if seg else "dot-full-compliance"
    if campaign_type == "state_weight_tax" and phy_state in _WEIGHT_TAX_LP:
        slug = _WEIGHT_TAX_LP[phy_state]
    if campaign_type == "state_emissions" and phy_state == "CA":
        slug = "ca-mcp-carb"
    return slug


# ── Daily same-day coupon ───────────────────────────────────────────────────
# Every send day gets ONE random 5-letter coupon at COUPON_PCT off, valid only
# through 23:59:59 of the send date (America/New_York). The code is written to
# the app's `discount_codes` table; the existing /api/v1/discount validator and
# checkout enforce expiry + the service-fee-only scope (pass-through government
# fees are never discounted). The code + prices are merged into the email so the
# recipient sees a real, expiring deal.
#
# DISCOUNT TOGGLE: the daily coupon is DISABLED by default (Jun 2026). The
# discount sends were not being delivered (DKIM-broken window), so we are
# re-testing whether normal-price offers convert now that deliverability is
# fixed. With the coupon off, no code is minted and an empty coupon_code is
# merged -- the campaign templates' `{{ if .Subscriber.Attribs.coupon_code }}`
# guard automatically falls through to the normal-price `{{ else }}` branch and
# the landing-page links carry no `?code=`. Set CAMPAIGN_ENABLE_COUPON=1 to
# bring the daily deal back. Reversible, no template or DB changes needed.
COUPON_ENABLED = os.getenv("CAMPAIGN_ENABLE_COUPON", "0") in ("1", "true", "yes")
COUPON_PCT = int(os.getenv("CAMPAIGN_COUPON_PCT", "40"))

# A/B/C price test: when set to a comma list of percents (e.g. "20,30,40") each
# carrier is deterministically bucketed by a stable hash of their email into one
# arm, getting that arm's own daily code. Because each code stores its own
# percent in discount_codes, the discount the email advertises always matches the
# discount checkout actually applies, and redemptions are measurable per code
# (description marker campaign-daily:YYYY-MM-DD:PCT). A percent of 0 is a valid
# FULL-PRICE control arm (e.g.
# "20,30,0"): no code is minted and the carrier sees
# the normal price, but they're still hash-bucketed so the control is measurable.
# Empty/unset = single-arm test at COUPON_PCT (legacy behavior). The split is even
# and stable per carrier, so a given carrier always sees the same arm across
# re-sends (no arm-hopping).
COUPON_AB_PCTS = tuple(
    int(p.strip())
    for p in os.getenv("CAMPAIGN_COUPON_AB_PCTS", "").split(",")
    if p.strip().isdigit()
)

# Eligible slugs = every discountable service a trucking campaign can link to.
# Pass-through-only slugs (boc3-filing $25 passthrough, etc.) are intentionally
# eligible too: the discount math only touches the service-fee portion, so a
# code scoped to them simply yields $0 off the passthrough and full off the fee.
# NOTE(review): the per-state order slugs in _WEIGHT_TAX_LP and "ca-mcp-carb"
# are not listed here -- confirm whether daily codes should apply to them.
COUPON_SLUGS = (
    "mcs150-update,usdot-reactivation,dot-drug-alcohol,dot-full-compliance,"
    "ucr-registration,state-trucking-bundle,intrastate-authority,irp-registration,"
    "ifta-application,hazmat-phmsa,state-emissions,state-weight-tax,trucking-wrap-up,"
    "boc3-filing"
)

_ET = timezone(timedelta(hours=-5))  # EST anchor; close enough for an end-of-day cutoff
_COUPON_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ"  # no I/O to avoid confusion

# ── Per-MX-operator throttling (main pool) ──────────────────────────────────
# Sender reputation is tracked by the RECEIVING operator, not the recipient
# domain. The Jun 13-14 Gmail + Outlook block storm came from concentrating the
# warmup on Google/Microsoft-Workspace-hosted business domains. During warmup we
# EXCLUDE those big operators entirely (send to the long tail of small/self-hosted
# mail systems that don't bot-throttle), then cap per-operator once reputation is
# established. mx_provider is populated by mx_tag_carriers.py.
# Set MAIN_SKIP_BIG_MX=0 to stop excluding once truly warmed up.
MAIN_SKIP_BIG_MX = os.getenv("MAIN_SKIP_BIG_MX", "1") not in ("0", "false", "")
# Operators to hold out during warmup (they aggressively throttle/blocklist).
# These are the "clean label" providers mx_tag_carriers.py recognizes by MX host.
BIG_MX_OPERATORS = ("google", "microsoft", "proofpoint", "mimecast",
                    "barracuda", "cisco", "broadcom")

# Consumer / aggressively-filtering mailbox operators that mx_tag_carriers.py
# labels with the "mx:" prefix (no clean label). They complaint-block and filter
# like the big operators, so hold them out of the warmup pool too. The literal-
# domain blocklist (BLOCKED_EMAIL_DOMAINS) already stops someone@yahoo.com /
# @icloud.com, but a CUSTOM domain whose MX points at Yahoo Small Business / AOL
# (mx:yahoodns.net), Apple iCloud+ Custom Domain (mx:icloud.com), or a legacy
# consumer ISP is invisible to that string layer -- only the MX tag catches it.
# (Live 2026-06-20: mx:yahoodns.net alone = 283k sendable carriers.)
CONSUMER_MX_OPERATORS = (
    "mx:yahoodns.net",
    "mx:icloud.com",
    "mx:comcast.net",
    "mx:charter.net",
    "mx:centurylink.net",
    "mx:windstream.net",
    "mx:tds.net",
    "mx:earthlink-vadesecure.net",
)

# Everything held out of the warmup pool entirely until MAIN_BIG_MX_EXCLUDE_UNTIL_DAY,
# then re-introduced gradually via mx_daily_caps().
#
# MAIN_EXCLUDE_OPERATORS (comma-separated mx_provider labels) OVERRIDES this set
# when present. Use it to send to everything EXCEPT a specific operator without
# waiting for the calendar ramp -- e.g. after the Jun 2026 no-DKIM incident we
# re-send to the whole (now-signed) audience but still hold Google's consumer
# inboxes back while their reputation recovers:
#   MAIN_EXCLUDE_OPERATORS="google"
# An empty string ("MAIN_EXCLUDE_OPERATORS=") means exclude NOBODY by operator.
_excl_override = os.getenv("MAIN_EXCLUDE_OPERATORS")
if _excl_override is None:
    WARMUP_EXCLUDE_OPERATORS = BIG_MX_OPERATORS + CONSUMER_MX_OPERATORS
else:
    WARMUP_EXCLUDE_OPERATORS = tuple(
        o.strip().lower() for o in _excl_override.split(",") if o.strip()
    )

MAIN_WARMUP_START_FILE = os.getenv("MTA_WARMUP_START_FILE", "/etc/postfix/pw-warmup-start")
# How many days to EXCLUDE the big operators entirely. The Jun 13-14 block storm
# means reputation is NOT yet established despite a high calendar day count, so we
# hold Google/Microsoft/etc. out until day 30 to let reputation recover on the
# long-tail operators first, then re-introduce them gradually via mx_daily_caps.
MAIN_BIG_MX_EXCLUDE_UNTIL_DAY = int(os.getenv("MAIN_BIG_MX_EXCLUDE_UNTIL_DAY", "30"))


def main_warmup_day() -> int:
    """Days elapsed since the epoch timestamp stored in MAIN_WARMUP_START_FILE.

    Returns 99 when the stamp file is missing or unparseable, i.e. an absent
    stamp is treated as "already warmed up".
    """
    try:
        # Context manager so the stamp file's handle is always released.
        with open(MAIN_WARMUP_START_FILE, encoding="utf-8") as fh:
            start = int(fh.read().strip())
        return max(0, int((time.time() - start) // 86400))
    except (OSError, ValueError):
        return 99  # no stamp = treat as warmed up


def mx_daily_caps(day: int) -> dict:
    """Per-operator daily NEW-recipient caps.

    Big + consumer-MX operators are EXCLUDED entirely until
    MAIN_BIG_MX_EXCLUDE_UNTIL_DAY (reputation recovery), then re-introduced
    gradually. The '__default__' key is the per-operator cap for the long tail
    of small/self-hosted systems that carry the warmup volume."""
    if day <= MAIN_BIG_MX_EXCLUDE_UNTIL_DAY:
        big, default = 0, 120  # big OFF; long-tail operators carry volume
    elif day <= MAIN_BIG_MX_EXCLUDE_UNTIL_DAY + 7:
        big, default = 40, 150  # re-introduce big slowly
    elif day <= MAIN_BIG_MX_EXCLUDE_UNTIL_DAY + 14:
        big, default = 120, 200
    else:
        big, default = 300, 250
    # Both big-label and consumer-mx operators ramp together on the same schedule.
    caps = {op: big for op in WARMUP_EXCLUDE_OPERATORS}
    caps["__default__"] = default
    return caps


def mx_throttled(rows: list[tuple], total_n: int, caps: dict, mx_idx: int) -> list[tuple]:
    """Pick up to total_n rows, capping per mx_provider (rows[mx_idx]) so no
    single receiving operator exceeds its daily share. Preserves order.

    Rows with a missing/empty provider (or too short to hold mx_idx) are all
    counted under the "" operator and use the '__default__' cap.
    """
    per_op: dict = {}
    chosen: list[tuple] = []
    default_cap = caps.get("__default__", 50)
    for r in rows:
        if len(chosen) >= total_n:
            break
        prov = (r[mx_idx] or "").strip().lower() if mx_idx < len(r) else ""
        cap = caps.get(prov, default_cap)
        used = per_op.get(prov, 0)
        if used >= cap:
            continue
        per_op[prov] = used + 1
        chosen.append(r)
    return chosen


def _random_coupon_code() -> str:
    """Return 5 random characters drawn from _COUPON_ALPHABET."""
    import secrets

    return "".join(secrets.choice(_COUPON_ALPHABET) for _ in range(5))


def get_or_create_daily_coupon(conn, send_date: date, pct: int | None = None) -> str:
    """Return the 5-letter coupon code for `send_date` at `pct`% off, minting it if needed.

    Idempotent: a marker in `description` lets a re-run on the same day reuse
    the existing code instead of minting a duplicate. Single-arm runs use the
    legacy marker `campaign-daily:YYYY-MM-DD`; A/B arms use
    `campaign-daily:YYYY-MM-DD:PCT` so each percent gets its own stable,
    separately-countable code.

    Raises RuntimeError when no code could be inserted after 25 attempts; the
    last database error seen (if any) is attached as the exception's cause.
    """
    pct = COUPON_PCT if pct is None else pct
    # Keep the legacy marker for the single-arm (no A/B) case so historical
    # idempotency/lookups still work; A/B arms get a percent-suffixed marker.
    marker = (
        f"campaign-daily:{send_date.isoformat()}"
        if not COUPON_AB_PCTS
        else f"campaign-daily:{send_date.isoformat()}:{pct}"
    )
    cur = conn.cursor()
    cur.execute("SELECT code FROM discount_codes WHERE description = %s LIMIT 1", (marker,))
    row = cur.fetchone()
    if row:
        return row[0]

    # 23:59:59 ET of the send date
    expires = datetime.combine(send_date, datetime.min.time(), tzinfo=_ET) + timedelta(
        hours=23, minutes=59, seconds=59
    )
    starts = datetime.combine(send_date, datetime.min.time(), tzinfo=_ET)

    # Retry on the (rare) code collision against the UNIQUE constraint.
    last_exc: Exception | None = None
    for _ in range(25):
        code = _random_coupon_code()
        try:
            cur.execute(
                """
                INSERT INTO discount_codes
                    (code, description, discount_type, discount_value,
                     applies_to, max_uses_per_email, active, starts_at, expires_at)
                VALUES (%s, %s, 'percent', %s, %s, 1, TRUE, %s, %s)
                ON CONFLICT (code) DO NOTHING
                RETURNING code
                """,
                (code, marker, pct, COUPON_SLUGS, starts, expires),
            )
            r = cur.fetchone()
            if r:
                conn.commit()
                LOG.info("[coupon] daily code %s (%d%% off, expires %s ET)",
                         code, pct, expires.isoformat())
                return r[0]
        except Exception as exc:  # noqa: BLE001
            # Keep retrying as before, but remember the failure so a real DB
            # problem is not reported as a mere "could not mint" collision.
            conn.rollback()
            last_exc = exc
    raise RuntimeError("could not mint a unique daily coupon code") from last_exc


def get_or_create_daily_coupons(conn, send_date: date) -> dict[int, str]:
    """Mint (or reuse) every coupon arm for `send_date`.

    Returns a mapping of percent -> code. With no A/B test configured this is a
    single arm {COUPON_PCT: code}; with CAMPAIGN_COUPON_AB_PCTS="20,30,40" it
    returns one code per percent so recipients can be split across arms.

    A percent of 0 is a valid FULL-PRICE control arm: no code is minted (the map
    value is ""), so carriers bucketed into it see the normal price and no coupon,
    while still being deterministically assigned (and thus measurable) by email
    hash. Example: CAMPAIGN_COUPON_AB_PCTS="20,30,0".
    """
    pcts = list(COUPON_AB_PCTS) if COUPON_AB_PCTS else [COUPON_PCT]
    out: dict[int, str] = {}
    for pct in pcts:
        out[pct] = "" if pct <= 0 else get_or_create_daily_coupon(conn, send_date, pct)
    return out


def pick_coupon_for_email(email: str, daily_coupons: dict[int, str] | None) -> tuple[str, str]:
    """Deterministically assign a carrier to one coupon arm by a stable hash of
    their email. Returns (code, pct_str); ("", "") when coupons are off OR when
    the carrier is bucketed into a full-price control arm (pct 0, code "").

    The hash makes the split even and *stable*: the same carrier always lands in
    the same arm across re-sends, so an A/B comparison isn't polluted by a carrier
    seeing 20% one day and 40% the next. A full-price (0%) arm is returned as
    ("", "") so the email renders the normal-price branch, identical to a
    no-coupon send — but the carrier is still deterministically in that arm, so
    re-hashing a converter's email recovers which arm they were in for attribution.
    """
    if not daily_coupons:
        return "", ""
    pcts = sorted(daily_coupons.keys())
    if len(pcts) == 1:
        pct = pcts[0]
    else:
        # SHA-256 of the normalized (trimmed, lower-cased) address, modulo arms.
        h = hashlib.sha256((email or "").strip().lower().encode()).hexdigest()
        pct = pcts[int(h, 16) % len(pcts)]
    code = daily_coupons[pct]
    if not code:  # full-price control arm: no code, no deal
        return "", ""
    return code, str(pct)


def coupon_attribs(coupon_code: str | None, coupon_pct: str | None = None) -> dict:
    """Merge fields for the same-day deal, blank when no coupon is active.

    `coupon_pct` is passed per-recipient during an A/B test so the advertised
    percent matches the arm's actual code; it falls back to the global
    COUPON_PCT for single-arm sends.
    """
    if not coupon_code:
        return {"coupon_code": "", "coupon_pct": "", "coupon_expires": ""}
    return {
        "coupon_code": coupon_code,
        "coupon_pct": coupon_pct or str(COUPON_PCT),
        # Human-readable cutoff for the email body.
        "coupon_expires": "11:59 PM ET tonight",
    }


def _lp_query_string(dot: str | None, coupon_code: str | None) -> str:
    """Build the `?`-prefixed, URL-encoded query carried by every lp_link."""
    params = []
    if dot:
        params.append(("dot", dot))
    if coupon_code:
        params.append(("code", coupon_code))
    return "?" + urllib.parse.urlencode(params)


def lp_link_with_coupon(campaign_type: str, phy_state: str | None,
                        coupon_code: str | None, dot: str | None = None) -> str:
    """Order landing-page URL as the email's `lp_link` attrib.

    Always emits a `?`-started query (carrying the carrier's DOT, plus the daily
    `code=` when a coupon is active) so that EVERY template can safely append its
    own params with a leading `&`. This eliminates a class of broken-CTA bugs:
    previously `build_lp_link()` returned a bare path, so a template that wrote
    `{{ lp_link }}&utm_source=...` produced `/order/slug&utm_source=...` (an
    invalid URL that 404s), and one that wrote `{{ lp_link }}?dot=...` double-`?`d
    once a coupon added its own `?code=`. With the query owned here, both template
    styles converge on `{{ lp_link }}&utm_source=...` and are correct whether or
    not the coupon is on.

    The `?` is emitted even when neither `dot` nor `coupon_code` is present, so
    a template's appended `&...` still lands inside a query string.
    """
    return build_lp_link(campaign_type, phy_state) + _lp_query_string(dot, coupon_code)


# ── TZ config: tz_key -> (states, send_hour_utc) ─────────────────────────────
# 4AM EST = 09:00 UTC, each TZ +1hr so they get it at ~4AM local
TIMEZONE_CONFIG = {
    "ET": {
        "states": ("CT", "DC", "DE", "FL", "GA", "IN", "KY", "MA", "MD", "ME", "MI",
                   "NH", "NJ", "NY", "NC", "OH", "PA", "RI", "SC", "TN", "VA", "VT", "WV"),
        "send_hour_utc": 9,  # 4 AM EST
    },
    "CT": {
        "states": ("AL", "AR", "IA", "IL", "KS", "LA", "MN", "MO", "MS", "ND",
                   "NE", "OK", "SD", "TX", "WI"),
        "send_hour_utc": 10,  # 5 AM EST = 4 AM CST
    },
    "MT": {
        "states": ("AZ", "CO", "ID", "MT", "NM", "UT", "WY"),
        "send_hour_utc": 11,  # 6 AM EST = 4 AM MST
    },
    "PT": {
        "states": ("AK", "CA", "HI", "NV", "OR", "WA"),
        "send_hour_utc": 12,  # 7 AM EST = 4 AM PST
    },
}

# Owner email — test sends go here before each campaign is scheduled
TEST_EMAIL = os.getenv("CAMPAIGN_TEST_EMAIL", "carrierone@gmx.com")
REPLY_TO_EMAIL = os.getenv("CAMPAIGN_REPLY_TO", "info@performancewest.net")
# Listmonk applies campaign headers as `for hdr, val := range set { h.Add(hdr, val) }`
# (internal/manager/manager.go), i.e. each map's KEY is the literal header name.
# So the correct shape is {"Reply-To": value}; a {"name": ..., "value": ...} map
# would emit junk "name:"/"value:" headers and NO real Reply-To, silently sending
# replies to the From address (noreply@send.performancewest.net) instead. The
# healthcare builder already uses the correct shape; match it here.
REPLY_TO_HEADERS = [{"Reply-To": REPLY_TO_EMAIL}]
# Bulk From — sends from the dedicated bulk subdomain so its sending reputation
# is isolated from the root domain (which stays clean for transactional /
# verification mail).
# Replies still go to the root domain via Reply-To above, so
# the customer-facing reply experience is unchanged. See docs/deliverability.md.
# The default carries a display name AND the bulk-subdomain address: a From
# value with only a display name is not a usable sender.
FROM_EMAIL = os.getenv(
    "CAMPAIGN_FROM", "Performance West <noreply@send.performancewest.net>"
)

# Which verification results are safe to SEND to. We key ONLY off
# email_verify_result, never the email_verified boolean: the verifier sets
# email_verified=TRUE optimistically for 'mx_unreachable' (domain exists but its
# mail server didn't answer the probe) — those addresses HARD-BOUNCE when we
# actually send, which is what tanked deliverability (≈47% bounce, half the list
# blocklisted). So 'mx_unreachable' and all error/reject results are excluded.
#
# Recovery mode (default ON while reputation is damaged): send ONLY 'smtp_valid'
# — addresses an MX explicitly accepted at RCPT time — plus 'send_confirmed'
# (addresses proven deliverable by a real burner-domain verification send; see
# docs/campaign-deliverability-plan.md). This drives the bounce rate to near-zero
# and rebuilds sender reputation. 'hard_bounced' is NEVER sendable.
#
# Catch-all domains (accept any RCPT at SMTP time, then may silently bounce
# later) are the big growth pool but the risky one, so they are gated by an
# AUTOMATIC rollout (see catch_all_enabled): once the IPs are warm AND the recent
# live bounce rate is provably low, they are added; if bounces spike they
# auto-revert. CAMPAIGN_INCLUDE_CATCH_ALL=1/0 hard-overrides the auto decision.
BASE_SENDABLE_RESULTS = ["smtp_valid", "send_confirmed"]
CATCH_ALL_RESULTS = ["catch_all_domain", "catch_all_detected"]

# ── Catch-all auto-rollout tunables ─────────────────────────────────────────
# Warmup day at/after which catch-all MAY auto-enable (rebuild reputation on the
# clean smtp_valid pool first). Independent of the big-MX axis: catch-all is
# dominated by long-tail business domains, and any catch-all address that also
# lands on Google/Microsoft is still held out by big_mx_exclude until day 30.
CATCH_ALL_MIN_WARMUP_DAY = int(os.getenv("CAMPAIGN_CATCH_ALL_MIN_DAY", "21")) # Recent-window bounce-rate ceiling (percent). At/above this, catch-all stays OFF # and an already-on rollout auto-reverts. A SHORT window is deliberate: a # historical disaster (e.g. the Jun-16 ~45% 7-day rate) must NOT block the # rollout forever, and a fresh spike must trip it fast. CATCH_ALL_MAX_BOUNCE_PCT = float(os.getenv("CAMPAIGN_CATCH_ALL_MAX_BOUNCE_PCT", "8")) CATCH_ALL_BOUNCE_WINDOW_DAYS = int(os.getenv("CAMPAIGN_CATCH_ALL_BOUNCE_WINDOW_DAYS", "2")) # Minimum sent volume required in the window before the rate is trusted (else a # tiny sample like 9 sent / 1 bounce = 11% would wrongly gate the decision). CATCH_ALL_BOUNCE_MIN_SENT = int(os.getenv("CAMPAIGN_CATCH_ALL_BOUNCE_MIN_SENT", "300")) DB_URL = os.getenv("DATABASE_URL", "") WARMUP_START_FILE = os.getenv("MTA_WARMUP_START_FILE", "/etc/postfix/pw-warmup-start") def _listmonk_db_url() -> str: """Derive the listmonk DB URL from DATABASE_URL (same Postgres, diff db). Bounce/sent counts live in the listmonk DB, while the campaign builder's DB_URL points at the `performancewest` app DB on the SAME Postgres server. """ override = os.getenv("LISTMONK_DATABASE_URL") if override: return override base = DB_URL or os.getenv("DATABASE_URL", "") if "/" in base: return base.rsplit("/", 1)[0] + "/listmonk" return base def recent_bounce_rate(window_days: int) -> tuple[float | None, int, int]: """Live campaign bounce rate over the last `window_days`. Returns (rate_pct_or_None, sent, bounced). rate is None when sent==0 (no signal). Only campaigns that actually ran in the window are counted, and bounces are joined on campaign_id (≈99% populated for the real-time postfix source), so a long-past disaster cannot poison a short recent window. 
""" try: conn = psycopg2.connect(_listmonk_db_url()) except Exception as exc: # pragma: no cover - infra dependent LOG.warning("catch-all guardrail: cannot reach listmonk DB (%s); " "treating bounce rate as UNKNOWN (fail-closed)", exc) return None, 0, 0 try: with conn.cursor() as cur: cur.execute( """ SELECT COALESCE(SUM(c.sent), 0), COALESCE(SUM(b.n), 0) FROM campaigns c LEFT JOIN ( SELECT campaign_id, count(*) AS n FROM bounces WHERE campaign_id IS NOT NULL GROUP BY campaign_id ) b ON b.campaign_id = c.id WHERE COALESCE(c.started_at, c.created_at) > now() - make_interval(days => %s) AND c.status IN ('finished', 'running') """, (window_days,), ) sent, bounced = cur.fetchone() sent, bounced = int(sent), int(bounced) finally: conn.close() rate = (100.0 * bounced / sent) if sent else None return rate, sent, bounced def catch_all_enabled() -> bool: """Decide whether catch-all domains are sendable on THIS run. Auto-rollout (no env needed): 1. IPs warm enough -> warmup_day() >= CATCH_ALL_MIN_WARMUP_DAY 2. recent bounce rate low -> over CATCH_ALL_BOUNCE_WINDOW_DAYS, with at least CATCH_ALL_BOUNCE_MIN_SENT sent for a trustworthy sample, the rate is BELOW CATCH_ALL_MAX_BOUNCE_PCT. If bounces later spike above the ceiling, this returns False again on the next run -> the rollout auto-reverts to the clean smtp_valid pool. CAMPAIGN_INCLUDE_CATCH_ALL hard-overrides: '1'/'true' forces ON (manual decision, skips guardrail), '0'/'false' forces OFF. 
""" override = os.getenv("CAMPAIGN_INCLUDE_CATCH_ALL") if override is not None: forced = override.strip().lower() not in ("0", "false", "") LOG.info("catch-all: forced %s via CAMPAIGN_INCLUDE_CATCH_ALL=%r", "ON" if forced else "OFF", override) return forced day = warmup_day() if day is None or day < CATCH_ALL_MIN_WARMUP_DAY: LOG.info("catch-all: OFF (warmup day %s < min %s)", day, CATCH_ALL_MIN_WARMUP_DAY) return False rate, sent, bounced = recent_bounce_rate(CATCH_ALL_BOUNCE_WINDOW_DAYS) if rate is None or sent < CATCH_ALL_BOUNCE_MIN_SENT: # Not enough recent signal to trust -> fail closed (stay on clean pool). LOG.info("catch-all: OFF (insufficient recent signal: %s sent < min %s " "over %sd; need a low proven bounce rate first)", sent, CATCH_ALL_BOUNCE_MIN_SENT, CATCH_ALL_BOUNCE_WINDOW_DAYS) return False if rate >= CATCH_ALL_MAX_BOUNCE_PCT: LOG.warning("catch-all: OFF (recent bounce rate %.2f%% >= ceiling %.2f%% " "over %sd; %s sent / %s bounced) -- auto-reverting to clean pool", rate, CATCH_ALL_MAX_BOUNCE_PCT, CATCH_ALL_BOUNCE_WINDOW_DAYS, sent, bounced) return False LOG.info("catch-all: ON (warmup day %s >= %s; recent bounce %.2f%% < %.2f%% " "over %sd; %s sent / %s bounced)", day, CATCH_ALL_MIN_WARMUP_DAY, rate, CATCH_ALL_MAX_BOUNCE_PCT, CATCH_ALL_BOUNCE_WINDOW_DAYS, sent, bounced) return True def usable_filter() -> str: """SQL predicate for email_verify_result values that are safe to send to. Always includes the clean pool (smtp_valid + send_confirmed); adds catch-all results only when catch_all_enabled() says so (warm IPs + low live bounces). The decision is memoized so it is computed ONCE per build run (one DB probe, one log line, and a consistent filter across every segment/timezone). 
""" global _USABLE_FILTER_CACHE if _USABLE_FILTER_CACHE is None: results = list(BASE_SENDABLE_RESULTS) if catch_all_enabled(): results += CATCH_ALL_RESULTS _USABLE_FILTER_CACHE = ( "email_verify_result IN (" + ", ".join(f"'{r}'" for r in results) + ")" ) return _USABLE_FILTER_CACHE _USABLE_FILTER_CACHE: str | None = None def warmup_day() -> int | None: """Return days since MTA warmup start, or None if not configured/readable.""" try: with open(WARMUP_START_FILE, "r", encoding="utf-8") as fh: start = int(fh.read().strip()) return max(0, int((time.time() - start) // 86400)) except Exception: return None def warmup_daily_queue_cap(day: int | None = None) -> int | None: """Daily campaign-builder queue cap aligned with Listmonk rampcap targets. This prevents the builder from scheduling 10k-20k recipients/day while Listmonk is throttled to only 500-3,000/day, which would create a massive backlog that later drains unpredictably. The numbers match the runbook's intended daily warmup totals, not the theoretical 24-hour sliding-window max. """ if day is None: day = warmup_day() if day is None: return None if day <= 1: return 500 if day <= 3: return 1500 if day <= 6: return 2500 return 3000 def lm_api(path: str, data: dict | None = None, method: str | None = None): headers = {"Content-Type": "application/json", "Authorization": f"Basic {_LM_AUTH}"} req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers) if data is not None: req.data = json.dumps(data).encode() if method: req.method = method try: return json.loads(urllib.request.urlopen(req, timeout=30).read()) except urllib.error.HTTPError as e: body = e.read().decode()[:300] raise RuntimeError(f"Listmonk {path} HTTP {e.code}: {body}") _LM_SUPPRESSION_CACHE: dict[str, tuple[bool, str]] = {} def _lm_email_query(email: str) -> str: """Return a Listmonk subscriber query for an exact email address.""" # Listmonk's query language uses single-quoted strings. 
def _lm_email_query(email: str) -> str:
    """Build the Listmonk subscriber query that matches one exact address.

    The address is lower-cased and stripped before being embedded.
    """
    normalized = email.lower().strip()
    # Listmonk's query language uses single-quoted strings. A well-formed
    # address has no quotes, but double any that appear so a bad source row
    # can neither break the query nor widen what it matches.
    escaped = "''".join(normalized.split("'"))
    return "email = '" + escaped + "'"
def _subscriber_verdict(sub: dict) -> tuple[bool, str]:
    """Classify one existing Listmonk subscriber record as sendable or not."""
    status = (sub.get("status") or "").lower()
    if status != "enabled":
        return False, f"listmonk_status:{status or 'unknown'}"
    for lst in sub.get("lists") or []:
        lstatus = (lst.get("subscription_status") or "").lower()
        if lstatus in {"unsubscribed", "unconfirmed"}:
            # The first bad per-list subscription decides the verdict.
            return False, f"list_subscription_status:{lstatus}"
    return True, "enabled"


def listmonk_sendable(email: str) -> tuple[bool, str]:
    """Return whether Listmonk will send to this address, with reason.

    SQL eligibility is not enough: an address may already exist in Listmonk as
    disabled, blocklisted, bounced, or unsubscribed. If we add those rows to a
    fresh campaign list, Listmonk filters them at send time, producing a
    smaller `to_send` count than the builder's daily warmup allocation. Check
    status at build time and fetch replacements so each campaign list is
    filled with actually-sendable subscribers.

    Definitive verdicts are memoized per normalized address in
    ``_LM_SUPPRESSION_CACHE``. Lookup failures are NOT memoized: they say
    nothing about the address itself, so a later call retries instead of
    inheriting a transient API error for the rest of the run.

    Returns:
        ``(ok, reason)``. ``reason`` is one of ``"new"``, ``"enabled"``,
        ``"blank_email"``, ``"listmonk_status:<status>"``,
        ``"list_subscription_status:<status>"`` or ``"lookup_error:<exc>"``.
    """
    key = email.lower().strip()
    if not key:
        return False, "blank_email"
    if key in _LM_SUPPRESSION_CACHE:
        return _LM_SUPPRESSION_CACHE[key]
    try:
        res = lm_api(
            "/subscribers?" + urllib.parse.urlencode({
                "query": _lm_email_query(key),
                "per_page": 1,
            })
        )
        results = res.get("data", {}).get("results", [])
        out = _subscriber_verdict(results[0]) if results else (True, "new")
    except Exception as exc:  # noqa: BLE001
        # Treating a failed lookup as not sendable is safer than scheduling a
        # suppressed address and marking it sent in our source DB. The reason
        # string lets the caller's logs surface the problem.
        return False, f"lookup_error:{exc}"
    _LM_SUPPRESSION_CACHE[key] = out
    return out


def get_base_campaign(campaign_id: int) -> dict:
    """Fetch the Listmonk campaign used as a template and return its data."""
    return lm_api(f"/campaigns/{campaign_id}")["data"]


def create_list(name: str) -> int:
    """Create a private, single-opt-in Listmonk list and return its id."""
    result = lm_api("/lists", {
        "name": name,
        "type": "private",
        "optin": "single",
        "tags": ["trucking", "auto"],
    }, "POST")
    return result["data"]["id"]
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
    """Create/attach a single subscriber to a list. Returns True on success.

    Listmonk's POST /api/subscribers takes one subscriber with `lists` (array
    of list IDs) and preconfirm_subscriptions=true so they're immediately
    sendable. When the address already exists, the existing record is looked
    up, its attribs are refreshed, and it is attached to ``list_id`` instead.

    Returns False (never raises) when the subscriber could not be attached:
    the create call failed for a non-conflict reason, the existing record is
    not ``enabled``, the lookup found nothing, or a follow-up call failed.
    """
    try:
        lm_api("/subscribers", {
            "email": email,
            "name": name or email,
            "status": "enabled",
            "lists": [list_id],
            "attribs": attribs,
            "preconfirm_subscriptions": True,
        }, "POST")
        return True
    except Exception as exc:  # noqa: BLE001
        msg = str(exc)
        lowered = msg.lower()
        already_exists = ("already exists" in lowered
                          or "conflict" in lowered
                          or "HTTP 409" in msg)
        if not already_exists:
            LOG.warning("Listmonk create failed for %s: %s", email, msg)
            return False

    # Already exists -- attach to this list instead of failing.
    try:
        subs = lm_api(
            "/subscribers?" + urllib.parse.urlencode({
                # Shared helper normalizes AND quote-escapes the address, so a
                # stray apostrophe cannot break or broaden the lookup.
                "query": _lm_email_query(email),
                "per_page": 1,
            })
        )
        results = subs.get("data", {}).get("results", [])
        if not results:
            return False
        existing = results[0]
        status = (existing.get("status") or "").lower()
        if status != "enabled":
            LOG.info("Listmonk suppressed existing subscriber %s status=%s",
                     email, status or "unknown")
            return False
        sub_id = existing["id"]
        # Refresh attribs so per-campaign merge fields (coupon code,
        # ifta_due_date, lp_link, etc.) are correct for THIS send --
        # otherwise a previously-imported carrier keeps stale attribs
        # and the new campaign renders blank fields.
        try:
            merged = dict(existing.get("attribs") or {})
            merged.update(attribs or {})
            lm_api(f"/subscribers/{sub_id}", {
                "email": email,
                "name": name or existing.get("name") or email,
                "status": "enabled",
                "attribs": merged,
            }, "PUT")
        except Exception as exc:  # noqa: BLE001
            # Non-fatal: still attach to the list below, but leave a trace.
            LOG.warning("Listmonk attrib refresh failed for %s: %s", email, exc)
        lm_api("/subscribers/lists", {
            "ids": [sub_id],
            "action": "add",
            "target_list_ids": [list_id],
            "status": "confirmed",
        }, "PUT")
        return True
    except Exception as exc:  # noqa: BLE001
        LOG.warning("Listmonk attach failed for %s: %s", email, exc)
        return False


def import_subscribers(list_id: int, subscribers: list[dict]) -> int:
    """Add subscribers to a list one-by-one. Returns count successfully added.

    Each entry needs an ``email`` key; ``name`` and ``attribs`` are optional.
    """
    return sum(
        1
        for sub in subscribers
        if add_subscriber(list_id, sub["email"], sub.get("name", ""),
                          sub.get("attribs", {}))
    )
def _altbody_for(base: dict, body: str | None = None) -> str:
    """Return the text/plain alternative part for a campaign.

    Listmonk only emits multipart/alternative when altbody is set, and
    HTML-only mail is a spam-score signal. An altbody already present on
    `base` wins; otherwise one is derived from HTML via html_to_text().
    `body`, when given, replaces base["body"] as the HTML source (test sends
    pass the body with merge fields already substituted).
    """
    preset = (base.get("altbody") or "").strip()
    if preset:
        return preset
    if body is None:
        body = base.get("body", "")
    return html_to_text(body)


def create_and_schedule_campaign(
    base: dict,
    list_id: int,
    name: str,
    send_at_utc: datetime,
    schedule: bool = True,
) -> int:
    """Clone `base` into a new Listmonk campaign targeting `list_id`.

    With ``schedule=True`` the campaign gets a ``send_at`` timestamp and is
    moved to the ``scheduled`` status; otherwise it is left as a draft
    (preview mode). Returns the new campaign id.
    """
    campaign = {
        "name": name,
        "subject": base["subject"],
        "lists": [list_id],
        "from_email": FROM_EMAIL,
        "type": "regular",
        "content_type": base["content_type"],
        "body": base["body"],
        "altbody": _altbody_for(base),
        "template_id": base["template_id"],
        "tags": base.get("tags") or [],
        "messenger": base.get("messenger") or "email",
        "headers": base.get("headers") or REPLY_TO_HEADERS,
    }
    if not schedule:
        # Draft only: create it and stop.
        return lm_api("/campaigns", campaign, "POST")["data"]["id"]

    campaign["send_at"] = send_at_utc.strftime("%Y-%m-%dT%H:%M:%S+00:00")
    new_id = lm_api("/campaigns", campaign, "POST")["data"]["id"]
    lm_api(f"/campaigns/{new_id}/status", {"status": "scheduled"}, "PUT")
    return new_id
def send_test(base: dict, campaign_id: int, sample_row: tuple, label: str, tz: str,
              campaign_type: str) -> None:
    """Send one test email so the owner can approve before the real blast.

    `sample_row` is (dot, email, name, phy_state, target_state, ...). Older
    callers may still pass a 4-tuple (dot, email, name, state); in that case
    target_state falls back to the base state. The sample's email (index 1)
    is not used: the test always goes to TEST_EMAIL. Failures are logged as
    warnings and never raised.
    """
    dot, name, phy_state = sample_row[0], sample_row[2], sample_row[3]
    target_state = phy_state if len(sample_row) <= 4 else sample_row[4]
    dot_text = dot or "0000000"

    # Real subscribers get populated attribs; the test send must mirror them or
    # the CTA button (e.g. "Check My Emissions Status") renders as a bare
    # "?dot=..." that links to nowhere. lp_link uses target_state (the state
    # the offer applies to, which for per-state programs comes from the
    # deficiency flag, not the base state).
    substitutions = [
        ("{{ .Subscriber.Attribs.company }}", name or "Sample Carrier LLC"),
        ("{{ .Subscriber.Attribs.dot_number }}", dot_text),
        ("{{ .Subscriber.Attribs.state }}", phy_state or "TX"),
        ("{{ .Subscriber.Attribs.lp_link }}", build_lp_link(campaign_type, target_state)),
    ]
    rendered = base["body"]
    for placeholder, value in substitutions:
        rendered = rendered.replace(placeholder, value)
    # NOTE: leave {{ UnsubscribeURL }} alone -- Listmonk renders it into a real,
    # working per-subscriber unsubscribe link (even on test sends). Overwriting
    # it produced a dead /unsubscribe link with no subscriber identity.
    subject = base["subject"].replace("{{ .Subscriber.Attribs.dot_number }}", dot_text)

    # Listmonk /test needs `lists` as an array of IDs (base["lists"] is objects).
    list_ids = [entry["id"] for entry in base.get("lists", []) if isinstance(entry, dict)]
    if not list_ids:
        list_ids = [1]

    request_body = {
        "name": base.get("name", "Test"),
        "subject": subject,
        "lists": list_ids,
        "from_email": FROM_EMAIL,
        "type": "regular",
        "content_type": base["content_type"],
        "body": rendered,
        "altbody": _altbody_for(base, rendered),
        "template_id": base["template_id"],
        "tags": base.get("tags") or [],
        "messenger": base.get("messenger") or "email",
        "headers": base.get("headers") or REPLY_TO_HEADERS,
        "subscribers": [TEST_EMAIL],
    }
    try:
        lm_api(f"/campaigns/{campaign_id}/test", request_body, "POST")
        LOG.info("[%s/%s] Test sent to %s (sample: %s DOT#%s)", tz, label, TEST_EMAIL, name, dot)
    except Exception as exc:
        LOG.warning("[%s/%s] Test send failed: %s", tz, label, exc)


def campaign_type_filter(campaign_type: str) -> str:
    """Return the SQL predicate that selects carriers for `campaign_type`.

    Raises:
        ValueError: if the type is neither built-in nor a deficiency segment.
    """
    builtin = {
        "mcs150": "mcs150_parsed < NOW() - INTERVAL '2 years'",
        "inactive": "oos_active IS TRUE",
    }
    if campaign_type in builtin:
        return builtin[campaign_type]
    if campaign_type in DEFICIENCY_SEGMENTS:
        return DEFICIENCY_SEGMENTS[campaign_type]["flag_sql"]
    raise ValueError(f"unknown campaign_type {campaign_type!r}")
def campaign_weight(campaign_type: str) -> float:
    """Warmup allocation weight by campaign specificity.

    Highly specific deficiency copy should generally get more early warmup
    share than generic overdue/inactive notices because it is more relevant
    and less repetitive at the mailbox-provider/content-fingerprint level.
    Any type other than the two generic ones gets the deficiency weight.
    """
    generic_weights = {"mcs150": 0.75, "inactive": 0.65}
    return generic_weights.get(campaign_type, 1.25)


def count_carriers(conn, tz_states: tuple, campaign_type: str, limit: int | None = None) -> int:
    """Count eligible, not-yet-sent carriers for one (timezone, campaign) slot.

    The count is bounded by `limit`; a missing (or zero) limit falls back to
    10,000,000, i.e. effectively unbounded.
    """
    cursor = conn.cursor()
    state_marks = ",".join("%s" for _ in tz_states)
    type_predicate = campaign_type_filter(campaign_type)
    sql = f"""
        SELECT count(*) FROM (
            SELECT 1
            FROM fmcsa_carriers
            WHERE {type_predicate}
              AND {usable_filter()}
              AND listmonk_sent_at IS NULL
              AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
              AND phy_state IN ({state_marks})
            LIMIT %s
        ) s
    """
    # Parameter order mirrors the placeholders: blocked domains, states, limit.
    params = [list(BLOCKED_EMAIL_DOMAINS), *tz_states, limit or 10_000_000]
    cursor.execute(sql, params)
    return int(cursor.fetchone()[0])
def allocate_daily_budget(candidates: list[dict], daily_cap: int | None) -> dict[tuple[str, str], int]:
    """Split the daily cap across (timezone, campaign_type) slots.

    Each slot's share is proportional to ``audience * weight`` and can never
    exceed the slot's own audience. This avoids sending the same number from
    a tiny segment and a massive one while still keeping body-text variety
    during warmup. With ``daily_cap=None`` every slot simply gets its full
    audience.

    Args:
        candidates: dicts carrying ``tz``, ``campaign_type``, ``audience`` and
            ``weight``.
        daily_cap: total recipients to hand out, or None for no cap.

    Returns:
        Mapping of ``(tz, campaign_type)`` to the allocated quota; slots with
        no audience are present with quota 0.
    """
    def slot_of(cand: dict) -> tuple[str, str]:
        return (cand["tz"], cand["campaign_type"])

    eligible = [cand for cand in candidates if cand["audience"] > 0]
    if daily_cap is None:
        return {slot_of(cand): cand["audience"] for cand in candidates}

    budget = min(daily_cap, sum(cand["audience"] for cand in eligible))
    quotas = {slot_of(cand): 0 for cand in candidates}
    headroom = {slot_of(cand): cand["audience"] for cand in eligible}
    pull = {slot_of(cand): cand["audience"] * cand["weight"] for cand in eligible}

    # Repeated largest-remainder passes: a slot that hits its audience ceiling
    # drops out and its unused share is re-split among the rest.
    while budget > 0 and headroom:
        pull_total = sum(pull[slot] for slot in headroom)
        if pull_total <= 0:
            break

        granted = 0
        fractions: list[tuple[float, tuple[str, str]]] = []
        for slot in list(headroom):
            share = budget * (pull[slot] / pull_total)
            whole = math.floor(share)
            take = min(headroom[slot], int(whole))
            quotas[slot] += take
            headroom[slot] -= take
            granted += take
            if headroom[slot] > 0:
                fractions.append((share - whole, slot))

        # Hand out what rounding left over, largest fractional part first.
        spare = budget - granted
        for _, slot in sorted(fractions, reverse=True):
            if spare <= 0:
                break
            if headroom.get(slot, 0) <= 0:
                continue
            quotas[slot] += 1
            headroom[slot] -= 1
            granted += 1
            spare -= 1

        budget -= granted
        headroom = {slot: left for slot, left in headroom.items() if left > 0}
        if granted == 0:
            # Nothing could be placed this pass; stop instead of spinning.
            break
    return quotas
def fetch_carriers(
    conn,
    tz_states: tuple,
    campaign_type: str,
    limit: int,
    offset: int = 0,
) -> list[tuple]:
    """Fetch one page of eligible carriers, most overdue MCS-150 first.

    Each row is (dot_number, email_address, legal_name, phy_state,
    target_state, mx_provider).

    target_state is the state the offer applies to. For most segments that is
    the carrier's base (phy_state). For per-state programs (state_weight_tax,
    state_emissions) the relevant state is encoded in the deficiency flag
    suffix (e.g. 'state_weight_tax_OR'), which can differ from the base
    state, so it is pulled out of the flag so the CTA links to the correct
    state's order page.
    """
    cursor = conn.cursor()
    state_marks = ",".join("%s" for _ in tz_states)
    type_predicate = campaign_type_filter(campaign_type)

    # Per-state programs read the state from the matching flag's suffix;
    # everything else uses the base state.
    params: list = []
    target_state_sql = "phy_state"
    if campaign_type in ("state_weight_tax", "state_emissions"):
        flag_prefix = f"{campaign_type}_"
        target_state_sql = (
            "COALESCE((SELECT upper(substr(f, %s)) "
            "FROM unnest(deficiency_flags) f "
            f"WHERE f LIKE '{flag_prefix}%%' LIMIT 1), phy_state)"
        )
        # substr() is 1-based: start right after the prefix.
        params.append(len(flag_prefix) + 1)

    # During warmup, exclude carriers on the big operators that throttle or
    # blocklist (Google, Microsoft, etc.) AND the consumer mailbox operators
    # behind the "mx:" prefix -- mx_provider is set by mx_tag_carriers.py.
    # Untagged carriers (mx_provider IS NULL) are kept: the per-MX throttle in
    # the selector still bounds them, and excluding NULLs would starve the
    # pool until tagging completes.
    mx_clause = ""
    if MAIN_SKIP_BIG_MX and main_warmup_day() <= MAIN_BIG_MX_EXCLUDE_UNTIL_DAY:
        quoted_ops = ",".join("'%s'" % op for op in WARMUP_EXCLUDE_OPERATORS)
        mx_clause = f"AND (mx_provider IS NULL OR mx_provider NOT IN ({quoted_ops}))"

    sql = f"""
        SELECT dot_number, email_address, legal_name, phy_state,
               {target_state_sql} AS target_state, mx_provider
        FROM fmcsa_carriers
        WHERE {type_predicate}
          AND {usable_filter()}
          AND listmonk_sent_at IS NULL
          AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
          AND phy_state IN ({state_marks})
          {mx_clause}
        ORDER BY mcs150_parsed ASC NULLS LAST
        LIMIT %s OFFSET %s
    """
    params.append(list(BLOCKED_EMAIL_DOMAINS))
    params.extend(tz_states)
    params.extend([limit, offset])
    cursor.execute(sql, params)
    return cursor.fetchall()
def select_sendable_carriers(
    conn,
    tz_states: tuple,
    campaign_type: str,
    quota: int,
    max_scan: int | None = None,
) -> tuple[list[tuple], dict[str, int]]:
    """Page through carriers until `quota` Listmonk-sendable ones are found.

    Returns (rows, skip_counts). Rows that are duplicates, over an MX cap, or
    not sendable according to listmonk_sendable() are skipped, tallied by
    reason in skip_counts, and left out of the returned rows (so the caller
    never marks them sent in Postgres).
    """
    chosen: list[tuple] = []
    skip_counts: dict[str, int] = {}
    emails_seen: set[str] = set()

    def note(reason: str) -> None:
        skip_counts[reason] = skip_counts.get(reason, 0) + 1

    # Per-MX throttle: cap how many of THIS run's quota go to each receiving
    # operator so we never concentrate on one (the Jun 13-14 Gmail/Outlook
    # storm).
    caps = mx_daily_caps(main_warmup_day())
    fallback_cap = caps.get("__default__", 50)
    used_by_operator: dict = {}

    # Untagged (NULL mx_provider) rows share one safety cap rather than being
    # uncapped: a flood of freshly imported, never-resolved domains could hide
    # big/consumer operators we would otherwise hold out. The default is
    # generous (at least 200, or the whole quota) so it does not starve the
    # pool in normal operation. Tunable via MAIN_UNTAGGED_MX_CAP.
    untagged_cap = int(os.getenv("MAIN_UNTAGGED_MX_CAP", str(max(quota, 200))))
    untagged_taken = 0
    mx_col = 5  # mx_provider is the 6th column from fetch_carriers

    # Warmup caps are small, but old audiences can contain many prior bounces
    # or unsubscribes. Scan beyond the quota while still bounding worst-case
    # API calls.
    scan_limit = max_scan or max(quota * 8, quota + 500, 1000)
    page_size = min(max(quota * 2, 100), 1000)

    position = 0
    while len(chosen) < quota and position < scan_limit:
        page = fetch_carriers(conn, tz_states, campaign_type, page_size, offset=position)
        if not page:
            break
        position += len(page)

        for row in page:
            address = (row[1] or "").lower().strip()
            if address in emails_seen:
                note("duplicate_email")
                continue
            emails_seen.add(address)

            # Reputation is per receiving operator: tagged rows are capped per
            # operator, untagged rows by the shared cap above.
            operator = ""
            if len(row) > mx_col:
                operator = (row[mx_col] or "").strip().lower()
            if operator:
                if used_by_operator.get(operator, 0) >= caps.get(operator, fallback_cap):
                    note(f"mx_cap:{operator}")
                    continue
            elif untagged_taken >= untagged_cap:
                note("mx_cap:untagged")
                continue

            sendable, why_not = listmonk_sendable(address)
            if not sendable:
                note(why_not)
                continue

            if operator:
                used_by_operator[operator] = used_by_operator.get(operator, 0) + 1
            else:
                untagged_taken += 1
            chosen.append(row)
            if len(chosen) >= quota:
                break

        if len(page) < page_size:
            # Short page: the eligible pool is exhausted.
            break
    return chosen, skip_counts


def mark_sent(conn, dot_numbers: list[str], campaign_type: str) -> None:
    """Stamp the given carriers as sent for `campaign_type` and commit."""
    cursor = conn.cursor()
    cursor.execute(
        """
        UPDATE fmcsa_carriers
        SET listmonk_sent_at = NOW(),
            listmonk_campaign_type = %s
        WHERE dot_number = ANY(%s::text[])
        """,
        (campaign_type, dot_numbers),
    )
    conn.commit()
def list_segments(send_date: date) -> None:
    """Report audience size per deficiency segment (no writes).

    Verifiable Phase-5 metric: each segment selects a nonzero audience and
    points at a valid order LP. Source-template configuration is shown so we
    know which segments are ready to schedule.

    For every segment the audience is the sum, over all timezone regions, of
    the rows fetch_carriers() returns under the segment's own limit. The
    connection is always closed, including when a query fails part-way.
    """
    conn = psycopg2.connect(DB_URL)
    try:
        print(f"\nDeficiency-flag segments (send date {send_date.isoformat()}):\n")
        print(f"{'segment':<22}{'audience':>10} {'source':<10} landing page")
        print("-" * 78)
        grand_total = 0
        for ctype, seg in DEFICIENCY_SEGMENTS.items():
            total = sum(
                len(fetch_carriers(conn, tz_cfg["states"], ctype, seg["limit"]))
                for tz_cfg in TIMEZONE_CONFIG.values()
            )
            grand_total += total
            src = os.getenv(seg["source_env"])
            src_str = src if src else "UNSET"
            lp = build_lp_link(ctype, None)
            print(f"{ctype:<22}{total:>10} {src_str:<10} {lp}")
        print("-" * 78)
        print(f"{'TOTAL':<22}{grand_total:>10}")
        print("\n(UNSET source = template not yet created; segment is skipped by the "
              "scheduled run until its CAMPAIGN_*_ID env is set.)\n")
    finally:
        # Release the connection even if a fetch raised mid-report.
        conn.close()
def _run_with_connection(conn, send_date, dry_run, preview, max_per_segment,
                         only_segments, send_hour_override, send_minute,
                         stagger_minutes, daily_cap, warmup_cap):
    """Build the day's campaigns over an already-open DB connection.

    Holds the whole body of run(); the caller owns (and closes) `conn`.

    Returns:
        ``(queued_recipients, effective_daily_cap)`` on completion, or None
        when no campaign spec matched ``only_segments`` and nothing was done.
    """
    # Mint (or reuse) the same-day coupon(s) for this send date so every campaign
    # in the run shares the same expiring code(s). Preview/dry runs skip the write,
    # and the daily deal is disabled by default (see COUPON_ENABLED) -- when off we
    # send at normal price (empty coupon_code -> template's no-deal branch). When
    # CAMPAIGN_COUPON_AB_PCTS is set we mint one code per percent arm and split
    # recipients across them by a stable hash of their email.
    daily_coupons: dict[int, str] | None = None
    if COUPON_ENABLED and not dry_run and not preview:
        try:
            daily_coupons = get_or_create_daily_coupons(conn, send_date)
            if COUPON_AB_PCTS:
                LOG.info("[coupon] A/B test arms: %s",
                         ", ".join(f"{p}%={c}" for p, c in sorted(daily_coupons.items())))
        except Exception as exc:  # noqa: BLE001
            LOG.warning("[coupon] could not mint daily coupon(s): %s (sending without)", exc)
    elif not COUPON_ENABLED:
        LOG.info("[coupon] disabled (CAMPAIGN_ENABLE_COUPON unset) — sending at normal price")

    base_mcs150 = get_base_campaign(CAMPAIGN_MCS150_ID)
    base_inactive = get_base_campaign(CAMPAIGN_INACTIVE_ID)

    # Each spec is (campaign_type, base template, per-slot limit, label).
    campaign_specs = [
        ("mcs150", base_mcs150, 2000, "MCS-150 Overdue"),
        ("inactive", base_inactive, 1000, "Inactive USDOT"),
    ]
    # Add deficiency-flag segments whose Listmonk source template is configured.
    for ctype, seg in DEFICIENCY_SEGMENTS.items():
        src = os.getenv(seg["source_env"])
        if not src:
            LOG.info("[segment] %s skipped — %s not set (template not created)",
                     ctype, seg["source_env"])
            continue
        try:
            base = get_base_campaign(int(src))
        except Exception as exc:  # noqa: BLE001
            LOG.warning("[segment] %s skipped — source %s unusable: %s", ctype, src, exc)
            continue
        campaign_specs.append((ctype, base, seg["limit"], seg["label"]))

    # Optional warmup controls: restrict to specific segments and/or cap the
    # per-segment audience so a fresh-IP day-0 send stays small.
    if only_segments:
        campaign_specs = [s for s in campaign_specs if s[0] in only_segments]
        if not campaign_specs:
            LOG.error("No campaign specs match --only-segment %s", sorted(only_segments))
            return None
    if max_per_segment is not None:
        campaign_specs = [
            (ct, base, min(lim, max_per_segment), label)
            for (ct, base, lim, label) in campaign_specs
        ]

    # An explicit daily_cap wins; otherwise derive one from the warmup day.
    if daily_cap is None and warmup_cap and not preview:
        daily_cap = warmup_daily_queue_cap()
    if daily_cap is not None:
        LOG.info("Daily queue cap active: %d recipients total", daily_cap)

    if preview:
        LOG.info("PREVIEW MODE — 1 sample carrier per campaign, drafts only, "
                 "test sends to %s, no real schedule, no mark-sent.", TEST_EMAIL)

    # Pass 1: size every (timezone, campaign) slot, then split the daily cap.
    candidates: list[dict] = []
    totals_by_campaign: dict[str, int] = {}
    for tz, tz_cfg in TIMEZONE_CONFIG.items():
        for campaign_type, _base, limit, label in campaign_specs:
            audience = 1 if preview else count_carriers(conn, tz_cfg["states"], campaign_type, limit)
            weight = campaign_weight(campaign_type)
            candidates.append({
                "tz": tz,
                "states": tz_cfg["states"],
                "campaign_type": campaign_type,
                "limit": limit,
                "label": label,
                "audience": audience,
                "weight": weight,
            })
            totals_by_campaign[campaign_type] = totals_by_campaign.get(campaign_type, 0) + audience

    quotas = allocate_daily_budget(candidates, None if preview else daily_cap)
    LOG.info("Eligible audience by SQL criterion: %s", ", ".join(
        f"{ct}={n}" for ct, n in sorted(totals_by_campaign.items())
    ))
    if daily_cap is not None and not preview:
        planned_by_campaign: dict[str, int] = {}
        for (tz, ctype), quota in quotas.items():
            planned_by_campaign[ctype] = planned_by_campaign.get(ctype, 0) + quota
        LOG.info("Planned allocation by criterion: %s (total=%d/%d)", ", ".join(
            f"{ct}={n}" for ct, n in sorted(planned_by_campaign.items())
        ), sum(planned_by_campaign.values()), daily_cap)

    # Pass 2: build a list + campaign per slot.
    scheduled_count = 0
    queued_recipients = 0
    for tz, tz_cfg in TIMEZONE_CONFIG.items():
        states = tz_cfg["states"]
        send_hour = send_hour_override if send_hour_override is not None else tz_cfg["send_hour_utc"]
        send_at = datetime(
            send_date.year, send_date.month, send_date.day,
            send_hour, send_minute, 0, tzinfo=timezone.utc
        )

        for campaign_type, base, limit, label in campaign_specs:
            if daily_cap is not None and queued_recipients >= daily_cap:
                LOG.info("[%s/%s] Daily cap reached (%d/%d) — skipping remaining slots",
                         tz, campaign_type, queued_recipients, daily_cap)
                continue
            # Each campaign created so far pushes the next one later.
            effective_send_at = send_at + timedelta(minutes=stagger_minutes * scheduled_count)
            fetch_limit = 1 if preview else limit
            if daily_cap is not None and not preview:
                fetch_limit = min(fetch_limit, quotas.get((tz, campaign_type), 0))
            if fetch_limit <= 0:
                LOG.info("[%s/%s] Allocation is 0 — skipping", tz, campaign_type)
                continue
            if preview:
                rows = fetch_carriers(conn, states, campaign_type, fetch_limit)
                suppressed: dict[str, int] = {}
            else:
                rows, suppressed = select_sendable_carriers(conn, states, campaign_type, fetch_limit)
            if suppressed:
                LOG.info("[%s/%s] Runtime Listmonk suppression skipped %d rows: %s",
                         tz, campaign_type, sum(suppressed.values()), ", ".join(
                             f"{reason}={n}" for reason, n in sorted(suppressed.items())
                         ))
            if not rows:
                LOG.warning("[%s/%s] No usable records — skipping", tz, campaign_type)
                continue

            actual = len(rows)
            tag = "PREVIEW " if preview else ""
            list_name = f"{tag}Trucking {label} {tz} {send_date.isoformat()}"
            campaign_name = f"{tag}{label} - {tz} - {send_date.strftime('%b %d %Y')}"

            LOG.info("[%s/%s] %d sendable carriers -> %s (send %s UTC)",
                     tz, campaign_type, actual, campaign_name,
                     effective_send_at.strftime("%Y-%m-%d %H:%M"))

            if dry_run:
                LOG.info(" DRY RUN — skipping Listmonk + DB writes")
                scheduled_count += 1
                queued_recipients += actual
                continue

            # Build subscriber list. In preview, only the owner's address (with the
            # sample carrier's attribs) so the real audience is never touched.
            if preview:
                r0 = rows[0]
                p_code, p_pct = pick_coupon_for_email(TEST_EMAIL, daily_coupons)
                subscribers = [{
                    "email": TEST_EMAIL,
                    "name": r0[2] or "Sample Carrier",
                    "attribs": {"dot_number": r0[0], "company": r0[2] or "",
                                "state": r0[3] or "",
                                "lp_link": lp_link_with_coupon(campaign_type, r0[4], p_code, dot=str(r0[0])),
                                **coupon_attribs(p_code, p_pct),
                                **discounted_price_attribs(campaign_type, r0[4], p_pct)},
                }]
            else:
                subscribers = []
                for row in rows:
                    c_code, c_pct = pick_coupon_for_email(row[1], daily_coupons)
                    subscribers.append({
                        "email": row[1],
                        "name": row[2] or row[1],
                        "attribs": {"dot_number": row[0], "company": row[2] or "",
                                    "state": row[3] or "",
                                    "lp_link": lp_link_with_coupon(campaign_type, row[4], c_code, dot=str(row[0])),
                                    **coupon_attribs(c_code, c_pct),
                                    **discounted_price_attribs(campaign_type, row[4], c_pct)},
                    })

            # Create list + add subscribers
            list_id = create_list(list_name)
            added = import_subscribers(list_id, subscribers)
            LOG.info("[%s/%s] List %d: %d/%d subscribers added",
                     tz, campaign_type, list_id, added, len(subscribers))

            if added == 0:
                LOG.error("[%s/%s] BUG: 0 subscribers added to list %d — skipping",
                          tz, campaign_type, list_id)
                continue

            # Create campaign (draft in preview, scheduled in real run)
            cid = create_and_schedule_campaign(base, list_id, campaign_name,
                                               effective_send_at, schedule=not preview)
            LOG.info("[%s/%s] Campaign %d created (%s)",
                     tz, campaign_type, cid,
                     "draft/preview" if preview else f"scheduled {effective_send_at.isoformat()}")
            scheduled_count += 1
            queued_recipients += added

            # Send a test to the owner so they can spot-check the rendered email
            send_test(base, cid, rows[0], label, tz, campaign_type)

            # Mark carriers as sent only on a real run.
            # NOTE(review): every selected row is marked, including any that
            # import_subscribers() failed to add (added < len(rows)) -- confirm
            # that dropping those carriers from future runs is intended.
            if not preview:
                mark_sent(conn, [row[0] for row in rows], campaign_type)

    return queued_recipients, daily_cap


def run(send_date: date, dry_run: bool = False, preview: bool = False,
        max_per_segment: int | None = None, only_segments: set[str] | None = None,
        send_hour_override: int | None = None, send_minute: int = 0,
        stagger_minutes: int = 0, daily_cap: int | None = None,
        warmup_cap: bool = True) -> None:
    """Build (and, on a real run, schedule) the campaigns for `send_date`.

    Args:
        send_date: Calendar date the campaigns are scheduled for (UTC).
        dry_run: Select and log audiences but skip every Listmonk list,
            campaign and DB write.
        preview: One sample carrier per campaign, drafts only, subscriber is
            TEST_EMAIL, nothing is marked sent.
        max_per_segment: Optional ceiling on each segment's per-timezone limit.
        only_segments: When set, restrict the run to these campaign types.
        send_hour_override: UTC hour used for every timezone instead of the
            per-timezone default.
        send_minute: UTC minute of the send time.
        stagger_minutes: Minutes added to the send time per campaign already
            created in this run.
        daily_cap: Hard cap on recipients queued by this run; when None and
            `warmup_cap` is true (and not previewing) it is derived from
            warmup_daily_queue_cap().
        warmup_cap: Allow the automatic warmup-derived cap.

    The DB connection is closed on every exit path, including exceptions.
    """
    conn = psycopg2.connect(DB_URL)
    try:
        outcome = _run_with_connection(
            conn, send_date, dry_run, preview, max_per_segment, only_segments,
            send_hour_override, send_minute, stagger_minutes, daily_cap,
            warmup_cap,
        )
    finally:
        conn.close()
    if outcome is None:
        return
    queued_recipients, effective_cap = outcome
    LOG.info("Done. queued_recipients=%d%s", queued_recipients,
             f" daily_cap={effective_cap}" if effective_cap is not None else "")
def main():
    """CLI entry point: parse arguments, resolve the send date, start the build.

    ``--list-segments`` prints the segment report and exits; otherwise all
    options are forwarded to run(). Invalid argument combinations and a
    malformed ``--date`` are reported through argparse (usage message, exit
    status 2) instead of a traceback.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )
    parser = argparse.ArgumentParser(description="Build daily trucking campaigns in Listmonk")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show what would be created, no writes")
    parser.add_argument("--preview", action="store_true",
                        help="1 sample carrier/campaign, drafts only, test sends to owner, "
                             "no real schedule/mark-sent")
    parser.add_argument("--list-segments", action="store_true",
                        help="Report deduped audience size per deficiency segment, "
                             "then exit (no writes)")
    parser.add_argument("--date", type=str,
                        help="Target send date YYYY-MM-DD (default: today)")
    parser.add_argument("--tomorrow", action="store_true",
                        help="Schedule for tomorrow instead of today. "
                             "Use only for manual pre-builds.")
    parser.add_argument("--max-per-segment", type=int, default=None,
                        help="Cap audience per (segment x timezone). "
                             "Use for fresh-IP warmup small sends.")
    parser.add_argument("--only-segment", action="append", default=None, metavar="SEG",
                        help="Restrict to specific segment(s), e.g. mcs150, inactive, "
                             "for_hire_boc3. Repeatable.")
    parser.add_argument("--send-hour", type=int, default=None,
                        help="Override send hour (UTC) for ALL timezones instead of "
                             "per-tz defaults.")
    parser.add_argument("--send-minute", type=int, default=0,
                        help="Send minute (UTC), used with --send-hour. Default 0.")
    parser.add_argument("--stagger-minutes", type=int, default=0,
                        help="Add this many minutes between each created campaign. "
                             "Useful for catchup sends.")
    parser.add_argument("--daily-cap", type=int, default=None,
                        help="Hard cap on recipients queued by this run. "
                             "Default: warmup-derived cap when available.")
    parser.add_argument("--no-warmup-cap", action="store_true",
                        help="Disable automatic daily queue cap from "
                             "/etc/postfix/pw-warmup-start.")
    args = parser.parse_args()

    if args.date and args.tomorrow:
        parser.error("--date and --tomorrow are mutually exclusive")

    if args.date:
        try:
            send_date = date.fromisoformat(args.date)
        except ValueError:
            # parser.error() prints usage and exits with status 2.
            parser.error(f"--date must be YYYY-MM-DD, got {args.date!r}")
    elif args.tomorrow:
        send_date = date.today() + timedelta(days=1)
    else:
        send_date = date.today()

    if args.list_segments:
        list_segments(send_date)
        return

    only = set(args.only_segment) if args.only_segment else None
    LOG.info("Building campaigns for send date %s (dry_run=%s, preview=%s, "
             "max_per_segment=%s, only=%s, send_hour=%s, stagger_minutes=%s, daily_cap=%s, warmup_cap=%s)",
             send_date, args.dry_run, args.preview, args.max_per_segment,
             sorted(only) if only else None, args.send_hour, args.stagger_minutes,
             args.daily_cap, not args.no_warmup_cap)
    run(send_date, dry_run=args.dry_run, preview=args.preview,
        max_per_segment=args.max_per_segment, only_segments=only,
        send_hour_override=args.send_hour, send_minute=args.send_minute,
        stagger_minutes=args.stagger_minutes, daily_cap=args.daily_cap,
        warmup_cap=not args.no_warmup_cap)


if __name__ == "__main__":
    main()