diff --git a/api/src/service-catalog.ts b/api/src/service-catalog.ts index 4036036..3de381b 100644 --- a/api/src/service-catalog.ts +++ b/api/src/service-catalog.ts @@ -522,6 +522,12 @@ export const COMPLIANCE_SERVICES: Record = { erpnext_item: "NPPES-UPDATE", discountable: true, }, + "clia-renewal": { + name: "CLIA Certificate Renewal", + price_cents: 44900, + erpnext_item: "CLIA-RENEWAL", + discountable: true, + }, "medicare-enrollment": { name: "Medicare Enrollment (PECOS)", price_cents: 69900, diff --git a/data/hc_campaigns/hc_clia_renewal.html b/data/hc_campaigns/hc_clia_renewal.html new file mode 100644 index 0000000..9fcee98 --- /dev/null +++ b/data/hc_campaigns/hc_clia_renewal.html @@ -0,0 +1,117 @@ + +
+
+ + + + + + + + + + + +
diff --git a/scripts/harvest_clia_renewals.py b/scripts/harvest_clia_renewals.py new file mode 100644 index 0000000..ecc929f --- /dev/null +++ b/scripts/harvest_clia_renewals.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 +"""Harvest active CLIA labs with an upcoming certificate expiration. + +Reads the CMS "Provider of Services File - Clinical Laboratories" CSV +(clia.DATA.Qx_YYYY.csv from data.cms.gov) and writes the labs whose CLIA +certificate expires within a configurable window. CLIA certificates run on a +2-year cycle, so the expiration date (TRMNTN_EXPRTN_DT) is the recurring +reminder trigger. + +The POS/CLIA file has NO NPI and NO email -- only facility name, mailing +address and phone. So this harvest emits the matchable identity columns +(name + city/state/zip + phone) plus the cert dates; a separate matcher joins +to NPPES (by name+zip) to recover an emailable NPI where possible. Labs that +never match still have a clean phone + postal address for a phone/mail channel. + +Usage: + python3 scripts/harvest_clia_renewals.py CLIA_INPUT.csv OUT.csv [--window-days 120] +""" +from __future__ import annotations + +import argparse +import csv +import sys +from collections import Counter +from datetime import date, datetime, timedelta + +# Columns we need from the POS CLIA file (by header name; robust to reordering). +WANT = { + "clia": "PRVDR_NUM", + "name": "FAC_NAME", + "addr": "ST_ADR", + "city": "CITY_NAME", + "state": "STATE_CD", + "zip": "ZIP_CD", + "phone": "PHNE_NUM", + "expiry": "TRMNTN_EXPRTN_DT", + "effective": "CRTFCT_EFCTV_DT", + "cert_type": "CRTFCT_TYPE_CD", + "compliance": "CMPLNC_STUS_CD", +} + +# CLIA certificate types worth reminding (all are renewable 2yr certs): +# 1 = Registration, 2 = Compliance, 3 = Accreditation, +# 4 = PPM (Provider-Performed Microscopy), 9 = Waiver +# We keep all of them; the expiry window is the real filter. + + +def parse_yyyymmdd(s: str): + s = (s or "").strip() + if len(s) == 8 and s.isdigit(): + try: + return datetime.strptime(s, "%Y%m%d").date() + except ValueError: + return None + return None + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("infile") + ap.add_argument("outfile") + ap.add_argument("--window-days", type=int, default=120, + help="emit labs expiring within this many days from today (default 120)") + ap.add_argument("--include-recently-expired-days", type=int, default=30, + help="also include labs that expired up to N days ago (lapsed, still renewable)") + args = ap.parse_args() + + today = date.today() + horizon = today + timedelta(days=args.window_days) + grace = today - timedelta(days=args.include_recently_expired_days) + + with open(args.infile, newline="", encoding="latin-1") as f: + reader = csv.reader(f) + header = next(reader) + idx = {c: i for i, c in enumerate(header)} + missing = [k for k, col in WANT.items() if col not in idx] + if missing: + print(f"ERROR: input missing columns: {[WANT[m] for m in missing]}", file=sys.stderr) + return 2 + + stats = Counter() + rows_out = [] + for row in reader: + stats["total"] += 1 + if len(row) <= max(idx[c] for c in WANT.values()): + stats["short_row"] += 1 + continue + + exp = parse_yyyymmdd(row[idx[WANT["expiry"]]]) + if not exp: + stats["no_expiry"] += 1 + continue + if not (grace <= exp <= horizon): + stats["outside_window"] += 1 + continue + + name = row[idx[WANT["name"]]].strip() + state = row[idx[WANT["state"]]].strip() + zipc = row[idx[WANT["zip"]]].strip()[:5] + if not name or not state: + stats["no_name_state"] += 1 + continue + + days_until = (exp - today).days + rows_out.append({ + "clia": row[idx[WANT["clia"]]].strip(), + "name": name, + "addr": row[idx[WANT["addr"]]].strip(), + "city": row[idx[WANT["city"]]].strip(), + "state": state, + "zip": zipc, + "phone": row[idx[WANT["phone"]]].strip(), + "expiry_date": exp.isoformat(), + "days_until": days_until, + "cert_type": row[idx[WANT["cert_type"]]].strip(), + "status": "lapsed" if days_until < 0 else "upcoming", + }) + stats["emitted"] += 1 + + rows_out.sort(key=lambda r: r["days_until"]) # most urgent first + with open(args.outfile, "w", newline="", encoding="utf-8") as f: + w = csv.DictWriter(f, fieldnames=list(rows_out[0].keys()) if rows_out else + ["clia", "name", "addr", "city", "state", "zip", "phone", + "expiry_date", "days_until", "cert_type", "status"]) + w.writeheader() + w.writerows(rows_out) + + print(f"CLIA harvest: {stats['total']:,} rows scanned") + for k in ("no_expiry", "outside_window", "no_name_state", "short_row"): + if stats[k]: + print(f" skipped {k}: {stats[k]:,}") + print(f" EMITTED (expiring in [-{args.include_recently_expired_days}d, " + f"+{args.window_days}d]): {stats['emitted']:,} -> {args.outfile}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/match_clia_to_nppes.py b/scripts/match_clia_to_nppes.py new file mode 100644 index 0000000..794d330 --- /dev/null +++ b/scripts/match_clia_to_nppes.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +"""Match CLIA labs to an emailable NPPES org by (normalized name + zip5). + +CLIA POS files have no NPI/email; our NPPES verified set is keyed by NPI. This +bridges them: it streams the big NPPES npidata_pfile, keeps ONLY the orgs whose +NPI already has a verified email (so the scan stays cheap), indexes them by +normalized org-name + zip5, then matches each CLIA lab to recover its NPI+email. + +Outputs the CLIA renewal rows that got an emailable match, with email + +mx_provider appended (ready to feed the HC campaign builder as a CLIA segment). + +Usage: + python3 scripts/match_clia_to_nppes.py \ + CLIA_RENEWALS.csv NPPES_VERIFIED.csv NPIDATA_PFILE.csv OUT.csv +""" +from __future__ import annotations + +import csv +import re +import sys + +csv.field_size_limit(10_000_000) + +# npidata_pfile column names we use (stable header names in the NPPES file). +COL_NPI = "NPI" +COL_ORG = "Provider Organization Name (Legal Business Name)" +COL_ZIP_PRACTICE = "Provider Business Practice Location Address Postal Code" +COL_ENTITY = "Entity Type Code" # 2 = organization + + +def norm_name(s: str) -> str: + s = (s or "").upper() + s = re.sub(r"[^A-Z0-9 ]", " ", s) + # drop common suffixes/noise that differ between CLIA and NPPES spellings + s = re.sub(r"\b(LLC|INC|PC|PLLC|PA|LTD|CORP|CO|LP|LLP|THE|DBA)\b", " ", s) + s = re.sub(r"\s+", " ", s).strip() + return s + + +def main() -> int: + clia_f, nppes_verified_f, npidata_f, out_f = sys.argv[1:5] + + # 1) emailable NPIs -> (email, mx_provider) + email_by_npi: dict[str, tuple[str, str]] = {} + with open(nppes_verified_f, newline="", encoding="utf-8") as f: + for r in csv.DictReader(f): + npi = (r.get("npi") or "").strip() + email = (r.get("email") or "").strip() + if npi and email and (r.get("verify_ok", "Y") in ("Y", "", "true", "True")): + email_by_npi[npi] = (email, r.get("mx_provider", "")) + print(f"emailable NPIs: {len(email_by_npi):,}", file=sys.stderr) + + # 2) stream npidata_pfile, keep only those NPIs -> index by (name, zip5) + idx: dict[tuple[str, str], str] = {} + with open(npidata_f, newline="", encoding="latin-1") as f: + reader = csv.DictReader(f) + seen = 0 + for row in reader: + npi = (row.get(COL_NPI) or "").strip() + if npi not in email_by_npi: + continue + org = norm_name(row.get(COL_ORG, "")) + zip5 = (row.get(COL_ZIP_PRACTICE) or "").strip()[:5] + if org and zip5: + idx[(org, zip5)] = npi + seen += 1 + if seen == len(email_by_npi): + break + print(f"indexed emailable orgs by name+zip: {len(idx):,}", file=sys.stderr) + + # 3) match CLIA -> index + matched = 0 + total = 0 + with open(clia_f, newline="", encoding="utf-8") as fin, \ + open(out_f, "w", newline="", encoding="utf-8") as fout: + reader = csv.DictReader(fin) + fieldnames = reader.fieldnames + ["npi", "email", "mx_provider"] + w = csv.DictWriter(fout, fieldnames=fieldnames) + w.writeheader() + for row in reader: + total += 1 + key = (norm_name(row["name"]), (row["zip"] or "")[:5]) + npi = idx.get(key) + if not npi: + continue + email, mx = email_by_npi[npi] + row["npi"] = npi + row["email"] = email + row["mx_provider"] = mx + w.writerow(row) + matched += 1 + + print(f"CLIA labs: {total:,} | matched to emailable NPPES org: {matched:,} " + f"({100*matched/max(total,1):.1f}%)") + print(f" -> {out_f}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/site/src/lib/intake_manifest.ts b/site/src/lib/intake_manifest.ts index 2f80bc8..af7daba 100644 --- a/site/src/lib/intake_manifest.ts +++ b/site/src/lib/intake_manifest.ts @@ -150,6 +150,7 @@ export const INTAKE_MANIFEST: Record = { "npi-revalidation": ["npi-intake", "review", "payment"], "npi-reactivation": ["npi-intake", "review", "payment"], "nppes-update": ["npi-intake", "review", "payment"], + "clia-renewal": ["npi-intake", "review", "payment"], "medicare-enrollment": ["npi-intake", "review", "payment"], "oig-sam-screening": ["npi-intake", "review", "payment"], "provider-compliance-bundle": ["npi-intake", "review", "payment"], diff --git a/site/src/lib/service-catalog.generated.ts b/site/src/lib/service-catalog.generated.ts index c76b7c6..deb5345 100644 --- a/site/src/lib/service-catalog.generated.ts +++ b/site/src/lib/service-catalog.generated.ts @@ -27,6 +27,7 @@ export const SERVICE_META: Record = { "cdr-storage-tier1": { name: "CDR Storage Tier 1 (50 GB / 50M calls)", price_cents: 9900 }, "cdr-storage-tier2": { name: "CDR Storage Tier 2 (250 GB / 250M calls)", price_cents: 29900 }, "cdr-storage-tier3": { name: "CDR Storage Tier 3 (1 TB / 1B calls)", price_cents: 79900 }, + "clia-renewal": { name: "CLIA Certificate Renewal", price_cents: 44900 }, "cores-frn-registration": { name: "CORES / FRN Registration", price_cents: 14900 }, "corp-formation": { name: "Corporation Formation", price_cents: 24900 }, "cpni-certification": { name: "CPNI Annual Certification", price_cents: 19900 }, diff --git a/site/src/pages/order/clia-renewal.astro b/site/src/pages/order/clia-renewal.astro new file mode 100644 index 0000000..53165b4 --- /dev/null +++ b/site/src/pages/order/clia-renewal.astro @@ -0,0 +1,35 @@ +--- +import Base from "../../layouts/Base.astro"; +import VerticalOrderHeader from "../../components/VerticalOrderHeader.astro"; +import Wizard from "../../components/intake/Wizard.astro"; +import OrderPriceBanner from "../../components/OrderPriceBanner.astro"; +import { INTAKE_MANIFEST, SERVICE_META } from "../../lib/intake_manifest"; + +const slug = "clia-renewal"; +const steps = INTAKE_MANIFEST[slug]; +const meta = SERVICE_META[slug]; +const title = meta ? `Order ${meta.name}` : "Order"; +const description = "Renew your CLIA laboratory certificate with CMS before it expires. CLIA certificates run on a 2-year cycle; an expired certificate stops you from legally performing or billing for lab testing."; +--- + + +
+
+

{meta?.name}

+

{description}

+
+ + + + + + +
+ + +