new-site/scripts/build_healthcare_campaigns_cron.py
justin 54b92b1f06 fix(hc deliverability): MX-based Google-host exclusion during warmup
Found via live mail.log: Google-Workspace-hosted PRACTICE domains (custom
domains whose MX is aspmx.l.google.com, e.g. moosepharmacy.com, hc2kidney.com)
were getting hard 550-5.7.1 rejects from Google's cold-IP bulk filter -- exactly
the bounces that wreck a warming IP's reputation. The original google/non-google
split classified by the email's domain STRING, which can't see that a custom
domain silently uses Google Workspace; only an MX lookup reveals it (33% of our
domains, 228/689, are Google-hosted this way).

- hc_data_refresh.py: new MX classification (one lookup per unique domain via
  dnspython, cached) writes an mx_provider=google/other flag into the master and
  propagates it into the channel CSVs (auto-adding the column). --skip-mx for a
  fast status-only run.
- build_healthcare_campaigns_cron.py: warm_segment now drops mx_provider=google
  rows during warmup (HC_SKIP_GOOGLE=1 default; set 0 once IPs are warm). This is
  defense-in-depth -- correct regardless of which CSV the cron is pointed at.

Verified: today's sends (nongoogle CSV) had 0 Google bounces; the guard cuts the
Google-containing week1_verified cohort's revalidation candidates 82->8.
2026-06-08 03:32:12 -05:00

366 lines
16 KiB
Python

#!/usr/bin/env python3
"""Healthcare warmup campaign builder for listmonk-hc.
Runs daily (Mon-Fri, 7 AM Central via cron). Each run:
1. Imports the next slice of the VERIFIED, overdue-first warmup list into a
listmonk-hc list (deduped; already-imported rows are skipped).
2. Ensures the teal "Medicare revalidation overdue" campaign exists and is
running, pointed at that list.
3. Listmonk-hc's sliding-window rate cap (driven by pw-hc-rampcap) does the
actual per-hour throttling, so this builder just feeds the queue.
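WARMUP COHORT: point HC_VERIFIED_CSV at hc_warmup_nongoogle.csv for weeks 1-2.
Google/Workspace-hosted practice domains (~31%) reject hard from cold IPs
(550-5.7.1 "unsolicited"), which damages warmup reputation. Send the 501
non-Google practice domains first; switch to hc_warmup_google.csv (222) once the
IPs are warm (weeks 2-3). As defense-in-depth, rows flagged mx_provider=google
(set by hc_data_refresh's MX lookup) are skipped regardless of which CSV is
used, until HC_SKIP_GOOGLE=0 is set.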
The daily slice size follows the hc warmup ramp so we never queue more than the
IPs can safely send while warming. Sends ONLY happen via the hc HOT stream
(listmonk-hc -> host :2526/2527/2528 -> .107/.108/.109), never the trucking pool.
Idempotent: safe to run every weekday. Tracks imported emails in a state file.
Usage:
python3 scripts/build_healthcare_campaigns_cron.py # daily slice
python3 scripts/build_healthcare_campaigns_cron.py --dry-run
python3 scripts/build_healthcare_campaigns_cron.py --slice 0 # use ramp default
"""
from __future__ import annotations
import argparse, base64, csv, json, os, sys, time, urllib.request, urllib.parse, urllib.error
from datetime import datetime, timezone
# listmonk-hc API base URL and API-user name (both overridable via env).
LISTMONK_URL = os.environ.get("HC_LISTMONK_URL", "http://localhost:9101")
LISTMONK_USER = os.environ.get("HC_LISTMONK_USER", "api")
# Token read from the host file written when the api user was created.
def _token() -> str:
t = os.getenv("HC_LISTMONK_TOKEN")
if t:
return t
for p in ("/opt/performancewest/.secrets/hc-listmonk-token",
"/etc/postfix/hc-listmonk-token"):
if os.path.exists(p):
try:
return open(p).read().strip()
except PermissionError:
continue
raise SystemExit("HC_LISTMONK_TOKEN not set and no readable token file found")
# Input cohort CSV, per-segment state directory, and the warmup-start stamp
# file (holds integer epoch seconds; read by warmup_day).
VERIFIED_CSV = os.environ.get(
    "HC_VERIFIED_CSV",
    "/opt/performancewest/data/hc_warmup_week1_verified.csv",
)
STATE_DIR = os.environ.get("HC_STATE_DIR", "/opt/performancewest/data")
WARMUP_STAMP = "/etc/postfix/hc-warmup-start"
# Sender identity stamped on every warmup campaign.
FROM_EMAIL = "Performance West Compliance <compliance@performancewest.net>"
REPLY_TO = "info@performancewest.net"
# Segment registry (subject, template file, list/campaign names, row selector)
# is the single source of truth shared with build_healthcare_campaigns.py.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from build_healthcare_campaigns import SEGMENTS, template_path # noqa: E402
# Segments to warm, in priority order (overridable via HC_SEGMENTS as a
# comma-separated list). Revalidation stays the lead: it has the most verified
# data plus the official-record card. The rest warm in parallel on smaller
# slices so engagement data accrues across every program without overwhelming
# the warming IPs.
ACTIVE_SEGMENTS = os.environ.get(
    "HC_SEGMENTS",
    ",".join((
        "revalidation_overdue",
        "oig_screening",
        "nppes_outdated",
        "npi_reactivation",
        "compliance_bundle",
    )),
).split(",")
# Warmup deliverability guard: mail only SLIGHTLY-overdue providers. A
# recently-lapsed practice almost certainly still runs a live inbox; one that
# is months or years overdue has likely closed, merged, or abandoned the
# address, so mail to it bounces -- and bounces are the quickest way to ruin a
# warming IP's reputation. The window [MIN, MAX] days overdue is inclusive.
WARMUP_OVERDUE_MIN, WARMUP_OVERDUE_MAX = (
    int(os.environ.get("HC_OVERDUE_MIN", "1")),
    int(os.environ.get("HC_OVERDUE_MAX", "90")),
)
def _overdue_days(r: dict):
v = (r.get("days_overdue") or "").strip()
try:
return int(v)
except ValueError:
return None
# During warmup, hold out Google-Workspace-hosted domains: Google rejects bulk
# mail from cold/warming IPs hard (550-5.7.1), and those bounces wreck the
# warming reputation. The mx_provider flag is set by the weekly hc_data_refresh
# (an MX lookup, since a custom domain can silently use Google Workspace). Set
# HC_SKIP_GOOGLE=0 to lift this once the IPs are warm.
SKIP_GOOGLE = os.getenv("HC_SKIP_GOOGLE", "1") not in ("0", "false", "no")
def _is_google_hosted(r: dict) -> bool:
if not SKIP_GOOGLE:
return False
return (r.get("mx_provider") or "").strip().lower() == "google"
def warmup_day(stamp_path: str | None = None) -> int:
    """Return whole days elapsed since the warmup started (0 on the first day).

    The start time is read as integer epoch seconds from *stamp_path*
    (default: WARMUP_STAMP). A missing, unreadable, or malformed stamp -- or a
    start time in the future -- yields 0, so the smallest ramp slice applies.
    """
    path = WARMUP_STAMP if stamp_path is None else stamp_path
    try:
        with open(path, encoding="utf-8") as f:
            start = int(f.read().strip())
    except (OSError, ValueError):
        return 0
    return max(0, int((time.time() - start) // 86400))
def daily_slice(day: int) -> int:
    """Total number of new subscribers to queue today across ALL segments.

    Follows the hc warmup ramp: the rampcap throttles hourly *delivery*, while
    this throttles daily *queueing* so the warming IPs are never flooded.
    Weekday-only scheduling is enforced by cron, not here.
    """
    # (last warmup day of the tier, subscribers to queue on those days)
    for last_day, size in ((1, 100), (4, 300), (9, 600)):
        if day <= last_day:
            return size
    return 1000
def lm(path: str, data=None, method=None):
    """Call the listmonk-hc REST API and return the decoded JSON response.

    Args:
        path: API path below ``/api`` (may include a query string).
        data: optional JSON-serializable request body.
        method: HTTP verb; defaults to POST when *data* is given, else GET.

    Returns:
        The parsed JSON body, or ``{}`` when the response body is empty.

    Raises:
        SystemExit: on an HTTP error status (message includes the status code
            and up to 300 chars of the response body, which callers inspect,
            e.g. for "409"), or when the server is unreachable / times out.
    """
    tok = _token()
    headers = {"Content-Type": "application/json",
               "Authorization": f"token {LISTMONK_USER}:{tok}"}
    body = None
    if data is not None:
        body = json.dumps(data).encode()
        method = method or "POST"
    # method=None lets urllib pick GET (no body) / POST (body) itself.
    req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", data=body,
                                 headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=20) as r:
            raw = r.read().decode()
    except urllib.error.HTTPError as e:
        err = e.read().decode(errors="replace")[:300]
        raise SystemExit(
            f"listmonk-hc API {method or 'GET'} {path} -> {e.code}: {err}") from e
    except OSError as e:
        # URLError (refused/DNS), socket timeouts, connection resets: surface
        # them as the same SystemExit the callers already handle.
        raise SystemExit(
            f"listmonk-hc API {method or 'GET'} {path} -> unreachable: {e}") from e
    return json.loads(raw) if raw.strip() else {}
def get_or_create_list(list_name: str, tags: list[str]) -> int:
    """Return the id of the listmonk-hc list named *list_name*.

    If no such list exists, create it (private, single opt-in, tagged with
    *tags*). All lists are fetched (``per_page=all``) rather than just the
    first page of 100, so an existing list is never missed and duplicated.
    """
    res = lm("/lists?per_page=all")
    for lst in (res.get("data") or {}).get("results") or []:
        if lst["name"] == list_name:
            return lst["id"]
    res = lm("/lists", {"name": list_name, "type": "private", "optin": "single",
                        "tags": tags})
    return res["data"]["id"]
def state_file(seg_key: str) -> str:
    """Path of the per-segment file listing emails already imported."""
    filename = "hc_imported_" + seg_key + ".txt"
    return os.path.join(STATE_DIR, filename)
def _read_email_set(path: str) -> set[str]:
    """Read a one-email-per-line file into a set of stripped, lowercased emails."""
    with open(path, encoding="utf-8") as f:
        return {ln.strip().lower() for ln in f if ln.strip()}


def load_imported(seg_key: str) -> set[str]:
    """Return the set of emails already imported for *seg_key*.

    Reads the segment's state file. If that file doesn't exist yet, the
    revalidation segment is seeded once from the legacy single-segment state
    file (hc_imported_emails.txt). Otherwise an empty set is returned.
    """
    p = state_file(seg_key)
    if os.path.exists(p):
        return _read_email_set(p)
    # Back-compat: the original single-segment cron tracked revalidation imports
    # in hc_imported_emails.txt. Seed the revalidation state from it once.
    legacy = os.path.join(STATE_DIR, "hc_imported_emails.txt")
    if seg_key == "revalidation_overdue" and os.path.exists(legacy):
        return _read_email_set(legacy)
    return set()
def save_imported(seg_key: str, emails: set[str]):
    """Persist *emails* (sorted, one per line) as *seg_key*'s import state.

    The content is written to a sibling ``.tmp`` file, flushed to disk, and
    atomically renamed over the state file. A crash mid-write therefore can
    never leave a truncated state file, which would make the next run re-import
    subscribers it had already handled.
    """
    os.makedirs(STATE_DIR, exist_ok=True)
    target = state_file(seg_key)
    tmp = target + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        f.write("\n".join(sorted(emails)) + "\n")
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp, target)
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
    """Create *email* as an enabled, preconfirmed subscriber on *list_id*.

    If listmonk reports the subscriber already exists (HTTP 409 / "already
    exists"), the existing subscriber is looked up by email and attached to
    *list_id* instead.

    Returns:
        True if the subscriber was created or attached; False if the API
        rejected the create for another reason, or if the existing subscriber
        could not be found or attached.

    lm() reports API failures by raising SystemExit, which is a BaseException
    and is NOT caught by ``except Exception``. The attach path therefore
    catches it explicitly: one bad subscriber must not abort the whole run
    before the segment's import state is saved.
    """
    try:
        lm("/subscribers", {
            "email": email, "name": name or email.split("@")[0],
            "status": "enabled", "lists": [list_id],
            "attribs": attribs, "preconfirm_subscriptions": True,
        })
        return True
    except SystemExit as e:
        msg = str(e)
        if "409" not in msg and "already exists" not in msg.lower():
            return False
    # Already exists -> attach the existing subscriber to the list instead.
    try:
        # The subscriber query is an SQL expression; escape single quotes.
        q = "subscribers.email = '" + email.replace("'", "''") + "'"
        found = lm("/subscribers?" + urllib.parse.urlencode({"query": q, "per_page": 1}))
        results = (found.get("data") or {}).get("results") or []
        if not results:
            return False
        sid = results[0]["id"]
        lm("/subscribers/lists", {"ids": [sid], "action": "add",
                                  "target_list_ids": [list_id],
                                  "status": "confirmed"}, "PUT")
        return True
    except (SystemExit, Exception):
        return False
def ensure_campaign(seg_key: str, list_id: int) -> int:
    """Return the id of an active warmup campaign for *seg_key*, creating one.

    An existing campaign whose name starts with the segment's campaign_name is
    reused only if it is still ACTIVE (draft / running / paused / scheduled).
    A finished or cancelled campaign can't accept new subscribers or restart,
    so a fresh dated campaign is created instead; that also picks up the
    latest template from template_path(seg_key).

    All campaigns are fetched (``per_page=all``). Because a new dated campaign
    is created whenever the previous one finishes, the total can outgrow one
    100-item page, and a missed active campaign would be duplicated.
    """
    from datetime import date
    seg = SEGMENTS[seg_key]
    active = {"draft", "running", "paused", "scheduled"}
    res = lm("/campaigns?per_page=all")
    for c in (res.get("data") or {}).get("results") or []:
        # NOTE(review): prefix match -- if one segment's campaign_name is a
        # prefix of another's, this could reuse the wrong segment's campaign;
        # confirm the names in SEGMENTS are prefix-free.
        if c["name"].startswith(seg["campaign_name"]) and c.get("status") in active:
            return c["id"]
    with open(template_path(seg_key), encoding="utf-8") as f:
        body = f.read()
    dated = f"{seg['campaign_name']} - {date.today():%b %d %Y}"
    payload = {
        "name": dated, "subject": seg["subject"], "lists": [list_id],
        "from_email": FROM_EMAIL, "type": "regular", "content_type": "richtext",
        "body": body, "messenger": "email",
        "tags": ["healthcare", "warmup", seg_key],
        "headers": [{"Reply-To": REPLY_TO},
                    {"List-Unsubscribe": "<{{ UnsubscribeURL }}>"},
                    {"List-Unsubscribe-Post": "List-Unsubscribe=One-Click"}],
    }
    res = lm("/campaigns", payload)
    return res["data"]["id"]
def _flag_set(v) -> bool:
    """True for a CSV flag value other than blank / 0 / false / no
    (case- and whitespace-insensitive, so "False" and "FALSE" count as unset)."""
    return str(v or "").strip().lower() not in ("", "0", "false", "no")


def row_matches(seg_key: str, r: dict) -> bool:
    """Does this master-CSV row belong to a segment? Driven by the segment's
    `selector` so the row->segment mapping lives next to the segment metadata."""
    sel = SEGMENTS[seg_key]["selector"]
    status = (r.get("reval_status") or "").strip().lower()
    if sel == "reval_overdue":
        # Only SLIGHTLY-overdue: recently-lapsed practices are still active with
        # deliverable inboxes. Heavily-overdue ones likely bounce and burn the
        # warming IP's reputation, so we hold them out of the warmup window.
        if status != "overdue":
            return False
        od = _overdue_days(r)
        return od is not None and WARMUP_OVERDUE_MIN <= od <= WARMUP_OVERDUE_MAX
    if sel == "reval_upcoming":
        return status == "upcoming"
    if sel == "leie_or_deactivated":
        # Reactivation targets: flagged excluded, OR no longer on the reval list
        # (a strong deactivation proxy once revalidation lapses).
        return _flag_set(r.get("leie_excluded")) or status in ("not_on_list", "no_reval_flag")
    if sel == "optout_ending":
        return (r.get("optout_ending") or "").strip() != ""
    if sel == "any":
        # OIG screening applies to any billing practice, but for warmup we still
        # exclude the likely-undeliverable: providers heavily overdue (stale) or
        # already dropped off the reval list. Recently-lapsed and upcoming stay.
        if status in ("not_on_list", "no_reval_flag"):
            return False
        od = _overdue_days(r)
        if status == "overdue" and (od is None or od > WARMUP_OVERDUE_MAX):
            return False
        return True
    return False
def attribs_for(r: dict) -> dict:
    """Build the listmonk subscriber ``attribs`` for one master-CSV row.

    Every value is a string; missing or None columns become "" so templates
    never render "None". ``detail`` is "<due date> (<N> days overdue)" for
    overdue rows, otherwise just the due date. The overdue status is compared
    stripped and lowercased, the same way row_matches normalizes it.
    """
    def col(key: str) -> str:
        v = r.get(key)
        return "" if v is None else str(v)

    due = col("reval_due_date")
    days = col("days_overdue")
    overdue = col("reval_status").strip().lower() == "overdue"
    return {
        "npi": col("npi"),
        "practice": col("name"),
        "specialty": col("specialty"),
        "state": col("state"),
        # Separate fields so the email's "official CMS record" card can render
        # the due date + overdue count cleanly (mirrors the CMS Revalidation Due
        # Date List, verified by NPI via the weekly hc_data_refresh).
        "reval_due_date": due,
        "days_overdue": days,
        "detail": f"{due} ({days} days overdue)" if overdue else due,
    }
def warm_segment(seg_key: str, rows: list[dict], slice_n: int,
                 dry_run: bool, start_campaign: bool) -> int:
    """Import up to slice_n new subscribers for one segment and keep its
    campaign active. Returns how many NEW subscribers were imported.

    A candidate is a row with a non-blank email that has not been imported
    before, is not held out by the Google-hosted guard, and matches the
    segment. Each email is taken at most once per run, even if it appears on
    several rows. The import state is saved even if the import loop is
    interrupted, so the next run does not redo finished work.
    """
    seg = SEGMENTS[seg_key]
    imported = load_imported(seg_key)
    candidates = []
    seen = set()
    for r in rows:
        email = (r.get("email") or "").strip().lower()
        if not email or email in imported or email in seen:
            continue
        if _is_google_hosted(r) or not row_matches(seg_key, r):
            continue
        # Mark seen only once the row qualifies, so a later row with the same
        # address can still qualify if an earlier one didn't match.
        seen.add(email)
        candidates.append(r)
    todo = candidates[:max(slice_n, 0)]
    print(f"[hc-cron] {seg_key}: candidates={len(candidates)} "
          f"already={len(imported)} to_import={len(todo)}")
    if dry_run:
        for r in todo[:3]:
            print(f"  would import: {r['email']}  {(r.get('name') or '')[:28]}  "
                  f"status={r.get('reval_status','')}")
        return 0
    if not todo:
        return 0
    list_id = get_or_create_list(seg["list_name"], ["healthcare", "warmup", seg_key])
    n_ok = 0
    try:
        for r in todo:
            email = r["email"].strip().lower()
            if add_subscriber(list_id, email, r.get("name") or "", attribs_for(r)):
                imported.add(email)
                n_ok += 1
    finally:
        save_imported(seg_key, imported)
    cid = ensure_campaign(seg_key, list_id)
    if start_campaign:
        try:
            lm(f"/campaigns/{cid}/status", {"status": "running"}, "PUT")
        except SystemExit as e:
            # Already running raises; that's fine.
            if "already" not in str(e).lower():
                print(f"[hc-cron] {seg_key}: start warning: {e}")
    print(f"[hc-cron] {seg_key}: imported {n_ok} into list {list_id}; campaign={cid}")
    return n_ok
def _split_slice(total_slice: int, segments: list[str],
                 lead: str = "revalidation_overdue") -> dict[str, int]:
    """Split the daily import budget *total_slice* across *segments*.

    The lead segment (the richest data) gets ~half, at least 1; the others
    share the remainder evenly. The lead then reclaims any rounding remainder,
    so the values sum to exactly *total_slice*: the warming-rate budget is
    never overshot and none of it is silently dropped. Expects *segments* to
    be duplicate-free.
    """
    others = [s for s in segments if s != lead]
    per_seg: dict[str, int] = {}
    if lead in segments:
        per_seg[lead] = max(1, int(total_slice * 0.5))
        rem = total_slice - per_seg[lead]
    else:
        rem = total_slice
    if others and rem > 0:
        base, extra = divmod(rem, len(others))
        for i, s in enumerate(others):
            per_seg[s] = base + (1 if i < extra else 0)
    else:
        for s in others:
            per_seg[s] = 0
    if lead in per_seg:
        per_seg[lead] += total_slice - sum(per_seg.values())
    return per_seg


def main():
    """Run one daily warmup pass: import today's slice for each active segment."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--slice", type=int, default=0,
                    help="override TOTAL daily import slice across segments (0=ramp default)")
    ap.add_argument("--segments", default=",".join(ACTIVE_SEGMENTS),
                    help="comma list of segment keys to warm")
    ap.add_argument("--start-campaign", action="store_true",
                    help="flip campaigns to 'running' (otherwise left as draft for approval)")
    args = ap.parse_args()
    if args.slice < 0:
        # A negative budget would turn into candidates[:-k] downstream, i.e.
        # import nearly everything -- the opposite of a warmup throttle.
        ap.error("--slice must be >= 0 (0 = ramp default)")
    day = warmup_day()
    total_slice = args.slice or daily_slice(day)
    requested = [s.strip() for s in args.segments.split(",") if s.strip()]
    unknown = [s for s in requested if s not in SEGMENTS]
    if unknown:
        print(f"[hc-cron] ignoring unknown segment(s): {unknown}")
    # Order-preserving dedupe so a repeated key isn't warmed (or budgeted) twice.
    segments = list(dict.fromkeys(s for s in requested if s in SEGMENTS))
    print(f"[hc-cron] {datetime.now(timezone.utc).isoformat()} warmup_day={day} "
          f"total_slice={total_slice} segments={segments}")
    # newline="" per the csv module docs; utf-8-sig tolerates a leading BOM
    # that would otherwise corrupt the first column name.
    with open(VERIFIED_CSV, newline="", encoding="utf-8-sig") as f:
        rows = list(csv.DictReader(f))
    print(f"[hc-cron] verified_total={len(rows)}")
    per_seg = _split_slice(total_slice, segments)
    grand = 0
    for seg_key in segments:
        grand += warm_segment(seg_key, rows, per_seg.get(seg_key, 0),
                              args.dry_run, args.start_campaign)
    print(f"[hc-cron] done: imported {grand} new subscribers across {len(segments)} segment(s)")
# Script entry point: run one warmup pass when executed directly (e.g. by cron).
if __name__ == "__main__":
    main()