Add CMS revalidation-overdue mailable-pool builder (red-result urgent leads)
The institutional NPPES pool is ~98% not_on_list org NPIs, which land on a
soft/green result in the public NPI tool ('you're basically fine') -- so they
click and leave with no buy trigger. The providers who get a RED 'revalidation
PAST DUE, CMS may deactivate' result (a real $599 reason to act) are the CMS
revalidation-overdue NPIs.
This joins the CMS Revalidation Due Date List to NPPES endpoint emails and
applies the warmup deliverability gates in the correct order, critically
re-filtering Google by REAL MX *after* SMTP verification populates mx_provider
(a one-off manual run on 2026-06-26 filtered on the pre-verify empty mx_provider
column and leaked 214 sends to Gmail, earning a low-reputation bounce).
This commit is contained in:
parent
b350a1367d
commit
48fb9f7fbb
1 changed files with 216 additions and 0 deletions
216
scripts/build_cms_overdue_pool.py
Normal file
216
scripts/build_cms_overdue_pool.py
Normal file
|
|
@ -0,0 +1,216 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Build a cold-mailable pool of CMS Medicare *revalidation-overdue* providers.
|
||||
|
||||
WHY: The institutional NPPES pool is ~98% `not_on_list` org NPIs. Those land on
|
||||
a SOFT/green result in the public NPI compliance tool ("you're basically fine"),
|
||||
so they click and leave -- no buy trigger. The providers who get a RED "your
|
||||
Medicare revalidation is PAST DUE -- CMS may deactivate you" result (a real $599
|
||||
reason to act) are the CMS revalidation-overdue NPIs. This script harvests their
|
||||
mailable inboxes by joining:
|
||||
|
||||
CMS Revalidation Due Date List (revalidation_base.csv) -- NPI + due date
|
||||
NPPES endpoint_pfile -- NPI -> email
|
||||
|
||||
and applies the SAME deliverability gates the warmup uses, IN THE RIGHT ORDER:
|
||||
|
||||
1. overdue only (Adjusted/Revalidation Due Date in the past)
|
||||
2. drop Direct/HISP secure-messaging gateways (not cold-mailable)
|
||||
3. drop Gmail/Google-hosted by ADDRESS heuristic (cheap pre-filter)
|
||||
4. SMTP-verify (writes <out>_verified.csv) -- this populates mx_provider
|
||||
5. drop Google-hosted by REAL MX <-- the step the one-off 2026-06-26 run
|
||||
missed (mx_provider was empty pre-verify), which leaked 214 sends to
|
||||
Gmail and earned a "low reputation" bounce. Do NOT skip this.
|
||||
|
||||
Output columns match the institutional CSV schema so the warmup cron can consume
|
||||
the result directly as the `revalidation_overdue` segment's source.
|
||||
|
||||
Usage:
    python3 scripts/build_cms_overdue_pool.py \
        --reval data/npi_build/revalidation_base.csv \
        --endpoint data/npi_build/endpoint_pfile_*.csv \
        --out data/hc_cms_overdue \
        [--max-overdue-days N]   # optional cap; default no cap
        [--skip-verify]          # write <out>_candidates.csv only; no SMTP verify

    NOTE: --endpoint accepts exactly one path, so the glob above must expand
    to a single file.
