- Add NPPES_STALE_MAX_YEARS (default 10): a record untouched for many years is a stronger signal the practice closed/moved, and a bounce burns the warming IP. Observed institutional distribution clusters 3-7yrs with ~0 beyond 8, so 10 is a safe ceiling that mails the whole real pool while excluding any outlier ancient record. MIN stays 3 (keeps the 'out of date' claim credible). - Restore the SMTP-verification gate (verify_ok) that the shared institutional_verified selector had -- the swap to nppes_stale dropped it; we only mail inboxes we already proved live. - enrich: process the re-fetch queue STALEST-FIRST so a bounded (--limit) or --max-age refresh spends its budget on the most-overdue cache entries (and new NPIs) first, never starving them behind merely-aging ones. - Selector unit-tested (10 cases incl. window edges, verify gate, deactivated).
243 lines
9.6 KiB
Python
243 lines
9.6 KiB
Python
#!/usr/bin/env python3
|
|
"""Enrich the institutional list with each NPI's REAL NPPES `last_updated` date.
|
|
|
|
The NPPES NPI Registry is fully public and every record carries a
|
|
`basic.last_updated` date (the last time the provider updated their record) plus
|
|
`basic.enumeration_date` (when the NPI was first issued). This script joins that
|
|
real, government-sourced date onto our institutional email list so the
|
|
"NPPES record may be out of date" campaign can state a LITERAL, verifiable fact
|
|
("NPPES shows your record was last updated on 2012-02-08, ~14 years ago") instead
|
|
of an unsubstantiated "FLAGGED OUT OF DATE" claim. The provider can confirm the
|
|
exact same date on the official registry, which is what makes the pitch credible
|
|
and FTC/defamation-safe.
|
|
|
|
Why a cache: NPPES has NO batch-by-NPI endpoint -- it is one HTTP request per
|
|
NPI (~63k). The `last_updated` date changes rarely and we only care at
|
|
year-granularity, so we persist results in a sidecar cache keyed by NPI and only
|
|
re-fetch entries that are missing or older than --max-age days. The first fill is
|
|
a one-time slow crawl; every refresh after that is near-instant.
|
|
|
|
Columns ADDED to the output CSV:
|
|
nppes_last_updated ISO date string from basic.last_updated (e.g. 2012-02-08)
|
|
nppes_enumeration ISO date from basic.enumeration_date (NPI issued date)
|
|
nppes_years_stale whole years since last_updated (e.g. 14)
|
|
nppes_deactivated "Y" if the NPI returns no active record (reactivation cue)
|
|
|
|
Usage:
|
|
# In-place enrich (adds/refreshes the columns on the institutional file):
|
|
python3 scripts/enrich_nppes_last_updated.py data/hc_nppes_institutional_enriched.csv
|
|
|
|
# Explicit in/out + options:
|
|
python3 scripts/enrich_nppes_last_updated.py IN.csv -o OUT.csv \
|
|
--cache data/nppes_last_updated_cache.csv --max-age 30 --rps 10 --limit 0
|
|
"""
|
|
from __future__ import annotations
|
|
import argparse
|
|
import csv
|
|
import datetime
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
# Raise the csv module's per-field size cap to 10,000,000 characters so a very
# large cell does not abort parsing.
csv.field_size_limit(10_000_000)

# Public NPPES registry endpoint, and the User-Agent sent with every request.
NPPES_API = "https://npiregistry.cms.hhs.gov/api/"
UA = "PerformanceWest-NPPESFreshness/1.0 (compliance@performancewest.net)"

# Columns this enricher OWNS on the output CSV.
ADDED_COLS = [
    "nppes_last_updated",
    "nppes_enumeration",
    "nppes_years_stale",
    "nppes_deactivated",
]

# Cache schema (sidecar, keyed by npi).
CACHE_COLS = [
    "npi",
    "nppes_last_updated",
    "nppes_enumeration",
    "nppes_deactivated",
    "fetched_at",
]
|
|
|
|
|
|
def log(*a):
    """Emit one diagnostic line on stderr, prefixed with a UTC timestamp.

    The positional arguments are handed to ``print`` untouched, so they are
    joined with single spaces after the ``[nppes-freshness <timestamp>]``
    prefix. The stream is flushed on every call.
    """
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    stamp = now_utc.isoformat(timespec="seconds")
    prefix = f"[nppes-freshness {stamp}]"
    print(prefix, *a, file=sys.stderr, flush=True)
