new-site/scripts/harvest_clia_renewals.py
justin 9c7a08f5c9 clia: new CLIA certificate renewal service, order page, email template + harvest
Set up the CLIA recurring-renewal vein (every clinical lab renews its CLIA cert
on a 2-year cycle; CMS publishes the full lab file with expiration dates):
- service-catalog: clia-renewal ($449, discountable) + order page (npi-intake
  steps) + intake manifest entry.
- harvest_clia_renewals.py: parse the CMS Provider-of-Services CLIA file, filter
  to labs expiring within a window (default 120d), emit name/address/phone/expiry.
  676k labs -> ~70k expiring in the next ~4 months.
- match_clia_to_nppes.py: CLIA has no NPI/email, so bridge to emailable NPPES
  orgs by normalized name+zip to recover NPI+email (yield TBD; labs that do not
  match still have clean phone+postal for a phone/mail channel).
- hc_clia_renewal.html: warm turnover-safety-net email with the striped official-
  record card (CLIA #, expiry, status), verify-on-CMS-QCOR, founder guarantee
  card, full CAN-SPAM address.
2026-06-13 22:10:51 -05:00

138 lines
5.1 KiB
Python

#!/usr/bin/env python3
"""Harvest active CLIA labs with an upcoming certificate expiration.
Reads the CMS "Provider of Services File - Clinical Laboratories" CSV
(clia.DATA.Qx_YYYY.csv from data.cms.gov) and writes the labs whose CLIA
certificate expires within a configurable window. CLIA certificates run on a
2-year cycle, so the expiration date (TRMNTN_EXPRTN_DT) is the recurring
reminder trigger.
The POS/CLIA file has NO NPI and NO email -- only facility name, mailing
address and phone. So this harvest emits the matchable identity columns
(name + city/state/zip + phone) plus the cert dates; a separate matcher joins
to NPPES (by name+zip) to recover an emailable NPI where possible. Labs that
never match still have a clean phone + postal address for a phone/mail channel.
Usage:
    python3 scripts/harvest_clia_renewals.py CLIA_INPUT.csv OUT.csv
        [--window-days 120] [--include-recently-expired-days 30]
"""
from __future__ import annotations
import argparse
import csv
import sys
from collections import Counter
from datetime import date, datetime, timedelta
# Header names of the POS CLIA columns this harvest reads, keyed by the short
# names used in this script. Columns are looked up by header name, not by
# position, so a reordered export still parses.
WANT: dict[str, str] = {
    # facility identity
    "clia": "PRVDR_NUM",
    "name": "FAC_NAME",
    # mailing address and phone
    "addr": "ST_ADR",
    "city": "CITY_NAME",
    "state": "STATE_CD",
    "zip": "ZIP_CD",
    "phone": "PHNE_NUM",
    # certificate fields
    "expiry": "TRMNTN_EXPRTN_DT",
    "effective": "CRTFCT_EFCTV_DT",
    "cert_type": "CRTFCT_TYPE_CD",
    "compliance": "CMPLNC_STUS_CD",
}
# CLIA certificate types worth reminding (all are renewable 2yr certs):
# 1 = Registration, 2 = Compliance, 3 = Accreditation,
# 4 = PPM (Provider-Performed Microscopy), 9 = Waiver
# We keep all of them; the expiry window is the real filter.
def parse_yyyymmdd(s: str):
s = (s or "").strip()
if len(s) == 8 and s.isdigit():
try:
return datetime.strptime(s, "%Y%m%d").date()
except ValueError:
return None
return None
def main(argv: list[str] | None = None) -> int:
    """Filter a POS/CLIA CSV down to labs whose certificate expires soon.

    Reads ``infile`` and keeps rows whose expiration date lies in
    ``[today - include_recently_expired_days, today + window_days]`` and that
    have both a facility name and a state. The kept rows are written to
    ``outfile`` sorted by days until expiry (most urgent first), and a summary
    of scanned/skipped/emitted counts is printed to stdout.

    Args:
        argv: Command-line arguments without the program name. ``None`` (the
            default) lets argparse read them from ``sys.argv``.

    Returns:
        0 on success; 2 when the input is empty or lacks a required column.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("infile")
    ap.add_argument("outfile")
    ap.add_argument("--window-days", type=int, default=120,
                    help="emit labs expiring within this many days from today (default 120)")
    ap.add_argument("--include-recently-expired-days", type=int, default=30,
                    help="also include labs that expired up to N days ago (lapsed, still renewable)")
    args = ap.parse_args(argv)

    today = date.today()
    horizon = today + timedelta(days=args.window_days)
    grace = today - timedelta(days=args.include_recently_expired_days)

    # Single source of truth for the output columns, used whether or not any
    # row is emitted.
    fieldnames = ["clia", "name", "addr", "city", "state", "zip", "phone",
                  "expiry_date", "days_until", "cert_type", "status"]
    stats = Counter()
    rows_out = []

    # latin-1 maps every byte to a character, so unexpected bytes in the input
    # cannot abort the scan with a decode error.
    with open(args.infile, newline="", encoding="latin-1") as f:
        reader = csv.reader(f)
        header = next(reader, None)
        if header is None:
            # An empty file has no header row; report it instead of letting
            # StopIteration escape as a traceback.
            print(f"ERROR: input file is empty: {args.infile}", file=sys.stderr)
            return 2
        idx = {c: i for i, c in enumerate(header)}
        missing = [c for c in WANT.values() if c not in idx]
        if missing:
            print(f"ERROR: input missing columns: {missing}", file=sys.stderr)
            return 2

        # Resolve each wanted column to its position once, not once per row.
        col = {key: idx[name] for key, name in WANT.items()}
        min_len = max(col.values()) + 1

        for row in reader:
            stats["total"] += 1
            if len(row) < min_len:
                # Truncated row: at least one wanted column is absent.
                stats["short_row"] += 1
                continue
            exp = parse_yyyymmdd(row[col["expiry"]])
            if exp is None:
                stats["no_expiry"] += 1
                continue
            if not grace <= exp <= horizon:
                stats["outside_window"] += 1
                continue
            name = row[col["name"]].strip()
            state = row[col["state"]].strip()
            if not name or not state:
                stats["no_name_state"] += 1
                continue
            days_until = (exp - today).days
            rows_out.append({
                "clia": row[col["clia"]].strip(),
                "name": name,
                "addr": row[col["addr"]].strip(),
                "city": row[col["city"]].strip(),
                "state": state,
                # Keep only the first five characters of the ZIP field.
                "zip": row[col["zip"]].strip()[:5],
                "phone": row[col["phone"]].strip(),
                "expiry_date": exp.isoformat(),
                "days_until": days_until,
                "cert_type": row[col["cert_type"]].strip(),
                # A negative day count means the expiry date has already passed.
                "status": "lapsed" if days_until < 0 else "upcoming",
            })
            stats["emitted"] += 1

    rows_out.sort(key=lambda r: r["days_until"])  # most urgent first
    with open(args.outfile, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        w.writerows(rows_out)

    print(f"CLIA harvest: {stats['total']:,} rows scanned")
    for k in ("no_expiry", "outside_window", "no_name_state", "short_row"):
        if stats[k]:
            print(f" skipped {k}: {stats[k]:,}")
    print(f" EMITTED (expiring in [-{args.include_recently_expired_days}d, "
          f"+{args.window_days}d]): {stats['emitted']:,} -> {args.outfile}")
    return 0
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the process exit status.
    sys.exit(main())