new-site/scripts/hc_data_refresh.py
justin 54b92b1f06 fix(hc deliverability): MX-based Google-host exclusion during warmup
Found via live mail.log: Google-Workspace-hosted PRACTICE domains (custom
domains whose MX is aspmx.l.google.com, e.g. moosepharmacy.com, hc2kidney.com)
were getting hard 550-5.7.1 rejects from Google's cold-IP bulk filter -- exactly
the bounces that wreck a warming IP's reputation. The original google/non-google
split classified by the email's domain STRING, which can't see that a custom
domain silently uses Google Workspace; only an MX lookup reveals it (33% of our
domains, 228/689, are Google-hosted this way).

- hc_data_refresh.py: new MX classification (one lookup per unique domain via
  dnspython, cached) writes an mx_provider=google/other flag into the master and
  propagates it into the channel CSVs (auto-adding the column). --skip-mx for a
  fast status-only run.
- build_healthcare_campaigns_cron.py: warm_segment now drops mx_provider=google
  rows during warmup (HC_SKIP_GOOGLE=1 default; set 0 once IPs are warm). This is
  defense-in-depth -- correct regardless of which CSV the cron is pointed at.

Verified: today's sends (nongoogle CSV) had 0 Google bounces; the guard cuts the
Google-containing week1_verified cohort's revalidation candidates 82->8.
2026-06-08 03:32:12 -05:00

372 lines
16 KiB
Python

