#!/usr/bin/env python3
"""
Load CMS/OIG NPI companion data into Postgres for the NPI compliance check.

Populates:
  npi_revalidation_due   <- CMS Revalidation Due List
  npi_exclusions         <- OIG LEIE
  npi_optout             <- CMS Medicare Opt Out

Usage:
  DATABASE_URL=postgresql://... python3 scripts/load_npi_companion_data.py \
      --dir /tmp/npi_companion

Source CSVs (free/public):
  revalidation_due.csv   data.cms.gov Medicare Revalidation Due List
  leie.csv               oig.hhs.gov LEIE downloadable database
  optout.csv             data.cms.gov Medicare Opt Out
"""

import argparse
import csv
import os
import sys
from datetime import datetime

import psycopg2
from psycopg2.extras import execute_values

DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest")

# Date layouts tried, in this order, by parse_date().
_DATE_FORMATS = ("%Y-%m-%d", "%m/%d/%Y", "%Y%m%d", "%m/%d/%y")

# Literal cell values that parse_date() treats as "no date".
_NO_DATE_MARKERS = ("00000000", "TBD")

# All-zero NPI that clean_npi() treats as "no NPI" (presumably the source
# export's placeholder for an unknown provider -- confirm against the feed).
_PLACEHOLDER_NPI = "0000000000"


# CMS/OIG exports are not always clean UTF-8 (stray latin-1 bytes like 0xa0).
# Decode leniently so a few bad bytes don't abort a multi-hundred-MB load.
def open_csv(path):
    """Open *path* for csv reading as UTF-8 (BOM tolerated), replacing bad bytes.

    Returns the open text file object; the caller is responsible for closing
    it (all callers in this file use it as a context manager).
    """
    return open(path, newline="", encoding="utf-8-sig", errors="replace")


def parse_date(s):
    """Parse a date cell into a ``datetime.date``.

    Returns None when *s* is None, blank, one of the "no date" markers
    ("00000000", "TBD"), or does not match any of the supported layouts.
    """
    if not s:
        return None
    s = s.strip()
    if not s or s in _NO_DATE_MARKERS:
        return None
    for fmt in _DATE_FORMATS:
        try:
            return datetime.strptime(s, fmt).date()
        except ValueError:
            continue
    return None


def _has_npi_shape(value):
    """Return True when *value* is exactly ten digit characters."""
    return len(value) == 10 and value.isdigit()


def _text(row, column):
    """Return the stripped value of *column* in *row*, or None if missing/blank."""
    return (row.get(column) or "").strip() or None


def clean_npi(s):
    """Normalize an NPI cell to a 10-digit string, or None when unusable.

    None is returned for a missing or blank cell, for the all-zero
    placeholder, and for anything that is not exactly ten digits. Previously
    the invalid cases fell through and were returned unchanged, so
    placeholder and malformed values were stored as if they were real NPIs.
    """
    s = (s or "").strip()
    if s == _PLACEHOLDER_NPI or not _has_npi_shape(s):
        return None
    return s


def load_revalidation(conn, path):
    """Replace the contents of ``npi_revalidation_due`` with the CSV at *path*.

    Rows whose "National Provider Identifier" is not exactly ten digits are
    skipped. The table is truncated and refilled in one transaction, which is
    committed before returning.

    Returns the number of rows inserted.
    """
    rows = []
    with open_csv(path) as f:
        for r in csv.DictReader(f):
            npi = (r.get("National Provider Identifier") or "").strip()
            if not _has_npi_shape(npi):
                continue
            rows.append((
                npi,
                _text(r, "Enrollment ID"),
                _text(r, "First Name"),
                _text(r, "Last Name"),
                _text(r, "Organization Name"),
                _text(r, "Enrollment State Code"),
                _text(r, "Enrollment Type"),
                _text(r, "Provider Type Text"),
                _text(r, "Enrollment Specialty"),
                parse_date(r.get("Revalidation Due Date")),
                parse_date(r.get("Adjusted Due Date")),
                _text(r, "Individual Total Reassign To"),
                _text(r, "Receiving Benefits Reassignment"),
            ))
    with conn.cursor() as cur:
        cur.execute("TRUNCATE npi_revalidation_due RESTART IDENTITY")
        execute_values(cur, """
            INSERT INTO npi_revalidation_due
              (npi, enrollment_id, first_name, last_name, organization_name,
               enrollment_state, enrollment_type, provider_type, specialty,
               revalidation_due_date, adjusted_due_date, reassign_to,
               receiving_reassignment)
            VALUES %s
        """, rows, page_size=5000)
    conn.commit()
    return len(rows)


