#!/usr/bin/env python3
"""Harvest the cold-mailable NPPES endpoint inboxes from the endpoint_pfile.

Reads the NPPES endpoint_pfile, classifies each endpoint email with the shared
healthcare_email_streams.classify (so it stays consistent with the warmup
import), and writes only the COLD-MAILABLE streams (institutional + consumer)
to a CSV. Direct/HISP and invalid endpoints are dropped (they can't be
cold-emailed from a normal MTA). Reports the universe sizes.

Usage:
  python3 scripts/harvest_nppes_mailable.py ENDPOINT_PFILE.csv OUT.csv
"""
import csv
import sys
from collections import defaultdict

# Make the shared classifier importable both from the deployed location and
# from a checkout-relative "scripts" directory.
sys.path.insert(0, "/opt/performancewest/scripts")
sys.path.insert(0, "scripts")
from healthcare_email_streams import classify  # noqa: E402

# Zero-based column positions within an endpoint_pfile row.
ENDPOINT_TYPE_COL = 1
NPI_COL = 0
EMAIL_COL = 3

# Direct Secure Messaging (HISP) domains are NOT cold-mailable from a normal
# MTA -- they route only inside DirectTrust and will fail/bounce. The stream
# classifier's "institutional" bucket leaks these (e.g. upmcdirect.com,
# *.providencedirect.org, *shdirect.org, epicdirect.promedica.org), so they are
# filtered out here by the unmistakable HISP domain patterns.
_HISP_MARKERS = ("direct", "hisp", "secure", "directtrust")


def is_hisp_domain(domain: str) -> bool:
    """Return True when *domain* contains any Direct/HISP marker word.

    The comparison is case-insensitive and is a plain substring test against
    every entry of ``_HISP_MARKERS``, so a marker anywhere in the domain
    (any label, prefix or suffix) counts as a match.

    Args:
        domain: The domain part of an email address.

    Returns:
        True if at least one marker occurs in the lowercased domain,
        otherwise False.
    """
    lowered = domain.lower()
    # "direct" as a substring catches the vast majority of gateways
    # (xdirect.org, directX.com, *.providencedirect.org, etc.).
    for marker in _HISP_MARKERS:
        if marker in lowered:
            return True
    return False


# Common real consumer-inbox providers -- always genuinely mailable.
_CONSUMER_DOMAINS = {
    "gmail.com", "yahoo.com", "outlook.com", "hotmail.com", "aol.com",
    "icloud.com", "msn.com", "live.com", "comcast.net", "att.net",
    "sbcglobal.net", "verizon.net", "me.com", "ymail.com", "protonmail.com",
}


def main():
    """Classify every endpoint email in the input file and export the mailable ones.

    Command-line arguments (read from ``sys.argv``):
        1. Path of the endpoint_pfile CSV to read (required).
        2. Path of the CSV to write (optional, defaults to
           ``nppes_mailable.csv``).

    For each data row the NPI and endpoint values are stripped of surrounding
    whitespace and double quotes, the endpoint is passed to ``classify``, and
    the per-stream count is incremented. Rows whose stream is "institutional"
    or "consumer" are kept unless ``is_hisp_domain`` flags their domain or the
    (npi, lowercased email) pair was already seen. The kept rows are written
    with an ``npi,email,stream`` header, and a summary is printed to stdout.

    Exits with a usage message on stderr when the input path is missing.
    """
    if len(sys.argv) < 2:
        # Fail with a readable message instead of an IndexError traceback.
        sys.exit(
            "usage: python3 scripts/harvest_nppes_mailable.py "
            "ENDPOINT_PFILE.csv [OUT.csv]"
        )
    src = sys.argv[1]
    out = sys.argv[2] if len(sys.argv) > 2 else "nppes_mailable.csv"

    stats = defaultdict(int)    # stream name (plus "hisp_filtered") -> row count
    domains = defaultdict(int)  # mailable domain -> kept-row count
    seen = set()                # (npi, email) dedupe
    mailable = []               # (npi, email, stream)

    # latin-1 maps every byte to a character, so decoding never raises.
    with open(src, newline="", encoding="latin-1") as f:
        r = csv.reader(f)
        next(r, None)  # header
        for row in r:
            if len(row) <= EMAIL_COL:
                continue
            npi = row[NPI_COL].strip().strip('"')
            ep = row[EMAIL_COL].strip().strip('"')
            if not npi or not ep:
                continue

            stream = classify(ep)
            stats[stream] += 1
            if stream not in ("institutional", "consumer"):
                continue

            dom = ep.rsplit("@", 1)[-1].lower()
            # Drop Direct/HISP gateways that leak into 'institutional'.
            if is_hisp_domain(dom):
                stats["hisp_filtered"] += 1
                continue

            # Dedupe case-insensitively on the address, but keep the original
            # spelling of the first occurrence in the output.
            key = (npi, ep.lower())
            if key in seen:
                continue
            seen.add(key)
            mailable.append((npi, ep, stream))
            domains[dom] += 1

    # Explicit UTF-8: the input may carry non-ASCII characters (it is decoded
    # as latin-1), and the platform default encoding may not be able to
    # represent them.
    with open(out, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["npi", "email", "stream"])
        w.writerows(mailable)

    print("=== NPPES endpoint classification ===")
    for k in sorted(stats, key=lambda k: -stats[k]):
        print(f" {k:14} {stats[k]:>8,}")
    print()
    print(f"COLD-MAILABLE (institutional+consumer), deduped: {len(mailable):,}")
    print(f" -> wrote {out}")
    print()
    print("Top mailable domains:")
    for dom, n in sorted(domains.items(), key=lambda x: -x[1])[:15]:
        print(f" {dom:30} {n:>7,}")


if __name__ == "__main__":
    main()