new-site/scripts/workers/daily_paper_batch.py
justin e6a630ada1 healthcare: verify CMS-10114 update path, correct NPI Enumerator address, build CMS-10114 filler
Verified firsthand against the live CMS-10114 (Rev. 02/25, OMB 0938-0931):
- Section 1A confirms paper is valid for Change of Information (#2) AND
  Reactivation (#4), not just initial enumeration. Resolves the UNCERTAIN flag.
- Current mailing address is CMS NPI Enumerator Services, Mail Stop DO-01-51,
  7500 Security Blvd, Baltimore MD 21244. The old Fargo PO Box 6059 is retired;
  corrected in mac_routing.NPI_ENUMERATOR + all docs.
- No electronic no-login equivalent exists for CMS (NPI Registry API is
  read-only; PECOS/NPPES-IA require login), unlike FMCSA's ask.fmcsa ticket form.
  So tiers stay: Standard=paper CMS-10114 (no login), Expedited=NPPES surrogate.

New: cms10114_pdf_filler.py fills the flat official form via text overlay
(reason checkbox + NPI + Section 2A identity + Section 4A cert name + signature
anchor); wired into npi_provider._generate_10114_for_signing for nppes-update.
Signed forms route to the NPI Enumerator via the existing daily batch.

Tests: test_cms10114.py 27/27, test_paper_batch.py 15/15, Astro build 58 pages.
2026-06-07 02:04:41 -05:00

225 lines
9.1 KiB
Python

"""Daily paper-filing batch worker (Standard no-login CMS filing path).
Run each postal working-day morning (cron, working-day-gated). It:
1. finds every SIGNED, not-yet-batched paper filing (esign_records with a
cms855/cms10114 document_type, status='signed', a signed PDF, and no
paper_batch_id);
2. determines each filing's destination agency (the provider's MAC from the
order's practice state via mac_routing; the NPI Enumerator in Fargo for
NPPES/CMS-10114 updates);
3. groups by destination and, per destination, builds a cover sheet + merges
all that destination's signed PDFs into ONE print job;
4. records a paper_filing_batches row (one Priority Mail envelope per agency)
and stamps each included esign_records with its paper_batch_id (idempotent);
5. uploads the cover sheet + merged print job to MinIO for a human to print and
drop in the mail (phase 1). Phase 2 can hand the merged PDF to a print-mail
API and fill tracking_number automatically.
Idempotent: a filing is picked up only while paper_batch_id IS NULL, and a
(batch_date, destination) batch is created at most once (unique index).
Usage:
python -m scripts.workers.daily_paper_batch # run today's batch
python -m scripts.workers.daily_paper_batch --dry-run
"""
from __future__ import annotations
import argparse
import logging
import os
import tempfile
from datetime import date, datetime, timezone
LOG = logging.getLogger("workers.daily_paper_batch")
# Document types that ride the paper (Standard) filing path.
PAPER_DOC_PREFIXES = ("cms855", "cms10114")
def _is_postal_working_day(d: date) -> bool:
try:
from scripts.workers.business_days import is_business_day
except ImportError:
from business_days import is_business_day # type: ignore
return is_business_day(d)
def _destination_for(practice_state: str, document_type: str):
"""Return the (key, name, address_lines) destination for a filing.
NPPES/CMS-10114 updates go to the NPI Enumerator (Baltimore, MD); CMS-855s go
to the provider's MAC by state.
"""
try:
from scripts.workers import mac_routing as mr
except ImportError:
import mac_routing as mr # type: ignore
if (document_type or "").startswith("cms10114"):
d = mr.NPI_ENUMERATOR
return d.key, d.name, d.address_lines
mac = mr.mac_for_state(practice_state or "")
if not mac:
return None
return mac.key, mac.name, mac.address_lines
def run_batch(dry_run: bool = False, batch_date: date | None = None) -> dict:
"""Build today's per-destination batches. Returns a summary dict."""
import json
import psycopg2
today = batch_date or datetime.now(timezone.utc).date()
if not _is_postal_working_day(today):
LOG.info("[batch] %s is not a postal working day — skipping", today)
return {"skipped": "non-working-day", "date": today.isoformat()}
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
summary = {"date": today.isoformat(), "destinations": {}, "total_items": 0}
try:
# 1. Pull signed, unbatched paper filings + their order practice_state.
like_clauses = " OR ".join(
["e.document_type LIKE %s"] * len(PAPER_DOC_PREFIXES)
)
params = [p + "%" for p in PAPER_DOC_PREFIXES]
with conn.cursor() as cur:
cur.execute(
f"""
SELECT e.id, e.order_number, e.document_type,
e.signed_document_minio_key, e.document_metadata,
COALESCE(o.intake_data->>'practice_state', '') AS practice_state,
COALESCE(o.intake_data->>'provider_name', o.customer_name, '') AS provider,
COALESCE(o.intake_data->>'npi', '') AS npi
FROM esign_records e
LEFT JOIN compliance_orders o ON o.order_number = e.order_number
WHERE e.status = 'signed'
AND e.paper_batch_id IS NULL
AND e.signed_document_minio_key IS NOT NULL
AND ({like_clauses})
ORDER BY e.order_number
""",
params,
)
rows = cur.fetchall()
if not rows:
LOG.info("[batch] no signed unbatched paper filings for %s", today)
return summary
# 2/3. Group by destination.
groups: dict[str, dict] = {}
for (eid, order_number, doc_type, signed_key, meta,
practice_state, provider, npi) in rows:
dest = _destination_for(practice_state, doc_type)
if not dest:
LOG.warning("[batch] no MAC destination for order %s (state=%r) — left unbatched",
order_number, practice_state)
continue
key, name, addr = dest
g = groups.setdefault(key, {"name": name, "addr": addr, "items": []})
g["items"].append({
"esign_id": eid,
"order_number": order_number,
"provider": provider,
"npi": npi,
"form": (doc_type or "").replace("cms", "").upper(),
"signed_key": signed_key,
})
# 4/5. Per destination, build cover sheet + merged PDF, persist.
try:
from scripts.document_gen.templates.batch_cover_sheet import (
build_cover_sheet, merge_batch_pdf,
)
from scripts.document_gen.minio_client import MinioStorage
except ImportError:
from document_gen.templates.batch_cover_sheet import ( # type: ignore
build_cover_sheet, merge_batch_pdf,
)
from document_gen.minio_client import MinioStorage # type: ignore
storage = None if dry_run else MinioStorage()
for key, g in groups.items():
items = g["items"]
summary["destinations"][key] = {"name": g["name"], "items": len(items)}
summary["total_items"] += len(items)
LOG.info("[batch] %s -> %s: %d filing(s)", today, g["name"], len(items))
if dry_run:
continue
# Download each signed PDF.
filing_pdfs = []
for it in items:
try:
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tf:
storage.download(it["signed_key"], tf.name)
with open(tf.name, "rb") as fh:
filing_pdfs.append(fh.read())
except Exception as exc:
LOG.error("[batch] could not fetch %s: %s", it["signed_key"], exc)
cover = build_cover_sheet(
destination_name=g["name"],
destination_address_lines=g["addr"],
batch_date=today,
items=items,
)
merged = merge_batch_pdf(cover, filing_pdfs)
cover_key = f"paper-batches/{today.isoformat()}/{key}_cover.pdf"
merged_key = f"paper-batches/{today.isoformat()}/{key}_print.pdf"
for data, dest_key in ((cover, cover_key), (merged, merged_key)):
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tf:
tf.write(data)
tf.flush()
storage.upload(tf.name, dest_key, content_type="application/pdf")
addr_block = g["name"] + "\n" + "\n".join(g["addr"])
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO paper_filing_batches
(batch_date, destination_key, destination_name,
destination_address, item_count, cover_sheet_key,
merged_pdf_key, status)
VALUES (%s,%s,%s,%s,%s,%s,%s,'prepared')
ON CONFLICT (batch_date, destination_key) DO UPDATE
SET item_count = EXCLUDED.item_count,
cover_sheet_key = EXCLUDED.cover_sheet_key,
merged_pdf_key = EXCLUDED.merged_pdf_key
RETURNING id
""",
(today, key, g["name"], addr_block, len(items),
cover_key, merged_key),
)
batch_id = cur.fetchone()[0]
cur.execute(
"UPDATE esign_records SET paper_batch_id = %s, filing_destination_key = %s, updated_at = NOW() WHERE id = ANY(%s)",
(batch_id, key, [it["esign_id"] for it in items]),
)
conn.commit()
return summary
finally:
conn.close()
def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
ap = argparse.ArgumentParser()
ap.add_argument("--dry-run", action="store_true", help="report groups, write nothing")
ap.add_argument("--date", help="override batch date YYYY-MM-DD (testing)")
args = ap.parse_args()
bd = date.fromisoformat(args.date) if args.date else None
summary = run_batch(dry_run=args.dry_run, batch_date=bd)
import json
print(json.dumps(summary, indent=2))
if __name__ == "__main__":
main()