diff --git a/scripts/build_cms_overdue_pool.py b/scripts/build_cms_overdue_pool.py new file mode 100644 index 0000000..8d6b7aa --- /dev/null +++ b/scripts/build_cms_overdue_pool.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 +"""Build a cold-mailable pool of CMS Medicare *revalidation-overdue* providers. + +WHY: The institutional NPPES pool is ~98% `not_on_list` org NPIs. Those land on +a SOFT/green result in the public NPI compliance tool ("you're basically fine"), +so they click and leave -- no buy trigger. The providers who get a RED "your +Medicare revalidation is PAST DUE -- CMS may deactivate you" result (a real $599 +reason to act) are the CMS revalidation-overdue NPIs. This script harvests their +mailable inboxes by joining: + + CMS Revalidation Due Date List (revalidation_base.csv) -- NPI + due date + NPPES endpoint_pfile -- NPI -> email + +and applies the SAME deliverability gates the warmup uses, IN THE RIGHT ORDER: + + 1. overdue only (Adjusted/Revalidation Due Date in the past) + 2. drop Direct/HISP secure-messaging gateways (not cold-mailable) + 3. drop Gmail/Google-hosted by ADDRESS heuristic (cheap pre-filter) + 4. SMTP-verify (writes _verified.csv) -- this populates mx_provider + 5. drop Google-hosted by REAL MX <-- the step the one-off 2026-06-26 run + missed (mx_provider was empty pre-verify), which leaked 214 sends to + Gmail and earned a "low reputation" bounce. Do NOT skip this. + +Output columns match the institutional CSV schema so the warmup cron can consume +the result directly as the `revalidation_overdue` segment's source. + +Usage: + python3 scripts/build_cms_overdue_pool.py \ + --reval data/npi_build/revalidation_base.csv \ + --endpoint data/npi_build/endpoint_pfile_*.csv \ + --out data/hc_cms_overdue \ + [--max-overdue-days N] # optional cap; default no cap +""" +import argparse +import csv +import subprocess +import sys +from datetime import date, datetime + +sys.path.insert(0, "/opt/performancewest/scripts") +sys.path.insert(0, "scripts") + +# Direct Secure Messaging (HISP) markers + known EHR gateway hosts -- NOT +# cold-mailable from a normal MTA. +_HISP_MARKERS = ("direct", "hisp", "secure", "directtrust") +_GATEWAY_PAT = ( + ".hin.us", ".medallies.net", ".aprima.com", ".updox.com", ".surescripts", + ".nextgen", ".e-mds", ".athenahealth", ".epic", ".cerner", ".allscripts", + ".greenway", ".kno2", ".maxmd", ".zixcorp", ".globalmed", ".mdtoolbox", + ".intellechart", ".carequality", +) +_GOOGLE_HINTS = ("gmail.com", "googlemail.com") + + +def is_hisp(email: str) -> bool: + d = email.split("@")[-1].lower() + return any(m in d for m in _HISP_MARKERS) or any(p in d for p in _GATEWAY_PAT) + + +def looks_google(email: str) -> bool: + return email.split("@")[-1].lower() in _GOOGLE_HINTS + + +def parse_due(s: str): + s = (s or "").strip() + if not s or s.upper() == "TBD": + return None + for fmt in ("%m/%d/%Y", "%Y-%m-%d", "%m/%d/%y"): + try: + return datetime.strptime(s, fmt).date() + except ValueError: + pass + return None + + +def load_overdue(reval_path: str, max_days: int | None): + """NPI -> {days, due, first, last, org, spec}.""" + today = date.today() + out = {} + with open(reval_path, encoding="latin-1") as f: + for r in csv.DictReader(f): + npi = (r.get("National Provider Identifier") or "").strip() + if not npi: + continue + dt = parse_due(r.get("Adjusted Due Date") or r.get("Revalidation Due Date")) + if not dt: + continue + days = (today - dt).days + if days <= 0: + continue + if max_days is not None and days > max_days: + continue + out[npi] = { + "days": days, "due": dt.isoformat(), + "first": (r.get("First Name") or "").strip().title(), + "last": (r.get("Last Name") or "").strip().title(), + "org": (r.get("Organization Name") or "").strip(), + "spec": (r.get("Enrollment Specialty") or r.get("Provider Type Text") or "").strip(), + } + return out + + +def load_mailed() -> set: + """Emails already known to listmonk-hc (so we never re-mail).""" + mailed = set() + try: + out = subprocess.run( + ["docker", "exec", "performancewest-api-postgres-1", "psql", "-U", "pw", + "-d", "listmonk_hc", "-tAc", "select lower(email) from subscribers"], + capture_output=True, text=True, timeout=60).stdout + for line in out.splitlines(): + e = line.strip() + if e: + mailed.add(e) + except Exception as e: # noqa: BLE001 + print(f" WARN: could not load mailed set ({e}); proceeding without dedup", file=sys.stderr) + return mailed + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--reval", default="data/npi_build/revalidation_base.csv") + ap.add_argument("--endpoint", required=True) + ap.add_argument("--out", default="data/hc_cms_overdue") + ap.add_argument("--max-overdue-days", type=int, default=None) + ap.add_argument("--skip-verify", action="store_true", + help="write candidates only; do not SMTP-verify") + args = ap.parse_args() + + print("loading CMS overdue NPIs...") + overdue = load_overdue(args.reval, args.max_overdue_days) + print(f" overdue NPIs: {len(overdue)}") + + mailed = load_mailed() + print(f" already-mailed emails: {len(mailed)}") + + print("scanning NPPES endpoints for mailable overdue inboxes...") + seen = set() + rows = [] + dropped = {"hisp": 0, "google": 0, "mailed": 0, "dup": 0} + with open(args.endpoint, encoding="latin-1") as f: + rd = csv.reader(f) + next(rd, None) + for row in rd: + if len(row) < 4: + continue + npi = row[0].strip() + ep = row[3].strip().lower().replace("mailto:", "") + if npi not in overdue or "@" not in ep: + continue + if ep in seen: + dropped["dup"] += 1 + continue + if is_hisp(ep): + dropped["hisp"] += 1 + continue + if looks_google(ep): + dropped["google"] += 1 + continue + if ep in mailed: + dropped["mailed"] += 1 + continue + seen.add(ep) + info = overdue[npi] + org = info["org"] + practice = org or (f"Dr. {info['last']}" if info["last"] else "your practice") + greet = info["first"] or org or "there" + rows.append({ + "npi": npi, "email": ep, "stream": "institutional", + "verify_ok": "", "verify_reason": "", "mx_provider": "", + "reval_due_date": info["due"], "days_overdue": info["days"], + "reval_status": "overdue", "nppes_last_updated": "", + "nppes_enumeration": "", "nppes_years_stale": "", "nppes_deactivated": "", + "name_for_greeting": greet, "practice": practice, "specialty": info["spec"], + }) + print(f" candidates: {len(rows)} (dropped: {dropped})") + + cand_path = args.out + "_candidates.csv" + cols = list(rows[0].keys()) if rows else [] + with open(cand_path, "w", newline="") as f: + w = csv.DictWriter(f, fieldnames=cols) + w.writeheader() + w.writerows(rows) + print(f" wrote {cand_path}") + + if args.skip_verify: + print(" --skip-verify set; stop here") + return + + print("SMTP-verifying (this populates mx_provider for the Google MX filter)...") + subprocess.run( + ["python3", "scripts/verify_csv_emails.py", "--in", cand_path, + "--out", args.out, "--workers", "15"], check=True) + + # FINAL Google-by-real-MX filter -- the step the one-off run missed. + ver_path = args.out + "_verified.csv" + send_path = args.out + "_send.csv" + kept = 0 + drop_google_mx = 0 + with open(ver_path) as fin, open(send_path, "w", newline="") as fout: + rd = csv.DictReader(fin) + w = csv.DictWriter(fout, fieldnames=rd.fieldnames) + w.writeheader() + for r in rd: + mx = (r.get("mx_provider") or "").strip().lower() + if "google" in mx: + drop_google_mx += 1 + continue + w.writerow(r) + kept += 1 + print(f" final send pool (non-google by real MX): {kept} " + f"(dropped {drop_google_mx} google-MX) -> {send_path}") + + +if __name__ == "__main__": + main()