Set up the CLIA recurring-renewal vein (every clinical lab renews its CLIA cert on a 2-year cycle; CMS publishes the full lab file with expiration dates): - service-catalog: clia-renewal ($449, discountable) + order page (npi-intake steps) + intake manifest entry. - harvest_clia_renewals.py: parse the CMS Provider-of-Services CLIA file, filter to labs expiring within a window (default 120d), emit name/address/phone/expiry. 676k labs -> ~70k expiring in the next ~4 months. - match_clia_to_nppes.py: CLIA has no NPI/email, so bridge to emailable NPPES orgs by normalized name+zip to recover NPI+email (yield TBD; labs that do not match still have clean phone+postal for a phone/mail channel). - hc_clia_renewal.html: warm turnover-safety-net email with the striped official- record card (CLIA #, expiry, status), verify-on-CMS-QCOR, founder guarantee card, full CAN-SPAM address.
100 lines
3.6 KiB
Python
100 lines
3.6 KiB
Python
#!/usr/bin/env python3
|
|
"""Match CLIA labs to an emailable NPPES org by (normalized name + zip5).
|
|
|
|
CLIA POS files have no NPI/email; our NPPES verified set is keyed by NPI. This
|
|
bridges them: it streams the big NPPES npidata_pfile, keeps ONLY the orgs whose
|
|
NPI already has a verified email (so the scan stays cheap), indexes them by
|
|
normalized org-name + zip5, then matches each CLIA lab to recover its NPI+email.
|
|
|
|
Outputs the CLIA renewal rows that got an emailable match, with email +
|
|
mx_provider appended (ready to feed the HC campaign builder as a CLIA segment).
|
|
|
|
Usage:
|
|
python3 scripts/match_clia_to_nppes.py \
|
|
CLIA_RENEWALS.csv NPPES_VERIFIED.csv NPIDATA_PFILE.csv OUT.csv
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import re
|
|
import sys
|
|
|
|
csv.field_size_limit(10_000_000)
|
|
|
|
# npidata_pfile column names we use (stable header names in the NPPES file).
|
|
COL_NPI = "NPI"
|
|
COL_ORG = "Provider Organization Name (Legal Business Name)"
|
|
COL_ZIP_PRACTICE = "Provider Business Practice Location Address Postal Code"
|
|
COL_ENTITY = "Entity Type Code" # 2 = organization
|
|
|
|
|
|
def norm_name(s: str) -> str:
|
|
s = (s or "").upper()
|
|
s = re.sub(r"[^A-Z0-9 ]", " ", s)
|
|
# drop common suffixes/noise that differ between CLIA and NPPES spellings
|
|
s = re.sub(r"\b(LLC|INC|PC|PLLC|PA|LTD|CORP|CO|LP|LLP|THE|DBA)\b", " ", s)
|
|
s = re.sub(r"\s+", " ", s).strip()
|
|
return s
|
|
|
|
|
|
def main() -> int:
|
|
clia_f, nppes_verified_f, npidata_f, out_f = sys.argv[1:5]
|
|
|
|
# 1) emailable NPIs -> (email, mx_provider)
|
|
email_by_npi: dict[str, tuple[str, str]] = {}
|
|
with open(nppes_verified_f, newline="", encoding="utf-8") as f:
|
|
for r in csv.DictReader(f):
|
|
npi = (r.get("npi") or "").strip()
|
|
email = (r.get("email") or "").strip()
|
|
if npi and email and (r.get("verify_ok", "Y") in ("Y", "", "true", "True")):
|
|
email_by_npi[npi] = (email, r.get("mx_provider", ""))
|
|
print(f"emailable NPIs: {len(email_by_npi):,}", file=sys.stderr)
|
|
|
|
# 2) stream npidata_pfile, keep only those NPIs -> index by (name, zip5)
|
|
idx: dict[tuple[str, str], str] = {}
|
|
with open(npidata_f, newline="", encoding="latin-1") as f:
|
|
reader = csv.DictReader(f)
|
|
seen = 0
|
|
for row in reader:
|
|
npi = (row.get(COL_NPI) or "").strip()
|
|
if npi not in email_by_npi:
|
|
continue
|
|
org = norm_name(row.get(COL_ORG, ""))
|
|
zip5 = (row.get(COL_ZIP_PRACTICE) or "").strip()[:5]
|
|
if org and zip5:
|
|
idx[(org, zip5)] = npi
|
|
seen += 1
|
|
if seen == len(email_by_npi):
|
|
break
|
|
print(f"indexed emailable orgs by name+zip: {len(idx):,}", file=sys.stderr)
|
|
|
|
# 3) match CLIA -> index
|
|
matched = 0
|
|
total = 0
|
|
with open(clia_f, newline="", encoding="utf-8") as fin, \
|
|
open(out_f, "w", newline="", encoding="utf-8") as fout:
|
|
reader = csv.DictReader(fin)
|
|
fieldnames = reader.fieldnames + ["npi", "email", "mx_provider"]
|
|
w = csv.DictWriter(fout, fieldnames=fieldnames)
|
|
w.writeheader()
|
|
for row in reader:
|
|
total += 1
|
|
key = (norm_name(row["name"]), (row["zip"] or "")[:5])
|
|
npi = idx.get(key)
|
|
if not npi:
|
|
continue
|
|
email, mx = email_by_npi[npi]
|
|
row["npi"] = npi
|
|
row["email"] = email
|
|
row["mx_provider"] = mx
|
|
w.writerow(row)
|
|
matched += 1
|
|
|
|
print(f"CLIA labs: {total:,} | matched to emailable NPPES org: {matched:,} "
|
|
f"({100*matched/max(total,1):.1f}%)")
|
|
print(f" -> {out_f}")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|