|
|
|
|
|
|
def _parse_iso(s: str):
    """Parse a date string into a ``datetime.date``.

    ``YYYY-MM-DD`` is tried first, then ``MM/DD/YYYY``. ``None`` is treated as
    an empty string and surrounding whitespace is ignored.

    Returns:
        The parsed ``datetime.date``, or ``None`` when the value is empty or
        matches neither format.
    """
    s = (s or "").strip()
    for fmt in ("%Y-%m-%d", "%m/%d/%Y"):
        try:
            return datetime.datetime.strptime(s, fmt).date()
        except ValueError:
            continue
    return None


def years_stale(last_updated: str, today: datetime.date) -> str:
    """Return the whole calendar years elapsed between ``last_updated`` and ``today``.

    The count only increases once the anniversary of ``last_updated`` has
    actually been reached. Dividing the day difference by 365 ignores leap
    days and therefore rolls over a few days early -- e.g. 2012-02-08 seen
    from 2026-02-05 would read 14 although only 13 full years have passed.

    Args:
        last_updated: Date string in a format ``_parse_iso`` understands.
        today: Reference date the age is measured against.

    Returns:
        The year count as a decimal string, ``"0"`` when ``last_updated`` lies
        in the future, or ``""`` when ``last_updated`` cannot be parsed.
    """
    d = _parse_iso(last_updated)
    if d is None:
        return ""
    years = today.year - d.year
    # Anniversary not reached yet in today's year: the last year is incomplete.
    if (today.month, today.day) < (d.month, d.day):
        years -= 1
    return str(max(0, years))
|
|
|
|
|
|
def fetch_nppes(npi: str, timeout: int = 20, retries: int = 2) -> dict:
    """Fetch one NPI from the public NPPES registry API.

    Args:
        npi: NPI number to look up.
        timeout: Per-request timeout in seconds, passed to ``urlopen``.
        retries: Extra attempts after the first one when the request or the
            response parsing fails (linear backoff between attempts).

    Returns:
        ``{"nppes_last_updated", "nppes_enumeration", "nppes_deactivated"}``
        on success. A well-formed response with an empty ``results`` list is
        treated as deactivated (``nppes_deactivated == "Y"``) on the premise
        that the registry only returns active NPIs. An empty dict is returned
        when nothing trustworthy was learned (network/parse failure on every
        attempt, or a response that is not a normal result payload); callers
        leave such NPIs uncached so a later run retries them.
    """
    # Function-scope import: http.client is not among the file's top-level
    # imports, and its exceptions are needed in the except clause below.
    import http.client

    q = urllib.parse.urlencode({"version": "2.1", "number": npi})
    url = f"{NPPES_API}?{q}"
    last_err = None
    for attempt in range(retries + 1):
        req = urllib.request.Request(
            url, headers={"Accept": "application/json", "User-Agent": UA})
        try:
            with urllib.request.urlopen(req, timeout=timeout) as r:
                data = json.loads(r.read().decode())
        except (OSError, http.client.HTTPException, ValueError) as e:
            # OSError covers URLError/HTTPError, timeouts and connection
            # resets; HTTPException covers truncated/invalid HTTP responses;
            # ValueError covers JSONDecodeError and UnicodeDecodeError. Any of
            # these escaping would abort the whole crawl over one bad response.
            last_err = e
            if attempt < retries:
                time.sleep(0.5 * (attempt + 1))  # linear backoff
            continue

        # NOTE(review): the registry is understood to answer rejected requests
        # with HTTP 200 and an "Errors" array instead of "results" -- confirm
        # against the API help page. Such a reply (or any non-object payload)
        # says nothing about the NPI's status, so it must not be recorded as
        # a deactivation.
        if not isinstance(data, dict) or data.get("Errors"):
            detail = data.get("Errors") if isinstance(data, dict) else data
            log(f" fetch rejected for {npi}: {detail!r}")
            return {}

        results = data.get("results") or []
        if not results:
            return {"nppes_last_updated": "", "nppes_enumeration": "",
                    "nppes_deactivated": "Y"}
        first = results[0]
        basic = (first.get("basic") if isinstance(first, dict) else None) or {}
        return {
            "nppes_last_updated": str(basic.get("last_updated") or "").strip(),
            "nppes_enumeration": str(basic.get("enumeration_date") or "").strip(),
            "nppes_deactivated": "",
        }
    log(f" fetch failed for {npi}: {last_err}")
    return {}  # transient failure: leave uncached so a later run retries
