new-site/scripts/enrich_nppes_last_updated.py
justin 9e155d214c healthcare: cite REAL NPPES last_updated date in 'outdated' email
The NPPES 'may be out of date' email previously asserted staleness with no
per-record evidence (softened earlier to a generic 'periodic review required').
NPPES is fully public and every record carries basic.last_updated, so we now
cite the actual government date the provider can verify on the registry.

- enrich_nppes_last_updated.py: joins real basic.last_updated /
  enumeration_date / deactivated onto the institutional list via a cached,
  resumable per-NPI crawl (no batch endpoint exists). Adds nppes_last_updated,
  nppes_enumeration, nppes_years_stale, nppes_deactivated.
- cron: new 'nppes_stale' selector mails ONLY records >= 3yrs stale (env
  HC_NPPES_STALE_MIN_YEARS) and excludes deactivated NPIs; empty date => no
  match, so we never claim staleness without the government date to back it.
- template: headline + official-record card now show the real last_updated
  date and ~N-years-ago, sourced to npiregistry.cms.hhs.gov.
- attribs + test SAMPLE expose the new fields; verified render + plaintext.
2026-06-20 15:21:15 -05:00

237 lines
9.2 KiB
Python

#!/usr/bin/env python3
"""Enrich the institutional list with each NPI's REAL NPPES `last_updated` date.
The NPPES NPI Registry is fully public and every record carries a
`basic.last_updated` date (the last time the provider updated their record) plus
`basic.enumeration_date` (when the NPI was first issued). This script joins that
real, government-sourced date onto our institutional email list so the
"NPPES record may be out of date" campaign can state a LITERAL, verifiable fact
("NPPES shows your record was last updated on 2012-02-08, ~14 years ago") instead
of an unsubstantiated "FLAGGED OUT OF DATE" claim. The provider can confirm the
exact same date on the official registry, which is what makes the pitch credible
and FTC/defamation-safe.
Why a cache: NPPES has NO batch-by-NPI endpoint -- it is one HTTP request per
NPI (~63k). The `last_updated` date changes rarely and we only care at
year-granularity, so we persist results in a sidecar cache keyed by NPI and only
re-fetch entries that are missing or older than --max-age days. The first fill is
a one-time slow crawl; every refresh after that is near-instant.
Columns ADDED to the output CSV:
    nppes_last_updated   ISO date string from basic.last_updated (e.g. 2012-02-08)
    nppes_enumeration    ISO date from basic.enumeration_date (NPI issued date)
    nppes_years_stale    whole years since last_updated (e.g. 14)
    nppes_deactivated    "Y" if the NPI returns no active record (reactivation cue)
Usage:
    # In-place enrich (adds/refreshes the columns on the institutional file):
    python3 scripts/enrich_nppes_last_updated.py data/hc_nppes_institutional_enriched.csv

    # Explicit in/out + options:
    python3 scripts/enrich_nppes_last_updated.py IN.csv -o OUT.csv \
        --cache data/nppes_last_updated_cache.csv --max-age 30 --rps 10 --limit 0
"""
from __future__ import annotations
import argparse
import csv
import datetime
import json
import os
import sys
import tempfile
import time
import urllib.error
import urllib.parse
import urllib.request
# Raise the csv module's per-field size cap to 10,000,000 characters.
csv.field_size_limit(10_000_000)

# Public NPPES registry API endpoint and the User-Agent sent with each request.
NPPES_API = "https://npiregistry.cms.hhs.gov/api/"
UA = "PerformanceWest-NPPESFreshness/1.0 (compliance@performancewest.net)"

# Columns this enricher OWNS on the output CSV.
ADDED_COLS = [
    "nppes_last_updated",
    "nppes_enumeration",
    "nppes_years_stale",
    "nppes_deactivated",
]

# Cache schema (sidecar, keyed by npi).
CACHE_COLS = [
    "npi",
    "nppes_last_updated",
    "nppes_enumeration",
    "nppes_deactivated",
    "fetched_at",
]
def log(*a):
    """Print the given values to stderr behind a UTC-timestamped tag.

    The line is flushed immediately so progress is visible during long runs.
    """
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    stamp = now_utc.isoformat(timespec="seconds")
    prefix = "[nppes-freshness {}]".format(stamp)
    print(prefix, *a, file=sys.stderr, flush=True)
def _parse_iso(s: str):
    """Parse a date string into a ``datetime.date``.

    Accepts ``YYYY-MM-DD`` and, as a fallback, ``MM/DD/YYYY``. Surrounding
    whitespace is ignored; ``None`` is treated like an empty string.

    Returns:
        The parsed date, or ``None`` when the text matches neither format.
    """
    s = (s or "").strip()
    for fmt in ("%Y-%m-%d", "%m/%d/%Y"):
        try:
            return datetime.datetime.strptime(s, fmt).date()
        except ValueError:
            continue
    return None


