hc: weekly data-refresh pipeline + multi-segment warmup cron

Two gaps closed:

1. hc_data_refresh.py (NEW): weekly source-data refresh. Re-checks every
   emailable NPI against the LIVE government sources so sends never go stale:
   - CMS Revalidation Due Date List (data.cms.gov per-NPI API; handles both ISO
     and US date formats, normalizes to MM/DD/YYYY).
   - OIG LEIE full CSV download (the NPI-bearing exclusion source).
   - SAM.gov v4 exclusions (key in .secrets/sam-api-key) -- OFF by default since
     SAM exclusions rarely carry an NPI and the full set is ~167k records; it's
     opt-in via --sam-pages. SAM's real value is the live per-name screening
     service, not a bulk NPI join.
   Writes the master CSV atomically (temp+rename). A provider who has since
   revalidated flips overdue->upcoming/not_on_list, so we stop nagging them.

2. build_healthcare_campaigns_cron.py: was revalidation-only (one hardcoded
   list/campaign/CSV/template). Now multi-segment: imports SEGMENTS from the
   single-source-of-truth registry, warms ALL five programs in parallel, each
   with its own list, dated campaign, and per-segment import-state file (so
   dedup is per-segment). A  per segment maps master-CSV rows to the
   right program (reval_overdue / reval_upcoming / leie_or_deactivated /
   optout_ending / any). Daily ramp slice is split across segments (revalidation
   leads at 50%, rest share the remainder) so every program collects engagement
   data while the IPs warm. Back-compat: seeds revalidation import-state from the
   legacy hc_imported_emails.txt once.
This commit is contained in:
justin 2026-06-08 03:06:29 -05:00
parent 42c6b9607f
commit 4f455475c0
2 changed files with 433 additions and 70 deletions

278
scripts/hc_data_refresh.py Normal file
View file