|
|
|
|
|
|
def load_cache(path: str) -> dict[str, dict]:
    """Read the sidecar cache CSV into a dict keyed by stripped NPI.

    An empty ``path`` or a path that does not exist yields an empty dict.
    Rows whose ``npi`` cell is blank are skipped; when an NPI appears more
    than once, the last row wins. Each value is the raw CSV row mapping.
    """
    if not (path and os.path.exists(path)):
        return {}
    with open(path, newline="") as handle:
        keyed_rows = (
            ((record.get("npi") or "").strip(), record)
            for record in csv.DictReader(handle)
        )
        # Materialised inside the ``with`` so the file is still open.
        return {key: record for key, record in keyed_rows if key}
|
|
|
|
|
|
def write_cache(path: str, cache: dict[str, dict]):
    """Atomically persist ``cache`` to ``path`` as CSV, rows sorted by NPI.

    The data is written to a temporary file in the destination directory and
    then moved over ``path`` with ``os.replace``, so a reader never sees a
    half-written cache. Only the ``CACHE_COLS`` fields are written; extra keys
    on an entry are ignored. An empty ``path`` is a no-op.

    Raises:
        OSError: if the directory cannot be created or the file cannot be
            written or replaced. The temporary file is removed before the
            error propagates.
    """
    if not path:
        return
    d = os.path.dirname(path) or "."
    # The cache directory may not exist yet on a first run; without this the
    # first flush would crash after fetches have already been made.
    os.makedirs(d, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="") as f:
            w = csv.DictWriter(f, fieldnames=CACHE_COLS, extrasaction="ignore")
            w.writeheader()
            w.writerows(cache[npi] for npi in sorted(cache))
        os.replace(tmp, path)
    except BaseException:
        # mkstemp never cleans up after itself; drop the orphan (also on
        # Ctrl-C) so failed flushes do not pile up *.tmp files next to the
        # cache. Best-effort: the original error is what matters.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
