#!/usr/bin/env python3 """Healthcare warmup campaign builder for listmonk-hc. Runs daily (Mon-Fri, 7 AM Central via cron). Each run: 1. Imports the next slice of the VERIFIED, overdue-first warmup list into a listmonk-hc list (deduped; already-imported rows are skipped). 2. Ensures the teal "Medicare revalidation overdue" campaign exists and is running, pointed at that list. 3. Listmonk-hc's sliding-window rate cap (driven by pw-hc-rampcap) does the actual per-hour throttling, so this builder just feeds the queue. WARMUP COHORT: point HC_VERIFIED_CSV at hc_warmup_nongoogle.csv for weeks 1-2. Google/Workspace-hosted practice domains (~31%) reject hard from cold IPs (550-5.7.1 "unsolicited"), which damages warmup reputation. Send the 501 non-Google practice domains first; switch to hc_warmup_google.csv (222) once the IPs are warm (week 2-3). The daily slice size follows the hc warmup ramp so we never queue more than the IPs can safely send while warming. Sends ONLY happen via the hc HOT stream (listmonk-hc -> host :2526/2527/2528 -> .107/.108/.109), never the trucking pool. Idempotent: safe to run every weekday. Tracks imported emails in a state file. Usage: python3 scripts/build_healthcare_campaigns_cron.py # daily slice python3 scripts/build_healthcare_campaigns_cron.py --dry-run python3 scripts/build_healthcare_campaigns_cron.py --slice 0 # use ramp default """ from __future__ import annotations import argparse, base64, csv, json, os, sys, time, urllib.request, urllib.parse, urllib.error from datetime import datetime, timezone LISTMONK_URL = os.getenv("HC_LISTMONK_URL", "http://localhost:9101") LISTMONK_USER = os.getenv("HC_LISTMONK_USER", "api") # Token read from the host file written when the api user was created. def _token() -> str: t = os.getenv("HC_LISTMONK_TOKEN") if t: return t for p in ("/opt/performancewest/.secrets/hc-listmonk-token", "/etc/postfix/hc-listmonk-token"): if os.path.exists(p): try: return open(p).read().strip() except PermissionError: continue raise SystemExit("HC_LISTMONK_TOKEN not set and no readable token file found") VERIFIED_CSV = os.getenv("HC_VERIFIED_CSV", "/opt/performancewest/data/hc_warmup_week1_verified.csv") STATE_DIR = os.getenv("HC_STATE_DIR", "/opt/performancewest/data") WARMUP_STAMP = "/etc/postfix/hc-warmup-start" # Permanent do-not-contact list (one email per line). Providers who tell us they # already revalidated, asked to stop, etc. go here so the warmup NEVER imports or # re-mails them, regardless of segment or stale CMS data. Append + run --prune. SUPPRESS_FILE = os.getenv("HC_SUPPRESS_FILE", os.path.join(STATE_DIR, "hc_suppress.txt")) # Bulk From — sends from the dedicated bulk subdomain so its sending reputation # is isolated from the root domain (which stays clean for transactional / # verification mail). Replies still go to the root domain via Reply-To, so the # customer-facing reply experience is unchanged. See docs/deliverability.md. FROM_EMAIL = os.getenv( "HC_CAMPAIGN_FROM", "Performance West Compliance ", ) REPLY_TO = os.getenv("HC_CAMPAIGN_REPLY_TO", "info@performancewest.net") # Segment registry (subject, template file, list/campaign names, row selector) # is the single source of truth shared with build_healthcare_campaigns.py. sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from build_healthcare_campaigns import SEGMENTS, template_path # noqa: E402 from _email_plaintext import html_to_text # noqa: E402 def load_suppressed() -> set[str]: if os.path.exists(SUPPRESS_FILE): return {ln.strip().lower() for ln in open(SUPPRESS_FILE) if ln.strip() and not ln.startswith("#")} return set() # Which segments to warm, in priority order. Revalidation stays the lead (it has # the most verified data + the official-record card). The others warm in # parallel on smaller slices so we collect engagement data across all programs # without overwhelming the warming IPs. # Which segments to warm, in priority order. With one-email-per-provider # assignment (assign_all), each provider is routed to exactly ONE of these by # urgency: reactivation (10) > revalidation overdue (20) > due soon (30) > the # free NPI compliance check (100, catch-all). The free check ITSELF includes OIG/ # SAM exclusion screening and routes to the paid OIG fix on a hit, so the # standalone blanket `oig_screening` email (which matched every verified row and # would otherwise starve the catch-all) is intentionally NOT in the default # rotation -- it can still be run explicitly via --segments for a dedicated push. ACTIVE_SEGMENTS = os.getenv( "HC_SEGMENTS", "npi_reactivation,revalidation_overdue,revalidation_due_soon,nppes_outdated,compliance_bundle", ).split(",") # Warmup deliverability guard: only mail SLIGHTLY-overdue providers. A practice # that lapsed recently is almost certainly still operating with a live inbox; one # that is many months/years overdue has likely closed, merged, or abandoned the # address, so its mail bounces -- and bounces are the fastest way to wreck a # warming IP's reputation. Window is inclusive [MIN, MAX] days overdue. WARMUP_OVERDUE_MIN = int(os.getenv("HC_OVERDUE_MIN", "1")) WARMUP_OVERDUE_MAX = int(os.getenv("HC_OVERDUE_MAX", "90")) # Proactive "revalidation due soon" window (days UNTIL the due date). Mirrors the # overdue window so we reach providers shortly before AND after their deadline, # roughly doubling the deliverable warmup pool from the same CMS data source. WARMUP_DUE_SOON_MIN = int(os.getenv("HC_DUE_SOON_MIN", "1")) WARMUP_DUE_SOON_MAX = int(os.getenv("HC_DUE_SOON_MAX", "90")) def _overdue_days(r: dict): v = (r.get("days_overdue") or "").strip() try: return int(v) except ValueError: return None # During warmup, hold out Google-Workspace-hosted domains: Google rejects bulk # mail from cold/warming IPs hard (550-5.7.1), and those bounces wreck the # warming reputation. The mx_provider flag is set by the weekly hc_data_refresh # (an MX lookup, since a custom domain can silently use Google Workspace). Set # HC_SKIP_GOOGLE=0 to lift this once the IPs are warm. SKIP_GOOGLE = os.getenv("HC_SKIP_GOOGLE", "1") not in ("0", "false", "no") def _is_google_hosted(r: dict) -> bool: if not SKIP_GOOGLE: return False return (r.get("mx_provider") or "").strip().lower() == "google" def warmup_day() -> int: try: start = int(open(WARMUP_STAMP).read().strip()) return max(0, int((time.time() - start) // 86400)) except Exception: return 0 def daily_slice(day: int) -> int: """TOTAL new subscribers to import across ALL segments today, aligned with the hc ramp. The rampcap caps hourly *delivery*; this caps daily *queueing* so we never flood the warming IPs. Runs DAILY (continuous warmup -- mailbox providers reward consistent daily volume; weekends are NOT skipped).""" if day <= 1: return 100 if day <= 4: return 300 if day <= 9: return 600 return 1000 # ── Per-MX-operator throttle ───────────────────────────────────────────────── # Sender reputation is tracked by the RECEIVING mail operator (Microsoft 365, # Google Workspace, Proofpoint, ...), not by recipient domain. So we cap how many # new providers we queue per operator per day, and let volume spread across the # long tail of operators freely. This lets total daily volume be much higher than # a flat cap without hammering any single receiving system. Caps ramp with the # warmup day. "default" applies to any operator not explicitly listed (mostly the # long tail of small/independent mail hosts -- a generous cap is safe there # because each sees only a handful). def mx_daily_caps(day: int) -> dict: # (microsoft, google, proofpoint, default-per-operator) if day <= 1: big, default = 25, 15 elif day <= 4: big, default = 60, 40 elif day <= 9: big, default = 120, 80 else: big, default = 250, 150 return { "microsoft": big, "google": big, "proofpoint": big, "cisco": big, "mimecast": big, "barracuda": big, "__default__": default, } def mx_throttled(candidates: list[dict], total_n: int, caps: dict) -> list[dict]: """Pick up to total_n candidates, capping per mx_provider so no single receiving operator gets more than its daily share. Preserves input order within each operator. Falls back to ungrouped slicing if rows have no mx_provider.""" if not candidates or "mx_provider" not in candidates[0]: return candidates[:total_n] per_op: dict = {} chosen: list[dict] = [] default_cap = caps.get("__default__", 50) for r in candidates: if len(chosen) >= total_n: break op = (r.get("mx_provider") or "").strip() or "__default__" cap = caps.get(op, default_cap) if per_op.get(op, 0) >= cap: continue per_op[op] = per_op.get(op, 0) + 1 chosen.append(r) return chosen def lm(path: str, data=None, method=None): tok = _token() headers = {"Content-Type": "application/json", "Authorization": f"token {LISTMONK_USER}:{tok}"} req = urllib.request.Request(f"{LISTMONK_URL}/api{path}", headers=headers) if data is not None: req.data = json.dumps(data).encode() if not method: method = "POST" if method: req.get_method = lambda: method try: with urllib.request.urlopen(req, timeout=20) as r: return json.loads(r.read().decode()) except urllib.error.HTTPError as e: body = e.read().decode()[:300] raise SystemExit(f"listmonk-hc API {method or 'GET'} {path} -> {e.code}: {body}") def get_or_create_list(list_name: str, tags: list[str]) -> int: res = lm("/lists?per_page=100") for l in res.get("data", {}).get("results", []): if l["name"] == list_name: return l["id"] res = lm("/lists", {"name": list_name, "type": "private", "optin": "single", "tags": tags}) return res["data"]["id"] def state_file(seg_key: str) -> str: return os.path.join(STATE_DIR, f"hc_imported_{seg_key}.txt") def load_imported(seg_key: str) -> set[str]: p = state_file(seg_key) if os.path.exists(p): return {ln.strip().lower() for ln in open(p) if ln.strip()} # Back-compat: the original single-segment cron tracked revalidation imports # in hc_imported_emails.txt. Seed the revalidation state from it once. legacy = os.path.join(STATE_DIR, "hc_imported_emails.txt") if seg_key == "revalidation_overdue" and os.path.exists(legacy): return {ln.strip().lower() for ln in open(legacy) if ln.strip()} return set() def save_imported(seg_key: str, emails: set[str]): os.makedirs(STATE_DIR, exist_ok=True) with open(state_file(seg_key), "w") as f: f.write("\n".join(sorted(emails)) + "\n") def load_all_imported() -> set[str]: """Union of EVERY segment's imported-emails state, i.e. everyone who has already been emailed by ANY segment. Used as a cross-segment AND cross-cron guard so a provider gets exactly one healthcare email overall: the two crons (pw-hc-campaign on the small warmup file, pw-hc-nppes on the 63k institutional file) share these state files, and ~312 emails overlap both files, so without this a provider warmed as 'revalidation_overdue' by one cron could also be warmed as the free 'nppes_outdated' check by the other. Reads all hc_imported_*.txt plus the legacy single-segment file.""" seen: set[str] = set() for key in SEGMENTS: seen |= load_imported(key) legacy = os.path.join(STATE_DIR, "hc_imported_emails.txt") if os.path.exists(legacy): seen |= {ln.strip().lower() for ln in open(legacy) if ln.strip()} return seen def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool: try: lm("/subscribers", { "email": email, "name": name or email.split("@")[0], "status": "enabled", "lists": [list_id], "attribs": attribs, "preconfirm_subscriptions": True, }) return True except SystemExit as e: # Already exists -> attach to the list AND refresh attribs so per-segment # merge fields (e.g. days_until for the due-soon template) are correct # even when the same provider already exists from another segment import. if "409" in str(e) or "already exists" in str(e).lower(): try: q = "subscribers.email = '" + email.replace("'", "''") + "'" found = lm("/subscribers?" + urllib.parse.urlencode({"query": q, "per_page": 1})) results = found.get("data", {}).get("results", []) if results: sid = results[0]["id"] existing_attribs = results[0].get("attribs") or {} # Merge: keep prior fields, overwrite with this segment's # values (npi/practice/due-date/days_until/days_overdue). merged = {**existing_attribs, **attribs} lm(f"/subscribers/{sid}", { "email": email, "name": name or email.split("@")[0], "attribs": merged, }, "PUT") lm("/subscribers/lists", {"ids": [sid], "action": "add", "target_list_ids": [list_id], "status": "confirmed"}, "PUT") return True except Exception: return False return False def ensure_campaign(seg_key: str, list_id: int, send_list_name: str | None = None) -> int: # One campaign per (segment, day): each day's campaign is bound to that day's # dedicated SEND list, so it mails ONLY that day's newly-imported slice. We # therefore reuse a campaign only when it's the SAME day's campaign (matched by # the dated name) and still active; we never reuse a prior day's campaign, # because that would re-target an old slice. A finished/cancelled same-day # campaign can't restart, so a fresh one is created (also picking up the latest # canonical hc_*.html template). from datetime import date seg = SEGMENTS[seg_key] ACTIVE = {"draft", "running", "paused", "scheduled"} dated = f"{seg['campaign_name']} - {date.today():%b %d %Y}" res = lm("/campaigns?per_page=100") for c in res.get("data", {}).get("results", []): if c["name"] == dated and c.get("status") in ACTIVE: return c["id"] body = open(template_path(seg_key)).read() payload = { "name": dated, "subject": seg["subject"], "lists": [list_id], "from_email": FROM_EMAIL, "type": "regular", "content_type": "richtext", "body": body, "altbody": html_to_text(body), "messenger": "email", "tags": ["healthcare", "warmup", seg_key], "headers": [{"Reply-To": REPLY_TO}, {"List-Unsubscribe": "<{{ UnsubscribeURL }}>"}, {"List-Unsubscribe-Post": "List-Unsubscribe=One-Click"}], } res = lm("/campaigns", payload) return res["data"]["id"] def row_matches(seg_key: str, r: dict) -> bool: """Does this master-CSV row belong to a segment? Driven by the segment's `selector` so the row->segment mapping lives next to the segment metadata.""" sel = SEGMENTS[seg_key]["selector"] status = (r.get("reval_status") or "").strip().lower() excluded = (r.get("leie_excluded") or "").strip() not in ("", "0", "false") optout = (r.get("optout_ending") or "").strip() != "" if sel == "reval_overdue": # Only SLIGHTLY-overdue: recently-lapsed practices are still active with # deliverable inboxes. Heavily-overdue ones likely bounce and burn the # warming IP's reputation, so we hold them out of the warmup window. if status != "overdue": return False od = _overdue_days(r) return od is not None and WARMUP_OVERDUE_MIN <= od <= WARMUP_OVERDUE_MAX if sel == "reval_due_soon": # Proactive: revalidation is UPCOMING within the lookahead window. Pitch # is "handle it before your deadline" -- taps the same CMS Revalidation # Due Date List as reval_overdue but the (much larger) not-yet-due slice, # so it grows warmup supply without touching a new data source. # days_overdue is negative for upcoming (days until due), so a provider # due in N days has days_overdue == -N. if status != "upcoming": return False od = _overdue_days(r) if od is None: return False days_until = -od return WARMUP_DUE_SOON_MIN <= days_until <= WARMUP_DUE_SOON_MAX if sel == "reval_upcoming": return status == "upcoming" if sel == "leie_or_deactivated": # Reactivation targets: flagged OIG/SAM excluded, OR the NPI is genuinely # DEACTIVATED in NPPES (real signal from enrich_nppes_last_updated.py). # We deliberately do NOT treat "not on the CMS revalidation list" as # deactivated here: for the institutional org-NPI pool that's false (an # org NPI simply may not be an individual Medicare enrollee), which would # mis-tell a fully-active practice they're deactivated. Only real # exclusion or a real NPPES deactivation qualifies. deactivated = (r.get("nppes_deactivated") or "").strip().upper() == "Y" return excluded or deactivated if sel == "optout_ending": return optout if sel == "institutional_default": # Catch-all for the FREE NPI compliance-check email: any SMTP-verified # institutional row that isn't a more-urgent case. With one-email-per- # provider assignment (assign_segment), higher-priority segments # (reactivation, revalidation) claim their providers first, so this only # receives the remainder. Deactivated NPIs are excluded (they belong to # reactivation and likely bounce). We keep the SMTP-verification gate so # we only mail inboxes we already proved are live. if (r.get("nppes_deactivated") or "").strip().upper() == "Y": return False return (str(r.get("verify_ok", "")).strip().upper() in ("Y", "YES", "TRUE", "1", "")) if sel == "any": # OIG screening applies to any billing practice, but for warmup we still # exclude the likely-undeliverable: providers heavily overdue (stale) or # already dropped off the reval list. Recently-lapsed and upcoming stay. if status in ("not_on_list", "no_reval_flag"): return False od = _overdue_days(r) if status == "overdue" and (od is None or od > WARMUP_OVERDUE_MAX): return False return True if sel == "institutional_verified": # For the freshly SMTP-VERIFIED institutional list, "not on the CMS # revalidation list" does NOT mean undeliverable -- it just means the org # NPI is not an individual Medicare enrollee. We already proved the inbox # is live (verify_ok), so trust that instead of using reval-list presence # as a deliverability proxy. Still hold out the heavily-overdue (stale) # individual enrollees that DO appear, to protect the warming IP. if (str(r.get("verify_ok", "")).strip().upper() not in ("Y", "YES", "TRUE", "1", "")): return False od = _overdue_days(r) if status == "overdue" and od is not None and od > WARMUP_OVERDUE_MAX: return False return True return False def _seg_priority(seg_key: str) -> int: """Urgency rank for a segment (lower = more urgent). Defaults high so a segment without an explicit priority never out-ranks a real one.""" return int(SEGMENTS[seg_key].get("priority", 1000)) def assign_segment(r: dict, active_segments: list[str]) -> str | None: """One-email-per-provider: return the SINGLE segment a provider belongs to -- the most-urgent (lowest priority number) active segment whose selector matches their row -- or None if no active segment matches. This is what guarantees a provider who is e.g. both revalidation-overdue AND eligible for the free check receives only the overdue email (priority 20), never both. Ties broken by priority then segment key for determinism.""" matches = [s for s in active_segments if row_matches(s, r)] if not matches: return None return min(matches, key=lambda s: (_seg_priority(s), s)) def assign_all(rows: list[dict], active_segments: list[str]) -> dict[str, str]: """Map email -> assigned segment across the whole list, so each segment's importer can claim only its assigned providers. Computed once per run. An email can appear on MULTIPLE rows (a shared practice inbox covering several NPIs, e.g. a credentialing address) and those rows can carry DIFFERENT statuses (one NPI overdue, another not on the list). We must keep the MOST-URGENT assignment across all of that email's rows -- otherwise a later, less-urgent row would clobber an earlier urgent one and the provider would get the free check instead of the overdue email. So we compare priorities and keep the winner (lower number = more urgent).""" out: dict[str, str] = {} for r in rows: email = (r.get("email") or "").strip().lower() if not email: continue seg = assign_segment(r, active_segments) if seg is None: continue prev = out.get(email) if prev is None or _seg_priority(seg) < _seg_priority(prev): out[email] = seg return out def attribs_for(r: dict) -> dict: # days_overdue is positive when past due and negative when upcoming (days # until the due date). Expose a clean positive "days_until" for the # due-soon segment's template. od_raw = (str(r.get("days_overdue", "")) or "").strip() days_until = "" try: od = int(od_raw) if od < 0: days_until = str(-od) except ValueError: pass return { "npi": r.get("npi", ""), "practice": r.get("name", ""), "specialty": r.get("specialty", ""), "state": r.get("state", ""), # Separate fields so the email's "official CMS record" card can render # the due date + overdue/until count cleanly (mirrors the CMS # Revalidation Due Date List, verified by NPI via the weekly refresh). "reval_due_date": r.get("reval_due_date", ""), "days_overdue": str(r.get("days_overdue", "")), "days_until": days_until, # MX operator (for per-operator analysis + throttling audit). "mx_provider": r.get("mx_provider", ""), # Real NPPES freshness (from enrich_nppes_last_updated.py). Lets the # "NPPES may be out of date" email cite the actual government date the # provider can verify on the public registry, instead of an unbacked # "FLAGGED OUT OF DATE" claim. "nppes_last_updated": r.get("nppes_last_updated", ""), "nppes_years_stale": r.get("nppes_years_stale", ""), "nppes_enumeration": r.get("nppes_enumeration", ""), "detail": (f"{r.get('reval_due_date','')} ({r.get('days_overdue','')} days overdue)" if r.get("reval_status") == "overdue" else r.get("reval_due_date", "")), } def warm_segment(seg_key: str, rows: list[dict], slice_n: int, dry_run: bool, start_campaign: bool, assignment: dict[str, str] | None = None) -> int: """Import up to slice_n new subscribers for one segment and keep its campaign active. Returns how many NEW subscribers were imported. If `assignment` (email -> single assigned segment) is given, a provider is a candidate only when THIS segment is their assigned one -- enforcing one-email-per-provider by urgency priority. When it's None, fall back to the legacy behavior (row_matches), so the warmup cron that runs a single segment keeps working unchanged.""" seg = SEGMENTS[seg_key] imported = load_imported(seg_key) # Cross-segment + cross-cron guard: skip anyone already emailed by ANY # segment so each provider gets exactly one healthcare email overall. already_anywhere = load_all_imported() suppressed = load_suppressed() def _is_candidate(r: dict) -> bool: email = r.get("email", "").strip().lower() if not email or email in already_anywhere or email in suppressed: return False if _is_google_hosted(r): return False if assignment is not None: # The email must be assigned to THIS segment AND this specific row # must be the one that earns it. An email can span several rows (a # shared practice inbox over multiple NPIs); only the row whose own # status matches this segment's selector should represent it, so the # template renders that row's real data (e.g. the overdue NPI's due # date, never a sibling 'not_on_list' row's blank one). This also # dedupes: at most one row per email passes. return assignment.get(email) == seg_key and row_matches(seg_key, r) return row_matches(seg_key, r) # Dedupe by email: an email can legitimately appear on multiple matching # rows (e.g. two overdue NPIs share one inbox). Keep the first so the email # is imported once and counted once against the slice budget. candidates = [] seen_emails: set[str] = set() for r in rows: if not _is_candidate(r): continue email = r["email"].strip().lower() if email in seen_emails: continue seen_emails.add(email) candidates.append(r) # Spread the slice across MX operators so no single receiving system (e.g. # Microsoft 365) gets the whole batch. Caps ramp with the warmup day. todo = mx_throttled(candidates, slice_n, mx_daily_caps(warmup_day())) print(f"[hc-cron] {seg_key}: candidates={len(candidates)} " f"in_segment={len(imported)} emailed_anywhere={len(already_anywhere)} " f"to_import={len(todo)}") if dry_run: for r in todo[:3]: print(f" would import: {r['email']} {r.get('name','')[:28]} " f"status={r.get('reval_status','')}") return 0 if not todo: return 0 list_id = get_or_create_list(seg["list_name"], ["healthcare", "warmup", seg_key]) # Per-day SEND list: the campaign targets ONLY today's newly-imported slice, # never the whole accumulated segment list. Without this, each day's campaign # re-mailed every prior subscriber (the cumulative list), so the cohort # imported on day 1 received the identical email every subsequent day -- a # 5-6x repeat that burned reputation and blocklisted ~12% of the list. The # persistent `list_id` is still used for dedup/records; the dated send list is # what the campaign actually sends to, so every provider gets exactly one send. from datetime import date send_list_name = f"{seg['list_name']} - SEND {date.today():%Y-%m-%d}" send_list_id = get_or_create_list(send_list_name, ["healthcare", "warmup", seg_key, "daily-send"]) n_ok = 0 for r in todo: email = r["email"].strip().lower() # Add to BOTH the persistent segment list (dedup/records) and today's # send list (the campaign's only audience). ok = add_subscriber(list_id, email, r.get("name") or "", attribs_for(r)) add_subscriber(send_list_id, email, r.get("name") or "", attribs_for(r)) if ok: imported.add(email); n_ok += 1 save_imported(seg_key, imported) # Campaign targets the per-day send list, NOT the cumulative segment list. cid = ensure_campaign(seg_key, send_list_id, send_list_name) if start_campaign: try: lm(f"/campaigns/{cid}/status", {"status": "running"}, "PUT") except SystemExit as e: # Already running raises; that's fine. if "already" not in str(e).lower(): print(f"[hc-cron] {seg_key}: start warning: {e}") print(f"[hc-cron] {seg_key}: imported {n_ok} into list {list_id} " f"(send list {send_list_id}); campaign={cid}") return n_ok def _all_list_subscribers(list_id: int): """Yield (id, email) for every subscriber on a list, paging the API.""" page, per = 1, 1000 while True: q = urllib.parse.urlencode({"list_id": list_id, "page": page, "per_page": per}) res = lm("/subscribers?" + q) results = res.get("data", {}).get("results", []) or [] for s in results: yield s["id"], (s.get("email") or "").strip().lower() if len(results) < per: break page += 1 def prune_holdouts(dry_run: bool) -> int: """Belt-and-suspenders: remove subscribers who should NOT be in the warmup from the active warmup lists, even if they were imported before a guard existed or their domain's MX has since flipped to Google. We match against the FRESH MASTER CSV (re-classified weekly by hc_data_refresh), not the listmonk attribs snapshot, so a domain that newly became Google-hosted is caught here. Returns the number of (subscriber, list) removals.""" master_path = os.getenv("HC_MASTER_CSV", os.path.join(STATE_DIR, "hc_warmup_week1.csv")) if not os.path.exists(master_path): print(f"[hc-cron] prune: master {master_path} not found, skipping") return 0 rows = list(csv.DictReader(open(master_path))) by_email = {r.get("email", "").strip().lower(): r for r in rows if r.get("email")} suppressed = load_suppressed() removed = 0 for seg_key, seg in SEGMENTS.items(): try: res = lm("/lists?per_page=100") list_id = next((l["id"] for l in res.get("data", {}).get("results", []) if l["name"] == seg["list_name"]), None) except SystemExit: list_id = None if not list_id: continue drop_ids = [] for sid, email in _all_list_subscribers(list_id): # Always remove anyone on the permanent do-not-contact list (they # told us to stop / already revalidated), regardless of source data. if email in suppressed: drop_ids.append(sid) continue r = by_email.get(email) if r is None: continue # not in our source data; leave it alone # DELIVERABILITY-only prune: remove subscribers whose domain is now # Google-hosted (would hard-bounce from the cold IP). We deliberately # do NOT evict for audience reasons (e.g. an overdue provider drifting # out of the 1-90 day window) -- they were a valid target when warmed # and re-evaluating audience on already-engaged people just wastes # warmup progress. The import-time guard handles audience for NEW adds. if _is_google_hosted(r): drop_ids.append(sid) if drop_ids: print(f"[hc-cron] prune {seg_key} (list {list_id}): " f"{len(drop_ids)} holdouts to remove") if not dry_run: # Bulk unsubscribe + detach from this list (chunked). for i in range(0, len(drop_ids), 500): chunk = drop_ids[i:i + 500] lm("/subscribers/lists", {"ids": chunk, "action": "remove", "target_list_ids": [list_id]}, "PUT") removed += len(drop_ids) print(f"[hc-cron] prune: removed {removed} subscriber-list holdouts") return removed def main(): ap = argparse.ArgumentParser() ap.add_argument("--dry-run", action="store_true") ap.add_argument("--slice", type=int, default=0, help="override TOTAL daily import slice across segments (0=ramp default)") ap.add_argument("--segments", default=",".join(ACTIVE_SEGMENTS), help="comma list of segment keys to warm") ap.add_argument("--start-campaign", action="store_true", help="flip campaigns to 'running' (otherwise left as draft for approval)") ap.add_argument("--prune", action="store_true", help="also remove now-Google-hosted / out-of-audience subscribers " "from the warmup lists (run after the weekly refresh)") ap.add_argument("--prune-only", action="store_true", help="run ONLY the deliverability prune, then exit (no import/warm)") args = ap.parse_args() day = warmup_day() total_slice = args.slice or daily_slice(day) segments = [s.strip() for s in args.segments.split(",") if s.strip() in SEGMENTS] print(f"[hc-cron] {datetime.now(timezone.utc).isoformat()} warmup_day={day} " f"total_slice={total_slice} segments={segments}") rows = list(csv.DictReader(open(VERIFIED_CSV))) print(f"[hc-cron] verified_total={len(rows)}") if args.prune or args.prune_only: prune_holdouts(args.dry_run) if args.prune_only: return # Split the daily slice across segments by URGENCY PRIORITY. We process # segments most-urgent first (lowest priority number) and give each the # remaining budget, so urgent segments (reactivation, revalidation-overdue) # are fully served before the broad free-NPI-check catch-all consumes the # rest. Because assign_all already routed each provider to a single segment, # the segment pools are disjoint -- no provider is double-counted or # double-mailed. Unused budget from a small urgent pool flows to the next # segment automatically (we only decrement by what was actually imported). assignment = assign_all(rows, segments) if assignment: from collections import Counter dist = Counter(assignment.values()) print(f"[hc-cron] assignment (one email/provider by priority): " + ", ".join(f"{k}={dist[k]}" for k in sorted(dist, key=lambda s: (_seg_priority(s), s)))) order = sorted(segments, key=lambda s: (_seg_priority(s), s)) grand = 0 budget = total_slice for seg_key in order: cap = budget if not args.dry_run else total_slice n = warm_segment(seg_key, rows, cap, args.dry_run, args.start_campaign, assignment) grand += n budget -= n if budget <= 0 and not args.dry_run: break print(f"[hc-cron] done: imported {grand} new subscribers across {len(segments)} segment(s)") if __name__ == "__main__": main()