healthcare: HC warmup campaign cron (Mon-Fri 7AM Central) - imports overdue-first verified slice into listmonk-hc + runs Medicare-revalidation campaign via hc HOT stream; rate-throttled by pw-hc-rampcap

This commit is contained in:
justin 2026-06-06 03:57:08 -05:00
parent 2d3bccd31e
commit 2bc86268f7
2 changed files with 214 additions and 0 deletions

View file

@ -0,0 +1,209 @@
#!/usr/bin/env python3
"""Healthcare warmup campaign builder for listmonk-hc.
Runs daily (Mon-Fri, 7 AM Central via cron). Each run:
1. Imports the next slice of the VERIFIED, overdue-first warmup list into a
listmonk-hc list (deduped; already-imported rows are skipped).
2. Ensures the teal "Medicare revalidation overdue" campaign exists and is
running, pointed at that list.
3. Listmonk-hc's sliding-window rate cap (driven by pw-hc-rampcap) does the
actual per-hour throttling, so this builder just feeds the queue.
The daily slice size follows the hc warmup ramp so we never queue more than the
IPs can safely send while warming. Sends ONLY happen via the hc HOT stream
(listmonk-hc -> host :2526/2527/2528 -> .107/.108/.109), never the trucking pool.
Idempotent: safe to run every weekday. Tracks imported emails in a state file.
Usage:
python3 scripts/build_healthcare_campaigns_cron.py # daily slice
python3 scripts/build_healthcare_campaigns_cron.py --dry-run
python3 scripts/build_healthcare_campaigns_cron.py --slice 0 # use ramp default
"""
from __future__ import annotations
import argparse, base64, csv, json, os, sys, time, urllib.request, urllib.parse, urllib.error
from datetime import datetime, timezone
LISTMONK_URL = os.getenv("HC_LISTMONK_URL", "http://localhost:9101")
LISTMONK_USER = os.getenv("HC_LISTMONK_USER", "api")
# Token read from the host file written when the api user was created.
def _token() -> str:
t = os.getenv("HC_LISTMONK_TOKEN")
if t:
return t
for p in ("/opt/performancewest/.secrets/hc-listmonk-token",
"/etc/postfix/hc-listmonk-token"):
if os.path.exists(p):
try:
return open(p).read().strip()
except PermissionError:
continue
raise SystemExit("HC_LISTMONK_TOKEN not set and no readable token file found")
VERIFIED_CSV = os.getenv("HC_VERIFIED_CSV", "/opt/performancewest/data/hc_warmup_week1_verified.csv")
EMAIL_HTML = os.getenv("HC_EMAIL_HTML", "/opt/performancewest/data/hc_campaigns/hc_revalidation_overdue.html")
STATE_FILE = os.getenv("HC_IMPORT_STATE", "/opt/performancewest/data/hc_imported_emails.txt")
WARMUP_STAMP = "/etc/postfix/hc-warmup-start"
LIST_NAME = "HC Warmup - Revalidation Overdue"
CAMPAIGN_NAME = "HC Warmup - Medicare Revalidation"
FROM_EMAIL = "Performance West Compliance <compliance@performancewest.net>"
SUBJECT = "Action needed: your Medicare revalidation is overdue"
REPLY_TO = "info@performancewest.net"
def warmup_day() -> int:
try:
start = int(open(WARMUP_STAMP).read().strip())
return max(0, int((time.time() - start) // 86400))
except Exception:
return 0
def daily_slice(day: int) -> int:
"""How many NEW subscribers to import today, aligned with the hc ramp.
The rampcap caps hourly *delivery*; this caps daily *queueing* so we never
flood the warming IPs. Mon-Fri only (cron enforces the weekday)."""
if day <= 1: return 100
if day <= 4: return 300
if day <= 9: return 600
return 1000
def lm(path: str, data=None, method=None):
tok = _token()
headers = {"Content-Type": "application/json",
"Authorization": f"token {LISTMONK_USER}:{tok}"}
req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers)
if data is not None:
req.data = json.dumps(data).encode()
if not method:
method = "POST"
if method:
req.get_method = lambda: method
try:
with urllib.request.urlopen(req, timeout=20) as r:
return json.loads(r.read().decode())
except urllib.error.HTTPError as e:
body = e.read().decode()[:300]
raise SystemExit(f"listmonk-hc API {method or 'GET'} {path} -> {e.code}: {body}")
def get_or_create_list() -> int:
res = lm("/lists?per_page=100")
for l in res.get("data", {}).get("results", []):
if l["name"] == LIST_NAME:
return l["id"]
res = lm("/lists", {"name": LIST_NAME, "type": "private", "optin": "single",
"tags": ["healthcare", "warmup", "revalidation"]})
return res["data"]["id"]
def load_imported() -> set[str]:
if os.path.exists(STATE_FILE):
return {ln.strip().lower() for ln in open(STATE_FILE) if ln.strip()}
return set()
def save_imported(emails: set[str]):
os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
with open(STATE_FILE, "w") as f:
f.write("\n".join(sorted(emails)) + "\n")
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
try:
lm("/subscribers", {
"email": email, "name": name or email.split("@")[0],
"status": "enabled", "lists": [list_id],
"attribs": attribs, "preconfirm_subscriptions": True,
})
return True
except SystemExit as e:
# Already exists -> attach to the list instead.
if "409" in str(e) or "already exists" in str(e).lower():
try:
q = "subscribers.email = '" + email.replace("'", "''") + "'"
found = lm("/subscribers?" + urllib.parse.urlencode({"query": q, "per_page": 1}))
results = found.get("data", {}).get("results", [])
if results:
sid = results[0]["id"]
lm("/subscribers/lists", {"ids": [sid], "action": "add",
"target_list_ids": [list_id],
"status": "confirmed"}, "PUT")
return True
except Exception:
return False
return False
def ensure_campaign(list_id: int) -> int:
res = lm("/campaigns?per_page=100")
for c in res.get("data", {}).get("results", []):
if c["name"] == CAMPAIGN_NAME:
return c["id"]
body = open(EMAIL_HTML).read()
payload = {
"name": CAMPAIGN_NAME, "subject": SUBJECT, "lists": [list_id],
"from_email": FROM_EMAIL, "type": "regular", "content_type": "richtext",
"body": body, "messenger": "email",
"tags": ["healthcare", "warmup"],
"headers": [{"Reply-To": REPLY_TO},
{"List-Unsubscribe": "<{{ UnsubscribeURL }}>"},
{"List-Unsubscribe-Post": "List-Unsubscribe=One-Click"}],
}
res = lm("/campaigns", payload)
return res["data"]["id"]
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--dry-run", action="store_true")
ap.add_argument("--slice", type=int, default=0, help="override daily import slice (0=ramp default)")
ap.add_argument("--start-campaign", action="store_true",
help="flip the campaign to 'running' (otherwise left as draft for approval)")
args = ap.parse_args()
day = warmup_day()
slice_n = args.slice or daily_slice(day)
print(f"[hc-cron] {datetime.now(timezone.utc).isoformat()} warmup_day={day} daily_slice={slice_n}")
rows = list(csv.DictReader(open(VERIFIED_CSV)))
imported = load_imported()
todo = [r for r in rows if r.get("email", "").strip().lower() not in imported][:slice_n]
print(f"[hc-cron] verified_total={len(rows)} already_imported={len(imported)} to_import_today={len(todo)}")
if args.dry_run:
for r in todo[:5]:
print(f" would import: {r['email']} {r.get('name','')[:30]} due={r.get('days_overdue','')}")
print("[hc-cron] dry-run, no changes")
return
if not todo:
print("[hc-cron] nothing new to import today")
return
list_id = get_or_create_list()
n_ok = 0
for r in todo:
email = r["email"].strip().lower()
attribs = {
"npi": r.get("npi", ""),
"practice": r.get("name", ""),
"specialty": r.get("specialty", ""),
"state": r.get("state", ""),
"detail": (f"{r.get('reval_due_date','')} ({r.get('days_overdue','')} days overdue)"
if r.get("reval_status") == "overdue" else r.get("reval_due_date", "")),
}
if add_subscriber(list_id, email, r.get("name") or "", attribs):
imported.add(email); n_ok += 1
save_imported(imported)
cid = ensure_campaign(list_id)
if args.start_campaign:
lm(f"/campaigns/{cid}/status", {"status": "running"}, "PUT")
print(f"[hc-cron] campaign {cid} set to RUNNING")
print(f"[hc-cron] imported {n_ok}/{len(todo)} new subscribers into list {list_id}; campaign={cid}")
if __name__ == "__main__":
main()