From 9e155d214c05cc16947c1c8aa9c51a28412b26aa Mon Sep 17 00:00:00 2001 From: justin Date: Sat, 20 Jun 2026 15:21:15 -0500 Subject: [PATCH] healthcare: cite REAL NPPES last_updated date in 'outdated' email The NPPES 'may be out of date' email previously asserted staleness with no per-record evidence (softened earlier to a generic 'periodic review required'). NPPES is fully public and every record carries basic.last_updated, so we now cite the actual government date the provider can verify on the registry. - enrich_nppes_last_updated.py: joins real basic.last_updated / enumeration_date / deactivated onto the institutional list via a cached, resumable per-NPI crawl (no batch endpoint exists). Adds nppes_last_updated, nppes_enumeration, nppes_years_stale, nppes_deactivated. - cron: new 'nppes_stale' selector mails ONLY records >= 3yrs stale (env HC_NPPES_STALE_MIN_YEARS) and excludes deactivated NPIs; empty date => no match, so we never claim staleness without the government date to back it. - template: headline + official-record card now show the real last_updated date and ~N-years-ago, sourced to npiregistry.cms.hhs.gov. - attribs + test SAMPLE expose the new fields; verified render + plaintext. --- data/hc_campaigns/hc_nppes_outdated.html | 8 +- scripts/build_healthcare_campaigns.py | 8 +- scripts/build_healthcare_campaigns_cron.py | 31 +++ scripts/enrich_nppes_last_updated.py | 237 +++++++++++++++++++++ 4 files changed, 279 insertions(+), 5 deletions(-) create mode 100644 scripts/enrich_nppes_last_updated.py diff --git a/data/hc_campaigns/hc_nppes_outdated.html b/data/hc_campaigns/hc_nppes_outdated.html index dd6d930..0dc6948 100644 --- a/data/hc_campaigns/hc_nppes_outdated.html +++ b/data/hc_campaigns/hc_nppes_outdated.html @@ -13,8 +13,8 @@

Hi {{ .Subscriber.Name }},

-

Outdated NPPES data can hold up your payments

-

CMS requires every provider to keep their NPPES registry record current and to periodically attest to it. Records for {{ .Subscriber.Attribs.practice }} (NPI {{ .Subscriber.Attribs.npi }}) and most practices drift out of date over time — it is worth a 60-second check.

+

Your NPPES record hasn’t been updated since {{ .Subscriber.Attribs.nppes_last_updated }}

+

The public NPPES NPI Registry shows the record for {{ .Subscriber.Attribs.practice }} (NPI {{ .Subscriber.Attribs.npi }}) was last updated on {{ .Subscriber.Attribs.nppes_last_updated }} — roughly {{ .Subscriber.Attribs.nppes_years_stale }} years ago. CMS requires providers to keep their NPPES record current and to attest to it periodically, and a record that old may no longer match your current address, taxonomy, or contacts. You can confirm the date yourself below — it’s exactly what payers and CMS see.

Why it matters

@@ -32,9 +32,9 @@ - +
NPI{{ .Subscriber.Attribs.npi }}
Registered to{{ .Subscriber.Attribs.practice }}
AttestationPERIODIC REVIEW REQUIRED
Last updated{{ .Subscriber.Attribs.nppes_last_updated }} · ~{{ .Subscriber.Attribs.nppes_years_stale }} yrs ago
-

Source: NPPES NPI Registry (npiregistry.cms.hhs.gov). CMS requires periodic NPPES attestation — review your current record on the official registry below.

+

Source: NPPES NPI Registry (npiregistry.cms.hhs.gov), public “Last Updated” field for this NPI. CMS requires providers to keep NPPES current and attest periodically — review your record on the official registry below.

