new-site/scripts/build_healthcare_campaigns_cron.py
justin 744f0a89cf healthcare: bound NPPES-stale window [3,10]yr + restore verify_ok gate
- Add NPPES_STALE_MAX_YEARS (default 10): a record untouched for many years is
  a stronger signal the practice closed/moved, and a bounce burns the warming
  IP. Observed institutional distribution clusters 3-7yrs with ~0 beyond 8, so
  10 is a safe ceiling that mails the whole real pool while excluding any
  outlier ancient record. MIN stays 3 (keeps the 'out of date' claim credible).
- Restore the SMTP-verification gate (verify_ok) that the shared
  institutional_verified selector had -- the swap to nppes_stale dropped it; we
  only mail inboxes we already proved live.
- enrich: process the re-fetch queue STALEST-FIRST so a bounded (--limit) or
  --max-age refresh spends its budget on the most-overdue cache entries (and new
  NPIs) first, never starving them behind merely-aging ones.
- Selector unit-tested (10 cases incl. window edges, verify gate, deactivated).
2026-06-20 15:28:12 -05:00


#!/usr/bin/env python3
"""Healthcare warmup campaign builder for listmonk-hc.

Runs daily (Mon-Fri, 7 AM Central via cron). Each run:
  1. Imports the next slice of the VERIFIED, overdue-first warmup list into a
     listmonk-hc list (deduped; already-imported rows are skipped).
  2. Ensures the teal "Medicare revalidation overdue" campaign exists and is
     running, pointed at that list.
  3. Listmonk-hc's sliding-window rate cap (driven by pw-hc-rampcap) does the
     actual per-hour throttling, so this builder just feeds the queue.

WARMUP COHORT: point HC_VERIFIED_CSV at hc_warmup_nongoogle.csv for weeks 1-2.
Google/Workspace-hosted practice domains (~31%) reject hard from cold IPs
(550-5.7.1 "unsolicited"), which damages warmup reputation. Send the 501
non-Google practice domains first; switch to hc_warmup_google.csv (222) once the
IPs are warm (week 2-3).

The daily slice size follows the hc warmup ramp so we never queue more than the
IPs can safely send while warming. Sends ONLY happen via the hc HOT stream
(listmonk-hc -> host :2526/2527/2528 -> .107/.108/.109), never the trucking pool.

Idempotent: safe to run every weekday. Tracks imported emails in a state file.

Usage:
    python3 scripts/build_healthcare_campaigns_cron.py              # daily slice
    python3 scripts/build_healthcare_campaigns_cron.py --dry-run
    python3 scripts/build_healthcare_campaigns_cron.py --slice 0    # use ramp default
"""
from __future__ import annotations
import argparse, base64, csv, json, os, sys, time, urllib.request, urllib.parse, urllib.error
from datetime import datetime, timezone
LISTMONK_URL = os.getenv("HC_LISTMONK_URL", "http://localhost:9101")
LISTMONK_USER = os.getenv("HC_LISTMONK_USER", "api")
# Token read from the host file written when the api user was created.
def _token() -> str:
    t = os.getenv("HC_LISTMONK_TOKEN")
    if t:
        return t
    for p in ("/opt/performancewest/.secrets/hc-listmonk-token",
              "/etc/postfix/hc-listmonk-token"):
        if os.path.exists(p):
            try:
                with open(p) as f:
                    return f.read().strip()
            except PermissionError:
                continue
    raise SystemExit("HC_LISTMONK_TOKEN not set and no readable token file found")
VERIFIED_CSV = os.getenv("HC_VERIFIED_CSV", "/opt/performancewest/data/hc_warmup_week1_verified.csv")
STATE_DIR = os.getenv("HC_STATE_DIR", "/opt/performancewest/data")
WARMUP_STAMP = "/etc/postfix/hc-warmup-start"
# Permanent do-not-contact list (one email per line). Providers who tell us they
# already revalidated, asked to stop, etc. go here so the warmup NEVER imports or
# re-mails them, regardless of segment or stale CMS data. Append + run --prune.
SUPPRESS_FILE = os.getenv("HC_SUPPRESS_FILE", os.path.join(STATE_DIR, "hc_suppress.txt"))
# Bulk From — sends from the dedicated bulk subdomain so its sending reputation
# is isolated from the root domain (which stays clean for transactional /
# verification mail). Replies still go to the root domain via Reply-To, so the
# customer-facing reply experience is unchanged. See docs/deliverability.md.
FROM_EMAIL = os.getenv(
    "HC_CAMPAIGN_FROM",
    "Performance West Compliance <compliance@send.performancewest.net>",
)
REPLY_TO = os.getenv("HC_CAMPAIGN_REPLY_TO", "info@performancewest.net")
# Segment registry (subject, template file, list/campaign names, row selector)
# is the single source of truth shared with build_healthcare_campaigns.py.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from build_healthcare_campaigns import SEGMENTS, template_path # noqa: E402
from _email_plaintext import html_to_text # noqa: E402
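def load_suppressed() -> set[str]:
    if not os.path.exists(SUPPRESS_FILE):
        return set()
    with open(SUPPRESS_FILE) as f:
        # Strip before the comment check so an indented "# ..." line is also
        # treated as a comment rather than as an address.
        lines = (ln.strip().lower() for ln in f)
        return {ln for ln in lines if ln and not ln.startswith("#")}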
# Which segments to warm, in priority order. Revalidation stays the lead (it has
# the most verified data + the official-record card). The others warm in
# parallel on smaller slices so we collect engagement data across all programs
# without overwhelming the warming IPs.
ACTIVE_SEGMENTS = os.getenv(
    "HC_SEGMENTS",
    "revalidation_overdue,revalidation_due_soon,oig_screening,nppes_outdated,npi_reactivation,compliance_bundle",
).split(",")
# Warmup deliverability guard: only mail SLIGHTLY-overdue providers. A practice
# that lapsed recently is almost certainly still operating with a live inbox; one
# that is many months/years overdue has likely closed, merged, or abandoned the
# address, so its mail bounces -- and bounces are the fastest way to wreck a
# warming IP's reputation. Window is inclusive [MIN, MAX] days overdue.
WARMUP_OVERDUE_MIN = int(os.getenv("HC_OVERDUE_MIN", "1"))
WARMUP_OVERDUE_MAX = int(os.getenv("HC_OVERDUE_MAX", "90"))
# Proactive "revalidation due soon" window (days UNTIL the due date). Mirrors the
# overdue window so we reach providers shortly before AND after their deadline,
# roughly doubling the deliverable warmup pool from the same CMS data source.
WARMUP_DUE_SOON_MIN = int(os.getenv("HC_DUE_SOON_MIN", "1"))
WARMUP_DUE_SOON_MAX = int(os.getenv("HC_DUE_SOON_MAX", "90"))
# NPPES "out of date" segment: only mail records whose REAL NPPES last_updated
# date is within an [MIN, MAX] whole-years-stale window. MIN keeps the "out of
# date" claim credible (a record updated <3yrs ago isn't convincingly stale);
# MAX caps deliverability/defunct risk (a record untouched for many years is a
# stronger signal the practice closed/moved -- and a bounce burns the warming
# IP). This is what makes the claim LITERALLY TRUE and verifiable -- the provider
# can confirm the exact same last_updated date on the public registry. The date
# is joined in by enrich_nppes_last_updated.py (column nppes_years_stale). Until
# that enrichment has run, the field is empty and this segment safely mails
# nobody (we never assert "out of date" without the government date to back it).
# Observed institutional distribution: tightly clustered 3-7yrs, ~0 beyond 8yrs.
NPPES_STALE_MIN_YEARS = int(os.getenv("HC_NPPES_STALE_MIN_YEARS", "3"))
NPPES_STALE_MAX_YEARS = int(os.getenv("HC_NPPES_STALE_MAX_YEARS", "10"))
def _overdue_days(r: dict):
    v = (r.get("days_overdue") or "").strip()
    try:
        return int(v)
    except ValueError:
        return None
# During warmup, hold out Google-Workspace-hosted domains: Google rejects bulk
# mail from cold/warming IPs hard (550-5.7.1), and those bounces wreck the
# warming reputation. The mx_provider flag is set by the weekly hc_data_refresh
# (an MX lookup, since a custom domain can silently use Google Workspace). Set
# HC_SKIP_GOOGLE=0 to lift this once the IPs are warm.
SKIP_GOOGLE = os.getenv("HC_SKIP_GOOGLE", "1") not in ("0", "false", "no")
def _is_google_hosted(r: dict) -> bool:
    if not SKIP_GOOGLE:
        return False
    return (r.get("mx_provider") or "").strip().lower() == "google"


def warmup_day() -> int:
    try:
        with open(WARMUP_STAMP) as f:
            start = int(f.read().strip())
        return max(0, int((time.time() - start) // 86400))
    except Exception:
        return 0


def daily_slice(day: int) -> int:
    """TOTAL new subscribers to import across ALL segments today, aligned with
    the hc ramp. The rampcap caps hourly *delivery*; this caps daily *queueing*
    so we never flood the warming IPs. Runs DAILY (continuous warmup -- mailbox
    providers reward consistent daily volume; weekends are NOT skipped)."""
    if day <= 1:
        return 100
    if day <= 4:
        return 300
    if day <= 9:
        return 600
    return 1000
# ── Per-MX-operator throttle ─────────────────────────────────────────────────
# Sender reputation is tracked by the RECEIVING mail operator (Microsoft 365,
# Google Workspace, Proofpoint, ...), not by recipient domain. So we cap how many
# new providers we queue per operator per day, and let volume spread across the
# long tail of operators freely. This lets total daily volume be much higher than
# a flat cap without hammering any single receiving system. Caps ramp with the
# warmup day. "default" applies to any operator not explicitly listed (mostly the
# long tail of small/independent mail hosts -- a generous cap is safe there
# because each sees only a handful).
def mx_daily_caps(day: int) -> dict:
    # "big" is the cap for each large operator named below; "default" is the
    # cap for each operator that is not listed.
    if day <= 1:
        big, default = 25, 15
    elif day <= 4:
        big, default = 60, 40
    elif day <= 9:
        big, default = 120, 80
    else:
        big, default = 250, 150
    return {
        "microsoft": big,
        "google": big,
        "proofpoint": big,
        "cisco": big,
        "mimecast": big,
        "barracuda": big,
        "__default__": default,
    }
def mx_throttled(candidates: list[dict], total_n: int, caps: dict) -> list[dict]:
    """Pick up to total_n candidates, capping per mx_provider so no single
    receiving operator gets more than its daily share. Preserves input order
    within each operator. Falls back to ungrouped slicing if rows have no
    mx_provider."""
    if not candidates or "mx_provider" not in candidates[0]:
        return candidates[:total_n]
    per_op: dict = {}
    chosen: list[dict] = []
    default_cap = caps.get("__default__", 50)
    for r in candidates:
        if len(chosen) >= total_n:
            break
        # Lower-case the operator so it matches the lower-case cap keys, the
        # same normalization _is_google_hosted applies.
        op = (r.get("mx_provider") or "").strip().lower() or "__default__"
        cap = caps.get(op, default_cap)
        if per_op.get(op, 0) >= cap:
            continue
        per_op[op] = per_op.get(op, 0) + 1
        chosen.append(r)
    return chosen
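

# Illustrative sketch (not called by the cron): a self-contained version of the
# per-operator cap described above, for trying the selection on made-up rows.
# The name _sketch_cap_per_operator is hypothetical. It assumes rows are dicts
# with an "mx_provider" key and that operator names compare case-insensitively.
def _sketch_cap_per_operator(rows, total_n, caps, default_cap=50):
    """Greedy pick in input order, skipping rows whose operator hit its cap."""
    taken = {}
    chosen = []
    for row in rows:
        if len(chosen) >= total_n:
            break
        op = (row.get("mx_provider") or "").strip().lower() or "__default__"
        cap = caps.get(op, caps.get("__default__", default_cap))
        if taken.get(op, 0) >= cap:
            continue  # this operator already has its daily share
        taken[op] = taken.get(op, 0) + 1
        chosen.append(row)
    return chosen
# Example: with caps {"microsoft": 2, "__default__": 3} and a request for 10,
# five microsoft rows followed by three rows on one small host yield 2 + 3 = 5
# picks, because each operator stops at its own cap before the total is reached.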
def lm(path: str, data=None, method=None):
    tok = _token()
    headers = {"Content-Type": "application/json",
               "Authorization": f"token {LISTMONK_USER}:{tok}"}
    req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers)
    if data is not None:
        req.data = json.dumps(data).encode()
        if not method:
            method = "POST"
    if method:
        req.get_method = lambda: method
    try:
        with urllib.request.urlopen(req, timeout=20) as r:
            return json.loads(r.read().decode())
    except urllib.error.HTTPError as e:
        body = e.read().decode()[:300]
        raise SystemExit(f"listmonk-hc API {method or 'GET'} {path} -> {e.code}: {body}")
def get_or_create_list(list_name: str, tags: list[str]) -> int:
    res = lm("/lists?per_page=100")
    for l in res.get("data", {}).get("results", []):
        if l["name"] == list_name:
            return l["id"]
    res = lm("/lists", {"name": list_name, "type": "private", "optin": "single",
                        "tags": tags})
    return res["data"]["id"]
def state_file(seg_key: str) -> str:
    return os.path.join(STATE_DIR, f"hc_imported_{seg_key}.txt")


def _read_email_set(path: str) -> set[str]:
    # One lower-cased address per non-empty line.
    with open(path) as f:
        return {ln.strip().lower() for ln in f if ln.strip()}


def load_imported(seg_key: str) -> set[str]:
    p = state_file(seg_key)
    if os.path.exists(p):
        return _read_email_set(p)
    # Back-compat: the original single-segment cron tracked revalidation imports
    # in hc_imported_emails.txt. Seed the revalidation state from it once.
    legacy = os.path.join(STATE_DIR, "hc_imported_emails.txt")
    if seg_key == "revalidation_overdue" and os.path.exists(legacy):
        return _read_email_set(legacy)
    return set()


def save_imported(seg_key: str, emails: set[str]):
    os.makedirs(STATE_DIR, exist_ok=True)
    with open(state_file(seg_key), "w") as f:
        f.write("\n".join(sorted(emails)) + "\n")
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
    try:
        lm("/subscribers", {
            "email": email, "name": name or email.split("@")[0],
            "status": "enabled", "lists": [list_id],
            "attribs": attribs, "preconfirm_subscriptions": True,
        })
        return True
    except SystemExit as e:
        # Already exists -> attach to the list AND refresh attribs so per-segment
        # merge fields (e.g. days_until for the due-soon template) are correct
        # even when the same provider already exists from another segment import.
        if "409" in str(e) or "already exists" in str(e).lower():
            try:
                q = "subscribers.email = '" + email.replace("'", "''") + "'"
                found = lm("/subscribers?" + urllib.parse.urlencode({"query": q, "per_page": 1}))
                results = found.get("data", {}).get("results", [])
                if results:
                    sid = results[0]["id"]
                    existing_attribs = results[0].get("attribs") or {}
                    # Merge: keep prior fields, overwrite with this segment's
                    # values (npi/practice/due-date/days_until/days_overdue).
                    merged = {**existing_attribs, **attribs}
                    lm(f"/subscribers/{sid}", {
                        "email": email, "name": name or email.split("@")[0],
                        "attribs": merged,
                    }, "PUT")
                    lm("/subscribers/lists", {"ids": [sid], "action": "add",
                                              "target_list_ids": [list_id],
                                              "status": "confirmed"}, "PUT")
                    return True
            except (Exception, SystemExit):
                # lm() reports HTTP errors by raising SystemExit, which is not
                # an Exception subclass; catch it too so one failed fallback
                # skips this subscriber instead of ending the whole run.
                return False
        return False
def ensure_campaign(seg_key: str, list_id: int) -> int:
    # Reuse an existing warmup campaign for this segment only if it's still
    # ACTIVE (draft / running / paused / scheduled). A finished/cancelled one
    # can't accept new subscribers or restart, so we create a fresh dated one --
    # that also picks up the latest email template (the canonical hc_*.html).
    from datetime import date
    seg = SEGMENTS[seg_key]
    ACTIVE = {"draft", "running", "paused", "scheduled"}
    res = lm("/campaigns?per_page=100")
    for c in res.get("data", {}).get("results", []):
        if c["name"].startswith(seg["campaign_name"]) and c.get("status") in ACTIVE:
            return c["id"]
    with open(template_path(seg_key)) as f:
        body = f.read()
    dated = f"{seg['campaign_name']} - {date.today():%b %d %Y}"
    payload = {
        "name": dated, "subject": seg["subject"], "lists": [list_id],
        "from_email": FROM_EMAIL, "type": "regular", "content_type": "richtext",
        "body": body, "altbody": html_to_text(body), "messenger": "email",
        "tags": ["healthcare", "warmup", seg_key],
        "headers": [{"Reply-To": REPLY_TO},
                    {"List-Unsubscribe": "<{{ UnsubscribeURL }}>"},
                    {"List-Unsubscribe-Post": "List-Unsubscribe=One-Click"}],
    }
    res = lm("/campaigns", payload)
    return res["data"]["id"]
def row_matches(seg_key: str, r: dict) -> bool:
    """Does this master-CSV row belong to a segment? Driven by the segment's
    `selector` so the row->segment mapping lives next to the segment metadata."""
    sel = SEGMENTS[seg_key]["selector"]
    status = (r.get("reval_status") or "").strip().lower()
    excluded = (r.get("leie_excluded") or "").strip() not in ("", "0", "false")
    optout = (r.get("optout_ending") or "").strip() != ""
    if sel == "reval_overdue":
        # Only SLIGHTLY-overdue: recently-lapsed practices are still active with
        # deliverable inboxes. Heavily-overdue ones likely bounce and burn the
        # warming IP's reputation, so we hold them out of the warmup window.
        if status != "overdue":
            return False
        od = _overdue_days(r)
        return od is not None and WARMUP_OVERDUE_MIN <= od <= WARMUP_OVERDUE_MAX
    if sel == "reval_due_soon":
        # Proactive: revalidation is UPCOMING within the lookahead window. Pitch
        # is "handle it before your deadline" -- taps the same CMS Revalidation
        # Due Date List as reval_overdue but the (much larger) not-yet-due slice,
        # so it grows warmup supply without touching a new data source.
        # days_overdue is negative for upcoming (days until due), so a provider
        # due in N days has days_overdue == -N.
        if status != "upcoming":
            return False
        od = _overdue_days(r)
        if od is None:
            return False
        days_until = -od
        return WARMUP_DUE_SOON_MIN <= days_until <= WARMUP_DUE_SOON_MAX
    if sel == "reval_upcoming":
        return status == "upcoming"
    if sel == "leie_or_deactivated":
        # Reactivation targets: flagged excluded, OR no longer on the reval list
        # (a strong deactivation proxy once revalidation lapses).
        return excluded or status in ("not_on_list", "no_reval_flag")
    if sel == "optout_ending":
        return optout
    if sel == "nppes_stale":
        # NPPES "out of date" segment. Only mail records whose REAL NPPES
        # last_updated date (joined by enrich_nppes_last_updated.py) falls in the
        # [MIN, MAX] years-stale window, so the "may be out of date" claim is
        # literally true AND deliverable (very-stale records likely belong to
        # closed/moved practices that bounce). The provider can verify the same
        # date on the public registry. Deactivated NPIs belong to
        # npi_reactivation, not here, so they're excluded. We also keep the
        # institutional list's SMTP-verification gate (verify_ok): a row with an
        # explicit negative verify_ok is held out, while an empty or missing
        # value passes the gate. Empty stale field (enrichment not yet run) ->
        # no match, so we never assert staleness without the date.
        if (r.get("nppes_deactivated") or "").strip().upper() == "Y":
            return False
        if (str(r.get("verify_ok", "")).strip().upper()
                not in ("Y", "YES", "TRUE", "1", "")):
            return False
        ys = (r.get("nppes_years_stale") or "").strip()
        try:
            return NPPES_STALE_MIN_YEARS <= int(ys) <= NPPES_STALE_MAX_YEARS
        except ValueError:
            return False
    if sel == "any":
        # OIG screening applies to any billing practice, but for warmup we still
        # exclude the likely-undeliverable: providers heavily overdue (stale) or
        # already dropped off the reval list. Recently-lapsed and upcoming stay.
        if status in ("not_on_list", "no_reval_flag"):
            return False
        od = _overdue_days(r)
        if status == "overdue" and (od is None or od > WARMUP_OVERDUE_MAX):
            return False
        return True
    if sel == "institutional_verified":
        # For the freshly SMTP-VERIFIED institutional list, "not on the CMS
        # revalidation list" does NOT mean undeliverable -- it just means the org
        # NPI is not an individual Medicare enrollee. We already proved the inbox
        # is live (verify_ok), so trust that instead of using reval-list presence
        # as a deliverability proxy. Still hold out the heavily-overdue (stale)
        # individual enrollees that DO appear, to protect the warming IP.
        if (str(r.get("verify_ok", "")).strip().upper()
                not in ("Y", "YES", "TRUE", "1", "")):
            return False
        od = _overdue_days(r)
        if status == "overdue" and od is not None and od > WARMUP_OVERDUE_MAX:
            return False
        return True
    return False
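

# Illustrative sketch (not called by the cron): the nppes_stale rule from
# row_matches as a standalone predicate, convenient for checking the window
# edges in isolation. The name _sketch_nppes_stale and the lo/hi parameters are
# hypothetical; the column names and accepted verify_ok spellings are the ones
# row_matches reads. The defaults 3 and 10 mirror the documented env defaults.
def _sketch_nppes_stale(row, lo=3, hi=10):
    if (row.get("nppes_deactivated") or "").strip().upper() == "Y":
        return False  # deactivated NPIs belong to npi_reactivation
    verify = str(row.get("verify_ok", "")).strip().upper()
    if verify not in ("Y", "YES", "TRUE", "1", ""):
        return False  # explicit negative SMTP verification
    try:
        years = int((row.get("nppes_years_stale") or "").strip())
    except ValueError:
        return False  # enrichment not run yet, or the value is not an integer
    return lo <= years <= hi
# Both window edges are inclusive: 3 and 10 match, 2 and 11 do not, and a row
# with an empty nppes_years_stale never matches.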
def attribs_for(r: dict) -> dict:
    # days_overdue is positive when past due and negative when upcoming (days
    # until the due date). Expose a clean positive "days_until" for the
    # due-soon segment's template.
    od_raw = (str(r.get("days_overdue", "")) or "").strip()
    days_until = ""
    try:
        od = int(od_raw)
        if od < 0:
            days_until = str(-od)
    except ValueError:
        pass
    return {
        "npi": r.get("npi", ""),
        "practice": r.get("name", ""),
        "specialty": r.get("specialty", ""),
        "state": r.get("state", ""),
        # Separate fields so the email's "official CMS record" card can render
        # the due date + overdue/until count cleanly (mirrors the CMS
        # Revalidation Due Date List, verified by NPI via the weekly refresh).
        "reval_due_date": r.get("reval_due_date", ""),
        "days_overdue": str(r.get("days_overdue", "")),
        "days_until": days_until,
        # MX operator (for per-operator analysis + throttling audit).
        "mx_provider": r.get("mx_provider", ""),
        # Real NPPES freshness (from enrich_nppes_last_updated.py). Lets the
        # "NPPES may be out of date" email cite the actual government date the
        # provider can verify on the public registry, instead of an unbacked
        # "FLAGGED OUT OF DATE" claim.
        "nppes_last_updated": r.get("nppes_last_updated", ""),
        "nppes_years_stale": r.get("nppes_years_stale", ""),
        "nppes_enumeration": r.get("nppes_enumeration", ""),
        "detail": (f"{r.get('reval_due_date','')} ({r.get('days_overdue','')} days overdue)"
                   if r.get("reval_status") == "overdue" else r.get("reval_due_date", "")),
    }
def warm_segment(seg_key: str, rows: list[dict], slice_n: int,
                 dry_run: bool, start_campaign: bool) -> int:
    """Import up to slice_n new subscribers for one segment and keep its
    campaign active. Returns how many NEW subscribers were imported."""
    seg = SEGMENTS[seg_key]
    imported = load_imported(seg_key)
    suppressed = load_suppressed()
    candidates = [r for r in rows
                  if r.get("email", "").strip()
                  and r["email"].strip().lower() not in imported
                  and r["email"].strip().lower() not in suppressed
                  and not _is_google_hosted(r)
                  and row_matches(seg_key, r)]
    # Spread the slice across MX operators so no single receiving system (e.g.
    # Microsoft 365) gets the whole batch. Caps ramp with the warmup day.
    todo = mx_throttled(candidates, slice_n, mx_daily_caps(warmup_day()))
    print(f"[hc-cron] {seg_key}: candidates={len(candidates)} "
          f"already={len(imported)} to_import={len(todo)}")
    if dry_run:
        for r in todo[:3]:
            print(f"  would import: {r['email']} {r.get('name','')[:28]} "
                  f"status={r.get('reval_status','')}")
        return 0
    if not todo:
        return 0
    list_id = get_or_create_list(seg["list_name"], ["healthcare", "warmup", seg_key])
    n_ok = 0
    for r in todo:
        email = r["email"].strip().lower()
        if add_subscriber(list_id, email, r.get("name") or "", attribs_for(r)):
            imported.add(email)
            n_ok += 1
    save_imported(seg_key, imported)
    cid = ensure_campaign(seg_key, list_id)
    if start_campaign:
        try:
            lm(f"/campaigns/{cid}/status", {"status": "running"}, "PUT")
        except SystemExit as e:
            # Already running raises; that's fine.
            if "already" not in str(e).lower():
                print(f"[hc-cron] {seg_key}: start warning: {e}")
    print(f"[hc-cron] {seg_key}: imported {n_ok} into list {list_id}; campaign={cid}")
    return n_ok
def _all_list_subscribers(list_id: int):
    """Yield (id, email) for every subscriber on a list, paging the API."""
    page, per = 1, 1000
    while True:
        q = urllib.parse.urlencode({"list_id": list_id, "page": page, "per_page": per})
        res = lm("/subscribers?" + q)
        results = res.get("data", {}).get("results", []) or []
        for s in results:
            yield s["id"], (s.get("email") or "").strip().lower()
        if len(results) < per:
            break
        page += 1
def prune_holdouts(dry_run: bool) -> int:
    """Belt-and-suspenders: remove subscribers who should NOT be in the warmup
    from the active warmup lists, even if they were imported before a guard
    existed or their domain's MX has since flipped to Google. We match against
    the FRESH MASTER CSV (re-classified weekly by hc_data_refresh), not the
    listmonk attribs snapshot, so a domain that newly became Google-hosted is
    caught here. Returns the number of (subscriber, list) removals."""
    master_path = os.getenv("HC_MASTER_CSV", os.path.join(STATE_DIR, "hc_warmup_week1.csv"))
    if not os.path.exists(master_path):
        print(f"[hc-cron] prune: master {master_path} not found, skipping")
        return 0
    with open(master_path, newline="") as f:
        rows = list(csv.DictReader(f))
    by_email = {r.get("email", "").strip().lower(): r for r in rows if r.get("email")}
    suppressed = load_suppressed()
    removed = 0
    for seg_key, seg in SEGMENTS.items():
        try:
            res = lm("/lists?per_page=100")
            list_id = next((l["id"] for l in res.get("data", {}).get("results", [])
                            if l["name"] == seg["list_name"]), None)
        except SystemExit:
            list_id = None
        if not list_id:
            continue
        drop_ids = []
        for sid, email in _all_list_subscribers(list_id):
            # Always remove anyone on the permanent do-not-contact list (they
            # told us to stop / already revalidated), regardless of source data.
            if email in suppressed:
                drop_ids.append(sid)
                continue
            r = by_email.get(email)
            if r is None:
                continue  # not in our source data; leave it alone
            # DELIVERABILITY-only prune: remove subscribers whose domain is now
            # Google-hosted (would hard-bounce from the cold IP). We deliberately
            # do NOT evict for audience reasons (e.g. an overdue provider drifting
            # out of the 1-90 day window) -- they were a valid target when warmed
            # and re-evaluating audience on already-engaged people just wastes
            # warmup progress. The import-time guard handles audience for NEW adds.
            if _is_google_hosted(r):
                drop_ids.append(sid)
        if drop_ids:
            print(f"[hc-cron] prune {seg_key} (list {list_id}): "
                  f"{len(drop_ids)} holdouts to remove")
            if not dry_run:
                # Detach from this list in bulk (chunked). The "remove" action
                # drops the list membership only.
                for i in range(0, len(drop_ids), 500):
                    chunk = drop_ids[i:i + 500]
                    lm("/subscribers/lists", {"ids": chunk, "action": "remove",
                                              "target_list_ids": [list_id]}, "PUT")
            removed += len(drop_ids)
    print(f"[hc-cron] prune: removed {removed} subscriber-list holdouts")
    return removed
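

# Illustrative sketch (not called by the cron): the budget split that main()
# performs, written as a pure function so the arithmetic can be checked on its
# own. The name _sketch_split_slice is hypothetical. The lead segment takes
# about half, the others share the rest as evenly as possible, and the lead
# absorbs any leftover so the shares sum to total.
def _sketch_split_slice(total, segments, lead="revalidation_overdue"):
    shares = {}
    others = [s for s in segments if s != lead]
    rem = total
    if lead in segments:
        shares[lead] = max(1, total // 2)
        rem = total - shares[lead]
    if others:
        base, extra = divmod(max(rem, 0), len(others))
        for i, seg in enumerate(others):
            # The first `extra` segments each take one of the leftover units.
            shares[seg] = base + (1 if i < extra else 0)
    if lead in shares:
        shares[lead] += total - sum(shares.values())
    return shares
# Example: a total of 600 across the lead plus five other segments gives the
# lead 300 and each of the others 60.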
def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--slice", type=int, default=0,
                    help="override TOTAL daily import slice across segments (0=ramp default)")
    ap.add_argument("--segments", default=",".join(ACTIVE_SEGMENTS),
                    help="comma list of segment keys to warm")
    ap.add_argument("--start-campaign", action="store_true",
                    help="flip campaigns to 'running' (otherwise left as draft for approval)")
    ap.add_argument("--prune", action="store_true",
                    help="also remove now-Google-hosted / do-not-contact subscribers "
                         "from the warmup lists (run after the weekly refresh)")
    ap.add_argument("--prune-only", action="store_true",
                    help="run ONLY the deliverability prune, then exit (no import/warm)")
    args = ap.parse_args()
    day = warmup_day()
    total_slice = args.slice or daily_slice(day)
    segments = [s.strip() for s in args.segments.split(",") if s.strip() in SEGMENTS]
    print(f"[hc-cron] {datetime.now(timezone.utc).isoformat()} warmup_day={day} "
          f"total_slice={total_slice} segments={segments}")
    with open(VERIFIED_CSV, newline="") as f:
        rows = list(csv.DictReader(f))
    print(f"[hc-cron] verified_total={len(rows)}")
    if args.prune or args.prune_only:
        prune_holdouts(args.dry_run)
        if args.prune_only:
            return
    # Split the daily slice across segments. Revalidation (the lead, richest
    # data) gets ~half; the rest share the remainder evenly. The lead reclaims
    # any rounding remainder so the total never exceeds the warming-rate budget.
    lead = "revalidation_overdue"
    others = [s for s in segments if s != lead]
    per_seg = {}
    if lead in segments:
        per_seg[lead] = max(1, int(total_slice * 0.5))
        rem = total_slice - per_seg[lead]
    else:
        rem = total_slice
    if others and rem > 0:
        base, extra = divmod(rem, len(others))
        for i, s in enumerate(others):
            per_seg[s] = base + (1 if i < extra else 0)
    elif others:
        for s in others:
            per_seg[s] = 0
    # Reclaim any rounding remainder onto the lead so sum(per_seg) == total_slice
    # exactly (never overshoot the rate cap, never silently drop budget).
    if lead in per_seg:
        per_seg[lead] += total_slice - sum(per_seg.values())
    grand = 0
    for seg_key in segments:
        grand += warm_segment(seg_key, rows, per_seg.get(seg_key, 0),
                              args.dry_run, args.start_campaign)
    print(f"[hc-cron] done: imported {grand} new subscribers across {len(segments)} segment(s)")


if __name__ == "__main__":
    main()