#!/usr/bin/env python3
"""Match CLIA labs to an emailable NPPES org by (normalized name + zip5).

CLIA POS files have no NPI/email; our NPPES verified set is keyed by NPI.
This bridges them: it streams the big NPPES npidata_pfile, keeps ONLY the
orgs whose NPI already has a verified email (so the scan stays cheap),
indexes them by normalized org-name + zip5, then matches each CLIA lab to
recover its NPI+email.

Outputs the CLIA renewal rows that got an emailable match, with
email + mx_provider appended (ready to feed the HC campaign builder as a
CLIA segment).

Usage:
    python3 scripts/match_clia_to_nppes.py \
        CLIA_RENEWALS.csv NPPES_VERIFIED.csv NPIDATA_PFILE.csv OUT.csv
"""
from __future__ import annotations

import csv
import re
import sys

csv.field_size_limit(10_000_000)

# npidata_pfile column names we use (stable header names in the NPPES file).
COL_NPI = "NPI"
COL_ORG = "Provider Organization Name (Legal Business Name)"
COL_ZIP_PRACTICE = "Provider Business Practice Location Address Postal Code"
COL_ENTITY = "Entity Type Code"  # 2 = organization

# Columns appended to every matched CLIA row in the output file.
_EXTRA_COLUMNS = ["npi", "email", "mx_provider"]

# `verify_ok` values that count as verified; a row without the column at all
# is treated as verified (the lookup defaults to "Y").
_VERIFY_OK_VALUES = ("Y", "", "true", "True")

_USAGE = (
    "usage: match_clia_to_nppes.py "
    "CLIA_RENEWALS.csv NPPES_VERIFIED.csv NPIDATA_PFILE.csv OUT.csv"
)

# Compiled once: norm_name() runs for every emailable NPPES row and every
# CLIA row.
_NON_ALNUM_RE = re.compile(r"[^A-Z0-9 ]")
_NOISE_WORD_RE = re.compile(
    r"\b(LLC|INC|PC|PLLC|PA|LTD|CORP|CO|LP|LLP|THE|DBA)\b"
)


def norm_name(s: str) -> str:
    """Return *s* reduced to a canonical form for cross-file name matching.

    The name is upper-cased, every character other than A-Z, 0-9 and space
    becomes a space, the legal-suffix/noise words listed in
    ``_NOISE_WORD_RE`` are removed, and whitespace runs are collapsed.
    ``None`` or an empty string yields ``""``.
    """
    s = (s or "").upper()
    s = _NON_ALNUM_RE.sub(" ", s)
    # drop common suffixes/noise that differ between CLIA and NPPES spellings
    s = _NOISE_WORD_RE.sub(" ", s)
    # Only letters, digits and spaces remain, so split/join collapses runs
    # of spaces and trims both ends in one step.
    return " ".join(s.split())


def main() -> int:
    """Run the CLIA -> NPPES match using four paths taken from ``sys.argv``.

    Positional arguments, in order: the CLIA renewals CSV (columns ``name``
    and ``zip`` are read), the verified-email CSV (columns ``npi``,
    ``email``, ``verify_ok``, ``mx_provider`` are read), the NPPES
    npidata_pfile CSV, and the output CSV path. Arguments beyond the fourth
    are ignored.

    Returns:
        0 on success, 2 when fewer than four paths were supplied.

    Raises:
        KeyError: if a CLIA data row lacks the ``name`` or ``zip`` column.
        OSError: if any of the files cannot be opened.
    """
    args = sys.argv[1:5]
    if len(args) < 4:
        # Without this check the tuple unpacking below dies with a bare
        # ValueError traceback that does not say what was expected.
        print(_USAGE, file=sys.stderr)
        return 2
    clia_f, nppes_verified_f, npidata_f, out_f = args

    # 1) emailable NPIs -> (email, mx_provider)
    email_by_npi: dict[str, tuple[str, str]] = {}
    # utf-8-sig reads plain UTF-8 unchanged but drops a leading BOM, which
    # would otherwise be glued onto the first header name and hide it.
    with open(nppes_verified_f, newline="", encoding="utf-8-sig") as f:
        for r in csv.DictReader(f):
            npi = (r.get("npi") or "").strip()
            email = (r.get("email") or "").strip()
            if npi and email and r.get("verify_ok", "Y") in _VERIFY_OK_VALUES:
                email_by_npi[npi] = (email, r.get("mx_provider", ""))
    print(f"emailable NPIs: {len(email_by_npi):,}", file=sys.stderr)

    # 2) stream npidata_pfile, keep only those NPIs -> index by (name, zip5)
    idx: dict[tuple[str, str], str] = {}
    remaining = len(email_by_npi)
    if remaining:  # nothing can match otherwise, so skip the big scan
        with open(npidata_f, newline="", encoding="latin-1") as f:
            for row in csv.DictReader(f):
                npi = (row.get(COL_NPI) or "").strip()
                if npi not in email_by_npi:
                    continue
                org = norm_name(row.get(COL_ORG, ""))
                zip5 = (row.get(COL_ZIP_PRACTICE) or "").strip()[:5]
                if org and zip5:
                    # A later row with the same (name, zip5) replaces an
                    # earlier one.
                    idx[(org, zip5)] = npi
                # Stop as soon as every emailable NPI has been encountered.
                # NOTE(review): relies on each NPI appearing at most once in
                # the npidata file - confirm.
                remaining -= 1
                if not remaining:
                    break
    print(f"indexed emailable orgs by name+zip: {len(idx):,}", file=sys.stderr)

    # 3) match CLIA -> index
    matched = 0
    total = 0
    with open(clia_f, newline="", encoding="utf-8-sig") as fin, \
            open(out_f, "w", newline="", encoding="utf-8") as fout:
        reader = csv.DictReader(fin)
        # fieldnames is None for a completely empty input file; fall back to
        # an empty header so a header-only output is still written.
        fieldnames = list(reader.fieldnames or []) + _EXTRA_COLUMNS
        w = csv.DictWriter(fout, fieldnames=fieldnames)
        w.writeheader()
        for row in reader:
            total += 1
            # Strip before slicing so the key is built the same way as the
            # NPPES side of the index.
            key = (norm_name(row["name"]), (row["zip"] or "").strip()[:5])
            npi = idx.get(key)
            if not npi:
                continue
            email, mx = email_by_npi[npi]
            row["npi"] = npi
            row["email"] = email
            row["mx_provider"] = mx
            w.writerow(row)
            matched += 1

    print(f"CLIA labs: {total:,} | matched to emailable NPPES org: {matched:,} "
          f"({100*matched/max(total,1):.1f}%)")
    print(f" -> {out_f}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())