new-site/scripts/workers/pipeline_orchestrator.py
justin e0ba8acc90 add pipeline orchestrator, mailbox 1583 flow, EIN + virtual-mailbox services
- Pipeline orchestrator: chains sequential fulfillment for new carrier bundles
  (formation → EIN → USDOT → MC → BOC-3 → MCS-150 → D&A → UCR)
- Mailbox setup: Anytime Mailbox provisioning with USPS 1583 e-sign + online notarization
- New services: ein-application ($79), virtual-mailbox ($149/yr)
- Registered all new handlers in SERVICE_HANDLERS
- Pipeline cron: every 5 minutes
2026-05-30 22:56:54 -05:00

252 lines
9.6 KiB
Python

"""Pipeline Orchestrator — chains sequential service fulfillment.
For new carrier bundles and entity upgrades, services must be fulfilled
in a specific order because each step depends on the prior one:
1. Form LLC/Corp → wait for state approval → get entity name + number
2. Apply for EIN → wait for IRS (Mon-Fri 7am-10pm ET) → receive EIN
3. File USDOT → uses entity name + EIN → get DOT number
4. File MC Authority → needs active USDOT → get MC/authority
5. File BOC-3 → needs authority → process agent on file
6. File MCS-150 → needs USDOT → biennial update on record
7. Enroll D&A → needs DOT# → program active
8. Register UCR → needs DOT# → registration active
This module manages the pipeline state and advances to the next step
when the prior step completes. Steps can complete via:
- Service handler returning success
- Admin marking a todo as done
- Webhook callback (e.g., Stripe, ERPNext)
- Cron check (polling for completion)
Run via cron every 5 minutes:
python -m scripts.workers.pipeline_orchestrator
"""
from __future__ import annotations
import json
import logging
import os
import sys
from datetime import datetime, timezone
import psycopg2
# Module logger; the name is a fixed string rather than derived from __name__.
LOG = logging.getLogger("workers.pipeline_orchestrator")

# Connection string handed to psycopg2.connect(); empty when DATABASE_URL is unset.
DB_URL = os.environ.get("DATABASE_URL", "")

# Pipeline definitions, keyed by pipeline slug. The order of the steps matters.
#
# Each step is a dict with:
#   slug     - service_slug of the order that fulfils the step
#   name     - human-readable label
#   wait_for - slug of the step that must be "completed" first, or None when
#              the step has no prerequisite
#
# The first step's slug is also what check_pipelines() uses to find batches
# belonging to a pipeline.
#
# NOTE(review): the module docstring says BOC-3 "needs authority", but the
# boc3-filing steps below wait on dot-registration / mcs150-update rather than
# mc-authority — confirm which one is intended.
PIPELINES = {
    "new-carrier-bundle": [
        {
            "slug": "llc-formation",
            "name": "LLC Formation",
            "wait_for": None,
        },
        {
            "slug": "ein-application",
            "name": "EIN Application",
            "wait_for": "llc-formation",
        },
        {
            "slug": "dot-registration",
            "name": "USDOT Registration",
            "wait_for": "ein-application",
        },
        {
            "slug": "mc-authority",
            "name": "Operating Authority",
            "wait_for": "dot-registration",
        },
        {
            "slug": "boc3-filing",
            "name": "BOC-3 Filing",
            "wait_for": "dot-registration",
        },
        {
            "slug": "mcs150-update",
            "name": "MCS-150 Filing",
            "wait_for": "dot-registration",
        },
        {
            "slug": "dot-drug-alcohol",
            "name": "Drug & Alcohol Program",
            "wait_for": "dot-registration",
        },
        {
            "slug": "ucr-registration",
            "name": "UCR Registration",
            "wait_for": "dot-registration",
        },
    ],
    "entity-upgrade-bundle": [
        {
            "slug": "llc-formation",
            "name": "LLC Formation",
            "wait_for": None,
        },
        {
            "slug": "ein-application",
            "name": "EIN Application",
            "wait_for": "llc-formation",
        },
        {
            "slug": "mcs150-update",
            "name": "MCS-150 Update (New Entity)",
            "wait_for": "ein-application",
        },
        {
            "slug": "mc-authority",
            "name": "Authority Transfer",
            "wait_for": "mcs150-update",
        },
        {
            "slug": "boc3-filing",
            "name": "BOC-3 Update",
            "wait_for": "mcs150-update",
        },
    ],
    # Every step here is independent: nothing waits on anything else.
    "dot-full-compliance": [
        {
            "slug": "mcs150-update",
            "name": "MCS-150 Update",
            "wait_for": None,
        },
        {
            "slug": "boc3-filing",
            "name": "BOC-3 Filing",
            "wait_for": None,
        },
        {
            "slug": "ucr-registration",
            "name": "UCR Registration",
            "wait_for": None,
        },
        {
            "slug": "dot-drug-alcohol",
            "name": "Drug & Alcohol Program",
            "wait_for": None,
        },
        {
            "slug": "dot-audit-prep",
            "name": "Audit Preparation",
            "wait_for": None,
        },
    ],
}
def check_pipelines():
    """Check all active pipeline orders and advance where possible.

    For every pipeline in PIPELINES, selects the distinct batch_ids of paid
    orders whose service_slug equals the pipeline's first step and whose
    intake_data has no 'pipeline_complete' key, then calls advance_pipeline()
    for each batch. A failure in one batch is logged and rolled back so the
    remaining batches are still processed.

    Raises:
        Whatever psycopg2 raises when connecting or running the batch
        selection query; only errors inside advance_pipeline() are contained.
    """
    conn = psycopg2.connect(DB_URL)
    try:
        cur = conn.cursor()
        for pipeline_slug, steps in PIPELINES.items():
            if not steps:
                # Nothing to anchor the batch lookup on; steps[0] would raise.
                continue
            # Find batch orders that have pipeline-eligible slugs.
            # NOTE(review): batches are matched on the first step's slug only,
            # and "new-carrier-bundle" and "entity-upgrade-bundle" both start
            # with "llc-formation", so such a batch is advanced under both
            # pipeline definitions — confirm this is intended.
            cur.execute("""
                SELECT DISTINCT batch_id
                FROM compliance_orders
                WHERE batch_id IS NOT NULL
                AND payment_status = 'paid'
                AND service_slug = %s
                AND (intake_data->>'pipeline_complete') IS NULL
            """, (steps[0]["slug"],))
            for (batch_id,) in cur.fetchall():
                try:
                    advance_pipeline(conn, batch_id, pipeline_slug, steps)
                except Exception as exc:
                    # Per-batch boundary: record the traceback, discard the
                    # failed transaction, and move on to the next batch.
                    LOG.exception("[pipeline] Error advancing %s: %s", batch_id, exc)
                    conn.rollback()
    finally:
        # Always release the connection, including when a query above raises.
        conn.close()