|
||||
"""
|
||||
import argparse
|
||||
import csv
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import date, datetime
|
||||
|
||||
# Prepend the script directories to the import path. The repo-relative
# "scripts" entry ends up ahead of the absolute /opt deployment path.
sys.path[:0] = ["scripts", "/opt/performancewest/scripts"]

# Direct Secure Messaging (HISP) markers + known EHR gateway hosts -- NOT
# cold-mailable from a normal MTA. Both tuples are matched as substrings of
# the address's domain part.
_HISP_MARKERS = (
    "direct",
    "hisp",
    "secure",
    "directtrust",
)
_GATEWAY_PAT = (
    ".hin.us",
    ".medallies.net",
    ".aprima.com",
    ".updox.com",
    ".surescripts",
    ".nextgen",
    ".e-mds",
    ".athenahealth",
    ".epic",
    ".cerner",
    ".allscripts",
    ".greenway",
    ".kno2",
    ".maxmd",
    ".zixcorp",
    ".globalmed",
    ".mdtoolbox",
    ".intellechart",
    ".carequality",
)

# Consumer Google domains, matched against the whole domain part.
_GOOGLE_HINTS = (
    "gmail.com",
    "googlemail.com",
)
|
||||
|
||||
|
||||
def is_hisp(email: str) -> bool:
    """Return True when the address's domain looks like a Direct/HISP gateway.

    The domain (everything after the last "@", lower-cased) is checked for any
    substring listed in ``_HISP_MARKERS`` or ``_GATEWAY_PAT``.
    """
    domain = email.rpartition("@")[2].lower()
    for needle in (*_HISP_MARKERS, *_GATEWAY_PAT):
        if needle in domain:
            return True
    return False
|
||||
|
||||
|
||||
def looks_google(email: str) -> bool:
    """Return True when the domain is exactly one of ``_GOOGLE_HINTS``.

    This is only the cheap address-based pre-filter: it compares the
    lower-cased text after the last "@" and performs no MX lookup.
    """
    _, _, host = email.rpartition("@")
    return host.lower() in _GOOGLE_HINTS
|
||||
|
||||
|
||||
def parse_due(s: str):
    """Parse a due-date cell into a ``datetime.date``.

    Accepts ``MM/DD/YYYY``, ``YYYY-MM-DD`` and ``MM/DD/YY`` (tried in that
    order) after stripping surrounding whitespace.

    Returns:
        The parsed date, or ``None`` when the value is empty/``None``, is
        "TBD" (any case), or matches none of the accepted formats.
    """
    text = s.strip() if s else ""
    if text == "" or text.upper() == "TBD":
        return None
    for pattern in ("%m/%d/%Y", "%Y-%m-%d", "%m/%d/%y"):
        try:
            parsed = datetime.strptime(text, pattern)
        except ValueError:
            # Not this layout; try the next one.
            continue
        return parsed.date()
    return None
|
||||
|
||||
|
||||
def load_overdue(reval_path: str, max_days: int | None):
    """Collect the overdue NPIs from the revalidation CSV.

    Args:
        reval_path: Path to the revalidation CSV (read as latin-1). Rows are
            looked up by the header names used below.
        max_days: If not ``None``, rows more than this many days overdue are
            skipped.

    Returns:
        dict mapping NPI -> ``{"days", "due", "first", "last", "org", "spec"}``
        where ``days`` is the whole number of days past due as of today and
        ``due`` is the ISO-formatted due date. If an NPI appears on several
        qualifying rows, the last such row in the file wins.
    """
    today = date.today()
    out = {}
    # newline="" is what the csv module expects so quoted fields containing
    # line breaks are parsed correctly.
    with open(reval_path, encoding="latin-1", newline="") as f:
        for r in csv.DictReader(f):
            npi = (r.get("National Provider Identifier") or "").strip()
            if not npi:
                continue
            # Strip before falling back: a whitespace-only adjusted cell is
            # truthy and would otherwise hide a valid Revalidation Due Date.
            # A non-blank adjusted value (including "TBD") still takes
            # precedence over the original due date.
            adjusted = (r.get("Adjusted Due Date") or "").strip()
            dt = parse_due(adjusted or r.get("Revalidation Due Date"))
            if not dt:
                continue
            days = (today - dt).days
            if days <= 0:
                # Due today or in the future: not overdue.
                continue
            if max_days is not None and days > max_days:
                continue
            out[npi] = {
                "days": days,
                "due": dt.isoformat(),
                "first": (r.get("First Name") or "").strip().title(),
                "last": (r.get("Last Name") or "").strip().title(),
                "org": (r.get("Organization Name") or "").strip(),
                "spec": (
                    r.get("Enrollment Specialty")
                    or r.get("Provider Type Text")
                    or ""
                ).strip(),
            }
    return out
|
||||
|
||||
|
||||
def load_mailed() -> set:
    """Emails already known to listmonk-hc (so we never re-mail).

    Runs ``psql`` inside the postgres container via ``docker exec`` and reads
    one lower-cased address per output line.

    Returns:
        Set of lower-cased email addresses. This is best-effort: if the
        command cannot be run, times out, or exits non-zero, a warning is
        printed to stderr and whatever was collected (possibly nothing) is
        returned, so the caller proceeds without full dedup.
    """
    mailed = set()
    try:
        proc = subprocess.run(
            ["docker", "exec", "performancewest-api-postgres-1", "psql", "-U", "pw",
             "-d", "listmonk_hc", "-tAc", "select lower(email) from subscribers"],
            capture_output=True, text=True, timeout=60)
    except (OSError, subprocess.SubprocessError) as e:
        # docker missing, not executable, or the query timed out.
        print(f"  WARN: could not load mailed set ({e}); proceeding without dedup",
              file=sys.stderr)
        return mailed

    if proc.returncode != 0:
        # subprocess.run() does not raise on a non-zero exit without check=True,
        # so a failed docker/psql call would otherwise look like "nobody has
        # been mailed yet" with no warning at all.
        detail = (proc.stderr or "").strip() or f"exit status {proc.returncode}"
        print(f"  WARN: could not load mailed set ({detail}); proceeding without dedup",
              file=sys.stderr)

    for line in (proc.stdout or "").splitlines():
        addr = line.strip()
        if addr:
            mailed.add(addr)
    return mailed
|
||||
|
||||
|
||||
# Column order of <out>_candidates.csv. Kept as a constant so the header is
# written correctly even when no candidate rows were found.
_CANDIDATE_COLUMNS = (
    "npi", "email", "stream",
    "verify_ok", "verify_reason", "mx_provider",
    "reval_due_date", "days_overdue",
    "reval_status", "nppes_last_updated",
    "nppes_enumeration", "nppes_years_stale", "nppes_deactivated",
    "name_for_greeting", "practice", "specialty",
)


def _scan_endpoints(endpoint_path, overdue, mailed):
    """Scan the endpoint CSV for mailable addresses of overdue NPIs.

    Reads column 0 as the NPI and column 3 as the endpoint value; the first
    line is skipped as a header. Addresses are lower-cased and any "mailto:"
    text is removed.

    Returns:
        ``(rows, dropped)`` where ``rows`` is a list of candidate dicts keyed
        by ``_CANDIDATE_COLUMNS`` and ``dropped`` counts rejections by reason.
    """
    seen = set()
    rows = []
    dropped = {"hisp": 0, "google": 0, "mailed": 0, "dup": 0}
    with open(endpoint_path, encoding="latin-1", newline="") as f:
        rd = csv.reader(f)
        next(rd, None)  # skip the header line
        for row in rd:
            if len(row) < 4:
                continue
            npi = row[0].strip()
            ep = row[3].strip().lower().replace("mailto:", "")
            if npi not in overdue or "@" not in ep:
                continue
            if ep in seen:
                dropped["dup"] += 1
                continue
            if is_hisp(ep):
                dropped["hisp"] += 1
                continue
            if looks_google(ep):
                dropped["google"] += 1
                continue
            if ep in mailed:
                dropped["mailed"] += 1
                continue
            seen.add(ep)
            info = overdue[npi]
            org = info["org"]
            practice = org or (f"Dr. {info['last']}" if info["last"] else "your practice")
            greet = info["first"] or org or "there"
            rows.append({
                "npi": npi, "email": ep, "stream": "institutional",
                "verify_ok": "", "verify_reason": "", "mx_provider": "",
                "reval_due_date": info["due"], "days_overdue": info["days"],
                "reval_status": "overdue", "nppes_last_updated": "",
                "nppes_enumeration": "", "nppes_years_stale": "", "nppes_deactivated": "",
                "name_for_greeting": greet, "practice": practice, "specialty": info["spec"],
            })
    return rows, dropped


def _drop_google_mx(ver_path, send_path):
    """Copy verified rows to the send file, dropping Google-hosted ones.

    A row is dropped when its ``mx_provider`` value contains "google"
    (case-insensitive). Rows with a blank ``mx_provider`` are kept but counted.

    Returns:
        ``(kept, dropped_google, unknown_mx)`` row counts.

    Raises:
        SystemExit: if the verified CSV has no header or no ``mx_provider``
            column. Without that column every row would pass the filter, so
            the send file is not written at all.
    """
    kept = 0
    drop_google_mx = 0
    unknown_mx = 0
    with open(ver_path, newline="") as fin:
        rd = csv.DictReader(fin)
        fields = rd.fieldnames
        if not fields or "mx_provider" not in fields:
            raise SystemExit(
                f"ERROR: {ver_path} has no mx_provider column; refusing to "
                "build a send pool without the Google-MX filter")
        with open(send_path, "w", newline="") as fout:
            w = csv.DictWriter(fout, fieldnames=fields)
            w.writeheader()
            for r in rd:
                mx = (r.get("mx_provider") or "").strip().lower()
                if "google" in mx:
                    drop_google_mx += 1
                    continue
                if not mx:
                    unknown_mx += 1
                w.writerow(r)
                kept += 1
    return kept, drop_google_mx, unknown_mx


def main():
    """CLI entry point: build candidates, SMTP-verify, then filter by real MX.

    Writes ``<out>_candidates.csv``; unless ``--skip-verify`` is given (or no
    candidates were found) it then runs ``scripts/verify_csv_emails.py`` and
    writes ``<out>_send.csv`` from ``<out>_verified.csv``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--reval", default="data/npi_build/revalidation_base.csv")
    ap.add_argument("--endpoint", required=True)
    ap.add_argument("--out", default="data/hc_cms_overdue")
    ap.add_argument("--max-overdue-days", type=int, default=None)
    ap.add_argument("--skip-verify", action="store_true",
                    help="write candidates only; do not SMTP-verify")
    args = ap.parse_args()

    print("loading CMS overdue NPIs...")
    overdue = load_overdue(args.reval, args.max_overdue_days)
    print(f"  overdue NPIs: {len(overdue)}")

    mailed = load_mailed()
    print(f"  already-mailed emails: {len(mailed)}")

    print("scanning NPPES endpoints for mailable overdue inboxes...")
    rows, dropped = _scan_endpoints(args.endpoint, overdue, mailed)
    print(f"  candidates: {len(rows)} (dropped: {dropped})")

    cand_path = args.out + "_candidates.csv"
    with open(cand_path, "w", newline="") as f:
        w = csv.DictWriter(f, fieldnames=_CANDIDATE_COLUMNS)
        w.writeheader()
        w.writerows(rows)
    print(f"  wrote {cand_path}")

    if args.skip_verify:
        print("  --skip-verify set; stop here")
        return

    send_path = args.out + "_send.csv"
    if not rows:
        # Nothing to verify; do not launch the verifier on a header-only file.
        print(f"  no candidates; skipping verification, {send_path} not written",
              file=sys.stderr)
        return

    print("SMTP-verifying (this populates mx_provider for the Google MX filter)...")
    subprocess.run(
        ["python3", "scripts/verify_csv_emails.py", "--in", cand_path,
         "--out", args.out, "--workers", "15"], check=True)

    # FINAL Google-by-real-MX filter -- the step the one-off run missed.
    ver_path = args.out + "_verified.csv"
    kept, drop_google_mx, unknown_mx = _drop_google_mx(ver_path, send_path)
    if unknown_mx:
        # NOTE(review): blank mx_provider rows are kept, as before; confirm
        # whether the verifier leaves it blank for providers it cannot classify.
        print(f"  WARN: {unknown_mx} kept rows have an empty mx_provider "
              "(real MX unknown)", file=sys.stderr)
    print(f"  final send pool (non-google by real MX): {kept} "
          f"(dropped {drop_google_mx} google-MX) -> {send_path}")


if __name__ == "__main__":
    main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue