The HC warmup imported ~1000 fresh providers/day into a persistent segment list (list 10), but each day's campaign targeted that WHOLE cumulative list -- so the cohort imported on day 1 received the identical 'Free NPI Check' email every subsequent day (verified: subscriber 3410 got campaigns 38-42, 5x). Program-wide that was 23,843 sends to 6,587 people (3.6x avg, max 30x) and blocklisted ~12% of the list -- a Yahoo 'user complaints' deferral now confirms the burn. Fix: import each day's slice into a dedicated dated SEND list and bind that day's campaign to ONLY that list, so every provider gets exactly one send. The persistent segment list is still used for dedup/records. ensure_campaign now matches the exact dated name (never reuses a prior day's campaign/slice).
724 lines
34 KiB
Python
724 lines
34 KiB
Python
#!/usr/bin/env python3
|
|
"""Healthcare warmup campaign builder for listmonk-hc.
|
|
|
|
Runs daily (Mon-Fri, 7 AM Central via cron). Each run:
|
|
1. Imports the next slice of the VERIFIED, overdue-first warmup list into a
|
|
listmonk-hc list (deduped; already-imported rows are skipped).
|
|
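2. Ensures today's dated campaign for each active segment exists, bound to
   that day's dedicated SEND list (only the newly imported slice), and starts
   it when --start-campaign is passed (otherwise it is left as a draft).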
|
|
3. Listmonk-hc's sliding-window rate cap (driven by pw-hc-rampcap) does the
|
|
actual per-hour throttling, so this builder just feeds the queue.
|
|
|
|
WARMUP COHORT: point HC_VERIFIED_CSV at hc_warmup_nongoogle.csv for weeks 1-2.
|
|
Google/Workspace-hosted practice domains (~31%) reject hard from cold IPs
|
|
(550-5.7.1 "unsolicited"), which damages warmup reputation. Send the 501
|
|
non-Google practice domains first; switch to hc_warmup_google.csv (222) once the
|
|
IPs are warm (week 2-3).
|
|
|
|
The daily slice size follows the hc warmup ramp so we never queue more than the
|
|
IPs can safely send while warming. Sends ONLY happen via the hc HOT stream
|
|
(listmonk-hc -> host :2526/2527/2528 -> .107/.108/.109), never the trucking pool.
|
|
|
|
Idempotent: safe to run every weekday. Tracks imported emails in a state file.
|
|
|
|
Usage:
|
|
python3 scripts/build_healthcare_campaigns_cron.py # daily slice
|
|
python3 scripts/build_healthcare_campaigns_cron.py --dry-run
|
|
python3 scripts/build_healthcare_campaigns_cron.py --slice 0 # use ramp default
|
|
"""
|
|
from __future__ import annotations
|
|
import argparse, base64, csv, json, os, sys, time, urllib.request, urllib.parse, urllib.error
|
|
from datetime import datetime, timezone
|
|
|
|
LISTMONK_URL = os.getenv("HC_LISTMONK_URL", "http://localhost:9101")
|
|
LISTMONK_USER = os.getenv("HC_LISTMONK_USER", "api")
|
|
# Token read from the host file written when the api user was created.
|
|
def _token() -> str:
|
|
t = os.getenv("HC_LISTMONK_TOKEN")
|
|
if t:
|
|
return t
|
|
for p in ("/opt/performancewest/.secrets/hc-listmonk-token",
|
|
"/etc/postfix/hc-listmonk-token"):
|
|
if os.path.exists(p):
|
|
try:
|
|
return open(p).read().strip()
|
|
except PermissionError:
|
|
continue
|
|
raise SystemExit("HC_LISTMONK_TOKEN not set and no readable token file found")
|
|
|
|
VERIFIED_CSV = os.getenv("HC_VERIFIED_CSV", "/opt/performancewest/data/hc_warmup_week1_verified.csv")
|
|
STATE_DIR = os.getenv("HC_STATE_DIR", "/opt/performancewest/data")
|
|
WARMUP_STAMP = "/etc/postfix/hc-warmup-start"
|
|
# Permanent do-not-contact list (one email per line). Providers who tell us they
|
|
# already revalidated, asked to stop, etc. go here so the warmup NEVER imports or
|
|
# re-mails them, regardless of segment or stale CMS data. Append + run --prune.
|
|
SUPPRESS_FILE = os.getenv("HC_SUPPRESS_FILE", os.path.join(STATE_DIR, "hc_suppress.txt"))
|
|
|
|
# Bulk From — sends from the dedicated bulk subdomain so its sending reputation
|
|
# is isolated from the root domain (which stays clean for transactional /
|
|
# verification mail). Replies still go to the root domain via Reply-To, so the
|
|
# customer-facing reply experience is unchanged. See docs/deliverability.md.
|
|
FROM_EMAIL = os.getenv(
|
|
"HC_CAMPAIGN_FROM",
|
|
"Performance West Compliance <compliance@send.performancewest.net>",
|
|
)
|
|
REPLY_TO = os.getenv("HC_CAMPAIGN_REPLY_TO", "info@performancewest.net")
|
|
|
|
# Segment registry (subject, template file, list/campaign names, row selector)
|
|
# is the single source of truth shared with build_healthcare_campaigns.py.
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from build_healthcare_campaigns import SEGMENTS, template_path # noqa: E402
|
|
from _email_plaintext import html_to_text # noqa: E402
|
|
|
|
|
|
def load_suppressed() -> set[str]:
    """Return the lower-cased emails listed in ``SUPPRESS_FILE``.

    The file holds one email per line; blank lines and ``#`` comment lines are
    ignored. A missing file yields an empty set.
    """
    if not os.path.exists(SUPPRESS_FILE):
        return set()
    suppressed: set[str] = set()
    with open(SUPPRESS_FILE) as fh:
        for raw_line in fh:
            entry = raw_line.strip().lower()
            # Test the STRIPPED text so an indented "# comment" line is skipped
            # instead of being stored as a bogus suppressed address.
            if entry and not entry.startswith("#"):
                suppressed.add(entry)
    return suppressed
|
|
|
|
# Which segments to warm, in priority order. Revalidation stays the lead (it has
|
|
# the most verified data + the official-record card). The others warm in
|
|
# parallel on smaller slices so we collect engagement data across all programs
|
|
# without overwhelming the warming IPs.
|
|
# Which segments to warm, in priority order. With one-email-per-provider
|
|
# assignment (assign_all), each provider is routed to exactly ONE of these by
|
|
# urgency: reactivation (10) > revalidation overdue (20) > due soon (30) > the
|
|
# free NPI compliance check (100, catch-all). The free check ITSELF includes OIG/
|
|
# SAM exclusion screening and routes to the paid OIG fix on a hit, so the
|
|
# standalone blanket `oig_screening` email (which matched every verified row and
|
|
# would otherwise starve the catch-all) is intentionally NOT in the default
|
|
# rotation -- it can still be run explicitly via --segments for a dedicated push.
|
|
ACTIVE_SEGMENTS = os.getenv(
|
|
"HC_SEGMENTS",
|
|
"npi_reactivation,revalidation_overdue,revalidation_due_soon,nppes_outdated,compliance_bundle",
|
|
).split(",")
|
|
|
|
# Warmup deliverability guard: only mail SLIGHTLY-overdue providers. A practice
|
|
# that lapsed recently is almost certainly still operating with a live inbox; one
|
|
# that is many months/years overdue has likely closed, merged, or abandoned the
|
|
# address, so its mail bounces -- and bounces are the fastest way to wreck a
|
|
# warming IP's reputation. Window is inclusive [MIN, MAX] days overdue.
|
|
WARMUP_OVERDUE_MIN = int(os.getenv("HC_OVERDUE_MIN", "1"))
|
|
WARMUP_OVERDUE_MAX = int(os.getenv("HC_OVERDUE_MAX", "90"))
|
|
# Proactive "revalidation due soon" window (days UNTIL the due date). Mirrors the
|
|
# overdue window so we reach providers shortly before AND after their deadline,
|
|
# roughly doubling the deliverable warmup pool from the same CMS data source.
|
|
WARMUP_DUE_SOON_MIN = int(os.getenv("HC_DUE_SOON_MIN", "1"))
|
|
WARMUP_DUE_SOON_MAX = int(os.getenv("HC_DUE_SOON_MAX", "90"))
|
|
|
|
|
|
def _overdue_days(r: dict):
|
|
v = (r.get("days_overdue") or "").strip()
|
|
try:
|
|
return int(v)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
# During warmup, hold out Google-Workspace-hosted domains: Google rejects bulk
|
|
# mail from cold/warming IPs hard (550-5.7.1), and those bounces wreck the
|
|
# warming reputation. The mx_provider flag is set by the weekly hc_data_refresh
|
|
# (an MX lookup, since a custom domain can silently use Google Workspace). Set
|
|
# HC_SKIP_GOOGLE=0 to lift this once the IPs are warm.
|
|
SKIP_GOOGLE = os.getenv("HC_SKIP_GOOGLE", "1") not in ("0", "false", "no")
|
|
|
|
|
|
def _is_google_hosted(r: dict) -> bool:
    """Report whether this row should be held out as Google-hosted.

    Always False when the ``SKIP_GOOGLE`` switch is off; otherwise True only
    when the row's ``mx_provider`` is "google" (case/whitespace-insensitive).
    """
    if SKIP_GOOGLE:
        provider = r.get("mx_provider") or ""
        return provider.strip().lower() == "google"
    return False
|
|
|
|
|
|
def warmup_day() -> int:
    """Return whole days elapsed since the epoch timestamp in ``WARMUP_STAMP``.

    Best-effort: a missing, unreadable or non-numeric stamp file yields 0, and
    a stamp in the future is clamped to 0.
    """
    try:
        # Context manager so the stamp file handle is always closed.
        with open(WARMUP_STAMP) as fh:
            started = int(fh.read().strip())
    except (OSError, ValueError):
        # Only the expected failures (file problems / bad contents) fall back
        # to day 0; anything else is a real bug and should surface.
        return 0
    return max(0, int((time.time() - started) // 86400))
|
|
|
|
|
|
def daily_slice(day: int) -> int:
    """Total number of new subscribers to import today across all segments.

    Step ramp keyed on the warmup day: up to day 1 -> 100, up to day 4 -> 300,
    up to day 9 -> 600, afterwards 1000. This bounds what is queued per day.
    """
    ramp = ((1, 100), (4, 300), (9, 600))
    for last_day, quota in ramp:
        if day <= last_day:
            return quota
    return 1000
|
|
|
|
|
|
# ── Per-MX-operator throttle ─────────────────────────────────────────────────
|
|
# Sender reputation is tracked by the RECEIVING mail operator (Microsoft 365,
|
|
# Google Workspace, Proofpoint, ...), not by recipient domain. So we cap how many
|
|
# new providers we queue per operator per day, and let volume spread across the
|
|
# long tail of operators freely. This lets total daily volume be much higher than
|
|
# a flat cap without hammering any single receiving system. Caps ramp with the
|
|
# warmup day. "default" applies to any operator not explicitly listed (mostly the
|
|
# long tail of small/independent mail hosts -- a generous cap is safe there
|
|
# because each sees only a handful).
|
|
def mx_daily_caps(day: int) -> dict:
    """Per-operator daily queueing caps for the given warmup day.

    Returns a dict mapping each explicitly listed large mail operator to the
    "major" cap for that day, plus a ``"__default__"`` entry used for every
    operator not listed.
    """
    if day <= 1:
        major, fallback = 25, 15
    elif day <= 4:
        major, fallback = 60, 40
    elif day <= 9:
        major, fallback = 120, 80
    else:
        major, fallback = 250, 150
    named_operators = ("microsoft", "google", "proofpoint",
                       "cisco", "mimecast", "barracuda")
    caps = dict.fromkeys(named_operators, major)
    caps["__default__"] = fallback
    return caps
|
|
|
|
|
|
def mx_throttled(candidates: list[dict], total_n: int, caps: dict) -> list[dict]:
    """Pick up to ``total_n`` candidates while capping each MX operator.

    Args:
        candidates: rows in priority order; each may carry ``mx_provider``.
        total_n: maximum number of rows to return; values <= 0 select nothing.
        caps: operator name (lower-case) -> max rows, with ``"__default__"``
            applied to any operator not listed (50 if that key is absent).

    Returns:
        The chosen rows in input order. If the first row has no
        ``mx_provider`` key at all, falls back to a plain ``[:total_n]`` slice.
    """
    # Guard first: a negative total_n would otherwise slice from the END of
    # the list on the fallback path and return nearly every candidate.
    if total_n <= 0 or not candidates:
        return []
    if "mx_provider" not in candidates[0]:
        return candidates[:total_n]

    default_cap = caps.get("__default__", 50)
    used: dict[str, int] = {}
    chosen: list[dict] = []
    for row in candidates:
        if len(chosen) >= total_n:
            break
        # Normalise case so "Google" and "google" share one counter and hit
        # the lower-case keys in `caps`; rows with no operator share the
        # "__default__" bucket.
        operator = (row.get("mx_provider") or "").strip().lower() or "__default__"
        if used.get(operator, 0) >= caps.get(operator, default_cap):
            continue
        used[operator] = used.get(operator, 0) + 1
        chosen.append(row)
    return chosen
|
|
|
|
|
|
def lm(path: str, data=None, method=None):
    """Call the listmonk-hc JSON API and return the decoded response.

    Args:
        path: API path appended to ``{LISTMONK_URL}/api`` (may include a query).
        data: optional JSON-serialisable request body.
        method: HTTP verb; defaults to POST when ``data`` is given, else GET.

    Raises:
        SystemExit: on an HTTP error status. The message contains the verb,
            path, status code and up to 300 characters of the response body;
            callers in this file inspect that text (e.g. for "409").
    """
    verb = method or ("POST" if data is not None else "GET")
    payload = json.dumps(data).encode() if data is not None else None
    # Pass the verb through the documented `method=` argument instead of
    # monkey-patching Request.get_method.
    req = urllib.request.Request(
        f"{LISTMONK_URL}/api{path}",
        data=payload,
        headers={"Content-Type": "application/json",
                 "Authorization": f"token {LISTMONK_USER}:{_token()}"},
        method=verb,
    )
    try:
        with urllib.request.urlopen(req, timeout=20) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # errors="replace": a non-UTF-8 error page must not raise a
        # UnicodeDecodeError that hides the real HTTP failure.
        body = e.read().decode(errors="replace")[:300]
        raise SystemExit(f"listmonk-hc API {verb} {path} -> {e.code}: {body}")
|
|
|
|
|
|
def get_or_create_list(list_name: str, tags: list[str]) -> int:
    """Return the id of the listmonk list named ``list_name``, creating it if absent.

    The lookup walks EVERY page of ``/lists``. Checking only the first page is
    not enough: once more lists exist than fit on one page (dated per-day send
    lists accumulate quickly), an existing list would be missed and a
    duplicate with the same name created.

    Args:
        list_name: exact list name to match.
        tags: tags applied only when the list has to be created.
    """
    per_page = 100
    page = 1
    seen = 0
    while True:
        data = lm(f"/lists?page={page}&per_page={per_page}").get("data") or {}
        results = data.get("results") or []
        if not results:
            break
        for entry in results:
            if entry["name"] == list_name:
                return entry["id"]
        seen += len(results)
        # Prefer the server-reported total (assumed present in listmonk's
        # paginated responses -- TODO confirm); otherwise stop on a short page.
        total = data.get("total")
        if isinstance(total, int):
            if seen >= total:
                break
        elif len(results) < per_page:
            break
        page += 1
    created = lm("/lists", {"name": list_name, "type": "private", "optin": "single",
                            "tags": tags})
    return created["data"]["id"]
|
|
|
|
|
|
def state_file(seg_key: str) -> str:
    """Path of the imported-emails state file for one segment, under ``STATE_DIR``."""
    filename = f"hc_imported_{seg_key}.txt"
    return os.path.join(STATE_DIR, filename)
|
|
|
|
|
|
def load_imported(seg_key: str) -> set[str]:
    """Return the lower-cased emails already imported for ``seg_key``.

    Reads the segment's state file when it exists. Back-compat: for the
    ``revalidation_overdue`` segment only, when that file is absent, the legacy
    single-segment ``hc_imported_emails.txt`` is used as the seed. Returns an
    empty set when neither applies.
    """
    def _read_emails(path: str) -> set[str]:
        # Context manager so the handle is closed.
        with open(path) as fh:
            return {ln.strip().lower() for ln in fh if ln.strip()}

    primary = state_file(seg_key)
    if os.path.exists(primary):
        return _read_emails(primary)
    legacy = os.path.join(STATE_DIR, "hc_imported_emails.txt")
    if seg_key == "revalidation_overdue" and os.path.exists(legacy):
        return _read_emails(legacy)
    return set()
|
|
|
|
|
|
def save_imported(seg_key: str, emails: set[str]):
    """Persist the segment's imported-email set, one sorted email per line.

    The file is written to a temporary sibling and then moved over the state
    file with ``os.replace``. This file is the dedupe record that prevents
    re-mailing, so a crash mid-write must not leave it truncated: readers see
    either the old or the new contents, never a partial file.

    NOTE(review): the replaced file takes the owner/mode of the writing
    process; confirm that is acceptable if several users share STATE_DIR.
    """
    os.makedirs(STATE_DIR, exist_ok=True)
    target = state_file(seg_key)
    tmp_path = f"{target}.tmp.{os.getpid()}"
    try:
        with open(tmp_path, "w") as f:
            f.write("\n".join(sorted(emails)) + "\n")
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, target)
    except BaseException:
        # Do not leave a stray temp file behind; the old state file is intact.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
|
|
|
|
|
|
def load_all_imported() -> set[str]:
    """Union of the imported-email state of every segment in ``SEGMENTS``.

    Used as a cross-segment guard so a provider already imported under ANY
    registered segment is not imported again under another one. Per the
    original author's note, more than one cron shares these state files, which
    also makes this a cross-cron guard. The legacy single-segment file
    ``hc_imported_emails.txt`` is always included.

    NOTE(review): only keys present in SEGMENTS are read; a state file left
    behind by a segment that was later removed from the registry is NOT
    consulted -- confirm whether such files should be globbed in as well.
    """
    seen: set[str] = set()
    for key in SEGMENTS:
        seen |= load_imported(key)
    legacy = os.path.join(STATE_DIR, "hc_imported_emails.txt")
    if os.path.exists(legacy):
        # Context manager so the handle is closed.
        with open(legacy) as fh:
            seen |= {ln.strip().lower() for ln in fh if ln.strip()}
    return seen
|
|
|
|
|
|
def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool:
    """Create a subscriber on ``list_id``, or attach an existing one to it.

    On a create conflict (the email already exists) the existing subscriber is
    looked up, its attribs are merged with ``attribs`` (new values win, so
    per-segment merge fields are refreshed) and it is attached to ``list_id``.

    Returns:
        True when the subscriber ends up on the list, False on any failure.
        Never raises for API errors: this is best-effort per subscriber.
    """
    display_name = name or email.split("@")[0]
    try:
        lm("/subscribers", {
            "email": email, "name": display_name,
            "status": "enabled", "lists": [list_id],
            "attribs": attribs, "preconfirm_subscriptions": True,
        })
        return True
    except SystemExit as e:
        reason = str(e)
        if "409" not in reason and "already exists" not in reason.lower():
            return False

    # The email already exists: attach it to this list and refresh attribs.
    try:
        q = "subscribers.email = '" + email.replace("'", "''") + "'"
        found = lm("/subscribers?" + urllib.parse.urlencode({"query": q, "per_page": 1}))
        results = (found.get("data") or {}).get("results") or []
        if not results:
            return False
        existing = results[0]
        sid = existing["id"]
        # Merge: keep prior fields, overwrite with this segment's values.
        merged = {**(existing.get("attribs") or {}), **attribs}
        # The subscriber update is a full replace: listmonk documents that
        # lists omitted from the PUT are dropped. Send the current memberships
        # plus the target list so attaching to one list (e.g. today's send
        # list) does not silently detach the subscriber from the others.
        # (Assumes the lookup result carries a `lists` array -- TODO confirm.)
        keep_ids = {entry["id"] for entry in (existing.get("lists") or [])
                    if isinstance(entry, dict) and "id" in entry}
        keep_ids.add(list_id)
        lm(f"/subscribers/{sid}", {
            "email": email, "name": display_name,
            "attribs": merged,
            "lists": sorted(keep_ids),
        }, "PUT")
        lm("/subscribers/lists", {"ids": [sid], "action": "add",
                                  "target_list_ids": [list_id],
                                  "status": "confirmed"}, "PUT")
        return True
    except (SystemExit, Exception) as err:
        # lm() signals HTTP errors with SystemExit, which `except Exception`
        # alone does NOT catch (it derives from BaseException); without it a
        # single failed re-attach would abort the whole run.
        print(f"[hc-cron] add_subscriber: could not attach {email} "
              f"to list {list_id}: {err}")
        return False
|
|
|
|
|
|
def ensure_campaign(seg_key: str, list_id: int, send_list_name: str | None = None) -> int:
    """Return the id of today's campaign for ``seg_key``, creating it if needed.

    One campaign per (segment, day): the campaign name carries today's date
    and the campaign is bound to ``list_id`` (the caller passes that day's
    dedicated send list). An existing campaign is reused only when its name
    matches today's dated name exactly AND it is still in an active status;
    a prior day's campaign is never reused, because that would re-target an
    old slice. Otherwise a fresh campaign is created from the segment's
    current template file.

    Args:
        seg_key: key into ``SEGMENTS``.
        list_id: the list the campaign sends to.
        send_list_name: accepted for call-site compatibility; not used here.

    NOTE(review): if today's campaign already finished and the script runs
    again the same day, the new campaign targets the same send list, which
    still holds the earlier run's recipients -- confirm whether a per-run
    send list is needed to rule out a repeat send.
    """
    from datetime import date

    seg = SEGMENTS[seg_key]
    active_statuses = {"draft", "running", "paused", "scheduled"}
    dated = f"{seg['campaign_name']} - {date.today():%b %d %Y}"

    # Walk every page: one campaign is created per segment per day, so the
    # total soon exceeds a single page of results.
    per_page = 100
    page = 1
    seen = 0
    while True:
        data = lm(f"/campaigns?page={page}&per_page={per_page}").get("data") or {}
        results = data.get("results") or []
        if not results:
            break
        for campaign in results:
            if campaign["name"] == dated and campaign.get("status") in active_statuses:
                return campaign["id"]
        seen += len(results)
        # Prefer the server-reported total (assumed present -- TODO confirm);
        # otherwise stop on a short page.
        total = data.get("total")
        if isinstance(total, int):
            if seen >= total:
                break
        elif len(results) < per_page:
            break
        page += 1

    with open(template_path(seg_key)) as fh:
        body = fh.read()
    payload = {
        "name": dated, "subject": seg["subject"], "lists": [list_id],
        "from_email": FROM_EMAIL, "type": "regular", "content_type": "richtext",
        "body": body, "altbody": html_to_text(body), "messenger": "email",
        "tags": ["healthcare", "warmup", seg_key],
        "headers": [{"Reply-To": REPLY_TO},
                    {"List-Unsubscribe": "<{{ UnsubscribeURL }}>"},
                    {"List-Unsubscribe-Post": "List-Unsubscribe=One-Click"}],
    }
    created = lm("/campaigns", payload)
    return created["data"]["id"]
|
|
|
|
|
|
def row_matches(seg_key: str, r: dict) -> bool:
    """Does this master-CSV row belong to a segment?

    Driven by the segment's ``selector`` so the row->segment mapping lives next
    to the segment metadata. Unknown selectors match nothing.

    Args:
        seg_key: key into ``SEGMENTS``.
        r: one CSV row (string values).
    """
    sel = SEGMENTS[seg_key]["selector"]
    status = (r.get("reval_status") or "").strip().lower()
    # Parse the exclusion flag case-insensitively and accept the common
    # negative spellings, so a value such as "N", "No" or "False" is not
    # mistaken for "excluded" (which would route an active practice to the
    # reactivation email).
    excluded = ((r.get("leie_excluded") or "").strip().lower()
                not in ("", "0", "false", "no", "n"))
    optout = (r.get("optout_ending") or "").strip() != ""
    deactivated = (r.get("nppes_deactivated") or "").strip().upper() == "Y"
    # A blank verify_ok counts as verified (same rule in both institutional
    # selectors below).
    verified = (str(r.get("verify_ok", "")).strip().upper()
                in ("Y", "YES", "TRUE", "1", ""))

    if sel == "reval_overdue":
        # Only SLIGHTLY-overdue: recently-lapsed practices are still active with
        # deliverable inboxes. Heavily-overdue ones likely bounce and burn the
        # warming IP's reputation, so we hold them out of the warmup window.
        if status != "overdue":
            return False
        od = _overdue_days(r)
        return od is not None and WARMUP_OVERDUE_MIN <= od <= WARMUP_OVERDUE_MAX

    if sel == "reval_due_soon":
        # Proactive: revalidation is UPCOMING within the lookahead window.
        # days_overdue is negative for upcoming (days until due), so a provider
        # due in N days has days_overdue == -N.
        if status != "upcoming":
            return False
        od = _overdue_days(r)
        if od is None:
            return False
        days_until = -od
        return WARMUP_DUE_SOON_MIN <= days_until <= WARMUP_DUE_SOON_MAX

    if sel == "reval_upcoming":
        return status == "upcoming"

    if sel == "leie_or_deactivated":
        # Reactivation targets: flagged excluded, OR the NPI is genuinely
        # DEACTIVATED in NPPES. "Not on the CMS revalidation list" is
        # deliberately NOT treated as deactivated here.
        return excluded or deactivated

    if sel == "optout_ending":
        return optout

    if sel == "institutional_default":
        # Catch-all for the FREE NPI compliance-check email: any verified row
        # that is not deactivated (those belong to reactivation).
        if deactivated:
            return False
        return verified

    if sel == "any":
        # Still exclude the likely-undeliverable: providers heavily overdue
        # (stale) or already dropped off the reval list.
        if status in ("not_on_list", "no_reval_flag"):
            return False
        od = _overdue_days(r)
        if status == "overdue" and (od is None or od > WARMUP_OVERDUE_MAX):
            return False
        return True

    if sel == "institutional_verified":
        # Trust the verify_ok flag rather than reval-list presence as the
        # deliverability proxy, but still hold out heavily-overdue rows.
        if not verified:
            return False
        od = _overdue_days(r)
        if status == "overdue" and od is not None and od > WARMUP_OVERDUE_MAX:
            return False
        return True

    return False
|
|
|
|
|
|
def _seg_priority(seg_key: str) -> int:
    """Urgency rank of a segment; lower means more urgent.

    A segment with no ``priority`` entry ranks 1000, so it never out-ranks a
    segment that declares one below that.
    """
    rank = SEGMENTS[seg_key].get("priority", 1000)
    return int(rank)
|
|
|
|
|
|
def assign_segment(r: dict, active_segments: list[str]) -> str | None:
    """Pick the single segment a row belongs to (one email per provider).

    Among the active segments whose selector matches the row, returns the one
    with the lowest priority number, breaking ties by segment key. Returns
    None when no active segment matches.
    """
    winner = None
    winner_rank = None
    for key in active_segments:
        if not row_matches(key, r):
            continue
        rank = (_seg_priority(key), key)
        if winner_rank is None or rank < winner_rank:
            winner, winner_rank = key, rank
    return winner
|
|
|
|
|
|
def assign_all(rows: list[dict], active_segments: list[str]) -> dict[str, str]:
    """Map each email to its single assigned segment across the whole list.

    An email can appear on several rows (a shared inbox covering several NPIs)
    and those rows can match different segments. The most urgent assignment
    across all of an email's rows is kept (lower priority number wins), so a
    later, less urgent row never clobbers an earlier urgent one.

    Equal priorities are broken by segment key -- the same (priority, key)
    ordering ``assign_segment`` uses -- so the result does not depend on the
    order of rows in the input.

    Returns:
        dict of lower-cased email -> segment key; emails whose rows match no
        active segment are absent.
    """
    assigned: dict[str, str] = {}
    for row in rows:
        email = (row.get("email") or "").strip().lower()
        if not email:
            continue
        seg = assign_segment(row, active_segments)
        if seg is None:
            continue
        current = assigned.get(email)
        if current is None or (_seg_priority(seg), seg) < (_seg_priority(current), current):
            assigned[email] = seg
    return assigned
|
|
|
|
|
|
def attribs_for(r: dict) -> dict:
    """Build the subscriber ``attribs`` (template merge fields) for a CSV row.

    ``days_overdue`` is positive when past due and negative when upcoming, so
    a clean positive ``days_until`` is exposed for upcoming rows. ``detail``
    is "<due date> (<n> days overdue)" for overdue rows, else the due date.
    """
    def _text(key: str) -> str:
        # None (e.g. a short CSV row) becomes "" instead of the string "None".
        value = r.get(key)
        return "" if value is None else str(value).strip()

    days_overdue = _text("days_overdue")
    due_date = r.get("reval_due_date", "")

    days_until = ""
    try:
        overdue_int = int(days_overdue)
    except ValueError:
        overdue_int = None
    if overdue_int is not None and overdue_int < 0:
        days_until = str(-overdue_int)

    # Normalise the status the same way row_matches does, so " Overdue " in
    # the data still renders the overdue detail line.
    is_overdue = _text("reval_status").lower() == "overdue"
    detail = f"{due_date} ({days_overdue} days overdue)" if is_overdue else due_date

    return {
        "npi": r.get("npi", ""),
        "practice": r.get("name", ""),
        "specialty": r.get("specialty", ""),
        "state": r.get("state", ""),
        # Separate fields so a template can render the due date and the
        # overdue / until counts independently.
        "reval_due_date": due_date,
        "days_overdue": days_overdue,
        "days_until": days_until,
        # MX operator (for per-operator analysis + throttling audit).
        "mx_provider": r.get("mx_provider", ""),
        # NPPES freshness fields copied through from the row as-is.
        "nppes_last_updated": r.get("nppes_last_updated", ""),
        "nppes_years_stale": r.get("nppes_years_stale", ""),
        "nppes_enumeration": r.get("nppes_enumeration", ""),
        "detail": detail,
    }
|
|
|
|
|
|
def warm_segment(seg_key: str, rows: list[dict], slice_n: int,
                 dry_run: bool, start_campaign: bool,
                 assignment: dict[str, str] | None = None) -> int:
    """Import up to ``slice_n`` new subscribers for one segment and keep its
    campaign in place. Returns how many NEW subscribers were imported.

    If ``assignment`` (email -> single assigned segment) is given, a row is a
    candidate only when THIS segment is the email's assigned one and the row
    itself matches the segment's selector. When it is None, the legacy
    behaviour applies (``row_matches`` only).

    Each imported subscriber is added to the persistent segment list (records)
    and to a dated per-day SEND list; the campaign is bound to the SEND list
    only, so it mails today's slice and never the accumulated segment list.

    Args:
        seg_key: key into ``SEGMENTS``.
        rows: all rows of the verified CSV.
        slice_n: maximum number of subscribers to import in this call.
        dry_run: only report what would be imported; no API writes, returns 0.
        start_campaign: also flip the campaign to "running".
        assignment: optional email -> segment map from ``assign_all``.
    """
    from datetime import date

    seg = SEGMENTS[seg_key]
    imported = load_imported(seg_key)
    # Cross-segment guard: skip anyone already imported by ANY segment.
    already_anywhere = load_all_imported()
    suppressed = load_suppressed()

    def _email_of(row: dict) -> str:
        # `or ""` tolerates a None email (e.g. a short CSV row).
        return (row.get("email") or "").strip().lower()

    def _is_candidate(row: dict) -> bool:
        email = _email_of(row)
        if not email or email in already_anywhere or email in suppressed:
            return False
        if _is_google_hosted(row):
            return False
        if assignment is not None:
            # The email must be assigned to THIS segment AND this specific row
            # must be one that matches it, so the template renders the data of
            # the row that earned the assignment.
            return assignment.get(email) == seg_key and row_matches(seg_key, row)
        return row_matches(seg_key, row)

    # Dedupe by email: keep the first matching row so an email is imported
    # once and counted once against the slice budget.
    candidates = []
    seen_emails: set[str] = set()
    for row in rows:
        if not _is_candidate(row):
            continue
        email = _email_of(row)
        if email in seen_emails:
            continue
        seen_emails.add(email)
        candidates.append(row)

    # Spread the slice across MX operators; caps ramp with the warmup day.
    todo = mx_throttled(candidates, slice_n, mx_daily_caps(warmup_day()))
    print(f"[hc-cron] {seg_key}: candidates={len(candidates)} "
          f"in_segment={len(imported)} emailed_anywhere={len(already_anywhere)} "
          f"to_import={len(todo)}")

    if dry_run:
        for row in todo[:3]:
            print(f"  would import: {row['email']} {(row.get('name') or '')[:28]} "
                  f"status={row.get('reval_status','')}")
        return 0
    if not todo:
        return 0

    list_id = get_or_create_list(seg["list_name"], ["healthcare", "warmup", seg_key])
    # Per-day SEND list: the campaign targets ONLY today's newly imported
    # slice. The persistent `list_id` is kept for records.
    send_list_name = f"{seg['list_name']} - SEND {date.today():%Y-%m-%d}"
    send_list_id = get_or_create_list(send_list_name,
                                      ["healthcare", "warmup", seg_key, "daily-send"])
    n_ok = 0
    try:
        for row in todo:
            email = _email_of(row)
            display_name = row.get("name") or ""
            attribs = attribs_for(row)
            # Record first: only someone we successfully recorded may be put
            # on the send list. Otherwise a provider could be mailed today
            # without being marked imported, and be picked (and mailed) again
            # on a later run.
            if not add_subscriber(list_id, email, display_name, attribs):
                continue
            imported.add(email)
            n_ok += 1
            if not add_subscriber(send_list_id, email, display_name, attribs):
                print(f"[hc-cron] {seg_key}: {email} recorded but NOT added to "
                      f"send list {send_list_id}")
    finally:
        # Persist even if the loop aborts part-way, so everyone already pushed
        # to listmonk stays deduped on the next run.
        save_imported(seg_key, imported)

    # Campaign targets the per-day send list, NOT the cumulative segment list.
    cid = ensure_campaign(seg_key, send_list_id, send_list_name)
    if start_campaign:
        try:
            lm(f"/campaigns/{cid}/status", {"status": "running"}, "PUT")
        except SystemExit as e:
            # Already running raises; that's fine.
            if "already" not in str(e).lower():
                print(f"[hc-cron] {seg_key}: start warning: {e}")
    print(f"[hc-cron] {seg_key}: imported {n_ok} into list {list_id} "
          f"(send list {send_list_id}); campaign={cid}")
    return n_ok
|
|
|
|
|
|
def _all_list_subscribers(list_id: int):
    """Yield ``(id, lower-cased email)`` for every subscriber on a list.

    Pages through ``/subscribers`` until the list is exhausted. Termination
    uses the server-reported ``total`` when present instead of comparing the
    page length with the REQUESTED page size: if the server clamps
    ``per_page`` below the requested 1000, a "short page" test would stop
    after the first page and silently miss most of the list.
    """
    page, per_page, seen = 1, 1000, 0
    while True:
        q = urllib.parse.urlencode({"list_id": list_id, "page": page, "per_page": per_page})
        data = lm("/subscribers?" + q).get("data") or {}
        results = data.get("results") or []
        if not results:
            break
        for sub in results:
            yield sub["id"], (sub.get("email") or "").strip().lower()
        seen += len(results)
        # `total` is assumed to be part of listmonk's paginated response
        # (TODO confirm); fall back to the short-page test when it is absent.
        total = data.get("total")
        if isinstance(total, int):
            if seen >= total:
                break
        elif len(results) < per_page:
            break
        page += 1
|
|
|
|
|
|
def prune_holdouts(dry_run: bool) -> int:
    """Remove subscribers who should not be mailed from the segment lists.

    A subscriber is dropped from a segment's persistent list when their email
    is on the permanent suppression list, or when the FRESH master CSV (not
    the listmonk attribs snapshot) now marks their row as Google-hosted.
    Subscribers absent from the master CSV are left alone, and nobody is
    evicted for audience reasons (e.g. drifting out of the overdue window).

    Args:
        dry_run: report what would be removed without calling the removal API.

    Returns:
        The number of (subscriber, list) removals counted (in a dry run, the
        number that would be removed).

    NOTE(review): only each segment's persistent list is pruned; the dated
    per-day SEND lists are not visited -- confirm whether a suppressed address
    sitting on a not-yet-sent SEND list must be removed here as well.
    """
    master_path = os.getenv("HC_MASTER_CSV", os.path.join(STATE_DIR, "hc_warmup_week1.csv"))
    if not os.path.exists(master_path):
        print(f"[hc-cron] prune: master {master_path} not found, skipping")
        return 0
    # newline="" is what the csv module expects; the context manager closes
    # the handle.
    with open(master_path, newline="") as fh:
        rows = list(csv.DictReader(fh))
    by_email = {r.get("email", "").strip().lower(): r for r in rows if r.get("email")}
    suppressed = load_suppressed()

    # Resolve list names -> ids ONCE, walking every page. Fetching only the
    # first page (and doing so again for each segment) misses lists once more
    # exist than fit on one page.
    list_ids: dict[str, int] = {}
    try:
        per_page, page, seen = 100, 1, 0
        while True:
            data = lm(f"/lists?page={page}&per_page={per_page}").get("data") or {}
            results = data.get("results") or []
            if not results:
                break
            for entry in results:
                # Keep the first id seen for a name (first match wins).
                list_ids.setdefault(entry["name"], entry["id"])
            seen += len(results)
            total = data.get("total")
            if isinstance(total, int):
                if seen >= total:
                    break
            elif len(results) < per_page:
                break
            page += 1
    except SystemExit as e:
        # Best-effort: without the list index nothing can be pruned this run.
        print(f"[hc-cron] prune: could not list lists ({e}); skipping")
        list_ids = {}

    removed = 0
    for seg_key, seg in SEGMENTS.items():
        list_id = list_ids.get(seg["list_name"])
        if not list_id:
            continue
        drop_ids = []
        for sid, email in _all_list_subscribers(list_id):
            # Always remove anyone on the permanent do-not-contact list,
            # regardless of source data.
            if email in suppressed:
                drop_ids.append(sid)
                continue
            row = by_email.get(email)
            if row is None:
                continue  # not in our source data; leave it alone
            # DELIVERABILITY-only prune: the domain is now Google-hosted.
            if _is_google_hosted(row):
                drop_ids.append(sid)
        if drop_ids:
            print(f"[hc-cron] prune {seg_key} (list {list_id}): "
                  f"{len(drop_ids)} holdouts to remove")
            if not dry_run:
                # Detach from this list in chunks of 500 ids per request.
                for start in range(0, len(drop_ids), 500):
                    chunk = drop_ids[start:start + 500]
                    lm("/subscribers/lists", {"ids": chunk, "action": "remove",
                                              "target_list_ids": [list_id]}, "PUT")
            removed += len(drop_ids)
    print(f"[hc-cron] prune: removed {removed} subscriber-list holdouts")
    return removed
|
|
|
|
|
|
def main():
    """CLI entry point: optionally prune, then import today's slice per segment.

    The daily slice (``--slice`` or the ramp default for the current warmup
    day) is a TOTAL budget shared by all requested segments. Segments are
    processed most-urgent first and each receives whatever budget is left, so
    unused budget from a small urgent pool flows to the next segment.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--slice", type=int, default=0,
                    help="override TOTAL daily import slice across segments (0=ramp default)")
    ap.add_argument("--segments", default=",".join(ACTIVE_SEGMENTS),
                    help="comma list of segment keys to warm")
    ap.add_argument("--start-campaign", action="store_true",
                    help="flip campaigns to 'running' (otherwise left as draft for approval)")
    # Help text matches what prune_holdouts removes: suppressed and
    # now-Google-hosted subscribers (nobody is pruned for audience reasons).
    ap.add_argument("--prune", action="store_true",
                    help="also remove suppressed / now-Google-hosted subscribers "
                         "from the warmup lists (run after the weekly refresh)")
    ap.add_argument("--prune-only", action="store_true",
                    help="run ONLY the deliverability prune, then exit (no import/warm)")
    args = ap.parse_args()
    if args.slice < 0:
        ap.error("--slice must be >= 0")

    day = warmup_day()
    total_slice = args.slice or daily_slice(day)
    # De-duplicate while keeping order, and report (rather than silently drop)
    # segment keys that are not in the registry -- a typo would otherwise
    # quietly disable a segment.
    requested = list(dict.fromkeys(s.strip() for s in args.segments.split(",") if s.strip()))
    unknown = [s for s in requested if s not in SEGMENTS]
    if unknown:
        print(f"[hc-cron] WARNING: ignoring unknown segment(s): {', '.join(unknown)}")
    segments = [s for s in requested if s in SEGMENTS]
    print(f"[hc-cron] {datetime.now(timezone.utc).isoformat()} warmup_day={day} "
          f"total_slice={total_slice} segments={segments}")

    if args.prune_only:
        # The prune works from the master CSV, so the verified CSV is not read.
        prune_holdouts(args.dry_run)
        return

    with open(VERIFIED_CSV, newline="") as fh:
        rows = list(csv.DictReader(fh))
    print(f"[hc-cron] verified_total={len(rows)}")

    if args.prune:
        prune_holdouts(args.dry_run)

    # assign_all routes each email to a single segment, so the segment pools
    # are disjoint and no provider is counted against the budget twice.
    assignment = assign_all(rows, segments)
    if assignment:
        from collections import Counter
        dist = Counter(assignment.values())
        print(f"[hc-cron] assignment (one email/provider by priority): "
              + ", ".join(f"{k}={dist[k]}" for k in
                          sorted(dist, key=lambda s: (_seg_priority(s), s))))

    order = sorted(segments, key=lambda s: (_seg_priority(s), s))
    grand = 0
    budget = total_slice
    for seg_key in order:
        # A dry run imports nothing, so show each segment against the full
        # slice instead of a budget that never shrinks meaningfully.
        cap = total_slice if args.dry_run else budget
        n = warm_segment(seg_key, rows, cap, args.dry_run, args.start_campaign,
                         assignment)
        grand += n
        budget -= n  # only decrement by what was actually imported
        if budget <= 0 and not args.dry_run:
            break
    print(f"[hc-cron] done: imported {grand} new subscribers across {len(segments)} segment(s)")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|