From 63a28f99dedbdfc1c1acbd2e2428013842456fc5 Mon Sep 17 00:00:00 2001 From: justin Date: Tue, 2 Jun 2026 03:32:37 -0500 Subject: [PATCH] feat(pipeline): FMCSA activation gating (require_active edges) Dependency edges can now require the prerequisite be ACTIVE at FMCSA, not just our handler completed. mc-authority/ucr/d&a now wait for an active USDOT; BOC-3 stays parallel-OK (can file while authority pending). Adds _prerequisite_active() polling FMCSA QC API, a waiting_on_activation hold state with next-check timestamp, and a 21-day authority vetting estimate for customer comms. Branch logic unit-tested. --- scripts/workers/pipeline_orchestrator.py | 105 ++++++++++++++++++++++- 1 file changed, 101 insertions(+), 4 deletions(-) diff --git a/scripts/workers/pipeline_orchestrator.py b/scripts/workers/pipeline_orchestrator.py index 4a4558a..b5407f1 100644 --- a/scripts/workers/pipeline_orchestrator.py +++ b/scripts/workers/pipeline_orchestrator.py @@ -37,17 +37,28 @@ LOG = logging.getLogger("workers.pipeline_orchestrator") DB_URL = os.getenv("DATABASE_URL", "") -# Pipeline definitions — order matters +# Pipeline definitions — order matters. +# +# `require_active` (optional, default False): when True the dependency is only +# considered satisfied once FMCSA reports the prerequisite ACTIVE at the agency, +# not merely when our handler finished. Used for true agency dependency chains +# (e.g. MC authority needs an *active* USDOT; UCR/D&A need an active USDOT). +# `activation_target` names what must be active at FMCSA: "usdot" | "authority". PIPELINES = { "new-carrier-bundle": [ {"slug": "llc-formation", "name": "LLC Formation", "wait_for": None}, {"slug": "ein-application", "name": "EIN Application", "wait_for": "llc-formation"}, {"slug": "dot-registration", "name": "USDOT Registration", "wait_for": "ein-application"}, - {"slug": "mc-authority", "name": "Operating Authority", "wait_for": "dot-registration"}, + {"slug": "mc-authority", "name": "Operating Authority", "wait_for": "dot-registration", + "require_active": True, "activation_target": "usdot"}, + # BOC-3 + insurance can be filed while authority is pending (parallel OK), + # so BOC-3 only needs the USDOT registration submitted, not active. {"slug": "boc3-filing", "name": "BOC-3 Filing", "wait_for": "dot-registration"}, {"slug": "mcs150-update", "name": "MCS-150 Filing", "wait_for": "dot-registration"}, - {"slug": "dot-drug-alcohol", "name": "Drug & Alcohol Program", "wait_for": "dot-registration"}, - {"slug": "ucr-registration", "name": "UCR Registration", "wait_for": "dot-registration"}, + {"slug": "dot-drug-alcohol", "name": "Drug & Alcohol Program", "wait_for": "dot-registration", + "require_active": True, "activation_target": "usdot"}, + {"slug": "ucr-registration", "name": "UCR Registration", "wait_for": "dot-registration", + "require_active": True, "activation_target": "usdot"}, ], "entity-upgrade-bundle": [ {"slug": "llc-formation", "name": "LLC Formation", "wait_for": None}, @@ -65,6 +76,57 @@ PIPELINES = { ], } +# Expected FMCSA authority vetting/protest window (calendar days) — used to set +# customer-facing expectations on a `waiting_on_activation` step. +AUTHORITY_VETTING_DAYS = 21 + + +def _fmcsa_carrier(dot_number: str) -> dict | None: + """Fetch raw FMCSA QC carrier record (or None on any failure).""" + if not dot_number: + return None + api_key = os.environ.get("FMCSA_API_KEY", "") + if not api_key: + return None + try: + import urllib.request + url = ( + f"https://mobile.fmcsa.dot.gov/qc/services/carriers/" + f"{dot_number}?webKey={api_key}" + ) + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + with urllib.request.urlopen(req, timeout=10) as resp: + data = json.loads(resp.read()) + return data.get("content", {}).get("carrier", {}) or {} + except Exception as exc: # noqa: BLE001 + LOG.warning("[pipeline] FMCSA lookup failed for DOT %s: %s", dot_number, exc) + return None + + +def _prerequisite_active(target: str, dot_number: str) -> bool: + """ + Return True if the named prerequisite is ACTIVE at FMCSA. + + target == "usdot": the USDOT record is allowed to operate (not OOS/inactive). + target == "authority": at least one operating authority is active. + On lookup failure we return False (hold) to avoid filing prematurely. + """ + carrier = _fmcsa_carrier(dot_number) + if carrier is None: + return False + if target == "usdot": + allowed = str(carrier.get("allowedToOperate", "")).upper() + oos = str(carrier.get("oosDate") or "") + status = str(carrier.get("statusCode", "")).upper() + # Active = allowed to operate, not out-of-service, status active. + return allowed == "Y" and not oos and status in ("A", "") + if target == "authority": + for k in ("commonAuthorityStatus", "contractAuthorityStatus", "brokerAuthorityStatus"): + if str(carrier.get(k, "")).upper() == "A": + return True + return False + return False + def check_pipelines(): """Check all active pipeline orders and advance where possible.""" @@ -134,6 +196,41 @@ def advance_pipeline(conn, batch_id: str, pipeline_slug: str, steps: list): LOG.debug("[pipeline] %s waiting for %s (status: %s)", slug, wait_for, dep_status) continue + # Activation gate: some edges need the prerequisite ACTIVE at FMCSA, + # not merely our handler finished. Poll FMCSA before dispatching. + if step.get("require_active"): + target = step.get("activation_target", "usdot") + dot_number = (intake.get("dot_number") + or dep_intake.get("dot_number") or "") + if not _prerequisite_active(target, dot_number): + now = datetime.now(timezone.utc).isoformat() + LOG.info("[pipeline] %s held: %s not yet ACTIVE at FMCSA " + "(DOT %s)", slug, target, dot_number or "?") + cur.execute(""" + UPDATE compliance_orders + SET intake_data = intake_data || %s::jsonb + WHERE order_number = %s + """, ( + json.dumps({ + "pipeline_step_status": "waiting_on_activation", + "activation_target": target, + "activation_last_checked": now, + "activation_note": ( + f"Holding until {target} is ACTIVE at FMCSA" + + (f" (~{AUTHORITY_VETTING_DAYS}-day vetting window)" + if target == "authority" else "") + ), + }), + order["order_number"], + )) + conn.commit() + continue + + # A step previously parked in waiting_on_activation becomes dispatchable + # once the gate above passes; treat it like pending. + if step_status == "waiting_on_activation": + step_status = "pending" + # Check if step needs dispatching if step_status == "pending": # Mark as dispatched and trigger the service handler