new-site/scripts/build_healthcare_campaigns_cron.py
justin 9cb10b18e0 feat(hc): deliverability prune -- evict newly-Google-hosted subscribers
Belt-and-suspenders for the edge you flagged: a domain already in a warmup list
could flip its MX to Google Workspace between weekly refreshes, after which it
would hard-bounce from the cold IP. The import-time guard only catches NEW adds.

- prune_holdouts(): enumerates each warmup list's subscribers, matches them
  against the FRESH master CSV (re-classified weekly), and removes any whose
  domain is now Google-hosted. DELIVERABILITY-ONLY -- it never evicts for
  audience reasons (an overdue provider drifting out of the 1-90 day window was
  a valid target when warmed; re-litigating that just wastes warmup progress).
- --prune (run alongside warming) and --prune-only (prune then exit).
- Wired into the weekly refresh cron as a --prune-only chained step, so MX is
  re-checked and holdouts removed every Monday before the weekday sends.

Verified end-to-end: with no Google-hosted domains in the lists the prune is a
no-op; with a simulated Google-flipped domain injected into the master CSV, it
detects the domain and (in a real run) would remove it from every list it is on.
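
A minimal sketch of the selection rule described above, not the code in the file. The function name select_google_flipped, the sample addresses, and the "other" provider value are hypothetical; only the "google" value of mx_provider comes from the script.

```python
def select_google_flipped(subscribers, master_rows):
    """Return ids of subscribers whose master-CSV row is now Google-hosted.

    subscribers: iterable of (id, email) pairs, as read from a warmup list.
    master_rows: iterable of dicts from the freshly re-classified master CSV.
    """
    by_email = {
        (row.get("email") or "").strip().lower(): row
        for row in master_rows
        if row.get("email")
    }
    drop_ids = []
    for sub_id, email in subscribers:
        row = by_email.get(email.strip().lower())
        if row is None:
            continue  # not in the source data: leave it alone
        # Deliverability only: days_overdue is deliberately never consulted.
        if (row.get("mx_provider") or "").strip().lower() == "google":
            drop_ids.append(sub_id)
    return drop_ids


# Hypothetical sample data for illustration.
master = [
    {"email": "a@clinic-one.example", "mx_provider": "google", "days_overdue": "30"},
    {"email": "b@clinic-two.example", "mx_provider": "other", "days_overdue": "400"},
]
subscribers = [
    (1, "a@clinic-one.example"),
    (2, "b@clinic-two.example"),
    (3, "c@unknown.example"),
]
flipped = select_google_flipped(subscribers, master)
print(flipped)
```

Subscriber 2 has drifted far outside the 1-90 day window but stays on the list, and subscriber 3 is untouched because it has no row in the master CSV.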
2026-06-08 03:39:56 -05:00

440 lines
19 KiB
Python

#!/usr/bin/env python3
"""Healthcare warmup campaign builder for listmonk-hc.

Runs daily (Mon-Fri, 7 AM Central via cron). Each run:
  1. Imports the next slice of the VERIFIED, overdue-first warmup list into a
     listmonk-hc list (deduped; already-imported rows are skipped).
  2. Ensures each active segment's campaign (led by the teal "Medicare
     revalidation overdue" one) exists and points at that segment's list.
     With --start-campaign it is set running; otherwise it stays a draft.
  3. Listmonk-hc's sliding-window rate cap (driven by pw-hc-rampcap) does the
     actual per-hour throttling, so this builder just feeds the queue.

WARMUP COHORT: point HC_VERIFIED_CSV at hc_warmup_nongoogle.csv for weeks 1-2.
Google/Workspace-hosted practice domains (~31%) hard-reject mail from cold IPs
(550-5.7.1 "unsolicited"), which damages warmup reputation. Send the 501
non-Google practice domains first; switch to hc_warmup_google.csv (222) once the
IPs are warm (week 2-3).

The daily slice size follows the hc warmup ramp so we never queue more than the
IPs can safely send while warming. Sends ONLY happen via the hc HOT stream
(listmonk-hc -> host :2526/2527/2528 -> .107/.108/.109), never the trucking pool.

Idempotent: safe to run every weekday. Tracks imported emails in a state file.

Usage:
    python3 scripts/build_healthcare_campaigns_cron.py               # daily slice
    python3 scripts/build_healthcare_campaigns_cron.py --dry-run
    python3 scripts/build_healthcare_campaigns_cron.py --slice 0     # use ramp default
    python3 scripts/build_healthcare_campaigns_cron.py --prune       # prune, then warm
    python3 scripts/build_healthcare_campaigns_cron.py --prune-only  # prune, then exit
"""
from __future__ import annotations
import argparse, base64, csv, json, os, sys, time, urllib.request, urllib.parse, urllib.error
from datetime import datetime, timezone
LISTMONK_URL = os.getenv("HC_LISTMONK_URL", "http://localhost:9101")
LISTMONK_USER = os.getenv("HC_LISTMONK_USER", "api")
# Token read from the host file written when the api user was created.
def _token() -> str:
    t = os.getenv("HC_LISTMONK_TOKEN")
    if t:
        return t
    for p in ("/opt/performancewest/.secrets/hc-listmonk-token",
              "/etc/postfix/hc-listmonk-token"):
        if os.path.exists(p):
            try:
                with open(p) as f:
                    return f.read().strip()
            except PermissionError:
                continue
    raise SystemExit("HC_LISTMONK_TOKEN not set and no readable token file found")

VERIFIED_CSV = os.getenv("HC_VERIFIED_CSV", "/opt/performancewest/data/hc_warmup_week1_verified.csv")
STATE_DIR = os.getenv("HC_STATE_DIR", "/opt/performancewest/data")
WARMUP_STAMP = "/etc/postfix/hc-warmup-start"
FROM_EMAIL = "Performance West Compliance <compliance@performancewest.net>"
REPLY_TO = "info@performancewest.net"
# Segment registry (subject, template file, list/campaign names, row selector)
# is the single source of truth shared with build_healthcare_campaigns.py.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from build_healthcare_campaigns import SEGMENTS, template_path # noqa: E402
# Which segments to warm, in priority order. Revalidation stays the lead (it has
# the most verified data + the official-record card). The others warm in
# parallel on smaller slices so we collect engagement data across all programs
# without overwhelming the warming IPs.
ACTIVE_SEGMENTS = os.getenv(
    "HC_SEGMENTS",
    "revalidation_overdue,oig_screening,nppes_outdated,npi_reactivation,compliance_bundle",
).split(",")
# Warmup deliverability guard: only mail SLIGHTLY-overdue providers. A practice
# that lapsed recently is almost certainly still operating with a live inbox; one
# that is many months/years overdue has likely closed, merged, or abandoned the
# address, so its mail bounces -- and bounces are the fastest way to wreck a
# warming IP's reputation. Window is inclusive [MIN, MAX] days overdue.
WARMUP_OVERDUE_MIN = int(os.getenv("HC_OVERDUE_MIN", "1"))
WARMUP_OVERDUE_MAX = int(os.getenv("HC_OVERDUE_MAX", "90"))
def _overdue_days(r: dict):
    """Return days_overdue as an int, or None when it is blank or non-numeric."""
    v = (r.get("days_overdue") or "").strip()
    try:
        return int(v)
    except ValueError:
        return None

# During warmup, hold out Google-Workspace-hosted domains: Google rejects bulk
# mail from cold/warming IPs hard (550-5.7.1), and those bounces wreck the
# warming reputation. The mx_provider flag is set by the weekly hc_data_refresh
# (an MX lookup, since a custom domain can silently use Google Workspace). Set
# HC_SKIP_GOOGLE=0 to lift this once the IPs are warm.
SKIP_GOOGLE = os.getenv("HC_SKIP_GOOGLE", "1") not in ("0", "false", "no")
def _is_google_hosted(r: dict) -> bool:
    if not SKIP_GOOGLE:
        return False
    return (r.get("mx_provider") or "").strip().lower() == "google"

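def warmup_day() -> int:
    """Whole days since the warmup start stamp; 0 if the stamp is missing or unreadable."""
    try:
        with open(WARMUP_STAMP) as f:
            start = int(f.read().strip())
        return max(0, int((time.time() - start) // 86400))
    except Exception:
        return 0
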
def daily_slice(day: int) -> int:
    """TOTAL new subscribers to import across ALL segments today, aligned with
    the hc ramp. The rampcap caps hourly *delivery*; this caps daily *queueing*
    so we never flood the warming IPs. Mon-Fri only (cron enforces weekday)."""
    if day <= 1:
        return 100
    if day <= 4:
        return 300
    if day <= 9:
        return 600
    return 1000

def lm(path: str, data=None, method=None):
    """Call the listmonk-hc API and return the decoded JSON response.
    Defaults to GET, or POST when a JSON body is supplied. Any HTTP error
    aborts via SystemExit, which callers may catch."""
    tok = _token()
    headers = {"Content-Type": "application/json",
               "Authorization": f"token {LISTMONK_USER}:{tok}"}
    if not method:
        method = "POST" if data is not None else "GET"
    payload = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", data=payload,
                                 headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=20) as r:
            return json.loads(r.read().decode())
    except urllib.error.HTTPError as e:
        body = e.read().decode(errors="replace")[:300]
        raise SystemExit(f"listmonk-hc API {method} {path} -> {e.code}: {body}")

def get_or_create_list(list_name: str, tags: list[str]) -> int:
    res = lm("/lists?per_page=100")
    for l in res.get("data", {}).get("results", []):
        if l["name"] == list_name:
            return l["id"]
    res = lm("/lists", {"name": list_name, "type": "private", "optin": "single",
                        "tags": tags})
    return res["data"]["id"]

def state_file(seg_key: str) -> str:
    return os.path.join(STATE_DIR, f"hc_imported_{seg_key}.txt")


def load_imported(seg_key: str) -> set[str]:
    p = state_file(seg_key)
    if os.path.exists(p):
        with open(p) as f:
            return {ln.strip().lower() for ln in f if ln.strip()}
    # Back-compat: the original single-segment cron tracked revalidation imports
    # in hc_imported_emails.txt. Seed the revalidation state from it once.
    legacy = os.path.join(STATE_DIR, "hc_imported_emails.txt")
    if seg_key == "revalidation_overdue" and os.path.exists(legacy):
        with open(legacy) as f:
            return {ln.strip().lower() for ln in f if ln.strip()}
    return set()


def save_imported(seg_key: str, emails: set[str]):
    os.makedirs(STATE_DIR, exist_ok=True)
    with open(state_file(seg_key), "w") as f:
        f.write("\n".join(sorted(emails)) + "\n")

def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
    try:
        lm("/subscribers", {
            "email": email, "name": name or email.split("@")[0],
            "status": "enabled", "lists": [list_id],
            "attribs": attribs, "preconfirm_subscriptions": True,
        })
        return True
    except SystemExit as e:
        # Already exists -> attach to the list instead.
        if "409" in str(e) or "already exists" in str(e).lower():
            try:
                q = "subscribers.email = '" + email.replace("'", "''") + "'"
                found = lm("/subscribers?" + urllib.parse.urlencode({"query": q, "per_page": 1}))
                results = found.get("data", {}).get("results", [])
                if results:
                    sid = results[0]["id"]
                    lm("/subscribers/lists", {"ids": [sid], "action": "add",
                                              "target_list_ids": [list_id],
                                              "status": "confirmed"}, "PUT")
                    return True
            except (SystemExit, Exception):
                # lm() signals API errors with SystemExit, which `except Exception`
                # alone would not catch; one failed attach must not end the run.
                return False
        return False

def ensure_campaign(seg_key: str, list_id: int) -> int:
    # Reuse an existing warmup campaign for this segment only if it's still
    # ACTIVE (draft / running / paused / scheduled). A finished/cancelled one
    # can't accept new subscribers or restart, so we create a fresh dated one --
    # that also picks up the latest email template (the canonical hc_*.html).
    from datetime import date
    seg = SEGMENTS[seg_key]
    ACTIVE = {"draft", "running", "paused", "scheduled"}
    res = lm("/campaigns?per_page=100")
    for c in res.get("data", {}).get("results", []):
        if c["name"].startswith(seg["campaign_name"]) and c.get("status") in ACTIVE:
            return c["id"]
    with open(template_path(seg_key)) as f:
        body = f.read()
    dated = f"{seg['campaign_name']} - {date.today():%b %d %Y}"
    payload = {
        "name": dated, "subject": seg["subject"], "lists": [list_id],
        "from_email": FROM_EMAIL, "type": "regular", "content_type": "richtext",
        "body": body, "messenger": "email",
        "tags": ["healthcare", "warmup", seg_key],
        "headers": [{"Reply-To": REPLY_TO},
                    {"List-Unsubscribe": "<{{ UnsubscribeURL }}>"},
                    {"List-Unsubscribe-Post": "List-Unsubscribe=One-Click"}],
    }
    res = lm("/campaigns", payload)
    return res["data"]["id"]

def row_matches(seg_key: str, r: dict) -> bool:
    """Does this master-CSV row belong to a segment? Driven by the segment's
    `selector` so the row->segment mapping lives next to the segment metadata."""
    sel = SEGMENTS[seg_key]["selector"]
    status = (r.get("reval_status") or "").strip().lower()
    # Lower-cased so "False"/"FALSE" are not misread as an exclusion flag.
    excluded = (r.get("leie_excluded") or "").strip().lower() not in ("", "0", "false")
    optout = (r.get("optout_ending") or "").strip() != ""
    if sel == "reval_overdue":
        # Only SLIGHTLY-overdue: recently-lapsed practices are still active with
        # deliverable inboxes. Heavily-overdue ones likely bounce and burn the
        # warming IP's reputation, so we hold them out of the warmup window.
        if status != "overdue":
            return False
        od = _overdue_days(r)
        return od is not None and WARMUP_OVERDUE_MIN <= od <= WARMUP_OVERDUE_MAX
    if sel == "reval_upcoming":
        return status == "upcoming"
    if sel == "leie_or_deactivated":
        # Reactivation targets: flagged excluded, OR no longer on the reval list
        # (a strong deactivation proxy once revalidation lapses).
        return excluded or status in ("not_on_list", "no_reval_flag")
    if sel == "optout_ending":
        return optout
    if sel == "any":
        # OIG screening applies to any billing practice, but for warmup we still
        # exclude the likely-undeliverable: providers heavily overdue (stale) or
        # already dropped off the reval list. Recently-lapsed and upcoming stay.
        if status in ("not_on_list", "no_reval_flag"):
            return False
        od = _overdue_days(r)
        if status == "overdue" and (od is None or od > WARMUP_OVERDUE_MAX):
            return False
        return True
    return False

def attribs_for(r: dict) -> dict:
    # Normalise the status the same way row_matches() does, so " Overdue" or
    # "OVERDUE" in the CSV still renders the overdue detail line.
    overdue = (r.get("reval_status") or "").strip().lower() == "overdue"
    return {
        "npi": r.get("npi", ""),
        "practice": r.get("name", ""),
        "specialty": r.get("specialty", ""),
        "state": r.get("state", ""),
        # Separate fields so the email's "official CMS record" card can render
        # the due date + overdue count cleanly (mirrors the CMS Revalidation Due
        # Date List, verified by NPI via the weekly hc_data_refresh).
        "reval_due_date": r.get("reval_due_date", ""),
        "days_overdue": str(r.get("days_overdue", "")),
        "detail": (f"{r.get('reval_due_date','')} ({r.get('days_overdue','')} days overdue)"
                   if overdue else r.get("reval_due_date", "")),
    }

def warm_segment(seg_key: str, rows: list[dict], slice_n: int,
                 dry_run: bool, start_campaign: bool) -> int:
    """Import up to slice_n new subscribers for one segment and keep its
    campaign active. Returns how many NEW subscribers were imported."""
    seg = SEGMENTS[seg_key]
    imported = load_imported(seg_key)
    # `or ""` guards rows whose email cell is missing (csv.DictReader yields None).
    candidates = [r for r in rows
                  if (r.get("email") or "").strip()
                  and r["email"].strip().lower() not in imported
                  and not _is_google_hosted(r)
                  and row_matches(seg_key, r)]
    todo = candidates[:slice_n]
    print(f"[hc-cron] {seg_key}: candidates={len(candidates)} "
          f"already={len(imported)} to_import={len(todo)}")
    if dry_run:
        for r in todo[:3]:
            print(f"  would import: {r['email']} {(r.get('name') or '')[:28]} "
                  f"status={r.get('reval_status','')}")
        return 0
    if not todo:
        return 0
    list_id = get_or_create_list(seg["list_name"], ["healthcare", "warmup", seg_key])
    n_ok = 0
    for r in todo:
        email = r["email"].strip().lower()
        if add_subscriber(list_id, email, r.get("name") or "", attribs_for(r)):
            imported.add(email)
            n_ok += 1
    save_imported(seg_key, imported)
    cid = ensure_campaign(seg_key, list_id)
    if start_campaign:
        try:
            lm(f"/campaigns/{cid}/status", {"status": "running"}, "PUT")
        except SystemExit as e:
            # Already running raises; that's fine.
            if "already" not in str(e).lower():
                print(f"[hc-cron] {seg_key}: start warning: {e}")
    print(f"[hc-cron] {seg_key}: imported {n_ok} into list {list_id}; campaign={cid}")
    return n_ok

def _all_list_subscribers(list_id: int):
    """Yield (id, email) for every subscriber on a list, paging the API."""
    page, per = 1, 1000
    while True:
        q = urllib.parse.urlencode({"list_id": list_id, "page": page, "per_page": per})
        res = lm("/subscribers?" + q)
        results = res.get("data", {}).get("results", []) or []
        for s in results:
            yield s["id"], (s.get("email") or "").strip().lower()
        if len(results) < per:
            break
        page += 1

def prune_holdouts(dry_run: bool) -> int:
    """Belt-and-suspenders: remove subscribers who should NOT be in the warmup
    from the active warmup lists, even if they were imported before a guard
    existed or their domain's MX has since flipped to Google. We match against
    the FRESH MASTER CSV (re-classified weekly by hc_data_refresh), not the
    listmonk attribs snapshot, so a domain that newly became Google-hosted is
    caught here. Returns the number of (subscriber, list) removals; in a dry
    run, the number that would have been removed."""
    master_path = os.getenv("HC_MASTER_CSV", os.path.join(STATE_DIR, "hc_warmup_week1.csv"))
    if not os.path.exists(master_path):
        print(f"[hc-cron] prune: master {master_path} not found, skipping")
        return 0
    with open(master_path) as f:
        rows = list(csv.DictReader(f))
    by_email = {r.get("email", "").strip().lower(): r for r in rows if r.get("email")}
    # Fetch the list index once instead of once per segment; if the API call
    # fails, no list resolves and every segment is skipped, as before.
    try:
        res = lm("/lists?per_page=100")
        list_ids = {l["name"]: l["id"] for l in res.get("data", {}).get("results", [])}
    except SystemExit:
        list_ids = {}
    removed = 0
    for seg_key, seg in SEGMENTS.items():
        list_id = list_ids.get(seg["list_name"])
        if not list_id:
            continue
        drop_ids = []
        for sid, email in _all_list_subscribers(list_id):
            r = by_email.get(email)
            if r is None:
                continue  # not in our source data; leave it alone
            # DELIVERABILITY-only prune: remove subscribers whose domain is now
            # Google-hosted (would hard-bounce from the cold IP). We deliberately
            # do NOT evict for audience reasons (e.g. an overdue provider drifting
            # out of the 1-90 day window) -- they were a valid target when warmed
            # and re-evaluating audience on already-engaged people just wastes
            # warmup progress. The import-time guard handles audience for NEW adds.
            if _is_google_hosted(r):
                drop_ids.append(sid)
        if drop_ids:
            print(f"[hc-cron] prune {seg_key} (list {list_id}): "
                  f"{len(drop_ids)} holdouts to remove")
            if not dry_run:
                # Bulk detach from this list only (chunked). The "remove" action
                # drops list membership; it does not delete the subscriber.
                for i in range(0, len(drop_ids), 500):
                    chunk = drop_ids[i:i + 500]
                    lm("/subscribers/lists", {"ids": chunk, "action": "remove",
                                              "target_list_ids": [list_id]}, "PUT")
            removed += len(drop_ids)
    verb = "would remove" if dry_run else "removed"
    print(f"[hc-cron] prune: {verb} {removed} subscriber-list holdouts")
    return removed

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--slice", type=int, default=0,
                    help="override TOTAL daily import slice across segments (0=ramp default)")
    ap.add_argument("--segments", default=",".join(ACTIVE_SEGMENTS),
                    help="comma list of segment keys to warm")
    ap.add_argument("--start-campaign", action="store_true",
                    help="flip campaigns to 'running' (otherwise left as draft for approval)")
    ap.add_argument("--prune", action="store_true",
                    help="also remove now-Google-hosted subscribers from the warmup "
                         "lists before warming (deliverability only; never evicts "
                         "for audience reasons)")
    ap.add_argument("--prune-only", action="store_true",
                    help="run ONLY the deliverability prune, then exit (no import/warm)")
    args = ap.parse_args()
    day = warmup_day()
    total_slice = args.slice or daily_slice(day)
    segments = [s.strip() for s in args.segments.split(",") if s.strip() in SEGMENTS]
    print(f"[hc-cron] {datetime.now(timezone.utc).isoformat()} warmup_day={day} "
          f"total_slice={total_slice} segments={segments}")
    if args.prune or args.prune_only:
        prune_holdouts(args.dry_run)
    if args.prune_only:
        return
    # Read the verified CSV only after the prune step: --prune-only works from
    # the master CSV and should not fail because this file is missing.
    with open(VERIFIED_CSV) as f:
        rows = list(csv.DictReader(f))
    print(f"[hc-cron] verified_total={len(rows)}")
    # Split the daily slice across segments. Revalidation (the lead, richest
    # data) gets ~half; the rest share the remainder, as evenly as integer
    # division allows.
    lead = "revalidation_overdue"
    others = [s for s in segments if s != lead]
    per_seg = {}
    if lead in segments:
        per_seg[lead] = max(1, int(total_slice * 0.5))
        rem = total_slice - per_seg[lead]
    else:
        rem = total_slice
    if others and rem > 0:
        base, extra = divmod(rem, len(others))
        for i, s in enumerate(others):
            per_seg[s] = base + (1 if i < extra else 0)
    elif others:
        for s in others:
            per_seg[s] = 0
    # Reclaim any unallocated budget onto the lead (e.g. when it is the only
    # segment) so sum(per_seg) == total_slice exactly: never overshoot the rate
    # cap, never silently drop budget.
    if lead in per_seg:
        per_seg[lead] += total_slice - sum(per_seg.values())
    grand = 0
    for seg_key in segments:
        grand += warm_segment(seg_key, rows, per_seg.get(seg_key, 0),
                              args.dry_run, args.start_campaign)
    print(f"[hc-cron] done: imported {grand} new subscribers across {len(segments)} segment(s)")


if __name__ == "__main__":
    main()
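
The slice split in main() is easier to check with concrete numbers. The following is a standalone sketch of the same arithmetic, assuming the default lead segment; the helper name split_slice is hypothetical and does not exist in the script.

```python
def split_slice(total, segments, lead="revalidation_overdue"):
    """Split a daily import budget: about half to the lead, the rest shared."""
    others = [s for s in segments if s != lead]
    per_seg = {}
    if lead in segments:
        per_seg[lead] = max(1, total // 2)
        rem = total - per_seg[lead]
    else:
        rem = total
    if others and rem > 0:
        # divmod spreads the remainder one unit at a time over the first segments.
        base, extra = divmod(rem, len(others))
        for i, s in enumerate(others):
            per_seg[s] = base + (1 if i < extra else 0)
    else:
        for s in others:
            per_seg[s] = 0
    if lead in per_seg:
        # The lead absorbs whatever is still unallocated, so the sum is exact.
        per_seg[lead] += total - sum(per_seg.values())
    return per_seg


segments = ["revalidation_overdue", "oig_screening", "nppes_outdated",
            "npi_reactivation", "compliance_bundle"]
day3 = split_slice(300, segments)        # warmup days 2-4 use a 300 slice
solo = split_slice(100, ["revalidation_overdue"])
print(day3)
print(solo)
```

With all five default segments and a 300 slice, the lead gets 150 and the other four get 38, 38, 37 and 37. When the lead is the only segment it takes the whole budget.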