feat(pipeline): FMCSA activation gating (require_active edges)
Dependency edges can now require the prerequisite be ACTIVE at FMCSA, not just our handler completed. mc-authority/ucr/d&a now wait for an active USDOT; BOC-3 stays parallel-OK (can file while authority pending). Adds _prerequisite_active() polling FMCSA QC API, a waiting_on_activation hold state with next-check timestamp, and a 21-day authority vetting estimate for customer comms. Branch logic unit-tested.
This commit is contained in:
parent
bbbfeaeaa1
commit
63a28f99de
1 changed files with 101 additions and 4 deletions
|
|
@ -37,17 +37,28 @@ LOG = logging.getLogger("workers.pipeline_orchestrator")
|
|||
|
||||
DB_URL = os.getenv("DATABASE_URL", "")
|
||||
|
||||
# Pipeline definitions — order matters
|
||||
# Pipeline definitions — order matters.
|
||||
#
|
||||
# `require_active` (optional, default False): when True the dependency is only
|
||||
# considered satisfied once FMCSA reports the prerequisite ACTIVE at the agency,
|
||||
# not merely when our handler finished. Used for true agency dependency chains
|
||||
# (e.g. MC authority needs an *active* USDOT; UCR/D&A need an active USDOT).
|
||||
# `activation_target` names what must be active at FMCSA: "usdot" | "authority".
|
||||
PIPELINES = {
|
||||
"new-carrier-bundle": [
|
||||
{"slug": "llc-formation", "name": "LLC Formation", "wait_for": None},
|
||||
{"slug": "ein-application", "name": "EIN Application", "wait_for": "llc-formation"},
|
||||
{"slug": "dot-registration", "name": "USDOT Registration", "wait_for": "ein-application"},
|
||||
{"slug": "mc-authority", "name": "Operating Authority", "wait_for": "dot-registration"},
|
||||
{"slug": "mc-authority", "name": "Operating Authority", "wait_for": "dot-registration",
|
||||
"require_active": True, "activation_target": "usdot"},
|
||||
# BOC-3 + insurance can be filed while authority is pending (parallel OK),
|
||||
# so BOC-3 only needs the USDOT registration submitted, not active.
|
||||
{"slug": "boc3-filing", "name": "BOC-3 Filing", "wait_for": "dot-registration"},
|
||||
{"slug": "mcs150-update", "name": "MCS-150 Filing", "wait_for": "dot-registration"},
|
||||
{"slug": "dot-drug-alcohol", "name": "Drug & Alcohol Program", "wait_for": "dot-registration"},
|
||||
{"slug": "ucr-registration", "name": "UCR Registration", "wait_for": "dot-registration"},
|
||||
{"slug": "dot-drug-alcohol", "name": "Drug & Alcohol Program", "wait_for": "dot-registration",
|
||||
"require_active": True, "activation_target": "usdot"},
|
||||
{"slug": "ucr-registration", "name": "UCR Registration", "wait_for": "dot-registration",
|
||||
"require_active": True, "activation_target": "usdot"},
|
||||
],
|
||||
"entity-upgrade-bundle": [
|
||||
{"slug": "llc-formation", "name": "LLC Formation", "wait_for": None},
|
||||
|
|
@ -65,6 +76,57 @@ PIPELINES = {
|
|||
],
|
||||
}
|
||||
|
||||
# Expected FMCSA authority vetting/protest window (calendar days) — used to set
|
||||
# customer-facing expectations on a `waiting_on_activation` step.
|
||||
AUTHORITY_VETTING_DAYS = 21
|
||||
|
||||
|
||||
def _fmcsa_carrier(dot_number: str) -> dict | None:
|
||||
"""Fetch raw FMCSA QC carrier record (or None on any failure)."""
|
||||
if not dot_number:
|
||||
return None
|
||||
api_key = os.environ.get("FMCSA_API_KEY", "")
|
||||
if not api_key:
|
||||
return None
|
||||
try:
|
||||
import urllib.request
|
||||
url = (
|
||||
f"https://mobile.fmcsa.dot.gov/qc/services/carriers/"
|
||||
f"{dot_number}?webKey={api_key}"
|
||||
)
|
||||
req = urllib.request.Request(url, headers={"Accept": "application/json"})
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
data = json.loads(resp.read())
|
||||
return data.get("content", {}).get("carrier", {}) or {}
|
||||
except Exception as exc: # noqa: BLE001
|
||||
LOG.warning("[pipeline] FMCSA lookup failed for DOT %s: %s", dot_number, exc)
|
||||
return None
|
||||
|
||||
|
||||
def _prerequisite_active(target: str, dot_number: str) -> bool:
|
||||
"""
|
||||
Return True if the named prerequisite is ACTIVE at FMCSA.
|
||||
|
||||
target == "usdot": the USDOT record is allowed to operate (not OOS/inactive).
|
||||
target == "authority": at least one operating authority is active.
|
||||
On lookup failure we return False (hold) to avoid filing prematurely.
|
||||
"""
|
||||
carrier = _fmcsa_carrier(dot_number)
|
||||
if carrier is None:
|
||||
return False
|
||||
if target == "usdot":
|
||||
allowed = str(carrier.get("allowedToOperate", "")).upper()
|
||||
oos = str(carrier.get("oosDate") or "")
|
||||
status = str(carrier.get("statusCode", "")).upper()
|
||||
# Active = allowed to operate, not out-of-service, status active.
|
||||
return allowed == "Y" and not oos and status in ("A", "")
|
||||
if target == "authority":
|
||||
for k in ("commonAuthorityStatus", "contractAuthorityStatus", "brokerAuthorityStatus"):
|
||||
if str(carrier.get(k, "")).upper() == "A":
|
||||
return True
|
||||
return False
|
||||
return False
|
||||
|
||||
|
||||
def check_pipelines():
|
||||
"""Check all active pipeline orders and advance where possible."""
|
||||
|
|
@ -134,6 +196,41 @@ def advance_pipeline(conn, batch_id: str, pipeline_slug: str, steps: list):
|
|||
LOG.debug("[pipeline] %s waiting for %s (status: %s)", slug, wait_for, dep_status)
|
||||
continue
|
||||
|
||||
# Activation gate: some edges need the prerequisite ACTIVE at FMCSA,
|
||||
# not merely our handler finished. Poll FMCSA before dispatching.
|
||||
if step.get("require_active"):
|
||||
target = step.get("activation_target", "usdot")
|
||||
dot_number = (intake.get("dot_number")
|
||||
or dep_intake.get("dot_number") or "")
|
||||
if not _prerequisite_active(target, dot_number):
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
LOG.info("[pipeline] %s held: %s not yet ACTIVE at FMCSA "
|
||||
"(DOT %s)", slug, target, dot_number or "?")
|
||||
cur.execute("""
|
||||
UPDATE compliance_orders
|
||||
SET intake_data = intake_data || %s::jsonb
|
||||
WHERE order_number = %s
|
||||
""", (
|
||||
json.dumps({
|
||||
"pipeline_step_status": "waiting_on_activation",
|
||||
"activation_target": target,
|
||||
"activation_last_checked": now,
|
||||
"activation_note": (
|
||||
f"Holding until {target} is ACTIVE at FMCSA"
|
||||
+ (f" (~{AUTHORITY_VETTING_DAYS}-day vetting window)"
|
||||
if target == "authority" else "")
|
||||
),
|
||||
}),
|
||||
order["order_number"],
|
||||
))
|
||||
conn.commit()
|
||||
continue
|
||||
|
||||
# A step previously parked in waiting_on_activation becomes dispatchable
|
||||
# once the gate above passes; treat it like pending.
|
||||
if step_status == "waiting_on_activation":
|
||||
step_status = "pending"
|
||||
|
||||
# Check if step needs dispatching
|
||||
if step_status == "pending":
|
||||
# Mark as dispatched and trigger the service handler
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue