test(workers): NPI recurring-cycle fulfillment path (13 assertions)

Runs the real _BaseNPIHandler.handle() with _create_todo monkeypatched (no DB /
ERPNext / email side effects) and asserts:
 - first OIG/SAM screening has no [Monthly cycle] prefix / RECURRING banner
 - a recurring_cycle order gets the [Monthly cycle] title prefix, the
   "RECURRING MONTHLY CYCLE" banner, the invoice id, and the re-run-against-
   CURRENT-data + issue-NEW-certificate instructions
 - recurring_cycle works with and without an invoice id
 - the bundle handler's first run is not flagged recurring

Verified passing both locally and inside the deployed workers container.
This commit is contained in:
justin 2026-06-18 09:38:26 -05:00
parent 0083bc1354
commit 5c1f239307

View file

@ -0,0 +1,113 @@
"""
Logic tests for the OIG/SAM recurring-monitoring fulfillment path in
npi_provider.py. Runs the REAL _BaseNPIHandler.handle() with _create_todo
monkeypatched to capture the todo (so no DB / ERPNext / email side effects),
asserting the recurring-cycle branch produces the right title prefix + notes.
Run: python3 -m pytest scripts/workers/services/test_npi_recurring.py -q
or: python3 scripts/workers/services/test_npi_recurring.py
"""
import asyncio
import os
import sys
# Make 'scripts' importable when run directly from repo root.
_REPO = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
if _REPO not in sys.path:
sys.path.insert(0, _REPO)
from scripts.workers.services import npi_provider as M # noqa: E402
_FAILS = []
def check(label, cond):
print((" PASS " if cond else " FAIL ") + label)
if not cond:
_FAILS.append(label)
def run_handler(handler_cls, order_data, order_number="HC-TEST-1"):
"""Run handler.handle() capturing the todo via a monkeypatched _create_todo."""
captured = {}
def fake_create_todo(self, order_number, intake, title, description, priority="normal"):
captured["order_number"] = order_number
captured["title"] = title
captured["description"] = description
captured["priority"] = priority
orig = M._BaseNPIHandler._create_todo
M._BaseNPIHandler._create_todo = fake_create_todo
try:
h = handler_cls()
asyncio.run(h.handle(order_data, order_number))
finally:
M._BaseNPIHandler._create_todo = orig
return captured
BASE_INTAKE = {
"npi": "1234567890",
"provider_name": "Dr. Jane Smith",
"specialty": "Internal Medicine",
"practice_state": "TX",
"email": "jane@example.com",
}
# ── 1. OIG/SAM first fulfillment (NOT recurring) ────────────────────────────
first = run_handler(
M.OIGSAMScreeningHandler,
{"intake_data": dict(BASE_INTAKE), "customer_name": "Jane Smith"},
)
check("first screening has no [Monthly cycle] prefix", "[Monthly cycle]" not in first["title"])
check("first screening has no RECURRING note", "RECURRING MONTHLY CYCLE" not in first["description"])
check("first screening title names the provider", "Dr. Jane Smith" in first["title"])
check("first screening title has the NPI", "1234567890" in first["title"])
# ── 2. OIG/SAM recurring monthly cycle ──────────────────────────────────────
renew = run_handler(
M.OIGSAMScreeningHandler,
{
"intake_data": dict(BASE_INTAKE),
"customer_name": "Jane Smith",
"recurring_cycle": True,
"recurring_invoice_id": "in_LIVE123",
},
)
check("recurring title HAS [Monthly cycle] prefix", renew["title"].startswith("[Monthly cycle] "))
check("recurring desc has RECURRING MONTHLY CYCLE banner", "RECURRING MONTHLY CYCLE" in renew["description"])
check("recurring desc references the invoice id", "in_LIVE123" in renew["description"])
check("recurring desc says re-run against CURRENT data", "CURRENT OIG LEIE" in renew["description"])
check("recurring desc says issue NEW dated certificate", "NEW dated certificate" in renew["description"])
# ── 3. recurring_cycle without invoice id still works ───────────────────────
renew_noinv = run_handler(
M.OIGSAMScreeningHandler,
{"intake_data": dict(BASE_INTAKE), "recurring_cycle": True},
)
check("recurring (no invoice) still has prefix", renew_noinv["title"].startswith("[Monthly cycle] "))
check("recurring (no invoice) has banner", "RECURRING MONTHLY CYCLE" in renew_noinv["description"])
# ── 4. Bundle handler: first screening, no recurring prefix on first run ─────
bundle = run_handler(
M.ProviderComplianceBundleHandler,
{"intake_data": dict(BASE_INTAKE), "customer_name": "Jane Smith"},
)
check("bundle first run has no [Monthly cycle] prefix", "[Monthly cycle]" not in bundle["title"])
# ── 5. recurring flag default is falsey ─────────────────────────────────────
check("recurring branch is off by default", "[Monthly cycle]" not in first["title"])
print()
if _FAILS:
print(f"{len(_FAILS)} FAILED: " + ", ".join(_FAILS))
sys.exit(1)
print("ALL NPI RECURRING TESTS PASSED")