Add scripts/healthcare_email_streams.py as the single source of truth for classifying NPPES-endpoint emails into institutional (HOT stream) / consumer (trucking-discipline stream) / direct (DirectTrust, parked), plus an exclude set for non-prospect giants (va.gov, *.mil, cvshealth, walgreens, walmart). Rework build_npi_outreach_lists.py to emit one CSV per stream (npi_healthcare_institutional/consumer + npi_direct_secure), overdue-first sorted, with companion files (revalidation/leie/optout) now optional. Verified on May 2026 NPPES endpoint_pfile: 89,557 institutional / 19,366 consumer / 242,441 direct rows.
271 lines
10 KiB
Python
271 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""Build NPI outreach lists from free public CMS data.
|
|
|
|
Joins NPPES endpoint emails to the CMS Medicare revalidation-due list (and flags
|
|
OIG LEIE exclusions + opt-out expirations) to produce ready-to-send outreach
|
|
segments. No paid email-append vendor required.
|
|
|
|
Three outbound streams are produced, one CSV each:
  1. institutional — practice / organization domains; sent via the
                     HEALTHCARE HOT stream.
  2. consumer      — consumer webmail; fed into the trucking-discipline
                     (low-cap) stream.
  3. direct        — DirectTrust / HISP addresses (e.g. *.direct.*) that route
                     only inside the DirectTrust network. Hold these until you
                     sign up for DirectTrust, then reach them via Direct Secure
                     Messaging (high-trust, spam-resistant).
Streams 1 and 2 are normal inboxes you can email TODAY from a standard MTA.
|
|
|
|
Inputs (download first; see docs/new-sector-compliance-targets.md sec 7-8):
|
|
endpoint.csv NPPES endpoint_pfile (NPI -> endpoint email)
|
|
revalidation_due.csv CMS Revalidation Due Date List (optional enrichment)
|
|
leie.csv OIG LEIE exclusions (optional cross-flag)
|
|
optout.csv Medicare opt-out affidavits (optional cross-flag)
|
|
|
|
Output: one CSV per stream under <data-dir>/out/ (override with --out-dir), plus a summary to stdout.
|
|
|
|
Usage:
|
|
python3 scripts/build_npi_outreach_lists.py --data-dir /tmp/npi_companion
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import datetime
|
|
import os
|
|
import re
|
|
import sys
|
|
from collections import defaultdict
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from healthcare_email_streams import ( # noqa: E402
|
|
classify as classify_stream,
|
|
EMAIL_RE as _STREAM_EMAIL_RE,
|
|
)
|
|
|
|
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
|
|
|
|
# Stream classification (institutional / consumer / direct) lives in the shared
|
|
# scripts/healthcare_email_streams.py module so the list builder and any
|
|
# campaign-import tooling can never drift. Imported above as classify_stream.
|
|
|
|
|
|
def csv_open(path: str):
    """Open *path* as a text file suitable for ``csv.reader``.

    The file is decoded as latin-1: the CMS files are latin-1, and because
    latin-1 maps every byte to a character the UTF-8 NPPES file can be read
    through it without decode errors as well. ``newline=""`` hands line
    endings to the ``csv`` module untouched.

    Returns the open file object; the caller is responsible for closing it
    (normally via ``with``).
    """
    handle = open(path, mode="r", encoding="latin-1", newline="")
    return handle
|
|
|
|
|
|
def load_endpoint_emails(path: str):
    """Read the NPPES endpoint file and group usable endpoints by NPI.

    Each data row supplies the NPI in column 0 and the endpoint address in
    column 3. The address is labelled by ``classify_stream``; rows labelled
    'invalid' or 'excluded' are counted and dropped, everything else is kept
    once per (NPI, lower-cased address) pair.

    Streams map to the dual-stream MTA design (see
    docs/healthcare-email-stream-plan.md):
      institutional -> HEALTHCARE HOT stream
      consumer      -> trucking-discipline (low-cap) stream
      direct        -> parked until DirectTrust

    Returns:
        (npi_emails, stats) where ``npi_emails`` maps NPI -> list of
        (email, stream) in file order and ``stats`` maps stream label ->
        count. Duplicates of an already-kept address are not counted.
    """
    dropped_labels = ("invalid", "excluded")
    by_npi = defaultdict(list)
    already_kept = set()
    tally = defaultdict(int)

    with csv_open(path) as fh:
        rows = csv.reader(fh)
        next(rows, None)  # skip the header row
        for fields in rows:
            if len(fields) < 4:
                continue
            npi = fields[0].strip().strip('"')
            if not npi:
                continue
            address = fields[3].strip().strip('"')
            label = classify_stream(address)
            if label not in dropped_labels:
                # Same address in different letter case is the same inbox.
                dedupe_key = (npi, address.lower())
                if dedupe_key in already_kept:
                    continue
                already_kept.add(dedupe_key)
                by_npi[npi].append((address, label))
            tally[label] += 1

    return by_npi, tally
