hc: per-MX-operator warmup throttle (spread load across receiving systems)

Reputation is tracked per receiving mail operator, not per recipient domain, so
the daily warmup slice is now distributed across MX operators with per-operator
daily caps (ramping with the warmup day): Microsoft/Google/Proofpoint/etc. capped
individually, long-tail operators each get a generous default. This lets total
daily volume be much higher than a flat cap without hammering any single system.
mx_throttled() respects the mx_provider column the verifier now writes; falls back
to flat slicing if absent.
This commit is contained in:
justin 2026-06-12 22:09:29 -05:00
parent 4638fbe3d2
commit 5237c81385

View file

@ -131,6 +131,54 @@ def daily_slice(day: int) -> int:
return 1000
# ── Per-MX-operator throttle ─────────────────────────────────────────────────
# Sender reputation is tracked by the RECEIVING mail operator (Microsoft 365,
# Google Workspace, Proofpoint, ...), not by recipient domain. So we cap how many
# new providers we queue per operator per day, and let volume spread across the
# long tail of operators freely. This lets total daily volume be much higher than
# a flat cap without hammering any single receiving system. Caps ramp with the
# warmup day. "default" applies to any operator not explicitly listed (mostly the
# long tail of small/independent mail hosts -- a generous cap is safe there
# because each sees only a handful).
def mx_daily_caps(day: int) -> dict:
    """Return the per-MX-operator daily caps for warmup day ``day``.

    The result maps an operator name to the maximum number of rows that may
    be queued for that operator in one day. The six explicitly named
    operators (microsoft, google, proofpoint, cisco, mimecast, barracuda) all
    share one cap; the "__default__" entry is the cap applied to each
    operator that is not listed. Both values step up as the warmup day
    increases.
    """
    # (last warmup day of the tier, named-operator cap, long-tail cap)
    tiers = (
        (1, 25, 15),
        (4, 60, 40),
        (9, 120, 80),
    )
    # Values used once the day is past every tier above.
    named_cap, long_tail_cap = 250, 150
    for last_day, tier_named, tier_long_tail in tiers:
        if day <= last_day:
            named_cap, long_tail_cap = tier_named, tier_long_tail
            break

    named_operators = (
        "microsoft",
        "google",
        "proofpoint",
        "cisco",
        "mimecast",
        "barracuda",
    )
    caps = dict.fromkeys(named_operators, named_cap)
    caps["__default__"] = long_tail_cap
    return caps
def mx_throttled(candidates: list[dict], total_n: int, caps: dict) -> list[dict]:
    """Pick up to total_n candidates, capping how many go to each MX operator.

    Candidates are scanned in input order and a row is skipped once its
    operator has reached its cap, so the relative input order is preserved
    in the result.

    Args:
        candidates: Rows to choose from. The operator is read from each
            row's "mx_provider" value, compared case-insensitively and with
            surrounding whitespace ignored. Rows whose value is blank or
            None are pooled under "__default__" and share a single cap.
        total_n: Maximum number of rows to return. Zero or a negative value
            yields an empty list.
        caps: Mapping of operator name -> maximum rows for that operator.
            The "__default__" entry is the cap for every operator that is
            not listed (50 if that entry is missing).

    Returns:
        A new list with at most total_n rows. If the first row has no
        "mx_provider" key, the rows are not grouped at all and the first
        total_n rows are returned as-is.
    """
    # Guard first so a negative total_n cannot turn the fallback slice below
    # into "everything except the last k rows".
    if not candidates or total_n <= 0:
        return []
    if "mx_provider" not in candidates[0]:
        return candidates[:total_n]

    # Normalize cap keys the same way row values are normalized, so that
    # "Microsoft" / "MICROSOFT " hit the "microsoft" cap and share its counter
    # instead of falling through to the long-tail default.
    caps_by_op = {str(name).strip().lower(): limit for name, limit in caps.items()}
    default_cap = caps_by_op.get("__default__", 50)

    taken: dict = {}
    chosen: list[dict] = []
    for row in candidates:
        if len(chosen) >= total_n:
            break
        op = (row.get("mx_provider") or "").strip().lower() or "__default__"
        if taken.get(op, 0) >= caps_by_op.get(op, default_cap):
            continue
        taken[op] = taken.get(op, 0) + 1
        chosen.append(row)
    return chosen
def lm(path: str, data=None, method=None):
tok = _token()
headers = {"Content-Type": "application/json",
@ -334,7 +382,9 @@ def warm_segment(seg_key: str, rows: list[dict], slice_n: int,
and r["email"].strip().lower() not in suppressed
and not _is_google_hosted(r)
and row_matches(seg_key, r)]
todo = candidates[:slice_n]
# Spread the slice across MX operators so no single receiving system (e.g.
# Microsoft 365) gets the whole batch. Caps ramp with the warmup day.
todo = mx_throttled(candidates, slice_n, mx_daily_caps(warmup_day()))
print(f"[hc-cron] {seg_key}: candidates={len(candidates)} "
f"already={len(imported)} to_import={len(todo)}")