diff --git a/data/hc_campaigns/hc_nppes_outdated.html b/data/hc_campaigns/hc_nppes_outdated.html
index dd6d930..0dc6948 100644
--- a/data/hc_campaigns/hc_nppes_outdated.html
+++ b/data/hc_campaigns/hc_nppes_outdated.html
@@ -13,8 +13,8 @@
|
Hi {{ .Subscriber.Name }},
- Outdated NPPES data can hold up your payments
- CMS requires every provider to keep their NPPES registry record current and to periodically attest to it. Records for {{ .Subscriber.Attribs.practice }} (NPI {{ .Subscriber.Attribs.npi }}) and most practices drift out of date over time — it is worth a 60-second check.
+ Your NPPES record hasn’t been updated since {{ .Subscriber.Attribs.nppes_last_updated }}
+ The public NPPES NPI Registry shows the record for {{ .Subscriber.Attribs.practice }} (NPI {{ .Subscriber.Attribs.npi }}) was last updated on {{ .Subscriber.Attribs.nppes_last_updated }} — roughly {{ .Subscriber.Attribs.nppes_years_stale }} years ago. CMS requires providers to keep their NPPES record current and to attest to it periodically, and a record that old has almost certainly drifted from your real address, taxonomy, or contacts. You can confirm the date yourself below — it’s exactly what payers and CMS see.
Why it matters
@@ -32,9 +32,9 @@
| NPI | {{ .Subscriber.Attribs.npi }} |
| Registered to | {{ .Subscriber.Attribs.practice }} |
- | Attestation | PERIODIC REVIEW REQUIRED |
+ | Last updated | {{ .Subscriber.Attribs.nppes_last_updated }} · ~{{ .Subscriber.Attribs.nppes_years_stale }} yrs ago |
- Source: NPPES NPI Registry (npiregistry.cms.hhs.gov). CMS requires periodic NPPES attestation — review your current record on the official registry below.
+ Source: NPPES NPI Registry (npiregistry.cms.hhs.gov), public “Last Updated” field for this NPI. CMS requires providers to keep NPPES current and attest periodically — review your record on the official registry below.
|
|
diff --git a/scripts/build_healthcare_campaigns.py b/scripts/build_healthcare_campaigns.py
index fb37d5c..666511d 100644
--- a/scripts/build_healthcare_campaigns.py
+++ b/scripts/build_healthcare_campaigns.py
@@ -86,7 +86,7 @@ SEGMENTS = {
"price": "$349",
"list_name": "HC Warmup - NPPES Update",
"campaign_name": "HC Warmup - NPPES Outdated",
- "selector": "institutional_verified",
+ "selector": "nppes_stale",
},
"oig_screening": {
"subject": "Are you screening for OIG / SAM exclusions?",
@@ -116,6 +116,9 @@ SAMPLE = {
"detail": "06/30/2024 (706 days overdue)",
"reval_due_date": "06/30/2024",
"days_overdue": "706",
+ "nppes_last_updated": "2012-02-08",
+ "nppes_years_stale": "14",
+ "nppes_enumeration": "2011-04-06",
}
@@ -138,6 +141,9 @@ def render(seg_key: str, *, test: bool = False) -> tuple[str, str]:
.replace("{{ .Subscriber.Attribs.detail }}", SAMPLE["detail"])
.replace("{{ .Subscriber.Attribs.reval_due_date }}", SAMPLE["reval_due_date"])
.replace("{{ .Subscriber.Attribs.days_overdue }}", SAMPLE["days_overdue"])
+ .replace("{{ .Subscriber.Attribs.nppes_last_updated }}", SAMPLE["nppes_last_updated"])
+ .replace("{{ .Subscriber.Attribs.nppes_years_stale }}", SAMPLE["nppes_years_stale"])
+ .replace("{{ .Subscriber.Attribs.nppes_enumeration }}", SAMPLE["nppes_enumeration"])
.replace("{{ UnsubscribeURL }}", f"{SITE}/unsubscribe?test=1"))
return s["subject"], html
diff --git a/scripts/build_healthcare_campaigns_cron.py b/scripts/build_healthcare_campaigns_cron.py
index 8ac3865..544ee27 100644
--- a/scripts/build_healthcare_campaigns_cron.py
+++ b/scripts/build_healthcare_campaigns_cron.py
@@ -98,6 +98,15 @@ WARMUP_OVERDUE_MAX = int(os.getenv("HC_OVERDUE_MAX", "90"))
WARMUP_DUE_SOON_MIN = int(os.getenv("HC_DUE_SOON_MIN", "1"))
WARMUP_DUE_SOON_MAX = int(os.getenv("HC_DUE_SOON_MAX", "90"))
+# NPPES "out of date" segment: only mail records whose REAL NPPES last_updated
+# date is at least this many whole years ago. This is what makes the "your NPPES
+# record may be out of date" claim LITERALLY TRUE and verifiable -- the provider
+# can confirm the exact same last_updated date on the public registry. The date
+# is joined in by enrich_nppes_last_updated.py (column nppes_years_stale). Until
+# that enrichment has run, the field is empty and this segment safely mails
+# nobody (we never assert "out of date" without the government date to back it).
+NPPES_STALE_MIN_YEARS = int(os.getenv("HC_NPPES_STALE_MIN_YEARS", "3"))
+
def _overdue_days(r: dict):
v = (r.get("days_overdue") or "").strip()
@@ -337,6 +346,21 @@ def row_matches(seg_key: str, r: dict) -> bool:
# (a strong deactivation proxy once revalidation lapses).
return excluded or status in ("not_on_list", "no_reval_flag")
if sel == "optout_ending": return optout
+ if sel == "nppes_stale":
+ # NPPES "out of date" segment. Only mail records whose REAL NPPES
+ # last_updated date (joined by enrich_nppes_last_updated.py) is at least
+ # NPPES_STALE_MIN_YEARS whole years old, so the "may be out of date"
+ # claim is literally true and the provider can verify the same date on
+ # the public registry. Deactivated NPIs belong to npi_reactivation, not
+ # here, so they're excluded. Empty field (enrichment not yet run) -> no
+ # match, so we never assert staleness without the government date.
+ if (r.get("nppes_deactivated") or "").strip().upper() == "Y":
+ return False
+ ys = (r.get("nppes_years_stale") or "").strip()
+ try:
+ return int(ys) >= NPPES_STALE_MIN_YEARS
+ except ValueError:
+ return False
if sel == "any":
# OIG screening applies to any billing practice, but for warmup we still
# exclude the likely-undeliverable: providers heavily overdue (stale) or
@@ -388,6 +412,13 @@ def attribs_for(r: dict) -> dict:
"days_until": days_until,
# MX operator (for per-operator analysis + throttling audit).
"mx_provider": r.get("mx_provider", ""),
+ # Real NPPES freshness (from enrich_nppes_last_updated.py). Lets the
+ # "NPPES may be out of date" email cite the actual government date the
+ # provider can verify on the public registry, instead of an unbacked
+ # "FLAGGED OUT OF DATE" claim.
+ "nppes_last_updated": r.get("nppes_last_updated", ""),
+ "nppes_years_stale": r.get("nppes_years_stale", ""),
+ "nppes_enumeration": r.get("nppes_enumeration", ""),
"detail": (f"{r.get('reval_due_date','')} ({r.get('days_overdue','')} days overdue)"
if r.get("reval_status") == "overdue" else r.get("reval_due_date", "")),
}
diff --git a/scripts/enrich_nppes_last_updated.py b/scripts/enrich_nppes_last_updated.py
new file mode 100644
index 0000000..7129829
--- /dev/null
+++ b/scripts/enrich_nppes_last_updated.py
@@ -0,0 +1,237 @@
+#!/usr/bin/env python3
+"""Enrich the institutional list with each NPI's REAL NPPES `last_updated` date.
+
+The NPPES NPI Registry is fully public and every record carries a
+`basic.last_updated` date (the last time the provider updated their record) plus
+`basic.enumeration_date` (when the NPI was first issued). This script joins that
+real, government-sourced date onto our institutional email list so the
+"NPPES record may be out of date" campaign can state a LITERAL, verifiable fact
+("NPPES shows your record was last updated on 2012-02-08, ~14 years ago") instead
+of an unsubstantiated "FLAGGED OUT OF DATE" claim. The provider can confirm the
+exact same date on the official registry, which is what makes the pitch credible
+and FTC/defamation-safe.
+
+Why a cache: NPPES has NO batch-by-NPI endpoint -- it is one HTTP request per
+NPI (~63k). The `last_updated` date changes rarely and we only care at
+year-granularity, so we persist results in a sidecar cache keyed by NPI and only
+re-fetch entries that are missing or older than --max-age days. The first fill is
+a one-time slow crawl; every refresh after that is near-instant.
+
+Columns ADDED to the output CSV:
+ nppes_last_updated ISO date string from basic.last_updated (e.g. 2012-02-08)
+ nppes_enumeration ISO date from basic.enumeration_date (NPI issued date)
+ nppes_years_stale whole years since last_updated (e.g. 14)
+ nppes_deactivated "Y" if the NPI returns no active record (reactivation cue)
+
+Usage:
+ # In-place enrich (adds/refreshes the columns on the institutional file):
+ python3 scripts/enrich_nppes_last_updated.py data/hc_nppes_institutional_enriched.csv
+
+ # Explicit in/out + options:
+ python3 scripts/enrich_nppes_last_updated.py IN.csv -o OUT.csv \
+ --cache data/nppes_last_updated_cache.csv --max-age 30 --rps 10 --limit 0
+"""
+from __future__ import annotations
+import argparse
+import csv
+import datetime
+import json
+import os
+import sys
+import tempfile
+import time
+import urllib.error
+import urllib.parse
+import urllib.request
+
+csv.field_size_limit(10_000_000)
+
+NPPES_API = "https://npiregistry.cms.hhs.gov/api/"
+UA = "PerformanceWest-NPPESFreshness/1.0 (compliance@performancewest.net)"
+
+# Columns this enricher OWNS on the output CSV.
+ADDED_COLS = ["nppes_last_updated", "nppes_enumeration",
+ "nppes_years_stale", "nppes_deactivated"]
+# Cache schema (sidecar, keyed by npi).
+CACHE_COLS = ["npi", "nppes_last_updated", "nppes_enumeration",
+ "nppes_deactivated", "fetched_at"]
+
+
+def log(*a):
+ ts = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds")
+ print(f"[nppes-freshness {ts}]", *a, file=sys.stderr, flush=True)
+
+
+def _parse_iso(s: str):
+ s = (s or "").strip()
+ for fmt in ("%Y-%m-%d", "%m/%d/%Y"):
+ try:
+ return datetime.datetime.strptime(s, fmt).date()
+ except ValueError:
+ continue
+ return None
+
+
+def years_stale(last_updated: str, today: datetime.date) -> str:
+ d = _parse_iso(last_updated)
+ if d is None:
+ return ""
+ return str(max(0, (today - d).days // 365))
+
+
+def fetch_nppes(npi: str, timeout: int = 20, retries: int = 2) -> dict:
+ """Fetch one NPI from the public NPPES registry API.
+
+ Returns {last_updated, enumeration, deactivated}. A record that returns no
+ result is treated as deactivated (the registry only returns active NPIs)."""
+ q = urllib.parse.urlencode({"version": "2.1", "number": npi})
+ url = f"{NPPES_API}?{q}"
+ last_err = None
+ for attempt in range(retries + 1):
+ try:
+ req = urllib.request.Request(
+ url, headers={"Accept": "application/json", "User-Agent": UA})
+ with urllib.request.urlopen(req, timeout=timeout) as r:
+ data = json.loads(r.read().decode())
+ results = data.get("results") or []
+ if not results:
+ return {"nppes_last_updated": "", "nppes_enumeration": "",
+ "nppes_deactivated": "Y"}
+ basic = results[0].get("basic", {}) or {}
+ return {
+ "nppes_last_updated": (basic.get("last_updated") or "").strip(),
+ "nppes_enumeration": (basic.get("enumeration_date") or "").strip(),
+ "nppes_deactivated": "",
+ }
+ except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, json.JSONDecodeError) as e:
+ last_err = e
+ if attempt < retries:
+ time.sleep(0.5 * (attempt + 1)) # linear backoff
+ log(f" fetch failed for {npi}: {last_err}")
+ return {} # transient failure: leave uncached so a later run retries
+
+
+def load_cache(path: str) -> dict[str, dict]:
+ cache: dict[str, dict] = {}
+ if not path or not os.path.exists(path):
+ return cache
+ with open(path, newline="") as f:
+ for row in csv.DictReader(f):
+ npi = (row.get("npi") or "").strip()
+ if npi:
+ cache[npi] = row
+ return cache
+
+
+def write_cache(path: str, cache: dict[str, dict]):
+ if not path:
+ return
+ d = os.path.dirname(path) or "."
+ fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp")
+ with os.fdopen(fd, "w", newline="") as f:
+ w = csv.DictWriter(f, fieldnames=CACHE_COLS, extrasaction="ignore")
+ w.writeheader()
+ for npi in sorted(cache):
+ w.writerow(cache[npi])
+ os.replace(tmp, path)
+
+
+def is_fresh(entry: dict, today: datetime.date, max_age_days: int) -> bool:
+ """A cache entry is fresh if it was fetched within max_age_days."""
+ if not entry:
+ return False
+ fa = _parse_iso(entry.get("fetched_at", ""))
+ if fa is None:
+ return False
+ return (today - fa).days <= max_age_days
+
+
+def main() -> int:
+ ap = argparse.ArgumentParser()
+ ap.add_argument("infile")
+ ap.add_argument("-o", "--out", help="output CSV (default: in-place)")
+ ap.add_argument("--cache", default="data/nppes_last_updated_cache.csv")
+ ap.add_argument("--max-age", type=int, default=30,
+ help="re-fetch cache entries older than N days (default 30)")
+ ap.add_argument("--rps", type=float, default=10.0,
+ help="max requests/sec to the NPPES API (default 10)")
+ ap.add_argument("--limit", type=int, default=0,
+ help="only fetch up to N new NPIs this run (0 = all; for resumable fills)")
+ ap.add_argument("--flush-every", type=int, default=200,
+ help="persist the cache to disk every N new fetches")
+ args = ap.parse_args()
+
+ out = args.out or args.infile
+ today = datetime.date.today()
+ sleep = 1.0 / args.rps if args.rps > 0 else 0.0
+
+ with open(args.infile, newline="") as f:
+ reader = csv.DictReader(f)
+ in_cols = list(reader.fieldnames or [])
+ rows = list(reader)
+ log(f"input={args.infile} rows={len(rows):,}")
+
+ npis = sorted({(r.get("npi") or "").strip() for r in rows if (r.get("npi") or "").strip()})
+ cache = load_cache(args.cache)
+ log(f"cache={args.cache} entries={len(cache):,}")
+
+ # Determine which NPIs need a (re)fetch.
+ todo = [n for n in npis if not is_fresh(cache.get(n, {}), today, args.max_age)]
+ if args.limit:
+ todo = todo[:args.limit]
+ log(f"to_fetch={len(todo):,} (of {len(npis):,} unique NPIs; limit={args.limit or 'all'})")
+
+ fetched = 0
+ t0 = time.time()
+ for i, npi in enumerate(todo, 1):
+ res = fetch_nppes(npi)
+ if res: # only cache successful lookups (transient failures retry later)
+ res["npi"] = npi
+ res["fetched_at"] = today.isoformat()
+ cache[npi] = res
+ fetched += 1
+ if i % 500 == 0:
+ rate = i / max(1e-6, time.time() - t0)
+ log(f" fetched {i:,}/{len(todo):,} ({rate:.1f}/s) cached={fetched:,}")
+ if fetched and fetched % args.flush_every == 0:
+ write_cache(args.cache, cache)
+ if sleep:
+ time.sleep(sleep)
+ write_cache(args.cache, cache)
+ log(f"fetched {fetched:,} new/refreshed; cache now {len(cache):,} entries")
+
+ # Join cache -> rows, computing years_stale at render time (today-relative).
+ out_cols = in_cols + [c for c in ADDED_COLS if c not in in_cols]
+ n_stale2 = n_deact = n_unknown = 0
+ for r in rows:
+ npi = (r.get("npi") or "").strip()
+ ent = cache.get(npi, {})
+ lu = (ent.get("nppes_last_updated") or "").strip()
+ r["nppes_last_updated"] = lu
+ r["nppes_enumeration"] = (ent.get("nppes_enumeration") or "").strip()
+ r["nppes_deactivated"] = (ent.get("nppes_deactivated") or "").strip()
+ ys = years_stale(lu, today) if lu else ""
+ r["nppes_years_stale"] = ys
+ if r["nppes_deactivated"] == "Y":
+ n_deact += 1
+ elif ys == "":
+ n_unknown += 1
+ elif int(ys) >= 2:
+ n_stale2 += 1
+
+ d = os.path.dirname(out) or "."
+ fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp")
+ with os.fdopen(fd, "w", newline="") as f:
+ w = csv.DictWriter(f, fieldnames=out_cols, extrasaction="ignore")
+ w.writeheader()
+ w.writerows(rows)
+ os.replace(tmp, out)
+
+ log(f"wrote {out} (+{len([c for c in ADDED_COLS if c not in in_cols])} cols)")
+ log(f" stale>=2yrs={n_stale2:,} deactivated={n_deact:,} "
+ f"unknown(no cache yet)={n_unknown:,} of {len(rows):,}")
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())