#!/usr/bin/env python3
"""
Load CMS/OIG NPI companion data into Postgres for the NPI compliance check.

Populates:
    npi_revalidation_due  <- CMS Revalidation Due List
    npi_exclusions        <- OIG LEIE
    npi_optout            <- CMS Medicare Opt Out

Usage:
    DATABASE_URL=postgresql://... python3 scripts/load_npi_companion_data.py \
        --dir /tmp/npi_companion

Source CSVs (free/public):
    revalidation_due.csv  data.cms.gov Medicare Revalidation Due List
    leie.csv              oig.hhs.gov LEIE downloadable database
    optout.csv            data.cms.gov Medicare Opt Out
"""
|
|
import argparse
|
|
import csv
|
|
import os
|
|
import sys
|
|
from datetime import datetime
|
|
|
|
import psycopg2
|
|
from psycopg2.extras import execute_values
|
|
|
|
# Postgres connection string. Read from the DATABASE_URL environment variable;
# when that variable is unset, the default below points at a database on
# localhost.
DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest")
|
|
|
|
# CMS/OIG exports are not always clean UTF-8 (stray latin-1 bytes like 0xa0).
# Decode leniently so a few bad bytes don't abort a multi-hundred-MB load.
|
|
def open_csv(path):
    """Open a CSV file for reading in text mode with lenient decoding.

    The file is decoded as UTF-8 with a leading byte-order mark dropped
    ("utf-8-sig"); byte sequences that are not valid UTF-8 are replaced
    rather than raising.  ``newline=""`` disables newline translation so the
    csv module sees the line endings exactly as stored.

    Args:
        path: Filesystem path of the CSV file.

    Returns:
        The open text-mode file object.  The caller closes it, typically by
        using it as a context manager.
    """
    decode_options = {"encoding": "utf-8-sig", "errors": "replace"}
    return open(path, newline="", **decode_options)
|
|
|
|
|
|
def parse_date(s):
    """Convert a date string into a ``datetime.date``, or ``None``.

    Surrounding whitespace is ignored.  Empty input and the placeholder
    values ``"00000000"`` and ``"TBD"`` yield ``None``.  Otherwise the text
    is tried against these layouts, in order, and the first match wins:
    ``YYYY-MM-DD``, ``MM/DD/YYYY``, ``YYYYMMDD``, ``MM/DD/YY``.

    Args:
        s: Raw field value; may be ``None`` or empty.

    Returns:
        The parsed ``date``, or ``None`` when the value is missing, a
        placeholder, or matches none of the layouts.
    """
    text = s.strip() if s else ""
    if text in ("", "00000000", "TBD"):
        return None

    parsed = None
    for layout in ("%Y-%m-%d", "%m/%d/%Y", "%Y%m%d", "%m/%d/%y"):
        try:
            parsed = datetime.strptime(text, layout).date()
        except ValueError:
            # Not this layout; fall through to the next candidate.
            continue
        break
    return parsed
|
|
|
|
|
|
def clean_npi(s):
    """Normalize a raw NPI field to a valid 10-digit string, or ``None``.

    Surrounding whitespace is stripped.  A value is kept only when it is
    exactly ten digits and is not the all-zero value ``"0000000000"``.

    The previous implementation fell back to returning the raw string when
    validation failed, so the all-zero value and malformed values passed
    through unchanged; they now become ``None``.

    Args:
        s: Raw field value; may be ``None`` or empty.

    Returns:
        The stripped 10-digit NPI string, or ``None`` when the value is
        missing, malformed, or all zeros.
    """
    npi = (s or "").strip()
    if len(npi) != 10 or not npi.isdigit() or npi == "0000000000":
        return None
    return npi
|
|
|
|
|
|
def load_revalidation(conn, path):
    """Reload ``npi_revalidation_due`` from a revalidation-due CSV.

    Every CSV row is read into memory first.  Rows whose
    "National Provider Identifier" is not exactly ten digits are skipped.
    Text fields are stripped and stored as ``None`` when empty; the two
    due-date fields go through ``parse_date``.  The table is then truncated
    (identity restarted), the rows are bulk-inserted in pages of 5000, and
    the connection is committed.

    Args:
        conn: Open psycopg2 connection.
        path: Path to the CSV file.

    Returns:
        Number of rows inserted.
    """
    def text(record, column):
        # Stripped cell value, with missing/blank cells collapsed to None.
        value = (record.get(column) or "").strip()
        return value or None

    # CSV headers, in the order of the INSERT column list around the dates.
    leading_columns = (
        "Enrollment ID",
        "First Name",
        "Last Name",
        "Organization Name",
        "Enrollment State Code",
        "Enrollment Type",
        "Provider Type Text",
        "Enrollment Specialty",
    )
    trailing_columns = (
        "Individual Total Reassign To",
        "Receiving Benefits Reassignment",
    )

    records = []
    with open_csv(path) as handle:
        for record in csv.DictReader(handle):
            npi = (record.get("National Provider Identifier") or "").strip()
            if len(npi) != 10 or not npi.isdigit():
                continue
            records.append((
                npi,
                *[text(record, column) for column in leading_columns],
                parse_date(record.get("Revalidation Due Date")),
                parse_date(record.get("Adjusted Due Date")),
                *[text(record, column) for column in trailing_columns],
            ))

    insert_sql = """
        INSERT INTO npi_revalidation_due
            (npi, enrollment_id, first_name, last_name, organization_name,
             enrollment_state, enrollment_type, provider_type, specialty,
             revalidation_due_date, adjusted_due_date, reassign_to, receiving_reassignment)
        VALUES %s
    """
    with conn.cursor() as cursor:
        cursor.execute("TRUNCATE npi_revalidation_due RESTART IDENTITY")
        execute_values(cursor, insert_sql, records, page_size=5000)
    conn.commit()
    return len(records)
