"""Pipeline Orchestrator — chains sequential service fulfillment.

For new carrier bundles and entity upgrades, some services can only be
fulfilled after another one has finished, because each step consumes the
output of its prerequisite. For the DOT new-carrier bundle
(``dot-new-carrier-bundle``):

1. Form LLC/Corp     → wait for state approval → get entity name + number
2. Get EIN           → wait for IRS (Mon-Fri 7am-10pm ET) → get EIN
3. File USDOT        → uses entity name + EIN → get DOT number
4. File MC Authority → needs the USDOT ACTIVE at FMCSA → get MC/authority
5. File BOC-3        → needs the USDOT filing (may run while authority is
                       pending) → process agent on file
6. File MCS-150      → needs the USDOT filing → biennial update on record
7. Enroll D&A        → needs the USDOT ACTIVE at FMCSA → program active
8. Register UCR      → needs the USDOT ACTIVE at FMCSA → registration active

Steps 4-8 each depend only on the USDOT step, not on one another, so they
may be dispatched in parallel. ``PIPELINES`` below is the source of truth for
every bundle's dependency edges; the list above mirrors
``dot-new-carrier-bundle``.

This module keeps per-step state in each order's ``intake_data`` and
dispatches a step once its prerequisite is complete (and, for gated steps,
once FMCSA reports the prerequisite ACTIVE). Steps can complete via:

- Service handler returning success
- Admin marking a todo as done
- Webhook callback (e.g., Stripe, ERPNext)
- Cron check (polling for completion)

Run via cron every 5 minutes:
    python -m scripts.workers.pipeline_orchestrator
"""
from __future__ import annotations

import json
import logging
import os
import sys
from datetime import datetime, timezone

import psycopg2

LOG = logging.getLogger("workers.pipeline_orchestrator")

DB_URL = os.getenv("DATABASE_URL", "")

