add pipeline orchestrator, mailbox 1583 flow, EIN + virtual-mailbox services

- Pipeline orchestrator: chains sequential fulfillment for new carrier bundles
  (formation → EIN → USDOT → MC → BOC-3 → MCS-150 → D&A → UCR)
- Mailbox setup: Anytime Mailbox provisioning with USPS 1583 e-sign + online notarization
- New services: ein-application ($79), virtual-mailbox ($149/yr)
- Registered all new handlers in SERVICE_HANDLERS
- Pipeline cron: every 5 minutes
This commit is contained in:
justin 2026-05-30 22:56:54 -05:00
parent e1ece093cd
commit e0ba8acc90
5 changed files with 466 additions and 0 deletions

View file

@ -0,0 +1,252 @@
"""Pipeline Orchestrator — chains sequential service fulfillment.
For new carrier bundles and entity upgrades, services must be fulfilled
in a specific order because each step depends on the prior one:
1. Form LLC/Corp wait for state approval get entity name + number
2. Get EIN wait for IRS (Mon-Fri 7am-10pm ET) get EIN
3. File USDOT uses entity name + EIN get DOT number
4. File MC Authority needs active USDOT get MC/authority
5. File BOC-3 needs authority process agent on file
6. File MCS-150 needs USDOT biennial update on record
7. Enroll D&A needs DOT# → program active
8. Register UCR needs DOT# → registration active
This module manages the pipeline state and advances to the next step
when the prior step completes. Steps can complete via:
- Service handler returning success
- Admin marking a todo as done
- Webhook callback (e.g., Stripe, ERPNext)
- Cron check (polling for completion)
Run via cron every 5 minutes:
python -m scripts.workers.pipeline_orchestrator
"""
from __future__ import annotations
import json
import logging
import os
import sys
from datetime import datetime, timezone
import psycopg2
LOG = logging.getLogger("workers.pipeline_orchestrator")
DB_URL = os.getenv("DATABASE_URL", "")
# Pipeline definitions — order matters
PIPELINES = {
"new-carrier-bundle": [
{"slug": "llc-formation", "name": "LLC Formation", "wait_for": None},
{"slug": "ein-application", "name": "EIN Application", "wait_for": "llc-formation"},
{"slug": "dot-registration", "name": "USDOT Registration", "wait_for": "ein-application"},
{"slug": "mc-authority", "name": "Operating Authority", "wait_for": "dot-registration"},
{"slug": "boc3-filing", "name": "BOC-3 Filing", "wait_for": "dot-registration"},
{"slug": "mcs150-update", "name": "MCS-150 Filing", "wait_for": "dot-registration"},
{"slug": "dot-drug-alcohol", "name": "Drug & Alcohol Program", "wait_for": "dot-registration"},
{"slug": "ucr-registration", "name": "UCR Registration", "wait_for": "dot-registration"},
],
"entity-upgrade-bundle": [
{"slug": "llc-formation", "name": "LLC Formation", "wait_for": None},
{"slug": "ein-application", "name": "EIN Application", "wait_for": "llc-formation"},
{"slug": "mcs150-update", "name": "MCS-150 Update (New Entity)", "wait_for": "ein-application"},
{"slug": "mc-authority", "name": "Authority Transfer", "wait_for": "mcs150-update"},
{"slug": "boc3-filing", "name": "BOC-3 Update", "wait_for": "mcs150-update"},
],
"dot-full-compliance": [
{"slug": "mcs150-update", "name": "MCS-150 Update", "wait_for": None},
{"slug": "boc3-filing", "name": "BOC-3 Filing", "wait_for": None},
{"slug": "ucr-registration", "name": "UCR Registration", "wait_for": None},
{"slug": "dot-drug-alcohol", "name": "Drug & Alcohol Program", "wait_for": None},
{"slug": "dot-audit-prep", "name": "Audit Preparation", "wait_for": None},
],
}
def check_pipelines():
"""Check all active pipeline orders and advance where possible."""
conn = psycopg2.connect(DB_URL)
cur = conn.cursor()
# Find batch orders that have pipeline-eligible slugs
for pipeline_slug, steps in PIPELINES.items():
cur.execute("""
SELECT DISTINCT batch_id
FROM compliance_orders
WHERE batch_id IS NOT NULL
AND payment_status = 'paid'
AND service_slug = %s
AND (intake_data->>'pipeline_complete') IS NULL
""", (steps[0]["slug"],))
for (batch_id,) in cur.fetchall():
try:
advance_pipeline(conn, batch_id, pipeline_slug, steps)
except Exception as exc:
LOG.error("[pipeline] Error advancing %s: %s", batch_id, exc)
conn.rollback()
conn.close()
def advance_pipeline(conn, batch_id: str, pipeline_slug: str, steps: list):
"""Check and advance a specific pipeline."""
cur = conn.cursor()
# Get all orders in this batch
cur.execute("""
SELECT order_number, service_slug, payment_status, intake_data
FROM compliance_orders
WHERE batch_id = %s
ORDER BY created_at
""", (batch_id,))
orders = {row[1]: {"order_number": row[0], "status": row[2], "intake_data": row[3] or {}} for row in cur.fetchall()}
# Check each step
for step in steps:
slug = step["slug"]
wait_for = step["wait_for"]
# Skip if this service isn't in the batch
if slug not in orders:
continue
order = orders[slug]
intake = order["intake_data"]
if isinstance(intake, str):
intake = json.loads(intake)
# Check if already completed
step_status = intake.get("pipeline_step_status", "pending")
if step_status == "completed":
continue
# Check if dependency is met
if wait_for and wait_for in orders:
dep_intake = orders[wait_for]["intake_data"]
if isinstance(dep_intake, str):
dep_intake = json.loads(dep_intake)
dep_status = dep_intake.get("pipeline_step_status", "pending")
if dep_status != "completed":
LOG.debug("[pipeline] %s waiting for %s (status: %s)", slug, wait_for, dep_status)
continue
# Check if step needs dispatching
if step_status == "pending":
# Mark as dispatched and trigger the service handler
LOG.info("[pipeline] Dispatching %s for batch %s (order %s)",
slug, batch_id, order["order_number"])
# Copy forward data from completed dependencies
forward_data = {}
if wait_for and wait_for in orders:
dep_intake = orders[wait_for]["intake_data"]
if isinstance(dep_intake, str):
dep_intake = json.loads(dep_intake)
# Forward entity name, DOT#, EIN, etc.
for key in ["entity_name", "dot_number", "mc_number", "ein",
"formation_state", "entity_number", "formation_date"]:
if dep_intake.get(key):
forward_data[key] = dep_intake[key]
# Update order with forwarded data + mark as dispatched
cur.execute("""
UPDATE compliance_orders
SET intake_data = intake_data || %s::jsonb
WHERE order_number = %s
""", (
json.dumps({**forward_data, "pipeline_step_status": "dispatched"}),
order["order_number"],
))
conn.commit()
# Dispatch to job server
_dispatch_service(order["order_number"], slug)
# Check if all steps are complete
all_complete = True
for step in steps:
if step["slug"] in orders:
intake = orders[step["slug"]]["intake_data"]
if isinstance(intake, str):
intake = json.loads(intake)
if intake.get("pipeline_step_status") != "completed":
all_complete = False
break
if all_complete:
LOG.info("[pipeline] Pipeline complete for batch %s", batch_id)
# Mark the primary order as pipeline complete
first_slug = steps[0]["slug"]
if first_slug in orders:
cur.execute("""
UPDATE compliance_orders
SET intake_data = jsonb_set(
COALESCE(intake_data, '{}'::jsonb),
'{pipeline_complete}', 'true'::jsonb
)
WHERE order_number = %s
""", (orders[first_slug]["order_number"],))
conn.commit()
# Send Telegram notification
_notify_pipeline_complete(batch_id, pipeline_slug)
def _dispatch_service(order_number: str, service_slug: str):
"""Dispatch a service to the job server for processing."""
import urllib.request
worker_url = os.getenv("WORKER_URL", "http://localhost:8090")
try:
data = json.dumps({
"action": "process_compliance_service",
"order_name": order_number,
"service_slug": service_slug,
}).encode()
req = urllib.request.Request(
f"{worker_url}/jobs",
data=data,
headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req, timeout=10)
LOG.info("[pipeline] Dispatched %s for %s", service_slug, order_number)
except Exception as exc:
LOG.error("[pipeline] Failed to dispatch %s for %s: %s", service_slug, order_number, exc)
def _notify_pipeline_complete(batch_id: str, pipeline_slug: str):
"""Telegram notification when a pipeline completes."""
try:
import urllib.request
bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "")
chat_id = os.getenv("TELEGRAM_CHAT_ID", "")
if not bot_token or not chat_id:
return
text = f"✅ Pipeline Complete\nBatch: {batch_id}\nType: {pipeline_slug}\nAll steps finished."
data = json.dumps({"chat_id": chat_id, "text": text}).encode()
req = urllib.request.Request(
f"https://api.telegram.org/bot{bot_token}/sendMessage",
data=data,
headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req, timeout=10)
except Exception:
pass
def main():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
)
LOG.info("Pipeline orchestrator — checking active pipelines")
check_pipelines()
if __name__ == "__main__":
main()