diff --git a/scripts/build_healthcare_campaigns.py b/scripts/build_healthcare_campaigns.py index fb37d5c..666511d 100644 --- a/scripts/build_healthcare_campaigns.py +++ b/scripts/build_healthcare_campaigns.py @@ -86,7 +86,7 @@ SEGMENTS = { "price": "$349", "list_name": "HC Warmup - NPPES Update", "campaign_name": "HC Warmup - NPPES Outdated", - "selector": "institutional_verified", + "selector": "nppes_stale", }, "oig_screening": { "subject": "Are you screening for OIG / SAM exclusions?", @@ -116,6 +116,9 @@ SAMPLE = { "detail": "06/30/2024 (706 days overdue)", "reval_due_date": "06/30/2024", "days_overdue": "706", + "nppes_last_updated": "2012-02-08", + "nppes_years_stale": "14", + "nppes_enumeration": "2011-04-06", } @@ -138,6 +141,9 @@ def render(seg_key: str, *, test: bool = False) -> tuple[str, str]: .replace("{{ .Subscriber.Attribs.detail }}", SAMPLE["detail"]) .replace("{{ .Subscriber.Attribs.reval_due_date }}", SAMPLE["reval_due_date"]) .replace("{{ .Subscriber.Attribs.days_overdue }}", SAMPLE["days_overdue"]) + .replace("{{ .Subscriber.Attribs.nppes_last_updated }}", SAMPLE["nppes_last_updated"]) + .replace("{{ .Subscriber.Attribs.nppes_years_stale }}", SAMPLE["nppes_years_stale"]) + .replace("{{ .Subscriber.Attribs.nppes_enumeration }}", SAMPLE["nppes_enumeration"]) .replace("{{ UnsubscribeURL }}", f"{SITE}/unsubscribe?test=1")) return s["subject"], html diff --git a/scripts/build_healthcare_campaigns_cron.py b/scripts/build_healthcare_campaigns_cron.py index 8ac3865..544ee27 100644 --- a/scripts/build_healthcare_campaigns_cron.py +++ b/scripts/build_healthcare_campaigns_cron.py @@ -98,6 +98,15 @@ WARMUP_OVERDUE_MAX = int(os.getenv("HC_OVERDUE_MAX", "90")) WARMUP_DUE_SOON_MIN = int(os.getenv("HC_DUE_SOON_MIN", "1")) WARMUP_DUE_SOON_MAX = int(os.getenv("HC_DUE_SOON_MAX", "90")) +# NPPES "out of date" segment: only mail records whose REAL NPPES last_updated +# date is at least this many whole years ago. 
This is what makes the "your NPPES +# record may be out of date" claim LITERALLY TRUE and verifiable -- the provider +# can confirm the exact same last_updated date on the public registry. The date +# is joined in by enrich_nppes_last_updated.py (column nppes_years_stale). Until +# that enrichment has run, the field is empty and this segment safely mails +# nobody (we never assert "out of date" without the government date to back it). +NPPES_STALE_MIN_YEARS = int(os.getenv("HC_NPPES_STALE_MIN_YEARS", "3")) + def _overdue_days(r: dict): v = (r.get("days_overdue") or "").strip() @@ -337,6 +346,21 @@ def row_matches(seg_key: str, r: dict) -> bool: # (a strong deactivation proxy once revalidation lapses). return excluded or status in ("not_on_list", "no_reval_flag") if sel == "optout_ending": return optout + if sel == "nppes_stale": + # NPPES "out of date" segment. Only mail records whose REAL NPPES + # last_updated date (joined by enrich_nppes_last_updated.py) is at least + # NPPES_STALE_MIN_YEARS whole years old, so the "may be out of date" + # claim is literally true and the provider can verify the same date on + # the public registry. Deactivated NPIs belong to npi_reactivation, not + # here, so they're excluded. Empty field (enrichment not yet run) -> no + # match, so we never assert staleness without the government date. + if (r.get("nppes_deactivated") or "").strip().upper() == "Y": + return False + ys = (r.get("nppes_years_stale") or "").strip() + try: + return int(ys) >= NPPES_STALE_MIN_YEARS + except ValueError: + return False if sel == "any": # OIG screening applies to any billing practice, but for warmup we still # exclude the likely-undeliverable: providers heavily overdue (stale) or @@ -388,6 +412,13 @@ def attribs_for(r: dict) -> dict: "days_until": days_until, # MX operator (for per-operator analysis + throttling audit). "mx_provider": r.get("mx_provider", ""), + # Real NPPES freshness (from enrich_nppes_last_updated.py). 
Lets the + # "NPPES may be out of date" email cite the actual government date the + # provider can verify on the public registry, instead of an unbacked + # "FLAGGED OUT OF DATE" claim. + "nppes_last_updated": r.get("nppes_last_updated", ""), + "nppes_years_stale": r.get("nppes_years_stale", ""), + "nppes_enumeration": r.get("nppes_enumeration", ""), "detail": (f"{r.get('reval_due_date','')} ({r.get('days_overdue','')} days overdue)" if r.get("reval_status") == "overdue" else r.get("reval_due_date", "")), } diff --git a/scripts/enrich_nppes_last_updated.py b/scripts/enrich_nppes_last_updated.py new file mode 100644 index 0000000..7129829 --- /dev/null +++ b/scripts/enrich_nppes_last_updated.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 +"""Enrich the institutional list with each NPI's REAL NPPES `last_updated` date. + +The NPPES NPI Registry is fully public and every record carries a +`basic.last_updated` date (the last time the provider updated their record) plus +`basic.enumeration_date` (when the NPI was first issued). This script joins that +real, government-sourced date onto our institutional email list so the +"NPPES record may be out of date" campaign can state a LITERAL, verifiable fact +("NPPES shows your record was last updated on 2012-02-08, ~14 years ago") instead +of an unsubstantiated "FLAGGED OUT OF DATE" claim. The provider can confirm the +exact same date on the official registry, which is what makes the pitch credible +and FTC/defamation-safe. + +Why a cache: NPPES has NO batch-by-NPI endpoint -- it is one HTTP request per +NPI (~63k). The `last_updated` date changes rarely and we only care at +year-granularity, so we persist results in a sidecar cache keyed by NPI and only +re-fetch entries that are missing or older than --max-age days. The first fill is +a one-time slow crawl; every refresh after that is near-instant. + +Columns ADDED to the output CSV: + nppes_last_updated ISO date string from basic.last_updated (e.g. 
2012-02-08) + nppes_enumeration ISO date from basic.enumeration_date (NPI issued date) + nppes_years_stale whole years since last_updated (e.g. 14) + nppes_deactivated "Y" if the NPI returns no active record (reactivation cue) + +Usage: + # In-place enrich (adds/refreshes the columns on the institutional file): + python3 scripts/enrich_nppes_last_updated.py data/hc_nppes_institutional_enriched.csv + + # Explicit in/out + options: + python3 scripts/enrich_nppes_last_updated.py IN.csv -o OUT.csv \ + --cache data/nppes_last_updated_cache.csv --max-age 30 --rps 10 --limit 0 +""" +from __future__ import annotations +import argparse +import csv +import datetime +import json +import os +import sys +import tempfile +import time +import urllib.error +import urllib.parse +import urllib.request + +csv.field_size_limit(10_000_000) + +NPPES_API = "https://npiregistry.cms.hhs.gov/api/" +UA = "PerformanceWest-NPPESFreshness/1.0 (compliance@performancewest.net)" + +# Columns this enricher OWNS on the output CSV. +ADDED_COLS = ["nppes_last_updated", "nppes_enumeration", + "nppes_years_stale", "nppes_deactivated"] +# Cache schema (sidecar, keyed by npi). +CACHE_COLS = ["npi", "nppes_last_updated", "nppes_enumeration", + "nppes_deactivated", "fetched_at"] + + +def log(*a): + ts = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds") + print(f"[nppes-freshness {ts}]", *a, file=sys.stderr, flush=True) + + +def _parse_iso(s: str): + s = (s or "").strip() + for fmt in ("%Y-%m-%d", "%m/%d/%Y"): + try: + return datetime.datetime.strptime(s, fmt).date() + except ValueError: + continue + return None + + +def years_stale(last_updated: str, today: datetime.date) -> str: + d = _parse_iso(last_updated) + if d is None: + return "" + return str(max(0, (today - d).days // 365)) + + +def fetch_nppes(npi: str, timeout: int = 20, retries: int = 2) -> dict: + """Fetch one NPI from the public NPPES registry API. + + Returns {last_updated, enumeration, deactivated}. 
A record that returns no + result is treated as deactivated (the registry only returns active NPIs).""" + q = urllib.parse.urlencode({"version": "2.1", "number": npi}) + url = f"{NPPES_API}?{q}" + last_err = None + for attempt in range(retries + 1): + try: + req = urllib.request.Request( + url, headers={"Accept": "application/json", "User-Agent": UA}) + with urllib.request.urlopen(req, timeout=timeout) as r: + data = json.loads(r.read().decode()) + results = data.get("results") or [] + if not results: + return {"nppes_last_updated": "", "nppes_enumeration": "", + "nppes_deactivated": "Y"} + basic = results[0].get("basic", {}) or {} + return { + "nppes_last_updated": (basic.get("last_updated") or "").strip(), + "nppes_enumeration": (basic.get("enumeration_date") or "").strip(), + "nppes_deactivated": "", + } + except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, json.JSONDecodeError) as e: + last_err = e + if attempt < retries: + time.sleep(0.5 * (attempt + 1)) # linear backoff + log(f" fetch failed for {npi}: {last_err}") + return {} # transient failure: leave uncached so a later run retries + + +def load_cache(path: str) -> dict[str, dict]: + cache: dict[str, dict] = {} + if not path or not os.path.exists(path): + return cache + with open(path, newline="") as f: + for row in csv.DictReader(f): + npi = (row.get("npi") or "").strip() + if npi: + cache[npi] = row + return cache + + +def write_cache(path: str, cache: dict[str, dict]): + if not path: + return + d = os.path.dirname(path) or "." 
+ fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp") + with os.fdopen(fd, "w", newline="") as f: + w = csv.DictWriter(f, fieldnames=CACHE_COLS, extrasaction="ignore") + w.writeheader() + for npi in sorted(cache): + w.writerow(cache[npi]) + os.replace(tmp, path) + + +def is_fresh(entry: dict, today: datetime.date, max_age_days: int) -> bool: + """A cache entry is fresh if it was fetched within max_age_days.""" + if not entry: + return False + fa = _parse_iso(entry.get("fetched_at", "")) + if fa is None: + return False + return (today - fa).days <= max_age_days + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("infile") + ap.add_argument("-o", "--out", help="output CSV (default: in-place)") + ap.add_argument("--cache", default="data/nppes_last_updated_cache.csv") + ap.add_argument("--max-age", type=int, default=30, + help="re-fetch cache entries older than N days (default 30)") + ap.add_argument("--rps", type=float, default=10.0, + help="max requests/sec to the NPPES API (default 10)") + ap.add_argument("--limit", type=int, default=0, + help="only fetch up to N new NPIs this run (0 = all; for resumable fills)") + ap.add_argument("--flush-every", type=int, default=200, + help="persist the cache to disk every N new fetches") + args = ap.parse_args() + + out = args.out or args.infile + today = datetime.date.today() + sleep = 1.0 / args.rps if args.rps > 0 else 0.0 + + with open(args.infile, newline="") as f: + reader = csv.DictReader(f) + in_cols = list(reader.fieldnames or []) + rows = list(reader) + log(f"input={args.infile} rows={len(rows):,}") + + npis = sorted({(r.get("npi") or "").strip() for r in rows if (r.get("npi") or "").strip()}) + cache = load_cache(args.cache) + log(f"cache={args.cache} entries={len(cache):,}") + + # Determine which NPIs need a (re)fetch. 
+ todo = [n for n in npis if not is_fresh(cache.get(n, {}), today, args.max_age)] + if args.limit: + todo = todo[:args.limit] + log(f"to_fetch={len(todo):,} (of {len(npis):,} unique NPIs; limit={args.limit or 'all'})") + + fetched = 0 + t0 = time.time() + for i, npi in enumerate(todo, 1): + res = fetch_nppes(npi) + if res: # only cache successful lookups (transient failures retry later) + res["npi"] = npi + res["fetched_at"] = today.isoformat() + cache[npi] = res + fetched += 1 + if i % 500 == 0: + rate = i / max(1e-6, time.time() - t0) + log(f" fetched {i:,}/{len(todo):,} ({rate:.1f}/s) cached={fetched:,}") + if fetched and fetched % args.flush_every == 0: + write_cache(args.cache, cache) + if sleep: + time.sleep(sleep) + write_cache(args.cache, cache) + log(f"fetched {fetched:,} new/refreshed; cache now {len(cache):,} entries") + + # Join cache -> rows, computing years_stale at render time (today-relative). + out_cols = in_cols + [c for c in ADDED_COLS if c not in in_cols] + n_stale2 = n_deact = n_unknown = 0 + for r in rows: + npi = (r.get("npi") or "").strip() + ent = cache.get(npi, {}) + lu = (ent.get("nppes_last_updated") or "").strip() + r["nppes_last_updated"] = lu + r["nppes_enumeration"] = (ent.get("nppes_enumeration") or "").strip() + r["nppes_deactivated"] = (ent.get("nppes_deactivated") or "").strip() + ys = years_stale(lu, today) if lu else "" + r["nppes_years_stale"] = ys + if r["nppes_deactivated"] == "Y": + n_deact += 1 + elif ys == "": + n_unknown += 1 + elif int(ys) >= 2: + n_stale2 += 1 + + d = os.path.dirname(out) or "." 
+ fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp") + with os.fdopen(fd, "w", newline="") as f: + w = csv.DictWriter(f, fieldnames=out_cols, extrasaction="ignore") + w.writeheader() + w.writerows(rows) + os.replace(tmp, out) + + log(f"wrote {out} (+{len([c for c in ADDED_COLS if c not in in_cols])} cols)") + log(f" stale>=2yrs={n_stale2:,} deactivated={n_deact:,} " + f"unknown(no cache yet)={n_unknown:,} of {len(rows):,}") + return 0 + + +if __name__ == "__main__": + sys.exit(main())