campaigns: auto-rollout catch-all pool gated by warmup day + live bounce rate

Replaces the panic-era burner-domain verification plan with an in-house
automatic catch-all rollout in the trucking/IFTA/UCR builders. Root-cause
classification of the 75k pre-DKIM-fix bounces showed ~55% were reputation/
auth (now fixed by DKIM signing) and only ~29% genuinely-dead mailboxes;
catch-all domains accept at RCPT time so they do not user-unknown bounce at
send, making a controlled in-house bleed safer than warming a separate burner.

catch_all_enabled() adds catch-all results only when warmup_day >=
CAMPAIGN_CATCH_ALL_MIN_DAY (21) AND the recent 2-day live bounce rate is below
CAMPAIGN_CATCH_ALL_MAX_BOUNCE_PCT (8%) on a >=300-sent sample; auto-reverts to
the clean smtp_valid/send_confirmed pool on the next run if bounces spike.
Short window so a past disaster cannot block the rollout forever and a fresh
spike trips fast. CAMPAIGN_INCLUDE_CATCH_ALL=1/0 still hard-overrides.

USABLE_FILTER (static) -> usable_filter() (per-run, memoized, one DB probe).
IFTA/UCR SELECT_SQL -> _select_sql() so tc.usable_filter() resolves at call
time, not import. 13 logic unit tests pass; live dry-run decision = OFF
(day 15 < 21 and recent 2d bounce 42% from the aging-out Jun-16 disaster).
This commit is contained in:
justin 2026-06-18 01:39:09 -05:00
parent c36ef07310
commit 40da017b79
4 changed files with 200 additions and 25 deletions

View file

@ -149,12 +149,16 @@ def _reset_cycle_if_new(conn, quarter: str, due: date) -> None:
LOG.info("[ifta] new cycle %s -- cleared %d prior ifta_reminded_at marks", cycle_key, cleared)
SELECT_SQL = f"""
def _select_sql() -> str:
# tc.usable_filter() is resolved at call time (not import) so the catch-all
# auto-rollout decision + its single DB probe happen during the run, not when
# this module is imported.
return f"""
SELECT dot_number, email_address, legal_name, phy_state
FROM fmcsa_carriers
WHERE carrier_operation = 'A' -- interstate => files IFTA
AND email_address IS NOT NULL AND email_address <> ''
AND {tc.USABLE_FILTER}
AND {tc.usable_filter()}
AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
AND ifta_self_filed_at IS NULL -- clicked "I already filed it"
AND COALESCE(ifta_touch_no, 0) < %s -- not yet sent THIS touch
@ -233,7 +237,7 @@ def main() -> int:
LOG.info("[ifta] coupon disabled (CAMPAIGN_ENABLE_COUPON unset) — normal price")
cur = conn.cursor()
cur.execute(SELECT_SQL, [list(tc.BLOCKED_EMAIL_DOMAINS), touch_no, args.limit])
cur.execute(_select_sql(), [list(tc.BLOCKED_EMAIL_DOMAINS), touch_no, args.limit])
rows = cur.fetchall()
LOG.info("[ifta] %s due %s | touch %d (%d biz-days before) | %d candidate carriers",
q, due, touch_no, days_before, len(rows))

View file