|
|
|
|
|
|
def load_revalidation(path: str):
    """Load revalidation due dates keyed by NPI.

    The revalidation list is optional enrichment: when *path* does not exist
    an empty dict is returned so the stream files can still be built from
    endpoint emails alone.

    Rows are skipped when they have fewer than 11 columns, a blank NPI
    (column 1), no date or "TBD" as the date, or a date that is not
    MM/DD/YYYY. The adjusted due date (column 10) is preferred over the
    plain due date (column 9). If an NPI appears more than once the last
    row wins.

    Returns:
        dict mapping NPI -> dict with keys ``due_date`` (original string),
        ``days_overdue`` (int, positive when past due as of today),
        ``overdue`` (bool), ``name``, ``specialty`` and ``state``.
    """
    records = {}
    if not os.path.exists(path):
        return records

    run_date = datetime.date.today()
    with csv_open(path) as fh:
        rows = csv.reader(fh)
        next(rows, None)  # skip the header row
        for fields in rows:
            if len(fields) < 11:
                continue
            cols = [value.strip() for value in fields]
            npi = cols[1]
            if not npi:
                continue
            due_text = cols[10] or cols[9]  # adjusted date, else due date
            if not due_text or due_text.upper() == "TBD":
                continue
            try:
                due = datetime.datetime.strptime(due_text, "%m/%d/%Y").date()
            except ValueError:
                continue
            days_late = (run_date - due).days  # positive = overdue
            # Columns 2 + 3 joined as the name, falling back to column 4.
            display_name = f"{cols[2]} {cols[3]}".strip() or cols[4]
            records[npi] = {
                "due_date": due_text,
                "days_overdue": days_late,
                "overdue": days_late > 0,
                "name": display_name,
                "specialty": cols[8],
                "state": cols[5],
            }
    return records
|
|
|
|
|
|
def load_leie_npis(path: str) -> set[str]:
    """Collect the NPIs listed in the LEIE exclusions file.

    The file is optional: a missing *path* yields an empty set. The NPI is
    read from column 7 of each data row; only 10-character values other
    than the all-zero placeholder "0000000000" are kept.
    """
    if not os.path.exists(path):
        return set()

    with csv_open(path) as fh:
        rows = csv.reader(fh)
        next(rows, None)  # skip the header row
        candidates = (
            fields[7].strip().strip('"') for fields in rows if len(fields) >= 8
        )
        return {
            npi for npi in candidates
            if len(npi) == 10 and npi != "0000000000"
        }
|
|
|
|
|
|
def load_optout(path: str) -> dict[str, str]:
    """Map NPI -> opt-out end date for opt-outs ending within the next year.

    The file is optional: a missing *path* yields an empty dict. For each
    data row the NPI is read from column 2 and the end date from column 5.
    A row is kept only when the end date parses as MM/DD/YYYY and falls
    between today and today + 365 days (both inclusive).

    Rows with a blank NPI are skipped, matching the other loaders; such a
    row can never join to an endpoint and would only inflate the count
    reported in the summary.

    Returns:
        dict mapping NPI -> end date string exactly as it appears in the
        file (stripped). If an NPI appears more than once the last
        qualifying row wins.
    """
    out: dict[str, str] = {}
    if not os.path.exists(path):
        return out

    today = datetime.date.today()
    horizon = today + datetime.timedelta(days=365)
    with csv_open(path) as f:
        r = csv.reader(f)
        next(r, None)  # skip the header row
        for row in r:
            if len(row) < 6:
                continue
            npi = row[2].strip()
            if not npi:
                continue
            end = row[5].strip()
            try:
                d = datetime.datetime.strptime(end, "%m/%d/%Y").date()
            except ValueError:
                # Blank or malformed end date: nothing to flag.
                continue
            if today <= d <= horizon:
                out[npi] = end
    return out
