Dependency edges can now require the prerequisite be ACTIVE at FMCSA, not just our handler completed. mc-authority/ucr/d&a now wait for an active USDOT; BOC-3 stays parallel-OK (can file while authority pending). Adds _prerequisite_active() polling FMCSA QC API, a waiting_on_activation hold state with next-check timestamp, and a 21-day authority vetting estimate for customer comms. Branch logic unit-tested.
349 lines
14 KiB
Python
349 lines
14 KiB
Python
"""Pipeline Orchestrator — chains sequential service fulfillment.
|
|
|
|
For new carrier bundles and entity upgrades, services must be fulfilled
in a specific order because most steps depend on an earlier one:

  1. Form LLC/Corp → wait for state approval → get entity name + number
  2. Get EIN → wait for IRS (Mon-Fri 7am-10pm ET) → get EIN
  3. File USDOT → uses entity name + EIN → get DOT number
  4. File MC Authority → needs active USDOT → get MC/authority
  5. File BOC-3 → needs USDOT filed (may run while authority is pending) → process agent on file
  6. File MCS-150 → needs USDOT → biennial update on record
  7. Enroll D&A → needs active USDOT → program active
  8. Register UCR → needs active USDOT → registration active

This module manages the pipeline state and advances to the next step
when the prior step completes. Steps can complete via:
  - Service handler returning success
  - Admin marking a todo as done
  - Webhook callback (e.g., Stripe, ERPNext)
  - Cron check (polling for completion)

Run via cron every 5 minutes:
  python -m scripts.workers.pipeline_orchestrator
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
|
|
import psycopg2
|
|
|
|
LOG = logging.getLogger("workers.pipeline_orchestrator")
|
|
|
|
DB_URL = os.getenv("DATABASE_URL", "")
|
|
|
|
# Pipeline definitions — order matters.
#
# Every step is a plain dict with these keys:
#   slug               service slug of the order that fulfils the step
#   name               human-readable label
#   wait_for           slug of the step that must be completed first, or None
#   require_active     (optional, default False) when True the dependency only
#                      counts as satisfied once FMCSA reports the prerequisite
#                      ACTIVE at the agency, not merely when our handler
#                      finished (e.g. MC authority, UCR and D&A need an
#                      *active* USDOT)
#   activation_target  (optional) what must be active at FMCSA:
#                      "usdot" | "authority"


def _step(slug, name, wait_for=None, *, active_target=None):
    """Build one pipeline step; the activation keys are added only when gated."""
    step = {"slug": slug, "name": name, "wait_for": wait_for}
    if active_target is not None:
        step["require_active"] = True
        step["activation_target"] = active_target
    return step


PIPELINES = {
    "new-carrier-bundle": [
        _step("llc-formation", "LLC Formation"),
        _step("ein-application", "EIN Application", "llc-formation"),
        _step("dot-registration", "USDOT Registration", "ein-application"),
        _step("mc-authority", "Operating Authority", "dot-registration",
              active_target="usdot"),
        # BOC-3 + insurance can be filed while authority is pending (parallel
        # OK), so BOC-3 only needs the USDOT registration submitted, not active.
        _step("boc3-filing", "BOC-3 Filing", "dot-registration"),
        _step("mcs150-update", "MCS-150 Filing", "dot-registration"),
        _step("dot-drug-alcohol", "Drug & Alcohol Program", "dot-registration",
              active_target="usdot"),
        _step("ucr-registration", "UCR Registration", "dot-registration",
              active_target="usdot"),
    ],
    "entity-upgrade-bundle": [
        _step("llc-formation", "LLC Formation"),
        _step("ein-application", "EIN Application", "llc-formation"),
        _step("mcs150-update", "MCS-150 Update (New Entity)", "ein-application"),
        _step("mc-authority", "Authority Transfer", "mcs150-update"),
        _step("boc3-filing", "BOC-3 Update", "mcs150-update"),
    ],
    "dot-full-compliance": [
        _step("mcs150-update", "MCS-150 Update"),
        _step("boc3-filing", "BOC-3 Filing"),
        _step("ucr-registration", "UCR Registration"),
        _step("dot-drug-alcohol", "Drug & Alcohol Program"),
        _step("dot-audit-prep", "Audit Preparation"),
    ],
}

# Expected FMCSA authority vetting/protest window (calendar days) — used to set
# customer-facing expectations on a `waiting_on_activation` step.
AUTHORITY_VETTING_DAYS = 21
|
|
|
|
|
|
def _fmcsa_carrier(dot_number: str) -> dict | None:
    """Fetch the raw FMCSA QC carrier record for a USDOT number.

    Args:
        dot_number: USDOT number to look up. Surrounding whitespace is
            ignored; an empty value short-circuits without a network call.

    Returns:
        The ``content.carrier`` object of the JSON response (an empty dict
        when that entry is present but empty), or None when the DOT number
        or the ``FMCSA_API_KEY`` environment variable is missing, or when the
        request or response parsing fails for any reason.
    """
    dot = str(dot_number or "").strip()
    if not dot:
        return None
    api_key = os.environ.get("FMCSA_API_KEY", "")
    if not api_key:
        return None
    try:
        import urllib.parse
        import urllib.request

        # The DOT number originates from order intake data, so percent-encode
        # it: a stray space, "/" or "?" must not alter the request path or
        # make urlopen reject the URL outright.
        url = (
            "https://mobile.fmcsa.dot.gov/qc/services/carriers/"
            f"{urllib.parse.quote(dot, safe='')}"
            f"?webKey={urllib.parse.quote(api_key, safe='')}"
        )
        req = urllib.request.Request(url, headers={"Accept": "application/json"})
        with urllib.request.urlopen(req, timeout=10) as resp:
            data = json.loads(resp.read())
        return data.get("content", {}).get("carrier", {}) or {}
    except Exception as exc:  # noqa: BLE001
        # Deliberately broad: any failure (network, HTTP status, malformed or
        # null payload) means "unknown", which callers treat as "hold".
        LOG.warning("[pipeline] FMCSA lookup failed for DOT %s: %s", dot_number, exc)
        return None
|
|
|
|
|
|
def _prerequisite_active(target: str, dot_number: str) -> bool:
    """Return True if the named prerequisite is ACTIVE at FMCSA.

    Args:
        target: What must be active. ``"usdot"`` means the USDOT record is
            allowed to operate (``allowedToOperate == "Y"``, no ``oosDate``,
            and ``statusCode`` either ``"A"`` or absent). ``"authority"``
            means at least one of the common/contract/broker authority
            statuses is ``"A"``. Any other value yields False.
        dot_number: USDOT number used for the FMCSA lookup.

    Returns:
        True only when the carrier record confirms the target is active.
        On lookup failure the result is False (hold) to avoid filing
        prematurely.
    """
    carrier = _fmcsa_carrier(dot_number)
    if not isinstance(carrier, dict):
        # None (lookup failed) or an unexpected payload shape: hold.
        return False

    def field(key: str) -> str:
        # A JSON null and a missing key must read the same. str(None) would
        # otherwise become "NONE" and fail the status check even though an
        # absent status is accepted.
        return str(carrier.get(key) or "").strip().upper()

    if target == "usdot":
        # Active = allowed to operate, not out-of-service, status active.
        return (
            field("allowedToOperate") == "Y"
            and not field("oosDate")
            and field("statusCode") in ("A", "")
        )
    if target == "authority":
        authority_keys = (
            "commonAuthorityStatus",
            "contractAuthorityStatus",
            "brokerAuthorityStatus",
        )
        return any(field(key) == "A" for key in authority_keys)
    return False
|
|
|
|
|
|
def check_pipelines():
    """Check all active pipeline orders and advance where possible.

    For every pipeline in ``PIPELINES`` this selects the paid batches that
    contain an order for the pipeline's first step and are not yet flagged
    ``pipeline_complete``, then calls ``advance_pipeline`` for each batch.
    A failure while advancing one batch is logged and rolled back so the
    remaining batches are still processed. The database connection is
    always closed, even if a query raises.
    """
    conn = psycopg2.connect(DB_URL)
    try:
        cur = conn.cursor()

        # NOTE(review): batches are matched to a pipeline only by the slug of
        # the pipeline's first step. "new-carrier-bundle" and
        # "entity-upgrade-bundle" both start with "llc-formation", so such a
        # batch is advanced under both step lists — confirm this is intended.
        for pipeline_slug, steps in PIPELINES.items():
            if not steps:
                # Nothing to key the batch lookup on.
                continue
            cur.execute("""
                SELECT DISTINCT batch_id
                FROM compliance_orders
                WHERE batch_id IS NOT NULL
                  AND payment_status = 'paid'
                  AND service_slug = %s
                  AND (intake_data->>'pipeline_complete') IS NULL
            """, (steps[0]["slug"],))

            for (batch_id,) in cur.fetchall():
                try:
                    advance_pipeline(conn, batch_id, pipeline_slug, steps)
                except Exception as exc:
                    # Boundary handler: keep the traceback, discard the failed
                    # transaction, and carry on with the next batch.
                    LOG.exception("[pipeline] Error advancing %s: %s", batch_id, exc)
                    conn.rollback()
    finally:
        conn.close()
|
|
|
|
|
|
# Step statuses from which the orchestrator may still dispatch a step. Any
# other status (dispatched, completed, or one set elsewhere) is left alone.
_DISPATCHABLE_STATUSES = ("pending", "waiting_on_activation")

# Keys copied from a completed dependency's intake_data onto the dependent order.
_FORWARDED_KEYS = (
    "entity_name", "dot_number", "mc_number", "ein",
    "formation_state", "entity_number", "formation_date",
)


def _intake_dict(raw) -> dict:
    """Return an order's intake_data as a dict (JSON text decoded, empty -> {})."""
    if not raw:
        return {}
    if isinstance(raw, str):
        raw = json.loads(raw)
    return raw or {}


