#!/usr/bin/env python3 """Build a cold-mailable pool of CMS Medicare *revalidation-overdue* providers. WHY: The institutional NPPES pool is ~98% `not_on_list` org NPIs. Those land on a SOFT/green result in the public NPI compliance tool ("you're basically fine"), so they click and leave -- no buy trigger. The providers who get a RED "your Medicare revalidation is PAST DUE -- CMS may deactivate you" result (a real $599 reason to act) are the CMS revalidation-overdue NPIs. This script harvests their mailable inboxes by joining: CMS Revalidation Due Date List (revalidation_base.csv) -- NPI + due date NPPES endpoint_pfile -- NPI -> email and applies the SAME deliverability gates the warmup uses, IN THE RIGHT ORDER: 1. overdue only (Adjusted/Revalidation Due Date in the past) 2. drop Direct/HISP secure-messaging gateways (not cold-mailable) 3. drop Gmail/Google-hosted by ADDRESS heuristic (cheap pre-filter) 4. SMTP-verify (writes _verified.csv) -- this populates mx_provider 5. drop Google-hosted by REAL MX <-- the step the one-off 2026-06-26 run missed (mx_provider was empty pre-verify), which leaked 214 sends to Gmail and earned a "low reputation" bounce. Do NOT skip this. Output columns match the institutional CSV schema so the warmup cron can consume the result directly as the `revalidation_overdue` segment's source. Usage: python3 scripts/build_cms_overdue_pool.py \ --reval data/npi_build/revalidation_base.csv \ --endpoint data/npi_build/endpoint_pfile_*.csv \ --out data/hc_cms_overdue \ [--max-overdue-days N] # optional cap; default no cap """ import argparse import csv import subprocess import sys from datetime import date, datetime sys.path.insert(0, "/opt/performancewest/scripts") sys.path.insert(0, "scripts") # Direct Secure Messaging (HISP) markers + known EHR gateway hosts -- NOT # cold-mailable from a normal MTA. _HISP_MARKERS = ("direct", "hisp", "secure", "directtrust") _GATEWAY_PAT = ( ".hin.us", ".medallies.net", ".aprima.com", ".updox.com", ".surescripts", ".nextgen", ".e-mds", ".athenahealth", ".epic", ".cerner", ".allscripts", ".greenway", ".kno2", ".maxmd", ".zixcorp", ".globalmed", ".mdtoolbox", ".intellechart", ".carequality", ) _GOOGLE_HINTS = ("gmail.com", "googlemail.com") def is_hisp(email: str) -> bool: d = email.split("@")[-1].lower() return any(m in d for m in _HISP_MARKERS) or any(p in d for p in _GATEWAY_PAT) def looks_google(email: str) -> bool: return email.split("@")[-1].lower() in _GOOGLE_HINTS def parse_due(s: str): s = (s or "").strip() if not s or s.upper() == "TBD": return None for fmt in ("%m/%d/%Y", "%Y-%m-%d", "%m/%d/%y"): try: return datetime.strptime(s, fmt).date() except ValueError: pass return None def load_overdue(reval_path: str, max_days: int | None): """NPI -> {days, due, first, last, org, spec}.""" today = date.today() out = {} with open(reval_path, encoding="latin-1") as f: for r in csv.DictReader(f): npi = (r.get("National Provider Identifier") or "").strip() if not npi: continue dt = parse_due(r.get("Adjusted Due Date") or r.get("Revalidation Due Date")) if not dt: continue days = (today - dt).days if days <= 0: continue if max_days is not None and days > max_days: continue out[npi] = { "days": days, "due": dt.isoformat(), "first": (r.get("First Name") or "").strip().title(), "last": (r.get("Last Name") or "").strip().title(), "org": (r.get("Organization Name") or "").strip(), "spec": (r.get("Enrollment Specialty") or r.get("Provider Type Text") or "").strip(), } return out def load_mailed() -> set: """Emails already known to listmonk-hc (so we never re-mail).""" mailed = set() try: out = subprocess.run( ["docker", "exec", "performancewest-api-postgres-1", "psql", "-U", "pw", "-d", "listmonk_hc", "-tAc", "select lower(email) from subscribers"], capture_output=True, text=True, timeout=60).stdout for line in out.splitlines(): e = line.strip() if e: mailed.add(e) except Exception as e: # noqa: BLE001 print(f" WARN: could not load mailed set ({e}); proceeding without dedup", file=sys.stderr) return mailed def main(): ap = argparse.ArgumentParser() ap.add_argument("--reval", default="data/npi_build/revalidation_base.csv") ap.add_argument("--endpoint", required=True) ap.add_argument("--out", default="data/hc_cms_overdue") ap.add_argument("--max-overdue-days", type=int, default=None) ap.add_argument("--skip-verify", action="store_true", help="write candidates only; do not SMTP-verify") args = ap.parse_args() print("loading CMS overdue NPIs...") overdue = load_overdue(args.reval, args.max_overdue_days) print(f" overdue NPIs: {len(overdue)}") mailed = load_mailed() print(f" already-mailed emails: {len(mailed)}") print("scanning NPPES endpoints for mailable overdue inboxes...") seen = set() rows = [] dropped = {"hisp": 0, "google": 0, "mailed": 0, "dup": 0} with open(args.endpoint, encoding="latin-1") as f: rd = csv.reader(f) next(rd, None) for row in rd: if len(row) < 4: continue npi = row[0].strip() ep = row[3].strip().lower().replace("mailto:", "") if npi not in overdue or "@" not in ep: continue if ep in seen: dropped["dup"] += 1 continue if is_hisp(ep): dropped["hisp"] += 1 continue if looks_google(ep): dropped["google"] += 1 continue if ep in mailed: dropped["mailed"] += 1 continue seen.add(ep) info = overdue[npi] org = info["org"] practice = org or (f"Dr. {info['last']}" if info["last"] else "your practice") greet = info["first"] or org or "there" rows.append({ "npi": npi, "email": ep, "stream": "institutional", "verify_ok": "", "verify_reason": "", "mx_provider": "", "reval_due_date": info["due"], "days_overdue": info["days"], "reval_status": "overdue", "nppes_last_updated": "", "nppes_enumeration": "", "nppes_years_stale": "", "nppes_deactivated": "", "name_for_greeting": greet, "practice": practice, "specialty": info["spec"], }) print(f" candidates: {len(rows)} (dropped: {dropped})") cand_path = args.out + "_candidates.csv" cols = list(rows[0].keys()) if rows else [] with open(cand_path, "w", newline="") as f: w = csv.DictWriter(f, fieldnames=cols) w.writeheader() w.writerows(rows) print(f" wrote {cand_path}") if args.skip_verify: print(" --skip-verify set; stop here") return print("SMTP-verifying (this populates mx_provider for the Google MX filter)...") subprocess.run( ["python3", "scripts/verify_csv_emails.py", "--in", cand_path, "--out", args.out, "--workers", "15"], check=True) # FINAL Google-by-real-MX filter -- the step the one-off run missed. ver_path = args.out + "_verified.csv" send_path = args.out + "_send.csv" kept = 0 drop_google_mx = 0 with open(ver_path) as fin, open(send_path, "w", newline="") as fout: rd = csv.DictReader(fin) w = csv.DictWriter(fout, fieldnames=rd.fieldnames) w.writeheader() for r in rd: mx = (r.get("mx_provider") or "").strip().lower() if "google" in mx: drop_google_mx += 1 continue w.writerow(r) kept += 1 print(f" final send pool (non-google by real MX): {kept} " f"(dropped {drop_google_mx} google-MX) -> {send_path}") if __name__ == "__main__": main()