|
|
|
|
|
|
def is_fresh(entry: dict, today: datetime.date, max_age_days: int) -> bool:
    """Tell whether a cache entry is recent enough to skip re-fetching.

    An entry counts as fresh when its ``fetched_at`` value parses as a date
    and lies no more than ``max_age_days`` days before ``today``. A missing or
    empty entry, or one without a parseable ``fetched_at``, is never fresh.
    """
    fetched_on = _parse_iso(entry.get("fetched_at", "")) if entry else None
    return fetched_on is not None and (today - fetched_on).days <= max_age_days
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: refresh the NPPES cache, then join it onto the input CSV.

    Steps:
      1. Read the input CSV and collect its unique, non-blank ``npi`` values.
      2. Load the sidecar cache and queue every NPI whose entry is missing or
         older than ``--max-age`` days, oldest ``fetched_at`` first (entries
         never fetched sort first), optionally capped by ``--limit``.
      3. Fetch the queue at no more than ``--rps`` requests per second,
         flushing the cache every ``--flush-every`` successful fetches and
         once more when the crawl ends -- including when it is interrupted.
      4. Write the rows back out with the ``ADDED_COLS`` columns filled from
         the cache; ``nppes_years_stale`` is computed relative to today.

    Returns:
        Process exit status ``0``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("infile")
    ap.add_argument("-o", "--out", help="output CSV (default: in-place)")
    ap.add_argument("--cache", default="data/nppes_last_updated_cache.csv")
    ap.add_argument("--max-age", type=int, default=30,
                    help="re-fetch cache entries older than N days (default 30)")
    ap.add_argument("--rps", type=float, default=10.0,
                    help="max requests/sec to the NPPES API (default 10)")
    ap.add_argument("--limit", type=int, default=0,
                    help="only fetch up to N new NPIs this run (0 = all; for resumable fills)")
    ap.add_argument("--flush-every", type=int, default=200,
                    help="persist the cache to disk every N new fetches")
    args = ap.parse_args()

    out = args.out or args.infile
    today = datetime.date.today()
    sleep = 1.0 / args.rps if args.rps > 0 else 0.0

    with open(args.infile, newline="") as f:
        reader = csv.DictReader(f)
        in_cols = list(reader.fieldnames or [])
        rows = list(reader)
    log(f"input={args.infile} rows={len(rows):,}")

    npis = sorted({(r.get("npi") or "").strip() for r in rows if (r.get("npi") or "").strip()})
    cache = load_cache(args.cache)
    log(f"cache={args.cache} entries={len(cache):,}")

    # Determine which NPIs need a (re)fetch, STALEST FIRST so a bounded run
    # (--limit) always spends its budget on the most-overdue cache entries.
    # Never-fetched entries have an empty fetched_at, which sorts first, so new
    # NPIs are prioritized over merely-aging ones.
    todo = [n for n in npis if not is_fresh(cache.get(n, {}), today, args.max_age)]
    todo.sort(key=lambda n: cache.get(n, {}).get("fetched_at", "") or "")
    n_due = len(todo)
    # Only a positive limit caps the run; a negative value used as a slice
    # bound would silently drop NPIs from the END of the queue instead.
    limited = args.limit > 0
    if limited:
        todo = todo[:args.limit]
    log(f"to_fetch={len(todo):,} (of {n_due:,} due / {len(npis):,} unique NPIs; "
        f"limit={args.limit if limited else 'all'})")

    fetched = 0
    t0 = time.time()
    try:
        for i, npi in enumerate(todo, 1):
            res = fetch_nppes(npi)
            if res:  # only cache successful lookups (transient failures retry later)
                res["npi"] = npi
                res["fetched_at"] = today.isoformat()
                cache[npi] = res
                fetched += 1
                # Flush only when a success moves the counter onto a multiple
                # of --flush-every. Testing this on every iteration would
                # rewrite the whole cache once per FAILED fetch for as long as
                # the counter sits on a multiple. A non-positive value
                # disables periodic flushing (and avoids a modulo by zero).
                if args.flush_every > 0 and fetched % args.flush_every == 0:
                    write_cache(args.cache, cache)
            if i % 500 == 0:
                rate = i / max(1e-6, time.time() - t0)
                log(f" fetched {i:,}/{len(todo):,} ({rate:.1f}/s) cached={fetched:,}")
            if sleep:
                time.sleep(sleep)
    finally:
        # Runs on normal completion AND on Ctrl-C / unexpected errors, so an
        # interrupted crawl keeps everything fetched since the last flush.
        write_cache(args.cache, cache)
    log(f"fetched {fetched:,} new/refreshed; cache now {len(cache):,} entries")

    # Join cache -> rows, computing years_stale at render time (today-relative).
    new_cols = [c for c in ADDED_COLS if c not in in_cols]
    out_cols = in_cols + new_cols
    n_stale2 = n_deact = n_unknown = 0
    for r in rows:
        npi = (r.get("npi") or "").strip()
        ent = cache.get(npi, {})
        lu = (ent.get("nppes_last_updated") or "").strip()
        r["nppes_last_updated"] = lu
        r["nppes_enumeration"] = (ent.get("nppes_enumeration") or "").strip()
        r["nppes_deactivated"] = (ent.get("nppes_deactivated") or "").strip()
        ys = years_stale(lu, today) if lu else ""
        r["nppes_years_stale"] = ys
        if r["nppes_deactivated"] == "Y":
            n_deact += 1
        elif ys == "":
            n_unknown += 1
        elif int(ys) >= 2:
            n_stale2 += 1

    # Write to a temp file in the destination directory, then swap it in with
    # os.replace so the output is never left half-written.
    d = os.path.dirname(out) or "."
    fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="") as f:
            w = csv.DictWriter(f, fieldnames=out_cols, extrasaction="ignore")
            w.writeheader()
            w.writerows(rows)
        os.replace(tmp, out)
    except BaseException:
        # Best-effort removal of the orphaned temp file; re-raise the real error.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise

    log(f"wrote {out} (+{len(new_cols)} cols)")
    log(f" stale>=2yrs={n_stale2:,} deactivated={n_deact:,} "
        f"unknown(no cache yet)={n_unknown:,} of {len(rows):,}")
    return 0
|
|
|
|
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|