#!/usr/bin/env python3 """Weekly healthcare-data refresh. Keeps the warmup segment CSVs source-grounded by re-checking the NPIs we can already email (the master emailable list) against the LIVE government sources, then re-deriving the per-segment warmup CSVs the campaign cron sends from. Why: the original warmup CSVs were a one-time snapshot. The four source datasets refresh on their own cadences (SAM daily, NPPES weekly, OIG LEIE + CMS revalidation monthly). Emailing a provider a stale "N days overdue" figure - or one who has since revalidated - undercuts the exact trust the campaigns are built on. This refresh re-pulls status weekly so every send is accurate. Sources (all free, all public, all verified reachable): CMS Revalidation Due Date List data.cms.gov data-api (per-NPI filter) OIG LEIE exclusions oig.hhs.gov CSV download (full file) SAM.gov exclusions api.sam.gov v4 (needs SAM_GOV_API_KEY) Input : the master emailable list (npi,email,stream,name,specialty,state,...). Output: refreshed master + the per-segment CSVs the cron consumes, written atomically (temp + rename) so a partial run never corrupts a live CSV. Usage: python3 scripts/hc_data_refresh.py # full weekly refresh python3 scripts/hc_data_refresh.py --dry-run # report only python3 scripts/hc_data_refresh.py --skip-sam # if no SAM key python3 scripts/hc_data_refresh.py --master PATH --out-dir DIR """ from __future__ import annotations import argparse, csv, datetime, json, os, sys, tempfile, time import urllib.request, urllib.parse, urllib.error DATA_DIR = os.getenv("HC_DATA_DIR", "/opt/performancewest/data") MASTER = os.getenv("HC_MASTER_CSV", os.path.join(DATA_DIR, "hc_warmup_week1.csv")) CMS_REVAL_DATASET = "3746498e-874d-45d8-9c69-68603cafea60" CMS_API = f"https://data.cms.gov/data-api/v1/dataset/{CMS_REVAL_DATASET}/data" OIG_LEIE_URL = "https://oig.hhs.gov/exclusions/downloadables/UPDATED.csv" SAM_API = "https://api.sam.gov/entity-information/v4/exclusions" UA = "PerformanceWest-HCRefresh/1.0 (compliance@performancewest.net)" # Master/segment CSV schema (kept identical to build_npi_outreach_lists.py so the # campaign cron's column expectations never change). HEADER = ["npi", "email", "stream", "name", "specialty", "state", "reval_due_date", "days_overdue", "reval_status", "leie_excluded", "optout_ending"] def log(*a): print(f"[hc-refresh {datetime.datetime.now(datetime.timezone.utc).isoformat(timespec='seconds')}]", *a) def http_json(url: str, timeout: int = 30): req = urllib.request.Request(url, headers={"Accept": "application/json", "User-Agent": UA}) with urllib.request.urlopen(req, timeout=timeout) as r: return json.loads(r.read().decode()) # ── Source pulls ──────────────────────────────────────────────────────────── def sam_key() -> str | None: t = os.getenv("SAM_GOV_API_KEY") if t: return t for p in (os.path.join(DATA_DIR, "../.secrets/sam-api-key"), "/opt/performancewest/.secrets/sam-api-key"): if os.path.exists(p): try: return open(p).read().strip() except PermissionError: continue return None def cms_revalidation_for(npis: list[str]) -> dict[str, dict]: """Per-NPI revalidation lookup against the live CMS data-api. Returns {npi: {due_date, days_overdue, overdue}}. Only NPIs with a concrete due date are included; the API filters server-side so this is cheap even for a few thousand NPIs.""" today = datetime.date.today() out: dict[str, dict] = {} for i, npi in enumerate(npis, 1): q = urllib.parse.urlencode({"filter[National Provider Identifier]": npi, "size": 1}) try: rows = http_json(f"{CMS_API}?{q}", timeout=20) except Exception as e: log(f" cms lookup failed for {npi}: {e}") continue if not rows: continue row = rows[0] dd = (row.get("Adjusted Due Date") or "").strip() or (row.get("Revalidation Due Date") or "").strip() if not dd or dd.upper() == "TBD": # On the list but no concrete date = "due within 6 months / TBD". out[npi] = {"due_date": "", "days_overdue": 0, "overdue": False, "on_list": True} continue d = _parse_due(dd) if d is None: continue # Normalize to the master CSV's MM/DD/YYYY display format. dd_disp = d.strftime("%m/%d/%Y") overdue = (today - d).days out[npi] = {"due_date": dd_disp, "days_overdue": overdue, "overdue": overdue > 0, "on_list": True} if i % 100 == 0: log(f" cms: checked {i}/{len(npis)} NPIs") time.sleep(0.05) # be polite to data.cms.gov return out def _parse_due(s: str): """CMS returns either ISO (2025-12-31) or US (12/31/2025) due dates over time; accept both.""" for fmt in ("%Y-%m-%d", "%m/%d/%Y"): try: return datetime.datetime.strptime(s, fmt).date() except ValueError: continue return None def oig_leie_npis(tmp_dir: str) -> set[str]: """Download the full OIG LEIE CSV and return the set of excluded NPIs.""" dest = os.path.join(tmp_dir, "leie.csv") req = urllib.request.Request(OIG_LEIE_URL, headers={"User-Agent": UA}) with urllib.request.urlopen(req, timeout=120) as r, open(dest, "wb") as f: f.write(r.read()) npis: set[str] = set() with open(dest, newline="", encoding="latin-1") as f: rr = csv.reader(f) header = next(rr, []) # LEIE NPI column is named "NPI"; fall back to index 7 (legacy layout). try: npi_idx = [h.strip().upper() for h in header].index("NPI") except ValueError: npi_idx = 7 for row in rr: if len(row) <= npi_idx: continue npi = row[npi_idx].strip().strip('"') if npi and npi != "0000000000" and len(npi) == 10 and npi.isdigit(): npis.add(npi) return npis def sam_excluded_npis(key: str, max_pages: int = 0) -> set[str]: """Pull SAM exclusions (v4) and return the subset that carry an NPI. NOTE: SAM exclusions are keyed by name/UEI/CAGE; only a tiny fraction carry an NPI, and the full set is ~167k records (~1,700 pages). For the warmup data refresh the OIG LEIE (which DOES carry NPIs) is the meaningful exclusion cross-flag, so SAM is OFF by default here. SAM's real value is the live per-name screening service we sell, not a bulk NPI join. Pass --sam-pages N to crawl the first N pages if you want a best-effort NPI cross-flag anyway. """ npis: set[str] = set() page, size = 0, 100 while True: q = urllib.parse.urlencode({"api_key": key, "page": page, "size": size}) try: data = http_json(f"{SAM_API}?{q}", timeout=40) except Exception as e: log(f" sam page {page} failed: {e}") break rows = data.get("excludedEntity", []) or [] for e in rows: ident = e.get("exclusionIdentification", {}) or {} npi = str(ident.get("npi") or "").strip() if npi and len(npi) == 10 and npi.isdigit(): npis.add(npi) total = data.get("totalRecords", 0) page += 1 if page * size >= total or not rows: break if max_pages and page >= max_pages: break time.sleep(0.1) return npis # ── Refresh ───────────────────────────────────────────────────────────────── def load_master(path: str) -> list[dict]: with open(path, newline="") as f: return list(csv.DictReader(f)) def write_atomic(path: str, rows: list[dict], header: list[str]): d = os.path.dirname(path) or "." fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp") with os.fdopen(fd, "w", newline="") as f: w = csv.DictWriter(f, fieldnames=header, extrasaction="ignore") w.writeheader() for r in rows: w.writerow(r) os.replace(tmp, path) def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--master", default=MASTER) ap.add_argument("--out-dir", default=DATA_DIR) ap.add_argument("--dry-run", action="store_true") ap.add_argument("--skip-sam", action="store_true", help="(default) skip SAM bulk NPI crawl") ap.add_argument("--sam-pages", type=int, default=0, help="crawl first N SAM exclusion pages for an NPI cross-flag (slow; default off)") ap.add_argument("--skip-cms", action="store_true") ap.add_argument("--skip-oig", action="store_true") args = ap.parse_args() if not os.path.exists(args.master): log(f"master CSV not found: {args.master}") return 1 rows = load_master(args.master) npis = sorted({r["npi"].strip() for r in rows if r.get("npi", "").strip()}) log(f"master={args.master} emailable_npis={len(npis)}") with tempfile.TemporaryDirectory() as tmp: reval = {} if args.skip_cms else cms_revalidation_for(npis) log(f"cms: {len(reval)} NPIs on the revalidation list " f"(overdue={sum(1 for v in reval.values() if v['overdue'])})") leie = set() if args.skip_oig else oig_leie_npis(tmp) log(f"oig leie: {len(leie):,} excluded NPIs total; " f"{len(leie & set(npis))} overlap our list") sam = set() if args.sam_pages: key = sam_key() if key: sam = sam_excluded_npis(key, max_pages=args.sam_pages) log(f"sam: {len(sam):,} excluded NPIs w/ NPI (first {args.sam_pages} pages); " f"{len(sam & set(npis))} overlap our list") else: log("sam: no SAM_GOV_API_KEY / secret found, skipping") else: log("sam: skipped (default; OIG LEIE is the NPI-bearing exclusion source)") excluded = leie | sam today = datetime.date.today() refreshed = [] for r in rows: npi = r["npi"].strip() rv = reval.get(npi) if rv is not None and not args.skip_cms: r["reval_due_date"] = rv["due_date"] r["days_overdue"] = str(rv["days_overdue"]) if rv["due_date"] else "" r["reval_status"] = ("overdue" if rv["overdue"] else ("upcoming" if rv["due_date"] else "on_list_tbd")) elif not args.skip_cms: # No longer on the revalidation list -> they've revalidated / dropped. r["reval_status"] = "not_on_list" r["days_overdue"] = "" if not args.skip_oig or not args.skip_sam: r["leie_excluded"] = "1" if npi in excluded else "" refreshed.append(r) n_overdue = sum(1 for r in refreshed if r.get("reval_status") == "overdue") n_upcoming = sum(1 for r in refreshed if r.get("reval_status") == "upcoming") n_excluded = sum(1 for r in refreshed if r.get("leie_excluded") == "1") log(f"refreshed: overdue={n_overdue} upcoming={n_upcoming} excluded={n_excluded}") if args.dry_run: log("dry-run, no files written") return 0 write_atomic(args.master, refreshed, HEADER) log(f"wrote {args.master} ({len(refreshed)} rows)") # Propagate the fresh status fields into the channel CSVs the campaign cron # actually reads. These are email-keyed subsets of the master with extra # deliverability columns (verify_ok/verify_reason) we must preserve; we only # overwrite the status fields the refresh owns. REFRESHED_FIELDS = ["reval_due_date", "days_overdue", "reval_status", "leie_excluded", "optout_ending", "name", "specialty", "state"] by_email = {r["email"].strip().lower(): r for r in refreshed if r.get("email")} channel_csvs = [os.path.join(args.out_dir, f) for f in ("hc_warmup_nongoogle.csv", "hc_warmup_google.csv", "hc_warmup_week1_verified.csv")] for path in channel_csvs: if not os.path.exists(path): continue with open(path, newline="") as f: rdr = csv.DictReader(f) cols = rdr.fieldnames or [] rows_ch = list(rdr) updated = 0 for r in rows_ch: m = by_email.get(r.get("email", "").strip().lower()) if not m: continue for fld in REFRESHED_FIELDS: if fld in cols and fld in m: r[fld] = m[fld] updated += 1 write_atomic(path, rows_ch, cols) log(f"propagated to {os.path.basename(path)}: {updated}/{len(rows_ch)} rows updated") return 0 if __name__ == "__main__": sys.exit(main())