healthcare: daily batched paper-filing fulfillment
Standard (no-login) CMS filings are mailed in one Priority Mail envelope per destination agency, batched each postal working-day morning to save postage. - migration 089: paper_filing_batches table + esign_records.paper_batch_id / filing_destination_key (idempotent: a filing is batched at most once). - batch_cover_sheet.py: per-agency cover sheet (sender/dest/date/manifest) + merged print-job PDF (cover + all enclosed signed filings). - daily_paper_batch.py worker: gather signed+unbatched cms855/cms10114 filings, group by destination (MAC by state via mac_routing; Fargo for CMS-10114), build cover+merged PDF per agency, persist batch, mark filings batched. Self-gates on postal working days (skips weekends + federal/USPS holidays). Phase 1 = human prints+mails; phase 2 = wire print-mail API. - worker-crons: pw-paper-batch systemd timer (Mon-Fri 13:30 UTC, self-gated). - test_paper_batch.py: 15/15 pass (working-day gating, routing, cover+merge).
This commit is contained in:
parent
258d23bdc6
commit
138fec17e9
5 changed files with 542 additions and 0 deletions
49
api/migrations/089_paper_filing_batches.sql
Normal file
49
api/migrations/089_paper_filing_batches.sql
Normal file
|
|
@ -0,0 +1,49 @@
|
||||||
|
-- 089: Daily paper-filing batch tracking for the Standard (no-login) CMS filing path.
|
||||||
|
--
|
||||||
|
-- When a provider e-signs a CMS-855 (or CMS-10114), the signed PDF is mailed to
|
||||||
|
-- the destination agency (the provider's MAC for 855s; the NPI Enumerator in
|
||||||
|
-- Fargo for NPPES updates). To save postage and handling we batch all signed,
|
||||||
|
-- not-yet-mailed filings each postal working-day morning, group them by
|
||||||
|
-- destination agency, and mail one Priority Mail envelope per agency.
|
||||||
|
--
|
||||||
|
-- This migration records, per signed filing, which daily batch it went out in,
|
||||||
|
-- so the batch worker is idempotent (never re-mails) and we can audit/track.
|
||||||
|
|
||||||
|
-- A paper-filing batch = one Priority Mail envelope to one destination agency
|
||||||
|
-- on one mailing day.
|
||||||
|
CREATE TABLE IF NOT EXISTS paper_filing_batches (
|
||||||
|
id SERIAL PRIMARY KEY,
|
||||||
|
batch_date DATE NOT NULL, -- the postal working day mailed
|
||||||
|
destination_key TEXT NOT NULL, -- mac_routing key, e.g. noridian_je / npi_enumerator
|
||||||
|
destination_name TEXT NOT NULL DEFAULT '', -- human-readable MAC/agency name
|
||||||
|
destination_address TEXT NOT NULL DEFAULT '', -- full mailing address block
|
||||||
|
item_count INTEGER NOT NULL DEFAULT 0, -- number of filings enclosed
|
||||||
|
cover_sheet_key TEXT, -- MinIO key for the batch cover sheet
|
||||||
|
merged_pdf_key TEXT, -- MinIO key for the merged print job
|
||||||
|
tracking_number TEXT, -- USPS tracking, filled when mailed
|
||||||
|
status TEXT NOT NULL DEFAULT 'prepared'
|
||||||
|
CHECK (status IN ('prepared', 'mailed', 'cancelled')),
|
||||||
|
created_at TIMESTAMPTZ DEFAULT NOW(),
|
||||||
|
mailed_at TIMESTAMPTZ
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_paper_batch_date ON paper_filing_batches(batch_date);
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_paper_batch_day_dest
|
||||||
|
ON paper_filing_batches(batch_date, destination_key);
|
||||||
|
|
||||||
|
-- Link each signed esign filing to the batch it shipped in. NULL = not yet
|
||||||
|
-- batched (the worker picks these up). Set once -> idempotent (worker skips
|
||||||
|
-- anything already assigned a batch).
|
||||||
|
ALTER TABLE esign_records
|
||||||
|
ADD COLUMN IF NOT EXISTS paper_batch_id INTEGER REFERENCES paper_filing_batches(id);
|
||||||
|
|
||||||
|
-- The destination MAC/agency for this filing, derived from the provider's
|
||||||
|
-- practice state at sign time (snapshot so later routing-table changes don't
|
||||||
|
-- retroactively move historical filings).
|
||||||
|
ALTER TABLE esign_records
|
||||||
|
ADD COLUMN IF NOT EXISTS filing_destination_key TEXT;
|
||||||
|
|
||||||
|
-- Index the work queue: signed, paper-path filings not yet batched.
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_esign_unbatched_signed
|
||||||
|
ON esign_records(status)
|
||||||
|
WHERE status = 'signed' AND paper_batch_id IS NULL;
|
||||||
|
|
@ -219,3 +219,16 @@ worker_crons:
|
||||||
module: scripts.workers.intake_reminder
|
module: scripts.workers.intake_reminder
|
||||||
on_calendar: "*-*-* 16:00:00 UTC"
|
on_calendar: "*-*-* 16:00:00 UTC"
|
||||||
persistent: true
|
persistent: true
|
||||||
|
|
||||||
|
# Daily paper-filing batch (Standard no-login CMS filing path) — weekday
|
||||||
|
# mornings 13:30 UTC (08:30 CT). Groups all signed, not-yet-mailed CMS filings
|
||||||
|
# by destination agency (provider's MAC; NPI Enumerator in Fargo for NPPES)
|
||||||
|
# and builds one cover sheet + merged print job per agency, for a human to
|
||||||
|
# print and mail in a single Priority Mail envelope. The worker self-gates on
|
||||||
|
# postal working days (skips weekends AND federal/USPS holidays via
|
||||||
|
# business_days.py), so the Mon-Fri OnCalendar is just a coarse pre-filter.
|
||||||
|
- name: pw-paper-batch
|
||||||
|
description: Build daily per-agency CMS paper-filing batch (cover sheet + merged print job)
|
||||||
|
module: scripts.workers.daily_paper_batch
|
||||||
|
on_calendar: "Mon..Fri *-*-* 13:30:00 UTC"
|
||||||
|
persistent: true
|
||||||
|
|
|
||||||
151
scripts/document_gen/templates/batch_cover_sheet.py
Normal file
151
scripts/document_gen/templates/batch_cover_sheet.py
Normal file
|
|
@ -0,0 +1,151 @@
|
||||||
|
"""Daily paper-filing batch cover sheet + merged print job.
|
||||||
|
|
||||||
|
For the Standard (no-login) CMS filing path we batch all signed, not-yet-mailed
|
||||||
|
filings each postal working day and mail one Priority Mail envelope per
|
||||||
|
destination agency (the provider's MAC, or the NPI Enumerator in Fargo). This
|
||||||
|
module builds, for one destination/day:
|
||||||
|
|
||||||
|
1. a **cover sheet** PDF — sender (Performance West), destination agency +
|
||||||
|
address, batch date, enclosed count, and a per-item manifest
|
||||||
|
(order# / provider / NPI / form type); and
|
||||||
|
2. a **merged print job** PDF — the cover sheet followed by every enclosed
|
||||||
|
signed filing, ready to print and drop in one envelope.
|
||||||
|
|
||||||
|
Only reportlab + pypdf are required (already used across document_gen).
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import io
|
||||||
|
import logging
|
||||||
|
from datetime import date
|
||||||
|
|
||||||
|
LOG = logging.getLogger("document_gen.batch_cover_sheet")
|
||||||
|
|
||||||
|
PW_SENDER = (
|
||||||
|
"Performance West Inc.",
|
||||||
|
"Provider Enrollment Filings",
|
||||||
|
"filings@performancewest.net | (888) 411-0383",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def build_cover_sheet(
|
||||||
|
*,
|
||||||
|
destination_name: str,
|
||||||
|
destination_address_lines: tuple[str, ...] | list[str],
|
||||||
|
batch_date: date,
|
||||||
|
items: list[dict],
|
||||||
|
) -> bytes:
|
||||||
|
"""Render the batch cover sheet PDF.
|
||||||
|
|
||||||
|
``items`` = list of dicts with keys: order_number, provider, npi, form.
|
||||||
|
Returns PDF bytes.
|
||||||
|
"""
|
||||||
|
from reportlab.lib.pagesizes import letter
|
||||||
|
from reportlab.lib.units import inch
|
||||||
|
from reportlab.lib.colors import HexColor
|
||||||
|
from reportlab.pdfgen import canvas
|
||||||
|
from reportlab.lib.utils import simpleSplit
|
||||||
|
|
||||||
|
buf = io.BytesIO()
|
||||||
|
c = canvas.Canvas(buf, pagesize=letter)
|
||||||
|
w, h = letter
|
||||||
|
teal = HexColor("#0f766e")
|
||||||
|
gray = HexColor("#444444")
|
||||||
|
x = inch
|
||||||
|
y = h - inch
|
||||||
|
|
||||||
|
# Header band
|
||||||
|
c.setFillColor(teal)
|
||||||
|
c.rect(0, h - 0.9 * inch, w, 0.9 * inch, fill=1, stroke=0)
|
||||||
|
c.setFillColor(HexColor("#ffffff"))
|
||||||
|
c.setFont("Helvetica-Bold", 18)
|
||||||
|
c.drawString(x, h - 0.6 * inch, "Provider Enrollment Filing Transmittal")
|
||||||
|
y = h - 1.3 * inch
|
||||||
|
|
||||||
|
# Sender
|
||||||
|
c.setFillColor(gray)
|
||||||
|
c.setFont("Helvetica-Bold", 10)
|
||||||
|
c.drawString(x, y, "FROM")
|
||||||
|
y -= 14
|
||||||
|
c.setFont("Helvetica", 10)
|
||||||
|
for line in PW_SENDER:
|
||||||
|
c.drawString(x, y, line)
|
||||||
|
y -= 13
|
||||||
|
y -= 8
|
||||||
|
|
||||||
|
# Destination
|
||||||
|
c.setFont("Helvetica-Bold", 10)
|
||||||
|
c.drawString(x, y, "TO")
|
||||||
|
y -= 14
|
||||||
|
c.setFont("Helvetica", 10)
|
||||||
|
c.drawString(x, y, destination_name)
|
||||||
|
y -= 13
|
||||||
|
for line in destination_address_lines:
|
||||||
|
c.drawString(x, y, line)
|
||||||
|
y -= 13
|
||||||
|
y -= 8
|
||||||
|
|
||||||
|
# Batch meta
|
||||||
|
c.setFont("Helvetica-Bold", 10)
|
||||||
|
c.drawString(x, y, f"Date: {batch_date.isoformat()}")
|
||||||
|
c.drawString(x + 3 * inch, y, f"Enclosed filings: {len(items)}")
|
||||||
|
y -= 22
|
||||||
|
|
||||||
|
# Divider
|
||||||
|
c.setStrokeColor(HexColor("#cccccc"))
|
||||||
|
c.line(x, y, w - inch, y)
|
||||||
|
y -= 18
|
||||||
|
|
||||||
|
# Manifest header
|
||||||
|
c.setFont("Helvetica-Bold", 9)
|
||||||
|
cols = [(x, "ORDER"), (x + 1.4 * inch, "PROVIDER"),
|
||||||
|
(x + 4.0 * inch, "NPI"), (x + 5.3 * inch, "FORM")]
|
||||||
|
for cx, label in cols:
|
||||||
|
c.drawString(cx, y, label)
|
||||||
|
y -= 4
|
||||||
|
c.line(x, y, w - inch, y)
|
||||||
|
y -= 14
|
||||||
|
|
||||||
|
c.setFont("Helvetica", 9)
|
||||||
|
for it in items:
|
||||||
|
if y < inch: # new page
|
||||||
|
c.showPage()
|
||||||
|
y = h - inch
|
||||||
|
c.setFont("Helvetica", 9)
|
||||||
|
provider = (it.get("provider") or "")[:34]
|
||||||
|
c.drawString(cols[0][0], y, str(it.get("order_number") or ""))
|
||||||
|
c.drawString(cols[1][0], y, provider)
|
||||||
|
c.drawString(cols[2][0], y, str(it.get("npi") or ""))
|
||||||
|
c.drawString(cols[3][0], y, str(it.get("form") or "").upper())
|
||||||
|
y -= 13
|
||||||
|
|
||||||
|
# Footer note
|
||||||
|
y = max(y - 20, 0.7 * inch)
|
||||||
|
c.setFont("Helvetica-Oblique", 8)
|
||||||
|
c.setFillColor(gray)
|
||||||
|
note = ("This transmittal accompanies the signed provider enrollment "
|
||||||
|
"applications enclosed. Each application bears the provider's "
|
||||||
|
"certification signature. Questions: filings@performancewest.net.")
|
||||||
|
for line in simpleSplit(note, "Helvetica-Oblique", 8, w - 2 * inch):
|
||||||
|
c.drawString(x, y, line)
|
||||||
|
y -= 11
|
||||||
|
|
||||||
|
c.showPage()
|
||||||
|
c.save()
|
||||||
|
return buf.getvalue()
|
||||||
|
|
||||||
|
|
||||||
|
def merge_batch_pdf(cover_sheet: bytes, filing_pdfs: list[bytes]) -> bytes:
|
||||||
|
"""Merge the cover sheet + all enclosed signed filings into one print job."""
|
||||||
|
from pypdf import PdfReader, PdfWriter
|
||||||
|
|
||||||
|
writer = PdfWriter()
|
||||||
|
writer.append(PdfReader(io.BytesIO(cover_sheet)))
|
||||||
|
for pdf in filing_pdfs:
|
||||||
|
try:
|
||||||
|
writer.append(PdfReader(io.BytesIO(pdf)))
|
||||||
|
except Exception as exc: # one bad PDF shouldn't sink the batch
|
||||||
|
LOG.error("[batch] skipping unreadable filing PDF: %s", exc)
|
||||||
|
out = io.BytesIO()
|
||||||
|
writer.write(out)
|
||||||
|
return out.getvalue()
|
||||||
104
scripts/tests/test_paper_batch.py
Normal file
104
scripts/tests/test_paper_batch.py
Normal file
|
|
@ -0,0 +1,104 @@
|
||||||
|
"""Tests for the daily paper-filing batch (Standard no-login CMS filing path).
|
||||||
|
|
||||||
|
Covers the pure logic that doesn't need a DB/MinIO:
|
||||||
|
- postal working-day gating (weekends + federal/USPS holidays)
|
||||||
|
- destination routing (state -> MAC; CMS-10114 -> NPI Enumerator)
|
||||||
|
- cover-sheet rendering + multi-page pagination
|
||||||
|
- merge of cover sheet + filing PDFs
|
||||||
|
|
||||||
|
Run: python scripts/tests/test_paper_batch.py
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import importlib.util
|
||||||
|
import io
|
||||||
|
import sys
|
||||||
|
from datetime import date
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
ROOT = Path(__file__).resolve().parents[2]
|
||||||
|
sys.path.insert(0, str(ROOT / "scripts"))
|
||||||
|
sys.path.insert(0, str(ROOT / "scripts" / "workers"))
|
||||||
|
|
||||||
|
|
||||||
|
def _load(name: str, rel: str):
|
||||||
|
spec = importlib.util.spec_from_file_location(name, ROOT / rel)
|
||||||
|
m = importlib.util.module_from_spec(spec)
|
||||||
|
spec.loader.exec_module(m)
|
||||||
|
return m
|
||||||
|
|
||||||
|
|
||||||
|
dpb = _load("dpb", "scripts/workers/daily_paper_batch.py")
|
||||||
|
bcs = _load("bcs", "scripts/document_gen/templates/batch_cover_sheet.py")
|
||||||
|
|
||||||
|
PASS = 0
|
||||||
|
FAIL = 0
|
||||||
|
|
||||||
|
|
||||||
|
def check(label, cond):
|
||||||
|
global PASS, FAIL
|
||||||
|
if cond:
|
||||||
|
PASS += 1
|
||||||
|
print(f" PASS {label}")
|
||||||
|
else:
|
||||||
|
FAIL += 1
|
||||||
|
print(f" FAIL {label}")
|
||||||
|
|
||||||
|
|
||||||
|
def test_working_days():
|
||||||
|
print("working-day gating")
|
||||||
|
check("Saturday is not a working day", not dpb._is_postal_working_day(date(2026, 6, 6)))
|
||||||
|
check("Sunday is not a working day", not dpb._is_postal_working_day(date(2026, 6, 7)))
|
||||||
|
check("Monday is a working day", dpb._is_postal_working_day(date(2026, 6, 8)))
|
||||||
|
check("Memorial Day (holiday) is not a working day", not dpb._is_postal_working_day(date(2026, 5, 25)))
|
||||||
|
check("Christmas (holiday) is not a working day", not dpb._is_postal_working_day(date(2026, 12, 25)))
|
||||||
|
|
||||||
|
|
||||||
|
def test_routing():
|
||||||
|
print("destination routing")
|
||||||
|
ca = dpb._destination_for("CA", "cms855i")
|
||||||
|
check("CA 855 -> Noridian JE", ca and ca[0] == "noridian_je")
|
||||||
|
ny = dpb._destination_for("NY", "cms855i")
|
||||||
|
check("NY 855 -> NGS JK", ny and ny[0] == "ngs_jk")
|
||||||
|
tx = dpb._destination_for("TX", "cms855b")
|
||||||
|
check("TX 855B -> Novitas JH", tx and tx[0] == "novitas_jh")
|
||||||
|
enum = dpb._destination_for("TX", "cms10114")
|
||||||
|
check("any-state CMS-10114 -> NPI Enumerator (Fargo)", enum and enum[0] == "npi_enumerator")
|
||||||
|
check("CMS-10114 ignores state for routing", dpb._destination_for("CA", "cms10114")[0] == "npi_enumerator")
|
||||||
|
check("unknown state -> None (human review)", dpb._destination_for("ZZ", "cms855i") is None)
|
||||||
|
check("blank state -> None", dpb._destination_for("", "cms855i") is None)
|
||||||
|
|
||||||
|
|
||||||
|
def test_cover_sheet():
|
||||||
|
print("cover sheet + merge")
|
||||||
|
items = [
|
||||||
|
{"order_number": f"CO-X{i:03d}", "provider": f"Provider {i} MD",
|
||||||
|
"npi": f"1{i:09d}", "form": "855i"}
|
||||||
|
for i in range(40)
|
||||||
|
]
|
||||||
|
cover = bcs.build_cover_sheet(
|
||||||
|
destination_name="Noridian JE",
|
||||||
|
destination_address_lines=("Provider Enrollment (JE)", "P.O. Box VERIFY", "Fargo, ND"),
|
||||||
|
batch_date=date(2026, 6, 8),
|
||||||
|
items=items,
|
||||||
|
)
|
||||||
|
from pypdf import PdfReader
|
||||||
|
n_cover = len(PdfReader(io.BytesIO(cover)).pages)
|
||||||
|
check("40-item cover sheet paginates to >1 page", n_cover > 1)
|
||||||
|
merged = bcs.merge_batch_pdf(cover, [cover, cover])
|
||||||
|
n_merged = len(PdfReader(io.BytesIO(merged)).pages)
|
||||||
|
check("merge concatenates cover + 2 filings", n_merged == n_cover * 3)
|
||||||
|
# empty-items cover still renders one page
|
||||||
|
empty = bcs.build_cover_sheet(
|
||||||
|
destination_name="X", destination_address_lines=("a",),
|
||||||
|
batch_date=date(2026, 6, 8), items=[],
|
||||||
|
)
|
||||||
|
check("empty batch cover sheet renders", len(PdfReader(io.BytesIO(empty)).pages) >= 1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
test_working_days()
|
||||||
|
test_routing()
|
||||||
|
test_cover_sheet()
|
||||||
|
print(f"\n{PASS} passed, {FAIL} failed")
|
||||||
|
sys.exit(1 if FAIL else 0)
|
||||||
225
scripts/workers/daily_paper_batch.py
Normal file
225
scripts/workers/daily_paper_batch.py
Normal file
|
|
@ -0,0 +1,225 @@
|
||||||
|
"""Daily paper-filing batch worker (Standard no-login CMS filing path).
|
||||||
|
|
||||||
|
Run each postal working-day morning (cron, working-day-gated). It:
|
||||||
|
|
||||||
|
1. finds every SIGNED, not-yet-batched paper filing (esign_records with a
|
||||||
|
cms855/cms10114 document_type, status='signed', a signed PDF, and no
|
||||||
|
paper_batch_id);
|
||||||
|
2. determines each filing's destination agency (the provider's MAC from the
|
||||||
|
order's practice state via mac_routing; the NPI Enumerator in Fargo for
|
||||||
|
NPPES/CMS-10114 updates);
|
||||||
|
3. groups by destination and, per destination, builds a cover sheet + merges
|
||||||
|
all that destination's signed PDFs into ONE print job;
|
||||||
|
4. records a paper_filing_batches row (one Priority Mail envelope per agency)
|
||||||
|
and stamps each included esign_records with its paper_batch_id (idempotent);
|
||||||
|
5. uploads the cover sheet + merged print job to MinIO for a human to print and
|
||||||
|
drop in the mail (phase 1). Phase 2 can hand the merged PDF to a print-mail
|
||||||
|
API and fill tracking_number automatically.
|
||||||
|
|
||||||
|
Idempotent: a filing is picked up only while paper_batch_id IS NULL, and a
|
||||||
|
(batch_date, destination) batch is created at most once (unique index).
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python -m scripts.workers.daily_paper_batch # run today's batch
|
||||||
|
python -m scripts.workers.daily_paper_batch --dry-run
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from datetime import date, datetime, timezone
|
||||||
|
|
||||||
|
LOG = logging.getLogger("workers.daily_paper_batch")
|
||||||
|
|
||||||
|
# Document types that ride the paper (Standard) filing path.
|
||||||
|
PAPER_DOC_PREFIXES = ("cms855", "cms10114")
|
||||||
|
|
||||||
|
|
||||||
|
def _is_postal_working_day(d: date) -> bool:
|
||||||
|
try:
|
||||||
|
from scripts.workers.business_days import is_business_day
|
||||||
|
except ImportError:
|
||||||
|
from business_days import is_business_day # type: ignore
|
||||||
|
return is_business_day(d)
|
||||||
|
|
||||||
|
|
||||||
|
def _destination_for(practice_state: str, document_type: str):
|
||||||
|
"""Return the (key, name, address_lines) destination for a filing.
|
||||||
|
|
||||||
|
NPPES/CMS-10114 updates go to the NPI Enumerator (Fargo); CMS-855s go to the
|
||||||
|
provider's MAC by state.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
from scripts.workers import mac_routing as mr
|
||||||
|
except ImportError:
|
||||||
|
import mac_routing as mr # type: ignore
|
||||||
|
|
||||||
|
if (document_type or "").startswith("cms10114"):
|
||||||
|
d = mr.NPI_ENUMERATOR
|
||||||
|
return d.key, d.name, d.address_lines
|
||||||
|
|
||||||
|
mac = mr.mac_for_state(practice_state or "")
|
||||||
|
if not mac:
|
||||||
|
return None
|
||||||
|
return mac.key, mac.name, mac.address_lines
|
||||||
|
|
||||||
|
|
||||||
|
def run_batch(dry_run: bool = False, batch_date: date | None = None) -> dict:
|
||||||
|
"""Build today's per-destination batches. Returns a summary dict."""
|
||||||
|
import json
|
||||||
|
import psycopg2
|
||||||
|
|
||||||
|
today = batch_date or datetime.now(timezone.utc).date()
|
||||||
|
|
||||||
|
if not _is_postal_working_day(today):
|
||||||
|
LOG.info("[batch] %s is not a postal working day — skipping", today)
|
||||||
|
return {"skipped": "non-working-day", "date": today.isoformat()}
|
||||||
|
|
||||||
|
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
|
||||||
|
summary = {"date": today.isoformat(), "destinations": {}, "total_items": 0}
|
||||||
|
try:
|
||||||
|
# 1. Pull signed, unbatched paper filings + their order practice_state.
|
||||||
|
like_clauses = " OR ".join(
|
||||||
|
["e.document_type LIKE %s"] * len(PAPER_DOC_PREFIXES)
|
||||||
|
)
|
||||||
|
params = [p + "%" for p in PAPER_DOC_PREFIXES]
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute(
|
||||||
|
f"""
|
||||||
|
SELECT e.id, e.order_number, e.document_type,
|
||||||
|
e.signed_document_minio_key, e.document_metadata,
|
||||||
|
COALESCE(o.intake_data->>'practice_state', '') AS practice_state,
|
||||||
|
COALESCE(o.intake_data->>'provider_name', o.customer_name, '') AS provider,
|
||||||
|
COALESCE(o.intake_data->>'npi', '') AS npi
|
||||||
|
FROM esign_records e
|
||||||
|
LEFT JOIN compliance_orders o ON o.order_number = e.order_number
|
||||||
|
WHERE e.status = 'signed'
|
||||||
|
AND e.paper_batch_id IS NULL
|
||||||
|
AND e.signed_document_minio_key IS NOT NULL
|
||||||
|
AND ({like_clauses})
|
||||||
|
ORDER BY e.order_number
|
||||||
|
""",
|
||||||
|
params,
|
||||||
|
)
|
||||||
|
rows = cur.fetchall()
|
||||||
|
|
||||||
|
if not rows:
|
||||||
|
LOG.info("[batch] no signed unbatched paper filings for %s", today)
|
||||||
|
return summary
|
||||||
|
|
||||||
|
# 2/3. Group by destination.
|
||||||
|
groups: dict[str, dict] = {}
|
||||||
|
for (eid, order_number, doc_type, signed_key, meta,
|
||||||
|
practice_state, provider, npi) in rows:
|
||||||
|
dest = _destination_for(practice_state, doc_type)
|
||||||
|
if not dest:
|
||||||
|
LOG.warning("[batch] no MAC destination for order %s (state=%r) — left unbatched",
|
||||||
|
order_number, practice_state)
|
||||||
|
continue
|
||||||
|
key, name, addr = dest
|
||||||
|
g = groups.setdefault(key, {"name": name, "addr": addr, "items": []})
|
||||||
|
g["items"].append({
|
||||||
|
"esign_id": eid,
|
||||||
|
"order_number": order_number,
|
||||||
|
"provider": provider,
|
||||||
|
"npi": npi,
|
||||||
|
"form": (doc_type or "").replace("cms", "").upper(),
|
||||||
|
"signed_key": signed_key,
|
||||||
|
})
|
||||||
|
|
||||||
|
# 4/5. Per destination, build cover sheet + merged PDF, persist.
|
||||||
|
try:
|
||||||
|
from scripts.document_gen.templates.batch_cover_sheet import (
|
||||||
|
build_cover_sheet, merge_batch_pdf,
|
||||||
|
)
|
||||||
|
from scripts.document_gen.minio_client import MinioStorage
|
||||||
|
except ImportError:
|
||||||
|
from document_gen.templates.batch_cover_sheet import ( # type: ignore
|
||||||
|
build_cover_sheet, merge_batch_pdf,
|
||||||
|
)
|
||||||
|
from document_gen.minio_client import MinioStorage # type: ignore
|
||||||
|
|
||||||
|
storage = None if dry_run else MinioStorage()
|
||||||
|
|
||||||
|
for key, g in groups.items():
|
||||||
|
items = g["items"]
|
||||||
|
summary["destinations"][key] = {"name": g["name"], "items": len(items)}
|
||||||
|
summary["total_items"] += len(items)
|
||||||
|
LOG.info("[batch] %s -> %s: %d filing(s)", today, g["name"], len(items))
|
||||||
|
|
||||||
|
if dry_run:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Download each signed PDF.
|
||||||
|
filing_pdfs = []
|
||||||
|
for it in items:
|
||||||
|
try:
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tf:
|
||||||
|
storage.download(it["signed_key"], tf.name)
|
||||||
|
with open(tf.name, "rb") as fh:
|
||||||
|
filing_pdfs.append(fh.read())
|
||||||
|
except Exception as exc:
|
||||||
|
LOG.error("[batch] could not fetch %s: %s", it["signed_key"], exc)
|
||||||
|
|
||||||
|
cover = build_cover_sheet(
|
||||||
|
destination_name=g["name"],
|
||||||
|
destination_address_lines=g["addr"],
|
||||||
|
batch_date=today,
|
||||||
|
items=items,
|
||||||
|
)
|
||||||
|
merged = merge_batch_pdf(cover, filing_pdfs)
|
||||||
|
|
||||||
|
cover_key = f"paper-batches/{today.isoformat()}/{key}_cover.pdf"
|
||||||
|
merged_key = f"paper-batches/{today.isoformat()}/{key}_print.pdf"
|
||||||
|
for data, dest_key in ((cover, cover_key), (merged, merged_key)):
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tf:
|
||||||
|
tf.write(data)
|
||||||
|
tf.flush()
|
||||||
|
storage.upload(tf.name, dest_key, content_type="application/pdf")
|
||||||
|
|
||||||
|
addr_block = g["name"] + "\n" + "\n".join(g["addr"])
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO paper_filing_batches
|
||||||
|
(batch_date, destination_key, destination_name,
|
||||||
|
destination_address, item_count, cover_sheet_key,
|
||||||
|
merged_pdf_key, status)
|
||||||
|
VALUES (%s,%s,%s,%s,%s,%s,%s,'prepared')
|
||||||
|
ON CONFLICT (batch_date, destination_key) DO UPDATE
|
||||||
|
SET item_count = EXCLUDED.item_count,
|
||||||
|
cover_sheet_key = EXCLUDED.cover_sheet_key,
|
||||||
|
merged_pdf_key = EXCLUDED.merged_pdf_key
|
||||||
|
RETURNING id
|
||||||
|
""",
|
||||||
|
(today, key, g["name"], addr_block, len(items),
|
||||||
|
cover_key, merged_key),
|
||||||
|
)
|
||||||
|
batch_id = cur.fetchone()[0]
|
||||||
|
cur.execute(
|
||||||
|
"UPDATE esign_records SET paper_batch_id = %s, filing_destination_key = %s, updated_at = NOW() WHERE id = ANY(%s)",
|
||||||
|
(batch_id, key, [it["esign_id"] for it in items]),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
return summary
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
||||||
|
ap = argparse.ArgumentParser()
|
||||||
|
ap.add_argument("--dry-run", action="store_true", help="report groups, write nothing")
|
||||||
|
ap.add_argument("--date", help="override batch date YYYY-MM-DD (testing)")
|
||||||
|
args = ap.parse_args()
|
||||||
|
bd = date.fromisoformat(args.date) if args.date else None
|
||||||
|
summary = run_batch(dry_run=args.dry_run, batch_date=bd)
|
||||||
|
import json
|
||||||
|
print(json.dumps(summary, indent=2))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
Add table
Add a link
Reference in a new issue