new-site/scripts/build_healthcare_campaigns_cron.py

222 lines
9.2 KiB
Python

#!/usr/bin/env python3
"""Healthcare warmup campaign builder for listmonk-hc.
Runs daily (Mon-Fri, 7 AM Central via cron). Each run:
1. Imports the next slice of the VERIFIED, overdue-first warmup list into a
listmonk-hc list (deduped; already-imported rows are skipped).
2. Ensures the teal "Medicare revalidation overdue" campaign exists and is
pointed at that list. A newly created campaign is left as a draft for approval;
pass --start-campaign to flip it to running.
3. Listmonk-hc's sliding-window rate cap (driven by pw-hc-rampcap) does the
actual per-hour throttling, so this builder just feeds the queue.
WARMUP COHORT: point HC_VERIFIED_CSV at hc_warmup_nongoogle.csv for weeks 1-2.
Google/Workspace-hosted practice domains (~31%) reject hard from cold IPs
(550-5.7.1 "unsolicited"), which damages warmup reputation. Send the 501
non-Google practice domains first; switch to hc_warmup_google.csv (222) once the
IPs are warm (week 2-3).
The daily slice size follows the hc warmup ramp so we never queue more than the
IPs can safely send while warming. Sends ONLY happen via the hc HOT stream
(listmonk-hc -> host :2526/2527/2528 -> .107/.108/.109), never the trucking pool.
Idempotent: safe to run every weekday. Tracks imported emails in a state file.
Usage:
python3 scripts/build_healthcare_campaigns_cron.py # daily slice
python3 scripts/build_healthcare_campaigns_cron.py --dry-run
python3 scripts/build_healthcare_campaigns_cron.py --slice 0 # use ramp default
"""
from __future__ import annotations
import argparse, base64, csv, json, os, sys, time, urllib.request, urllib.parse, urllib.error
from datetime import datetime, timezone
# Base URL and API user of the listmonk-hc instance. Both can be overridden
# from the environment; the literals below are the fallbacks.
LISTMONK_URL = os.environ.get("HC_LISTMONK_URL", "http://localhost:9101")
LISTMONK_USER = os.environ.get("HC_LISTMONK_USER", "api")
# Token read from the host file written when the api user was created.
def _token() -> str:
    """Return the listmonk-hc API token.

    Lookup order:
      1. the ``HC_LISTMONK_TOKEN`` environment variable (if non-empty);
      2. the first candidate token file that can be read and is non-empty.

    Raises:
        SystemExit: if none of the sources yields a token.
    """
    env_token = os.getenv("HC_LISTMONK_TOKEN")
    if env_token:
        return env_token

    candidates = (
        "/opt/performancewest/.secrets/hc-listmonk-token",
        "/etc/postfix/hc-listmonk-token",
    )
    for candidate in candidates:
        # Just try to open the file (no exists() pre-check, which is racy) and
        # use a context manager so the handle is always closed.
        try:
            with open(candidate) as fh:
                token = fh.read().strip()
        except OSError:
            # Missing, unreadable, or otherwise unusable: try the next file.
            continue
        # An empty file would only produce an authentication failure later,
        # so treat it like a missing one.
        if token:
            return token

    raise SystemExit("HC_LISTMONK_TOKEN not set and no readable token file found")
# --- Input/state locations (each overridable from the environment) ---------
# CSV of warmup recipients that main() reads with csv.DictReader.
VERIFIED_CSV = os.environ.get(
    "HC_VERIFIED_CSV",
    "/opt/performancewest/data/hc_warmup_week1_verified.csv",
)
# File whose contents become the body of a newly created campaign.
EMAIL_HTML = os.environ.get(
    "HC_EMAIL_HTML",
    "/opt/performancewest/data/hc_campaigns/hc_revalidation_overdue.html",
)
# One already-imported email address per line.
STATE_FILE = os.environ.get(
    "HC_IMPORT_STATE",
    "/opt/performancewest/data/hc_imported_emails.txt",
)
# Holds the warmup start time as integer epoch seconds (see warmup_day()).
WARMUP_STAMP = "/etc/postfix/hc-warmup-start"

