#!/usr/bin/env python3
"""Build NPI outreach lists from free public CMS data.

Joins NPPES endpoint emails to the CMS Medicare revalidation-due list (and
flags OIG LEIE exclusions + opt-out expirations) to produce ready-to-send
outreach segments. No paid email-append vendor required.

Two email channels are produced:
  1. cold_emailable — normal inboxes (consumer webmail + practice domains) you
     can email TODAY from a standard MTA.
  2. direct_secure  — DirectTrust / HISP addresses (e.g. *.direct.*) that route
     only inside the DirectTrust network. Hold these until you sign up for
     DirectTrust, then reach them via Direct Secure Messaging (high-trust,
     spam-resistant).

Inputs (download first; see docs/new-sector-compliance-targets.md sec 7-8):
  endpoint.csv          NPPES endpoint_pfile (NPI -> endpoint email)
  revalidation_due.csv  CMS Revalidation Due Date List
  leie.csv              OIG LEIE exclusions (optional cross-flag)
  optout.csv            Medicare opt-out affidavits (optional cross-flag)

Output: CSVs under <data-dir>/out/ (or --out-dir) plus a summary to stdout.

Usage:
  python3 scripts/build_npi_outreach_lists.py --data-dir /tmp/npi_companion
"""
from __future__ import annotations

import argparse
import csv
import datetime
import os
import re
import sys
from collections import defaultdict

EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

# Domains that are DirectTrust/HISP secure-messaging gateways, not normal inboxes.
# Heuristic: contains a 'direct' token, a known HISP marker, or *.direct-ci.com etc.
DIRECT_MARKERS = (
    "direct.", ".direct", "direct-", "-direct", "directaddress", "hisp",
    "direct-ci.com", "directplus", "ehrdirect", "mayoclinicmsg",
    "allscriptsdirect", "eclinicaldirect", "surescripts",
)

# Common real inboxes a clinician would actually read.
CONSUMER_WEBMAIL = {
    "gmail.com", "yahoo.com", "hotmail.com", "outlook.com", "aol.com",
    "icloud.com", "comcast.net", "att.net", "sbcglobal.net", "me.com",
    "ymail.com", "live.com", "msn.com", "protonmail.com", "verizon.net",
}

# Date layout used by the date columns this script parses.
DATE_FORMAT = "%m/%d/%Y"

# Output CSVs are always written as UTF-8 so the bytes do not depend on the
# platform's locale encoding.
OUTPUT_ENCODING = "utf-8"

# Inputs that must exist; leie.csv and optout.csv are optional cross-flags.
REQUIRED_INPUTS = ("endpoint.csv", "revalidation_due.csv")

_OVERDUE_HEADER = [
    "npi", "email", "channel", "name", "specialty", "state",
    "due_date", "days_overdue", "leie_excluded", "optout_ending",
]

_ALL_COLD_HEADER = [
    "npi", "email", "name", "specialty", "state", "reval_due_date",
    "days_overdue", "reval_status", "leie_excluded", "optout_ending",
]


def is_direct_secure(domain: str) -> bool:
    """Return True if *domain* contains any DIRECT_MARKERS substring.

    The comparison is case-insensitive. This is a substring heuristic, so any
    domain that merely contains a marker is classified as direct-secure.
    """
    lowered = domain.lower()
    return any(marker in lowered for marker in DIRECT_MARKERS)


def csv_open(path: str):
    """Open *path* as a text file suitable for ``csv.reader``.

    The caller is responsible for closing the returned file (use ``with``).
    """
    # CMS files are latin-1; NPPES is utf-8 but latin-1 reads it safely too
    # (every byte decodes, although non-ASCII UTF-8 text will not round-trip
    # as the intended characters).
    return open(path, newline="", encoding="latin-1")


def _parse_mdy(text: str) -> datetime.date | None:
    """Parse an MM/DD/YYYY string; return None when it is not a valid date."""
    try:
        return datetime.datetime.strptime(text, DATE_FORMAT).date()
    except ValueError:
        return None


def load_endpoint_emails(path: str):
    """Load endpoint email addresses grouped by NPI.

    Reads column 0 as the NPI and column 3 as the endpoint; rows with fewer
    than four columns, an empty NPI, or an endpoint that does not match
    EMAIL_RE are skipped. The first row is treated as a header.

    Returns:
        (npi_emails, stats) where ``npi_emails`` maps NPI -> list of
        ``(email, channel)`` with channel in {"cold", "direct"}, and ``stats``
        is a defaultdict(int) with counts under "cold", "direct" and
        "cold_consumer" (cold addresses on a CONSUMER_WEBMAIL domain).
    """
    npi_emails: dict[str, list[tuple[str, str]]] = defaultdict(list)
    seen: set[tuple[str, str]] = set()
    stats: dict[str, int] = defaultdict(int)
    with csv_open(path) as f:
        reader = csv.reader(f)
        next(reader, None)  # skip header
        for row in reader:
            if len(row) < 4:
                continue
            npi = row[0].strip().strip('"')
            endpoint = row[3].strip().strip('"')
            if not npi or not EMAIL_RE.match(endpoint):
                continue
            endpoint_lower = endpoint.lower()
            domain = endpoint_lower.split("@")[-1]
            channel = "direct" if is_direct_secure(domain) else "cold"
            # De-duplicate case-insensitively, but keep the first spelling seen.
            key = (npi, endpoint_lower)
            if key in seen:
                continue
            seen.add(key)
            npi_emails[npi].append((endpoint, channel))
            stats[channel] += 1
            if channel == "cold" and domain in CONSUMER_WEBMAIL:
                stats["cold_consumer"] += 1
    return npi_emails, stats


