feat(healthcare): split outreach list into 3 outbound streams

Add scripts/healthcare_email_streams.py as the single source of truth for
classifying NPPES-endpoint emails into institutional (HOT stream) / consumer
(trucking-discipline stream) / direct (DirectTrust, parked), plus an exclude set
for non-prospect giants (*.gov incl. va.gov, *.mil, cvshealth, walgreens, walmart).

Rework build_npi_outreach_lists.py to emit one CSV per stream
(npi_healthcare_institutional/consumer + npi_direct_secure), overdue-first
sorted, with companion files (revalidation/leie/optout) now optional.

Verified on May 2026 NPPES endpoint_pfile: 89,557 institutional / 19,366 consumer
/ 242,441 direct rows.
This commit is contained in:
justin 2026-06-05 18:59:44 -05:00
parent 54a342059b
commit 289c3b91be
2 changed files with 186 additions and 84 deletions

View file

@ -34,27 +34,17 @@ import re
import sys import sys
from collections import defaultdict from collections import defaultdict
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from healthcare_email_streams import ( # noqa: E402
# Domains that are DirectTrust/HISP secure-messaging gateways, not normal inboxes. classify as classify_stream,
# Heuristic: contains a 'direct' token, a known HISP marker, or *.direct-ci.com etc. EMAIL_RE as _STREAM_EMAIL_RE,
DIRECT_MARKERS = (
"direct.", ".direct", "direct-", "-direct", "directaddress", "hisp",
"direct-ci.com", "directplus", "ehrdirect", "mayoclinicmsg",
"allscriptsdirect", "eclinicaldirect", "surescripts",
) )
# Common real inboxes a clinician would actually read. EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
CONSUMER_WEBMAIL = {
"gmail.com", "yahoo.com", "hotmail.com", "outlook.com", "aol.com",
"icloud.com", "comcast.net", "att.net", "sbcglobal.net", "me.com",
"ymail.com", "live.com", "msn.com", "protonmail.com", "verizon.net",
}
# Stream classification (institutional / consumer / direct) lives in the shared
def is_direct_secure(domain: str) -> bool: # scripts/healthcare_email_streams.py module so the list builder and any
d = domain.lower() # campaign-import tooling can never drift. Imported above as classify_stream.
return any(m in d for m in DIRECT_MARKERS)
def csv_open(path: str): def csv_open(path: str):
@ -63,7 +53,15 @@ def csv_open(path: str):
def load_endpoint_emails(path: str): def load_endpoint_emails(path: str):
"""NPI -> list of (email, channel). channel in {cold, direct}.""" """NPI -> list of (email, stream). stream in
{institutional, consumer, direct}. 'excluded'/'invalid' are dropped.
Streams map to the dual-stream MTA design (see
docs/healthcare-email-stream-plan.md):
institutional -> HEALTHCARE HOT stream
consumer -> trucking-discipline (low-cap) stream
direct -> parked until DirectTrust
"""
npi_emails: dict[str, list[tuple[str, str]]] = defaultdict(list) npi_emails: dict[str, list[tuple[str, str]]] = defaultdict(list)
seen: set[tuple[str, str]] = set() seen: set[tuple[str, str]] = set()
stats = defaultdict(int) stats = defaultdict(int)
@ -75,26 +73,32 @@ def load_endpoint_emails(path: str):
continue continue
npi = row[0].strip().strip('"') npi = row[0].strip().strip('"')
ep = row[3].strip().strip('"') ep = row[3].strip().strip('"')
if not npi or not EMAIL_RE.match(ep): if not npi:
continue
stream = classify_stream(ep)
if stream in ("invalid", "excluded"):
stats[stream] += 1
continue continue
ep_l = ep.lower() ep_l = ep.lower()
domain = ep_l.split("@")[-1]
channel = "direct" if is_direct_secure(domain) else "cold"
key = (npi, ep_l) key = (npi, ep_l)
if key in seen: if key in seen:
continue continue
seen.add(key) seen.add(key)
npi_emails[npi].append((ep, channel)) npi_emails[npi].append((ep, stream))
stats[channel] += 1 stats[stream] += 1
if channel == "cold" and domain in CONSUMER_WEBMAIL:
stats["cold_consumer"] += 1
return npi_emails, stats return npi_emails, stats
def load_revalidation(path: str): def load_revalidation(path: str):
"""NPI -> (due_date_str, days_overdue|None, name, specialty, state).""" """NPI -> (due_date_str, days_overdue|None, name, specialty, state).
today = datetime.date.today()
The revalidation list is optional enrichment; if absent, returns {} so the
builder still produces the stream files from endpoint emails alone.
"""
out: dict[str, dict] = {} out: dict[str, dict] = {}
if not os.path.exists(path):
return out
today = datetime.date.today()
with csv_open(path) as f: with csv_open(path) as f:
r = csv.reader(f) r = csv.reader(f)
next(r, None) next(r, None)
@ -176,8 +180,10 @@ def main() -> int:
print("Loading endpoint emails ...") print("Loading endpoint emails ...")
npi_emails, estats = load_endpoint_emails(os.path.join(d, "endpoint.csv")) npi_emails, estats = load_endpoint_emails(os.path.join(d, "endpoint.csv"))
print(f" NPIs with email: {len(npi_emails):,} | cold: {estats['cold']:,} " print(f" NPIs with email: {len(npi_emails):,} | institutional: "
f"(consumer webmail: {estats['cold_consumer']:,}) | direct: {estats['direct']:,}") f"{estats['institutional']:,} | consumer: {estats['consumer']:,} | "
f"direct/HISP: {estats['direct']:,} "
f"(dropped excluded={estats['excluded']:,}, invalid={estats['invalid']:,})")
print("Loading revalidation due dates ...") print("Loading revalidation due dates ...")
reval = load_revalidation(os.path.join(d, "revalidation_due.csv")) reval = load_revalidation(os.path.join(d, "revalidation_due.csv"))
@ -189,69 +195,77 @@ def main() -> int:
optout = load_optout(os.path.join(d, "optout.csv")) optout = load_optout(os.path.join(d, "optout.csv"))
print(f" LEIE w/ NPI: {len(leie):,} | opt-out ending <12mo: {len(optout):,}") print(f" LEIE w/ NPI: {len(leie):,} | opt-out ending <12mo: {len(optout):,}")
# Build the joined outreach rows.
cold_path = os.path.join(out_dir, "npi_overdue_cold_emailable.csv")
direct_path = os.path.join(out_dir, "npi_overdue_direct_secure.csv")
counts = defaultdict(int) counts = defaultdict(int)
HEADER = ["npi", "email", "stream", "name", "specialty", "state",
"reval_due_date", "days_overdue", "reval_status",
"leie_excluded", "optout_ending"]
def write_segment(path, channel): def row_for(npi, email, stream):
n = 0 info = reval.get(npi, {})
with open(path, "w", newline="") as f: if info:
w = csv.writer(f) status = "overdue" if info.get("overdue") else "upcoming"
w.writerow(["npi", "email", "channel", "name", "specialty", "state", else:
"due_date", "days_overdue", "leie_excluded", "optout_ending"]) status = "no_reval_flag"
for npi, info in overdue.items(): return [
emails = [e for e in npi_emails.get(npi, []) if e[1] == channel] npi, email, stream, info.get("name", ""), info.get("specialty", ""),
for email, ch in emails: info.get("state", ""), info.get("due_date", ""),
w.writerow([ info.get("days_overdue", ""), status,
npi, email, ch, info["name"], info["specialty"], info["state"], "Y" if npi in leie else "", optout.get(npi, ""),
info["due_date"], info["days_overdue"], ]
"Y" if npi in leie else "",
optout.get(npi, ""),
])
n += 1
return n
counts["cold"] = write_segment(cold_path, "cold") # One file per outbound stream (the dual-stream MTA routing key).
counts["direct"] = write_segment(direct_path, "direct") # institutional -> HEALTHCARE HOT stream (own IPs + 10k/day cap)
# consumer -> trucking-discipline (low-cap) stream
# direct -> parked until DirectTrust
# Within each, rows are sorted overdue-first (highest intent = best send order).
stream_files = {
"institutional": os.path.join(out_dir, "npi_healthcare_institutional.csv"),
"consumer": os.path.join(out_dir, "npi_healthcare_consumer.csv"),
"direct": os.path.join(out_dir, "npi_direct_secure.csv"),
}
writers = {}
handles = {}
for stream, path in stream_files.items():
h = open(path, "w", newline="")
handles[stream] = h
w = csv.writer(h)
w.writerow(HEADER)
writers[stream] = w
# Broad segment: EVERY cold-emailable NPI (not just overdue), enriched with # Emit overdue NPIs first (sorted by days_overdue desc) for each stream, then
# whatever revalidation/exclusion/opt-out signal we have. This is the real # the remainder. Best-intent recipients land at the top of every file.
# starting volume for the general compliance-bundle campaign. def sort_key(npi):
allcold_path = os.path.join(out_dir, "npi_all_cold_emailable.csv") info = reval.get(npi, {})
with open(allcold_path, "w", newline="") as f: return -(info.get("days_overdue", -10**9) if info else -10**9)
w = csv.writer(f)
w.writerow(["npi", "email", "name", "specialty", "state", for npi in sorted(npi_emails.keys(), key=sort_key):
"reval_due_date", "days_overdue", "reval_status", for email, stream in npi_emails[npi]:
"leie_excluded", "optout_ending"]) w = writers.get(stream)
for npi, emails in npi_emails.items(): if w is None:
cold = [e for e, ch in emails if ch == "cold"]
if not cold:
continue continue
info = reval.get(npi, {}) w.writerow(row_for(npi, email, stream))
if info: counts[stream] += 1
status = "overdue" if info.get("overdue") else "upcoming"
else:
status = "no_reval_flag"
for email in cold:
w.writerow([
npi, email, info.get("name", ""), info.get("specialty", ""),
info.get("state", ""), info.get("due_date", ""),
info.get("days_overdue", ""), status,
"Y" if npi in leie else "", optout.get(npi, ""),
])
counts["all_cold"] += 1
print("\n=== OUTPUT ===") for h in handles.values():
print(f"ALL cold-emailable NPIs (broad bundle list): {counts['all_cold']:,} rows -> {allcold_path}") h.close()
print(f"Cold-emailable overdue-revalidation list: {counts['cold']:,} rows -> {cold_path}")
print(f"Direct-secure (DirectTrust later) list: {counts['direct']:,} rows -> {direct_path}") print("\n=== OUTPUT (one file per outbound stream) ===")
print(f"HEALTHCARE HOT (institutional practice domains): {counts['institutional']:,} rows "
f"-> {stream_files['institutional']}")
print(f"Consumer webmail (rides trucking trickle): {counts['consumer']:,} rows "
f"-> {stream_files['consumer']}")
print(f"Direct/HISP (parked until DirectTrust): {counts['direct']:,} rows "
f"-> {stream_files['direct']}")
print("\nNext steps:") print("\nNext steps:")
print(" - MX/SMTP-verify the cold list (port 25 + MX confirmed available).") print(" - Free MX + SMTP RCPT verify the INSTITUTIONAL file on a NON-sending IP")
print(" - Send revalidation campaign to verified cold emails now.") print(" (scripts/workers/email_verifier.py), keep only deliverable.")
print(" - Park the direct-secure list until DirectTrust signup, then send via HISP.") print(" - Import the verified institutional file into listmonk-hc; send via the")
print(" HEALTHCARE HOT stream (port 2526 -> hc IPs, own 10k/day cap).")
print(" - Feed the consumer file into the existing trucking-discipline stream.")
print(" - Park the Direct/HISP file until DirectTrust signup.")
return 0 return 0
if __name__ == "__main__": if __name__ == "__main__":
sys.exit(main()) sys.exit(main())