def years_stale(last_updated: str, today: datetime.date) -> str:
    """Return the number of COMPLETED calendar years since ``last_updated``.

    Args:
        last_updated: Date text in a format understood by ``_parse_iso``.
        today: Reference date the age is measured against.

    Returns:
        The whole-year count as a string (never negative), or ``""`` when
        ``last_updated`` cannot be parsed.
    """
    d = _parse_iso(last_updated)
    if d is None:
        return ""
    # Compare anniversaries instead of dividing a day count by 365: leap days
    # make `days // 365` tick over 1-3 days BEFORE the real anniversary, which
    # would overstate staleness right at a ">= N years" threshold.
    anniversary_pending = (today.month, today.day) < (d.month, d.day)
    years = today.year - d.year - (1 if anniversary_pending else 0)
    return str(max(0, years))
def fetch_nppes(npi: str, timeout: int = 20, retries: int = 2) -> dict:
    """Fetch one NPI from the public NPPES registry API.

    Args:
        npi: NPI to look up (sent as the ``number`` query parameter).
        timeout: Per-request timeout in seconds.
        retries: Extra attempts after the first failed one (linear backoff).

    Returns:
        A dict with ``nppes_last_updated``, ``nppes_enumeration`` and
        ``nppes_deactivated`` on a successful lookup. A well-formed response
        with no results is treated as deactivated (the registry only returns
        active NPIs). An EMPTY dict means the lookup failed -- network error,
        unreadable body, or an API-level error payload -- and must not be
        cached, so a later run retries it.
    """
    # Function-scope import keeps this block self-contained; the module is
    # already loaded by urllib.request, so this is only a name lookup.
    import http.client

    query = urllib.parse.urlencode({"version": "2.1", "number": npi})
    url = f"{NPPES_API}?{query}"
    last_err = None
    for attempt in range(retries + 1):
        try:
            req = urllib.request.Request(
                url, headers={"Accept": "application/json", "User-Agent": UA})
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                data = json.loads(resp.read().decode())
        except (OSError, http.client.HTTPException, ValueError) as e:
            # OSError covers URLError/HTTPError plus timeouts and connection
            # resets raised while READING the body; HTTPException covers
            # truncated responses; ValueError covers bad JSON / bad encoding.
            # Any of these escaping would abort the entire crawl.
            last_err = e
            if attempt < retries:
                time.sleep(0.5 * (attempt + 1))  # linear backoff
            continue

        if not isinstance(data, dict) or data.get("Errors"):
            # The API rejected the request (or sent an unexpected shape).
            # That is NOT evidence of deactivation, so do not report "Y";
            # retrying the identical request would not help either.
            last_err = f"unexpected API response: {str(data)[:200]}"
            break

        results = data.get("results") or []
        if not results:
            return {"nppes_last_updated": "", "nppes_enumeration": "",
                    "nppes_deactivated": "Y"}
        basic = results[0].get("basic", {}) or {}
        return {
            "nppes_last_updated": (basic.get("last_updated") or "").strip(),
            "nppes_enumeration": (basic.get("enumeration_date") or "").strip(),
            "nppes_deactivated": "",
        }

    log(f" fetch failed for {npi}: {last_err}")
    return {}  # failure: leave uncached so a later run retries
def load_cache(path: str) -> dict[str, dict]:
    """Read the sidecar cache CSV into a dict keyed by stripped NPI.

    Returns an empty dict when ``path`` is empty or the file does not exist.
    Rows with a blank ``npi`` are skipped; when an NPI appears more than once
    the last row wins.
    """
    if not (path and os.path.exists(path)):
        return {}
    entries: dict[str, dict] = {}
    with open(path, newline="") as handle:
        for record in csv.DictReader(handle):
            key = (record.get("npi") or "").strip()
            if not key:
                continue
            entries[key] = record
    return entries
def write_cache(path: str, cache: dict[str, dict]):
    """Atomically persist the cache to ``path`` as a CSV sorted by NPI.

    The data is written to a temporary file in the destination directory and
    then moved over ``path`` with ``os.replace``, so readers never observe a
    half-written cache. Only the ``CACHE_COLS`` fields are written; any other
    keys on an entry are ignored. Does nothing when ``path`` is empty.

    Raises:
        OSError: if the temporary file cannot be created, written or moved.
    """
    if not path:
        return
    target_dir = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(dir=target_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="") as f:
            w = csv.DictWriter(f, fieldnames=CACHE_COLS, extrasaction="ignore")
            w.writeheader()
            w.writerows(cache[npi] for npi in sorted(cache))
        os.replace(tmp, path)
    except BaseException:
        # A failed or interrupted write must not leave an orphaned *.tmp file
        # behind in the data directory; cleanup is best-effort.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
def is_fresh(entry: dict, today: datetime.date, max_age_days: int) -> bool:
    """Tell whether a cache entry was fetched within ``max_age_days`` of ``today``.

    A missing/empty entry, or one whose ``fetched_at`` cannot be parsed, is
    never fresh.
    """
    fetched_on = _parse_iso(entry.get("fetched_at", "")) if entry else None
    if fetched_on is None:
        return False
    age = today - fetched_on
    return age.days <= max_age_days
