From 9e409650926494423873fd892e1ed162c955c6e7 Mon Sep 17 00:00:00 2001 From: justin Date: Sun, 14 Jun 2026 21:11:23 -0500 Subject: [PATCH] trucking: per-MX-operator throttling + Google/MS-Workspace warmup exclusion The Jun 13-14 Gmail+Outlook block storm came from the main/trucking pool having NO per-MX throttling (only HC had it) -- it concentrated warmup volume on Google/Microsoft-Workspace-hosted business domains. Port the HC fix: - migration 097: fmcsa_carriers.mx_provider column. - mx_tag_carriers.py: resolve MX once per distinct domain (reuses the verifier's classifier+cache), tag every carrier with that domain's operator. Bounded per run, prioritizes unsent verified carriers. - build_trucking_campaigns: during warmup (day<=6) EXCLUDE tagged Google/MS/ Proofpoint/etc. carriers in fetch_carriers; per-MX cap in select_sendable_ carriers so known operators never dominate the quota. Untagged carriers pass (not collapsed onto one bucket) until tagging fills in. mx_daily_caps ramps with the main warmup day; MAIN_SKIP_BIG_MX=0 disables once warmed. --- api/migrations/097_fmcsa_mx_provider.sql | 13 ++++ scripts/build_trucking_campaigns.py | 87 +++++++++++++++++++++- scripts/mx_tag_carriers.py | 92 ++++++++++++++++++++++++ 3 files changed, 191 insertions(+), 1 deletion(-) create mode 100644 api/migrations/097_fmcsa_mx_provider.sql create mode 100644 scripts/mx_tag_carriers.py diff --git a/api/migrations/097_fmcsa_mx_provider.sql b/api/migrations/097_fmcsa_mx_provider.sql new file mode 100644 index 0000000..be72678 --- /dev/null +++ b/api/migrations/097_fmcsa_mx_provider.sql @@ -0,0 +1,13 @@ +-- Per-MX-operator throttling for the trucking/main warmup pool. +-- Sender reputation is tracked by the RECEIVING mail operator (Microsoft 365, +-- Google Workspace, Proofpoint, ...), not by recipient domain. The Jun 13-14 +-- Gmail + Outlook block storm came from hammering Google/MS-Workspace-hosted +-- business domains. 
mx_provider lets the builder exclude those during warmup and +-- cap volume per operator (mirrors the HC pool). Populated by mx_tag_carriers.py. + +ALTER TABLE fmcsa_carriers + ADD COLUMN IF NOT EXISTS mx_provider TEXT; + +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_fmcsa_carriers_mx_provider + ON fmcsa_carriers (mx_provider) + WHERE mx_provider IS NOT NULL; diff --git a/scripts/build_trucking_campaigns.py b/scripts/build_trucking_campaigns.py index 4a9e10f..148f202 100644 --- a/scripts/build_trucking_campaigns.py +++ b/scripts/build_trucking_campaigns.py @@ -168,6 +168,60 @@ COUPON_SLUGS = ( _ET = timezone(timedelta(hours=-5)) # EST anchor; close enough for an end-of-day cutoff _COUPON_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ" # no I/O to avoid confusion +# ── Per-MX-operator throttling (main pool) ────────────────────────────────── +# Sender reputation is tracked by the RECEIVING operator, not the recipient +# domain. The Jun 13-14 Gmail + Outlook block storm came from concentrating the +# warmup on Google/Microsoft-Workspace-hosted business domains. During warmup we +# EXCLUDE those big operators entirely (send to the long tail of small/self-hosted +# mail systems that don't bot-throttle), then cap per-operator once reputation is +# established. mx_provider is populated by mx_tag_carriers.py. +# Set MAIN_SKIP_BIG_MX=0 to stop excluding once warmed up. +MAIN_SKIP_BIG_MX = os.getenv("MAIN_SKIP_BIG_MX", "1") not in ("0", "false", "") +# Operators to hold out during warmup (they aggressively throttle/blocklist). 
BIG_MX_OPERATORS = ("google", "microsoft", "proofpoint", "mimecast",
                    "barracuda", "cisco", "broadcom")
MAIN_WARMUP_START_FILE = os.getenv("MTA_WARMUP_START_FILE", "/etc/postfix/pw-warmup-start")


def main_warmup_day() -> int:
    """Return the number of whole days elapsed since the warmup start stamp.

    The stamp file (MAIN_WARMUP_START_FILE) is parsed as an integer number of
    seconds and compared against time.time(). A stamp that lies in the future
    yields 0 rather than a negative day.

    Returns:
        Days elapsed (>= 0), or 99 when the stamp is missing, unreadable or not
        an integer -- i.e. a host without a stamp is treated as warmed up.
    """
    try:
        # Context manager: the stamp is read on every call, so do not rely on
        # garbage collection to release the file descriptor.
        with open(MAIN_WARMUP_START_FILE) as stamp:
            start = int(stamp.read().strip())
        return max(0, int((time.time() - start) // 86400))
    except (OSError, ValueError, OverflowError):
        # OSError: missing/unreadable file. ValueError: empty or non-integer
        # content (also covers undecodable bytes). OverflowError: absurdly
        # large stamp that cannot be subtracted from a float.
        return 99  # no stamp = treat as warmed up


def mx_daily_caps(day: int) -> dict:
    """Per-operator daily NEW-recipient caps, ramping with the warmup day.

    Big operators are EXCLUDED during early warmup (see MAIN_SKIP_BIG_MX);
    these caps apply once they're re-enabled.

    Args:
        day: Warmup day as returned by main_warmup_day().

    Returns:
        A dict mapping every name in BIG_MX_OPERATORS to the big-operator cap
        for that day, plus a "__default__" entry used for any other operator.
    """
    # (last day of the stage, cap for big operators, cap for everyone else)
    ramp = (
        (6, 0, 40),      # big operators OFF, long tail only
        (13, 60, 80),
        (20, 150, 150),
    )
    big, default = 300, 250  # day 21 onwards
    for last_day, stage_big, stage_default in ramp:
        if day <= last_day:
            big, default = stage_big, stage_default
            break
    caps = dict.fromkeys(BIG_MX_OPERATORS, big)
    caps["__default__"] = default
    return caps


def mx_throttled(rows: list[tuple], total_n: int, caps: dict, mx_idx: int,
                 cap_untagged: bool = False) -> list[tuple]:
    """Pick up to total_n rows, capping how many go to each mx_provider.

    Order is preserved. The provider label is rows[i][mx_idx], compared after
    stripping whitespace and lower-casing; a row that is too short for mx_idx,
    or whose label is None/blank, counts as untagged.

    Args:
        rows: Candidate rows, already in priority order.
        total_n: Maximum number of rows to return.
        caps: Per-operator caps; "__default__" applies to operators without
            their own entry (50 if "__default__" is absent).
        mx_idx: Index of the mx_provider column within each row.
        cap_untagged: When False (default) untagged rows are never capped,
            matching the inline throttle in select_sendable_carriers: they
            are not one operator, and lumping them into a single bucket
            would starve the pool until tagging completes. Pass True to
            count them all against the "__default__" cap instead.

    Returns:
        The selected rows, at most total_n of them.
    """
    per_op: dict = {}
    chosen: list[tuple] = []
    default_cap = caps.get("__default__", 50)
    for r in rows:
        if len(chosen) >= total_n:
            break
        prov = (r[mx_idx] or "").strip().lower() if mx_idx < len(r) else ""
        if prov or cap_untagged:
            used = per_op.get(prov, 0)
            if used >= caps.get(prov, default_cap):
                continue
            per_op[prov] = used + 1
        chosen.append(r)
    return chosen
+ # Untagged carriers (mx_provider IS NULL) are kept: the per-MX throttle in the + # selector still bounds them, and excluding NULLs would starve the pool until + # tagging completes. + big_mx_exclude = "" + if MAIN_SKIP_BIG_MX and main_warmup_day() <= 6: + ops = ",".join("'%s'" % o for o in BIG_MX_OPERATORS) + big_mx_exclude = f"AND (mx_provider IS NULL OR mx_provider NOT IN ({ops}))" + cur.execute(f""" SELECT dot_number, email_address, legal_name, phy_state, - {target_state_sql} AS target_state + {target_state_sql} AS target_state, mx_provider FROM fmcsa_carriers WHERE {type_filter} AND {USABLE_FILTER} AND listmonk_sent_at IS NULL AND lower(split_part(email_address, '@', 2)) <> ALL(%s) AND phy_state IN ({states_placeholder}) + {big_mx_exclude} ORDER BY mcs150_parsed ASC NULLS LAST LIMIT %s OFFSET %s """, target_state_params + [list(BLOCKED_EMAIL_DOMAINS)] + list(tz_states) + [limit, offset]) @@ -699,6 +764,12 @@ def select_sendable_carriers( skipped: dict[str, int] = {} seen_emails: set[str] = set() offset = 0 + # Per-MX throttle: cap how many of THIS run's quota go to each receiving + # operator so we never concentrate on one (the Jun 13-14 Gmail/Outlook storm). + caps = mx_daily_caps(main_warmup_day()) + per_op: dict = {} + default_cap = caps.get("__default__", 50) + MX_IDX = 5 # mx_provider is the 6th column from fetch_carriers # Warmup caps are small, but old audiences can contain many prior bounces or # unsubscribes. Scan beyond the quota while still bounding worst-case API calls. scan_limit = max_scan or max(quota * 8, quota + 500, 1000) @@ -715,10 +786,24 @@ def select_sendable_carriers( skipped["duplicate_email"] = skipped.get("duplicate_email", 0) + 1 continue seen_emails.add(email) + # Per-MX-operator cap (reputation is per receiving operator). + # Untagged carriers (no mx_provider yet) are NOT capped here -- they + # would otherwise all collapse onto one __default__ bucket and starve + # the pool before tagging completes. 
The big-operator EXCLUSION in + # fetch_carriers already keeps Google/MS out during warmup; this cap + # bounds the KNOWN operators once tagging fills in. + prov = (row[MX_IDX] or "").strip().lower() if len(row) > MX_IDX else "" + if prov: + cap = caps.get(prov, default_cap) + if per_op.get(prov, 0) >= cap: + skipped[f"mx_cap:{prov}"] = skipped.get(f"mx_cap:{prov}", 0) + 1 + continue ok, reason = listmonk_sendable(email) if not ok: skipped[reason] = skipped.get(reason, 0) + 1 continue + if prov: + per_op[prov] = per_op.get(prov, 0) + 1 selected.append(row) if len(selected) >= quota: break diff --git a/scripts/mx_tag_carriers.py b/scripts/mx_tag_carriers.py new file mode 100644 index 0000000..df4950d --- /dev/null +++ b/scripts/mx_tag_carriers.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +"""Tag fmcsa_carriers with their email domain's MX operator (for per-MX throttle). + +Sender reputation is tracked by the RECEIVING mail operator (Microsoft 365, +Google Workspace, Proofpoint, ...), not by recipient domain. The trucking warmup +was hammering Google/Microsoft-hosted business domains (the Jun 13-14 Gmail + +Outlook block storm). To fix it we need each carrier's MX operator so the builder +can (a) exclude Google/MS-Workspace during warmup and (b) cap volume per operator. + +This resolves MX once PER DISTINCT DOMAIN (cached), then writes the operator +label onto every carrier with that domain. Run it before the daily trucking +builder; it processes the highest-priority unsent carriers first and is bounded +per run so it never stalls. + +Usage: + python3 scripts/mx_tag_carriers.py [--limit-domains 4000] [--only-unsent] +""" +from __future__ import annotations +import argparse +import os +import sys + +ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if ROOT not in sys.path: + sys.path.insert(0, ROOT) + +import psycopg2 # noqa: E402 +# Reuse the verifier's MX classifier + cache (one source of truth, no extra DNS). 
from scripts.verify_csv_emails import mx_provider, get_mx_hosts  # noqa: E402

DB_URL = os.getenv("DATABASE_URL", "")


def main() -> int:
    """Tag fmcsa_carriers rows with the MX operator of their email domain.

    Selects the distinct email domains of verified carriers that still have
    mx_provider IS NULL (largest domains first, at most --limit-domains of
    them), resolves and classifies each domain once, and writes the label
    onto every still-untagged carrier with that domain. Progress is committed
    every 200 domains and once more at the end. A summary of the operator
    distribution is printed to stderr.

    Returns:
        0 on completion (used as the process exit status).
    """
    ap = argparse.ArgumentParser()
    # NOTE(review): the usage line in the module docstring shows 4000; the
    # effective default is the 5000 below.
    ap.add_argument("--limit-domains", type=int, default=5000,
                    help="max distinct domains to resolve this run (bounds DNS work)")
    # The flag adds a hard filter (listmonk_sent_at IS NULL), it does not
    # merely reorder, so the help text says "only".
    ap.add_argument("--only-unsent", action="store_true",
                    help="only consider carriers not yet emailed")
    args = ap.parse_args()

    conn = psycopg2.connect(DB_URL)
    try:
        cur = conn.cursor()

        # Distinct email domains that still need an mx_provider, restricted to
        # verified/usable addresses (and to unsent carriers with --only-unsent).
        # sent_filter is one of two fixed literals, never user input, so the
        # f-string interpolation is safe; the limit is a bound parameter.
        sent_filter = "AND listmonk_sent_at IS NULL" if args.only_unsent else ""
        cur.execute(f"""
            SELECT lower(split_part(email_address, '@', 2)) AS domain, count(*) AS n
            FROM fmcsa_carriers
            WHERE email_address IS NOT NULL AND email_address <> ''
              AND mx_provider IS NULL
              AND (email_verified IS TRUE OR email_verify_result IN
                   ('smtp_valid','catch_all_domain','catch_all_detected'))
              {sent_filter}
            GROUP BY 1
            ORDER BY n DESC
            LIMIT %s
        """, (args.limit_domains,))
        domains = [r[0] for r in cur.fetchall() if r[0]]
        print(f"resolving MX for {len(domains)} distinct domains...", file=sys.stderr)

        tagged_domains = 0
        for i, dom in enumerate(domains, 1):
            get_mx_hosts(dom)            # populates the cache (DNS)
            prov = mx_provider(dom)      # classify from cache
            cur.execute("""
                UPDATE fmcsa_carriers SET mx_provider = %s
                WHERE lower(split_part(email_address, '@', 2)) = %s
                  AND mx_provider IS NULL
            """, (prov, dom))
            # Count only domains that actually received a label on at least
            # one row, so the final figure is not just len(domains).
            # NOTE(review): if mx_provider() can return None (e.g. for an
            # unresolvable domain) the row stays NULL and the same domain is
            # selected again on every run -- confirm against the classifier.
            if prov is not None and cur.rowcount > 0:
                tagged_domains += 1
            if i % 200 == 0:
                conn.commit()
                print(f"  {i}/{len(domains)} domains", file=sys.stderr)
        conn.commit()

        # Report the operator distribution across all tagged carriers.
        cur.execute("""
            SELECT mx_provider, count(*) FROM fmcsa_carriers
            WHERE mx_provider IS NOT NULL GROUP BY 1 ORDER BY 2 DESC LIMIT 12
        """)
        print("MX operator distribution (tagged so far):", file=sys.stderr)
        for prov, n in cur.fetchall():
            print(f"  {prov}: {n:,}", file=sys.stderr)
    finally:
        # Always release the connection, including when a lookup or query
        # raises part-way through; work since the last commit is discarded.
        conn.close()
    print(f"done: tagged {tagged_domains} domains", file=sys.stderr)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())