# Pipeline definitions — order matters.
#
# Each step names its service `slug`, a display `name`, and `wait_for`: the
# slug of the step that must be `completed` before this one is dispatched
# (None = no prerequisite; a prerequisite whose service is not in the batch is
# treated as satisfied). Steps are evaluated in list order, and the FIRST step
# of each list is the one used to find candidate batches and to carry the
# `pipeline_complete` flag once every step in the batch is completed.
#
# `require_active` (optional, default False): when True the dependency is only
# considered satisfied once FMCSA reports the prerequisite ACTIVE at the agency,
# not merely when our handler finished. Used for true agency dependency chains
# (e.g. MC authority needs an *active* USDOT; UCR/D&A need an active USDOT).
# `activation_target` names what must be active at FMCSA: "usdot" | "authority".
PIPELINES = {
    # Trucking/DOT new-carrier onboarding pipeline. Keyed by the DOT-specific
    # slug (dot-new-carrier-bundle); the bare "new-carrier-bundle" slug is the
    # TELECOM VoIP bundle, handled by NewCarrierBundleHandler -- they must not
    # share a key here or trucking orders would run the telecom pipeline.
    "dot-new-carrier-bundle": [
        {"slug": "llc-formation", "name": "LLC Formation", "wait_for": None},
        {"slug": "ein-application", "name": "EIN Application",
         "wait_for": "llc-formation"},
        {"slug": "dot-registration", "name": "USDOT Registration",
         "wait_for": "ein-application"},
        {"slug": "mc-authority", "name": "Operating Authority",
         "wait_for": "dot-registration",
         "require_active": True, "activation_target": "usdot"},
        # BOC-3 + insurance can be filed while authority is pending (parallel OK),
        # so BOC-3 only needs the USDOT registration submitted, not active.
        {"slug": "boc3-filing", "name": "BOC-3 Filing",
         "wait_for": "dot-registration"},
        {"slug": "mcs150-update", "name": "MCS-150 Filing",
         "wait_for": "dot-registration"},
        {"slug": "dot-drug-alcohol", "name": "Drug & Alcohol Program",
         "wait_for": "dot-registration",
         "require_active": True, "activation_target": "usdot"},
        {"slug": "ucr-registration", "name": "UCR Registration",
         "wait_for": "dot-registration",
         "require_active": True, "activation_target": "usdot"},
    ],
    "entity-upgrade-bundle": [
        {"slug": "llc-formation", "name": "LLC Formation", "wait_for": None},
        {"slug": "ein-application", "name": "EIN Application",
         "wait_for": "llc-formation"},
        {"slug": "mcs150-update", "name": "MCS-150 Update (New Entity)",
         "wait_for": "ein-application"},
        {"slug": "mc-authority", "name": "Authority Transfer",
         "wait_for": "mcs150-update"},
        {"slug": "boc3-filing", "name": "BOC-3 Update",
         "wait_for": "mcs150-update"},
    ],
    # Existing-carrier compliance bundle: no step depends on another.
    "dot-full-compliance": [
        {"slug": "mcs150-update", "name": "MCS-150 Update", "wait_for": None},
        {"slug": "boc3-filing", "name": "BOC-3 Filing", "wait_for": None},
        {"slug": "ucr-registration", "name": "UCR Registration", "wait_for": None},
        {"slug": "dot-drug-alcohol", "name": "Drug & Alcohol Program",
         "wait_for": None},
        {"slug": "dot-audit-prep", "name": "Audit Preparation", "wait_for": None},
    ],
}
`waiting_on_activation` step. AUTHORITY_VETTING_DAYS = 21 def _fmcsa_carrier(dot_number: str) -> dict | None: """Fetch raw FMCSA QC carrier record (or None on any failure).""" if not dot_number: return None api_key = os.environ.get("FMCSA_API_KEY", "") if not api_key: return None try: import urllib.request url = ( f"https://mobile.fmcsa.dot.gov/qc/services/carriers/" f"{dot_number}?webKey={api_key}" ) req = urllib.request.Request(url, headers={"Accept": "application/json"}) with urllib.request.urlopen(req, timeout=10) as resp: data = json.loads(resp.read()) return data.get("content", {}).get("carrier", {}) or {} except Exception as exc: # noqa: BLE001 LOG.warning("[pipeline] FMCSA lookup failed for DOT %s: %s", dot_number, exc) return None def _prerequisite_active(target: str, dot_number: str) -> bool: """ Return True if the named prerequisite is ACTIVE at FMCSA. target == "usdot": the USDOT record is allowed to operate (not OOS/inactive). target == "authority": at least one operating authority is active. On lookup failure we return False (hold) to avoid filing prematurely. """ carrier = _fmcsa_carrier(dot_number) if carrier is None: return False if target == "usdot": allowed = str(carrier.get("allowedToOperate", "")).upper() oos = str(carrier.get("oosDate") or "") status = str(carrier.get("statusCode", "")).upper() # Active = allowed to operate, not out-of-service, status active. 
return allowed == "Y" and not oos and status in ("A", "") if target == "authority": for k in ("commonAuthorityStatus", "contractAuthorityStatus", "brokerAuthorityStatus"): if str(carrier.get(k, "")).upper() == "A": return True return False return False def check_pipelines(): """Check all active pipeline orders and advance where possible.""" conn = psycopg2.connect(DB_URL) cur = conn.cursor() # Find batch orders that have pipeline-eligible slugs for pipeline_slug, steps in PIPELINES.items(): cur.execute(""" SELECT DISTINCT batch_id FROM compliance_orders WHERE batch_id IS NOT NULL AND payment_status = 'paid' AND service_slug = %s AND (intake_data->>'pipeline_complete') IS NULL """, (steps[0]["slug"],)) for (batch_id,) in cur.fetchall(): try: advance_pipeline(conn, batch_id, pipeline_slug, steps) except Exception as exc: LOG.error("[pipeline] Error advancing %s: %s", batch_id, exc) conn.rollback() conn.close() def advance_pipeline(conn, batch_id: str, pipeline_slug: str, steps: list): """Check and advance a specific pipeline.""" cur = conn.cursor() # Get all orders in this batch cur.execute(""" SELECT order_number, service_slug, payment_status, intake_data FROM compliance_orders WHERE batch_id = %s ORDER BY created_at """, (batch_id,)) orders = {row[1]: {"order_number": row[0], "status": row[2], "intake_data": row[3] or {}} for row in cur.fetchall()} # Check each step for step in steps: slug = step["slug"] wait_for = step["wait_for"] # Skip if this service isn't in the batch if slug not in orders: continue order = orders[slug] intake = order["intake_data"] if isinstance(intake, str): intake = json.loads(intake) # Check if already completed step_status = intake.get("pipeline_step_status", "pending") if step_status == "completed": continue # Check if dependency is met if wait_for and wait_for in orders: dep_intake = orders[wait_for]["intake_data"] if isinstance(dep_intake, str): dep_intake = json.loads(dep_intake) dep_status = dep_intake.get("pipeline_step_status", 
"pending") if dep_status != "completed": LOG.debug("[pipeline] %s waiting for %s (status: %s)", slug, wait_for, dep_status) continue # Activation gate: some edges need the prerequisite ACTIVE at FMCSA, # not merely our handler finished. Poll FMCSA before dispatching. if step.get("require_active"): target = step.get("activation_target", "usdot") dot_number = (intake.get("dot_number") or dep_intake.get("dot_number") or "") if not _prerequisite_active(target, dot_number): now = datetime.now(timezone.utc).isoformat() LOG.info("[pipeline] %s held: %s not yet ACTIVE at FMCSA " "(DOT %s)", slug, target, dot_number or "?") cur.execute(""" UPDATE compliance_orders SET intake_data = intake_data || %s::jsonb WHERE order_number = %s """, ( json.dumps({ "pipeline_step_status": "waiting_on_activation", "activation_target": target, "activation_last_checked": now, "activation_note": ( f"Holding until {target} is ACTIVE at FMCSA" + (f" (~{AUTHORITY_VETTING_DAYS}-day vetting window)" if target == "authority" else "") ), }), order["order_number"], )) conn.commit() continue # A step previously parked in waiting_on_activation becomes dispatchable # once the gate above passes; treat it like pending. if step_status == "waiting_on_activation": step_status = "pending" # Check if step needs dispatching if step_status == "pending": # Mark as dispatched and trigger the service handler LOG.info("[pipeline] Dispatching %s for batch %s (order %s)", slug, batch_id, order["order_number"]) # Copy forward data from completed dependencies forward_data = {} if wait_for and wait_for in orders: dep_intake = orders[wait_for]["intake_data"] if isinstance(dep_intake, str): dep_intake = json.loads(dep_intake) # Forward entity name, DOT#, EIN, etc. 
for key in ["entity_name", "dot_number", "mc_number", "ein", "formation_state", "entity_number", "formation_date"]: if dep_intake.get(key): forward_data[key] = dep_intake[key] # Update order with forwarded data + mark as dispatched cur.execute(""" UPDATE compliance_orders SET intake_data = intake_data || %s::jsonb WHERE order_number = %s """, ( json.dumps({**forward_data, "pipeline_step_status": "dispatched"}), order["order_number"], )) conn.commit() # Dispatch to job server _dispatch_service(order["order_number"], slug) # Check if all steps are complete all_complete = True for step in steps: if step["slug"] in orders: intake = orders[step["slug"]]["intake_data"] if isinstance(intake, str): intake = json.loads(intake) if intake.get("pipeline_step_status") != "completed": all_complete = False break if all_complete: LOG.info("[pipeline] Pipeline complete for batch %s", batch_id) # Mark the primary order as pipeline complete first_slug = steps[0]["slug"] if first_slug in orders: cur.execute(""" UPDATE compliance_orders SET intake_data = jsonb_set( COALESCE(intake_data, '{}'::jsonb), '{pipeline_complete}', 'true'::jsonb ) WHERE order_number = %s """, (orders[first_slug]["order_number"],)) conn.commit() # Send Telegram notification _notify_pipeline_complete(batch_id, pipeline_slug) def _dispatch_service(order_number: str, service_slug: str): """Dispatch a service to the job server for processing.""" import urllib.request worker_url = os.getenv("WORKER_URL", "http://localhost:8090") try: data = json.dumps({ "action": "process_compliance_service", "order_name": order_number, "service_slug": service_slug, }).encode() req = urllib.request.Request( f"{worker_url}/jobs", data=data, headers={"Content-Type": "application/json"}, ) urllib.request.urlopen(req, timeout=10) LOG.info("[pipeline] Dispatched %s for %s", service_slug, order_number) except Exception as exc: LOG.error("[pipeline] Failed to dispatch %s for %s: %s", service_slug, order_number, exc) def 
_notify_pipeline_complete(batch_id: str, pipeline_slug: str): """Telegram notification when a pipeline completes.""" try: import urllib.request bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "") chat_id = os.getenv("TELEGRAM_CHAT_ID", "") if not bot_token or not chat_id: return text = f"✅ Pipeline Complete\nBatch: {batch_id}\nType: {pipeline_slug}\nAll steps finished." data = json.dumps({"chat_id": chat_id, "text": text}).encode() req = urllib.request.Request( f"https://api.telegram.org/bot{bot_token}/sendMessage", data=data, headers={"Content-Type": "application/json"}, ) urllib.request.urlopen(req, timeout=10) except Exception: pass def main(): logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s", ) LOG.info("Pipeline orchestrator — checking active pipelines") check_pipelines() if __name__ == "__main__": main()