From 0320dc17baa91baf85bd3c3a83c32bddb1d5d582 Mon Sep 17 00:00:00 2001 From: justin Date: Sat, 20 Jun 2026 16:01:23 -0500 Subject: [PATCH] healthcare: one-email-per-provider by urgency priority + free check as default Make the free NPI compliance check the catch-all for ALL verified institutional providers, but route anyone with a more important/time-sensitive issue to THAT email instead -- each provider gets exactly one email, their most urgent. - SEGMENTS gain a 'priority' (lower=more urgent): reactivation 10, revalidation overdue 20, due-soon 30, OIG screening 40 (not in the default rotation, see below), bundle 45, free-NPI-check 100 (catch-all). - assign_segment()/assign_all(): route each provider to the single highest-priority active segment whose selector matches; warm_segment() takes the assignment map and only claims its assigned providers (disjoint pools, no double-mailing). main() now splits the daily slice by priority order, serving urgent segments fully before the broad free-check consumes the remainder. - nppes_outdated selector -> 'institutional_default' (every verified, non-deactivated row), since the free check's value no longer depends on staleness; list/campaign renamed 'HC Warmup - Free NPI Check'. - FIX latent bug: reactivation selector treated 'not on CMS reval list' as deactivated -- false for org NPIs (would wrongly tell active practices they're deactivated). Now uses the REAL nppes_deactivated flag (or OIG/SAM exclusion). - Drop blanket oig_screening from the default rotation: it matched every row and would starve the catch-all, and the free check already screens OIG/SAM and routes to the paid fix on a hit. Still runnable via --segments. - Add scripts/test_segment_assignment.py (10 cases incl. 'overdue AND stale -> overdue wins'); all pass. 
--- scripts/build_healthcare_campaigns.py | 20 ++- scripts/build_healthcare_campaigns_cron.py | 177 +++++++++++++-------- scripts/test_segment_assignment.py | 71 +++++++++ 3 files changed, 195 insertions(+), 73 deletions(-) create mode 100644 scripts/test_segment_assignment.py diff --git a/scripts/build_healthcare_campaigns.py b/scripts/build_healthcare_campaigns.py index c237c18..530954b 100644 --- a/scripts/build_healthcare_campaigns.py +++ b/scripts/build_healthcare_campaigns.py @@ -51,6 +51,14 @@ OUT_DIR = os.path.join(os.path.dirname(__file__), "..", "data", "hc_campaigns") # list_name listmonk-hc list this segment is warmed into # campaign_name listmonk-hc campaign name prefix (dated per build) # selector which warmup-CSV rows belong to this segment (see cron) +# priority URGENCY rank for one-email-per-provider assignment (LOWER = +# more urgent, wins). A provider is warmed into exactly ONE +# segment: the highest-priority (lowest number) active segment +# whose selector matches their row. So a provider who is BOTH +# revalidation-overdue and NPPES-stale gets the overdue email +# (more important + time-sensitive), not the generic free check. +# The free NPI check is the catch-all default at the highest +# number, so everyone with no more-urgent issue still gets it. 
SEGMENTS = { "revalidation_overdue": { "subject": "Your Medicare revalidation is past due - let's get it filed", @@ -60,6 +68,7 @@ SEGMENTS = { "list_name": "HC Warmup - Revalidation Overdue", "campaign_name": "HC Warmup - Medicare Revalidation", "selector": "reval_overdue", + "priority": 20, }, "revalidation_due_soon": { "subject": "Let's make sure your Medicare revalidation is handled in time", @@ -69,6 +78,7 @@ SEGMENTS = { "list_name": "HC Warmup - Revalidation Due Soon", "campaign_name": "HC Warmup - Revalidation Due Soon", "selector": "reval_due_soon", + "priority": 30, }, "npi_reactivation": { "subject": "Your NPI / Medicare enrollment appears deactivated", @@ -78,15 +88,17 @@ SEGMENTS = { "list_name": "HC Warmup - Reactivation", "campaign_name": "HC Warmup - NPI Reactivation", "selector": "leie_or_deactivated", + "priority": 10, }, "nppes_outdated": { "subject": "A free compliance check for your NPI", "template": "hc_nppes_outdated.html", "cta_path": "/tools/npi-compliance-check", "price": "$349", - "list_name": "HC Warmup - NPPES Update", - "campaign_name": "HC Warmup - NPPES Outdated", - "selector": "nppes_stale", + "list_name": "HC Warmup - Free NPI Check", + "campaign_name": "HC Warmup - Free NPI Check", + "selector": "institutional_default", + "priority": 100, }, "oig_screening": { "subject": "Are you screening for OIG / SAM exclusions?", @@ -96,6 +108,7 @@ SEGMENTS = { "list_name": "HC Warmup - OIG Screening", "campaign_name": "HC Warmup - OIG Screening", "selector": "institutional_verified", + "priority": 40, }, "compliance_bundle": { "subject": "Get your provider compliance handled for the year", @@ -105,6 +118,7 @@ SEGMENTS = { "list_name": "HC Warmup - Compliance Bundle", "campaign_name": "HC Warmup - Compliance Bundle", "selector": "optout_ending", + "priority": 45, }, } diff --git a/scripts/build_healthcare_campaigns_cron.py b/scripts/build_healthcare_campaigns_cron.py index 4421d2e..2f2287a 100644 --- 
a/scripts/build_healthcare_campaigns_cron.py +++ b/scripts/build_healthcare_campaigns_cron.py @@ -80,9 +80,17 @@ def load_suppressed() -> set[str]: # the most verified data + the official-record card). The others warm in # parallel on smaller slices so we collect engagement data across all programs # without overwhelming the warming IPs. +# Which segments to warm; main() processes them in priority order regardless of how they are listed here. With one-email-per-provider +# assignment (assign_all), each provider is routed to exactly ONE of these by +# urgency: reactivation (10) > revalidation overdue (20) > due soon (30) > compliance bundle (45) > the +# free NPI compliance check (100, catch-all). The free check ITSELF includes OIG/ +# SAM exclusion screening and routes to the paid OIG fix on a hit, so the +# standalone blanket `oig_screening` email (which matched every verified row and +# would otherwise starve the catch-all) is intentionally NOT in the default +# rotation -- it can still be run explicitly via --segments for a dedicated push. ACTIVE_SEGMENTS = os.getenv( "HC_SEGMENTS", - "revalidation_overdue,revalidation_due_soon,oig_screening,nppes_outdated,npi_reactivation,compliance_bundle", + "npi_reactivation,revalidation_overdue,revalidation_due_soon,nppes_outdated,compliance_bundle", ).split(",") # Warmup deliverability guard: only mail SLIGHTLY-overdue providers. A practice @@ -98,20 +106,6 @@ WARMUP_OVERDUE_MAX = int(os.getenv("HC_OVERDUE_MAX", "90")) WARMUP_DUE_SOON_MIN = int(os.getenv("HC_DUE_SOON_MIN", "1")) WARMUP_DUE_SOON_MAX = int(os.getenv("HC_DUE_SOON_MAX", "90")) -# NPPES "out of date" segment: only mail records whose REAL NPPES last_updated -# date is within an [MIN, MAX] whole-years-stale window. MIN keeps the "out of -# date" claim credible (a record updated <3yrs ago isn't convincingly stale); -# MAX caps deliverability/defunct risk (a record untouched for many years is a -# stronger signal the practice closed/moved -- and a bounce burns the warming -# IP). 
This is what makes the claim LITERALLY TRUE and verifiable -- the provider -# can confirm the exact same last_updated date on the public registry. The date -# is joined in by enrich_nppes_last_updated.py (column nppes_years_stale). Until -# that enrichment has run, the field is empty and this segment safely mails -# nobody (we never assert "out of date" without the government date to back it). -# Observed institutional distribution: tightly clustered 3-7yrs, ~0 beyond 8yrs. -NPPES_STALE_MIN_YEARS = int(os.getenv("HC_NPPES_STALE_MIN_YEARS", "3")) -NPPES_STALE_MAX_YEARS = int(os.getenv("HC_NPPES_STALE_MAX_YEARS", "10")) - def _overdue_days(r: dict): v = (r.get("days_overdue") or "").strip() @@ -347,31 +341,28 @@ def row_matches(seg_key: str, r: dict) -> bool: return WARMUP_DUE_SOON_MIN <= days_until <= WARMUP_DUE_SOON_MAX if sel == "reval_upcoming": return status == "upcoming" if sel == "leie_or_deactivated": - # Reactivation targets: flagged excluded, OR no longer on the reval list - # (a strong deactivation proxy once revalidation lapses). - return excluded or status in ("not_on_list", "no_reval_flag") + # Reactivation targets: flagged OIG/SAM excluded, OR the NPI is genuinely + # DEACTIVATED in NPPES (real signal from enrich_nppes_last_updated.py). + # We deliberately do NOT treat "not on the CMS revalidation list" as + # deactivated here: for the institutional org-NPI pool that's false (an + # org NPI simply may not be an individual Medicare enrollee), which would + # mis-tell a fully-active practice they're deactivated. Only real + # exclusion or a real NPPES deactivation qualifies. + deactivated = (r.get("nppes_deactivated") or "").strip().upper() == "Y" + return excluded or deactivated if sel == "optout_ending": return optout - if sel == "nppes_stale": - # NPPES "out of date" segment. 
Only mail records whose REAL NPPES - # last_updated date (joined by enrich_nppes_last_updated.py) falls in the - # [MIN, MAX] years-stale window, so the "may be out of date" claim is - # literally true AND deliverable (very-stale records likely belong to - # closed/moved practices that bounce). The provider can verify the same - # date on the public registry. Deactivated NPIs belong to - # npi_reactivation, not here, so they're excluded. We also keep the - # institutional list's SMTP-verification gate (verify_ok) so we only mail - # inboxes we already proved are live. Empty stale field (enrichment not - # yet run) -> no match, so we never assert staleness without the date. + if sel == "institutional_default": + # Catch-all for the FREE NPI compliance-check email: any SMTP-verified + # institutional row that isn't a more-urgent case. With one-email-per- + # provider assignment (assign_segment), higher-priority segments + # (reactivation, revalidation) claim their providers first, so this only + # receives the remainder. Deactivated NPIs are excluded (they belong to + # reactivation and likely bounce). We keep the SMTP-verification gate so + # we only mail inboxes we already proved are live. if (r.get("nppes_deactivated") or "").strip().upper() == "Y": return False - if (str(r.get("verify_ok", "")).strip().upper() - not in ("Y", "YES", "TRUE", "1", "")): - return False - ys = (r.get("nppes_years_stale") or "").strip() - try: - return NPPES_STALE_MIN_YEARS <= int(ys) <= NPPES_STALE_MAX_YEARS - except ValueError: - return False + return (str(r.get("verify_ok", "")).strip().upper() + in ("Y", "YES", "TRUE", "1", "")) if sel == "any": # OIG screening applies to any billing practice, but for warmup we still # exclude the likely-undeliverable: providers heavily overdue (stale) or @@ -398,6 +389,39 @@ def row_matches(seg_key: str, r: dict) -> bool: return False +def _seg_priority(seg_key: str) -> int: + """Urgency rank for a segment (lower = more urgent). 
Defaults high so a + segment without an explicit priority never out-ranks a real one.""" + return int(SEGMENTS[seg_key].get("priority", 1000)) + + +def assign_segment(r: dict, active_segments: list[str]) -> str | None: + """One-email-per-provider: return the SINGLE segment a provider belongs to -- + the most-urgent (lowest priority number) active segment whose selector matches + their row -- or None if no active segment matches. This is what guarantees a + provider who is e.g. both revalidation-overdue AND eligible for the free check + receives only the overdue email (priority 20), never both. Ties broken by + priority then segment key for determinism.""" + matches = [s for s in active_segments if row_matches(s, r)] + if not matches: + return None + return min(matches, key=lambda s: (_seg_priority(s), s)) + + +def assign_all(rows: list[dict], active_segments: list[str]) -> dict[str, str]: + """Map email -> assigned segment across the whole list, so each segment's + importer can claim only its assigned providers. Computed once per run.""" + out: dict[str, str] = {} + for r in rows: + email = (r.get("email") or "").strip().lower() + if not email: + continue + seg = assign_segment(r, active_segments) + if seg is not None: + out[email] = seg + return out + + def attribs_for(r: dict) -> dict: # days_overdue is positive when past due and negative when upcoming (days # until the due date). Expose a clean positive "days_until" for the @@ -436,18 +460,31 @@ def attribs_for(r: dict) -> dict: def warm_segment(seg_key: str, rows: list[dict], slice_n: int, - dry_run: bool, start_campaign: bool) -> int: + dry_run: bool, start_campaign: bool, + assignment: dict[str, str] | None = None) -> int: """Import up to slice_n new subscribers for one segment and keep its - campaign active. Returns how many NEW subscribers were imported.""" + campaign active. Returns how many NEW subscribers were imported. 
+ + If `assignment` (email -> single assigned segment) is given, a provider is a + candidate only when THIS segment is their assigned one -- enforcing + one-email-per-provider by urgency priority. When it's None, fall back to the + legacy behavior (row_matches), so the warmup cron that runs a single segment + keeps working unchanged.""" seg = SEGMENTS[seg_key] imported = load_imported(seg_key) suppressed = load_suppressed() - candidates = [r for r in rows - if r.get("email", "").strip() - and r["email"].strip().lower() not in imported - and r["email"].strip().lower() not in suppressed - and not _is_google_hosted(r) - and row_matches(seg_key, r)] + + def _is_candidate(r: dict) -> bool: + email = r.get("email", "").strip().lower() + if not email or email in imported or email in suppressed: + return False + if _is_google_hosted(r): + return False + if assignment is not None: + return assignment.get(email) == seg_key + return row_matches(seg_key, r) + + candidates = [r for r in rows if _is_candidate(r)] # Spread the slice across MX operators so no single receiving system (e.g. # Microsoft 365) gets the whole batch. Caps ramp with the warmup day. todo = mx_throttled(candidates, slice_n, mx_daily_caps(warmup_day())) @@ -581,33 +618,33 @@ def main(): if args.prune_only: return - # Split the daily slice across segments. Revalidation (the lead, richest - # data) gets ~half; the rest share the remainder evenly. The lead reclaims - # any rounding remainder so the total never exceeds the warming-rate budget. 
- lead = "revalidation_overdue" - others = [s for s in segments if s != lead] - per_seg = {} - if lead in segments: - per_seg[lead] = max(1, int(total_slice * 0.5)) - rem = total_slice - per_seg[lead] - else: - rem = total_slice - if others and rem > 0: - base, extra = divmod(rem, len(others)) - for i, s in enumerate(others): - per_seg[s] = base + (1 if i < extra else 0) - elif others: - for s in others: - per_seg[s] = 0 - # Reclaim any rounding remainder onto the lead so sum(per_seg) == total_slice - # exactly (never overshoot the rate cap, never silently drop budget). - if lead in per_seg: - per_seg[lead] += total_slice - sum(per_seg.values()) + # Split the daily slice across segments by URGENCY PRIORITY. We process + # segments most-urgent first (lowest priority number) and give each the + # remaining budget, so urgent segments (reactivation, revalidation-overdue) + # are fully served before the broad free-NPI-check catch-all consumes the + # rest. Because assign_all already routed each provider to a single segment, + # the segment pools are disjoint -- no provider is double-counted or + # double-mailed. Unused budget from a small urgent pool flows to the next + # segment automatically (we only decrement by what was actually imported). 
+ assignment = assign_all(rows, segments) + if assignment: + from collections import Counter + dist = Counter(assignment.values()) + print(f"[hc-cron] assignment (one email/provider by priority): " + + ", ".join(f"{k}={dist[k]}" for k in + sorted(dist, key=lambda s: (_seg_priority(s), s)))) + order = sorted(segments, key=lambda s: (_seg_priority(s), s)) grand = 0 - for seg_key in segments: - grand += warm_segment(seg_key, rows, per_seg.get(seg_key, 0), - args.dry_run, args.start_campaign) + budget = total_slice + for seg_key in order: + cap = budget if not args.dry_run else total_slice + n = warm_segment(seg_key, rows, cap, args.dry_run, args.start_campaign, + assignment) + grand += n + budget -= n + if budget <= 0 and not args.dry_run: + break print(f"[hc-cron] done: imported {grand} new subscribers across {len(segments)} segment(s)") diff --git a/scripts/test_segment_assignment.py b/scripts/test_segment_assignment.py new file mode 100644 index 0000000..5e3c863 --- /dev/null +++ b/scripts/test_segment_assignment.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +"""Regression test for one-email-per-provider segment assignment. + +Verifies assign_segment() routes each provider to their single MOST-URGENT +matching segment (lowest priority number), so a provider who qualifies for +several segments gets exactly one email -- the most important one -- and everyone +else falls through to the free NPI compliance check (the catch-all default). 
+ +Run: python3 scripts/test_segment_assignment.py (exit 0 = pass) +""" +import sys, os +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +import build_healthcare_campaigns_cron as cron # noqa: E402 + +ACTIVE = ["npi_reactivation", "revalidation_overdue", "revalidation_due_soon", + "nppes_outdated", "compliance_bundle"] + + +def row(**kw): + base = {"email": "x@p.com", "verify_ok": "Y", "reval_status": "", + "days_overdue": "", "leie_excluded": "", "optout_ending": "", + "nppes_deactivated": "", "nppes_years_stale": ""} + base.update(kw) + return base + + +CASES = [ + ("plain verified institutional -> free check", + row(reval_status="no_reval_flag"), "nppes_outdated"), + ("overdue 30d -> revalidation_overdue beats free check", + row(reval_status="overdue", days_overdue="30"), "revalidation_overdue"), + ("due soon 30d -> revalidation_due_soon", + row(reval_status="upcoming", days_overdue="-30"), "revalidation_due_soon"), + ("OIG excluded -> reactivation (most urgent), beats overdue", + row(leie_excluded="Y", reval_status="overdue", days_overdue="20"), + "npi_reactivation"), + ("NPPES deactivated -> reactivation", + row(nppes_deactivated="Y", reval_status="no_reval_flag"), "npi_reactivation"), + ("overdue AND stale -> overdue wins, NOT free check", + row(reval_status="overdue", days_overdue="30", nppes_years_stale="9"), + "revalidation_overdue"), + ("deactivated -> NOT free check even though verified", + row(nppes_deactivated="Y"), "npi_reactivation"), + ("optout ending + nothing urgent -> bundle beats free check", + row(optout_ending="2025-12-01", reval_status="no_reval_flag"), + "compliance_bundle"), + ("not verified + nothing -> no assignment", + row(verify_ok="N", reval_status="no_reval_flag"), None), + ("heavily overdue 400d (out of window) -> free check", + row(reval_status="overdue", days_overdue="400"), "nppes_outdated"), +] + + +def main() -> int: + failures = 0 + for desc, r, exp in CASES: + got = cron.assign_segment(r, ACTIVE) + ok = got == 
exp + if not ok: + failures += 1 + print(f" [{'ok' if ok else 'FAIL'}] {desc:58} -> {got} (exp {exp})") + # Sanity: every provider gets AT MOST one segment (assign_all is a dict). + rows = [r for _, r, _ in CASES] + a = cron.assign_all(rows, ACTIVE) + assert all(isinstance(v, str) for v in a.values()), "assignment must be single-valued" + print(f"\n{'PASS' if not failures else f'{failures} FAILED'}") + return 1 if failures else 0 + + +if __name__ == "__main__": + sys.exit(main())