View file

@ -0,0 +1,88 @@
"""Healthcare email-stream segmentation.
Splits NPPES-endpoint emails into the three outbound streams used by the
dual-stream MTA design (see docs/healthcare-email-stream-plan.md):
institutional practice/clinic domains -> HEALTHCARE HOT stream (own IPs+cap)
consumer gmail/outlook/icloud... -> rides the TRUCKING consumer-discipline
stream (low cap), NOT the hot one
direct DirectTrust / HISP -> parked until DirectTrust signup
(will not cold-deliver via SMTP)
Also drops a small set of non-prospect institutional giants (federal, big-box
pharmacy/retail) that are not our small-practice buyer and would only add
volume + complaint risk.
This is the single source of truth for the classification; both the list
builder and any campaign-import tooling import from here so the streams can
never drift.
"""
from __future__ import annotations
import re
# Loose shape check only: exactly one "@", no whitespace anywhere, and at
# least one "." in the domain part. It says nothing about deliverability.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# DirectTrust / HISP secure-messaging gateways. These route only inside the
# DirectTrust network and will NOT accept normal cold email. Substring match on
# the domain. Verified against the May 2026 endpoint_pfile top domains (catches
# direct*.org, *.medicity.net (NextGen HISP), Surescripts, Updox, MaxMD, etc.).
# Because is_direct_secure() tests plain substrings, the bare "direct" entry
# already covers every longer entry that contains it ("directaddress",
# "ehrdirect", "directtrust", ...); those are redundant for matching.
# NOTE(review): short markers such as "hisp" also match any domain that merely
# contains those letters (e.g. a "hispanic..." practice domain) -- confirm
# that parking such domains is acceptable.
DIRECT_MARKERS: tuple[str, ...] = (
    "direct", "hisp", "medicity.net", "surescripts", "updox", "maxmd",
    "secureexchange", "directaddress", "directplus", "ehrdirect",
    "mayoclinicmsg", "allscriptsdirect", "eclinicaldirect", "phicure",
    "directtrust", "secure-health", "directnppes",
)
# Consumer webmail: real inboxes a clinician reads, but reputation-sensitive
# (Gmail/Microsoft cold-mail heuristics). These ride the trucking-discipline
# (low-cap) stream, never the hot institutional one.
# is_consumer() does an exact membership test on the lower-cased domain, so
# subdomains of these providers are NOT treated as consumer.
CONSUMER_WEBMAIL: frozenset[str] = frozenset({
    "gmail.com", "yahoo.com", "hotmail.com", "outlook.com", "aol.com",
    "icloud.com", "comcast.net", "att.net", "sbcglobal.net", "me.com",
    "ymail.com", "live.com", "msn.com", "protonmail.com", "verizon.net",
    "mail.com", "gmx.com",
})
# Institutional domains that are NOT our small-practice buyer: federal/military
# and big-box retail/pharmacy. Exclude from the hot stream (low yield, high
# complaint/volume risk). Matched against the domain by
# is_institutional_excluded(). The blanket ".mil" and ".gov" entries already
# cover "mail.mil", "health.mil" and "va.gov"; the specific entries are
# redundant for matching.
INSTITUTIONAL_EXCLUDE_MARKERS: tuple[str, ...] = (
    "va.gov", "mail.mil", "health.mil", ".mil", ".gov",
    "cvshealth.com", "walgreens.com", "wal-mart.com", "walmart.com",
)
def domain_of(email: str) -> str:
    """Return the lower-cased domain part of *email*, or "" when there is no "@".

    The address is split on its last "@"; whitespace around the domain is
    trimmed before it is lower-cased.
    """
    _local, at_sign, domain = email.rpartition("@")
    if not at_sign:
        # rpartition yields an empty separator when "@" never occurs.
        return ""
    return domain.strip().lower()
