new-site/scripts/build_healthcare_campaigns_cron.py
justin 483f185861 feat(healthcare): prove revalidation is real via official CMS data + self-verify
Skepticism ("is this even real?") is the top objection. The data IS accurate
(verified our subscribers' NPIs match the official CMS Revalidation Due Date List
exactly), so this is a credibility-presentation fix:

1. Email: replace the plain detail row with an "Official record - CMS Medicare
   Revalidation Due Date List" card (NPI, legal name, due date, days overdue)
   plus a "Verify on CMS.gov" button. Clearly labeled as our presentation of
   public CMS data, not a CMS screenshot (no impersonation).
2. API: npi/lookup now pulls the revalidation due date LIVE from the public CMS
   dataset (data.cms.gov) instead of the empty local table, and returns a
   revalidation{ due_date, source, cms_legal_name, verify_url } proof object.
3. Tool: /tools/npi-compliance-check shows a live "official record" card with a
   self-verify link when CMS returns a due date.

Builder now stores reval_due_date/days_overdue as separate attribs for the card
(existing 194 subscribers backfilled from their detail string).
2026-06-07 23:54:01 -05:00

227 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""Healthcare warmup campaign builder for listmonk-hc.
Runs daily (Mon-Fri, 7 AM Central via cron). Each run:
1. Imports the next slice of the VERIFIED, overdue-first warmup list into a
listmonk-hc list (deduped; already-imported rows are skipped).
2. Ensures the teal "Medicare revalidation overdue" campaign exists and is
running, pointed at that list.
3. Listmonk-hc's sliding-window rate cap (driven by pw-hc-rampcap) does the
actual per-hour throttling, so this builder just feeds the queue.
WARMUP COHORT: point HC_VERIFIED_CSV at hc_warmup_nongoogle.csv for weeks 1-2.
Google/Workspace-hosted practice domains (~31%) reject hard from cold IPs
(550-5.7.1 "unsolicited"), which damages warmup reputation. Send the 501
non-Google practice domains first; switch to hc_warmup_google.csv (222) once the
IPs are warm (week 2-3).
The daily slice size follows the hc warmup ramp so we never queue more than the
IPs can safely send while warming. Sends ONLY happen via the hc HOT stream
(listmonk-hc -> host :2526/2527/2528 -> .107/.108/.109), never the trucking pool.
Idempotent: safe to run every weekday. Tracks imported emails in a state file.
Usage:
python3 scripts/build_healthcare_campaigns_cron.py # daily slice
python3 scripts/build_healthcare_campaigns_cron.py --dry-run
python3 scripts/build_healthcare_campaigns_cron.py --slice 0 # use ramp default
"""
from __future__ import annotations
import argparse, base64, csv, json, os, sys, time, urllib.request, urllib.parse, urllib.error
from datetime import datetime, timezone
LISTMONK_URL = os.getenv("HC_LISTMONK_URL", "http://localhost:9101")
LISTMONK_USER = os.getenv("HC_LISTMONK_USER", "api")
# Token read from the host file written when the api user was created.
def _token() -> str:
t = os.getenv("HC_LISTMONK_TOKEN")
if t:
return t
for p in ("/opt/performancewest/.secrets/hc-listmonk-token",
"/etc/postfix/hc-listmonk-token"):
if os.path.exists(p):
try:
return open(p).read().strip()
except PermissionError:
continue
raise SystemExit("HC_LISTMONK_TOKEN not set and no readable token file found")
VERIFIED_CSV = os.getenv("HC_VERIFIED_CSV", "/opt/performancewest/data/hc_warmup_week1_verified.csv")
EMAIL_HTML = os.getenv("HC_EMAIL_HTML", "/opt/performancewest/data/hc_campaigns/hc_revalidation_overdue.html")
STATE_FILE = os.getenv("HC_IMPORT_STATE", "/opt/performancewest/data/hc_imported_emails.txt")
WARMUP_STAMP = "/etc/postfix/hc-warmup-start"
LIST_NAME = "HC Warmup - Revalidation Overdue"
CAMPAIGN_NAME = "HC Warmup - Medicare Revalidation"
FROM_EMAIL = "Performance West Compliance <compliance@performancewest.net>"
SUBJECT = "Action needed: your Medicare revalidation is overdue"
REPLY_TO = "info@performancewest.net"
def warmup_day() -> int:
try:
start = int(open(WARMUP_STAMP).read().strip())
return max(0, int((time.time() - start) // 86400))
except Exception:
return 0
def daily_slice(day: int) -> int:
"""How many NEW subscribers to import today, aligned with the hc ramp.
The rampcap caps hourly *delivery*; this caps daily *queueing* so we never
flood the warming IPs. Mon-Fri only (cron enforces the weekday)."""
if day <= 1: return 100
if day <= 4: return 300
if day <= 9: return 600
return 1000
def lm(path: str, data=None, method=None):
tok = _token()
headers = {"Content-Type": "application/json",
"Authorization": f"token {LISTMONK_USER}:{tok}"}
req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers)
if data is not None:
req.data = json.dumps(data).encode()
if not method:
method = "POST"
if method:
req.get_method = lambda: method
try:
with urllib.request.urlopen(req, timeout=20) as r:
return json.loads(r.read().decode())
except urllib.error.HTTPError as e:
body = e.read().decode()[:300]
raise SystemExit(f"listmonk-hc API {method or 'GET'} {path} -> {e.code}: {body}")
def get_or_create_list() -> int:
res = lm("/lists?per_page=100")
for l in res.get("data", {}).get("results", []):
if l["name"] == LIST_NAME:
return l["id"]
res = lm("/lists", {"name": LIST_NAME, "type": "private", "optin": "single",
"tags": ["healthcare", "warmup", "revalidation"]})
return res["data"]["id"]
def load_imported() -> set[str]:
if os.path.exists(STATE_FILE):
return {ln.strip().lower() for ln in open(STATE_FILE) if ln.strip()}
return set()
def save_imported(emails: set[str]):
os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
with open(STATE_FILE, "w") as f:
f.write("\n".join(sorted(emails)) + "\n")
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
try:
lm("/subscribers", {
"email": email, "name": name or email.split("@")[0],
"status": "enabled", "lists": [list_id],
"attribs": attribs, "preconfirm_subscriptions": True,
})
return True
except SystemExit as e:
# Already exists -> attach to the list instead.
if "409" in str(e) or "already exists" in str(e).lower():
try:
q = "subscribers.email = '" + email.replace("'", "''") + "'"
found = lm("/subscribers?" + urllib.parse.urlencode({"query": q, "per_page": 1}))
results = found.get("data", {}).get("results", [])
if results:
sid = results[0]["id"]
lm("/subscribers/lists", {"ids": [sid], "action": "add",
"target_list_ids": [list_id],
"status": "confirmed"}, "PUT")
return True
except Exception:
return False
return False
def ensure_campaign(list_id: int) -> int:
# Reuse an existing HC warmup campaign only if it's still ACTIVE (draft /
# running / paused / scheduled). A finished/cancelled campaign can't accept
# new subscribers or be restarted, so we create a fresh dated one — that also
# picks up the latest email template (e.g. copy/colour tweaks).
from datetime import date
ACTIVE = {"draft", "running", "paused", "scheduled"}
res = lm("/campaigns?per_page=100")
for c in res.get("data", {}).get("results", []):
if c["name"].startswith(CAMPAIGN_NAME) and c.get("status") in ACTIVE:
return c["id"]
body = open(EMAIL_HTML).read()
dated = f"{CAMPAIGN_NAME} - {date.today():%b %d %Y}"
payload = {
"name": dated, "subject": SUBJECT, "lists": [list_id],
"from_email": FROM_EMAIL, "type": "regular", "content_type": "richtext",
"body": body, "messenger": "email",
"tags": ["healthcare", "warmup"],
"headers": [{"Reply-To": REPLY_TO},
{"List-Unsubscribe": "<{{ UnsubscribeURL }}>"},
{"List-Unsubscribe-Post": "List-Unsubscribe=One-Click"}],
}
res = lm("/campaigns", payload)
return res["data"]["id"]
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--dry-run", action="store_true")
ap.add_argument("--slice", type=int, default=0, help="override daily import slice (0=ramp default)")
ap.add_argument("--start-campaign", action="store_true",
help="flip the campaign to 'running' (otherwise left as draft for approval)")
args = ap.parse_args()
day = warmup_day()
slice_n = args.slice or daily_slice(day)
print(f"[hc-cron] {datetime.now(timezone.utc).isoformat()} warmup_day={day} daily_slice={slice_n}")
rows = list(csv.DictReader(open(VERIFIED_CSV)))
imported = load_imported()
todo = [r for r in rows if r.get("email", "").strip().lower() not in imported][:slice_n]
print(f"[hc-cron] verified_total={len(rows)} already_imported={len(imported)} to_import_today={len(todo)}")
if args.dry_run:
for r in todo[:5]:
print(f" would import: {r['email']} {r.get('name','')[:30]} due={r.get('days_overdue','')}")
print("[hc-cron] dry-run, no changes")
return
if not todo:
print("[hc-cron] nothing new to import today")
return
list_id = get_or_create_list()
n_ok = 0
for r in todo:
email = r["email"].strip().lower()
attribs = {
"npi": r.get("npi", ""),
"practice": r.get("name", ""),
"specialty": r.get("specialty", ""),
"state": r.get("state", ""),
# Separate fields so the email's "official CMS record" card can render
# the due date and overdue count cleanly (these mirror the authoritative
# CMS Revalidation Due Date List, verified to match by NPI).
"reval_due_date": r.get("reval_due_date", ""),
"days_overdue": str(r.get("days_overdue", "")),
"detail": (f"{r.get('reval_due_date','')} ({r.get('days_overdue','')} days overdue)"
if r.get("reval_status") == "overdue" else r.get("reval_due_date", "")),
}
if add_subscriber(list_id, email, r.get("name") or "", attribs):
imported.add(email); n_ok += 1
save_imported(imported)
cid = ensure_campaign(list_id)
if args.start_campaign:
lm(f"/campaigns/{cid}/status", {"status": "running"}, "PUT")
print(f"[hc-cron] campaign {cid} set to RUNNING")
print(f"[hc-cron] imported {n_ok}/{len(todo)} new subscribers into list {list_id}; campaign={cid}")
if __name__ == "__main__":
main()