#!/usr/bin/env python3 """Build NPI outreach lists from free public CMS data. Joins NPPES endpoint emails to the CMS Medicare revalidation-due list (and flags OIG LEIE exclusions + opt-out expirations) to produce ready-to-send outreach segments. No paid email-append vendor required. Two email channels are produced: 1. cold_emailable — normal inboxes (consumer webmail + practice domains) you can email TODAY from a standard MTA. 2. direct_secure — DirectTrust / HISP addresses (e.g. *.direct.*) that route only inside the DirectTrust network. Hold these until you sign up for DirectTrust, then reach them via Direct Secure Messaging (high-trust, spam-resistant). Inputs (download first; see docs/new-sector-compliance-targets.md sec 7-8): endpoint.csv NPPES endpoint_pfile (NPI -> endpoint email) revalidation_due.csv CMS Revalidation Due Date List leie.csv OIG LEIE exclusions (optional cross-flag) optout.csv Medicare opt-out affidavits (optional cross-flag) Output: CSVs under ./out/ plus a summary to stdout. Usage: python3 scripts/build_npi_outreach_lists.py --data-dir /tmp/npi_companion """ from __future__ import annotations import argparse import csv import datetime import os import re import sys from collections import defaultdict sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from healthcare_email_streams import ( # noqa: E402 classify as classify_stream, EMAIL_RE as _STREAM_EMAIL_RE, ) EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") # Stream classification (institutional / consumer / direct) lives in the shared # scripts/healthcare_email_streams.py module so the list builder and any # campaign-import tooling can never drift. Imported above as classify_stream. def csv_open(path: str): # CMS files are latin-1; NPPES is utf-8 but latin-1 reads it safely too. return open(path, newline="", encoding="latin-1") def load_endpoint_emails(path: str): """NPI -> list of (email, stream). stream in {institutional, consumer, direct}. 'excluded'/'invalid' are dropped. Streams map to the dual-stream MTA design (see docs/healthcare-email-stream-plan.md): institutional -> HEALTHCARE HOT stream consumer -> trucking-discipline (low-cap) stream direct -> parked until DirectTrust """ npi_emails: dict[str, list[tuple[str, str]]] = defaultdict(list) seen: set[tuple[str, str]] = set() stats = defaultdict(int) with csv_open(path) as f: r = csv.reader(f) next(r, None) for row in r: if len(row) < 4: continue npi = row[0].strip().strip('"') ep = row[3].strip().strip('"') if not npi: continue stream = classify_stream(ep) if stream in ("invalid", "excluded"): stats[stream] += 1 continue ep_l = ep.lower() key = (npi, ep_l) if key in seen: continue seen.add(key) npi_emails[npi].append((ep, stream)) stats[stream] += 1 return npi_emails, stats def load_revalidation(path: str): """NPI -> (due_date_str, days_overdue|None, name, specialty, state). The revalidation list is optional enrichment; if absent, returns {} so the builder still produces the stream files from endpoint emails alone. """ out: dict[str, dict] = {} if not os.path.exists(path): return out today = datetime.date.today() with csv_open(path) as f: r = csv.reader(f) next(r, None) for row in r: if len(row) < 11: continue npi = row[1].strip() if not npi: continue dd = (row[10].strip() or row[9].strip()) # adjusted else due if not dd or dd.upper() == "TBD": continue try: d = datetime.datetime.strptime(dd, "%m/%d/%Y").date() except ValueError: continue overdue = (today - d).days # positive = overdue name = f"{row[2].strip()} {row[3].strip()}".strip() or row[4].strip() out[npi] = { "due_date": dd, "days_overdue": overdue, "overdue": overdue > 0, "name": name, "specialty": row[8].strip(), "state": row[5].strip(), } return out def load_leie_npis(path: str) -> set[str]: npis = set() if not os.path.exists(path): return npis with csv_open(path) as f: r = csv.reader(f) next(r, None) for row in r: if len(row) < 8: continue npi = row[7].strip().strip('"') if npi and npi != "0000000000" and len(npi) == 10: npis.add(npi) return npis def load_optout(path: str): """NPI -> optout_end_date for those ending within 12 months.""" today = datetime.date.today() horizon = today + datetime.timedelta(days=365) out = {} if not os.path.exists(path): return out with csv_open(path) as f: r = csv.reader(f) next(r, None) for row in r: if len(row) < 6: continue npi = row[2].strip() end = row[5].strip() try: d = datetime.datetime.strptime(end, "%m/%d/%Y").date() except ValueError: continue if today <= d <= horizon: out[npi] = end return out def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--data-dir", default="/tmp/npi_companion") ap.add_argument("--out-dir", default=None) args = ap.parse_args() d = args.data_dir out_dir = args.out_dir or os.path.join(d, "out") os.makedirs(out_dir, exist_ok=True) print("Loading endpoint emails ...") npi_emails, estats = load_endpoint_emails(os.path.join(d, "endpoint.csv")) print(f" NPIs with email: {len(npi_emails):,} | institutional: " f"{estats['institutional']:,} | consumer: {estats['consumer']:,} | " f"direct/HISP: {estats['direct']:,} " f"(dropped excluded={estats['excluded']:,}, invalid={estats['invalid']:,})") print("Loading revalidation due dates ...") reval = load_revalidation(os.path.join(d, "revalidation_due.csv")) overdue = {n: v for n, v in reval.items() if v["overdue"]} upcoming = {n: v for n, v in reval.items() if not v["overdue"]} print(f" NPIs with concrete due date: {len(reval):,} | overdue: {len(overdue):,} | upcoming: {len(upcoming):,}") leie = load_leie_npis(os.path.join(d, "leie.csv")) optout = load_optout(os.path.join(d, "optout.csv")) print(f" LEIE w/ NPI: {len(leie):,} | opt-out ending <12mo: {len(optout):,}") counts = defaultdict(int) HEADER = ["npi", "email", "stream", "name", "specialty", "state", "reval_due_date", "days_overdue", "reval_status", "leie_excluded", "optout_ending"] def row_for(npi, email, stream): info = reval.get(npi, {}) if info: status = "overdue" if info.get("overdue") else "upcoming" else: status = "no_reval_flag" return [ npi, email, stream, info.get("name", ""), info.get("specialty", ""), info.get("state", ""), info.get("due_date", ""), info.get("days_overdue", ""), status, "Y" if npi in leie else "", optout.get(npi, ""), ] # One file per outbound stream (the dual-stream MTA routing key). # institutional -> HEALTHCARE HOT stream (own IPs + 10k/day cap) # consumer -> trucking-discipline (low-cap) stream # direct -> parked until DirectTrust # Within each, rows are sorted overdue-first (highest intent = best send order). stream_files = { "institutional": os.path.join(out_dir, "npi_healthcare_institutional.csv"), "consumer": os.path.join(out_dir, "npi_healthcare_consumer.csv"), "direct": os.path.join(out_dir, "npi_direct_secure.csv"), } writers = {} handles = {} for stream, path in stream_files.items(): h = open(path, "w", newline="") handles[stream] = h w = csv.writer(h) w.writerow(HEADER) writers[stream] = w # Emit overdue NPIs first (sorted by days_overdue desc) for each stream, then # the remainder. Best-intent recipients land at the top of every file. def sort_key(npi): info = reval.get(npi, {}) return -(info.get("days_overdue", -10**9) if info else -10**9) for npi in sorted(npi_emails.keys(), key=sort_key): for email, stream in npi_emails[npi]: w = writers.get(stream) if w is None: continue w.writerow(row_for(npi, email, stream)) counts[stream] += 1 for h in handles.values(): h.close() print("\n=== OUTPUT (one file per outbound stream) ===") print(f"HEALTHCARE HOT (institutional practice domains): {counts['institutional']:,} rows " f"-> {stream_files['institutional']}") print(f"Consumer webmail (rides trucking trickle): {counts['consumer']:,} rows " f"-> {stream_files['consumer']}") print(f"Direct/HISP (parked until DirectTrust): {counts['direct']:,} rows " f"-> {stream_files['direct']}") print("\nNext steps:") print(" - Free MX + SMTP RCPT verify the INSTITUTIONAL file on a NON-sending IP") print(" (scripts/workers/email_verifier.py), keep only deliverable.") print(" - Import the verified institutional file into listmonk-hc; send via the") print(" HEALTHCARE HOT stream (port 2526 -> hc IPs, own 10k/day cap).") print(" - Feed the consumer file into the existing trucking-discipline stream.") print(" - Park the Direct/HISP file until DirectTrust signup.") return 0 if __name__ == "__main__": sys.exit(main())