# --- Fixed list / campaign identity ------------------------------------------
LIST_NAME = "HC Warmup - Revalidation Overdue"
CAMPAIGN_NAME = "HC Warmup - Medicare Revalidation"
FROM_EMAIL = "Performance West Compliance <compliance@performancewest.net>"
SUBJECT = "Action needed: your Medicare revalidation is overdue"
REPLY_TO = "info@performancewest.net"
def warmup_day(stamp_path: str | None = None, now: float | None = None) -> int:
    """Return the number of whole days elapsed since the warmup start stamp.

    Args:
        stamp_path: File holding the warmup start as integer epoch seconds.
            Defaults to ``WARMUP_STAMP``.
        now: Current time in epoch seconds. Defaults to ``time.time()``.

    Returns:
        Whole days since the stamp, never negative. Returns 0 when the stamp
        file is missing, unreadable, or does not contain an integer.
    """
    path = WARMUP_STAMP if stamp_path is None else stamp_path
    try:
        # Context manager so the handle is closed even if parsing fails.
        with open(path) as fh:
            start = int(fh.read().strip())
    except (OSError, ValueError):
        # No usable stamp: treat it as day 0, the most conservative ramp step.
        return 0
    current = time.time() if now is None else now
    return max(0, int((current - start) // 86400))
def daily_slice(day: int) -> int:
    """Return how many NEW subscribers to import for the given warmup day.

    This caps daily *queueing*; hourly *delivery* is capped separately by the
    rampcap. The steps are 100 through day 1, 300 through day 4, 600 through
    day 9, and 1000 afterwards.
    """
    # (last warmup day the step applies to, subscribers to import that day)
    ramp_steps = ((1, 100), (4, 300), (9, 600))
    for last_day, quota in ramp_steps:
        if day <= last_day:
            return quota
    return 1000
def lm(path: str, data=None, method=None):
    """Call the listmonk-hc JSON API and return the decoded response body.

    Args:
        path: API path appended to ``{LISTMONK_URL}/api`` (may include a
            query string).
        data: Optional JSON-serialisable request body.
        method: HTTP verb. When omitted, POST is used if ``data`` is given and
            GET otherwise.

    Returns:
        The response body parsed as JSON.

    Raises:
        SystemExit: on an HTTP error status; the message carries the verb,
            path, status code and the first 300 characters of the body.
    """
    verb = method or ("POST" if data is not None else "GET")
    payload = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(
        f"{LISTMONK_URL}/api{path}",
        data=payload,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"token {LISTMONK_USER}:{_token()}",
        },
        # Pass the verb through the supported constructor argument rather
        # than overriding get_method on the instance.
        method=verb,
    )
    try:
        with urllib.request.urlopen(req, timeout=20) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # errors="replace" keeps a non-UTF-8 error body from raising inside
        # this handler and hiding the real HTTP failure.
        body = e.read().decode(errors="replace")[:300]
        raise SystemExit(f"listmonk-hc API {verb} {path} -> {e.code}: {body}")
def get_or_create_list() -> int:
    """Return the id of the list named ``LIST_NAME``, creating it if absent.

    Looks through the first page (up to 100 entries) of existing lists for an
    exact name match; if none is found, creates a private single-opt-in list
    and returns the new id.
    """
    listing = lm("/lists?per_page=100")
    existing = next(
        (entry for entry in listing.get("data", {}).get("results", [])
         if entry["name"] == LIST_NAME),
        None,
    )
    if existing is not None:
        return existing["id"]

    new_list = {
        "name": LIST_NAME,
        "type": "private",
        "optin": "single",
        "tags": ["healthcare", "warmup", "revalidation"],
    }
    created = lm("/lists", new_list)
    return created["data"]["id"]
def load_imported(path: str | None = None) -> set[str]:
    """Load the set of already-imported email addresses.

    Args:
        path: State file to read, one address per line. Defaults to
            ``STATE_FILE``.

    Returns:
        The addresses stripped and lower-cased, with blank lines ignored.
        An empty set when the state file does not exist yet.
    """
    state_path = STATE_FILE if path is None else path
    try:
        # Open directly (no exists() pre-check) and close deterministically.
        with open(state_path) as fh:
            return {line.strip().lower() for line in fh if line.strip()}
    except FileNotFoundError:
        # First run: nothing has been imported yet.
        return set()
def save_imported(emails: set[str], path: str | None = None):
    """Persist the set of imported email addresses, one per line, sorted.

    Args:
        emails: Addresses to record.
        path: Destination state file. Defaults to ``STATE_FILE``.

    The data is written to a sibling ``.tmp`` file and then moved into place
    with ``os.replace``, so an interrupted write cannot leave a truncated
    state file behind.
    """
    state_path = STATE_FILE if path is None else path
    parent = os.path.dirname(state_path)
    # A bare filename has no directory part, and os.makedirs("") raises.
    if parent:
        os.makedirs(parent, exist_ok=True)
    tmp_path = f"{state_path}.tmp"
    with open(tmp_path, "w") as fh:
        fh.write("\n".join(sorted(emails)) + "\n")
    os.replace(tmp_path, state_path)
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
    """Create a subscriber on ``list_id``, or attach an existing one to it.

    Args:
        list_id: Target listmonk list id.
        email: Subscriber email address.
        name: Display name; falls back to the local part of ``email``.
        attribs: Custom attributes stored on the subscriber.

    Returns:
        True if the subscriber was created, or already existed and was added
        to the list. False on any other failure.
    """
    try:
        lm("/subscribers", {
            "email": email, "name": name or email.split("@")[0],
            "status": "enabled", "lists": [list_id],
            "attribs": attribs, "preconfirm_subscriptions": True,
        })
        return True
    except SystemExit as e:
        message = str(e)

    # Only an "already exists" conflict is recoverable; anything else fails.
    if "409" not in message and "already exists" not in message.lower():
        return False

    # Already exists -> look the subscriber up and attach it to the list.
    try:
        q = "subscribers.email = '" + email.replace("'", "''") + "'"
        found = lm("/subscribers?" + urllib.parse.urlencode({"query": q, "per_page": 1}))
        results = found.get("data", {}).get("results", [])
        if not results:
            return False
        sid = results[0]["id"]
        lm("/subscribers/lists", {"ids": [sid], "action": "add",
                                  "target_list_ids": [list_id],
                                  "status": "confirmed"}, "PUT")
        return True
    except (Exception, SystemExit):
        # lm() reports HTTP failures as SystemExit, which is not an Exception
        # subclass; it must be named explicitly or a failed lookup/attach
        # would abort the whole run instead of skipping this row.
        return False
def ensure_campaign(list_id: int) -> int:
    """Return the id of a usable HC warmup campaign, creating one if needed.

    An existing campaign whose name starts with ``CAMPAIGN_NAME`` is reused
    only while it is still ACTIVE (draft / running / paused / scheduled). A
    finished or cancelled campaign can't accept new subscribers or be
    restarted, so in that case a fresh dated campaign is created, which also
    picks up the latest email template (e.g. copy/colour tweaks).

    Args:
        list_id: List that a newly created campaign is pointed at.

    Returns:
        The id of the reused or newly created campaign.
    """
    from datetime import date

    active_states = {"draft", "running", "paused", "scheduled"}
    res = lm("/campaigns?per_page=100")
    for c in res.get("data", {}).get("results", []):
        if c["name"].startswith(CAMPAIGN_NAME) and c.get("status") in active_states:
            return c["id"]

    # Read the template through a context manager so the handle is closed.
    with open(EMAIL_HTML) as fh:
        body = fh.read()
    dated = f"{CAMPAIGN_NAME} - {date.today():%b %d %Y}"
    payload = {
        "name": dated, "subject": SUBJECT, "lists": [list_id],
        "from_email": FROM_EMAIL, "type": "regular", "content_type": "richtext",
        "body": body, "messenger": "email",
        "tags": ["healthcare", "warmup"],
        "headers": [{"Reply-To": REPLY_TO},
                    {"List-Unsubscribe": "<{{ UnsubscribeURL }}>"},
                    {"List-Unsubscribe-Post": "List-Unsubscribe=One-Click"}],
    }
    res = lm("/campaigns", payload)
    return res["data"]["id"]
def main():
    """Import today's slice of verified recipients and ensure the campaign.

    Steps:
      1. Work out the slice size (``--slice`` override or the warmup ramp).
      2. Pick the first not-yet-imported, non-blank, de-duplicated emails
         from ``VERIFIED_CSV``.
      3. With ``--dry-run``: print a preview and stop without changes.
      4. Otherwise add each subscriber to the list, persist the imported
         set, ensure the campaign exists, and (with ``--start-campaign``)
         set it running.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--slice", type=int, default=0, help="override daily import slice (0=ramp default)")
    ap.add_argument("--start-campaign", action="store_true",
                    help="flip the campaign to 'running' (otherwise left as draft for approval)")
    args = ap.parse_args()

    day = warmup_day()
    # Only a positive --slice overrides the ramp. A negative value used as a
    # slice bound would mean rows[:-n] and queue almost the whole file.
    slice_n = args.slice if args.slice > 0 else daily_slice(day)
    print(f"[hc-cron] {datetime.now(timezone.utc).isoformat()} warmup_day={day} daily_slice={slice_n}")

    # newline="" is what the csv module expects; `with` closes the handle.
    with open(VERIFIED_CSV, newline="") as fh:
        rows = list(csv.DictReader(fh))
    imported = load_imported()

    # Take the first slice_n rows that are new. Rows with a blank email are
    # skipped (they can never import and would occupy slice slots on every
    # run), as are repeats of an email already queued in this run.
    todo = []
    queued = set()
    for r in rows:
        candidate = (r.get("email") or "").strip().lower()
        if not candidate or candidate in imported or candidate in queued:
            continue
        queued.add(candidate)
        todo.append(r)
        if len(todo) >= slice_n:
            break
    print(f"[hc-cron] verified_total={len(rows)} already_imported={len(imported)} to_import_today={len(todo)}")

    if args.dry_run:
        for r in todo[:5]:
            print(f" would import: {r['email']} {(r.get('name') or '')[:30]} due={r.get('days_overdue','')}")
        print("[hc-cron] dry-run, no changes")
        return
    if not todo:
        print("[hc-cron] nothing new to import today")
        return

    list_id = get_or_create_list()
    n_ok = 0
    try:
        for r in todo:
            email = r["email"].strip().lower()
            attribs = {
                "npi": r.get("npi", ""),
                "practice": r.get("name", ""),
                "specialty": r.get("specialty", ""),
                "state": r.get("state", ""),
                "detail": (f"{r.get('reval_due_date','')} ({r.get('days_overdue','')} days overdue)"
                           if r.get("reval_status") == "overdue" else r.get("reval_due_date", "")),
            }
            if add_subscriber(list_id, email, r.get("name") or "", attribs):
                imported.add(email)
                n_ok += 1
    finally:
        # Persist progress even if the loop is interrupted part-way, so the
        # next run does not start again from the same rows.
        save_imported(imported)

    cid = ensure_campaign(list_id)
    if args.start_campaign:
        lm(f"/campaigns/{cid}/status", {"status": "running"}, "PUT")
        print(f"[hc-cron] campaign {cid} set to RUNNING")
    print(f"[hc-cron] imported {n_ok}/{len(todo)} new subscribers into list {list_id}; campaign={cid}")


if __name__ == "__main__":
    main()