#!/usr/bin/env python3
"""Harvest the cold-mailable NPPES endpoint inboxes from the endpoint_pfile.

Reads the NPPES endpoint_pfile, classifies each endpoint email with the shared
healthcare_email_streams.classify (so it stays consistent with the warmup
import), and writes only the COLD-MAILABLE streams (institutional + consumer)
to a CSV. Direct/HISP and invalid endpoints are dropped (they can't be
cold-emailed from a normal MTA). Reports the universe sizes.

Usage:
    python3 scripts/harvest_nppes_mailable.py ENDPOINT_PFILE.csv OUT.csv
"""
import csv
import sys
from collections import defaultdict

# Make the shared classifier importable both from the deployed location and
# when the script is launched from the repository root.
sys.path.insert(0, "/opt/performancewest/scripts")
sys.path.insert(0, "scripts")
from healthcare_email_streams import classify  # noqa: E402

# Zero-based column positions inside an endpoint_pfile row.
NPI_COL = 0
ENDPOINT_TYPE_COL = 1
EMAIL_COL = 3

# Substrings that identify a Direct Secure Messaging (HISP) domain. Such
# domains are NOT cold-mailable from a normal MTA -- they route only inside
# DirectTrust and will fail/bounce. The stream classifier's "institutional"
# bucket leaks these (e.g. upmcdirect.com, *.providencedirect.org,
# *shdirect.org, epicdirect.promedica.org), so they are filtered out here by
# the unmistakable HISP domain patterns.
_HISP_MARKERS = ("direct", "hisp", "secure", "directtrust")


def is_hisp_domain(domain: str) -> bool:
    """Return True when *domain* looks like a Direct/HISP gateway.

    The check is a case-insensitive substring search for each entry of
    ``_HISP_MARKERS`` anywhere in the domain. "direct" as a substring catches
    the vast majority (xdirect.org, directX.com, *.providencedirect.org, etc.).
    """
    lowered = domain.lower()
    for marker in _HISP_MARKERS:
        if marker in lowered:
            return True
    return False


# Common real consumer-inbox providers -- always genuinely mailable.
# NOTE(review): this set is not referenced anywhere else in this script --
# confirm whether it was meant to bypass the HISP filter or can be removed.
_CONSUMER_DOMAINS = {
    "gmail.com", "yahoo.com", "outlook.com", "hotmail.com", "aol.com",
    "icloud.com", "msn.com", "live.com", "comcast.net", "att.net",
    "sbcglobal.net", "verizon.net", "me.com", "ymail.com", "protonmail.com",
}


def main():
    """Classify every endpoint in the input file and write the mailable ones.

    Command line: ``ENDPOINT_PFILE.csv [OUT.csv]``. The output path defaults
    to ``nppes_mailable.csv`` when omitted.

    The input is read as latin-1 CSV and its first row is skipped as a header.
    Rows that are too short to hold the email column, or that have an empty
    NPI or endpoint value, are ignored. Endpoints whose stream is
    "institutional" or "consumer" are kept unless their domain matches
    ``is_hisp_domain``; kept rows are de-duplicated on (npi, lower-cased
    email) and written as ``npi,email,stream``. A summary of stream counts and
    the 15 most frequent kept domains is printed to stdout.

    Raises:
        SystemExit: with a usage message when no input path is given.
    """
    # Fail with a readable usage line instead of a bare IndexError.
    if len(sys.argv) < 2:
        raise SystemExit(
            "usage: python3 scripts/harvest_nppes_mailable.py "
            "ENDPOINT_PFILE.csv [OUT.csv]"
        )
    src = sys.argv[1]
    out = sys.argv[2] if len(sys.argv) > 2 else "nppes_mailable.csv"

    stats = defaultdict(int)    # stream name (plus "hisp_filtered") -> count
    domains = defaultdict(int)  # kept domain -> number of kept rows
    seen = set()                # (npi, email) dedupe
    mailable = []               # (npi, email, stream)

    with open(src, newline="", encoding="latin-1") as f:
        r = csv.reader(f)
        next(r, None)  # header
        for row in r:
            if len(row) <= EMAIL_COL:
                continue
            npi = row[NPI_COL].strip().strip('"')
            ep = row[EMAIL_COL].strip().strip('"')
            if not npi or not ep:
                continue
            stream = classify(ep)
            # Counted before filtering/dedupe, so these are raw totals.
            stats[stream] += 1
            if stream not in ("institutional", "consumer"):
                continue
            dom = ep.rsplit("@", 1)[-1].lower()
            # Drop Direct/HISP gateways that leak into 'institutional'.
            if is_hisp_domain(dom):
                stats["hisp_filtered"] += 1
                continue
            key = (npi, ep.lower())
            if key in seen:
                continue
            seen.add(key)
            mailable.append((npi, ep, stream))
            domains[dom] += 1

    # Explicit UTF-8: the input was decoded as latin-1, so a row may carry
    # non-ASCII characters that a locale-default (e.g. ASCII) encoding would
    # refuse to write.
    with open(out, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["npi", "email", "stream"])
        w.writerows(mailable)

    print("=== NPPES endpoint classification ===")
    for k in sorted(stats, key=lambda k: -stats[k]):
        print(f"  {k:14} {stats[k]:>8,}")
    print()
    print(f"COLD-MAILABLE (institutional+consumer), deduped: {len(mailable):,}")
    print(f"  -> wrote {out}")
    print()
    print("Top mailable domains:")
    for dom, n in sorted(domains.items(), key=lambda x: -x[1])[:15]:
        print(f"  {dom:30} {n:>7,}")


if __name__ == "__main__":
    main()