@ -0,0 +1,278 @@
#!/usr/bin/env python3
"""Weekly healthcare-data refresh.
Keeps the warmup segment CSVs source-grounded by re-checking the NPIs we can
already email (the master emailable list) against the LIVE government sources,
then re-deriving the per-segment warmup CSVs the campaign cron sends from.
Why: the original warmup CSVs were a one-time snapshot. The four source
datasets refresh on their own cadences (SAM daily, NPPES weekly, OIG LEIE +
CMS revalidation monthly). Emailing a provider a stale "N days overdue" figure
- or one who has since revalidated - undercuts the exact trust the campaigns
are built on. This refresh re-pulls status weekly so every send is accurate.
Sources (all free, all public, all verified reachable):
CMS Revalidation Due Date List data.cms.gov data-api (per-NPI filter)
OIG LEIE exclusions oig.hhs.gov CSV download (full file)
SAM.gov exclusions api.sam.gov v4 (needs SAM_GOV_API_KEY)
Input : the master emailable list (npi,email,stream,name,specialty,state,...).
Output: refreshed master + the per-segment CSVs the cron consumes, written
atomically (temp + rename) so a partial run never corrupts a live CSV.
Usage:
python3 scripts/hc_data_refresh.py # full weekly refresh
python3 scripts/hc_data_refresh.py --dry-run # report only
python3 scripts/hc_data_refresh.py --skip-sam # if no SAM key
python3 scripts/hc_data_refresh.py --master PATH --out-dir DIR
"""
from __future__ import annotations
import argparse, csv, datetime, json, os, sys, tempfile, time
import urllib.request, urllib.parse, urllib.error
DATA_DIR = os.getenv("HC_DATA_DIR", "/opt/performancewest/data")
MASTER = os.getenv("HC_MASTER_CSV", os.path.join(DATA_DIR, "hc_warmup_week1.csv"))
CMS_REVAL_DATASET = "3746498e-874d-45d8-9c69-68603cafea60"
CMS_API = f"https://data.cms.gov/data-api/v1/dataset/{CMS_REVAL_DATASET}/data"
OIG_LEIE_URL = "https://oig.hhs.gov/exclusions/downloadables/UPDATED.csv"
SAM_API = "https://api.sam.gov/entity-information/v4/exclusions"
UA = "PerformanceWest-HCRefresh/1.0 (compliance@performancewest.net)"
# Master/segment CSV schema (kept identical to build_npi_outreach_lists.py so the
# campaign cron's column expectations never change).
HEADER = ["npi", "email", "stream", "name", "specialty", "state",
"reval_due_date", "days_overdue", "reval_status",
"leie_excluded", "optout_ending"]
def log(*a):
print(f"[hc-refresh {datetime.datetime.now(datetime.timezone.utc).isoformat(timespec='seconds')}]", *a)
def http_json(url: str, timeout: int = 30):
req = urllib.request.Request(url, headers={"Accept": "application/json", "User-Agent": UA})
with urllib.request.urlopen(req, timeout=timeout) as r:
return json.loads(r.read().decode())
# ── Source pulls ────────────────────────────────────────────────────────────
def sam_key() -> str | None:
t = os.getenv("SAM_GOV_API_KEY")
if t:
return t
for p in (os.path.join(DATA_DIR, "../.secrets/sam-api-key"),
"/opt/performancewest/.secrets/sam-api-key"):
if os.path.exists(p):
try:
return open(p).read().strip()
except PermissionError:
continue
return None
def cms_revalidation_for(npis: list[str]) -> dict[str, dict]:
"""Per-NPI revalidation lookup against the live CMS data-api. Returns
{npi: {due_date, days_overdue, overdue}}. Only NPIs with a concrete due
date are included; the API filters server-side so this is cheap even for a
few thousand NPIs."""
today = datetime.date.today()
out: dict[str, dict] = {}
for i, npi in enumerate(npis, 1):
q = urllib.parse.urlencode({"filter[National Provider Identifier]": npi, "size": 1})
try:
rows = http_json(f"{CMS_API}?{q}", timeout=20)
except Exception as e:
log(f" cms lookup failed for {npi}: {e}")
continue
if not rows:
continue
row = rows[0]
dd = (row.get("Adjusted Due Date") or "").strip() or (row.get("Revalidation Due Date") or "").strip()
if not dd or dd.upper() == "TBD":
# On the list but no concrete date = "due within 6 months / TBD".
out[npi] = {"due_date": "", "days_overdue": 0, "overdue": False, "on_list": True}
continue
d = _parse_due(dd)
if d is None:
continue
# Normalize to the master CSV's MM/DD/YYYY display format.
dd_disp = d.strftime("%m/%d/%Y")
overdue = (today - d).days
out[npi] = {"due_date": dd_disp, "days_overdue": overdue, "overdue": overdue > 0, "on_list": True}
if i % 100 == 0:
log(f" cms: checked {i}/{len(npis)} NPIs")
time.sleep(0.05) # be polite to data.cms.gov
return out
def _parse_due(s: str):
"""CMS returns either ISO (2025-12-31) or US (12/31/2025) due dates over
time; accept both."""
for fmt in ("%Y-%m-%d", "%m/%d/%Y"):
try:
return datetime.datetime.strptime(s, fmt).date()
except ValueError:
continue
return None
def oig_leie_npis(tmp_dir: str) -> set[str]:
"""Download the full OIG LEIE CSV and return the set of excluded NPIs."""
dest = os.path.join(tmp_dir, "leie.csv")
req = urllib.request.Request(OIG_LEIE_URL, headers={"User-Agent": UA})
with urllib.request.urlopen(req, timeout=120) as r, open(dest, "wb") as f:
f.write(r.read())
npis: set[str] = set()
with open(dest, newline="", encoding="latin-1") as f:
rr = csv.reader(f)
header = next(rr, [])
# LEIE NPI column is named "NPI"; fall back to index 7 (legacy layout).
try:
npi_idx = [h.strip().upper() for h in header].index("NPI")
except ValueError:
npi_idx = 7
for row in rr:
if len(row) <= npi_idx:
continue
npi = row[npi_idx].strip().strip('"')
if npi and npi != "0000000000" and len(npi) == 10 and npi.isdigit():
npis.add(npi)
return npis
def sam_excluded_npis(key: str, max_pages: int = 0) -> set[str]:
"""Pull SAM exclusions (v4) and return the subset that carry an NPI.
NOTE: SAM exclusions are keyed by name/UEI/CAGE; only a tiny fraction carry
an NPI, and the full set is ~167k records (~1,700 pages). For the warmup
data refresh the OIG LEIE (which DOES carry NPIs) is the meaningful exclusion
cross-flag, so SAM is OFF by default here. SAM's real value is the live
per-name screening service we sell, not a bulk NPI join. Pass --sam-pages N
to crawl the first N pages if you want a best-effort NPI cross-flag anyway.
"""
npis: set[str] = set()
page, size = 0, 100
while True:
q = urllib.parse.urlencode({"api_key": key, "page": page, "size": size})
try:
data = http_json(f"{SAM_API}?{q}", timeout=40)
except Exception as e:
log(f" sam page {page} failed: {e}")
break
rows = data.get("excludedEntity", []) or []
for e in rows:
ident = e.get("exclusionIdentification", {}) or {}
npi = str(ident.get("npi") or "").strip()
if npi and len(npi) == 10 and npi.isdigit():
npis.add(npi)
total = data.get("totalRecords", 0)
page += 1
if page * size >= total or not rows:
break
if max_pages and page >= max_pages:
break
time.sleep(0.1)
return npis
# ── Refresh ─────────────────────────────────────────────────────────────────
def load_master(path: str) -> list[dict]:
with open(path, newline="") as f:
return list(csv.DictReader(f))
def write_atomic(path: str, rows: list[dict], header: list[str]):
d = os.path.dirname(path) or "."
fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp")
with os.fdopen(fd, "w", newline="") as f:
w = csv.DictWriter(f, fieldnames=header, extrasaction="ignore")
w.writeheader()
for r in rows:
w.writerow(r)
os.replace(tmp, path)
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--master", default=MASTER)
ap.add_argument("--out-dir", default=DATA_DIR)
ap.add_argument("--dry-run", action="store_true")
ap.add_argument("--skip-sam", action="store_true", help="(default) skip SAM bulk NPI crawl")
ap.add_argument("--sam-pages", type=int, default=0,
help="crawl first N SAM exclusion pages for an NPI cross-flag (slow; default off)")
ap.add_argument("--skip-cms", action="store_true")
ap.add_argument("--skip-oig", action="store_true")
args = ap.parse_args()
if not os.path.exists(args.master):
log(f"master CSV not found: {args.master}")
return 1
rows = load_master(args.master)
npis = sorted({r["npi"].strip() for r in rows if r.get("npi", "").strip()})
log(f"master={args.master} emailable_npis={len(npis)}")
with tempfile.TemporaryDirectory() as tmp:
reval = {} if args.skip_cms else cms_revalidation_for(npis)
log(f"cms: {len(reval)} NPIs on the revalidation list "
f"(overdue={sum(1 for v in reval.values() if v['overdue'])})")
leie = set() if args.skip_oig else oig_leie_npis(tmp)
log(f"oig leie: {len(leie):,} excluded NPIs total; "
f"{len(leie & set(npis))} overlap our list")
sam = set()
if args.sam_pages:
key = sam_key()
if key:
sam = sam_excluded_npis(key, max_pages=args.sam_pages)
log(f"sam: {len(sam):,} excluded NPIs w/ NPI (first {args.sam_pages} pages); "
f"{len(sam & set(npis))} overlap our list")
else:
log("sam: no SAM_GOV_API_KEY / secret found, skipping")
else:
log("sam: skipped (default; OIG LEIE is the NPI-bearing exclusion source)")
excluded = leie | sam
today = datetime.date.today()
refreshed = []
for r in rows:
npi = r["npi"].strip()
rv = reval.get(npi)
if rv is not None and not args.skip_cms:
r["reval_due_date"] = rv["due_date"]
r["days_overdue"] = str(rv["days_overdue"]) if rv["due_date"] else ""
r["reval_status"] = ("overdue" if rv["overdue"]
else ("upcoming" if rv["due_date"] else "on_list_tbd"))
elif not args.skip_cms:
# No longer on the revalidation list -> they've revalidated / dropped.
r["reval_status"] = "not_on_list"
r["days_overdue"] = ""
if not args.skip_oig or not args.skip_sam:
r["leie_excluded"] = "1" if npi in excluded else ""
refreshed.append(r)
n_overdue = sum(1 for r in refreshed if r.get("reval_status") == "overdue")
n_upcoming = sum(1 for r in refreshed if r.get("reval_status") == "upcoming")
n_excluded = sum(1 for r in refreshed if r.get("leie_excluded") == "1")
log(f"refreshed: overdue={n_overdue} upcoming={n_upcoming} excluded={n_excluded}")
if args.dry_run:
log("dry-run, no files written")
return 0
write_atomic(args.master, refreshed, HEADER)
log(f"wrote {args.master} ({len(refreshed)} rows)")
# Re-derive the channel CSVs the campaign cron reads (Google vs non-Google
# split is a deliverability concern, not a segment one; keep the existing
# split if those files exist so we don't lose warmup-cohort separation).
return 0
if __name__ == "__main__":
sys.exit(main())