|
|
|
|
|
|
def load_exclusions(conn, path):
    """Reload ``npi_exclusions`` from an exclusions (LEIE) CSV.

    Every CSV row is read into memory and kept; no rows are filtered out.
    The "NPI" field is passed through ``clean_npi``, text fields are stripped
    and stored as ``None`` when empty, and EXCLDATE / REINDATE go through
    ``parse_date``.  The table is then truncated (identity restarted), the
    rows are bulk-inserted in pages of 5000, and the connection is committed.

    Args:
        conn: Open psycopg2 connection.
        path: Path to the CSV file.

    Returns:
        Number of rows inserted.
    """
    def text(record, column):
        # Stripped cell value, with missing/blank cells collapsed to None.
        value = (record.get(column) or "").strip()
        return value or None

    # CSV headers, in the order of the INSERT column list after npi.
    text_columns = (
        "LASTNAME",
        "FIRSTNAME",
        "MIDNAME",
        "BUSNAME",
        "GENERAL",
        "SPECIALTY",
        "STATE",
        "EXCLTYPE",
    )

    records = []
    with open_csv(path) as handle:
        for record in csv.DictReader(handle):
            records.append((
                clean_npi(record.get("NPI")),
                *[text(record, column) for column in text_columns],
                parse_date(record.get("EXCLDATE")),
                parse_date(record.get("REINDATE")),
            ))

    insert_sql = """
        INSERT INTO npi_exclusions
            (npi, last_name, first_name, middle_name, business_name,
             general_category, specialty, state, exclusion_type,
             exclusion_date, reinstatement_date)
        VALUES %s
    """
    with conn.cursor() as cursor:
        cursor.execute("TRUNCATE npi_exclusions RESTART IDENTITY")
        execute_values(cursor, insert_sql, records, page_size=5000)
    conn.commit()
    return len(records)
|
|
|
|
|
|
def load_optout(conn, path):
    """Reload ``npi_optout`` from an opt-out CSV.

    Every CSV row is read into memory first.  The NPI is taken from the
    "npi" column, falling back to "NPI"; rows whose NPI is not exactly ten
    digits are skipped.  Text fields are stripped and stored as ``None`` when
    empty; the two opt-out dates go through ``parse_date``.  The table is
    then truncated (identity restarted), the rows are bulk-inserted in pages
    of 5000, and the connection is committed.

    Args:
        conn: Open psycopg2 connection.
        path: Path to the CSV file.

    Returns:
        Number of rows inserted.
    """
    def text(record, column):
        # Stripped cell value, with missing/blank cells collapsed to None.
        value = (record.get(column) or "").strip()
        return value or None

    # CSV headers for the text columns that sit between npi and the dates.
    name_columns = ("First Name", "Last Name", "Specialty")

    records = []
    with open_csv(path) as handle:
        for record in csv.DictReader(handle):
            npi = (record.get("npi") or record.get("NPI") or "").strip()
            if len(npi) != 10 or not npi.isdigit():
                continue
            records.append((
                npi,
                *[text(record, column) for column in name_columns],
                parse_date(record.get("Optout Effective Date")),
                parse_date(record.get("Optout End Date")),
                text(record, "State Code"),
            ))

    insert_sql = """
        INSERT INTO npi_optout
            (npi, first_name, last_name, specialty,
             optout_effective_date, optout_end_date, state)
        VALUES %s
    """
    with conn.cursor() as cursor:
        cursor.execute("TRUNCATE npi_optout RESTART IDENTITY")
        execute_values(cursor, insert_sql, records, page_size=5000)
    conn.commit()
    return len(records)
|
|
|
|
|
|
def main():
    """Command-line entry point: load each companion CSV found in ``--dir``.

    Parses ``--dir`` (default ``/tmp/npi_companion``), connects to the
    database named by ``DATABASE_URL``, and runs the loaders in a fixed
    order: revalidation_due.csv, leie.csv, optout.csv.  A file that does not
    exist is reported and skipped.  The connection is closed even when a
    loader raises.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dir", default="/tmp/npi_companion")
    options = parser.parse_args()

    # (file name inside --dir, loader function), processed in this order.
    jobs = (
        ("revalidation_due.csv", load_revalidation),
        ("leie.csv", load_exclusions),
        ("optout.csv", load_optout),
    )

    connection = psycopg2.connect(DATABASE_URL)
    try:
        for fname, loader in jobs:
            path = os.path.join(options.dir, fname)
            if os.path.exists(path):
                n = loader(connection, path)
                print(f" loaded {n:,} rows from {fname}")
            else:
                print(f" SKIP {fname} (not found at {path})")
    finally:
        connection.close()
    print("Done.")
|
|
|
|
|
|
# Run the loader only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|