@ -353,22 +353,163 @@ REPLY_TO_HEADERS = [{"name": "Reply-To", "value": REPLY_TO_EMAIL}]
# — addresses an MX explicitly accepted at RCPT time — plus 'send_confirmed'
# (addresses proven deliverable by a real burner-domain verification send; see
# docs/campaign-deliverability-plan.md). This drives the bounce rate to near-zero
# and rebuilds sender reputation. Once recovered, set CAMPAIGN_INCLUDE_CATCH_ALL=1
# to re-add catch-all domains (which accept at SMTP time but can still bounce
# later, so they stay out during recovery). 'hard_bounced' is NEVER sendable.
_SENDABLE_RESULTS = ["smtp_valid", "send_confirmed"]
if os.getenv("CAMPAIGN_INCLUDE_CATCH_ALL", "0") not in ("0", "false", ""):
_SENDABLE_RESULTS += ["catch_all_domain", "catch_all_detected"]
USABLE_FILTER = (
"email_verify_result IN ("
+ ", ".join(f"'{r}'" for r in _SENDABLE_RESULTS)
+ ")"
)
# and rebuilds sender reputation. 'hard_bounced' is NEVER sendable.
#
# Catch-all domains (accept any RCPT at SMTP time, then may silently bounce
# later) are the big growth pool but the risky one, so they are gated by an
# AUTOMATIC rollout (see catch_all_enabled): once the IPs are warm AND the recent
# live bounce rate is provably low, they are added; if bounces spike they
# auto-revert. CAMPAIGN_INCLUDE_CATCH_ALL=1/0 hard-overrides the auto decision.
BASE_SENDABLE_RESULTS = ["smtp_valid", "send_confirmed"]
CATCH_ALL_RESULTS = ["catch_all_domain", "catch_all_detected"]
# ── Catch-all auto-rollout tunables ─────────────────────────────────────────
# Warmup day at/after which catch-all MAY auto-enable (rebuild reputation on the
# clean smtp_valid pool first). Independent of the big-MX axis: catch-all is
# dominated by long-tail business domains, and any catch-all address that also
# lands on Google/Microsoft is still held out by big_mx_exclude until day 30.
CATCH_ALL_MIN_WARMUP_DAY = int(os.getenv("CAMPAIGN_CATCH_ALL_MIN_DAY", "21"))
# Recent-window bounce-rate ceiling (percent). At/above this, catch-all stays OFF
# and an already-on rollout auto-reverts. A SHORT window is deliberate: a
# historical disaster (e.g. the Jun-16 ~45% 7-day rate) must NOT block the
# rollout forever, and a fresh spike must trip it fast.
CATCH_ALL_MAX_BOUNCE_PCT = float(os.getenv("CAMPAIGN_CATCH_ALL_MAX_BOUNCE_PCT", "8"))
CATCH_ALL_BOUNCE_WINDOW_DAYS = int(os.getenv("CAMPAIGN_CATCH_ALL_BOUNCE_WINDOW_DAYS", "2"))
# Minimum sent volume required in the window before the rate is trusted (else a
# tiny sample like 9 sent / 1 bounce = 11% would wrongly gate the decision).
CATCH_ALL_BOUNCE_MIN_SENT = int(os.getenv("CAMPAIGN_CATCH_ALL_BOUNCE_MIN_SENT", "300"))
DB_URL = os.getenv("DATABASE_URL", "")
WARMUP_START_FILE = os.getenv("MTA_WARMUP_START_FILE", "/etc/postfix/pw-warmup-start")
def _listmonk_db_url() -> str:
"""Derive the listmonk DB URL from DATABASE_URL (same Postgres, diff db).
Bounce/sent counts live in the listmonk DB, while the campaign builder's
DB_URL points at the `performancewest` app DB on the SAME Postgres server.
"""
override = os.getenv("LISTMONK_DATABASE_URL")
if override:
return override
base = DB_URL or os.getenv("DATABASE_URL", "")
if "/" in base:
return base.rsplit("/", 1)[0] + "/listmonk"
return base
def recent_bounce_rate(window_days: int) -> tuple[float | None, int, int]:
"""Live campaign bounce rate over the last `window_days`.
Returns (rate_pct_or_None, sent, bounced). rate is None when sent==0 (no
signal). Only campaigns that actually ran in the window are counted, and
bounces are joined on campaign_id (99% populated for the real-time postfix
source), so a long-past disaster cannot poison a short recent window.
"""
try:
conn = psycopg2.connect(_listmonk_db_url())
except Exception as exc: # pragma: no cover - infra dependent
LOG.warning("catch-all guardrail: cannot reach listmonk DB (%s); "
"treating bounce rate as UNKNOWN (fail-closed)", exc)
return None, 0, 0
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT COALESCE(SUM(c.sent), 0),
COALESCE(SUM(b.n), 0)
FROM campaigns c
LEFT JOIN (
SELECT campaign_id, count(*) AS n
FROM bounces
WHERE campaign_id IS NOT NULL
GROUP BY campaign_id
) b ON b.campaign_id = c.id
WHERE COALESCE(c.started_at, c.created_at)
> now() - make_interval(days => %s)
AND c.status IN ('finished', 'running')
""",
(window_days,),
)
sent, bounced = cur.fetchone()
sent, bounced = int(sent), int(bounced)
finally:
conn.close()
rate = (100.0 * bounced / sent) if sent else None
return rate, sent, bounced
def catch_all_enabled() -> bool:
"""Decide whether catch-all domains are sendable on THIS run.
Auto-rollout (no env needed):
1. IPs warm enough -> warmup_day() >= CATCH_ALL_MIN_WARMUP_DAY
2. recent bounce rate low -> over CATCH_ALL_BOUNCE_WINDOW_DAYS, with at
least CATCH_ALL_BOUNCE_MIN_SENT sent for a
trustworthy sample, the rate is BELOW
CATCH_ALL_MAX_BOUNCE_PCT.
If bounces later spike above the ceiling, this returns False again on the
next run -> the rollout auto-reverts to the clean smtp_valid pool.
CAMPAIGN_INCLUDE_CATCH_ALL hard-overrides: '1'/'true' forces ON (manual
decision, skips guardrail), '0'/'false' forces OFF.
"""
override = os.getenv("CAMPAIGN_INCLUDE_CATCH_ALL")
if override is not None:
forced = override.strip().lower() not in ("0", "false", "")
LOG.info("catch-all: forced %s via CAMPAIGN_INCLUDE_CATCH_ALL=%r",
"ON" if forced else "OFF", override)
return forced
day = warmup_day()
if day is None or day < CATCH_ALL_MIN_WARMUP_DAY:
LOG.info("catch-all: OFF (warmup day %s < min %s)",
day, CATCH_ALL_MIN_WARMUP_DAY)
return False
rate, sent, bounced = recent_bounce_rate(CATCH_ALL_BOUNCE_WINDOW_DAYS)
if rate is None or sent < CATCH_ALL_BOUNCE_MIN_SENT:
# Not enough recent signal to trust -> fail closed (stay on clean pool).
LOG.info("catch-all: OFF (insufficient recent signal: %s sent < min %s "
"over %sd; need a low proven bounce rate first)",
sent, CATCH_ALL_BOUNCE_MIN_SENT, CATCH_ALL_BOUNCE_WINDOW_DAYS)
return False
if rate >= CATCH_ALL_MAX_BOUNCE_PCT:
LOG.warning("catch-all: OFF (recent bounce rate %.2f%% >= ceiling %.2f%% "
"over %sd; %s sent / %s bounced) -- auto-reverting to clean pool",
rate, CATCH_ALL_MAX_BOUNCE_PCT, CATCH_ALL_BOUNCE_WINDOW_DAYS,
sent, bounced)
return False
LOG.info("catch-all: ON (warmup day %s >= %s; recent bounce %.2f%% < %.2f%% "
"over %sd; %s sent / %s bounced)",
day, CATCH_ALL_MIN_WARMUP_DAY, rate, CATCH_ALL_MAX_BOUNCE_PCT,
CATCH_ALL_BOUNCE_WINDOW_DAYS, sent, bounced)
return True
def usable_filter() -> str:
"""SQL predicate for email_verify_result values that are safe to send to.
Always includes the clean pool (smtp_valid + send_confirmed); adds catch-all
results only when catch_all_enabled() says so (warm IPs + low live bounces).
The decision is memoized so it is computed ONCE per build run (one DB probe,
one log line, and a consistent filter across every segment/timezone).
"""
global _USABLE_FILTER_CACHE
if _USABLE_FILTER_CACHE is None:
results = list(BASE_SENDABLE_RESULTS)
if catch_all_enabled():
results += CATCH_ALL_RESULTS
_USABLE_FILTER_CACHE = (
"email_verify_result IN (" + ", ".join(f"'{r}'" for r in results) + ")"
)
return _USABLE_FILTER_CACHE
_USABLE_FILTER_CACHE: str | None = None
def warmup_day() -> int | None:
"""Return days since MTA warmup start, or None if not configured/readable."""
try:
@ -684,7 +825,7 @@ def count_carriers(conn, tz_states: tuple, campaign_type: str, limit: int | None
SELECT 1
FROM fmcsa_carriers
WHERE {type_filter}
AND {USABLE_FILTER}
AND {usable_filter()}
AND listmonk_sent_at IS NULL
AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
AND phy_state IN ({states_placeholder})
@ -790,7 +931,7 @@ def fetch_carriers(
{target_state_sql} AS target_state, mx_provider
FROM fmcsa_carriers
WHERE {type_filter}
AND {USABLE_FILTER}
AND {usable_filter()}
AND listmonk_sent_at IS NULL
AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
AND phy_state IN ({states_placeholder})

View file

@ -115,12 +115,16 @@ def _reset_cycle_if_new(conn, year: int) -> None:
LOG.info("[ucr] new cycle %s -- cleared %d prior marks", cycle_key, cleared)
SELECT_SQL = f"""
def _select_sql() -> str:
# tc.usable_filter() is resolved at call time (not import) so the catch-all
# auto-rollout decision + its single DB probe happen during the run, not when
# this module is imported.
return f"""
SELECT dot_number, email_address, legal_name, phy_state
FROM fmcsa_carriers
WHERE carrier_operation = 'A' -- interstate => needs UCR
AND email_address IS NOT NULL AND email_address <> ''
AND {tc.USABLE_FILTER}
AND {tc.usable_filter()}
AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
AND ucr_self_filed_at IS NULL
AND COALESCE(ucr_touch_no, 0) < %s
@ -179,7 +183,7 @@ def main() -> int:
LOG.info("[ucr] coupon disabled (CAMPAIGN_ENABLE_COUPON unset) — normal price")
cur = conn.cursor()
cur.execute(SELECT_SQL, [list(tc.BLOCKED_EMAIL_DOMAINS), touch_no, args.limit])
cur.execute(_select_sql(), [list(tc.BLOCKED_EMAIL_DOMAINS), touch_no, args.limit])
rows = cur.fetchall()
LOG.info("[ucr] %d UCR due %s | touch %d (%d biz-days) | %d candidates",
year, due, touch_no, days_before, len(rows))