"""Pipeline Orchestrator — chains sequential service fulfillment. For new carrier bundles and entity upgrades, services must be fulfilled in a specific order because each step depends on the prior one: 1. Form LLC/Corp → wait for state approval → get entity name + number 2. Get EIN → wait for IRS (Mon-Fri 7am-10pm ET) → get EIN 3. File USDOT → uses entity name + EIN → get DOT number 4. File MC Authority → needs active USDOT → get MC/authority 5. File BOC-3 → needs authority → process agent on file 6. File MCS-150 → needs USDOT → biennial update on record 7. Enroll D&A → needs DOT# → program active 8. Register UCR → needs DOT# → registration active This module manages the pipeline state and advances to the next step when the prior step completes. Steps can complete via: - Service handler returning success - Admin marking a todo as done - Webhook callback (e.g., Stripe, ERPNext) - Cron check (polling for completion) Run via cron every 5 minutes: python -m scripts.workers.pipeline_orchestrator """ from __future__ import annotations import json import logging import os import sys from datetime import datetime, timezone import psycopg2 LOG = logging.getLogger("workers.pipeline_orchestrator") DB_URL = os.getenv("DATABASE_URL", "") # Pipeline definitions — order matters PIPELINES = { "new-carrier-bundle": [ {"slug": "llc-formation", "name": "LLC Formation", "wait_for": None}, {"slug": "ein-application", "name": "EIN Application", "wait_for": "llc-formation"}, {"slug": "dot-registration", "name": "USDOT Registration", "wait_for": "ein-application"}, {"slug": "mc-authority", "name": "Operating Authority", "wait_for": "dot-registration"}, {"slug": "boc3-filing", "name": "BOC-3 Filing", "wait_for": "dot-registration"}, {"slug": "mcs150-update", "name": "MCS-150 Filing", "wait_for": "dot-registration"}, {"slug": "dot-drug-alcohol", "name": "Drug & Alcohol Program", "wait_for": "dot-registration"}, {"slug": "ucr-registration", "name": "UCR Registration", "wait_for": "dot-registration"}, ], "entity-upgrade-bundle": [ {"slug": "llc-formation", "name": "LLC Formation", "wait_for": None}, {"slug": "ein-application", "name": "EIN Application", "wait_for": "llc-formation"}, {"slug": "mcs150-update", "name": "MCS-150 Update (New Entity)", "wait_for": "ein-application"}, {"slug": "mc-authority", "name": "Authority Transfer", "wait_for": "mcs150-update"}, {"slug": "boc3-filing", "name": "BOC-3 Update", "wait_for": "mcs150-update"}, ], "dot-full-compliance": [ {"slug": "mcs150-update", "name": "MCS-150 Update", "wait_for": None}, {"slug": "boc3-filing", "name": "BOC-3 Filing", "wait_for": None}, {"slug": "ucr-registration", "name": "UCR Registration", "wait_for": None}, {"slug": "dot-drug-alcohol", "name": "Drug & Alcohol Program", "wait_for": None}, {"slug": "dot-audit-prep", "name": "Audit Preparation", "wait_for": None}, ], } def check_pipelines(): """Check all active pipeline orders and advance where possible.""" conn = psycopg2.connect(DB_URL) cur = conn.cursor() # Find batch orders that have pipeline-eligible slugs for pipeline_slug, steps in PIPELINES.items(): cur.execute(""" SELECT DISTINCT batch_id FROM compliance_orders WHERE batch_id IS NOT NULL AND payment_status = 'paid' AND service_slug = %s AND (intake_data->>'pipeline_complete') IS NULL """, (steps[0]["slug"],)) for (batch_id,) in cur.fetchall(): try: advance_pipeline(conn, batch_id, pipeline_slug, steps) except Exception as exc: LOG.error("[pipeline] Error advancing %s: %s", batch_id, exc) conn.rollback() conn.close() def advance_pipeline(conn, batch_id: str, pipeline_slug: str, steps: list): """Check and advance a specific pipeline.""" cur = conn.cursor() # Get all orders in this batch cur.execute(""" SELECT order_number, service_slug, payment_status, intake_data FROM compliance_orders WHERE batch_id = %s ORDER BY created_at """, (batch_id,)) orders = {row[1]: {"order_number": row[0], "status": row[2], "intake_data": row[3] or {}} for row in cur.fetchall()} # Check each step for step in steps: slug = step["slug"] wait_for = step["wait_for"] # Skip if this service isn't in the batch if slug not in orders: continue order = orders[slug] intake = order["intake_data"] if isinstance(intake, str): intake = json.loads(intake) # Check if already completed step_status = intake.get("pipeline_step_status", "pending") if step_status == "completed": continue # Check if dependency is met if wait_for and wait_for in orders: dep_intake = orders[wait_for]["intake_data"] if isinstance(dep_intake, str): dep_intake = json.loads(dep_intake) dep_status = dep_intake.get("pipeline_step_status", "pending") if dep_status != "completed": LOG.debug("[pipeline] %s waiting for %s (status: %s)", slug, wait_for, dep_status) continue # Check if step needs dispatching if step_status == "pending": # Mark as dispatched and trigger the service handler LOG.info("[pipeline] Dispatching %s for batch %s (order %s)", slug, batch_id, order["order_number"]) # Copy forward data from completed dependencies forward_data = {} if wait_for and wait_for in orders: dep_intake = orders[wait_for]["intake_data"] if isinstance(dep_intake, str): dep_intake = json.loads(dep_intake) # Forward entity name, DOT#, EIN, etc. for key in ["entity_name", "dot_number", "mc_number", "ein", "formation_state", "entity_number", "formation_date"]: if dep_intake.get(key): forward_data[key] = dep_intake[key] # Update order with forwarded data + mark as dispatched cur.execute(""" UPDATE compliance_orders SET intake_data = intake_data || %s::jsonb WHERE order_number = %s """, ( json.dumps({**forward_data, "pipeline_step_status": "dispatched"}), order["order_number"], )) conn.commit() # Dispatch to job server _dispatch_service(order["order_number"], slug) # Check if all steps are complete all_complete = True for step in steps: if step["slug"] in orders: intake = orders[step["slug"]]["intake_data"] if isinstance(intake, str): intake = json.loads(intake) if intake.get("pipeline_step_status") != "completed": all_complete = False break if all_complete: LOG.info("[pipeline] Pipeline complete for batch %s", batch_id) # Mark the primary order as pipeline complete first_slug = steps[0]["slug"] if first_slug in orders: cur.execute(""" UPDATE compliance_orders SET intake_data = jsonb_set( COALESCE(intake_data, '{}'::jsonb), '{pipeline_complete}', 'true'::jsonb ) WHERE order_number = %s """, (orders[first_slug]["order_number"],)) conn.commit() # Send Telegram notification _notify_pipeline_complete(batch_id, pipeline_slug) def _dispatch_service(order_number: str, service_slug: str): """Dispatch a service to the job server for processing.""" import urllib.request worker_url = os.getenv("WORKER_URL", "http://localhost:8090") try: data = json.dumps({ "action": "process_compliance_service", "order_name": order_number, "service_slug": service_slug, }).encode() req = urllib.request.Request( f"{worker_url}/jobs", data=data, headers={"Content-Type": "application/json"}, ) urllib.request.urlopen(req, timeout=10) LOG.info("[pipeline] Dispatched %s for %s", service_slug, order_number) except Exception as exc: LOG.error("[pipeline] Failed to dispatch %s for %s: %s", service_slug, order_number, exc) def _notify_pipeline_complete(batch_id: str, pipeline_slug: str): """Telegram notification when a pipeline completes.""" try: import urllib.request bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "") chat_id = os.getenv("TELEGRAM_CHAT_ID", "") if not bot_token or not chat_id: return text = f"✅ Pipeline Complete\nBatch: {batch_id}\nType: {pipeline_slug}\nAll steps finished." data = json.dumps({"chat_id": chat_id, "text": text}).encode() req = urllib.request.Request( f"https://api.telegram.org/bot{bot_token}/sendMessage", data=data, headers={"Content-Type": "application/json"}, ) urllib.request.urlopen(req, timeout=10) except Exception: pass def main(): logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s", ) LOG.info("Pipeline orchestrator — checking active pipelines") check_pipelines() if __name__ == "__main__": main()