# Keys copied from a completed dependency's intake_data into the intake_data
# of the step being dispatched (entity name, DOT#, EIN, etc.).
_FORWARDED_KEYS = (
    "entity_name",
    "dot_number",
    "mc_number",
    "ein",
    "formation_state",
    "entity_number",
    "formation_date",
)


def _load_intake(raw):
    """Return an order's intake_data as a dict.

    The column value may arrive already decoded, as a JSON string, or as
    NULL/None; a JSON string is decoded and any empty/None result becomes {}.
    """
    if isinstance(raw, str):
        raw = json.loads(raw)
    return raw or {}


def advance_pipeline(conn, batch_id: str, pipeline_slug: str, steps: list) -> None:
    """Check and advance a specific pipeline.

    Loads every order of the batch, then for each step (in order):
      * skips it when the batch has no order for that slug or the step's
        'pipeline_step_status' is already "completed";
      * skips it while the step named by 'wait_for' is in the batch but not
        yet "completed" (a dependency that is absent from the batch does not
        block the step);
      * if the step is still "pending", merges forwarded data from the
        dependency plus pipeline_step_status="dispatched" into the order's
        intake_data, commits, and dispatches the service.

    Finally, if every step present in the batch was "completed" when the
    batch was loaded, sets pipeline_complete=true on the first step's order
    and sends a notification.

    Args:
        conn: Open psycopg2 connection; this function commits on it.
        batch_id: Batch whose orders are examined.
        pipeline_slug: Pipeline name, used only in the completion notification.
        steps: Ordered step dicts with "slug" and "wait_for" keys.
    """
    if not steps:
        # An empty definition has nothing to advance; without this guard the
        # completion branch below would fail on steps[0].
        return

    cur = conn.cursor()

    # Get all orders in this batch. When a slug occurs more than once, the
    # most recently created order wins because rows are ordered by created_at.
    cur.execute("""
        SELECT order_number, service_slug, payment_status, intake_data
        FROM compliance_orders
        WHERE batch_id = %s
        ORDER BY created_at
    """, (batch_id,))
    orders = {
        service_slug: {
            "order_number": order_number,
            "status": payment_status,
            "intake_data": _load_intake(intake_data),
        }
        for order_number, service_slug, payment_status, intake_data in cur.fetchall()
    }

    for step in steps:
        slug = step["slug"]
        wait_for = step.get("wait_for")

        # Skip if this service isn't in the batch.
        order = orders.get(slug)
        if order is None:
            continue

        step_status = order["intake_data"].get("pipeline_step_status", "pending")
        if step_status == "completed":
            continue

        # Check if the dependency is met (only enforced when it is in the batch).
        dependency = orders.get(wait_for) if wait_for else None
        dep_intake = dependency["intake_data"] if dependency is not None else None
        if dep_intake is not None:
            dep_status = dep_intake.get("pipeline_step_status", "pending")
            if dep_status != "completed":
                LOG.debug("[pipeline] %s waiting for %s (status: %s)", slug, wait_for, dep_status)
                continue

        # Only pending steps are dispatched; any other status (e.g. an
        # already "dispatched" step) is left for something else to complete.
        if step_status != "pending":
            continue

        LOG.info("[pipeline] Dispatching %s for batch %s (order %s)",
                 slug, batch_id, order["order_number"])

        # Copy forward data from the completed dependency.
        forward_data = {}
        if dep_intake is not None:
            forward_data = {
                key: dep_intake[key] for key in _FORWARDED_KEYS if dep_intake.get(key)
            }

        # Update order with forwarded data + mark as dispatched.
        # COALESCE is required: in PostgreSQL NULL || jsonb is NULL, so without
        # it an order with NULL intake_data would never record "dispatched"
        # and would be dispatched again on every run.
        cur.execute("""
            UPDATE compliance_orders
            SET intake_data = COALESCE(intake_data, '{}'::jsonb) || %s::jsonb
            WHERE order_number = %s
        """, (
            json.dumps({**forward_data, "pipeline_step_status": "dispatched"}),
            order["order_number"],
        ))
        conn.commit()

        # Dispatch to job server. The status is committed *before* dispatching,
        # so a step is handed to the worker at most once; a failed dispatch is
        # only logged by _dispatch_service and the step stays "dispatched".
        _dispatch_service(order["order_number"], slug)

    # Check if all steps are complete, based on the state loaded above
    # (a step dispatched during this call therefore does not count).
    all_complete = all(
        orders[step["slug"]]["intake_data"].get("pipeline_step_status") == "completed"
        for step in steps
        if step["slug"] in orders
    )
    if not all_complete:
        return

    LOG.info("[pipeline] Pipeline complete for batch %s", batch_id)

    # Mark the primary (first-step) order as pipeline complete so the batch
    # is no longer picked up by check_pipelines().
    first_order = orders.get(steps[0]["slug"])
    if first_order is not None:
        cur.execute("""
            UPDATE compliance_orders
            SET intake_data = jsonb_set(
                COALESCE(intake_data, '{}'::jsonb),
                '{pipeline_complete}', 'true'::jsonb
            )
            WHERE order_number = %s
        """, (first_order["order_number"],))
        conn.commit()

    # Send Telegram notification.
    _notify_pipeline_complete(batch_id, pipeline_slug)