def main() -> int:
    """Enrich the input CSV with NPPES freshness columns.

    Parses the command line, reads every input row, (re)fetches each unique
    NPI whose cache entry is missing or older than ``--max-age`` days, persists
    the cache, then joins the cache onto the rows and atomically writes the
    output CSV (in place unless ``-o`` is given).

    Returns:
        0 on success (used as the process exit status).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("infile")
    ap.add_argument("-o", "--out", help="output CSV (default: in-place)")
    ap.add_argument("--cache", default="data/nppes_last_updated_cache.csv")
    ap.add_argument("--max-age", type=int, default=30,
                    help="re-fetch cache entries older than N days (default 30)")
    ap.add_argument("--rps", type=float, default=10.0,
                    help="max requests/sec to the NPPES API (default 10)")
    ap.add_argument("--limit", type=int, default=0,
                    help="only fetch up to N new NPIs this run (0 = all; for resumable fills)")
    ap.add_argument("--flush-every", type=int, default=200,
                    help="persist the cache to disk every N new fetches")
    args = ap.parse_args()

    out = args.out or args.infile
    today = datetime.date.today()
    sleep = 1.0 / args.rps if args.rps > 0 else 0.0

    with open(args.infile, newline="") as f:
        reader = csv.DictReader(f)
        in_cols = list(reader.fieldnames or [])
        rows = list(reader)
    log(f"input={args.infile} rows={len(rows):,}")

    npis = sorted({(r.get("npi") or "").strip() for r in rows if (r.get("npi") or "").strip()})
    cache = load_cache(args.cache)
    log(f"cache={args.cache} entries={len(cache):,}")

    # Determine which NPIs need a (re)fetch.
    todo = [n for n in npis if not is_fresh(cache.get(n, {}), today, args.max_age)]
    if args.limit:
        todo = todo[:args.limit]
    log(f"to_fetch={len(todo):,} (of {len(npis):,} unique NPIs; limit={args.limit or 'all'})")

    fetched = 0
    t0 = time.time()
    try:
        for i, npi in enumerate(todo, 1):
            res = fetch_nppes(npi)
            if res:  # only cache successful lookups (transient failures retry later)
                res["npi"] = npi
                res["fetched_at"] = today.isoformat()
                cache[npi] = res
                fetched += 1
            if i % 500 == 0:
                rate = i / max(1e-6, time.time() - t0)
                log(f" fetched {i:,}/{len(todo):,} ({rate:.1f}/s) cached={fetched:,}")
            # Flush only right after a SUCCESSFUL fetch: otherwise a run of
            # failures while `fetched` sits on a multiple would rewrite the
            # whole cache every iteration. A non-positive --flush-every
            # disables periodic flushing instead of dividing by zero.
            if res and args.flush_every > 0 and fetched % args.flush_every == 0:
                write_cache(args.cache, cache)
            if sleep:
                time.sleep(sleep)
    finally:
        # Persist on every exit path (including Ctrl-C or an unexpected
        # error) so an interrupted crawl keeps what it fetched and can resume.
        write_cache(args.cache, cache)
    log(f"fetched {fetched:,} new/refreshed; cache now {len(cache):,} entries")

    # Join cache -> rows, computing years_stale at render time (today-relative).
    new_cols = [c for c in ADDED_COLS if c not in in_cols]
    out_cols = in_cols + new_cols
    n_stale2 = n_deact = n_unknown = 0
    for r in rows:
        npi = (r.get("npi") or "").strip()
        ent = cache.get(npi, {})
        lu = (ent.get("nppes_last_updated") or "").strip()
        r["nppes_last_updated"] = lu
        r["nppes_enumeration"] = (ent.get("nppes_enumeration") or "").strip()
        r["nppes_deactivated"] = (ent.get("nppes_deactivated") or "").strip()
        ys = years_stale(lu, today) if lu else ""
        r["nppes_years_stale"] = ys
        if r["nppes_deactivated"] == "Y":
            n_deact += 1
        elif ys == "":
            n_unknown += 1
        elif int(ys) >= 2:
            n_stale2 += 1

    # Write to a temp file in the destination directory, then move it into
    # place, so an in-place run never leaves a half-written CSV.
    d = os.path.dirname(out) or "."
    fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="") as f:
            w = csv.DictWriter(f, fieldnames=out_cols, extrasaction="ignore")
            w.writeheader()
            w.writerows(rows)
        os.replace(tmp, out)
    except BaseException:
        # Best-effort cleanup so a failed write leaves no orphaned *.tmp file.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
    log(f"wrote {out} (+{len(new_cols)} cols)")
    log(f" stale>=2yrs={n_stale2:,} deactivated={n_deact:,} "
        f"unknown(no cache yet)={n_unknown:,} of {len(rows):,}")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())