# ── Per-MX-operator throttle ─────────────────────────────────────────────────
# Sender reputation is tracked by the RECEIVING mail operator (Microsoft 365,
# Google Workspace, Proofpoint, ...), not by recipient domain. So we cap how many
# new providers we queue per operator per day, and let volume spread across the
# long tail of operators freely. This lets total daily volume be much higher than
# a flat cap without hammering any single receiving system. Caps ramp with the
# warmup day. "default" applies to any operator not explicitly listed (mostly the
# long tail of small/independent mail hosts -- a generous cap is safe there
# because each sees only a handful).
def mx_daily_caps(day: int) -> dict:
    """Return the per-operator queue caps that apply on warmup day *day*.

    The result maps a lower-case operator name to the maximum number of rows
    that may be picked for that operator. The special key ``"__default__"``
    holds the cap used for every operator that is not listed by name.

    Args:
        day: Warmup day number. Values <= 1 use the smallest tier; values
            above 9 use the largest (steady-state) tier.

    Returns:
        A new dict with one entry per named operator plus ``"__default__"``.
    """
    # (last warmup day the tier covers, cap for named operators, default cap)
    tiers = (
        (1, 25, 15),
        (4, 60, 40),
        (9, 120, 80),
    )
    big, default = 250, 150  # steady state once the warmup is past day 9
    for last_day, tier_big, tier_default in tiers:
        if day <= last_day:
            big, default = tier_big, tier_default
            break

    caps = dict.fromkeys(
        ("microsoft", "google", "proofpoint", "cisco", "mimecast", "barracuda"),
        big,
    )
    caps["__default__"] = default
    return caps


def mx_throttled(candidates: list[dict], total_n: int, caps: dict) -> list[dict]:
    """Pick up to *total_n* candidates while capping each receiving operator.

    Rows are visited in input order and kept unless their operator (the row's
    ``mx_provider`` value) has already reached its cap, so the result preserves
    the input order. Operator names are compared case-insensitively and with
    surrounding whitespace removed; rows whose ``mx_provider`` is missing or
    empty share the ``"__default__"`` bucket.

    Args:
        candidates: Rows to choose from; each row is a dict that may carry an
            ``mx_provider`` key.
        total_n: Maximum number of rows to return. Zero or a negative value
            yields an empty list.
        caps: Mapping of operator name to per-operator cap, as produced by
            ``mx_daily_caps``. ``"__default__"`` is the cap for operators not
            named in the mapping (50 if that key is absent too).

    Returns:
        A new list holding the selected rows. If no row carries an
        ``mx_provider`` key at all, this is simply the first *total_n* rows.
    """
    # A negative total_n must not reach the slice below: candidates[:-1] would
    # return almost everything instead of nothing.
    if total_n <= 0 or not candidates:
        return []

    # Only skip the throttle when the whole batch lacks MX data. Looking at the
    # first row alone would disable the throttle whenever that one row happens
    # to be missing the key.
    if not any("mx_provider" in row for row in candidates):
        return candidates[:total_n]

    # Normalise cap keys the same way operator names are normalised below so
    # "Microsoft", "microsoft " and "MICROSOFT" all hit the same cap.
    limits = {str(name).strip().lower(): limit for name, limit in caps.items()}
    default_cap = limits.get("__default__", 50)

    # NOTE(review): the counters live only for this call. The visible call site
    # invokes this once per segment, so each segment gets a fresh budget rather
    # than sharing one per-day budget -- confirm that is intended.
    used: dict[str, int] = {}
    chosen: list[dict] = []
    for row in candidates:
        if len(chosen) >= total_n:
            break
        operator = str(row.get("mx_provider") or "").strip().lower() or "__default__"
        taken = used.get(operator, 0)
        if taken >= limits.get(operator, default_cap):
            continue
        used[operator] = taken + 1
        chosen.append(row)
    return chosen