#!/usr/bin/env python3
"""Harvest active CLIA labs with an upcoming certificate expiration.

Reads the CMS "Provider of Services File - Clinical Laboratories" CSV
(clia.DATA.Qx_YYYY.csv from data.cms.gov) and writes the labs whose CLIA
certificate expires within a configurable window. CLIA certificates run on a
2-year cycle, so the expiration date (TRMNTN_EXPRTN_DT) is the recurring
reminder trigger.

The POS/CLIA file has NO NPI and NO email -- only facility name, mailing
address and phone. So this harvest emits the matchable identity columns
(name + city/state/zip + phone) plus the certificate expiration date; a
separate matcher joins to NPPES (by name+zip) to recover an emailable NPI
where possible. Labs that never match still have a clean phone + postal
address for a phone/mail channel.

NOTE(review): this script filters on the expiration window only; it applies
no compliance/termination-status filter of its own, so "active" depends on
the input file -- confirm.

Usage:
    python3 scripts/harvest_clia_renewals.py CLIA_INPUT.csv OUT.csv [--window-days 120]
"""
from __future__ import annotations

import argparse
import csv
import sys
from collections import Counter
from collections.abc import Iterable
from datetime import date, datetime, timedelta

# Columns we need from the POS CLIA file (by header name; robust to reordering).
# All of them must be present in the input header, although "effective" and
# "compliance" are currently not written to the output.
WANT = {
    "clia": "PRVDR_NUM",
    "name": "FAC_NAME",
    "addr": "ST_ADR",
    "city": "CITY_NAME",
    "state": "STATE_CD",
    "zip": "ZIP_CD",
    "phone": "PHNE_NUM",
    "expiry": "TRMNTN_EXPRTN_DT",
    "effective": "CRTFCT_EFCTV_DT",
    "cert_type": "CRTFCT_TYPE_CD",
    "compliance": "CMPLNC_STUS_CD",
}

# CLIA certificate types worth reminding (all are renewable 2yr certs):
#   1 = Registration, 2 = Compliance, 3 = Accreditation,
#   4 = PPM (Provider-Performed Microscopy), 9 = Waiver
# We keep all of them; the expiry window is the real filter.

# Output columns, in the order they are written. Single source of truth so the
# header is identical whether or not any row was emitted.
OUT_FIELDS = [
    "clia", "name", "addr", "city", "state", "zip", "phone",
    "expiry_date", "days_until", "cert_type", "status",
]

# A UTF-8 byte-order mark as it appears after decoding the file as latin-1.
_LATIN1_BOM = "\xef\xbb\xbf"


def parse_yyyymmdd(s: str) -> date | None:
    """Parse an 8-digit ``YYYYMMDD`` string into a ``date``.

    Surrounding whitespace is ignored. Returns ``None`` for ``None``/empty
    input, for anything that is not exactly eight digits, and for digit
    strings that are not a real calendar date (e.g. ``20230230``).
    """
    s = (s or "").strip()
    if len(s) != 8 or not s.isdigit():
        return None
    try:
        return datetime.strptime(s, "%Y%m%d").date()
    except ValueError:
        return None


def _header_index(header: list[str]) -> dict[str, int]:
    """Map each (whitespace-stripped) header name to its column position."""
    names = [cell.strip() for cell in header]
    if names and names[0].startswith(_LATIN1_BOM):
        # A leading BOM also stops the csv module from unquoting the first
        # cell, so drop any quotes that were left around it.
        names[0] = names[0][len(_LATIN1_BOM):].strip().strip('"')
    return {name: pos for pos, name in enumerate(names)}


def _select_rows(
    reader: Iterable[list[str]],
    idx: dict[str, int],
    today: date,
    grace: date,
    horizon: date,
) -> tuple[list[dict[str, object]], Counter]:
    """Filter data rows to those expiring within ``[grace, horizon]``.

    Returns the output records (unsorted) and a Counter holding ``total``,
    ``emitted`` and one counter per skip reason.
    """
    col = {key: idx[name] for key, name in WANT.items()}
    widest = max(col.values())  # loop-invariant: highest column we index
    stats: Counter = Counter()
    rows_out: list[dict[str, object]] = []

    for row in reader:
        stats["total"] += 1
        if len(row) <= widest:
            stats["short_row"] += 1
            continue
        exp = parse_yyyymmdd(row[col["expiry"]])
        if exp is None:
            stats["no_expiry"] += 1
            continue
        if not (grace <= exp <= horizon):
            stats["outside_window"] += 1
            continue
        name = row[col["name"]].strip()
        state = row[col["state"]].strip()
        if not name or not state:
            stats["no_name_state"] += 1
            continue

        days_until = (exp - today).days
        rows_out.append({
            "clia": row[col["clia"]].strip(),
            "name": name,
            "addr": row[col["addr"]].strip(),
            "city": row[col["city"]].strip(),
            "state": state,
            "zip": row[col["zip"]].strip()[:5],  # keep the 5-digit ZIP only
            "phone": row[col["phone"]].strip(),
            "expiry_date": exp.isoformat(),
            "days_until": days_until,
            "cert_type": row[col["cert_type"]].strip(),
            "status": "lapsed" if days_until < 0 else "upcoming",
        })
        stats["emitted"] += 1

    return rows_out, stats


def main(argv: list[str] | None = None) -> int:
    """Run the harvest: read the CLIA CSV, filter by expiry window, write CSV.

    Args:
        argv: Command-line arguments without the program name. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``.

    Returns:
        0 on success; 2 when the input has no header row or lacks one of the
        columns listed in ``WANT`` (a message is printed to stderr).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("infile")
    ap.add_argument("outfile")
    ap.add_argument("--window-days", type=int, default=120,
                    help="emit labs expiring within this many days from today (default 120)")
    ap.add_argument("--include-recently-expired-days", type=int, default=30,
                    help="also include labs that expired up to N days ago (lapsed, still renewable)")
    args = ap.parse_args(argv)

    today = date.today()
    horizon = today + timedelta(days=args.window_days)
    grace = today - timedelta(days=args.include_recently_expired_days)

    with open(args.infile, newline="", encoding="latin-1") as f:
        reader = csv.reader(f)
        header = next(reader, None)
        if header is None:
            # An empty file used to escape as an uncaught StopIteration.
            print(f"ERROR: input file is empty (no header row): {args.infile}",
                  file=sys.stderr)
            return 2
        idx = _header_index(header)
        missing = [k for k, col in WANT.items() if col not in idx]
        if missing:
            print(f"ERROR: input missing columns: {[WANT[m] for m in missing]}",
                  file=sys.stderr)
            return 2
        rows_out, stats = _select_rows(reader, idx, today, grace, horizon)

    rows_out.sort(key=lambda r: r["days_until"])  # most urgent first

    with open(args.outfile, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=OUT_FIELDS)
        w.writeheader()
        w.writerows(rows_out)

    print(f"CLIA harvest: {stats['total']:,} rows scanned")
    for k in ("no_expiry", "outside_window", "no_name_state", "short_row"):
        if stats[k]:
            print(f"  skipped {k}: {stats[k]:,}")
    print(f"  EMITTED (expiring in [-{args.include_recently_expired_days}d, "
          f"+{args.window_days}d]): {stats['emitted']:,} -> {args.outfile}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())