trucking: per-MX-operator throttling + Google/MS-Workspace warmup exclusion

The Jun 13-14 Gmail+Outlook block storm came from the main/trucking pool having
NO per-MX throttling (only HC had it) -- it concentrated warmup volume on
Google/Microsoft-Workspace-hosted business domains. Port the HC fix:

- migration 097: fmcsa_carriers.mx_provider column.
- mx_tag_carriers.py: resolve MX once per distinct domain (reuses the verifier's
  classifier+cache), tag every carrier with that domain's operator. Bounded per
  run, prioritizes unsent verified carriers.
- build_trucking_campaigns: during warmup (day<=6) EXCLUDE tagged Google/MS/
  Proofpoint/etc. carriers in fetch_carriers; per-MX cap in select_sendable_
  carriers so known operators never dominate the quota. Untagged carriers pass
  (not collapsed onto one bucket) until tagging fills in. mx_daily_caps ramps
  with the main warmup day; MAIN_SKIP_BIG_MX=0 disables once warmed.
This commit is contained in:
justin 2026-06-14 21:11:23 -05:00
parent 2caab6aa69
commit 9e40965092
3 changed files with 191 additions and 1 deletions

View file

@ -0,0 +1,13 @@
-- Per-MX-operator throttling for the trucking/main warmup pool.
-- Sender reputation is tracked by the RECEIVING mail operator (Microsoft 365,
-- Google Workspace, Proofpoint, ...), not by recipient domain. The Jun 13-14
-- Gmail + Outlook block storm came from hammering Google/MS-Workspace-hosted
-- business domains. mx_provider lets the builder exclude those during warmup and
-- cap volume per operator (mirrors the HC pool). Populated by mx_tag_carriers.py.
--
-- mx_provider stays NULL until a carrier's email domain has been tagged.
-- Both statements use IF NOT EXISTS, so re-running this file does not error
-- when the column / index name already exists.
ALTER TABLE fmcsa_carriers
ADD COLUMN IF NOT EXISTS mx_provider TEXT;
-- Partial index: only tagged rows (mx_provider IS NOT NULL) are indexed.
-- NOTE(review): PostgreSQL rejects CREATE INDEX CONCURRENTLY inside a
-- transaction block -- confirm the migration runner applies this file outside
-- of one. A failed concurrent build can also leave an INVALID index behind,
-- which IF NOT EXISTS would then skip on re-run; drop it before retrying.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_fmcsa_carriers_mx_provider
ON fmcsa_carriers (mx_provider)
WHERE mx_provider IS NOT NULL;

View file

@ -168,6 +168,60 @@ COUPON_SLUGS = (
_ET = timezone(timedelta(hours=-5))  # EST anchor; close enough for an end-of-day cutoff
_COUPON_ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ"  # no I/O to avoid confusion

# ── Per-MX-operator throttling (main pool) ──────────────────────────────────
# Sender reputation is tracked by the RECEIVING operator, not the recipient
# domain. The Jun 13-14 Gmail + Outlook block storm came from concentrating the
# warmup on Google/Microsoft-Workspace-hosted business domains. During warmup we
# EXCLUDE those big operators entirely (send to the long tail of small/self-hosted
# mail systems that don't bot-throttle), then cap per-operator once reputation is
# established. mx_provider is populated by mx_tag_carriers.py.
#
# Set MAIN_SKIP_BIG_MX=0 to stop excluding once warmed up. The value is
# stripped and lower-cased before comparison, so "0", "false", "no", "off" and
# the empty string all disable the exclusion regardless of case or stray
# whitespace; anything else (including leaving the variable unset) keeps it on.
MAIN_SKIP_BIG_MX = os.getenv("MAIN_SKIP_BIG_MX", "1").strip().lower() not in (
    "0", "false", "no", "off", "",
)

# Operators to hold out during warmup (they aggressively throttle/blocklist).
# Labels are compared against the lower-cased mx_provider column value.
BIG_MX_OPERATORS = ("google", "microsoft", "proofpoint", "mimecast",
                    "barracuda", "cisco", "broadcom")

# File holding the warmup start stamp; main_warmup_day() reads it as integer
# epoch seconds. Overridable via MTA_WARMUP_START_FILE.
MAIN_WARMUP_START_FILE = os.getenv("MTA_WARMUP_START_FILE", "/etc/postfix/pw-warmup-start")
def main_warmup_day() -> int:
    """Return how many whole days have elapsed since the warmup start stamp.

    The stamp is read from MAIN_WARMUP_START_FILE and parsed as an integer
    number of epoch seconds (surrounding whitespace is ignored).

    Returns:
        Whole days since the stamp, floored at 0 when the stamp lies in the
        future, or 99 when the file is missing, unreadable, or does not
        contain an integer -- i.e. no usable stamp is treated as "warmed up".
    """
    try:
        # Context manager so the handle is closed even when parsing fails.
        with open(MAIN_WARMUP_START_FILE, encoding="utf-8") as fh:
            start = int(fh.read().strip())
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: non-integer or
        # undecodable content (UnicodeDecodeError is a ValueError).
        return 99  # no stamp = treat as warmed up
    return max(0, int((time.time() - start) // 86400))
def mx_daily_caps(day: int) -> dict:
    """Build the per-operator daily NEW-recipient caps for a given warmup day.

    Every label in BIG_MX_OPERATORS gets the tier's "big" cap; any other
    operator falls back to the value stored under the "__default__" key.
    Through day 6 the big-operator cap is 0 (long tail only); the caps then
    step up at days 7, 14 and 21.

    Args:
        day: Warmup day number (0-based, as returned by main_warmup_day()).

    Returns:
        Dict mapping each big-operator label to its cap, plus "__default__".
    """
    # (last day of the tier, big-operator cap, default cap), in ascending order.
    tiers = (
        (6, 0, 40),      # big operators OFF, long tail only
        (13, 60, 80),
        (20, 150, 150),
    )
    big_cap, fallback_cap = 300, 250  # past every tier: fully warmed
    for last_day, tier_big, tier_default in tiers:
        if day <= last_day:
            big_cap, fallback_cap = tier_big, tier_default
            break

    limits = dict.fromkeys(BIG_MX_OPERATORS, big_cap)
    limits["__default__"] = fallback_cap
    return limits
def mx_throttled(rows: list[tuple], total_n: int, caps: dict, mx_idx: int) -> list[tuple]:
    """Select at most total_n rows while enforcing a per-operator cap.

    Rows are visited in order and kept in that order. A row's operator is
    rows[i][mx_idx], stripped and lower-cased; a missing column (row shorter
    than mx_idx + 1) or a None/empty value maps to the "" operator, so all
    such rows share one bucket limited by the default cap.

    Args:
        rows: Candidate rows, already in priority order.
        total_n: Maximum number of rows to return.
        caps: Operator label -> cap; "__default__" covers unlisted operators
            (50 when that key is absent).
        mx_idx: Index of the mx_provider column within each row.

    Returns:
        The selected rows, in their original relative order.
    """
    fallback = caps.get("__default__", 50)
    taken: dict = {}
    picked: list[tuple] = []

    for row in rows:
        if len(picked) >= total_n:
            break
        raw = row[mx_idx] if mx_idx < len(row) else None
        operator = (raw or "").strip().lower()
        count = taken.get(operator, 0)
        # Only rows whose operator still has headroom are kept.
        if count < caps.get(operator, fallback):
            taken[operator] = count + 1
            picked.append(row)

    return picked
def _random_coupon_code() -> str:
import secrets
@ -668,15 +722,26 @@ def fetch_carriers(
target_state_sql = "phy_state"
target_state_params = []
# During warmup, exclude carriers on the big operators that throttle/blocklist
# (Google, Microsoft, etc.) -- mx_provider is set by mx_tag_carriers.py.
# Untagged carriers (mx_provider IS NULL) are kept: excluding NULLs would starve
# the pool until tagging completes. Note that the selector's per-MX cap skips
# untagged rows as well, so only the overall quota bounds them.
big_mx_exclude = ""
if MAIN_SKIP_BIG_MX and main_warmup_day() <= 6:
ops = ",".join("'%s'" % o for o in BIG_MX_OPERATORS)
big_mx_exclude = f"AND (mx_provider IS NULL OR mx_provider NOT IN ({ops}))"
cur.execute(f"""
SELECT dot_number, email_address, legal_name, phy_state,
{target_state_sql} AS target_state
{target_state_sql} AS target_state, mx_provider
FROM fmcsa_carriers
WHERE {type_filter}
AND {USABLE_FILTER}
AND listmonk_sent_at IS NULL
AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
AND phy_state IN ({states_placeholder})
{big_mx_exclude}
ORDER BY mcs150_parsed ASC NULLS LAST
LIMIT %s OFFSET %s
""", target_state_params + [list(BLOCKED_EMAIL_DOMAINS)] + list(tz_states) + [limit, offset])
@ -699,6 +764,12 @@ def select_sendable_carriers(
skipped: dict[str, int] = {}
seen_emails: set[str] = set()
offset = 0
# Per-MX throttle: cap how many of THIS run's quota go to each receiving
# operator so we never concentrate on one (the Jun 13-14 Gmail/Outlook storm).
caps = mx_daily_caps(main_warmup_day())
per_op: dict = {}
default_cap = caps.get("__default__", 50)
MX_IDX = 5 # mx_provider is the 6th column from fetch_carriers
# Warmup caps are small, but old audiences can contain many prior bounces or
# unsubscribes. Scan beyond the quota while still bounding worst-case API calls.
scan_limit = max_scan or max(quota * 8, quota + 500, 1000)
@ -715,10 +786,24 @@ def select_sendable_carriers(
skipped["duplicate_email"] = skipped.get("duplicate_email", 0) + 1
continue
seen_emails.add(email)
# Per-MX-operator cap (reputation is per receiving operator).
# Untagged carriers (no mx_provider yet) are NOT capped here -- they
# would otherwise all collapse onto one __default__ bucket and starve
# the pool before tagging completes. The big-operator EXCLUSION in
# fetch_carriers already keeps Google/MS out during warmup; this cap
# bounds the KNOWN operators once tagging fills in.
prov = (row[MX_IDX] or "").strip().lower() if len(row) > MX_IDX else ""
if prov:
cap = caps.get(prov, default_cap)
if per_op.get(prov, 0) >= cap:
skipped[f"mx_cap:{prov}"] = skipped.get(f"mx_cap:{prov}", 0) + 1
continue
ok, reason = listmonk_sendable(email)
if not ok:
skipped[reason] = skipped.get(reason, 0) + 1
continue
if prov:
per_op[prov] = per_op.get(prov, 0) + 1
selected.append(row)
if len(selected) >= quota:
break

View file

@ -0,0 +1,92 @@
#!/usr/bin/env python3
"""Tag fmcsa_carriers with their email domain's MX operator (for per-MX throttle).
Sender reputation is tracked by the RECEIVING mail operator (Microsoft 365,
Google Workspace, Proofpoint, ...), not by recipient domain. The trucking warmup
was hammering Google/Microsoft-hosted business domains (the Jun 13-14 Gmail +
Outlook block storm). To fix it we need each carrier's MX operator so the builder
can (a) exclude Google/MS-Workspace during warmup and (b) cap volume per operator.
This resolves MX once PER DISTINCT DOMAIN (cached), then writes the operator
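label onto every carrier with that domain. Run it before the daily trucking
builder; it handles the domains shared by the most untagged verified carriers
first (pass --only-unsent to consider only carriers not yet emailed) and is
bounded per run (--limit-domains) so it never stalls.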
Usage:
python3 scripts/mx_tag_carriers.py [--limit-domains 4000] [--only-unsent]
"""
from __future__ import annotations
import argparse
import os
import sys
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT not in sys.path:
sys.path.insert(0, ROOT)
import psycopg2 # noqa: E402
# Reuse the verifier's MX classifier + cache (one source of truth, no extra DNS).
from scripts.verify_csv_emails import mx_provider, get_mx_hosts # noqa: E402
DB_URL = os.getenv("DATABASE_URL", "")
def main() -> int:
    """Tag untagged carriers with the MX operator of their email domain.

    Selects up to --limit-domains distinct email domains that still have
    carriers with mx_provider IS NULL (verified/usable addresses only, and only
    unsent carriers when --only-unsent is given), ordered by how many such
    carriers share the domain. Each domain is resolved once and its operator
    label is written onto every still-untagged carrier of that domain. Work is
    committed every 200 domains so an interrupted run keeps its progress.

    Returns:
        0 on completion (used as the process exit status).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--limit-domains", type=int, default=5000,
                    help="max distinct domains to resolve this run (bounds DNS work)")
    # The flag filters (it does not merely reorder), so the help says so.
    ap.add_argument("--only-unsent", action="store_true",
                    help="only consider carriers not yet emailed")
    args = ap.parse_args()

    conn = psycopg2.connect(DB_URL)
    try:
        cur = conn.cursor()

        # Distinct email domains that still need an mx_provider, limited to the
        # ones the builder can actually send to (verified/usable addresses).
        sent_filter = "AND listmonk_sent_at IS NULL" if args.only_unsent else ""
        cur.execute(f"""
            SELECT lower(split_part(email_address, '@', 2)) AS domain, count(*) AS n
            FROM fmcsa_carriers
            WHERE email_address IS NOT NULL AND email_address <> ''
            AND mx_provider IS NULL
            AND (email_verified IS TRUE OR email_verify_result IN
            ('smtp_valid','catch_all_domain','catch_all_detected'))
            {sent_filter}
            GROUP BY 1
            ORDER BY n DESC
            LIMIT %s
        """, (args.limit_domains,))
        domains = [r[0] for r in cur.fetchall() if r[0]]
        print(f"resolving MX for {len(domains)} distinct domains...", file=sys.stderr)

        tagged_domains = 0
        unresolved_domains = 0
        for i, dom in enumerate(domains, 1):
            get_mx_hosts(dom)        # populates the cache (DNS)
            prov = mx_provider(dom)  # classify from cache
            if prov is None:
                # NOTE(review): mx_provider's result for unresolvable domains is
                # not visible from here. If it can be None, writing it would be
                # a no-op (NULL over NULL), so skip the UPDATE and do not count
                # the domain as tagged. Such domains are re-selected next run.
                unresolved_domains += 1
            else:
                cur.execute("""
                    UPDATE fmcsa_carriers SET mx_provider = %s
                    WHERE lower(split_part(email_address, '@', 2)) = %s
                    AND mx_provider IS NULL
                """, (prov, dom))
                tagged_domains += 1
            if i % 200 == 0:
                conn.commit()
                print(f" {i}/{len(domains)} domains", file=sys.stderr)
        conn.commit()

        # Report the operator distribution across ALL tagged carriers (not just
        # the ones tagged by this run), top 12 operators.
        cur.execute("""
            SELECT mx_provider, count(*) FROM fmcsa_carriers
            WHERE mx_provider IS NOT NULL GROUP BY 1 ORDER BY 2 DESC LIMIT 12
        """)
        print("MX operator distribution (tagged so far):", file=sys.stderr)
        for prov, n in cur.fetchall():
            print(f" {prov}: {n:,}", file=sys.stderr)
    finally:
        # Always release the connection; closing discards any uncommitted batch.
        conn.close()

    if unresolved_domains:
        print(f" {unresolved_domains} domains had no MX classification",
              file=sys.stderr)
    print(f"done: tagged {tagged_domains} domains", file=sys.stderr)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())