From 289c3b91be3a69baeb371be66c2471c0a7be189a Mon Sep 17 00:00:00 2001 From: justin Date: Fri, 5 Jun 2026 18:59:44 -0500 Subject: [PATCH] feat(healthcare): split outreach list into 3 outbound streams Add scripts/healthcare_email_streams.py as the single source of truth for classifying NPPES-endpoint emails into institutional (HOT stream) / consumer (trucking-discipline stream) / direct (DirectTrust, parked), plus an exclude set for non-prospect giants (va.gov, *.mil, cvshealth, walgreens, walmart). Rework build_npi_outreach_lists.py to emit one CSV per stream (npi_healthcare_institutional/consumer + npi_direct_secure), overdue-first sorted, with companion files (revalidation/leie/optout) now optional. Verified on May 2026 NPPES endpoint_pfile: 89,557 institutional / 19,366 consumer / 242,441 direct rows. --- scripts/build_npi_outreach_lists.py | 182 +++++++++++++++------------- scripts/healthcare_email_streams.py | 88 ++++++++++++++ 2 files changed, 186 insertions(+), 84 deletions(-) create mode 100644 scripts/healthcare_email_streams.py diff --git a/scripts/build_npi_outreach_lists.py b/scripts/build_npi_outreach_lists.py index ba65026..d8a122b 100644 --- a/scripts/build_npi_outreach_lists.py +++ b/scripts/build_npi_outreach_lists.py @@ -34,27 +34,17 @@ import re import sys from collections import defaultdict -EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") - -# Domains that are DirectTrust/HISP secure-messaging gateways, not normal inboxes. -# Heuristic: contains a 'direct' token, a known HISP marker, or *.direct-ci.com etc. 
-DIRECT_MARKERS = ( - "direct.", ".direct", "direct-", "-direct", "directaddress", "hisp", - "direct-ci.com", "directplus", "ehrdirect", "mayoclinicmsg", - "allscriptsdirect", "eclinicaldirect", "surescripts", +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from healthcare_email_streams import ( # noqa: E402 + classify as classify_stream, + EMAIL_RE as _STREAM_EMAIL_RE, ) -# Common real inboxes a clinician would actually read. -CONSUMER_WEBMAIL = { - "gmail.com", "yahoo.com", "hotmail.com", "outlook.com", "aol.com", - "icloud.com", "comcast.net", "att.net", "sbcglobal.net", "me.com", - "ymail.com", "live.com", "msn.com", "protonmail.com", "verizon.net", -} +EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") - -def is_direct_secure(domain: str) -> bool: - d = domain.lower() - return any(m in d for m in DIRECT_MARKERS) +# Stream classification (institutional / consumer / direct) lives in the shared +# scripts/healthcare_email_streams.py module so the list builder and any +# campaign-import tooling can never drift. Imported above as classify_stream. def csv_open(path: str): @@ -63,7 +53,15 @@ def csv_open(path: str): def load_endpoint_emails(path: str): - """NPI -> list of (email, channel). channel in {cold, direct}.""" + """NPI -> list of (email, stream). stream in + {institutional, consumer, direct}. 'excluded'/'invalid' are dropped. 
+ + Streams map to the dual-stream MTA design (see + docs/healthcare-email-stream-plan.md): + institutional -> HEALTHCARE HOT stream + consumer -> trucking-discipline (low-cap) stream + direct -> parked until DirectTrust + """ npi_emails: dict[str, list[tuple[str, str]]] = defaultdict(list) seen: set[tuple[str, str]] = set() stats = defaultdict(int) @@ -75,26 +73,32 @@ def load_endpoint_emails(path: str): continue npi = row[0].strip().strip('"') ep = row[3].strip().strip('"') - if not npi or not EMAIL_RE.match(ep): + if not npi: + continue + stream = classify_stream(ep) + if stream in ("invalid", "excluded"): + stats[stream] += 1 continue ep_l = ep.lower() - domain = ep_l.split("@")[-1] - channel = "direct" if is_direct_secure(domain) else "cold" key = (npi, ep_l) if key in seen: continue seen.add(key) - npi_emails[npi].append((ep, channel)) - stats[channel] += 1 - if channel == "cold" and domain in CONSUMER_WEBMAIL: - stats["cold_consumer"] += 1 + npi_emails[npi].append((ep, stream)) + stats[stream] += 1 return npi_emails, stats def load_revalidation(path: str): - """NPI -> (due_date_str, days_overdue|None, name, specialty, state).""" - today = datetime.date.today() + """NPI -> (due_date_str, days_overdue|None, name, specialty, state). + + The revalidation list is optional enrichment; if absent, returns {} so the + builder still produces the stream files from endpoint emails alone. 
+ """ out: dict[str, dict] = {} + if not os.path.exists(path): + return out + today = datetime.date.today() with csv_open(path) as f: r = csv.reader(f) next(r, None) @@ -176,8 +180,10 @@ def main() -> int: print("Loading endpoint emails ...") npi_emails, estats = load_endpoint_emails(os.path.join(d, "endpoint.csv")) - print(f" NPIs with email: {len(npi_emails):,} | cold: {estats['cold']:,} " - f"(consumer webmail: {estats['cold_consumer']:,}) | direct: {estats['direct']:,}") + print(f" NPIs with email: {len(npi_emails):,} | institutional: " + f"{estats['institutional']:,} | consumer: {estats['consumer']:,} | " + f"direct/HISP: {estats['direct']:,} " + f"(dropped excluded={estats['excluded']:,}, invalid={estats['invalid']:,})") print("Loading revalidation due dates ...") reval = load_revalidation(os.path.join(d, "revalidation_due.csv")) @@ -189,69 +195,77 @@ def main() -> int: optout = load_optout(os.path.join(d, "optout.csv")) print(f" LEIE w/ NPI: {len(leie):,} | opt-out ending <12mo: {len(optout):,}") - # Build the joined outreach rows. 
- cold_path = os.path.join(out_dir, "npi_overdue_cold_emailable.csv") - direct_path = os.path.join(out_dir, "npi_overdue_direct_secure.csv") counts = defaultdict(int) + HEADER = ["npi", "email", "stream", "name", "specialty", "state", + "reval_due_date", "days_overdue", "reval_status", + "leie_excluded", "optout_ending"] - def write_segment(path, channel): - n = 0 - with open(path, "w", newline="") as f: - w = csv.writer(f) - w.writerow(["npi", "email", "channel", "name", "specialty", "state", - "due_date", "days_overdue", "leie_excluded", "optout_ending"]) - for npi, info in overdue.items(): - emails = [e for e in npi_emails.get(npi, []) if e[1] == channel] - for email, ch in emails: - w.writerow([ - npi, email, ch, info["name"], info["specialty"], info["state"], - info["due_date"], info["days_overdue"], - "Y" if npi in leie else "", - optout.get(npi, ""), - ]) - n += 1 - return n + def row_for(npi, email, stream): + info = reval.get(npi, {}) + if info: + status = "overdue" if info.get("overdue") else "upcoming" + else: + status = "no_reval_flag" + return [ + npi, email, stream, info.get("name", ""), info.get("specialty", ""), + info.get("state", ""), info.get("due_date", ""), + info.get("days_overdue", ""), status, + "Y" if npi in leie else "", optout.get(npi, ""), + ] - counts["cold"] = write_segment(cold_path, "cold") - counts["direct"] = write_segment(direct_path, "direct") + # One file per outbound stream (the dual-stream MTA routing key). + # institutional -> HEALTHCARE HOT stream (own IPs + 10k/day cap) + # consumer -> trucking-discipline (low-cap) stream + # direct -> parked until DirectTrust + # Within each, rows are sorted overdue-first (highest intent = best send order). 
+ stream_files = { + "institutional": os.path.join(out_dir, "npi_healthcare_institutional.csv"), + "consumer": os.path.join(out_dir, "npi_healthcare_consumer.csv"), + "direct": os.path.join(out_dir, "npi_direct_secure.csv"), + } + writers = {} + handles = {} + for stream, path in stream_files.items(): + h = open(path, "w", newline="") + handles[stream] = h + w = csv.writer(h) + w.writerow(HEADER) + writers[stream] = w - # Broad segment: EVERY cold-emailable NPI (not just overdue), enriched with - # whatever revalidation/exclusion/opt-out signal we have. This is the real - # starting volume for the general compliance-bundle campaign. - allcold_path = os.path.join(out_dir, "npi_all_cold_emailable.csv") - with open(allcold_path, "w", newline="") as f: - w = csv.writer(f) - w.writerow(["npi", "email", "name", "specialty", "state", - "reval_due_date", "days_overdue", "reval_status", - "leie_excluded", "optout_ending"]) - for npi, emails in npi_emails.items(): - cold = [e for e, ch in emails if ch == "cold"] - if not cold: + # Emit overdue NPIs first (sorted by days_overdue desc) for each stream, then + # the remainder. Best-intent recipients land at the top of every file. 
+ def sort_key(npi): + info = reval.get(npi, {}) + return -(info.get("days_overdue", -10**9) if info else -10**9) + + for npi in sorted(npi_emails.keys(), key=sort_key): + for email, stream in npi_emails[npi]: + w = writers.get(stream) + if w is None: continue - info = reval.get(npi, {}) - if info: - status = "overdue" if info.get("overdue") else "upcoming" - else: - status = "no_reval_flag" - for email in cold: - w.writerow([ - npi, email, info.get("name", ""), info.get("specialty", ""), - info.get("state", ""), info.get("due_date", ""), - info.get("days_overdue", ""), status, - "Y" if npi in leie else "", optout.get(npi, ""), - ]) - counts["all_cold"] += 1 + w.writerow(row_for(npi, email, stream)) + counts[stream] += 1 - print("\n=== OUTPUT ===") - print(f"ALL cold-emailable NPIs (broad bundle list): {counts['all_cold']:,} rows -> {allcold_path}") - print(f"Cold-emailable overdue-revalidation list: {counts['cold']:,} rows -> {cold_path}") - print(f"Direct-secure (DirectTrust later) list: {counts['direct']:,} rows -> {direct_path}") + for h in handles.values(): + h.close() + + print("\n=== OUTPUT (one file per outbound stream) ===") + print(f"HEALTHCARE HOT (institutional practice domains): {counts['institutional']:,} rows " + f"-> {stream_files['institutional']}") + print(f"Consumer webmail (rides trucking trickle): {counts['consumer']:,} rows " + f"-> {stream_files['consumer']}") + print(f"Direct/HISP (parked until DirectTrust): {counts['direct']:,} rows " + f"-> {stream_files['direct']}") print("\nNext steps:") - print(" - MX/SMTP-verify the cold list (port 25 + MX confirmed available).") - print(" - Send revalidation campaign to verified cold emails now.") - print(" - Park the direct-secure list until DirectTrust signup, then send via HISP.") + print(" - Free MX + SMTP RCPT verify the INSTITUTIONAL file on a NON-sending IP") + print(" (scripts/workers/email_verifier.py), keep only deliverable.") + print(" - Import the verified institutional file into 
listmonk-hc; send via the") + print(" HEALTHCARE HOT stream (port 2526 -> hc IPs, own 10k/day cap).") + print(" - Feed the consumer file into the existing trucking-discipline stream.") + print(" - Park the Direct/HISP file until DirectTrust signup.") return 0 + if __name__ == "__main__": sys.exit(main()) diff --git a/scripts/healthcare_email_streams.py b/scripts/healthcare_email_streams.py new file mode 100644 index 0000000..342b7ab --- /dev/null +++ b/scripts/healthcare_email_streams.py @@ -0,0 +1,88 @@ +"""Healthcare email-stream segmentation. + +Splits NPPES-endpoint emails into the three outbound streams used by the +dual-stream MTA design (see docs/healthcare-email-stream-plan.md): + + institutional practice/clinic domains -> HEALTHCARE HOT stream (own IPs+cap) + consumer gmail/outlook/icloud... -> rides the TRUCKING consumer-discipline + stream (low cap), NOT the hot one + direct DirectTrust / HISP -> parked until DirectTrust signup + (will not cold-deliver via SMTP) + +Also drops a small set of non-prospect institutional giants (federal, big-box +pharmacy/retail) that are not our small-practice buyer and would only add +volume + complaint risk. + +This is the single source of truth for the classification; both the list +builder and any campaign-import tooling import from here so the streams can +never drift. +""" + +from __future__ import annotations + +import re + +EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") + +# DirectTrust / HISP secure-messaging gateways. These route only inside the +# DirectTrust network and will NOT accept normal cold email. Substring match on +# the domain. Verified against the May 2026 endpoint_pfile top domains (catches +# direct*.org, *.medicity.net (NextGen HISP), Surescripts, Updox, MaxMD, etc.). +DIRECT_MARKERS: tuple[str, ...] 
# DirectTrust / HISP secure-messaging gateway markers, matched as substrings
# of the (lower-cased) domain by is_direct_secure().
# NOTE(review): "direct" and "hisp" are very broad substrings -- they would
# also match ordinary practice domains that merely contain those letters
# (e.g. a "directprimarycare" or "hispanic..." domain). Confirm against the
# endpoint file that parking those addresses is acceptable.
DIRECT_MARKERS: tuple[str, ...] = (
    "direct", "hisp", "medicity.net", "surescripts", "updox", "maxmd",
    "secureexchange", "directaddress", "directplus", "ehrdirect",
    "mayoclinicmsg", "allscriptsdirect", "eclinicaldirect", "phicure",
    "directtrust", "secure-health", "directnppes",
)

# Consumer webmail: real inboxes a clinician reads, but reputation-sensitive
# (Gmail/Microsoft cold-mail heuristics). These ride the trucking-discipline
# (low-cap) stream, never the hot institutional one. Exact domain match.
CONSUMER_WEBMAIL: frozenset[str] = frozenset({
    "gmail.com", "yahoo.com", "hotmail.com", "outlook.com", "aol.com",
    "icloud.com", "comcast.net", "att.net", "sbcglobal.net", "me.com",
    "ymail.com", "live.com", "msn.com", "protonmail.com", "verizon.net",
    "mail.com", "gmx.com",
})

# Institutional domains that are NOT our small-practice buyer: federal/military
# and big-box retail/pharmacy. Exclude from the hot stream (low yield, high
# complaint/volume risk). Each marker is treated as a domain suffix aligned on
# a label boundary (a leading "." is optional), NOT as a raw substring, so
# ".mil" excludes "us.af.mil" but not "mail.milfordclinic.com".
INSTITUTIONAL_EXCLUDE_MARKERS: tuple[str, ...] = (
    "va.gov", "mail.mil", "health.mil", ".mil", ".gov",
    "cvshealth.com", "walgreens.com", "wal-mart.com", "walmart.com",
)

# Pre-computed forms of INSTITUTIONAL_EXCLUDE_MARKERS used by
# is_institutional_excluded(): the bare suffixes for exact-domain hits and the
# dot-prefixed variants for sub-domain hits.
_EXCLUDED_SUFFIXES: frozenset[str] = frozenset(
    m.lower().lstrip(".") for m in INSTITUTIONAL_EXCLUDE_MARKERS
)
_EXCLUDED_DOTTED_SUFFIXES: tuple[str, ...] = tuple(
    "." + s for s in sorted(_EXCLUDED_SUFFIXES)
)


def domain_of(email: str) -> str:
    """Return the lower-cased domain part of *email*, or "" if it has no "@".

    The domain is everything after the last "@", with surrounding whitespace
    stripped.
    """
    if "@" not in email:
        return ""
    return email.rsplit("@", 1)[-1].strip().lower()


def is_direct_secure(domain: str) -> bool:
    """Return True if *domain* contains any DIRECT_MARKERS substring.

    Matching is case-insensitive and purely substring-based.
    """
    d = domain.lower()
    return any(m in d for m in DIRECT_MARKERS)


def is_consumer(domain: str) -> bool:
    """Return True if *domain* is exactly one of the CONSUMER_WEBMAIL domains.

    Matching is case-insensitive; sub-domains do not match.
    """
    return domain.lower() in CONSUMER_WEBMAIL


def is_institutional_excluded(domain: str) -> bool:
    """Return True if *domain* is, or is a sub-domain of, an excluded marker.

    A marker matches only on a DNS label boundary: "va.gov" excludes "va.gov"
    and "med.va.gov"; ".mil" excludes every "*.mil" host. Plain substring
    matching is deliberately avoided because markers such as ".mil" / ".gov"
    also occur inside ordinary practice domains ("mail.milfordclinic.com"),
    which would otherwise be dropped from the institutional stream.
    """
    # Normalise: case-insensitive, ignore stray whitespace and a trailing
    # root dot ("va.gov." is the same host as "va.gov").
    d = domain.strip().lower().rstrip(".")
    if not d:
        return False
    return d in _EXCLUDED_SUFFIXES or d.endswith(_EXCLUDED_DOTTED_SUFFIXES)


def classify(email: str) -> str:
    """Return the stream label for *email*.

    The result is one of 'direct', 'consumer', 'institutional', 'excluded'
    or 'invalid'. Checks run in a fixed order and the first hit wins:

    1. 'invalid'  -- *email* is falsy or does not match the module-level
       EMAIL_RE pattern.
    2. 'direct'   -- the domain hits is_direct_secure().
    3. 'consumer' -- the domain hits is_consumer().
    4. 'excluded' -- the domain hits is_institutional_excluded().
    5. 'institutional' -- everything else.
    """
    if not EMAIL_RE.match(email or ""):
        return "invalid"
    dom = domain_of(email)
    if is_direct_secure(dom):
        return "direct"
    if is_consumer(dom):
        return "consumer"
    if is_institutional_excluded(dom):
        return "excluded"
    return "institutional"