clia: new CLIA certificate renewal service, order page, email template + harvest
Set up the CLIA recurring-renewal vein (every clinical lab renews its CLIA cert on a 2-year cycle; CMS publishes the full lab file with expiration dates): - service-catalog: clia-renewal ($449, discountable) + order page (npi-intake steps) + intake manifest entry. - harvest_clia_renewals.py: parse the CMS Provider-of-Services CLIA file, filter to labs expiring within a window (default 120d), emit name/address/phone/expiry. 676k labs -> ~70k expiring in the next ~4 months. - match_clia_to_nppes.py: CLIA has no NPI/email, so bridge to emailable NPPES orgs by normalized name+zip to recover NPI+email (yield TBD; labs that do not match still have clean phone+postal for a phone/mail channel). - hc_clia_renewal.html: warm turnover-safety-net email with the striped official- record card (CLIA #, expiry, status), verify-on-CMS-QCOR, founder guarantee card, full CAN-SPAM address.
This commit is contained in:
parent
d1a9260854
commit
9c7a08f5c9
7 changed files with 398 additions and 0 deletions
138
scripts/harvest_clia_renewals.py
Normal file
138
scripts/harvest_clia_renewals.py
Normal file
|
|
@ -0,0 +1,138 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Harvest active CLIA labs with an upcoming certificate expiration.
|
||||
|
||||
Reads the CMS "Provider of Services File - Clinical Laboratories" CSV
|
||||
(clia.DATA.Qx_YYYY.csv from data.cms.gov) and writes the labs whose CLIA
|
||||
certificate expires within a configurable window. CLIA certificates run on a
|
||||
2-year cycle, so the expiration date (TRMNTN_EXPRTN_DT) is the recurring
|
||||
reminder trigger.
|
||||
|
||||
The POS/CLIA file has NO NPI and NO email -- only facility name, mailing
|
||||
address and phone. So this harvest emits the matchable identity columns
|
||||
(name + city/state/zip + phone) plus the cert dates; a separate matcher joins
|
||||
to NPPES (by name+zip) to recover an emailable NPI where possible. Labs that
|
||||
never match still have a clean phone + postal address for a phone/mail channel.
|
||||
|
||||
Usage:
|
||||
python3 scripts/harvest_clia_renewals.py CLIA_INPUT.csv OUT.csv [--window-days 120]
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import sys
|
||||
from collections import Counter
|
||||
from datetime import date, datetime, timedelta
|
||||
|
||||
# Columns we need from the POS CLIA file (by header name; robust to reordering).
|
||||
WANT = {
|
||||
"clia": "PRVDR_NUM",
|
||||
"name": "FAC_NAME",
|
||||
"addr": "ST_ADR",
|
||||
"city": "CITY_NAME",
|
||||
"state": "STATE_CD",
|
||||
"zip": "ZIP_CD",
|
||||
"phone": "PHNE_NUM",
|
||||
"expiry": "TRMNTN_EXPRTN_DT",
|
||||
"effective": "CRTFCT_EFCTV_DT",
|
||||
"cert_type": "CRTFCT_TYPE_CD",
|
||||
"compliance": "CMPLNC_STUS_CD",
|
||||
}
|
||||
|
||||
# CLIA certificate types worth reminding (all are renewable 2yr certs):
|
||||
# 1 = Registration, 2 = Compliance, 3 = Accreditation,
|
||||
# 4 = PPM (Provider-Performed Microscopy), 9 = Waiver
|
||||
# We keep all of them; the expiry window is the real filter.
|
||||
|
||||
|
||||
def parse_yyyymmdd(s: str):
|
||||
s = (s or "").strip()
|
||||
if len(s) == 8 and s.isdigit():
|
||||
try:
|
||||
return datetime.strptime(s, "%Y%m%d").date()
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def main() -> int:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("infile")
|
||||
ap.add_argument("outfile")
|
||||
ap.add_argument("--window-days", type=int, default=120,
|
||||
help="emit labs expiring within this many days from today (default 120)")
|
||||
ap.add_argument("--include-recently-expired-days", type=int, default=30,
|
||||
help="also include labs that expired up to N days ago (lapsed, still renewable)")
|
||||
args = ap.parse_args()
|
||||
|
||||
today = date.today()
|
||||
horizon = today + timedelta(days=args.window_days)
|
||||
grace = today - timedelta(days=args.include_recently_expired_days)
|
||||
|
||||
with open(args.infile, newline="", encoding="latin-1") as f:
|
||||
reader = csv.reader(f)
|
||||
header = next(reader)
|
||||
idx = {c: i for i, c in enumerate(header)}
|
||||
missing = [k for k, col in WANT.items() if col not in idx]
|
||||
if missing:
|
||||
print(f"ERROR: input missing columns: {[WANT[m] for m in missing]}", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
stats = Counter()
|
||||
rows_out = []
|
||||
for row in reader:
|
||||
stats["total"] += 1
|
||||
if len(row) <= max(idx[c] for c in WANT.values()):
|
||||
stats["short_row"] += 1
|
||||
continue
|
||||
|
||||
exp = parse_yyyymmdd(row[idx[WANT["expiry"]]])
|
||||
if not exp:
|
||||
stats["no_expiry"] += 1
|
||||
continue
|
||||
if not (grace <= exp <= horizon):
|
||||
stats["outside_window"] += 1
|
||||
continue
|
||||
|
||||
name = row[idx[WANT["name"]]].strip()
|
||||
state = row[idx[WANT["state"]]].strip()
|
||||
zipc = row[idx[WANT["zip"]]].strip()[:5]
|
||||
if not name or not state:
|
||||
stats["no_name_state"] += 1
|
||||
continue
|
||||
|
||||
days_until = (exp - today).days
|
||||
rows_out.append({
|
||||
"clia": row[idx[WANT["clia"]]].strip(),
|
||||
"name": name,
|
||||
"addr": row[idx[WANT["addr"]]].strip(),
|
||||
"city": row[idx[WANT["city"]]].strip(),
|
||||
"state": state,
|
||||
"zip": zipc,
|
||||
"phone": row[idx[WANT["phone"]]].strip(),
|
||||
"expiry_date": exp.isoformat(),
|
||||
"days_until": days_until,
|
||||
"cert_type": row[idx[WANT["cert_type"]]].strip(),
|
||||
"status": "lapsed" if days_until < 0 else "upcoming",
|
||||
})
|
||||
stats["emitted"] += 1
|
||||
|
||||
rows_out.sort(key=lambda r: r["days_until"]) # most urgent first
|
||||
with open(args.outfile, "w", newline="", encoding="utf-8") as f:
|
||||
w = csv.DictWriter(f, fieldnames=list(rows_out[0].keys()) if rows_out else
|
||||
["clia", "name", "addr", "city", "state", "zip", "phone",
|
||||
"expiry_date", "days_until", "cert_type", "status"])
|
||||
w.writeheader()
|
||||
w.writerows(rows_out)
|
||||
|
||||
print(f"CLIA harvest: {stats['total']:,} rows scanned")
|
||||
for k in ("no_expiry", "outside_window", "no_name_state", "short_row"):
|
||||
if stats[k]:
|
||||
print(f" skipped {k}: {stats[k]:,}")
|
||||
print(f" EMITTED (expiring in [-{args.include_recently_expired_days}d, "
|
||||
f"+{args.window_days}d]): {stats['emitted']:,} -> {args.outfile}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
100
scripts/match_clia_to_nppes.py
Normal file
100
scripts/match_clia_to_nppes.py
Normal file
|
|
@ -0,0 +1,100 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Match CLIA labs to an emailable NPPES org by (normalized name + zip5).
|
||||
|
||||
CLIA POS files have no NPI/email; our NPPES verified set is keyed by NPI. This
|
||||
bridges them: it streams the big NPPES npidata_pfile, keeps ONLY the orgs whose
|
||||
NPI already has a verified email (so the scan stays cheap), indexes them by
|
||||
normalized org-name + zip5, then matches each CLIA lab to recover its NPI+email.
|
||||
|
||||
Outputs the CLIA renewal rows that got an emailable match, with email +
|
||||
mx_provider appended (ready to feed the HC campaign builder as a CLIA segment).
|
||||
|
||||
Usage:
|
||||
python3 scripts/match_clia_to_nppes.py \
|
||||
CLIA_RENEWALS.csv NPPES_VERIFIED.csv NPIDATA_PFILE.csv OUT.csv
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import re
|
||||
import sys
|
||||
|
||||
csv.field_size_limit(10_000_000)
|
||||
|
||||
# npidata_pfile column names we use (stable header names in the NPPES file).
|
||||
COL_NPI = "NPI"
|
||||
COL_ORG = "Provider Organization Name (Legal Business Name)"
|
||||
COL_ZIP_PRACTICE = "Provider Business Practice Location Address Postal Code"
|
||||
COL_ENTITY = "Entity Type Code" # 2 = organization
|
||||
|
||||
|
||||
def norm_name(s: str) -> str:
|
||||
s = (s or "").upper()
|
||||
s = re.sub(r"[^A-Z0-9 ]", " ", s)
|
||||
# drop common suffixes/noise that differ between CLIA and NPPES spellings
|
||||
s = re.sub(r"\b(LLC|INC|PC|PLLC|PA|LTD|CORP|CO|LP|LLP|THE|DBA)\b", " ", s)
|
||||
s = re.sub(r"\s+", " ", s).strip()
|
||||
return s
|
||||
|
||||
|
||||
def main() -> int:
|
||||
clia_f, nppes_verified_f, npidata_f, out_f = sys.argv[1:5]
|
||||
|
||||
# 1) emailable NPIs -> (email, mx_provider)
|
||||
email_by_npi: dict[str, tuple[str, str]] = {}
|
||||
with open(nppes_verified_f, newline="", encoding="utf-8") as f:
|
||||
for r in csv.DictReader(f):
|
||||
npi = (r.get("npi") or "").strip()
|
||||
email = (r.get("email") or "").strip()
|
||||
if npi and email and (r.get("verify_ok", "Y") in ("Y", "", "true", "True")):
|
||||
email_by_npi[npi] = (email, r.get("mx_provider", ""))
|
||||
print(f"emailable NPIs: {len(email_by_npi):,}", file=sys.stderr)
|
||||
|
||||
# 2) stream npidata_pfile, keep only those NPIs -> index by (name, zip5)
|
||||
idx: dict[tuple[str, str], str] = {}
|
||||
with open(npidata_f, newline="", encoding="latin-1") as f:
|
||||
reader = csv.DictReader(f)
|
||||
seen = 0
|
||||
for row in reader:
|
||||
npi = (row.get(COL_NPI) or "").strip()
|
||||
if npi not in email_by_npi:
|
||||
continue
|
||||
org = norm_name(row.get(COL_ORG, ""))
|
||||
zip5 = (row.get(COL_ZIP_PRACTICE) or "").strip()[:5]
|
||||
if org and zip5:
|
||||
idx[(org, zip5)] = npi
|
||||
seen += 1
|
||||
if seen == len(email_by_npi):
|
||||
break
|
||||
print(f"indexed emailable orgs by name+zip: {len(idx):,}", file=sys.stderr)
|
||||
|
||||
# 3) match CLIA -> index
|
||||
matched = 0
|
||||
total = 0
|
||||
with open(clia_f, newline="", encoding="utf-8") as fin, \
|
||||
open(out_f, "w", newline="", encoding="utf-8") as fout:
|
||||
reader = csv.DictReader(fin)
|
||||
fieldnames = reader.fieldnames + ["npi", "email", "mx_provider"]
|
||||
w = csv.DictWriter(fout, fieldnames=fieldnames)
|
||||
w.writeheader()
|
||||
for row in reader:
|
||||
total += 1
|
||||
key = (norm_name(row["name"]), (row["zip"] or "")[:5])
|
||||
npi = idx.get(key)
|
||||
if not npi:
|
||||
continue
|
||||
email, mx = email_by_npi[npi]
|
||||
row["npi"] = npi
|
||||
row["email"] = email
|
||||
row["mx_provider"] = mx
|
||||
w.writerow(row)
|
||||
matched += 1
|
||||
|
||||
print(f"CLIA labs: {total:,} | matched to emailable NPPES org: {matched:,} "
|
||||
f"({100*matched/max(total,1):.1f}%)")
|
||||
print(f" -> {out_f}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Loading…
Add table
Add a link
Reference in a new issue