def load_revalidation(path: str, today: datetime.date | None = None) -> dict[str, dict]:
    """Load revalidation due dates keyed by NPI.

    Column usage: 1 = NPI, 2/3 = name parts, 4 = fallback name, 5 = state,
    8 = specialty, 9 = due date, 10 = adjusted due date (preferred when
    non-empty). Rows with fewer than 11 columns, an empty NPI, a blank or
    "TBD" date, or an unparseable date are skipped. When an NPI appears more
    than once, the last row wins.

    Args:
        path: CSV file to read; the first row is treated as a header.
        today: Reference date for the overdue calculation. Defaults to
            ``datetime.date.today()``.

    Returns:
        NPI -> dict with keys ``due_date`` (original string), ``days_overdue``
        (int, positive = overdue), ``overdue`` (bool), ``name``,
        ``specialty`` and ``state``.
    """
    if today is None:
        today = datetime.date.today()
    out: dict[str, dict] = {}
    with csv_open(path) as f:
        reader = csv.reader(f)
        next(reader, None)  # skip header
        for row in reader:
            if len(row) < 11:
                continue
            npi = row[1].strip()
            if not npi:
                continue
            due_text = row[10].strip() or row[9].strip()  # adjusted else due
            if not due_text or due_text.upper() == "TBD":
                continue
            due = _parse_mdy(due_text)
            if due is None:
                continue
            days_overdue = (today - due).days  # positive = overdue
            name = f"{row[2].strip()} {row[3].strip()}".strip() or row[4].strip()
            out[npi] = {
                "due_date": due_text,
                "days_overdue": days_overdue,
                "overdue": days_overdue > 0,
                "name": name,
                "specialty": row[8].strip(),
                "state": row[5].strip(),
            }
    return out


def load_leie_npis(path: str) -> set[str]:
    """Return the set of 10-character NPIs found in column 7 of *path*.

    A missing file yields an empty set. The all-zero placeholder
    "0000000000" is ignored. The first row is treated as a header.
    """
    npis: set[str] = set()
    if not os.path.exists(path):
        return npis
    with csv_open(path) as f:
        reader = csv.reader(f)
        next(reader, None)  # skip header
        for row in reader:
            if len(row) < 8:
                continue
            npi = row[7].strip().strip('"')
            if len(npi) == 10 and npi != "0000000000":
                npis.add(npi)
    return npis


def load_optout(path: str, today: datetime.date | None = None) -> dict[str, str]:
    """NPI -> optout_end_date for those ending within 12 months.

    Reads column 2 as the NPI and column 5 as the end date. A row is kept
    when its end date falls in ``[today, today + 365 days]``. Rows with fewer
    than six columns, an empty NPI, or an unparseable date are skipped. A
    missing file yields an empty dict.

    Args:
        path: CSV file to read; the first row is treated as a header.
        today: Reference date for the window. Defaults to
            ``datetime.date.today()``.
    """
    if today is None:
        today = datetime.date.today()
    horizon = today + datetime.timedelta(days=365)
    out: dict[str, str] = {}
    if not os.path.exists(path):
        return out
    with csv_open(path) as f:
        reader = csv.reader(f)
        next(reader, None)  # skip header
        for row in reader:
            if len(row) < 6:
                continue
            npi = row[2].strip()
            if not npi:
                # Consistent with the other loaders: never key on a blank NPI.
                continue
            end_text = row[5].strip()
            end = _parse_mdy(end_text)
            if end is None:
                continue
            if today <= end <= horizon:
                out[npi] = end_text
    return out


def _write_overdue_segment(
    path: str,
    channel: str,
    overdue: dict[str, dict],
    npi_emails: dict[str, list[tuple[str, str]]],
    leie: set[str],
    optout: dict[str, str],
) -> int:
    """Write one row per (overdue NPI, email on *channel*); return the row count."""
    rows_written = 0
    with open(path, "w", newline="", encoding=OUTPUT_ENCODING) as f:
        writer = csv.writer(f)
        writer.writerow(_OVERDUE_HEADER)
        for npi, info in overdue.items():
            for email, email_channel in npi_emails.get(npi, ()):
                if email_channel != channel:
                    continue
                writer.writerow([
                    npi, email, email_channel, info["name"], info["specialty"],
                    info["state"], info["due_date"], info["days_overdue"],
                    "Y" if npi in leie else "",
                    optout.get(npi, ""),
                ])
                rows_written += 1
    return rows_written