def _dispatch_service(order_number: str, service_slug: str) -> bool:
    """Dispatch a service to the job server for processing.

    POSTs a JSON job ({"action": "process_compliance_service", "order_name",
    "service_slug"}) to ``<WORKER_URL>/jobs``; WORKER_URL defaults to
    http://localhost:8090. This is best-effort: any failure is logged and
    swallowed rather than raised.

    Args:
        order_number: Order identifier, sent as "order_name".
        service_slug: Service to process for that order.

    Returns:
        True if the request completed without raising, False otherwise.
    """
    import urllib.request

    worker_url = os.getenv("WORKER_URL", "http://localhost:8090")
    try:
        payload = json.dumps({
            "action": "process_compliance_service",
            "order_name": order_number,
            "service_slug": service_slug,
        }).encode()
        req = urllib.request.Request(
            # Tolerate a trailing slash in WORKER_URL instead of sending //jobs.
            f"{worker_url.rstrip('/')}/jobs",
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        # The response body is not needed; the context manager just makes sure
        # the underlying connection is closed.
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception as exc:
        LOG.error("[pipeline] Failed to dispatch %s for %s: %s", service_slug, order_number, exc)
        return False
    LOG.info("[pipeline] Dispatched %s for %s", service_slug, order_number)
    return True
def _notify_pipeline_complete(batch_id: str, pipeline_slug: str) -> None:
    """Telegram notification when a pipeline completes.

    Sends a sendMessage request to the Telegram Bot API using the
    TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID environment variables. Does
    nothing when either variable is unset or empty. Best-effort: a failed
    request is logged as a warning and never raised to the caller.

    Args:
        batch_id: Batch that finished, included in the message text.
        pipeline_slug: Pipeline type, included in the message text.
    """
    import urllib.request

    bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "")
    chat_id = os.getenv("TELEGRAM_CHAT_ID", "")
    if not bot_token or not chat_id:
        return

    text = f"✅ Pipeline Complete\nBatch: {batch_id}\nType: {pipeline_slug}\nAll steps finished."
    try:
        data = json.dumps({"chat_id": chat_id, "text": text}).encode()
        req = urllib.request.Request(
            f"https://api.telegram.org/bot{bot_token}/sendMessage",
            data=data,
            headers={"Content-Type": "application/json"},
        )
        # Close the response once the request has gone through.
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception as exc:
        # Log only the exception type: the request URL embeds the bot token
        # and some exception messages echo the URL back.
        LOG.warning(
            "[pipeline] Telegram notification failed for batch %s (%s)",
            batch_id,
            type(exc).__name__,
        )
def main():
    """Set up INFO-level logging and run a single pipeline check pass."""
    log_format = "%(asctime)s [%(name)s] %(levelname)s %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO)
    LOG.info("Pipeline orchestrator — checking active pipelines")
    check_pipelines()


# Script entry point (e.g. `python -m scripts.workers.pipeline_orchestrator`).
if __name__ == "__main__":
    main()