new-site/scripts/build_healthcare_campaigns_cron.py
justin 0320dc17ba healthcare: one-email-per-provider by urgency priority + free check as default
Make the free NPI compliance check the catch-all for ALL verified institutional
providers, but route anyone with a more important/time-sensitive issue to THAT
email instead -- each provider gets exactly one email, their most urgent.

- SEGMENTS gain a 'priority' (lower=more urgent): reactivation 10, revalidation
  overdue 20, due-soon 30, bundle 45, free-NPI-check 100 (catch-all).
- assign_segment()/assign_all(): route each provider to the single
  highest-priority active segment whose selector matches; warm_segment() takes
  the assignment map and only claims its assigned providers (disjoint pools, no
  double-mailing). main() now splits the daily slice by priority order, serving
  urgent segments fully before the broad free-check consumes the remainder.
- nppes_outdated selector -> 'institutional_default' (every verified, non-
  deactivated row), since the free check's value no longer depends on staleness;
  list/campaign renamed 'HC Warmup - Free NPI Check'.
- FIX latent bug: reactivation selector treated 'not on CMS reval list' as
  deactivated -- false for org NPIs (would mis-tell active practices they're
  deactivated). Now uses the REAL nppes_deactivated flag (or OIG/SAM exclusion).
- Drop blanket oig_screening from the default rotation: it matched every row and
  would starve the catch-all, and the free check already screens OIG/SAM and
  routes to the paid fix on a hit. Still runnable via --segments.
- Add scripts/test_segment_assignment.py (10 cases incl. 'overdue AND stale ->
  overdue wins'); all pass.
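
A minimal sketch of the routing rule described above, for illustration only. The names `DEMO_SEGMENTS` and `demo_assign` and the simplified match rules are hypothetical stand-ins for the real SEGMENTS registry and row_matches(); only the priority numbers come from this commit message.

```python
# Hypothetical, simplified stand-ins for the real SEGMENTS registry and
# row_matches(). Only the priority numbers are taken from the commit message.
DEMO_SEGMENTS = {
    "npi_reactivation": {
        "priority": 10,
        "match": lambda r: r.get("nppes_deactivated") == "Y",
    },
    "revalidation_overdue": {
        "priority": 20,
        "match": lambda r: r.get("reval_status") == "overdue",
    },
    "free_npi_check": {
        "priority": 100,  # catch-all: every non-deactivated row
        "match": lambda r: r.get("nppes_deactivated") != "Y",
    },
}


def demo_assign(row, active):
    """Return the single most-urgent matching segment, or None."""
    matches = [s for s in active if DEMO_SEGMENTS[s]["match"](row)]
    if not matches:
        return None
    # Lower priority number wins; the segment key breaks ties deterministically.
    return min(matches, key=lambda s: (DEMO_SEGMENTS[s]["priority"], s))


active = list(DEMO_SEGMENTS)
overdue_and_stale = {"reval_status": "overdue", "nppes_deactivated": ""}
plain = {"reval_status": "not_on_list", "nppes_deactivated": ""}
deactivated = {"reval_status": "overdue", "nppes_deactivated": "Y"}

print(demo_assign(overdue_and_stale, active))  # matches two segments, gets one
print(demo_assign(plain, active))
print(demo_assign(deactivated, active))
```

A row that matches both the overdue rule and the catch-all is assigned only to the overdue segment, which is the 'overdue AND stale -> overdue wins' case from the test list.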
2026-06-20 16:01:23 -05:00

652 lines
30 KiB
Python

#!/usr/bin/env python3
"""Healthcare warmup campaign builder for listmonk-hc.

Runs daily (Mon-Fri, 7 AM Central via cron). Each run:
  1. Imports the next slice of the VERIFIED, overdue-first warmup list into a
     listmonk-hc list (deduped; already-imported rows are skipped).
  2. Ensures the teal "Medicare revalidation overdue" campaign exists and is
     running, pointed at that list.
  3. Listmonk-hc's sliding-window rate cap (driven by pw-hc-rampcap) does the
     actual per-hour throttling, so this builder just feeds the queue.

WARMUP COHORT: point HC_VERIFIED_CSV at hc_warmup_nongoogle.csv for weeks 1-2.
Google/Workspace-hosted practice domains (~31%) reject hard from cold IPs
(550-5.7.1 "unsolicited"), which damages warmup reputation. Send the 501
non-Google practice domains first; switch to hc_warmup_google.csv (222) once the
IPs are warm (week 2-3).

The daily slice size follows the hc warmup ramp so we never queue more than the
IPs can safely send while warming. Sends ONLY happen via the hc HOT stream
(listmonk-hc -> host :2526/2527/2528 -> .107/.108/.109), never the trucking pool.

Idempotent: safe to run every weekday. Tracks imported emails in a state file.

Usage:
    python3 scripts/build_healthcare_campaigns_cron.py            # daily slice
    python3 scripts/build_healthcare_campaigns_cron.py --dry-run
    python3 scripts/build_healthcare_campaigns_cron.py --slice 0  # use ramp default
"""
from __future__ import annotations
import argparse, base64, csv, json, os, sys, time, urllib.request, urllib.parse, urllib.error
from datetime import datetime, timezone
LISTMONK_URL = os.getenv("HC_LISTMONK_URL", "http://localhost:9101")
LISTMONK_USER = os.getenv("HC_LISTMONK_USER", "api")
# Token read from the host file written when the api user was created.
def _token() -> str:
    t = os.getenv("HC_LISTMONK_TOKEN")
    if t:
        return t
    for p in ("/opt/performancewest/.secrets/hc-listmonk-token",
              "/etc/postfix/hc-listmonk-token"):
        if os.path.exists(p):
            try:
                return open(p).read().strip()
            except PermissionError:
                continue
    raise SystemExit("HC_LISTMONK_TOKEN not set and no readable token file found")

VERIFIED_CSV = os.getenv("HC_VERIFIED_CSV", "/opt/performancewest/data/hc_warmup_week1_verified.csv")
STATE_DIR = os.getenv("HC_STATE_DIR", "/opt/performancewest/data")
WARMUP_STAMP = "/etc/postfix/hc-warmup-start"
# Permanent do-not-contact list (one email per line). Providers who tell us they
# already revalidated, asked to stop, etc. go here so the warmup NEVER imports or
# re-mails them, regardless of segment or stale CMS data. Append + run --prune.
SUPPRESS_FILE = os.getenv("HC_SUPPRESS_FILE", os.path.join(STATE_DIR, "hc_suppress.txt"))
# Bulk From — sends from the dedicated bulk subdomain so its sending reputation
# is isolated from the root domain (which stays clean for transactional /
# verification mail). Replies still go to the root domain via Reply-To, so the
# customer-facing reply experience is unchanged. See docs/deliverability.md.
FROM_EMAIL = os.getenv(
    "HC_CAMPAIGN_FROM",
    "Performance West Compliance <compliance@send.performancewest.net>",
)
REPLY_TO = os.getenv("HC_CAMPAIGN_REPLY_TO", "info@performancewest.net")
# Segment registry (subject, template file, list/campaign names, row selector)
# is the single source of truth shared with build_healthcare_campaigns.py.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from build_healthcare_campaigns import SEGMENTS, template_path # noqa: E402
from _email_plaintext import html_to_text # noqa: E402


def load_suppressed() -> set[str]:
    if os.path.exists(SUPPRESS_FILE):
        with open(SUPPRESS_FILE) as f:
            return {ln.strip().lower() for ln in f
                    if ln.strip() and not ln.startswith("#")}
    return set()


# Which segments to warm, in priority order. With one-email-per-provider
# assignment (assign_all), each provider is routed to exactly ONE of these by
# urgency: reactivation (10) > revalidation overdue (20) > due soon (30) >
# bundle (45) > the free NPI compliance check (100, catch-all). The free check
# ITSELF includes OIG/SAM exclusion screening and routes to the paid OIG fix on
# a hit, so the standalone blanket `oig_screening` email (which matched every
# verified row and would otherwise starve the catch-all) is intentionally NOT in
# the default rotation -- it can still be run explicitly via --segments for a
# dedicated push.
ACTIVE_SEGMENTS = os.getenv(
    "HC_SEGMENTS",
    "npi_reactivation,revalidation_overdue,revalidation_due_soon,nppes_outdated,compliance_bundle",
).split(",")

# Warmup deliverability guard: only mail SLIGHTLY-overdue providers. A practice
# that lapsed recently is almost certainly still operating with a live inbox; one
# that is many months/years overdue has likely closed, merged, or abandoned the
# address, so its mail bounces -- and bounces are the fastest way to wreck a
# warming IP's reputation. Window is inclusive [MIN, MAX] days overdue.
WARMUP_OVERDUE_MIN = int(os.getenv("HC_OVERDUE_MIN", "1"))
WARMUP_OVERDUE_MAX = int(os.getenv("HC_OVERDUE_MAX", "90"))
# Proactive "revalidation due soon" window (days UNTIL the due date). Mirrors the
# overdue window so we reach providers shortly before AND after their deadline,
# roughly doubling the deliverable warmup pool from the same CMS data source.
WARMUP_DUE_SOON_MIN = int(os.getenv("HC_DUE_SOON_MIN", "1"))
WARMUP_DUE_SOON_MAX = int(os.getenv("HC_DUE_SOON_MAX", "90"))


def _overdue_days(r: dict) -> int | None:
    v = (r.get("days_overdue") or "").strip()
    try:
        return int(v)
    except ValueError:
        return None

# During warmup, hold out Google-Workspace-hosted domains: Google rejects bulk
# mail from cold/warming IPs hard (550-5.7.1), and those bounces wreck the
# warming reputation. The mx_provider flag is set by the weekly hc_data_refresh
# (an MX lookup, since a custom domain can silently use Google Workspace). Set
# HC_SKIP_GOOGLE=0 to lift this once the IPs are warm.
SKIP_GOOGLE = os.getenv("HC_SKIP_GOOGLE", "1") not in ("0", "false", "no")


def _is_google_hosted(r: dict) -> bool:
    if not SKIP_GOOGLE:
        return False
    return (r.get("mx_provider") or "").strip().lower() == "google"


def warmup_day() -> int:
    try:
        start = int(open(WARMUP_STAMP).read().strip())
        return max(0, int((time.time() - start) // 86400))
    except Exception:
        return 0


def daily_slice(day: int) -> int:
    """TOTAL new subscribers to import across ALL segments today, aligned with
    the hc ramp. The rampcap caps hourly *delivery*; this caps daily *queueing*
    so we never flood the warming IPs. Runs DAILY (continuous warmup -- mailbox
    providers reward consistent daily volume; weekends are NOT skipped)."""
    if day <= 1:
        return 100
    if day <= 4:
        return 300
    if day <= 9:
        return 600
    return 1000

# ── Per-MX-operator throttle ─────────────────────────────────────────────────
# Sender reputation is tracked by the RECEIVING mail operator (Microsoft 365,
# Google Workspace, Proofpoint, ...), not by recipient domain. So we cap how many
# new providers we queue per operator per day, and let volume spread across the
# long tail of operators freely. This lets total daily volume be much higher than
# a flat cap without hammering any single receiving system. Caps ramp with the
# warmup day. "default" applies to any operator not explicitly listed (mostly the
# long tail of small/independent mail hosts -- a generous cap is safe there
# because each sees only a handful).
def mx_daily_caps(day: int) -> dict:
    # `big` is the per-day cap for EACH major operator listed below;
    # `default` is the per-day cap for any operator not listed.
    if day <= 1:
        big, default = 25, 15
    elif day <= 4:
        big, default = 60, 40
    elif day <= 9:
        big, default = 120, 80
    else:
        big, default = 250, 150
    return {
        "microsoft": big,
        "google": big,
        "proofpoint": big,
        "cisco": big,
        "mimecast": big,
        "barracuda": big,
        "__default__": default,
    }


def mx_throttled(candidates: list[dict], total_n: int, caps: dict) -> list[dict]:
    """Pick up to total_n candidates, capping per mx_provider so no single
    receiving operator gets more than its daily share. Preserves input order
    within each operator. Falls back to ungrouped slicing if rows have no
    mx_provider. Rows with a blank mx_provider share one "__default__" bucket."""
    if not candidates or "mx_provider" not in candidates[0]:
        return candidates[:total_n]
    per_op: dict = {}
    chosen: list[dict] = []
    default_cap = caps.get("__default__", 50)
    for r in candidates:
        if len(chosen) >= total_n:
            break
        # Lowercase so "Google" and "google" hit the same cap key, matching
        # the normalization in _is_google_hosted().
        op = (r.get("mx_provider") or "").strip().lower() or "__default__"
        cap = caps.get(op, default_cap)
        if per_op.get(op, 0) >= cap:
            continue
        per_op[op] = per_op.get(op, 0) + 1
        chosen.append(r)
    return chosen


def lm(path: str, data=None, method=None):
    tok = _token()
    headers = {"Content-Type": "application/json",
               "Authorization": f"token {LISTMONK_USER}:{tok}"}
    req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers)
    if data is not None:
        req.data = json.dumps(data).encode()
        if not method:
            method = "POST"
    if method:
        req.get_method = lambda: method
    try:
        with urllib.request.urlopen(req, timeout=20) as r:
            return json.loads(r.read().decode())
    except urllib.error.HTTPError as e:
        body = e.read().decode()[:300]
        raise SystemExit(f"listmonk-hc API {method or 'GET'} {path} -> {e.code}: {body}")


def get_or_create_list(list_name: str, tags: list[str]) -> int:
    res = lm("/lists?per_page=100")
    for l in res.get("data", {}).get("results", []):
        if l["name"] == list_name:
            return l["id"]
    res = lm("/lists", {"name": list_name, "type": "private", "optin": "single",
                        "tags": tags})
    return res["data"]["id"]


def state_file(seg_key: str) -> str:
    return os.path.join(STATE_DIR, f"hc_imported_{seg_key}.txt")


def load_imported(seg_key: str) -> set[str]:
    p = state_file(seg_key)
    if os.path.exists(p):
        return {ln.strip().lower() for ln in open(p) if ln.strip()}
    # Back-compat: the original single-segment cron tracked revalidation imports
    # in hc_imported_emails.txt. Seed the revalidation state from it once.
    legacy = os.path.join(STATE_DIR, "hc_imported_emails.txt")
    if seg_key == "revalidation_overdue" and os.path.exists(legacy):
        return {ln.strip().lower() for ln in open(legacy) if ln.strip()}
    return set()


def save_imported(seg_key: str, emails: set[str]):
    os.makedirs(STATE_DIR, exist_ok=True)
    with open(state_file(seg_key), "w") as f:
        f.write("\n".join(sorted(emails)) + "\n")


def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
    try:
        lm("/subscribers", {
            "email": email, "name": name or email.split("@")[0],
            "status": "enabled", "lists": [list_id],
            "attribs": attribs, "preconfirm_subscriptions": True,
        })
        return True
    except SystemExit as e:
        # Already exists -> attach to the list AND refresh attribs so per-segment
        # merge fields (e.g. days_until for the due-soon template) are correct
        # even when the same provider already exists from another segment import.
        if "409" in str(e) or "already exists" in str(e).lower():
            try:
                q = "subscribers.email = '" + email.replace("'", "''") + "'"
                found = lm("/subscribers?" + urllib.parse.urlencode({"query": q, "per_page": 1}))
                results = found.get("data", {}).get("results", [])
                if results:
                    sid = results[0]["id"]
                    existing_attribs = results[0].get("attribs") or {}
                    # Merge: keep prior fields, overwrite with this segment's
                    # values (npi/practice/due-date/days_until/days_overdue).
                    merged = {**existing_attribs, **attribs}
                    lm(f"/subscribers/{sid}", {
                        "email": email, "name": name or email.split("@")[0],
                        "attribs": merged,
                    }, "PUT")
                    lm("/subscribers/lists", {"ids": [sid], "action": "add",
                                              "target_list_ids": [list_id],
                                              "status": "confirmed"}, "PUT")
                    return True
            except (Exception, SystemExit):
                # lm() reports API errors as SystemExit, which `except
                # Exception` alone does not catch; treat either as a failed
                # add instead of aborting the whole run.
                return False
        return False


def ensure_campaign(seg_key: str, list_id: int) -> int:
    # Reuse an existing warmup campaign for this segment only if it's still
    # ACTIVE (draft / running / paused / scheduled). A finished/cancelled one
    # can't accept new subscribers or restart, so we create a fresh dated one --
    # that also picks up the latest email template (the canonical hc_*.html).
    from datetime import date
    seg = SEGMENTS[seg_key]
    ACTIVE = {"draft", "running", "paused", "scheduled"}
    res = lm("/campaigns?per_page=100")
    for c in res.get("data", {}).get("results", []):
        if c["name"].startswith(seg["campaign_name"]) and c.get("status") in ACTIVE:
            return c["id"]
    body = open(template_path(seg_key)).read()
    dated = f"{seg['campaign_name']} - {date.today():%b %d %Y}"
    payload = {
        "name": dated, "subject": seg["subject"], "lists": [list_id],
        "from_email": FROM_EMAIL, "type": "regular", "content_type": "richtext",
        "body": body, "altbody": html_to_text(body), "messenger": "email",
        "tags": ["healthcare", "warmup", seg_key],
        "headers": [{"Reply-To": REPLY_TO},
                    {"List-Unsubscribe": "<{{ UnsubscribeURL }}>"},
                    {"List-Unsubscribe-Post": "List-Unsubscribe=One-Click"}],
    }
    res = lm("/campaigns", payload)
    return res["data"]["id"]


def row_matches(seg_key: str, r: dict) -> bool:
    """Does this master-CSV row belong to a segment? Driven by the segment's
    `selector` so the row->segment mapping lives next to the segment metadata."""
    sel = SEGMENTS[seg_key]["selector"]
    status = (r.get("reval_status") or "").strip().lower()
    excluded = (r.get("leie_excluded") or "").strip() not in ("", "0", "false")
    optout = (r.get("optout_ending") or "").strip() != ""
    if sel == "reval_overdue":
        # Only SLIGHTLY-overdue: recently-lapsed practices are still active with
        # deliverable inboxes. Heavily-overdue ones likely bounce and burn the
        # warming IP's reputation, so we hold them out of the warmup window.
        if status != "overdue":
            return False
        od = _overdue_days(r)
        return od is not None and WARMUP_OVERDUE_MIN <= od <= WARMUP_OVERDUE_MAX
    if sel == "reval_due_soon":
        # Proactive: revalidation is UPCOMING within the lookahead window. Pitch
        # is "handle it before your deadline" -- taps the same CMS Revalidation
        # Due Date List as reval_overdue but the (much larger) not-yet-due slice,
        # so it grows warmup supply without touching a new data source.
        # days_overdue is negative for upcoming (days until due), so a provider
        # due in N days has days_overdue == -N.
        if status != "upcoming":
            return False
        od = _overdue_days(r)
        if od is None:
            return False
        days_until = -od
        return WARMUP_DUE_SOON_MIN <= days_until <= WARMUP_DUE_SOON_MAX
    if sel == "reval_upcoming":
        return status == "upcoming"
    if sel == "leie_or_deactivated":
        # Reactivation targets: flagged OIG/SAM excluded, OR the NPI is genuinely
        # DEACTIVATED in NPPES (real signal from enrich_nppes_last_updated.py).
        # We deliberately do NOT treat "not on the CMS revalidation list" as
        # deactivated here: for the institutional org-NPI pool that's false (an
        # org NPI simply may not be an individual Medicare enrollee), which would
        # mis-tell a fully-active practice they're deactivated. Only real
        # exclusion or a real NPPES deactivation qualifies.
        deactivated = (r.get("nppes_deactivated") or "").strip().upper() == "Y"
        return excluded or deactivated
    if sel == "optout_ending":
        return optout
    if sel == "institutional_default":
        # Catch-all for the FREE NPI compliance-check email: any SMTP-verified
        # institutional row that isn't a more-urgent case. With one-email-per-
        # provider assignment (assign_segment), higher-priority segments
        # (reactivation, revalidation) claim their providers first, so this only
        # receives the remainder. Deactivated NPIs are excluded (they belong to
        # reactivation and likely bounce). We keep the SMTP-verification gate so
        # we only mail inboxes we already proved are live.
        if (r.get("nppes_deactivated") or "").strip().upper() == "Y":
            return False
        return (str(r.get("verify_ok", "")).strip().upper()
                in ("Y", "YES", "TRUE", "1", ""))
    if sel == "any":
        # OIG screening applies to any billing practice, but for warmup we still
        # exclude the likely-undeliverable: providers heavily overdue (stale) or
        # already dropped off the reval list. Recently-lapsed and upcoming stay.
        if status in ("not_on_list", "no_reval_flag"):
            return False
        od = _overdue_days(r)
        if status == "overdue" and (od is None or od > WARMUP_OVERDUE_MAX):
            return False
        return True
    if sel == "institutional_verified":
        # For the freshly SMTP-VERIFIED institutional list, "not on the CMS
        # revalidation list" does NOT mean undeliverable -- it just means the org
        # NPI is not an individual Medicare enrollee. We already proved the inbox
        # is live (verify_ok), so trust that instead of using reval-list presence
        # as a deliverability proxy. Still hold out the heavily-overdue (stale)
        # individual enrollees that DO appear, to protect the warming IP.
        if (str(r.get("verify_ok", "")).strip().upper() not in ("Y", "YES", "TRUE", "1", "")):
            return False
        od = _overdue_days(r)
        if status == "overdue" and od is not None and od > WARMUP_OVERDUE_MAX:
            return False
        return True
    return False


def _seg_priority(seg_key: str) -> int:
    """Urgency rank for a segment (lower = more urgent). Defaults high so a
    segment without an explicit priority never out-ranks a real one."""
    return int(SEGMENTS[seg_key].get("priority", 1000))


def assign_segment(r: dict, active_segments: list[str]) -> str | None:
    """One-email-per-provider: return the SINGLE segment a provider belongs to --
    the most-urgent (lowest priority number) active segment whose selector matches
    their row -- or None if no active segment matches. This is what guarantees a
    provider who is e.g. both revalidation-overdue AND eligible for the free check
    receives only the overdue email (priority 20), never both. Ties broken by
    priority then segment key for determinism."""
    matches = [s for s in active_segments if row_matches(s, r)]
    if not matches:
        return None
    return min(matches, key=lambda s: (_seg_priority(s), s))


def assign_all(rows: list[dict], active_segments: list[str]) -> dict[str, str]:
    """Map email -> assigned segment across the whole list, so each segment's
    importer can claim only its assigned providers. Computed once per run."""
    out: dict[str, str] = {}
    for r in rows:
        email = (r.get("email") or "").strip().lower()
        if not email:
            continue
        seg = assign_segment(r, active_segments)
        if seg is not None:
            out[email] = seg
    return out


def attribs_for(r: dict) -> dict:
    # days_overdue is positive when past due and negative when upcoming (days
    # until the due date). Expose a clean positive "days_until" for the
    # due-soon segment's template.
    od_raw = (str(r.get("days_overdue", "")) or "").strip()
    days_until = ""
    try:
        od = int(od_raw)
        if od < 0:
            days_until = str(-od)
    except ValueError:
        pass
    return {
        "npi": r.get("npi", ""),
        "practice": r.get("name", ""),
        "specialty": r.get("specialty", ""),
        "state": r.get("state", ""),
        # Separate fields so the email's "official CMS record" card can render
        # the due date + overdue/until count cleanly (mirrors the CMS
        # Revalidation Due Date List, verified by NPI via the weekly refresh).
        "reval_due_date": r.get("reval_due_date", ""),
        "days_overdue": str(r.get("days_overdue", "")),
        "days_until": days_until,
        # MX operator (for per-operator analysis + throttling audit).
        "mx_provider": r.get("mx_provider", ""),
        # Real NPPES freshness (from enrich_nppes_last_updated.py). Lets the
        # "NPPES may be out of date" email cite the actual government date the
        # provider can verify on the public registry, instead of an unbacked
        # "FLAGGED OUT OF DATE" claim.
        "nppes_last_updated": r.get("nppes_last_updated", ""),
        "nppes_years_stale": r.get("nppes_years_stale", ""),
        "nppes_enumeration": r.get("nppes_enumeration", ""),
        "detail": (f"{r.get('reval_due_date','')} ({r.get('days_overdue','')} days overdue)"
                   if r.get("reval_status") == "overdue" else r.get("reval_due_date", "")),
    }


def warm_segment(seg_key: str, rows: list[dict], slice_n: int,
                 dry_run: bool, start_campaign: bool,
                 assignment: dict[str, str] | None = None) -> int:
    """Import up to slice_n new subscribers for one segment and keep its
    campaign active. Returns how many NEW subscribers were imported.

    If `assignment` (email -> single assigned segment) is given, a provider is a
    candidate only when THIS segment is their assigned one -- enforcing
    one-email-per-provider by urgency priority. When it's None, fall back to the
    legacy behavior (row_matches), so the warmup cron that runs a single segment
    keeps working unchanged."""
    seg = SEGMENTS[seg_key]
    imported = load_imported(seg_key)
    suppressed = load_suppressed()

    def _is_candidate(r: dict) -> bool:
        email = r.get("email", "").strip().lower()
        if not email or email in imported or email in suppressed:
            return False
        if _is_google_hosted(r):
            return False
        if assignment is not None:
            return assignment.get(email) == seg_key
        return row_matches(seg_key, r)

    candidates = [r for r in rows if _is_candidate(r)]
    # Spread the slice across MX operators so no single receiving system (e.g.
    # Microsoft 365) gets the whole batch. Caps ramp with the warmup day.
    todo = mx_throttled(candidates, slice_n, mx_daily_caps(warmup_day()))
    print(f"[hc-cron] {seg_key}: candidates={len(candidates)} "
          f"already={len(imported)} to_import={len(todo)}")
    if dry_run:
        for r in todo[:3]:
            print(f" would import: {r['email']} {r.get('name','')[:28]} "
                  f"status={r.get('reval_status','')}")
        return 0
    if not todo:
        return 0
    list_id = get_or_create_list(seg["list_name"], ["healthcare", "warmup", seg_key])
    n_ok = 0
    for r in todo:
        email = r["email"].strip().lower()
        if add_subscriber(list_id, email, r.get("name") or "", attribs_for(r)):
            imported.add(email)
            n_ok += 1
    save_imported(seg_key, imported)
    cid = ensure_campaign(seg_key, list_id)
    if start_campaign:
        try:
            lm(f"/campaigns/{cid}/status", {"status": "running"}, "PUT")
        except SystemExit as e:
            # Already running raises; that's fine.
            if "already" not in str(e).lower():
                print(f"[hc-cron] {seg_key}: start warning: {e}")
    print(f"[hc-cron] {seg_key}: imported {n_ok} into list {list_id}; campaign={cid}")
    return n_ok


def _all_list_subscribers(list_id: int):
    """Yield (id, email) for every subscriber on a list, paging the API."""
    page, per = 1, 1000
    while True:
        q = urllib.parse.urlencode({"list_id": list_id, "page": page, "per_page": per})
        res = lm("/subscribers?" + q)
        results = res.get("data", {}).get("results", []) or []
        for s in results:
            yield s["id"], (s.get("email") or "").strip().lower()
        if len(results) < per:
            break
        page += 1


def prune_holdouts(dry_run: bool) -> int:
    """Belt-and-suspenders: remove subscribers who should NOT be in the warmup
    from the active warmup lists, even if they were imported before a guard
    existed or their domain's MX has since flipped to Google. We match against
    the FRESH MASTER CSV (re-classified weekly by hc_data_refresh), not the
    listmonk attribs snapshot, so a domain that newly became Google-hosted is
    caught here. Returns the number of (subscriber, list) removals."""
    master_path = os.getenv("HC_MASTER_CSV", os.path.join(STATE_DIR, "hc_warmup_week1.csv"))
    if not os.path.exists(master_path):
        print(f"[hc-cron] prune: master {master_path} not found, skipping")
        return 0
    rows = list(csv.DictReader(open(master_path)))
    by_email = {r.get("email", "").strip().lower(): r for r in rows if r.get("email")}
    suppressed = load_suppressed()
    removed = 0
    for seg_key, seg in SEGMENTS.items():
        try:
            res = lm("/lists?per_page=100")
            list_id = next((l["id"] for l in res.get("data", {}).get("results", [])
                            if l["name"] == seg["list_name"]), None)
        except SystemExit:
            list_id = None
        if not list_id:
            continue
        drop_ids = []
        for sid, email in _all_list_subscribers(list_id):
            # Always remove anyone on the permanent do-not-contact list (they
            # told us to stop / already revalidated), regardless of source data.
            if email in suppressed:
                drop_ids.append(sid)
                continue
            r = by_email.get(email)
            if r is None:
                continue  # not in our source data; leave it alone
            # DELIVERABILITY-only prune: remove subscribers whose domain is now
            # Google-hosted (would hard-bounce from the cold IP). We deliberately
            # do NOT evict for audience reasons (e.g. an overdue provider drifting
            # out of the 1-90 day window) -- they were a valid target when warmed
            # and re-evaluating audience on already-engaged people just wastes
            # warmup progress. The import-time guard handles audience for NEW adds.
            if _is_google_hosted(r):
                drop_ids.append(sid)
        if drop_ids:
            print(f"[hc-cron] prune {seg_key} (list {list_id}): "
                  f"{len(drop_ids)} holdouts to remove")
            if not dry_run:
                # Bulk-detach from this list (chunked). The "remove" action
                # drops the list membership; it does not mark the subscriber
                # as unsubscribed.
                for i in range(0, len(drop_ids), 500):
                    chunk = drop_ids[i:i + 500]
                    lm("/subscribers/lists", {"ids": chunk, "action": "remove",
                                              "target_list_ids": [list_id]}, "PUT")
                removed += len(drop_ids)
    print(f"[hc-cron] prune: removed {removed} subscriber-list holdouts")
    return removed


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--slice", type=int, default=0,
                    help="override TOTAL daily import slice across segments (0=ramp default)")
    ap.add_argument("--segments", default=",".join(ACTIVE_SEGMENTS),
                    help="comma list of segment keys to warm")
    ap.add_argument("--start-campaign", action="store_true",
                    help="flip campaigns to 'running' (otherwise left as draft for approval)")
    ap.add_argument("--prune", action="store_true",
                    help="also remove do-not-contact and now-Google-hosted subscribers "
                         "from the warmup lists (run after the weekly refresh)")
    ap.add_argument("--prune-only", action="store_true",
                    help="run ONLY the deliverability prune, then exit (no import/warm)")
    args = ap.parse_args()
    day = warmup_day()
    total_slice = args.slice or daily_slice(day)
    segments = [s.strip() for s in args.segments.split(",") if s.strip() in SEGMENTS]
    print(f"[hc-cron] {datetime.now(timezone.utc).isoformat()} warmup_day={day} "
          f"total_slice={total_slice} segments={segments}")
    rows = list(csv.DictReader(open(VERIFIED_CSV)))
    print(f"[hc-cron] verified_total={len(rows)}")
    if args.prune or args.prune_only:
        prune_holdouts(args.dry_run)
        if args.prune_only:
            return
    # Split the daily slice across segments by URGENCY PRIORITY. We process
    # segments most-urgent first (lowest priority number) and give each the
    # remaining budget, so urgent segments (reactivation, revalidation-overdue)
    # are fully served before the broad free-NPI-check catch-all consumes the
    # rest. Because assign_all already routed each provider to a single segment,
    # the segment pools are disjoint -- no provider is double-counted or
    # double-mailed. Unused budget from a small urgent pool flows to the next
    # segment automatically (we only decrement by what was actually imported).
    assignment = assign_all(rows, segments)
    if assignment:
        from collections import Counter
        dist = Counter(assignment.values())
        print("[hc-cron] assignment (one email/provider by priority): "
              + ", ".join(f"{k}={dist[k]}" for k in
                          sorted(dist, key=lambda s: (_seg_priority(s), s))))
    order = sorted(segments, key=lambda s: (_seg_priority(s), s))
    grand = 0
    budget = total_slice
    for seg_key in order:
        # In a dry run nothing is imported, so every segment previews against
        # the full slice instead of a shrinking budget.
        cap = budget if not args.dry_run else total_slice
        n = warm_segment(seg_key, rows, cap, args.dry_run, args.start_campaign,
                         assignment)
        grand += n
        budget -= n
        if budget <= 0 and not args.dry_run:
            break
    print(f"[hc-cron] done: imported {grand} new subscribers across {len(segments)} segment(s)")


if __name__ == "__main__":
    main()
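
The priority-ordered budget split in main() can be illustrated in isolation. This is a minimal sketch under stated assumptions, not the script's own code: `demo_split_budget` and the pool sizes are hypothetical, and it ignores the per-MX-operator caps that warm_segment() also applies, so a real run may import fewer per segment than shown here.

```python
def demo_split_budget(pool_sizes: dict, priorities: dict, total: int) -> dict:
    """Serve segments most-urgent first; each takes what it can from the
    remaining budget, and anything unused flows to the next segment."""
    remaining = total
    served = {}
    for seg in sorted(pool_sizes, key=lambda s: (priorities[s], s)):
        if remaining <= 0:
            break  # budget exhausted: lower-urgency segments wait for tomorrow
        n = min(pool_sizes[seg], remaining)
        served[seg] = n
        remaining -= n
    return served


# Hypothetical pool sizes; the priorities mirror the ones named in this file.
pools = {"free_npi_check": 5000, "revalidation_overdue": 40, "npi_reactivation": 12}
prios = {"npi_reactivation": 10, "revalidation_overdue": 20, "free_npi_check": 100}

print(demo_split_budget(pools, prios, total=100))
# -> {'npi_reactivation': 12, 'revalidation_overdue': 40, 'free_npi_check': 48}
```

The two small urgent pools are served in full (12 + 40), and the catch-all receives only the 48 slots left over from the 100-slot day.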