def _merge_intake(cur, order_number: str, patch: dict) -> None:
    """Shallow-merge ``patch`` into the intake_data JSONB of one order."""
    # COALESCE matters: in PostgreSQL ``NULL || jsonb`` is NULL, so without it
    # an order whose intake_data is NULL would never persist its step status.
    cur.execute("""
        UPDATE compliance_orders
        SET intake_data = COALESCE(intake_data, '{}'::jsonb) || %s::jsonb
        WHERE order_number = %s
    """, (json.dumps(patch), order_number))


def advance_pipeline(conn, batch_id: str, pipeline_slug: str, steps: list):
    """Check and advance a specific pipeline.

    Loads every order of the batch, then walks ``steps`` in order. A step
    whose order exists and is still dispatchable (``pending`` or
    ``waiting_on_activation``) is dispatched once its ``wait_for`` dependency
    is ``completed`` and, for ``require_active`` steps, once FMCSA reports the
    activation target active. Otherwise the step is parked as
    ``waiting_on_activation``. When every step present in the batch is
    ``completed`` the first step's order is flagged ``pipeline_complete`` and
    a notification is sent.

    Args:
        conn: Open database connection; this function commits on it.
        batch_id: Batch whose orders are examined.
        pipeline_slug: Pipeline name, used for the completion notification.
        steps: Ordered step dicts as defined in ``PIPELINES``.
    """
    cur = conn.cursor()

    # Get all orders in this batch, keyed by service slug, with intake_data
    # decoded once up front.
    cur.execute("""
        SELECT order_number, service_slug, payment_status, intake_data
        FROM compliance_orders
        WHERE batch_id = %s
        ORDER BY created_at
    """, (batch_id,))
    orders = {
        service_slug: {
            "order_number": order_number,
            "status": payment_status,
            "intake_data": _intake_dict(raw_intake),
        }
        for order_number, service_slug, payment_status, raw_intake in cur.fetchall()
    }

    for step in steps:
        slug = step["slug"]
        wait_for = step["wait_for"]

        # Skip if this service isn't in the batch
        order = orders.get(slug)
        if order is None:
            continue
        intake = order["intake_data"]

        # Only not-yet-dispatched steps are acted on. In particular the
        # activation gate below must not run for a step that is already
        # "dispatched": a transient FMCSA lookup failure would park it as
        # waiting_on_activation and it would then be dispatched a second time.
        step_status = intake.get("pipeline_step_status", "pending")
        if step_status not in _DISPATCHABLE_STATUSES:
            continue

        # Check if dependency is met. dep_intake is reset on every iteration
        # so a step whose dependency is absent from the batch never reads an
        # unbound name or a previous step's dependency data.
        dependency = orders.get(wait_for) if wait_for else None
        dep_intake = dependency["intake_data"] if dependency is not None else {}
        if dependency is not None:
            dep_status = dep_intake.get("pipeline_step_status", "pending")
            if dep_status != "completed":
                LOG.debug("[pipeline] %s waiting for %s (status: %s)", slug, wait_for, dep_status)
                continue

        # Activation gate: some edges need the prerequisite ACTIVE at FMCSA,
        # not merely our handler finished. Poll FMCSA before dispatching.
        if step.get("require_active"):
            target = step.get("activation_target", "usdot")
            dot_number = intake.get("dot_number") or dep_intake.get("dot_number") or ""
            if not _prerequisite_active(target, dot_number):
                now = datetime.now(timezone.utc).isoformat()
                LOG.info("[pipeline] %s held: %s not yet ACTIVE at FMCSA "
                         "(DOT %s)", slug, target, dot_number or "?")
                vetting_hint = (
                    f" (~{AUTHORITY_VETTING_DAYS}-day vetting window)"
                    if target == "authority" else ""
                )
                _merge_intake(cur, order["order_number"], {
                    "pipeline_step_status": "waiting_on_activation",
                    "activation_target": target,
                    "activation_last_checked": now,
                    "activation_note": f"Holding until {target} is ACTIVE at FMCSA" + vetting_hint,
                })
                conn.commit()
                continue

        # The step is pending, or was parked and has now passed the gate.
        LOG.info("[pipeline] Dispatching %s for batch %s (order %s)",
                 slug, batch_id, order["order_number"])

        # Copy forward data from the completed dependency (entity name, DOT#,
        # EIN, etc.); only non-empty values are forwarded.
        forward_data = {key: dep_intake[key] for key in _FORWARDED_KEYS if dep_intake.get(key)}

        # The "dispatched" status is committed before the job is handed to the
        # job server, so a failed hand-off is not retried automatically.
        _merge_intake(cur, order["order_number"],
                      {**forward_data, "pipeline_step_status": "dispatched"})
        conn.commit()

        # Dispatch to job server
        _dispatch_service(order["order_number"], slug)

    # Check if all steps are complete. This uses the snapshot loaded above, so
    # steps dispatched in this run are not counted as completed.
    all_complete = all(
        orders[step["slug"]]["intake_data"].get("pipeline_step_status") == "completed"
        for step in steps
        if step["slug"] in orders
    )
    if not all_complete:
        return

    LOG.info("[pipeline] Pipeline complete for batch %s", batch_id)
    # Mark the primary order as pipeline complete
    first_slug = steps[0]["slug"]
    if first_slug in orders:
        cur.execute("""
            UPDATE compliance_orders
            SET intake_data = jsonb_set(
                COALESCE(intake_data, '{}'::jsonb),
                '{pipeline_complete}', 'true'::jsonb
            )
            WHERE order_number = %s
        """, (orders[first_slug]["order_number"],))
        conn.commit()

    # Send Telegram notification
    _notify_pipeline_complete(batch_id, pipeline_slug)