View file

@ -50,6 +50,9 @@ from .mcs150_update import MCS150UpdateHandler
from .boc3_filing import BOC3FilingHandler
# State-level trucking compliance (IRP, IFTA, weight taxes, MCP, etc.)
from .state_trucking import StateTruckingHandler
# EIN application + virtual mailbox
from .ein_application import EINApplicationHandler
from .mailbox_setup import MailboxSetupHandler
SERVICE_HANDLERS: dict[str, type] = {
"flsa-audit": FLSAAuditHandler,
@ -105,6 +108,14 @@ SERVICE_HANDLERS: dict[str, type] = {
"dot-drug-alcohol": MCS150UpdateHandler, # admin-assisted (partner enrollment)
"dot-audit-prep": MCS150UpdateHandler, # admin-assisted (document prep)
"dot-full-compliance": MCS150UpdateHandler, # fans out to individual services
"usdot-reactivation": MCS150UpdateHandler, # same FMCSA submission flow
"emergency-temporary-authority": MCS150UpdateHandler, # ask.fmcsa.dot.gov type 308
"ein-application": EINApplicationHandler,
"virtual-mailbox": MailboxSetupHandler,
"annual-report-filing": MCS150UpdateHandler, # admin-assisted
"registered-agent": MCS150UpdateHandler, # admin-assisted (NWRA/CorpTools)
"entity-reinstatement": MCS150UpdateHandler, # admin-assisted
"entity-upgrade-bundle": MCS150UpdateHandler, # pipeline orchestrator manages sequence
# ── State-Level Trucking Compliance ───────────────────────────────
"irp-registration": StateTruckingHandler,
"ifta-application": StateTruckingHandler,

View file

@ -0,0 +1,182 @@
"""Anytime Mailbox Setup — virtual mailbox provisioning for out-of-state formations.
When a carrier forms in Wyoming but operates elsewhere, they need a Wyoming
mailing address for official correspondence. We use Anytime Mailbox for this.
Setup requires USPS Form 1583 notarized authorization for mail receipt.
The customer's photo ID (already collected during intake) is used for the 1583.
Flow:
1. Customer orders Wyoming formation package
2. LLC is formed in Wyoming
3. We open an Anytime Mailbox in Wyoming under the LLC name
4. Generate USPS Form 1583 pre-filled with customer + LLC info
5. Customer e-signs the 1583 via our portal
6. Schedule online notarization session (same service as CRTC BITS)
7. Submit notarized 1583 to Anytime Mailbox
8. Mailbox active address provided to customer
Service slug: virtual-mailbox
Price: $149/yr
"""
from __future__ import annotations
import json
import logging
import os
from datetime import datetime
LOG = logging.getLogger("workers.services.mailbox_setup")
class MailboxSetupHandler:
"""Handle virtual mailbox setup orders."""
SERVICE_SLUG = "virtual-mailbox"
SERVICE_NAME = "Virtual Mailbox (Anytime Mailbox)"
async def process(self, order_data: dict) -> list[str]:
"""Entry point called by job_server."""
order_number = order_data.get("order_number", order_data.get("name", ""))
return self.handle(order_data, order_number)
def handle(self, order_data: dict, order_number: str) -> list[str]:
"""Process a virtual mailbox setup order."""
LOG.info("[%s] Processing virtual mailbox setup", order_number)
intake = order_data.get("intake_data") or {}
if isinstance(intake, str):
intake = json.loads(intake)
entity_name = intake.get("entity_name", order_data.get("customer_name", ""))
customer_email = order_data.get("customer_email", "")
formation_state = intake.get("formation_state", "WY")
customer_name = intake.get("signer_name", order_data.get("customer_name", ""))
# Step 1: Create e-sign record for USPS Form 1583
try:
import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
cur = conn.cursor()
cur.execute("""
INSERT INTO esign_records (
order_number, document_type, document_title, entity_name,
document_metadata, requires_perjury, status,
expires_at
) VALUES (%s, %s, %s, %s, %s, FALSE, 'pending', NOW() + interval '14 days')
ON CONFLICT (order_number, document_type)
WHERE status IN ('pending', 'signed') DO NOTHING
""", (
order_number,
"usps-1583",
"USPS Form 1583 — Authorization to Receive Mail",
entity_name,
json.dumps({
"form_type": "usps-1583",
"mailbox_state": formation_state,
"entity_name": entity_name,
"customer_name": customer_name,
"requires_notarization": True,
"notarization_service": "online", # Same as CRTC BITS
"instructions": (
"This form authorizes Anytime Mailbox to receive mail on behalf of "
f"{entity_name}. After you sign, we will schedule a brief online "
"notarization session (5-minute video call with a notary). Your photo "
"ID on file will be used for identity verification."
),
}),
))
conn.commit()
conn.close()
LOG.info("[%s] E-sign record created for USPS 1583", order_number)
except Exception as exc:
LOG.error("[%s] E-sign 1583 setup failed: %s", order_number, exc)
# Step 2: Send e-sign link to customer
self._send_1583_email(order_number, entity_name, customer_email, customer_name)
# Step 3: Create admin todo
try:
import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
with conn.cursor() as cur:
cur.execute("""
INSERT INTO admin_todos (
title, category, priority, order_number, service_slug,
description, data, status
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
""", (
f"Mailbox Setup — {entity_name} ({formation_state})",
"provisioning",
"normal",
order_number,
self.SERVICE_SLUG,
f"Set up Anytime Mailbox for {entity_name} in {formation_state}.\n\n"
f"Steps:\n"
f"1. Customer e-signs USPS Form 1583 (link sent)\n"
f"2. Schedule online notarization session\n"
f"3. Submit notarized 1583 to Anytime Mailbox\n"
f"4. Activate mailbox, provide address to customer\n\n"
f"Customer: {customer_email}\n"
f"Photo ID: on file in MinIO (from intake)\n"
f"Notarization: use same service as CRTC BITS",
json.dumps({
"order_number": order_number,
"entity_name": entity_name,
"formation_state": formation_state,
"customer_email": customer_email,
}),
))
conn.commit()
conn.close()
except Exception as exc:
LOG.error("[%s] Failed to create mailbox todo: %s", order_number, exc)
return []
def _send_1583_email(self, order_number, entity_name, customer_email, customer_name):
"""Send e-sign link for USPS Form 1583."""
if not customer_email:
return
try:
import smtplib
import jwt
from email.mime.text import MIMEText
secret = os.environ.get("JWT_SECRET", os.environ.get("ADMIN_JWT_SECRET", ""))
token = jwt.encode(
{"order_id": order_number, "order_type": "usps-1583", "email": customer_email},
secret, algorithm="HS256",
)
domain = os.environ.get("DOMAIN", "performancewest.net")
esign_url = f"https://{domain}/portal/esign?token={token}"
first = customer_name.split()[0] if customer_name else "there"
body = (
f"Hi {first},\n\n"
f"Your virtual mailbox for {entity_name} is being set up. "
f"To activate it, we need your authorization on USPS Form 1583.\n\n"
f"This is a quick process:\n"
f"1. Click the link below to review and sign the form\n"
f"2. We'll schedule a brief 5-minute video call with a notary\n"
f"3. Your photo ID on file will be used for verification\n"
f"4. Once notarized, your mailbox will be active within 24 hours\n\n"
f"Sign here: {esign_url}\n\n"
f"This link expires in 14 days.\n\n"
f"Questions? Call (888) 411-0383.\n\n"
f"Performance West Inc.\n"
)
msg = MIMEText(body)
msg["Subject"] = f"Action Required: Sign USPS Form 1583 — {entity_name} Mailbox"
msg["From"] = "noreply@performancewest.net"
msg["To"] = customer_email
with smtplib.SMTP("localhost", 25) as s:
s.sendmail(msg["From"], [customer_email], msg.as_string())
LOG.info("[%s] 1583 e-sign email sent to %s", order_number, customer_email)
except Exception as exc:
LOG.warning("[%s] Failed to send 1583 email: %s", order_number, exc)