def load_exclusions(conn, path):
    """Replace the contents of ``npi_exclusions`` with the CSV at *path*.

    Every CSV row is inserted; a row without a usable NPI gets a NULL ``npi``
    (see clean_npi). The table is truncated and refilled in one transaction,
    which is committed before returning.

    Returns the number of rows inserted.
    """
    rows = []
    with open_csv(path) as f:
        for r in csv.DictReader(f):
            rows.append((
                clean_npi(r.get("NPI")),
                _text(r, "LASTNAME"),
                _text(r, "FIRSTNAME"),
                _text(r, "MIDNAME"),
                _text(r, "BUSNAME"),
                _text(r, "GENERAL"),
                _text(r, "SPECIALTY"),
                _text(r, "STATE"),
                _text(r, "EXCLTYPE"),
                parse_date(r.get("EXCLDATE")),
                parse_date(r.get("REINDATE")),
            ))
    with conn.cursor() as cur:
        cur.execute("TRUNCATE npi_exclusions RESTART IDENTITY")
        execute_values(cur, """
            INSERT INTO npi_exclusions
              (npi, last_name, first_name, middle_name, business_name,
               general_category, specialty, state, exclusion_type,
               exclusion_date, reinstatement_date)
            VALUES %s
        """, rows, page_size=5000)
    conn.commit()
    return len(rows)


def load_optout(conn, path):
    """Replace the contents of ``npi_optout`` with the CSV at *path*.

    The NPI is read from the "npi" column, falling back to "NPI". Rows whose
    NPI is not exactly ten digits are skipped. The table is truncated and
    refilled in one transaction, which is committed before returning.

    Returns the number of rows inserted.
    """
    rows = []
    with open_csv(path) as f:
        for r in csv.DictReader(f):
            npi = (r.get("npi") or r.get("NPI") or "").strip()
            if not _has_npi_shape(npi):
                continue
            rows.append((
                npi,
                _text(r, "First Name"),
                _text(r, "Last Name"),
                _text(r, "Specialty"),
                parse_date(r.get("Optout Effective Date")),
                parse_date(r.get("Optout End Date")),
                _text(r, "State Code"),
            ))
    with conn.cursor() as cur:
        cur.execute("TRUNCATE npi_optout RESTART IDENTITY")
        execute_values(cur, """
            INSERT INTO npi_optout
              (npi, first_name, last_name, specialty,
               optout_effective_date, optout_end_date, state)
            VALUES %s
        """, rows, page_size=5000)
    conn.commit()
    return len(rows)


def main():
    """Load each companion CSV found under ``--dir`` into its table.

    Files that do not exist are reported and skipped. The database connection
    is closed even when a loader raises.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--dir", default="/tmp/npi_companion")
    args = ap.parse_args()

    conn = psycopg2.connect(DATABASE_URL)
    try:
        jobs = [
            ("revalidation_due.csv", load_revalidation),
            ("leie.csv", load_exclusions),
            ("optout.csv", load_optout),
        ]
        for fname, fn in jobs:
            path = os.path.join(args.dir, fname)
            if not os.path.exists(path):
                print(f" SKIP {fname} (not found at {path})")
                continue
            n = fn(conn, path)
            print(f" loaded {n:,} rows from {fname}")
            if n == 0:
                # The loaders TRUNCATE before inserting, so zero parsed rows
                # (e.g. renamed CSV headers) leaves the table empty. Say so
                # loudly instead of reporting a quiet "loaded 0 rows".
                print(
                    f" WARNING {fname} yielded no rows; its table is now empty"
                    " (check the CSV column headers)",
                    file=sys.stderr,
                )
    finally:
        conn.close()
    print("Done.")


if __name__ == "__main__":
    main()