From 2bc86268f75686b316d2b85e7811c424094dfebb Mon Sep 17 00:00:00 2001 From: justin Date: Sat, 6 Jun 2026 03:57:08 -0500 Subject: [PATCH] healthcare: HC warmup campaign cron (Mon-Fri 7AM Central) - imports overdue-first verified slice into listmonk-hc + runs Medicare-revalidation campaign via hc HOT stream; rate-throttled by pw-hc-rampcap --- infra/cron/pw-hc-campaign | 5 + scripts/build_healthcare_campaigns_cron.py | 209 +++++++++++++++++++++ 2 files changed, 214 insertions(+) create mode 100644 infra/cron/pw-hc-campaign create mode 100644 scripts/build_healthcare_campaigns_cron.py diff --git a/infra/cron/pw-hc-campaign b/infra/cron/pw-hc-campaign new file mode 100644 index 0000000..4d7dcf9 --- /dev/null +++ b/infra/cron/pw-hc-campaign @@ -0,0 +1,5 @@ +# Healthcare warmup: import the next overdue-first slice into listmonk-hc and +# keep the Medicare-revalidation campaign running. Mon-Fri only, 7 AM Central +# (host TZ = America/Chicago). Delivery is throttled by pw-hc-rampcap's +# sliding-window cap, sent ONLY via the hc HOT stream (.107-.109). +0 7 * * 1-5 deploy cd /opt/performancewest && python3 scripts/build_healthcare_campaigns_cron.py --start-campaign >> /var/log/pw-hc-campaign.log 2>&1 diff --git a/scripts/build_healthcare_campaigns_cron.py b/scripts/build_healthcare_campaigns_cron.py new file mode 100644 index 0000000..e7e19ac --- /dev/null +++ b/scripts/build_healthcare_campaigns_cron.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python3 +"""Healthcare warmup campaign builder for listmonk-hc. + +Runs daily (Mon-Fri, 7 AM Central via cron). Each run: + 1. Imports the next slice of the VERIFIED, overdue-first warmup list into a + listmonk-hc list (deduped; already-imported rows are skipped). + 2. Ensures the teal "Medicare revalidation overdue" campaign exists and is + running, pointed at that list. + 3. Listmonk-hc's sliding-window rate cap (driven by pw-hc-rampcap) does the + actual per-hour throttling, so this builder just feeds the queue. + +The daily slice size follows the hc warmup ramp so we never queue more than the +IPs can safely send while warming. Sends ONLY happen via the hc HOT stream +(listmonk-hc -> host :2526/2527/2528 -> .107/.108/.109), never the trucking pool. + +Idempotent: safe to run every weekday. Tracks imported emails in a state file. + +Usage: + python3 scripts/build_healthcare_campaigns_cron.py # daily slice + python3 scripts/build_healthcare_campaigns_cron.py --dry-run + python3 scripts/build_healthcare_campaigns_cron.py --slice 0 # use ramp default +""" +from __future__ import annotations +import argparse, base64, csv, json, os, sys, time, urllib.request, urllib.parse, urllib.error +from datetime import datetime, timezone + +LISTMONK_URL = os.getenv("HC_LISTMONK_URL", "http://localhost:9101") +LISTMONK_USER = os.getenv("HC_LISTMONK_USER", "api") +# Token read from the host file written when the api user was created. +def _token() -> str: + t = os.getenv("HC_LISTMONK_TOKEN") + if t: + return t + for p in ("/opt/performancewest/.secrets/hc-listmonk-token", + "/etc/postfix/hc-listmonk-token"): + if os.path.exists(p): + try: + return open(p).read().strip() + except PermissionError: + continue + raise SystemExit("HC_LISTMONK_TOKEN not set and no readable token file found") + +VERIFIED_CSV = os.getenv("HC_VERIFIED_CSV", "/opt/performancewest/data/hc_warmup_week1_verified.csv") +EMAIL_HTML = os.getenv("HC_EMAIL_HTML", "/opt/performancewest/data/hc_campaigns/hc_revalidation_overdue.html") +STATE_FILE = os.getenv("HC_IMPORT_STATE", "/opt/performancewest/data/hc_imported_emails.txt") +WARMUP_STAMP = "/etc/postfix/hc-warmup-start" + +LIST_NAME = "HC Warmup - Revalidation Overdue" +CAMPAIGN_NAME = "HC Warmup - Medicare Revalidation" +FROM_EMAIL = "Performance West Compliance " +SUBJECT = "Action needed: your Medicare revalidation is overdue" +REPLY_TO = "info@performancewest.net" + + +def warmup_day() -> int: + try: + start = int(open(WARMUP_STAMP).read().strip()) + return max(0, int((time.time() - start) // 86400)) + except Exception: + return 0 + + +def daily_slice(day: int) -> int: + """How many NEW subscribers to import today, aligned with the hc ramp. + The rampcap caps hourly *delivery*; this caps daily *queueing* so we never + flood the warming IPs. Mon-Fri only (cron enforces the weekday).""" + if day <= 1: return 100 + if day <= 4: return 300 + if day <= 9: return 600 + return 1000 + + +def lm(path: str, data=None, method=None): + tok = _token() + headers = {"Content-Type": "application/json", + "Authorization": f"token {LISTMONK_USER}:{tok}"} + req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers) + if data is not None: + req.data = json.dumps(data).encode() + if not method: + method = "POST" + if method: + req.get_method = lambda: method + try: + with urllib.request.urlopen(req, timeout=20) as r: + return json.loads(r.read().decode()) + except urllib.error.HTTPError as e: + body = e.read().decode()[:300] + raise SystemExit(f"listmonk-hc API {method or 'GET'} {path} -> {e.code}: {body}") + + +def get_or_create_list() -> int: + res = lm("/lists?per_page=100") + for l in res.get("data", {}).get("results", []): + if l["name"] == LIST_NAME: + return l["id"] + res = lm("/lists", {"name": LIST_NAME, "type": "private", "optin": "single", + "tags": ["healthcare", "warmup", "revalidation"]}) + return res["data"]["id"] + + +def load_imported() -> set[str]: + if os.path.exists(STATE_FILE): + return {ln.strip().lower() for ln in open(STATE_FILE) if ln.strip()} + return set() + + +def save_imported(emails: set[str]): + os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True) + with open(STATE_FILE, "w") as f: + f.write("\n".join(sorted(emails)) + "\n") + + +def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool: + try: + lm("/subscribers", { + "email": email, "name": name or email.split("@")[0], + "status": "enabled", "lists": [list_id], + "attribs": attribs, "preconfirm_subscriptions": True, + }) + return True + except SystemExit as e: + # Already exists -> attach to the list instead. + if "409" in str(e) or "already exists" in str(e).lower(): + try: + q = "subscribers.email = '" + email.replace("'", "''") + "'" + found = lm("/subscribers?" + urllib.parse.urlencode({"query": q, "per_page": 1})) + results = found.get("data", {}).get("results", []) + if results: + sid = results[0]["id"] + lm("/subscribers/lists", {"ids": [sid], "action": "add", + "target_list_ids": [list_id], + "status": "confirmed"}, "PUT") + return True + except Exception: + return False + return False + + +def ensure_campaign(list_id: int) -> int: + res = lm("/campaigns?per_page=100") + for c in res.get("data", {}).get("results", []): + if c["name"] == CAMPAIGN_NAME: + return c["id"] + body = open(EMAIL_HTML).read() + payload = { + "name": CAMPAIGN_NAME, "subject": SUBJECT, "lists": [list_id], + "from_email": FROM_EMAIL, "type": "regular", "content_type": "richtext", + "body": body, "messenger": "email", + "tags": ["healthcare", "warmup"], + "headers": [{"Reply-To": REPLY_TO}, + {"List-Unsubscribe": "<{{ UnsubscribeURL }}>"}, + {"List-Unsubscribe-Post": "List-Unsubscribe=One-Click"}], + } + res = lm("/campaigns", payload) + return res["data"]["id"] + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--dry-run", action="store_true") + ap.add_argument("--slice", type=int, default=0, help="override daily import slice (0=ramp default)") + ap.add_argument("--start-campaign", action="store_true", + help="flip the campaign to 'running' (otherwise left as draft for approval)") + args = ap.parse_args() + + day = warmup_day() + slice_n = args.slice or daily_slice(day) + print(f"[hc-cron] {datetime.now(timezone.utc).isoformat()} warmup_day={day} daily_slice={slice_n}") + + rows = list(csv.DictReader(open(VERIFIED_CSV))) + imported = load_imported() + todo = [r for r in rows if r.get("email", "").strip().lower() not in imported][:slice_n] + print(f"[hc-cron] verified_total={len(rows)} already_imported={len(imported)} to_import_today={len(todo)}") + + if args.dry_run: + for r in todo[:5]: + print(f" would import: {r['email']} {r.get('name','')[:30]} due={r.get('days_overdue','')}") + print("[hc-cron] dry-run, no changes") + return + + if not todo: + print("[hc-cron] nothing new to import today") + return + + list_id = get_or_create_list() + n_ok = 0 + for r in todo: + email = r["email"].strip().lower() + attribs = { + "npi": r.get("npi", ""), + "practice": r.get("name", ""), + "specialty": r.get("specialty", ""), + "state": r.get("state", ""), + "detail": (f"{r.get('reval_due_date','')} ({r.get('days_overdue','')} days overdue)" + if r.get("reval_status") == "overdue" else r.get("reval_due_date", "")), + } + if add_subscriber(list_id, email, r.get("name") or "", attribs): + imported.add(email); n_ok += 1 + save_imported(imported) + cid = ensure_campaign(list_id) + if args.start_campaign: + lm(f"/campaigns/{cid}/status", {"status": "running"}, "PUT") + print(f"[hc-cron] campaign {cid} set to RUNNING") + print(f"[hc-cron] imported {n_ok}/{len(todo)} new subscribers into list {list_id}; campaign={cid}") + + +if __name__ == "__main__": + main()