def is_direct_secure(domain: str) -> bool:
    """Return True when *domain* contains any DIRECT_MARKERS entry as a substring.

    The domain is lower-cased before the comparison; the markers are used
    exactly as listed.
    """
    lowered = domain.lower()
    for marker in DIRECT_MARKERS:
        if marker in lowered:
            return True
    return False
def is_consumer(domain: str) -> bool:
    """Return True when the lower-cased *domain* is exactly a CONSUMER_WEBMAIL entry."""
    normalized = domain.lower()
    return normalized in CONSUMER_WEBMAIL
def is_institutional_excluded(
    domain: str,
    *,
    markers: tuple[str, ...] | None = None,
) -> bool:
    """Return True when *domain* belongs to an excluded (non-prospect) institution.

    A marker only counts when it lines up with whole DNS labels. ".mil"
    therefore excludes "health.mil" but not "dr.miller.com", and "walmart.com"
    excludes "mail.walmart.com" but not "notwalmart.com". A raw substring test
    would silently drop such small-practice domains from the outreach lists.

    Args:
        domain: Bare domain (no local part); compared case-insensitively.
        markers: Markers to test against. Defaults to
            INSTITUTIONAL_EXCLUDE_MARKERS when omitted.

    Returns:
        True if any marker matches on label boundaries, otherwise False.
    """
    if markers is None:
        markers = INSTITUTIONAL_EXCLUDE_MARKERS
    # Pad both sides with "." so the first and last labels are delimited the
    # same way as interior ones; a marker then matches only as a whole-label run.
    padded = f".{domain.lower().strip('.')}."
    for marker in markers:
        core = marker.lower().strip(".")
        if core and f".{core}." in padded:
            return True
    return False
def classify(email: str) -> str:
    """Return one of: 'direct', 'consumer', 'institutional', 'excluded', 'invalid'.

    Checks run in a fixed order and the first hit wins:
      1. 'invalid'       -- empty/None or not shaped like an address (EMAIL_RE)
      2. 'direct'        -- domain matches a DirectTrust/HISP marker
      3. 'consumer'      -- domain is a known consumer webmail provider
      4. 'excluded'      -- domain matches a non-prospect institution marker
      5. 'institutional' -- everything else
    """
    # fullmatch, not match: the pattern's trailing "$" also matches just before
    # a final newline, so match() would accept an address ending in a newline
    # even though the pattern is meant to reject all whitespace.
    if not EMAIL_RE.fullmatch(email or ""):
        return "invalid"
    dom = domain_of(email)
    if is_direct_secure(dom):
        return "direct"
    if is_consumer(dom):
        return "consumer"
    if is_institutional_excluded(dom):
        return "excluded"
    return "institutional"