diff --git a/api/migrations/089_paper_filing_batches.sql b/api/migrations/089_paper_filing_batches.sql new file mode 100644 index 0000000..632d07d --- /dev/null +++ b/api/migrations/089_paper_filing_batches.sql @@ -0,0 +1,49 @@ +-- 089: Daily paper-filing batch tracking for the Standard (no-login) CMS filing path. +-- +-- When a provider e-signs a CMS-855 (or CMS-10114), the signed PDF is mailed to +-- the destination agency (the provider's MAC for 855s; the NPI Enumerator in +-- Fargo for NPPES updates). To save postage and handling we batch all signed, +-- not-yet-mailed filings each postal working-day morning, group them by +-- destination agency, and mail one Priority Mail envelope per agency. +-- +-- This migration records, per signed filing, which daily batch it went out in, +-- so the batch worker is idempotent (never re-mails) and we can audit/track. + +-- A paper-filing batch = one Priority Mail envelope to one destination agency +-- on one mailing day. +CREATE TABLE IF NOT EXISTS paper_filing_batches ( + id SERIAL PRIMARY KEY, + batch_date DATE NOT NULL, -- the postal working day mailed + destination_key TEXT NOT NULL, -- mac_routing key, e.g. noridian_je / npi_enumerator + destination_name TEXT NOT NULL DEFAULT '', -- human-readable MAC/agency name + destination_address TEXT NOT NULL DEFAULT '', -- full mailing address block + item_count INTEGER NOT NULL DEFAULT 0, -- number of filings enclosed + cover_sheet_key TEXT, -- MinIO key for the batch cover sheet + merged_pdf_key TEXT, -- MinIO key for the merged print job + tracking_number TEXT, -- USPS tracking, filled when mailed + status TEXT NOT NULL DEFAULT 'prepared' + CHECK (status IN ('prepared', 'mailed', 'cancelled')), + created_at TIMESTAMPTZ DEFAULT NOW(), + mailed_at TIMESTAMPTZ +); + +CREATE INDEX IF NOT EXISTS idx_paper_batch_date ON paper_filing_batches(batch_date); +CREATE UNIQUE INDEX IF NOT EXISTS idx_paper_batch_day_dest + ON paper_filing_batches(batch_date, destination_key); + +-- Link each signed esign filing to the batch it shipped in. NULL = not yet +-- batched (the worker picks these up). Set once -> idempotent (worker skips +-- anything already assigned a batch). +ALTER TABLE esign_records + ADD COLUMN IF NOT EXISTS paper_batch_id INTEGER REFERENCES paper_filing_batches(id); + +-- The destination MAC/agency for this filing, derived from the provider's +-- practice state at sign time (snapshot so later routing-table changes don't +-- retroactively move historical filings). +ALTER TABLE esign_records + ADD COLUMN IF NOT EXISTS filing_destination_key TEXT; + +-- Index the work queue: signed, paper-path filings not yet batched. +CREATE INDEX IF NOT EXISTS idx_esign_unbatched_signed + ON esign_records(status) + WHERE status = 'signed' AND paper_batch_id IS NULL; diff --git a/infra/ansible/roles/worker-crons/defaults/main.yml b/infra/ansible/roles/worker-crons/defaults/main.yml index 23bd1c8..1b2ba33 100644 --- a/infra/ansible/roles/worker-crons/defaults/main.yml +++ b/infra/ansible/roles/worker-crons/defaults/main.yml @@ -219,3 +219,16 @@ worker_crons: module: scripts.workers.intake_reminder on_calendar: "*-*-* 16:00:00 UTC" persistent: true + + # Daily paper-filing batch (Standard no-login CMS filing path) — weekday + # mornings 13:30 UTC (08:30 CT). Groups all signed, not-yet-mailed CMS filings + # by destination agency (provider's MAC; NPI Enumerator in Fargo for NPPES) + # and builds one cover sheet + merged print job per agency, for a human to + # print and mail in a single Priority Mail envelope. The worker self-gates on + # postal working days (skips weekends AND federal/USPS holidays via + # business_days.py), so the Mon-Fri OnCalendar is just a coarse pre-filter. + - name: pw-paper-batch + description: Build daily per-agency CMS paper-filing batch (cover sheet + merged print job) + module: scripts.workers.daily_paper_batch + on_calendar: "Mon..Fri *-*-* 13:30:00 UTC" + persistent: true diff --git a/scripts/document_gen/templates/batch_cover_sheet.py b/scripts/document_gen/templates/batch_cover_sheet.py new file mode 100644 index 0000000..3f5dd65 --- /dev/null +++ b/scripts/document_gen/templates/batch_cover_sheet.py @@ -0,0 +1,151 @@ +"""Daily paper-filing batch cover sheet + merged print job. + +For the Standard (no-login) CMS filing path we batch all signed, not-yet-mailed +filings each postal working day and mail one Priority Mail envelope per +destination agency (the provider's MAC, or the NPI Enumerator in Fargo). This +module builds, for one destination/day: + + 1. a **cover sheet** PDF — sender (Performance West), destination agency + + address, batch date, enclosed count, and a per-item manifest + (order# / provider / NPI / form type); and + 2. a **merged print job** PDF — the cover sheet followed by every enclosed + signed filing, ready to print and drop in one envelope. + +Only reportlab + pypdf are required (already used across document_gen). +""" +from __future__ import annotations + +import io +import logging +from datetime import date + +LOG = logging.getLogger("document_gen.batch_cover_sheet") + +PW_SENDER = ( + "Performance West Inc.", + "Provider Enrollment Filings", + "filings@performancewest.net | (888) 411-0383", +) + + +def build_cover_sheet( + *, + destination_name: str, + destination_address_lines: tuple[str, ...] | list[str], + batch_date: date, + items: list[dict], +) -> bytes: + """Render the batch cover sheet PDF. + + ``items`` = list of dicts with keys: order_number, provider, npi, form. + Returns PDF bytes. + """ + from reportlab.lib.pagesizes import letter + from reportlab.lib.units import inch + from reportlab.lib.colors import HexColor + from reportlab.pdfgen import canvas + from reportlab.lib.utils import simpleSplit + + buf = io.BytesIO() + c = canvas.Canvas(buf, pagesize=letter) + w, h = letter + teal = HexColor("#0f766e") + gray = HexColor("#444444") + x = inch + y = h - inch + + # Header band + c.setFillColor(teal) + c.rect(0, h - 0.9 * inch, w, 0.9 * inch, fill=1, stroke=0) + c.setFillColor(HexColor("#ffffff")) + c.setFont("Helvetica-Bold", 18) + c.drawString(x, h - 0.6 * inch, "Provider Enrollment Filing Transmittal") + y = h - 1.3 * inch + + # Sender + c.setFillColor(gray) + c.setFont("Helvetica-Bold", 10) + c.drawString(x, y, "FROM") + y -= 14 + c.setFont("Helvetica", 10) + for line in PW_SENDER: + c.drawString(x, y, line) + y -= 13 + y -= 8 + + # Destination + c.setFont("Helvetica-Bold", 10) + c.drawString(x, y, "TO") + y -= 14 + c.setFont("Helvetica", 10) + c.drawString(x, y, destination_name) + y -= 13 + for line in destination_address_lines: + c.drawString(x, y, line) + y -= 13 + y -= 8 + + # Batch meta + c.setFont("Helvetica-Bold", 10) + c.drawString(x, y, f"Date: {batch_date.isoformat()}") + c.drawString(x + 3 * inch, y, f"Enclosed filings: {len(items)}") + y -= 22 + + # Divider + c.setStrokeColor(HexColor("#cccccc")) + c.line(x, y, w - inch, y) + y -= 18 + + # Manifest header + c.setFont("Helvetica-Bold", 9) + cols = [(x, "ORDER"), (x + 1.4 * inch, "PROVIDER"), + (x + 4.0 * inch, "NPI"), (x + 5.3 * inch, "FORM")] + for cx, label in cols: + c.drawString(cx, y, label) + y -= 4 + c.line(x, y, w - inch, y) + y -= 14 + + c.setFont("Helvetica", 9) + for it in items: + if y < inch: # new page + c.showPage() + y = h - inch + c.setFont("Helvetica", 9) + provider = (it.get("provider") or "")[:34] + c.drawString(cols[0][0], y, str(it.get("order_number") or "")) + c.drawString(cols[1][0], y, provider) + c.drawString(cols[2][0], y, str(it.get("npi") or "")) + c.drawString(cols[3][0], y, str(it.get("form") or "").upper()) + y -= 13 + + # Footer note + y = max(y - 20, 0.7 * inch) + c.setFont("Helvetica-Oblique", 8) + c.setFillColor(gray) + note = ("This transmittal accompanies the signed provider enrollment " + "applications enclosed. Each application bears the provider's " + "certification signature. Questions: filings@performancewest.net.") + for line in simpleSplit(note, "Helvetica-Oblique", 8, w - 2 * inch): + c.drawString(x, y, line) + y -= 11 + + c.showPage() + c.save() + return buf.getvalue() + + +def merge_batch_pdf(cover_sheet: bytes, filing_pdfs: list[bytes]) -> bytes: + """Merge the cover sheet + all enclosed signed filings into one print job.""" + from pypdf import PdfReader, PdfWriter + + writer = PdfWriter() + writer.append(PdfReader(io.BytesIO(cover_sheet))) + for pdf in filing_pdfs: + try: + writer.append(PdfReader(io.BytesIO(pdf))) + except Exception as exc: # one bad PDF shouldn't sink the batch + LOG.error("[batch] skipping unreadable filing PDF: %s", exc) + out = io.BytesIO() + writer.write(out) + return out.getvalue() diff --git a/scripts/tests/test_paper_batch.py b/scripts/tests/test_paper_batch.py new file mode 100644 index 0000000..ceeed6c --- /dev/null +++ b/scripts/tests/test_paper_batch.py @@ -0,0 +1,104 @@ +"""Tests for the daily paper-filing batch (Standard no-login CMS filing path). + +Covers the pure logic that doesn't need a DB/MinIO: + - postal working-day gating (weekends + federal/USPS holidays) + - destination routing (state -> MAC; CMS-10114 -> NPI Enumerator) + - cover-sheet rendering + multi-page pagination + - merge of cover sheet + filing PDFs + +Run: python scripts/tests/test_paper_batch.py +""" +from __future__ import annotations + +import importlib.util +import io +import sys +from datetime import date +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[2] +sys.path.insert(0, str(ROOT / "scripts")) +sys.path.insert(0, str(ROOT / "scripts" / "workers")) + + +def _load(name: str, rel: str): + spec = importlib.util.spec_from_file_location(name, ROOT / rel) + m = importlib.util.module_from_spec(spec) + spec.loader.exec_module(m) + return m + + +dpb = _load("dpb", "scripts/workers/daily_paper_batch.py") +bcs = _load("bcs", "scripts/document_gen/templates/batch_cover_sheet.py") + +PASS = 0 +FAIL = 0 + + +def check(label, cond): + global PASS, FAIL + if cond: + PASS += 1 + print(f" PASS {label}") + else: + FAIL += 1 + print(f" FAIL {label}") + + +def test_working_days(): + print("working-day gating") + check("Saturday is not a working day", not dpb._is_postal_working_day(date(2026, 6, 6))) + check("Sunday is not a working day", not dpb._is_postal_working_day(date(2026, 6, 7))) + check("Monday is a working day", dpb._is_postal_working_day(date(2026, 6, 8))) + check("Memorial Day (holiday) is not a working day", not dpb._is_postal_working_day(date(2026, 5, 25))) + check("Christmas (holiday) is not a working day", not dpb._is_postal_working_day(date(2026, 12, 25))) + + +def test_routing(): + print("destination routing") + ca = dpb._destination_for("CA", "cms855i") + check("CA 855 -> Noridian JE", ca and ca[0] == "noridian_je") + ny = dpb._destination_for("NY", "cms855i") + check("NY 855 -> NGS JK", ny and ny[0] == "ngs_jk") + tx = dpb._destination_for("TX", "cms855b") + check("TX 855B -> Novitas JH", tx and tx[0] == "novitas_jh") + enum = dpb._destination_for("TX", "cms10114") + check("any-state CMS-10114 -> NPI Enumerator (Fargo)", enum and enum[0] == "npi_enumerator") + check("CMS-10114 ignores state for routing", dpb._destination_for("CA", "cms10114")[0] == "npi_enumerator") + check("unknown state -> None (human review)", dpb._destination_for("ZZ", "cms855i") is None) + check("blank state -> None", dpb._destination_for("", "cms855i") is None) + + +def test_cover_sheet(): + print("cover sheet + merge") + items = [ + {"order_number": f"CO-X{i:03d}", "provider": f"Provider {i} MD", + "npi": f"1{i:09d}", "form": "855i"} + for i in range(40) + ] + cover = bcs.build_cover_sheet( + destination_name="Noridian JE", + destination_address_lines=("Provider Enrollment (JE)", "P.O. Box VERIFY", "Fargo, ND"), + batch_date=date(2026, 6, 8), + items=items, + ) + from pypdf import PdfReader + n_cover = len(PdfReader(io.BytesIO(cover)).pages) + check("40-item cover sheet paginates to >1 page", n_cover > 1) + merged = bcs.merge_batch_pdf(cover, [cover, cover]) + n_merged = len(PdfReader(io.BytesIO(merged)).pages) + check("merge concatenates cover + 2 filings", n_merged == n_cover * 3) + # empty-items cover still renders one page + empty = bcs.build_cover_sheet( + destination_name="X", destination_address_lines=("a",), + batch_date=date(2026, 6, 8), items=[], + ) + check("empty batch cover sheet renders", len(PdfReader(io.BytesIO(empty)).pages) >= 1) + + +if __name__ == "__main__": + test_working_days() + test_routing() + test_cover_sheet() + print(f"\n{PASS} passed, {FAIL} failed") + sys.exit(1 if FAIL else 0) diff --git a/scripts/workers/daily_paper_batch.py b/scripts/workers/daily_paper_batch.py new file mode 100644 index 0000000..e7042bc --- /dev/null +++ b/scripts/workers/daily_paper_batch.py @@ -0,0 +1,225 @@ +"""Daily paper-filing batch worker (Standard no-login CMS filing path). + +Run each postal working-day morning (cron, working-day-gated). It: + + 1. finds every SIGNED, not-yet-batched paper filing (esign_records with a + cms855/cms10114 document_type, status='signed', a signed PDF, and no + paper_batch_id); + 2. determines each filing's destination agency (the provider's MAC from the + order's practice state via mac_routing; the NPI Enumerator in Fargo for + NPPES/CMS-10114 updates); + 3. groups by destination and, per destination, builds a cover sheet + merges + all that destination's signed PDFs into ONE print job; + 4. records a paper_filing_batches row (one Priority Mail envelope per agency) + and stamps each included esign_records with its paper_batch_id (idempotent); + 5. uploads the cover sheet + merged print job to MinIO for a human to print and + drop in the mail (phase 1). Phase 2 can hand the merged PDF to a print-mail + API and fill tracking_number automatically. + +Idempotent: a filing is picked up only while paper_batch_id IS NULL, and a +(batch_date, destination) batch is created at most once (unique index). + +Usage: + python -m scripts.workers.daily_paper_batch # run today's batch + python -m scripts.workers.daily_paper_batch --dry-run +""" +from __future__ import annotations + +import argparse +import logging +import os +import tempfile +from datetime import date, datetime, timezone + +LOG = logging.getLogger("workers.daily_paper_batch") + +# Document types that ride the paper (Standard) filing path. +PAPER_DOC_PREFIXES = ("cms855", "cms10114") + + +def _is_postal_working_day(d: date) -> bool: + try: + from scripts.workers.business_days import is_business_day + except ImportError: + from business_days import is_business_day # type: ignore + return is_business_day(d) + + +def _destination_for(practice_state: str, document_type: str): + """Return the (key, name, address_lines) destination for a filing. + + NPPES/CMS-10114 updates go to the NPI Enumerator (Fargo); CMS-855s go to the + provider's MAC by state. + """ + try: + from scripts.workers import mac_routing as mr + except ImportError: + import mac_routing as mr # type: ignore + + if (document_type or "").startswith("cms10114"): + d = mr.NPI_ENUMERATOR + return d.key, d.name, d.address_lines + + mac = mr.mac_for_state(practice_state or "") + if not mac: + return None + return mac.key, mac.name, mac.address_lines + + +def run_batch(dry_run: bool = False, batch_date: date | None = None) -> dict: + """Build today's per-destination batches. Returns a summary dict.""" + import json + import psycopg2 + + today = batch_date or datetime.now(timezone.utc).date() + + if not _is_postal_working_day(today): + LOG.info("[batch] %s is not a postal working day — skipping", today) + return {"skipped": "non-working-day", "date": today.isoformat()} + + conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) + summary = {"date": today.isoformat(), "destinations": {}, "total_items": 0} + try: + # 1. Pull signed, unbatched paper filings + their order practice_state. + like_clauses = " OR ".join( + ["e.document_type LIKE %s"] * len(PAPER_DOC_PREFIXES) + ) + params = [p + "%" for p in PAPER_DOC_PREFIXES] + with conn.cursor() as cur: + cur.execute( + f""" + SELECT e.id, e.order_number, e.document_type, + e.signed_document_minio_key, e.document_metadata, + COALESCE(o.intake_data->>'practice_state', '') AS practice_state, + COALESCE(o.intake_data->>'provider_name', o.customer_name, '') AS provider, + COALESCE(o.intake_data->>'npi', '') AS npi + FROM esign_records e + LEFT JOIN compliance_orders o ON o.order_number = e.order_number + WHERE e.status = 'signed' + AND e.paper_batch_id IS NULL + AND e.signed_document_minio_key IS NOT NULL + AND ({like_clauses}) + ORDER BY e.order_number + """, + params, + ) + rows = cur.fetchall() + + if not rows: + LOG.info("[batch] no signed unbatched paper filings for %s", today) + return summary + + # 2/3. Group by destination. + groups: dict[str, dict] = {} + for (eid, order_number, doc_type, signed_key, meta, + practice_state, provider, npi) in rows: + dest = _destination_for(practice_state, doc_type) + if not dest: + LOG.warning("[batch] no MAC destination for order %s (state=%r) — left unbatched", + order_number, practice_state) + continue + key, name, addr = dest + g = groups.setdefault(key, {"name": name, "addr": addr, "items": []}) + g["items"].append({ + "esign_id": eid, + "order_number": order_number, + "provider": provider, + "npi": npi, + "form": (doc_type or "").replace("cms", "").upper(), + "signed_key": signed_key, + }) + + # 4/5. Per destination, build cover sheet + merged PDF, persist. + try: + from scripts.document_gen.templates.batch_cover_sheet import ( + build_cover_sheet, merge_batch_pdf, + ) + from scripts.document_gen.minio_client import MinioStorage + except ImportError: + from document_gen.templates.batch_cover_sheet import ( # type: ignore + build_cover_sheet, merge_batch_pdf, + ) + from document_gen.minio_client import MinioStorage # type: ignore + + storage = None if dry_run else MinioStorage() + + for key, g in groups.items(): + items = g["items"] + summary["destinations"][key] = {"name": g["name"], "items": len(items)} + summary["total_items"] += len(items) + LOG.info("[batch] %s -> %s: %d filing(s)", today, g["name"], len(items)) + + if dry_run: + continue + + # Download each signed PDF. + filing_pdfs = [] + for it in items: + try: + with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tf: + storage.download(it["signed_key"], tf.name) + with open(tf.name, "rb") as fh: + filing_pdfs.append(fh.read()) + except Exception as exc: + LOG.error("[batch] could not fetch %s: %s", it["signed_key"], exc) + + cover = build_cover_sheet( + destination_name=g["name"], + destination_address_lines=g["addr"], + batch_date=today, + items=items, + ) + merged = merge_batch_pdf(cover, filing_pdfs) + + cover_key = f"paper-batches/{today.isoformat()}/{key}_cover.pdf" + merged_key = f"paper-batches/{today.isoformat()}/{key}_print.pdf" + for data, dest_key in ((cover, cover_key), (merged, merged_key)): + with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tf: + tf.write(data) + tf.flush() + storage.upload(tf.name, dest_key, content_type="application/pdf") + + addr_block = g["name"] + "\n" + "\n".join(g["addr"]) + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO paper_filing_batches + (batch_date, destination_key, destination_name, + destination_address, item_count, cover_sheet_key, + merged_pdf_key, status) + VALUES (%s,%s,%s,%s,%s,%s,%s,'prepared') + ON CONFLICT (batch_date, destination_key) DO UPDATE + SET item_count = EXCLUDED.item_count, + cover_sheet_key = EXCLUDED.cover_sheet_key, + merged_pdf_key = EXCLUDED.merged_pdf_key + RETURNING id + """, + (today, key, g["name"], addr_block, len(items), + cover_key, merged_key), + ) + batch_id = cur.fetchone()[0] + cur.execute( + "UPDATE esign_records SET paper_batch_id = %s, filing_destination_key = %s, updated_at = NOW() WHERE id = ANY(%s)", + (batch_id, key, [it["esign_id"] for it in items]), + ) + conn.commit() + + return summary + finally: + conn.close() + + +def main(): + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") + ap = argparse.ArgumentParser() + ap.add_argument("--dry-run", action="store_true", help="report groups, write nothing") + ap.add_argument("--date", help="override batch date YYYY-MM-DD (testing)") + args = ap.parse_args() + bd = date.fromisoformat(args.date) if args.date else None + summary = run_batch(dry_run=args.dry_run, batch_date=bd) + import json + print(json.dumps(summary, indent=2)) + + +if __name__ == "__main__": + main()