|
|
|
|
|
|
def _dispatch_service(order_number: str, service_slug: str) -> bool:
    """Dispatch a service to the job server for processing.

    POSTs a ``process_compliance_service`` job to ``{WORKER_URL}/jobs``
    (``WORKER_URL`` defaults to ``http://localhost:8090``).

    Args:
        order_number: Order to process; sent as ``order_name``.
        service_slug: Service the job server should run for the order.

    Returns:
        True when the request completed without error, False otherwise.
        Failures are logged and never raised.
    """
    import urllib.request

    # Tolerate a trailing slash in the configured base URL.
    worker_url = os.getenv("WORKER_URL", "http://localhost:8090").rstrip("/")
    try:
        payload = json.dumps({
            "action": "process_compliance_service",
            "order_name": order_number,
            "service_slug": service_slug,
        }).encode()
        req = urllib.request.Request(
            f"{worker_url}/jobs",
            data=payload,
            headers={"Content-Type": "application/json"},
        )
        # Use the response as a context manager so the connection is closed
        # instead of being left for the garbage collector.
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception as exc:
        LOG.error("[pipeline] Failed to dispatch %s for %s: %s", service_slug, order_number, exc)
        return False
    LOG.info("[pipeline] Dispatched %s for %s", service_slug, order_number)
    return True