def _write_all_cold(
    path: str,
    npi_emails: dict[str, list[tuple[str, str]]],
    reval: dict[str, dict],
    leie: set[str],
    optout: dict[str, str],
) -> int:
    """Write every cold email with whatever enrichment exists; return the row count."""
    rows_written = 0
    with open(path, "w", newline="", encoding=OUTPUT_ENCODING) as f:
        writer = csv.writer(f)
        writer.writerow(_ALL_COLD_HEADER)
        for npi, emails in npi_emails.items():
            cold = [email for email, channel in emails if channel == "cold"]
            if not cold:
                continue
            info = reval.get(npi, {})
            if info:
                status = "overdue" if info.get("overdue") else "upcoming"
            else:
                status = "no_reval_flag"
            for email in cold:
                writer.writerow([
                    npi, email, info.get("name", ""), info.get("specialty", ""),
                    info.get("state", ""), info.get("due_date", ""),
                    info.get("days_overdue", ""), status,
                    "Y" if npi in leie else "",
                    optout.get(npi, ""),
                ])
                rows_written += 1
    return rows_written


def main(argv: list[str] | None = None) -> int:
    """Build the outreach CSVs and print a summary.

    Args:
        argv: Command-line arguments (without the program name). Defaults to
            ``sys.argv[1:]`` via argparse.

    Returns:
        0 on success, 1 when a required input file is missing.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--data-dir", default="/tmp/npi_companion")
    ap.add_argument("--out-dir", default=None)
    args = ap.parse_args(argv)

    data_dir = args.data_dir
    out_dir = args.out_dir or os.path.join(data_dir, "out")

    # Fail early with a readable message (and before creating any output)
    # instead of surfacing a FileNotFoundError traceback mid-run.
    missing = [
        path
        for path in (os.path.join(data_dir, name) for name in REQUIRED_INPUTS)
        if not os.path.isfile(path)
    ]
    if missing:
        for path in missing:
            print(f"error: required input not found: {path}", file=sys.stderr)
        return 1

    os.makedirs(out_dir, exist_ok=True)

    print("Loading endpoint emails ...")
    npi_emails, estats = load_endpoint_emails(os.path.join(data_dir, "endpoint.csv"))
    print(f" NPIs with email: {len(npi_emails):,} | cold: {estats['cold']:,} "
          f"(consumer webmail: {estats['cold_consumer']:,}) | direct: {estats['direct']:,}")

    print("Loading revalidation due dates ...")
    reval = load_revalidation(os.path.join(data_dir, "revalidation_due.csv"))
    overdue = {n: v for n, v in reval.items() if v["overdue"]}
    upcoming = {n: v for n, v in reval.items() if not v["overdue"]}
    print(f" NPIs with concrete due date: {len(reval):,} | overdue: {len(overdue):,} | upcoming: {len(upcoming):,}")

    leie = load_leie_npis(os.path.join(data_dir, "leie.csv"))
    optout = load_optout(os.path.join(data_dir, "optout.csv"))
    print(f" LEIE w/ NPI: {len(leie):,} | opt-out ending <12mo: {len(optout):,}")

    # Build the joined outreach rows.
    cold_path = os.path.join(out_dir, "npi_overdue_cold_emailable.csv")
    direct_path = os.path.join(out_dir, "npi_overdue_direct_secure.csv")
    cold_count = _write_overdue_segment(
        cold_path, "cold", overdue, npi_emails, leie, optout)
    direct_count = _write_overdue_segment(
        direct_path, "direct", overdue, npi_emails, leie, optout)

    # Broad segment: EVERY cold-emailable NPI (not just overdue), enriched with
    # whatever revalidation/exclusion/opt-out signal we have. This is the real
    # starting volume for the general compliance-bundle campaign.
    allcold_path = os.path.join(out_dir, "npi_all_cold_emailable.csv")
    all_cold_count = _write_all_cold(allcold_path, npi_emails, reval, leie, optout)

    print("\n=== OUTPUT ===")
    print(f"ALL cold-emailable NPIs (broad bundle list): {all_cold_count:,} rows -> {allcold_path}")
    print(f"Cold-emailable overdue-revalidation list: {cold_count:,} rows -> {cold_path}")
    print(f"Direct-secure (DirectTrust later) list: {direct_count:,} rows -> {direct_path}")
    print("\nNext steps:")
    print(" - MX/SMTP-verify the cold list (port 25 + MX confirmed available).")
    print(" - Send revalidation campaign to verified cold emails now.")
    print(" - Park the direct-secure list until DirectTrust signup, then send via HISP.")
    return 0


if __name__ == "__main__":
    sys.exit(main())