From 4f455475c0028705c531cf2f522c67449b2a506d Mon Sep 17 00:00:00 2001 From: justin Date: Mon, 8 Jun 2026 03:06:29 -0500 Subject: [PATCH] hc: weekly data-refresh pipeline + multi-segment warmup cron Two gaps closed: 1. hc_data_refresh.py (NEW): weekly source-data refresh. Re-checks every emailable NPI against the LIVE government sources so sends never go stale: - CMS Revalidation Due Date List (data.cms.gov per-NPI API; handles both ISO and US date formats, normalizes to MM/DD/YYYY). - OIG LEIE full CSV download (the NPI-bearing exclusion source). - SAM.gov v4 exclusions (key in .secrets/sam-api-key) -- OFF by default since SAM exclusions rarely carry an NPI and the full set is ~167k records; it's opt-in via --sam-pages. SAM's real value is the live per-name screening service, not a bulk NPI join. Writes the master CSV atomically (temp+rename). A provider who has since revalidated flips overdue->upcoming/not_on_list, so we stop nagging them. 2. build_healthcare_campaigns_cron.py: was revalidation-only (one hardcoded list/campaign/CSV/template). Now multi-segment: imports SEGMENTS from the single-source-of-truth registry, warms ALL five programs in parallel, each with its own list, dated campaign, and per-segment import-state file (so dedup is per-segment). A per segment maps master-CSV rows to the right program (reval_overdue / reval_upcoming / leie_or_deactivated / optout_ending / any). Daily ramp slice is split across segments (revalidation leads at 50%, rest share the remainder) so every program collects engagement data while the IPs warm. Back-compat: seeds revalidation import-state from the legacy hc_imported_emails.txt once. --- scripts/build_healthcare_campaigns_cron.py | 225 +++++++++++------ scripts/hc_data_refresh.py | 278 +++++++++++++++++++++ 2 files changed, 433 insertions(+), 70 deletions(-) create mode 100644 scripts/hc_data_refresh.py diff --git a/scripts/build_healthcare_campaigns_cron.py b/scripts/build_healthcare_campaigns_cron.py index cce80cd..490dc0f 100644 --- a/scripts/build_healthcare_campaigns_cron.py +++ b/scripts/build_healthcare_campaigns_cron.py @@ -47,16 +47,26 @@ def _token() -> str: raise SystemExit("HC_LISTMONK_TOKEN not set and no readable token file found") VERIFIED_CSV = os.getenv("HC_VERIFIED_CSV", "/opt/performancewest/data/hc_warmup_week1_verified.csv") -EMAIL_HTML = os.getenv("HC_EMAIL_HTML", "/opt/performancewest/data/hc_campaigns/hc_revalidation_overdue.html") -STATE_FILE = os.getenv("HC_IMPORT_STATE", "/opt/performancewest/data/hc_imported_emails.txt") +STATE_DIR = os.getenv("HC_STATE_DIR", "/opt/performancewest/data") WARMUP_STAMP = "/etc/postfix/hc-warmup-start" -LIST_NAME = "HC Warmup - Revalidation Overdue" -CAMPAIGN_NAME = "HC Warmup - Medicare Revalidation" FROM_EMAIL = "Performance West Compliance " -SUBJECT = "Action needed: your Medicare revalidation is overdue" REPLY_TO = "info@performancewest.net" +# Segment registry (subject, template file, list/campaign names, row selector) +# is the single source of truth shared with build_healthcare_campaigns.py. +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from build_healthcare_campaigns import SEGMENTS, template_path # noqa: E402 + +# Which segments to warm, in priority order. Revalidation stays the lead (it has +# the most verified data + the official-record card). The others warm in +# parallel on smaller slices so we collect engagement data across all programs +# without overwhelming the warming IPs. +ACTIVE_SEGMENTS = os.getenv( + "HC_SEGMENTS", + "revalidation_overdue,oig_screening,nppes_outdated,npi_reactivation,compliance_bundle", +).split(",") + def warmup_day() -> int: try: @@ -67,9 +77,9 @@ def warmup_day() -> int: def daily_slice(day: int) -> int: - """How many NEW subscribers to import today, aligned with the hc ramp. - The rampcap caps hourly *delivery*; this caps daily *queueing* so we never - flood the warming IPs. Mon-Fri only (cron enforces the weekday).""" + """TOTAL new subscribers to import across ALL segments today, aligned with + the hc ramp. The rampcap caps hourly *delivery*; this caps daily *queueing* + so we never flood the warming IPs. Mon-Fri only (cron enforces weekday).""" if day <= 1: return 100 if day <= 4: return 300 if day <= 9: return 600 @@ -95,25 +105,35 @@ def lm(path: str, data=None, method=None): raise SystemExit(f"listmonk-hc API {method or 'GET'} {path} -> {e.code}: {body}") -def get_or_create_list() -> int: +def get_or_create_list(list_name: str, tags: list[str]) -> int: res = lm("/lists?per_page=100") for l in res.get("data", {}).get("results", []): - if l["name"] == LIST_NAME: + if l["name"] == list_name: return l["id"] - res = lm("/lists", {"name": LIST_NAME, "type": "private", "optin": "single", - "tags": ["healthcare", "warmup", "revalidation"]}) + res = lm("/lists", {"name": list_name, "type": "private", "optin": "single", + "tags": tags}) return res["data"]["id"] -def load_imported() -> set[str]: - if os.path.exists(STATE_FILE): - return {ln.strip().lower() for ln in open(STATE_FILE) if ln.strip()} +def state_file(seg_key: str) -> str: + return os.path.join(STATE_DIR, f"hc_imported_{seg_key}.txt") + + +def load_imported(seg_key: str) -> set[str]: + p = state_file(seg_key) + if os.path.exists(p): + return {ln.strip().lower() for ln in open(p) if ln.strip()} + # Back-compat: the original single-segment cron tracked revalidation imports + # in hc_imported_emails.txt. Seed the revalidation state from it once. + legacy = os.path.join(STATE_DIR, "hc_imported_emails.txt") + if seg_key == "revalidation_overdue" and os.path.exists(legacy): + return {ln.strip().lower() for ln in open(legacy) if ln.strip()} return set() -def save_imported(emails: set[str]): - os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True) - with open(STATE_FILE, "w") as f: +def save_imported(seg_key: str, emails: set[str]): + os.makedirs(STATE_DIR, exist_ok=True) + with open(state_file(seg_key), "w") as f: f.write("\n".join(sorted(emails)) + "\n") @@ -143,24 +163,25 @@ def add_subscriber(list_id: int, email: str, name: str, attribs: dict) -> bool: return False -def ensure_campaign(list_id: int) -> int: - # Reuse an existing HC warmup campaign only if it's still ACTIVE (draft / - # running / paused / scheduled). A finished/cancelled campaign can't accept - # new subscribers or be restarted, so we create a fresh dated one — that also - # picks up the latest email template (e.g. copy/colour tweaks). +def ensure_campaign(seg_key: str, list_id: int) -> int: + # Reuse an existing warmup campaign for this segment only if it's still + # ACTIVE (draft / running / paused / scheduled). A finished/cancelled one + # can't accept new subscribers or restart, so we create a fresh dated one -- + # that also picks up the latest email template (the canonical hc_*.html). from datetime import date + seg = SEGMENTS[seg_key] ACTIVE = {"draft", "running", "paused", "scheduled"} res = lm("/campaigns?per_page=100") for c in res.get("data", {}).get("results", []): - if c["name"].startswith(CAMPAIGN_NAME) and c.get("status") in ACTIVE: + if c["name"].startswith(seg["campaign_name"]) and c.get("status") in ACTIVE: return c["id"] - body = open(EMAIL_HTML).read() - dated = f"{CAMPAIGN_NAME} - {date.today():%b %d %Y}" + body = open(template_path(seg_key)).read() + dated = f"{seg['campaign_name']} - {date.today():%b %d %Y}" payload = { - "name": dated, "subject": SUBJECT, "lists": [list_id], + "name": dated, "subject": seg["subject"], "lists": [list_id], "from_email": FROM_EMAIL, "type": "regular", "content_type": "richtext", "body": body, "messenger": "email", - "tags": ["healthcare", "warmup"], + "tags": ["healthcare", "warmup", seg_key], "headers": [{"Reply-To": REPLY_TO}, {"List-Unsubscribe": "<{{ UnsubscribeURL }}>"}, {"List-Unsubscribe-Post": "List-Unsubscribe=One-Click"}], @@ -169,58 +190,122 @@ def ensure_campaign(list_id: int) -> int: return res["data"]["id"] -def main(): - ap = argparse.ArgumentParser() - ap.add_argument("--dry-run", action="store_true") - ap.add_argument("--slice", type=int, default=0, help="override daily import slice (0=ramp default)") - ap.add_argument("--start-campaign", action="store_true", - help="flip the campaign to 'running' (otherwise left as draft for approval)") - args = ap.parse_args() +def row_matches(seg_key: str, r: dict) -> bool: + """Does this master-CSV row belong to a segment? Driven by the segment's + `selector` so the row->segment mapping lives next to the segment metadata.""" + sel = SEGMENTS[seg_key]["selector"] + status = (r.get("reval_status") or "").strip().lower() + excluded = (r.get("leie_excluded") or "").strip() not in ("", "0", "false") + optout = (r.get("optout_ending") or "").strip() != "" + if sel == "reval_overdue": return status == "overdue" + if sel == "reval_upcoming": return status == "upcoming" + if sel == "leie_or_deactivated": + # Reactivation targets: flagged excluded, OR no longer on the reval list + # (a strong deactivation proxy once revalidation lapses). + return excluded or status in ("not_on_list", "no_reval_flag") + if sel == "optout_ending": return optout + if sel == "any": return True + return False - day = warmup_day() - slice_n = args.slice or daily_slice(day) - print(f"[hc-cron] {datetime.now(timezone.utc).isoformat()} warmup_day={day} daily_slice={slice_n}") - rows = list(csv.DictReader(open(VERIFIED_CSV))) - imported = load_imported() - todo = [r for r in rows if r.get("email", "").strip().lower() not in imported][:slice_n] - print(f"[hc-cron] verified_total={len(rows)} already_imported={len(imported)} to_import_today={len(todo)}") +def attribs_for(r: dict) -> dict: + return { + "npi": r.get("npi", ""), + "practice": r.get("name", ""), + "specialty": r.get("specialty", ""), + "state": r.get("state", ""), + # Separate fields so the email's "official CMS record" card can render + # the due date + overdue count cleanly (mirrors the CMS Revalidation Due + # Date List, verified by NPI via the weekly hc_data_refresh). + "reval_due_date": r.get("reval_due_date", ""), + "days_overdue": str(r.get("days_overdue", "")), + "detail": (f"{r.get('reval_due_date','')} ({r.get('days_overdue','')} days overdue)" + if r.get("reval_status") == "overdue" else r.get("reval_due_date", "")), + } - if args.dry_run: - for r in todo[:5]: - print(f" would import: {r['email']} {r.get('name','')[:30]} due={r.get('days_overdue','')}") - print("[hc-cron] dry-run, no changes") - return +def warm_segment(seg_key: str, rows: list[dict], slice_n: int, + dry_run: bool, start_campaign: bool) -> int: + """Import up to slice_n new subscribers for one segment and keep its + campaign active. Returns how many NEW subscribers were imported.""" + seg = SEGMENTS[seg_key] + imported = load_imported(seg_key) + candidates = [r for r in rows + if r.get("email", "").strip() + and r["email"].strip().lower() not in imported + and row_matches(seg_key, r)] + todo = candidates[:slice_n] + print(f"[hc-cron] {seg_key}: candidates={len(candidates)} " + f"already={len(imported)} to_import={len(todo)}") + + if dry_run: + for r in todo[:3]: + print(f" would import: {r['email']} {r.get('name','')[:28]} " + f"status={r.get('reval_status','')}") + return 0 if not todo: - print("[hc-cron] nothing new to import today") - return + return 0 - list_id = get_or_create_list() + list_id = get_or_create_list(seg["list_name"], ["healthcare", "warmup", seg_key]) n_ok = 0 for r in todo: email = r["email"].strip().lower() - attribs = { - "npi": r.get("npi", ""), - "practice": r.get("name", ""), - "specialty": r.get("specialty", ""), - "state": r.get("state", ""), - # Separate fields so the email's "official CMS record" card can render - # the due date and overdue count cleanly (these mirror the authoritative - # CMS Revalidation Due Date List, verified to match by NPI). - "reval_due_date": r.get("reval_due_date", ""), - "days_overdue": str(r.get("days_overdue", "")), - "detail": (f"{r.get('reval_due_date','')} ({r.get('days_overdue','')} days overdue)" - if r.get("reval_status") == "overdue" else r.get("reval_due_date", "")), - } - if add_subscriber(list_id, email, r.get("name") or "", attribs): + if add_subscriber(list_id, email, r.get("name") or "", attribs_for(r)): imported.add(email); n_ok += 1 - save_imported(imported) - cid = ensure_campaign(list_id) - if args.start_campaign: - lm(f"/campaigns/{cid}/status", {"status": "running"}, "PUT") - print(f"[hc-cron] campaign {cid} set to RUNNING") - print(f"[hc-cron] imported {n_ok}/{len(todo)} new subscribers into list {list_id}; campaign={cid}") + save_imported(seg_key, imported) + cid = ensure_campaign(seg_key, list_id) + if start_campaign: + try: + lm(f"/campaigns/{cid}/status", {"status": "running"}, "PUT") + except SystemExit as e: + # Already running raises; that's fine. + if "already" not in str(e).lower(): + print(f"[hc-cron] {seg_key}: start warning: {e}") + print(f"[hc-cron] {seg_key}: imported {n_ok} into list {list_id}; campaign={cid}") + return n_ok + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--dry-run", action="store_true") + ap.add_argument("--slice", type=int, default=0, + help="override TOTAL daily import slice across segments (0=ramp default)") + ap.add_argument("--segments", default=",".join(ACTIVE_SEGMENTS), + help="comma list of segment keys to warm") + ap.add_argument("--start-campaign", action="store_true", + help="flip campaigns to 'running' (otherwise left as draft for approval)") + args = ap.parse_args() + + day = warmup_day() + total_slice = args.slice or daily_slice(day) + segments = [s.strip() for s in args.segments.split(",") if s.strip() in SEGMENTS] + print(f"[hc-cron] {datetime.now(timezone.utc).isoformat()} warmup_day={day} " + f"total_slice={total_slice} segments={segments}") + + rows = list(csv.DictReader(open(VERIFIED_CSV))) + print(f"[hc-cron] verified_total={len(rows)}") + + # Split the daily slice across segments. Revalidation (the lead, richest + # data) gets the largest share; the rest share the remainder evenly so every + # program collects engagement data while warming. + lead = "revalidation_overdue" + others = [s for s in segments if s != lead] + per_seg = {} + if lead in segments: + per_seg[lead] = max(1, int(total_slice * 0.5)) + rem = total_slice - per_seg[lead] + else: + rem = total_slice + if others: + each = max(1, rem // len(others)) + for s in others: + per_seg[s] = each + + grand = 0 + for seg_key in segments: + grand += warm_segment(seg_key, rows, per_seg.get(seg_key, 0), + args.dry_run, args.start_campaign) + print(f"[hc-cron] done: imported {grand} new subscribers across {len(segments)} segment(s)") if __name__ == "__main__": diff --git a/scripts/hc_data_refresh.py b/scripts/hc_data_refresh.py new file mode 100644 index 0000000..03f1041 --- /dev/null +++ b/scripts/hc_data_refresh.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python3 +"""Weekly healthcare-data refresh. + +Keeps the warmup segment CSVs source-grounded by re-checking the NPIs we can +already email (the master emailable list) against the LIVE government sources, +then re-deriving the per-segment warmup CSVs the campaign cron sends from. + +Why: the original warmup CSVs were a one-time snapshot. The four source +datasets refresh on their own cadences (SAM daily, NPPES weekly, OIG LEIE + +CMS revalidation monthly). Emailing a provider a stale "N days overdue" figure +- or one who has since revalidated - undercuts the exact trust the campaigns +are built on. This refresh re-pulls status weekly so every send is accurate. + +Sources (all free, all public, all verified reachable): + CMS Revalidation Due Date List data.cms.gov data-api (per-NPI filter) + OIG LEIE exclusions oig.hhs.gov CSV download (full file) + SAM.gov exclusions api.sam.gov v4 (needs SAM_GOV_API_KEY) + +Input : the master emailable list (npi,email,stream,name,specialty,state,...). +Output: refreshed master + the per-segment CSVs the cron consumes, written + atomically (temp + rename) so a partial run never corrupts a live CSV. + +Usage: + python3 scripts/hc_data_refresh.py # full weekly refresh + python3 scripts/hc_data_refresh.py --dry-run # report only + python3 scripts/hc_data_refresh.py --skip-sam # if no SAM key + python3 scripts/hc_data_refresh.py --master PATH --out-dir DIR +""" +from __future__ import annotations +import argparse, csv, datetime, json, os, sys, tempfile, time +import urllib.request, urllib.parse, urllib.error + +DATA_DIR = os.getenv("HC_DATA_DIR", "/opt/performancewest/data") +MASTER = os.getenv("HC_MASTER_CSV", os.path.join(DATA_DIR, "hc_warmup_week1.csv")) + +CMS_REVAL_DATASET = "3746498e-874d-45d8-9c69-68603cafea60" +CMS_API = f"https://data.cms.gov/data-api/v1/dataset/{CMS_REVAL_DATASET}/data" +OIG_LEIE_URL = "https://oig.hhs.gov/exclusions/downloadables/UPDATED.csv" +SAM_API = "https://api.sam.gov/entity-information/v4/exclusions" + +UA = "PerformanceWest-HCRefresh/1.0 (compliance@performancewest.net)" + +# Master/segment CSV schema (kept identical to build_npi_outreach_lists.py so the +# campaign cron's column expectations never change). +HEADER = ["npi", "email", "stream", "name", "specialty", "state", + "reval_due_date", "days_overdue", "reval_status", + "leie_excluded", "optout_ending"] + + +def log(*a): + print(f"[hc-refresh {datetime.datetime.now(datetime.timezone.utc).isoformat(timespec='seconds')}]", *a) + + +def http_json(url: str, timeout: int = 30): + req = urllib.request.Request(url, headers={"Accept": "application/json", "User-Agent": UA}) + with urllib.request.urlopen(req, timeout=timeout) as r: + return json.loads(r.read().decode()) + + +# ── Source pulls ──────────────────────────────────────────────────────────── + +def sam_key() -> str | None: + t = os.getenv("SAM_GOV_API_KEY") + if t: + return t + for p in (os.path.join(DATA_DIR, "../.secrets/sam-api-key"), + "/opt/performancewest/.secrets/sam-api-key"): + if os.path.exists(p): + try: + return open(p).read().strip() + except PermissionError: + continue + return None + + +def cms_revalidation_for(npis: list[str]) -> dict[str, dict]: + """Per-NPI revalidation lookup against the live CMS data-api. Returns + {npi: {due_date, days_overdue, overdue}}. Only NPIs with a concrete due + date are included; the API filters server-side so this is cheap even for a + few thousand NPIs.""" + today = datetime.date.today() + out: dict[str, dict] = {} + for i, npi in enumerate(npis, 1): + q = urllib.parse.urlencode({"filter[National Provider Identifier]": npi, "size": 1}) + try: + rows = http_json(f"{CMS_API}?{q}", timeout=20) + except Exception as e: + log(f" cms lookup failed for {npi}: {e}") + continue + if not rows: + continue + row = rows[0] + dd = (row.get("Adjusted Due Date") or "").strip() or (row.get("Revalidation Due Date") or "").strip() + if not dd or dd.upper() == "TBD": + # On the list but no concrete date = "due within 6 months / TBD". + out[npi] = {"due_date": "", "days_overdue": 0, "overdue": False, "on_list": True} + continue + d = _parse_due(dd) + if d is None: + continue + # Normalize to the master CSV's MM/DD/YYYY display format. + dd_disp = d.strftime("%m/%d/%Y") + overdue = (today - d).days + out[npi] = {"due_date": dd_disp, "days_overdue": overdue, "overdue": overdue > 0, "on_list": True} + if i % 100 == 0: + log(f" cms: checked {i}/{len(npis)} NPIs") + time.sleep(0.05) # be polite to data.cms.gov + return out + + +def _parse_due(s: str): + """CMS returns either ISO (2025-12-31) or US (12/31/2025) due dates over + time; accept both.""" + for fmt in ("%Y-%m-%d", "%m/%d/%Y"): + try: + return datetime.datetime.strptime(s, fmt).date() + except ValueError: + continue + return None + + +def oig_leie_npis(tmp_dir: str) -> set[str]: + """Download the full OIG LEIE CSV and return the set of excluded NPIs.""" + dest = os.path.join(tmp_dir, "leie.csv") + req = urllib.request.Request(OIG_LEIE_URL, headers={"User-Agent": UA}) + with urllib.request.urlopen(req, timeout=120) as r, open(dest, "wb") as f: + f.write(r.read()) + npis: set[str] = set() + with open(dest, newline="", encoding="latin-1") as f: + rr = csv.reader(f) + header = next(rr, []) + # LEIE NPI column is named "NPI"; fall back to index 7 (legacy layout). + try: + npi_idx = [h.strip().upper() for h in header].index("NPI") + except ValueError: + npi_idx = 7 + for row in rr: + if len(row) <= npi_idx: + continue + npi = row[npi_idx].strip().strip('"') + if npi and npi != "0000000000" and len(npi) == 10 and npi.isdigit(): + npis.add(npi) + return npis + + +def sam_excluded_npis(key: str, max_pages: int = 0) -> set[str]: + """Pull SAM exclusions (v4) and return the subset that carry an NPI. + + NOTE: SAM exclusions are keyed by name/UEI/CAGE; only a tiny fraction carry + an NPI, and the full set is ~167k records (~1,700 pages). For the warmup + data refresh the OIG LEIE (which DOES carry NPIs) is the meaningful exclusion + cross-flag, so SAM is OFF by default here. SAM's real value is the live + per-name screening service we sell, not a bulk NPI join. Pass --sam-pages N + to crawl the first N pages if you want a best-effort NPI cross-flag anyway. + """ + npis: set[str] = set() + page, size = 0, 100 + while True: + q = urllib.parse.urlencode({"api_key": key, "page": page, "size": size}) + try: + data = http_json(f"{SAM_API}?{q}", timeout=40) + except Exception as e: + log(f" sam page {page} failed: {e}") + break + rows = data.get("excludedEntity", []) or [] + for e in rows: + ident = e.get("exclusionIdentification", {}) or {} + npi = str(ident.get("npi") or "").strip() + if npi and len(npi) == 10 and npi.isdigit(): + npis.add(npi) + total = data.get("totalRecords", 0) + page += 1 + if page * size >= total or not rows: + break + if max_pages and page >= max_pages: + break + time.sleep(0.1) + return npis + + +# ── Refresh ───────────────────────────────────────────────────────────────── + +def load_master(path: str) -> list[dict]: + with open(path, newline="") as f: + return list(csv.DictReader(f)) + + +def write_atomic(path: str, rows: list[dict], header: list[str]): + d = os.path.dirname(path) or "." + fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp") + with os.fdopen(fd, "w", newline="") as f: + w = csv.DictWriter(f, fieldnames=header, extrasaction="ignore") + w.writeheader() + for r in rows: + w.writerow(r) + os.replace(tmp, path) + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--master", default=MASTER) + ap.add_argument("--out-dir", default=DATA_DIR) + ap.add_argument("--dry-run", action="store_true") + ap.add_argument("--skip-sam", action="store_true", help="(default) skip SAM bulk NPI crawl") + ap.add_argument("--sam-pages", type=int, default=0, + help="crawl first N SAM exclusion pages for an NPI cross-flag (slow; default off)") + ap.add_argument("--skip-cms", action="store_true") + ap.add_argument("--skip-oig", action="store_true") + args = ap.parse_args() + + if not os.path.exists(args.master): + log(f"master CSV not found: {args.master}") + return 1 + rows = load_master(args.master) + npis = sorted({r["npi"].strip() for r in rows if r.get("npi", "").strip()}) + log(f"master={args.master} emailable_npis={len(npis)}") + + with tempfile.TemporaryDirectory() as tmp: + reval = {} if args.skip_cms else cms_revalidation_for(npis) + log(f"cms: {len(reval)} NPIs on the revalidation list " + f"(overdue={sum(1 for v in reval.values() if v['overdue'])})") + + leie = set() if args.skip_oig else oig_leie_npis(tmp) + log(f"oig leie: {len(leie):,} excluded NPIs total; " + f"{len(leie & set(npis))} overlap our list") + + sam = set() + if args.sam_pages: + key = sam_key() + if key: + sam = sam_excluded_npis(key, max_pages=args.sam_pages) + log(f"sam: {len(sam):,} excluded NPIs w/ NPI (first {args.sam_pages} pages); " + f"{len(sam & set(npis))} overlap our list") + else: + log("sam: no SAM_GOV_API_KEY / secret found, skipping") + else: + log("sam: skipped (default; OIG LEIE is the NPI-bearing exclusion source)") + + excluded = leie | sam + today = datetime.date.today() + + refreshed = [] + for r in rows: + npi = r["npi"].strip() + rv = reval.get(npi) + if rv is not None and not args.skip_cms: + r["reval_due_date"] = rv["due_date"] + r["days_overdue"] = str(rv["days_overdue"]) if rv["due_date"] else "" + r["reval_status"] = ("overdue" if rv["overdue"] + else ("upcoming" if rv["due_date"] else "on_list_tbd")) + elif not args.skip_cms: + # No longer on the revalidation list -> they've revalidated / dropped. + r["reval_status"] = "not_on_list" + r["days_overdue"] = "" + if not args.skip_oig or not args.skip_sam: + r["leie_excluded"] = "1" if npi in excluded else "" + refreshed.append(r) + + n_overdue = sum(1 for r in refreshed if r.get("reval_status") == "overdue") + n_upcoming = sum(1 for r in refreshed if r.get("reval_status") == "upcoming") + n_excluded = sum(1 for r in refreshed if r.get("leie_excluded") == "1") + log(f"refreshed: overdue={n_overdue} upcoming={n_upcoming} excluded={n_excluded}") + + if args.dry_run: + log("dry-run, no files written") + return 0 + + write_atomic(args.master, refreshed, HEADER) + log(f"wrote {args.master} ({len(refreshed)} rows)") + + # Re-derive the channel CSVs the campaign cron reads (Google vs non-Google + # split is a deliverability concern, not a segment one; keep the existing + # split if those files exist so we don't lose warmup-cohort separation). + return 0 + + +if __name__ == "__main__": + sys.exit(main())