|
|
|
|
|
|
def _notify_pipeline_complete(batch_id: str, pipeline_slug: str):
    """Telegram notification when a pipeline completes.

    Best-effort: does nothing when ``TELEGRAM_BOT_TOKEN`` or
    ``TELEGRAM_CHAT_ID`` is unset, and never raises. A failed send is logged
    as a warning.

    Args:
        batch_id: Batch that finished; included in the message text.
        pipeline_slug: Pipeline type; included in the message text.
    """
    bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "")
    chat_id = os.getenv("TELEGRAM_CHAT_ID", "")
    if not bot_token or not chat_id:
        return

    try:
        import urllib.request

        text = f"✅ Pipeline Complete\nBatch: {batch_id}\nType: {pipeline_slug}\nAll steps finished."
        data = json.dumps({"chat_id": chat_id, "text": text}).encode()
        req = urllib.request.Request(
            f"https://api.telegram.org/bot{bot_token}/sendMessage",
            data=data,
            headers={"Content-Type": "application/json"},
        )
        # Close the response explicitly rather than leaking the connection.
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception as exc:
        # Still best-effort, but no longer silent. Only the exception type is
        # logged because the request URL embeds the bot token.
        LOG.warning("[pipeline] Telegram notification failed for batch %s: %s",
                    batch_id, type(exc).__name__)
|
|
|
|
|
|
def main():
    """Script entry point: set up INFO-level logging and run one pipeline check."""
    log_format = "%(asctime)s [%(name)s] %(levelname)s %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO)
    LOG.info("Pipeline orchestrator — checking active pipelines")
    check_pipelines()


# Run a single orchestration pass when executed as a script.
if __name__ == "__main__":
    main()
|