|
|
|
|
|
|
def main() -> int:
    """Build one outreach CSV per outbound stream and print a summary.

    Command-line options:
        --data-dir  directory holding endpoint.csv (required) and the
                    optional companion files revalidation_due.csv, leie.csv
                    and optout.csv (default /tmp/npi_companion).
        --out-dir   directory the CSVs are written to
                    (default <data-dir>/out).

    Every kept endpoint email becomes one row in the file for its stream,
    enriched with revalidation, LEIE and opt-out flags where available.
    Rows are ordered by days overdue, most overdue first; NPIs without a
    revalidation record come last.

    Returns:
        0 on success, 1 when the required endpoint.csv is missing.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--data-dir", default="/tmp/npi_companion")
    ap.add_argument("--out-dir", default=None)
    args = ap.parse_args()

    d = args.data_dir
    out_dir = args.out_dir or os.path.join(d, "out")

    # endpoint.csv is the only mandatory input; fail with a clear message
    # instead of a traceback when it has not been downloaded yet.
    endpoint_path = os.path.join(d, "endpoint.csv")
    if not os.path.exists(endpoint_path):
        print(f"error: required input not found: {endpoint_path}",
              file=sys.stderr)
        return 1

    os.makedirs(out_dir, exist_ok=True)

    print("Loading endpoint emails ...")
    npi_emails, estats = load_endpoint_emails(endpoint_path)
    print(f" NPIs with email: {len(npi_emails):,} | institutional: "
          f"{estats['institutional']:,} | consumer: {estats['consumer']:,} | "
          f"direct/HISP: {estats['direct']:,} "
          f"(dropped excluded={estats['excluded']:,}, invalid={estats['invalid']:,})")

    print("Loading revalidation due dates ...")
    reval = load_revalidation(os.path.join(d, "revalidation_due.csv"))
    overdue = {n: v for n, v in reval.items() if v["overdue"]}
    upcoming = {n: v for n, v in reval.items() if not v["overdue"]}
    print(f" NPIs with concrete due date: {len(reval):,} | overdue: {len(overdue):,} | upcoming: {len(upcoming):,}")

    leie = load_leie_npis(os.path.join(d, "leie.csv"))
    optout = load_optout(os.path.join(d, "optout.csv"))
    print(f" LEIE w/ NPI: {len(leie):,} | opt-out ending <12mo: {len(optout):,}")

    counts = defaultdict(int)
    header = ["npi", "email", "stream", "name", "specialty", "state",
              "reval_due_date", "days_overdue", "reval_status",
              "leie_excluded", "optout_ending"]

    def row_for(npi, email, stream):
        """Return one output row, with blanks where no enrichment exists."""
        info = reval.get(npi, {})
        if info:
            status = "overdue" if info.get("overdue") else "upcoming"
        else:
            status = "no_reval_flag"
        return [
            npi, email, stream, info.get("name", ""), info.get("specialty", ""),
            info.get("state", ""), info.get("due_date", ""),
            info.get("days_overdue", ""), status,
            "Y" if npi in leie else "", optout.get(npi, ""),
        ]

    def sort_key(npi):
        """Order by days overdue, descending; undated NPIs sort last."""
        # The huge negative default pushes NPIs with no revalidation record
        # behind every dated one. sorted() is stable, so ties keep the
        # order in which NPIs were first seen in endpoint.csv.
        return -reval.get(npi, {}).get("days_overdue", -10**9)

    # One file per outbound stream (the dual-stream MTA routing key).
    #   institutional -> HEALTHCARE HOT stream (own IPs + 10k/day cap)
    #   consumer      -> trucking-discipline (low-cap) stream
    #   direct        -> parked until DirectTrust
    # Within each, rows are sorted overdue-first (highest intent = best send order).
    stream_files = {
        "institutional": os.path.join(out_dir, "npi_healthcare_institutional.csv"),
        "consumer": os.path.join(out_dir, "npi_healthcare_consumer.csv"),
        "direct": os.path.join(out_dir, "npi_direct_secure.csv"),
    }

    writers = {}
    handles = {}
    try:
        for stream, path in stream_files.items():
            # Pin UTF-8 rather than the platform default so non-ASCII text
            # from the input files cannot raise UnicodeEncodeError on
            # machines whose locale encoding is not UTF-8.
            h = open(path, "w", newline="", encoding="utf-8")
            handles[stream] = h
            w = csv.writer(h)
            w.writerow(header)
            writers[stream] = w

        # Emit overdue NPIs first (sorted by days_overdue desc) for each
        # stream, then the remainder. Best-intent recipients land at the
        # top of every file.
        for npi in sorted(npi_emails, key=sort_key):
            for email, stream in npi_emails[npi]:
                w = writers.get(stream)
                if w is None:
                    # Stream label with no output file: nothing to write.
                    continue
                w.writerow(row_for(npi, email, stream))
                counts[stream] += 1
    finally:
        # Close every file that was opened, even if writing failed midway.
        for h in handles.values():
            h.close()

    print("\n=== OUTPUT (one file per outbound stream) ===")
    print(f"HEALTHCARE HOT (institutional practice domains): {counts['institutional']:,} rows "
          f"-> {stream_files['institutional']}")
    print(f"Consumer webmail (rides trucking trickle): {counts['consumer']:,} rows "
          f"-> {stream_files['consumer']}")
    print(f"Direct/HISP (parked until DirectTrust): {counts['direct']:,} rows "
          f"-> {stream_files['direct']}")
    print("\nNext steps:")
    print(" - Free MX + SMTP RCPT verify the INSTITUTIONAL file on a NON-sending IP")
    print(" (scripts/workers/email_verifier.py), keep only deliverable.")
    print(" - Import the verified institutional file into listmonk-hc; send via the")
    print(" HEALTHCARE HOT stream (port 2526 -> hc IPs, own 10k/day cap).")
    print(" - Feed the consumer file into the existing trucking-discipline stream.")
    print(" - Park the Direct/HISP file until DirectTrust signup.")
    return 0
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the builder and use main()'s integer result as the exit status.
    raise SystemExit(main())
|