#!/usr/bin/env python3
"""Weekly healthcare-data refresh.
Keeps the warmup segment CSVs source-grounded by re-checking the NPIs we can
already email (the master emailable list) against the LIVE government sources,
then re-deriving the per-segment warmup CSVs the campaign cron sends from.
Why: the original warmup CSVs were a one-time snapshot. The four source
datasets refresh on their own cadences (SAM daily, NPPES weekly, OIG LEIE +
CMS revalidation monthly). Emailing a provider a stale "N days overdue" figure
- or one who has since revalidated - undercuts the exact trust the campaigns
are built on. This refresh re-pulls status weekly so every send is accurate.
Sources (all free, all public, all verified reachable):
CMS Revalidation Due Date List data.cms.gov data-api (per-NPI filter)
OIG LEIE exclusions oig.hhs.gov CSV download (full file)
SAM.gov exclusions api.sam.gov v4 (needs SAM_GOV_API_KEY)
Input : the master emailable list (npi,email,stream,name,specialty,state,...).
Output: refreshed master + the per-segment CSVs the cron consumes, written
atomically (temp + rename) so a partial run never corrupts a live CSV.
Usage:
python3 scripts/hc_data_refresh.py # full weekly refresh
python3 scripts/hc_data_refresh.py --dry-run # report only
python3 scripts/hc_data_refresh.py --skip-sam # if no SAM key
python3 scripts/hc_data_refresh.py --master PATH --out-dir DIR
"""
from __future__ import annotations
import argparse, csv, datetime, json, os, sys, tempfile, time
import urllib.request, urllib.parse, urllib.error
DATA_DIR = os.getenv("HC_DATA_DIR", "/opt/performancewest/data")
MASTER = os.getenv("HC_MASTER_CSV", os.path.join(DATA_DIR, "hc_warmup_week1.csv"))
CMS_REVAL_DATASET = "3746498e-874d-45d8-9c69-68603cafea60"
CMS_API = f"https://data.cms.gov/data-api/v1/dataset/{CMS_REVAL_DATASET}/data"
OIG_LEIE_URL = "https://oig.hhs.gov/exclusions/downloadables/UPDATED.csv"
SAM_API = "https://api.sam.gov/entity-information/v4/exclusions"
UA = "PerformanceWest-HCRefresh/1.0 (compliance@performancewest.net)"
# Master/segment CSV schema (kept identical to build_npi_outreach_lists.py so the
# campaign cron's column expectations never change).
HEADER = ["npi", "email", "stream", "name", "specialty", "state",
"reval_due_date", "days_overdue", "reval_status",
"leie_excluded", "optout_ending", "mx_provider"]
def log(*a):
print(f"[hc-refresh {datetime.datetime.now(datetime.timezone.utc).isoformat(timespec='seconds')}]", *a)
def http_json(url: str, timeout: int = 30):
req = urllib.request.Request(url, headers={"Accept": "application/json", "User-Agent": UA})
with urllib.request.urlopen(req, timeout=timeout) as r:
return json.loads(r.read().decode())
# ── Source pulls ────────────────────────────────────────────────────────────
# Mail providers whose MX indicates the domain is hosted by Google Workspace.
# Google rejects bulk mail from cold/warming IPs hard (550-5.7.1), so these must
# be held out of the warmup -- and the only reliable signal is the MX record,
# since a custom domain (e.g. practice.com) can silently use Google Workspace.
_GOOGLE_MX_SUFFIXES = ("google.com", "googlemail.com", "aspmx.l.google.com")
def classify_mx(domain: str) -> str:
"""Return 'google' if the domain's MX is Google-hosted, else 'other'.
Best-effort: DNS failures classify as 'other' (we don't want a transient
resolver error to permanently exclude a deliverable domain)."""
try:
import dns.resolver # type: ignore
answers = dns.resolver.resolve(domain, "MX", lifetime=5)
hosts = [str(r.exchange).rstrip(".").lower() for r in answers]
except Exception:
return "other"
for h in hosts:
if any(h == s or h.endswith("." + s) or h.endswith(s) for s in _GOOGLE_MX_SUFFIXES):
return "google"
return "other"
def classify_domains_mx(emails: list[str]) -> dict[str, str]:
"""Map each unique email domain -> 'google'/'other' via one MX lookup per
domain (cached), so the daily campaign cron can skip Google-hosted addresses
during warmup without re-resolving."""
domains = sorted({e.split("@", 1)[1].strip().lower() for e in emails if "@" in e})
out: dict[str, str] = {}
for i, d in enumerate(domains, 1):
out[d] = classify_mx(d)
if i % 100 == 0:
log(f" mx: classified {i}/{len(domains)} domains")
return out
def sam_key() -> str | None:
t = os.getenv("SAM_GOV_API_KEY")
if t:
return t
for p in (os.path.join(DATA_DIR, "../.secrets/sam-api-key"),
"/opt/performancewest/.secrets/sam-api-key"):
if os.path.exists(p):
try:
return open(p).read().strip()
except PermissionError:
continue
return None
def cms_revalidation_for(npis: list[str]) -> dict[str, dict]:
"""Per-NPI revalidation lookup against the live CMS data-api. Returns
{npi: {due_date, days_overdue, overdue}}. Only NPIs with a concrete due
date are included; the API filters server-side so this is cheap even for a
few thousand NPIs."""
today = datetime.date.today()
out: dict[str, dict] = {}
for i, npi in enumerate(npis, 1):
if i % 100 == 0:
log(f" cms: checked {i}/{len(npis)} NPIs")
time.sleep(0.05) # be polite to data.cms.gov (every request, not just hits)
q = urllib.parse.urlencode({"filter[National Provider Identifier]": npi, "size": 1})
try:
rows = http_json(f"{CMS_API}?{q}", timeout=20)
except Exception as e:
log(f" cms lookup failed for {npi}: {e}")
continue
if not rows:
continue
row = rows[0]
dd = (row.get("Adjusted Due Date") or "").strip() or (row.get("Revalidation Due Date") or "").strip()
if not dd or dd.upper() == "TBD":
# On the list but no concrete date = "due within 6 months / TBD".
out[npi] = {"due_date": "", "days_overdue": 0, "overdue": False, "on_list": True}
continue
d = _parse_due(dd)
if d is None:
continue
# Normalize to the master CSV's MM/DD/YYYY display format.
dd_disp = d.strftime("%m/%d/%Y")
overdue = (today - d).days
out[npi] = {"due_date": dd_disp, "days_overdue": overdue, "overdue": overdue > 0, "on_list": True}
return out
def _parse_due(s: str):
"""CMS returns either ISO (2025-12-31) or US (12/31/2025) due dates over
time; accept both."""
for fmt in ("%Y-%m-%d", "%m/%d/%Y"):
try:
return datetime.datetime.strptime(s, fmt).date()
except ValueError:
continue
return None
def oig_leie_npis(tmp_dir: str) -> set[str]:
"""Download the full OIG LEIE CSV and return the set of excluded NPIs."""
dest = os.path.join(tmp_dir, "leie.csv")
req = urllib.request.Request(OIG_LEIE_URL, headers={"User-Agent": UA})
with urllib.request.urlopen(req, timeout=120) as r, open(dest, "wb") as f:
f.write(r.read())
npis: set[str] = set()
with open(dest, newline="", encoding="latin-1") as f:
rr = csv.reader(f)
header = next(rr, [])
# LEIE NPI column is named "NPI"; fall back to index 7 (legacy layout).
try:
npi_idx = [h.strip().upper() for h in header].index("NPI")
except ValueError:
npi_idx = 7
for row in rr:
if len(row) <= npi_idx:
continue
npi = row[npi_idx].strip().strip('"')
if npi and npi != "0000000000" and len(npi) == 10 and npi.isdigit():
npis.add(npi)
return npis
def sam_excluded_npis(key: str, max_pages: int = 0) -> set[str]:
"""Pull SAM exclusions (v4) and return the subset that carry an NPI.
NOTE: SAM exclusions are keyed by name/UEI/CAGE; only a tiny fraction carry
an NPI, and the full set is ~167k records (~1,700 pages). For the warmup
data refresh the OIG LEIE (which DOES carry NPIs) is the meaningful exclusion
cross-flag, so SAM is OFF by default here. SAM's real value is the live
per-name screening service we sell, not a bulk NPI join. Pass --sam-pages N
to crawl the first N pages if you want a best-effort NPI cross-flag anyway.
"""
npis: set[str] = set()
page, size = 0, 100
while True:
q = urllib.parse.urlencode({"api_key": key, "page": page, "size": size})
try:
data = http_json(f"{SAM_API}?{q}", timeout=40)
except Exception as e:
log(f" sam page {page} failed: {e}")
break
rows = data.get("excludedEntity", []) or []
for e in rows:
ident = e.get("exclusionIdentification", {}) or {}
npi = str(ident.get("npi") or "").strip()
if npi and len(npi) == 10 and npi.isdigit():
npis.add(npi)
total = data.get("totalRecords", 0)
page += 1
if page * size >= total or not rows:
break
if max_pages and page >= max_pages:
break
time.sleep(0.1)
return npis
# ── Refresh ─────────────────────────────────────────────────────────────────
def load_master(path: str) -> list[dict]:
with open(path, newline="") as f:
return list(csv.DictReader(f))
def write_atomic(path: str, rows: list[dict], header: list[str]):
d = os.path.dirname(path) or "."
fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp")
with os.fdopen(fd, "w", newline="") as f:
w = csv.DictWriter(f, fieldnames=header, extrasaction="ignore")
w.writeheader()
for r in rows:
w.writerow(r)
os.replace(tmp, path)
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--master", default=MASTER)
ap.add_argument("--out-dir", default=DATA_DIR)
ap.add_argument("--dry-run", action="store_true")
ap.add_argument("--skip-sam", action="store_true", help="(default) skip SAM bulk NPI crawl")
ap.add_argument("--sam-pages", type=int, default=0,
help="crawl first N SAM exclusion pages for an NPI cross-flag (slow; default off)")
ap.add_argument("--skip-cms", action="store_true")
ap.add_argument("--skip-oig", action="store_true")
ap.add_argument("--skip-mx", action="store_true", help="skip MX (Google-host) classification")
args = ap.parse_args()
if not os.path.exists(args.master):
log(f"master CSV not found: {args.master}")
return 1
rows = load_master(args.master)
npis = sorted({r["npi"].strip() for r in rows if r.get("npi", "").strip()})
log(f"master={args.master} emailable_npis={len(npis)}")
with tempfile.TemporaryDirectory() as tmp:
reval = {} if args.skip_cms else cms_revalidation_for(npis)
log(f"cms: {len(reval)} NPIs on the revalidation list "
f"(overdue={sum(1 for v in reval.values() if v['overdue'])})")
leie = set() if args.skip_oig else oig_leie_npis(tmp)
log(f"oig leie: {len(leie):,} excluded NPIs total; "
f"{len(leie & set(npis))} overlap our list")
sam = set()
if args.sam_pages:
key = sam_key()
if key:
sam = sam_excluded_npis(key, max_pages=args.sam_pages)
log(f"sam: {len(sam):,} excluded NPIs w/ NPI (first {args.sam_pages} pages); "
f"{len(sam & set(npis))} overlap our list")
else:
log("sam: no SAM_GOV_API_KEY / secret found, skipping")
else:
log("sam: skipped (default; OIG LEIE is the NPI-bearing exclusion source)")
excluded = leie | sam
today = datetime.date.today()
# MX classification (Google Workspace vs other) for the warmup deliverability
# guard. Done once per unique domain. Skippable for a fast status-only run.
mx_map = {}
if not args.skip_mx:
all_emails = [r.get("email", "") for r in rows]
mx_map = classify_domains_mx(all_emails)
n_google = sum(1 for v in mx_map.values() if v == "google")
log(f"mx: {len(mx_map)} domains classified; {n_google} Google-hosted")
refreshed = []
for r in rows:
npi = r["npi"].strip()
if mx_map:
dom = r.get("email", "").split("@", 1)[-1].strip().lower()
r["mx_provider"] = mx_map.get(dom, "other")
if not npi:
# No NPI to re-check; leave the row's existing status untouched.
refreshed.append(r)
continue
rv = reval.get(npi)
if rv is not None and not args.skip_cms:
r["reval_due_date"] = rv["due_date"]
r["days_overdue"] = str(rv["days_overdue"]) if rv["due_date"] else ""
r["reval_status"] = ("overdue" if rv["overdue"]
else ("upcoming" if rv["due_date"] else "on_list_tbd"))
elif not args.skip_cms:
# No longer on the revalidation list -> they've revalidated / dropped.
# Use the same vocabulary the original list builder emits.
r["reval_status"] = "no_reval_flag"
r["reval_due_date"] = ""
r["days_overdue"] = ""
# Only rewrite the exclusion flag when OIG was actually pulled, so a
# --skip-oig run never blanks existing flags. SAM is supplemental.
if not args.skip_oig:
r["leie_excluded"] = "Y" if npi in excluded else ""
refreshed.append(r)
n_overdue = sum(1 for r in refreshed if r.get("reval_status") == "overdue")
n_upcoming = sum(1 for r in refreshed if r.get("reval_status") == "upcoming")
n_excluded = sum(1 for r in refreshed if (r.get("leie_excluded") or "").strip() not in ("", "0", "false"))
log(f"refreshed: overdue={n_overdue} upcoming={n_upcoming} excluded={n_excluded}")
if args.dry_run:
log("dry-run, no files written")
return 0
# Preserve any columns the master already had beyond HEADER (so we never
# silently drop data on write). HEADER first to keep the canonical order.
master_cols = list(dict.fromkeys(HEADER + [k for r in refreshed for k in r]))
write_atomic(args.master, refreshed, master_cols)
log(f"wrote {args.master} ({len(refreshed)} rows)")
# Propagate the fresh status fields into the channel CSVs the campaign cron
# actually reads. These are email-keyed subsets of the master with extra
# deliverability columns (verify_ok/verify_reason) we must preserve; we only
# overwrite the status fields the refresh owns.
# The refresh OWNS these status fields; it must not touch others (notably
# optout_ending, which only the original list builder computes -- including
# it here would blank it and starve the compliance_bundle segment).
REFRESHED_FIELDS = ["reval_due_date", "days_overdue", "reval_status",
"leie_excluded", "mx_provider", "name", "specialty", "state"]
by_email = {r["email"].strip().lower(): r for r in refreshed if r.get("email")}
channel_csvs = [os.path.join(args.out_dir, f) for f in
("hc_warmup_nongoogle.csv", "hc_warmup_google.csv",
"hc_warmup_week1_verified.csv")]
for path in channel_csvs:
if not os.path.exists(path):
continue
with open(path, newline="") as f:
rdr = csv.DictReader(f)
cols = list(rdr.fieldnames or [])
rows_ch = list(rdr)
# Add any refreshed field the channel CSV doesn't have yet (e.g. a newly
# introduced mx_provider column) so the cron can read it.
for fld in REFRESHED_FIELDS:
if fld not in cols:
cols.append(fld)
updated = 0
for r in rows_ch:
m = by_email.get(r.get("email", "").strip().lower())
if not m:
continue
for fld in REFRESHED_FIELDS:
if fld in m:
r[fld] = m[fld]
updated += 1
write_atomic(path, rows_ch, cols)
log(f"propagated to {os.path.basename(path)}: {updated}/{len(rows_ch)} rows updated")
return 0
if __name__ == "__main__":
sys.exit(main())