#!/usr/bin/env python3 """Enrich the institutional list with each NPI's REAL NPPES `last_updated` date. The NPPES NPI Registry is fully public and every record carries a `basic.last_updated` date (the last time the provider updated their record) plus `basic.enumeration_date` (when the NPI was first issued). This script joins that real, government-sourced date onto our institutional email list so the "NPPES record may be out of date" campaign can state a LITERAL, verifiable fact ("NPPES shows your record was last updated on 2012-02-08, ~14 years ago") instead of an unsubstantiated "FLAGGED OUT OF DATE" claim. The provider can confirm the exact same date on the official registry, which is what makes the pitch credible and FTC/defamation-safe. Why a cache: NPPES has NO batch-by-NPI endpoint -- it is one HTTP request per NPI (~63k). The `last_updated` date changes rarely and we only care at year-granularity, so we persist results in a sidecar cache keyed by NPI and only re-fetch entries that are missing or older than --max-age days. The first fill is a one-time slow crawl; every refresh after that is near-instant. Columns ADDED to the output CSV: nppes_last_updated ISO date string from basic.last_updated (e.g. 2012-02-08) nppes_enumeration ISO date from basic.enumeration_date (NPI issued date) nppes_years_stale whole years since last_updated (e.g. 14) nppes_deactivated "Y" if the NPI returns no active record (reactivation cue) Usage: # In-place enrich (adds/refreshes the columns on the institutional file): python3 scripts/enrich_nppes_last_updated.py data/hc_nppes_institutional_enriched.csv # Explicit in/out + options: python3 scripts/enrich_nppes_last_updated.py IN.csv -o OUT.csv \ --cache data/nppes_last_updated_cache.csv --max-age 30 --rps 10 --limit 0 """ from __future__ import annotations import argparse import csv import datetime import json import os import sys import tempfile import time import urllib.error import urllib.parse import urllib.request csv.field_size_limit(10_000_000) NPPES_API = "https://npiregistry.cms.hhs.gov/api/" UA = "PerformanceWest-NPPESFreshness/1.0 (compliance@performancewest.net)" # Columns this enricher OWNS on the output CSV. ADDED_COLS = ["nppes_last_updated", "nppes_enumeration", "nppes_years_stale", "nppes_deactivated"] # Cache schema (sidecar, keyed by npi). CACHE_COLS = ["npi", "nppes_last_updated", "nppes_enumeration", "nppes_deactivated", "fetched_at"] def log(*a): ts = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds") print(f"[nppes-freshness {ts}]", *a, file=sys.stderr, flush=True) def _parse_iso(s: str): s = (s or "").strip() for fmt in ("%Y-%m-%d", "%m/%d/%Y"): try: return datetime.datetime.strptime(s, fmt).date() except ValueError: continue return None def years_stale(last_updated: str, today: datetime.date) -> str: d = _parse_iso(last_updated) if d is None: return "" return str(max(0, (today - d).days // 365)) def fetch_nppes(npi: str, timeout: int = 20, retries: int = 2) -> dict: """Fetch one NPI from the public NPPES registry API. Returns {last_updated, enumeration, deactivated}. A record that returns no result is treated as deactivated (the registry only returns active NPIs).""" q = urllib.parse.urlencode({"version": "2.1", "number": npi}) url = f"{NPPES_API}?{q}" last_err = None for attempt in range(retries + 1): try: req = urllib.request.Request( url, headers={"Accept": "application/json", "User-Agent": UA}) with urllib.request.urlopen(req, timeout=timeout) as r: data = json.loads(r.read().decode()) results = data.get("results") or [] if not results: return {"nppes_last_updated": "", "nppes_enumeration": "", "nppes_deactivated": "Y"} basic = results[0].get("basic", {}) or {} return { "nppes_last_updated": (basic.get("last_updated") or "").strip(), "nppes_enumeration": (basic.get("enumeration_date") or "").strip(), "nppes_deactivated": "", } except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, json.JSONDecodeError) as e: last_err = e if attempt < retries: time.sleep(0.5 * (attempt + 1)) # linear backoff log(f" fetch failed for {npi}: {last_err}") return {} # transient failure: leave uncached so a later run retries def load_cache(path: str) -> dict[str, dict]: cache: dict[str, dict] = {} if not path or not os.path.exists(path): return cache with open(path, newline="") as f: for row in csv.DictReader(f): npi = (row.get("npi") or "").strip() if npi: cache[npi] = row return cache def write_cache(path: str, cache: dict[str, dict]): if not path: return d = os.path.dirname(path) or "." fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp") with os.fdopen(fd, "w", newline="") as f: w = csv.DictWriter(f, fieldnames=CACHE_COLS, extrasaction="ignore") w.writeheader() for npi in sorted(cache): w.writerow(cache[npi]) os.replace(tmp, path) def is_fresh(entry: dict, today: datetime.date, max_age_days: int) -> bool: """A cache entry is fresh if it was fetched within max_age_days.""" if not entry: return False fa = _parse_iso(entry.get("fetched_at", "")) if fa is None: return False return (today - fa).days <= max_age_days def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("infile") ap.add_argument("-o", "--out", help="output CSV (default: in-place)") ap.add_argument("--cache", default="data/nppes_last_updated_cache.csv") ap.add_argument("--max-age", type=int, default=30, help="re-fetch cache entries older than N days (default 30)") ap.add_argument("--rps", type=float, default=10.0, help="max requests/sec to the NPPES API (default 10)") ap.add_argument("--limit", type=int, default=0, help="only fetch up to N new NPIs this run (0 = all; for resumable fills)") ap.add_argument("--flush-every", type=int, default=200, help="persist the cache to disk every N new fetches") args = ap.parse_args() out = args.out or args.infile today = datetime.date.today() sleep = 1.0 / args.rps if args.rps > 0 else 0.0 with open(args.infile, newline="") as f: reader = csv.DictReader(f) in_cols = list(reader.fieldnames or []) rows = list(reader) log(f"input={args.infile} rows={len(rows):,}") npis = sorted({(r.get("npi") or "").strip() for r in rows if (r.get("npi") or "").strip()}) cache = load_cache(args.cache) log(f"cache={args.cache} entries={len(cache):,}") # Determine which NPIs need a (re)fetch, STALEST FIRST so a bounded run # (--limit) always spends its budget on the most-overdue cache entries. # Never-fetched entries have an empty fetched_at, which sorts first, so new # NPIs are prioritized over merely-aging ones. todo = [n for n in npis if not is_fresh(cache.get(n, {}), today, args.max_age)] todo.sort(key=lambda n: cache.get(n, {}).get("fetched_at", "") or "") n_due = len(todo) if args.limit: todo = todo[:args.limit] log(f"to_fetch={len(todo):,} (of {n_due:,} due / {len(npis):,} unique NPIs; " f"limit={args.limit or 'all'})") fetched = 0 t0 = time.time() for i, npi in enumerate(todo, 1): res = fetch_nppes(npi) if res: # only cache successful lookups (transient failures retry later) res["npi"] = npi res["fetched_at"] = today.isoformat() cache[npi] = res fetched += 1 if i % 500 == 0: rate = i / max(1e-6, time.time() - t0) log(f" fetched {i:,}/{len(todo):,} ({rate:.1f}/s) cached={fetched:,}") if fetched and fetched % args.flush_every == 0: write_cache(args.cache, cache) if sleep: time.sleep(sleep) write_cache(args.cache, cache) log(f"fetched {fetched:,} new/refreshed; cache now {len(cache):,} entries") # Join cache -> rows, computing years_stale at render time (today-relative). out_cols = in_cols + [c for c in ADDED_COLS if c not in in_cols] n_stale2 = n_deact = n_unknown = 0 for r in rows: npi = (r.get("npi") or "").strip() ent = cache.get(npi, {}) lu = (ent.get("nppes_last_updated") or "").strip() r["nppes_last_updated"] = lu r["nppes_enumeration"] = (ent.get("nppes_enumeration") or "").strip() r["nppes_deactivated"] = (ent.get("nppes_deactivated") or "").strip() ys = years_stale(lu, today) if lu else "" r["nppes_years_stale"] = ys if r["nppes_deactivated"] == "Y": n_deact += 1 elif ys == "": n_unknown += 1 elif int(ys) >= 2: n_stale2 += 1 d = os.path.dirname(out) or "." fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp") with os.fdopen(fd, "w", newline="") as f: w = csv.DictWriter(f, fieldnames=out_cols, extrasaction="ignore") w.writeheader() w.writerows(rows) os.replace(tmp, out) log(f"wrote {out} (+{len([c for c in ADDED_COLS if c not in in_cols])} cols)") log(f" stale>=2yrs={n_stale2:,} deactivated={n_deact:,} " f"unknown(no cache yet)={n_unknown:,} of {len(rows):,}") return 0 if __name__ == "__main__": sys.exit(main())