""" Logic tests for the OIG/SAM recurring-monitoring fulfillment path in npi_provider.py. Runs the REAL _BaseNPIHandler.handle() with _create_todo monkeypatched to capture the todo (so no DB / ERPNext / email side effects), asserting the recurring-cycle branch produces the right title prefix + notes. Run: python3 -m pytest scripts/workers/services/test_npi_recurring.py -q or: python3 scripts/workers/services/test_npi_recurring.py """ import asyncio import os import sys # Make 'scripts' importable when run directly from repo root. _REPO = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..")) if _REPO not in sys.path: sys.path.insert(0, _REPO) from scripts.workers.services import npi_provider as M # noqa: E402 _FAILS = [] def check(label, cond): print((" PASS " if cond else " FAIL ") + label) if not cond: _FAILS.append(label) def run_handler(handler_cls, order_data, order_number="HC-TEST-1"): """Run handler.handle() capturing the todo via a monkeypatched _create_todo.""" captured = {} def fake_create_todo(self, order_number, intake, title, description, priority="normal"): captured["order_number"] = order_number captured["title"] = title captured["description"] = description captured["priority"] = priority orig = M._BaseNPIHandler._create_todo M._BaseNPIHandler._create_todo = fake_create_todo try: h = handler_cls() asyncio.run(h.handle(order_data, order_number)) finally: M._BaseNPIHandler._create_todo = orig return captured BASE_INTAKE = { "npi": "1234567890", "provider_name": "Dr. Jane Smith", "specialty": "Internal Medicine", "practice_state": "TX", "email": "jane@example.com", } # ── 1. OIG/SAM first fulfillment (NOT recurring) ──────────────────────────── first = run_handler( M.OIGSAMScreeningHandler, {"intake_data": dict(BASE_INTAKE), "customer_name": "Jane Smith"}, ) check("first screening has no [Monthly cycle] prefix", "[Monthly cycle]" not in first["title"]) check("first screening has no RECURRING note", "RECURRING MONTHLY CYCLE" not in first["description"]) check("first screening title names the provider", "Dr. Jane Smith" in first["title"]) check("first screening title has the NPI", "1234567890" in first["title"]) # ── 2. OIG/SAM recurring monthly cycle ────────────────────────────────────── renew = run_handler( M.OIGSAMScreeningHandler, { "intake_data": dict(BASE_INTAKE), "customer_name": "Jane Smith", "recurring_cycle": True, "recurring_invoice_id": "in_LIVE123", }, ) check("recurring title HAS [Monthly cycle] prefix", renew["title"].startswith("[Monthly cycle] ")) check("recurring desc has RECURRING MONTHLY CYCLE banner", "RECURRING MONTHLY CYCLE" in renew["description"]) check("recurring desc references the invoice id", "in_LIVE123" in renew["description"]) check("recurring desc says re-run against CURRENT data", "CURRENT OIG LEIE" in renew["description"]) check("recurring desc says issue NEW dated certificate", "NEW dated certificate" in renew["description"]) # ── 3. recurring_cycle without invoice id still works ─────────────────────── renew_noinv = run_handler( M.OIGSAMScreeningHandler, {"intake_data": dict(BASE_INTAKE), "recurring_cycle": True}, ) check("recurring (no invoice) still has prefix", renew_noinv["title"].startswith("[Monthly cycle] ")) check("recurring (no invoice) has banner", "RECURRING MONTHLY CYCLE" in renew_noinv["description"]) # ── 4. Bundle handler: first screening, no recurring prefix on first run ───── bundle = run_handler( M.ProviderComplianceBundleHandler, {"intake_data": dict(BASE_INTAKE), "customer_name": "Jane Smith"}, ) check("bundle first run has no [Monthly cycle] prefix", "[Monthly cycle]" not in bundle["title"]) # ── 5. recurring flag default is falsey ───────────────────────────────────── check("recurring branch is off by default", "[Monthly cycle]" not in first["title"]) print() if _FAILS: print(f"{len(_FAILS)} FAILED: " + ", ".join(_FAILS)) sys.exit(1) print("ALL NPI RECURRING TESTS PASSED")