From 40da017b79431221dde2b7c9d36dbd62c3e72131 Mon Sep 17 00:00:00 2001 From: justin Date: Thu, 18 Jun 2026 01:39:09 -0500 Subject: [PATCH] campaigns: auto-rollout catch-all pool gated by warmup day + live bounce rate Replaces the panic-era burner-domain verification plan with an in-house automatic catch-all rollout in the trucking/IFTA/UCR builders. Root-cause classification of the 75k pre-DKIM-fix bounces showed ~55% were reputation/ auth (now fixed by DKIM signing) and only ~29% genuinely-dead mailboxes; catch-all domains accept at RCPT time so they do not user-unknown bounce at send, making a controlled in-house bleed safer than warming a separate burner. catch_all_enabled() adds catch-all results only when warmup_day >= CAMPAIGN_CATCH_ALL_MIN_DAY (21) AND the recent 2-day live bounce rate is below CAMPAIGN_CATCH_ALL_MAX_BOUNCE_PCT (8%) on a >=300-sent sample; auto-reverts to the clean smtp_valid/send_confirmed pool on the next run if bounces spike. Short window so a past disaster cannot block the rollout forever and a fresh spike trips fast. CAMPAIGN_INCLUDE_CATCH_ALL=1/0 still hard-overrides. USABLE_FILTER (static) -> usable_filter() (per-run, memoized, one DB probe). IFTA/UCR SELECT_SQL -> _select_sql() so tc.usable_filter() resolves at call time, not import. 13 logic unit tests pass; live dry-run decision = OFF (day 15 < 21 and recent 2d bounce 42% from the aging-out Jun-16 disaster). --- docs/campaign-deliverability-plan.md | 38 +++++- scripts/build_ifta_quarterly_campaign.py | 10 +- scripts/build_trucking_campaigns.py | 167 +++++++++++++++++++++-- scripts/build_ucr_annual_campaign.py | 10 +- 4 files changed, 200 insertions(+), 25 deletions(-) diff --git a/docs/campaign-deliverability-plan.md b/docs/campaign-deliverability-plan.md index 8fa0159..1a788cb 100644 --- a/docs/campaign-deliverability-plan.md +++ b/docs/campaign-deliverability-plan.md @@ -86,9 +86,35 @@ the cleaned output. - [x] Fix the PW trucking send filter (drop `mx_unreachable`; recovery mode). - [x] Confirm healthcare unaffected. -- [ ] Add `send_confirmed` / `hard_bounced` result handling to the campaign - filter + a writeback path from bounce processing. -- [ ] Stand up the burner verification domain + isolated MTA identity. -- [ ] Build the verification-send + bounce-writeback worker. -- [ ] Re-verify the `catch_all_domain` + `mx_unreachable` pools through the burner - to grow the PW-sendable list. +- [x] Add `send_confirmed` / `hard_bounced` result handling to the campaign + filter + a writeback path from bounce processing (`burner_list_verify.py`). +- [x] **Catch-all auto-rollout instead of the burner domain (2026-06-18).** After + the DKIM signing fix landed, a root-cause classification of the 75k + pre-fix bounces showed the damage was ~55% reputation/auth (which DKIM + fixes) and only ~29% genuinely-dead mailboxes. The catch-all pool accepts + at RCPT time by definition, so it does not user-unknown bounce at send + time -- it is far safer to bleed directly in warmed batches than to stand + up + warm a whole separate burner domain/IP/SPF/DKIM identity. So the + catch-all pool is now gated by an **automatic in-house rollout** in + `build_trucking_campaigns.py` (`catch_all_enabled()`): + - enables only when `warmup_day() >= CAMPAIGN_CATCH_ALL_MIN_DAY` (21) + AND the **recent** (2-day) live campaign bounce rate is below + `CAMPAIGN_CATCH_ALL_MAX_BOUNCE_PCT` (8%) on a trustworthy sample + (>= 300 sent); + - **auto-reverts** to the clean `smtp_valid`/`send_confirmed` pool on the + next run if bounces spike back above the ceiling; + - a deliberately SHORT window so a past disaster (the Jun-16 ~45% 7-day + rate) cannot block the rollout forever, and a fresh spike trips it fast; + - `CAMPAIGN_INCLUDE_CATCH_ALL=1/0` still hard-overrides the auto decision. + Applied uniformly to trucking + IFTA + UCR builders (`tc.usable_filter()`). + The bounce-watcher continues to auto-suppress any individual hard bounces + in real time, so PW's own bounce rate stays bounded during the rollout. +- [ ] ~~Stand up the burner verification domain + isolated MTA identity.~~ + **Dropped** -- superseded by the catch-all auto-rollout above (the burner + was a panic-era design from before the DKIM fix + per-subscriber bounce + tracking made an in-house controlled rollout safe). The `mx_probe_blocked` + consumer-ISP pool (438k, highest dead-mailbox risk) is the only case where + a burner would still help; revisit only if that pool is ever needed. +- [x] ~~Build the verification-send + bounce-writeback worker.~~ Not needed for + catch-all (see above). `burner_list_verify.py` remains available if the + `mx_probe_blocked` pool is ever scrubbed via a burner. diff --git a/scripts/build_ifta_quarterly_campaign.py b/scripts/build_ifta_quarterly_campaign.py index 7dc7570..c6506e3 100644 --- a/scripts/build_ifta_quarterly_campaign.py +++ b/scripts/build_ifta_quarterly_campaign.py @@ -149,12 +149,16 @@ def _reset_cycle_if_new(conn, quarter: str, due: date) -> None: LOG.info("[ifta] new cycle %s -- cleared %d prior ifta_reminded_at marks", cycle_key, cleared) -SELECT_SQL = f""" +def _select_sql() -> str: + # tc.usable_filter() is resolved at call time (not import) so the catch-all + # auto-rollout decision + its single DB probe happen during the run, not when + # this module is imported. + return f""" SELECT dot_number, email_address, legal_name, phy_state FROM fmcsa_carriers WHERE carrier_operation = 'A' -- interstate => files IFTA AND email_address IS NOT NULL AND email_address <> '' - AND {tc.USABLE_FILTER} + AND {tc.usable_filter()} AND lower(split_part(email_address, '@', 2)) <> ALL(%s) AND ifta_self_filed_at IS NULL -- clicked "I already filed it" AND COALESCE(ifta_touch_no, 0) < %s -- not yet sent THIS touch @@ -233,7 +237,7 @@ def main() -> int: LOG.info("[ifta] coupon disabled (CAMPAIGN_ENABLE_COUPON unset) — normal price") cur = conn.cursor() - cur.execute(SELECT_SQL, [list(tc.BLOCKED_EMAIL_DOMAINS), touch_no, args.limit]) + cur.execute(_select_sql(), [list(tc.BLOCKED_EMAIL_DOMAINS), touch_no, args.limit]) rows = cur.fetchall() LOG.info("[ifta] %s due %s | touch %d (%d biz-days before) | %d candidate carriers", q, due, touch_no, days_before, len(rows)) diff --git a/scripts/build_trucking_campaigns.py b/scripts/build_trucking_campaigns.py index 76c233b..dc4bbc9 100644 --- a/scripts/build_trucking_campaigns.py +++ b/scripts/build_trucking_campaigns.py @@ -353,22 +353,163 @@ REPLY_TO_HEADERS = [{"name": "Reply-To", "value": REPLY_TO_EMAIL}] # — addresses an MX explicitly accepted at RCPT time — plus 'send_confirmed' # (addresses proven deliverable by a real burner-domain verification send; see # docs/campaign-deliverability-plan.md). This drives the bounce rate to near-zero -# and rebuilds sender reputation. Once recovered, set CAMPAIGN_INCLUDE_CATCH_ALL=1 -# to re-add catch-all domains (which accept at SMTP time but can still bounce -# later, so they stay out during recovery). 'hard_bounced' is NEVER sendable. -_SENDABLE_RESULTS = ["smtp_valid", "send_confirmed"] -if os.getenv("CAMPAIGN_INCLUDE_CATCH_ALL", "0") not in ("0", "false", ""): - _SENDABLE_RESULTS += ["catch_all_domain", "catch_all_detected"] -USABLE_FILTER = ( - "email_verify_result IN (" - + ", ".join(f"'{r}'" for r in _SENDABLE_RESULTS) - + ")" -) +# and rebuilds sender reputation. 'hard_bounced' is NEVER sendable. +# +# Catch-all domains (accept any RCPT at SMTP time, then may silently bounce +# later) are the big growth pool but the risky one, so they are gated by an +# AUTOMATIC rollout (see catch_all_enabled): once the IPs are warm AND the recent +# live bounce rate is provably low, they are added; if bounces spike they +# auto-revert. CAMPAIGN_INCLUDE_CATCH_ALL=1/0 hard-overrides the auto decision. +BASE_SENDABLE_RESULTS = ["smtp_valid", "send_confirmed"] +CATCH_ALL_RESULTS = ["catch_all_domain", "catch_all_detected"] + +# ── Catch-all auto-rollout tunables ───────────────────────────────────────── +# Warmup day at/after which catch-all MAY auto-enable (rebuild reputation on the +# clean smtp_valid pool first). Independent of the big-MX axis: catch-all is +# dominated by long-tail business domains, and any catch-all address that also +# lands on Google/Microsoft is still held out by big_mx_exclude until day 30. +CATCH_ALL_MIN_WARMUP_DAY = int(os.getenv("CAMPAIGN_CATCH_ALL_MIN_DAY", "21")) +# Recent-window bounce-rate ceiling (percent). At/above this, catch-all stays OFF +# and an already-on rollout auto-reverts. A SHORT window is deliberate: a +# historical disaster (e.g. the Jun-16 ~45% 7-day rate) must NOT block the +# rollout forever, and a fresh spike must trip it fast. +CATCH_ALL_MAX_BOUNCE_PCT = float(os.getenv("CAMPAIGN_CATCH_ALL_MAX_BOUNCE_PCT", "8")) +CATCH_ALL_BOUNCE_WINDOW_DAYS = int(os.getenv("CAMPAIGN_CATCH_ALL_BOUNCE_WINDOW_DAYS", "2")) +# Minimum sent volume required in the window before the rate is trusted (else a +# tiny sample like 9 sent / 1 bounce = 11% would wrongly gate the decision). +CATCH_ALL_BOUNCE_MIN_SENT = int(os.getenv("CAMPAIGN_CATCH_ALL_BOUNCE_MIN_SENT", "300")) DB_URL = os.getenv("DATABASE_URL", "") WARMUP_START_FILE = os.getenv("MTA_WARMUP_START_FILE", "/etc/postfix/pw-warmup-start") +def _listmonk_db_url() -> str: + """Derive the listmonk DB URL from DATABASE_URL (same Postgres, diff db). + + Bounce/sent counts live in the listmonk DB, while the campaign builder's + DB_URL points at the `performancewest` app DB on the SAME Postgres server. + """ + override = os.getenv("LISTMONK_DATABASE_URL") + if override: + return override + base = DB_URL or os.getenv("DATABASE_URL", "") + if "/" in base: + return base.rsplit("/", 1)[0] + "/listmonk" + return base + + +def recent_bounce_rate(window_days: int) -> tuple[float | None, int, int]: + """Live campaign bounce rate over the last `window_days`. + + Returns (rate_pct_or_None, sent, bounced). rate is None when sent==0 (no + signal). Only campaigns that actually ran in the window are counted, and + bounces are joined on campaign_id (≈99% populated for the real-time postfix + source), so a long-past disaster cannot poison a short recent window. + """ + try: + conn = psycopg2.connect(_listmonk_db_url()) + except Exception as exc: # pragma: no cover - infra dependent + LOG.warning("catch-all guardrail: cannot reach listmonk DB (%s); " + "treating bounce rate as UNKNOWN (fail-closed)", exc) + return None, 0, 0 + try: + with conn.cursor() as cur: + cur.execute( + """ + SELECT COALESCE(SUM(c.sent), 0), + COALESCE(SUM(b.n), 0) + FROM campaigns c + LEFT JOIN ( + SELECT campaign_id, count(*) AS n + FROM bounces + WHERE campaign_id IS NOT NULL + GROUP BY campaign_id + ) b ON b.campaign_id = c.id + WHERE COALESCE(c.started_at, c.created_at) + > now() - make_interval(days => %s) + AND c.status IN ('finished', 'running') + """, + (window_days,), + ) + sent, bounced = cur.fetchone() + sent, bounced = int(sent), int(bounced) + finally: + conn.close() + rate = (100.0 * bounced / sent) if sent else None + return rate, sent, bounced + + +def catch_all_enabled() -> bool: + """Decide whether catch-all domains are sendable on THIS run. + + Auto-rollout (no env needed): + 1. IPs warm enough -> warmup_day() >= CATCH_ALL_MIN_WARMUP_DAY + 2. recent bounce rate low -> over CATCH_ALL_BOUNCE_WINDOW_DAYS, with at + least CATCH_ALL_BOUNCE_MIN_SENT sent for a + trustworthy sample, the rate is BELOW + CATCH_ALL_MAX_BOUNCE_PCT. + If bounces later spike above the ceiling, this returns False again on the + next run -> the rollout auto-reverts to the clean smtp_valid pool. + + CAMPAIGN_INCLUDE_CATCH_ALL hard-overrides: '1'/'true' forces ON (manual + decision, skips guardrail), '0'/'false' forces OFF. + """ + override = os.getenv("CAMPAIGN_INCLUDE_CATCH_ALL") + if override is not None: + forced = override.strip().lower() not in ("0", "false", "") + LOG.info("catch-all: forced %s via CAMPAIGN_INCLUDE_CATCH_ALL=%r", + "ON" if forced else "OFF", override) + return forced + + day = warmup_day() + if day is None or day < CATCH_ALL_MIN_WARMUP_DAY: + LOG.info("catch-all: OFF (warmup day %s < min %s)", + day, CATCH_ALL_MIN_WARMUP_DAY) + return False + + rate, sent, bounced = recent_bounce_rate(CATCH_ALL_BOUNCE_WINDOW_DAYS) + if rate is None or sent < CATCH_ALL_BOUNCE_MIN_SENT: + # Not enough recent signal to trust -> fail closed (stay on clean pool). + LOG.info("catch-all: OFF (insufficient recent signal: %s sent < min %s " + "over %sd; need a low proven bounce rate first)", + sent, CATCH_ALL_BOUNCE_MIN_SENT, CATCH_ALL_BOUNCE_WINDOW_DAYS) + return False + if rate >= CATCH_ALL_MAX_BOUNCE_PCT: + LOG.warning("catch-all: OFF (recent bounce rate %.2f%% >= ceiling %.2f%% " + "over %sd; %s sent / %s bounced) -- auto-reverting to clean pool", + rate, CATCH_ALL_MAX_BOUNCE_PCT, CATCH_ALL_BOUNCE_WINDOW_DAYS, + sent, bounced) + return False + + LOG.info("catch-all: ON (warmup day %s >= %s; recent bounce %.2f%% < %.2f%% " + "over %sd; %s sent / %s bounced)", + day, CATCH_ALL_MIN_WARMUP_DAY, rate, CATCH_ALL_MAX_BOUNCE_PCT, + CATCH_ALL_BOUNCE_WINDOW_DAYS, sent, bounced) + return True + + +def usable_filter() -> str: + """SQL predicate for email_verify_result values that are safe to send to. + + Always includes the clean pool (smtp_valid + send_confirmed); adds catch-all + results only when catch_all_enabled() says so (warm IPs + low live bounces). + The decision is memoized so it is computed ONCE per build run (one DB probe, + one log line, and a consistent filter across every segment/timezone). + """ + global _USABLE_FILTER_CACHE + if _USABLE_FILTER_CACHE is None: + results = list(BASE_SENDABLE_RESULTS) + if catch_all_enabled(): + results += CATCH_ALL_RESULTS + _USABLE_FILTER_CACHE = ( + "email_verify_result IN (" + ", ".join(f"'{r}'" for r in results) + ")" + ) + return _USABLE_FILTER_CACHE + + +_USABLE_FILTER_CACHE: str | None = None + + def warmup_day() -> int | None: """Return days since MTA warmup start, or None if not configured/readable.""" try: @@ -684,7 +825,7 @@ def count_carriers(conn, tz_states: tuple, campaign_type: str, limit: int | None SELECT 1 FROM fmcsa_carriers WHERE {type_filter} - AND {USABLE_FILTER} + AND {usable_filter()} AND listmonk_sent_at IS NULL AND lower(split_part(email_address, '@', 2)) <> ALL(%s) AND phy_state IN ({states_placeholder}) @@ -790,7 +931,7 @@ def fetch_carriers( {target_state_sql} AS target_state, mx_provider FROM fmcsa_carriers WHERE {type_filter} - AND {USABLE_FILTER} + AND {usable_filter()} AND listmonk_sent_at IS NULL AND lower(split_part(email_address, '@', 2)) <> ALL(%s) AND phy_state IN ({states_placeholder}) diff --git a/scripts/build_ucr_annual_campaign.py b/scripts/build_ucr_annual_campaign.py index 1234865..cb298fd 100644 --- a/scripts/build_ucr_annual_campaign.py +++ b/scripts/build_ucr_annual_campaign.py @@ -115,12 +115,16 @@ def _reset_cycle_if_new(conn, year: int) -> None: LOG.info("[ucr] new cycle %s -- cleared %d prior marks", cycle_key, cleared) -SELECT_SQL = f""" +def _select_sql() -> str: + # tc.usable_filter() is resolved at call time (not import) so the catch-all + # auto-rollout decision + its single DB probe happen during the run, not when + # this module is imported. + return f""" SELECT dot_number, email_address, legal_name, phy_state FROM fmcsa_carriers WHERE carrier_operation = 'A' -- interstate => needs UCR AND email_address IS NOT NULL AND email_address <> '' - AND {tc.USABLE_FILTER} + AND {tc.usable_filter()} AND lower(split_part(email_address, '@', 2)) <> ALL(%s) AND ucr_self_filed_at IS NULL AND COALESCE(ucr_touch_no, 0) < %s @@ -179,7 +183,7 @@ def main() -> int: LOG.info("[ucr] coupon disabled (CAMPAIGN_ENABLE_COUPON unset) — normal price") cur = conn.cursor() - cur.execute(SELECT_SQL, [list(tc.BLOCKED_EMAIL_DOMAINS), touch_no, args.limit]) + cur.execute(_select_sql(), [list(tc.BLOCKED_EMAIL_DOMAINS), touch_no, args.limit]) rows = cur.fetchall() LOG.info("[ucr] %d UCR due %s | touch %d (%d biz-days) | %d candidates", year, due, touch_no, days_before, len(rows))