From e0ba8acc90d0a3f21bfc59d2515a1b3c8130efa4 Mon Sep 17 00:00:00 2001 From: justin Date: Sat, 30 May 2026 22:56:54 -0500 Subject: [PATCH] add pipeline orchestrator, mailbox 1583 flow, EIN + virtual-mailbox services MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Pipeline orchestrator: chains sequential fulfillment for new carrier bundles (formation → EIN → USDOT → MC → BOC-3 → MCS-150 → D&A → UCR) - Mailbox setup: Anytime Mailbox provisioning with USPS 1583 e-sign + online notarization - New services: ein-application ($79), virtual-mailbox ($149/yr) - Registered all new handlers in SERVICE_HANDLERS - Pipeline cron: every 5 minutes --- api/src/routes/compliance-orders.ts | 12 + .../roles/worker-crons/defaults/main.yml | 9 + scripts/workers/pipeline_orchestrator.py | 252 ++++++++++++++++++ scripts/workers/services/__init__.py | 11 + scripts/workers/services/mailbox_setup.py | 182 +++++++++++++ 5 files changed, 466 insertions(+) create mode 100644 scripts/workers/pipeline_orchestrator.py create mode 100644 scripts/workers/services/mailbox_setup.py diff --git a/api/src/routes/compliance-orders.ts b/api/src/routes/compliance-orders.ts index 832118c..e4e3d42 100644 --- a/api/src/routes/compliance-orders.ts +++ b/api/src/routes/compliance-orders.ts @@ -380,6 +380,18 @@ const COMPLIANCE_SERVICES: Record< erpnext_item: "ENTITY-REINSTATEMENT", discountable: true, }, + "virtual-mailbox": { + name: "Virtual Mailbox (1 Year)", + price_cents: 14900, + erpnext_item: "VIRTUAL-MAILBOX", + discountable: true, + }, + "ein-application": { + name: "EIN Application (IRS SS-4)", + price_cents: 7900, + erpnext_item: "EIN-APPLICATION", + discountable: true, + }, "entity-upgrade-bundle": { name: "Entity Upgrade Package (Sole Prop → LLC)", price_cents: 59900, diff --git a/infra/ansible/roles/worker-crons/defaults/main.yml b/infra/ansible/roles/worker-crons/defaults/main.yml index f24006d..332c08c 100644 --- a/infra/ansible/roles/worker-crons/defaults/main.yml +++ b/infra/ansible/roles/worker-crons/defaults/main.yml @@ -175,6 +175,15 @@ worker_crons: on_calendar: "*-*-* 12:00:00 UTC" persistent: true + # Pipeline orchestrator — every 5 min. + # Advances sequential service fulfillment (new carrier bundles, entity upgrades). + # Checks dependencies, forwards data between steps, dispatches next service. + - name: pw-pipeline-orchestrator + description: Advance multi-step compliance pipelines + module: scripts.workers.pipeline_orchestrator + on_calendar: "*-*-* *:0/5:00 UTC" + persistent: false + # FMCSA email monitor — every 15 min (offset :05), checks for filing confirmations. - name: pw-fmcsa-email-monitor description: Check for FMCSA filing confirmation emails diff --git a/scripts/workers/pipeline_orchestrator.py b/scripts/workers/pipeline_orchestrator.py new file mode 100644 index 0000000..4a4558a --- /dev/null +++ b/scripts/workers/pipeline_orchestrator.py @@ -0,0 +1,252 @@ +"""Pipeline Orchestrator — chains sequential service fulfillment. + +For new carrier bundles and entity upgrades, services must be fulfilled +in a specific order because each step depends on the prior one: + + 1. Form LLC/Corp → wait for state approval → get entity name + number + 2. Get EIN → wait for IRS (Mon-Fri 7am-10pm ET) → get EIN + 3. File USDOT → uses entity name + EIN → get DOT number + 4. File MC Authority → needs active USDOT → get MC/authority + 5. File BOC-3 → needs authority → process agent on file + 6. File MCS-150 → needs USDOT → biennial update on record + 7. Enroll D&A → needs DOT# → program active + 8. Register UCR → needs DOT# → registration active + +This module manages the pipeline state and advances to the next step +when the prior step completes. Steps can complete via: + - Service handler returning success + - Admin marking a todo as done + - Webhook callback (e.g., Stripe, ERPNext) + - Cron check (polling for completion) + +Run via cron every 5 minutes: + python -m scripts.workers.pipeline_orchestrator +""" + +from __future__ import annotations + +import json +import logging +import os +import sys +from datetime import datetime, timezone + +import psycopg2 + +LOG = logging.getLogger("workers.pipeline_orchestrator") + +DB_URL = os.getenv("DATABASE_URL", "") + +# Pipeline definitions — order matters +PIPELINES = { + "new-carrier-bundle": [ + {"slug": "llc-formation", "name": "LLC Formation", "wait_for": None}, + {"slug": "ein-application", "name": "EIN Application", "wait_for": "llc-formation"}, + {"slug": "dot-registration", "name": "USDOT Registration", "wait_for": "ein-application"}, + {"slug": "mc-authority", "name": "Operating Authority", "wait_for": "dot-registration"}, + {"slug": "boc3-filing", "name": "BOC-3 Filing", "wait_for": "dot-registration"}, + {"slug": "mcs150-update", "name": "MCS-150 Filing", "wait_for": "dot-registration"}, + {"slug": "dot-drug-alcohol", "name": "Drug & Alcohol Program", "wait_for": "dot-registration"}, + {"slug": "ucr-registration", "name": "UCR Registration", "wait_for": "dot-registration"}, + ], + "entity-upgrade-bundle": [ + {"slug": "llc-formation", "name": "LLC Formation", "wait_for": None}, + {"slug": "ein-application", "name": "EIN Application", "wait_for": "llc-formation"}, + {"slug": "mcs150-update", "name": "MCS-150 Update (New Entity)", "wait_for": "ein-application"}, + {"slug": "mc-authority", "name": "Authority Transfer", "wait_for": "mcs150-update"}, + {"slug": "boc3-filing", "name": "BOC-3 Update", "wait_for": "mcs150-update"}, + ], + "dot-full-compliance": [ + {"slug": "mcs150-update", "name": "MCS-150 Update", "wait_for": None}, + {"slug": "boc3-filing", "name": "BOC-3 Filing", "wait_for": None}, + {"slug": "ucr-registration", "name": "UCR Registration", "wait_for": None}, + {"slug": "dot-drug-alcohol", "name": "Drug & Alcohol Program", "wait_for": None}, + {"slug": "dot-audit-prep", "name": "Audit Preparation", "wait_for": None}, + ], +} + + +def check_pipelines(): + """Check all active pipeline orders and advance where possible.""" + conn = psycopg2.connect(DB_URL) + cur = conn.cursor() + + # Find batch orders that have pipeline-eligible slugs + for pipeline_slug, steps in PIPELINES.items(): + cur.execute(""" + SELECT DISTINCT batch_id + FROM compliance_orders + WHERE batch_id IS NOT NULL + AND payment_status = 'paid' + AND service_slug = %s + AND (intake_data->>'pipeline_complete') IS NULL + """, (steps[0]["slug"],)) + + for (batch_id,) in cur.fetchall(): + try: + advance_pipeline(conn, batch_id, pipeline_slug, steps) + except Exception as exc: + LOG.error("[pipeline] Error advancing %s: %s", batch_id, exc) + conn.rollback() + + conn.close() + + +def advance_pipeline(conn, batch_id: str, pipeline_slug: str, steps: list): + """Check and advance a specific pipeline.""" + cur = conn.cursor() + + # Get all orders in this batch + cur.execute(""" + SELECT order_number, service_slug, payment_status, intake_data + FROM compliance_orders + WHERE batch_id = %s + ORDER BY created_at + """, (batch_id,)) + orders = {row[1]: {"order_number": row[0], "status": row[2], "intake_data": row[3] or {}} for row in cur.fetchall()} + + # Check each step + for step in steps: + slug = step["slug"] + wait_for = step["wait_for"] + + # Skip if this service isn't in the batch + if slug not in orders: + continue + + order = orders[slug] + intake = order["intake_data"] + if isinstance(intake, str): + intake = json.loads(intake) + + # Check if already completed + step_status = intake.get("pipeline_step_status", "pending") + if step_status == "completed": + continue + + # Check if dependency is met + if wait_for and wait_for in orders: + dep_intake = orders[wait_for]["intake_data"] + if isinstance(dep_intake, str): + dep_intake = json.loads(dep_intake) + dep_status = dep_intake.get("pipeline_step_status", "pending") + if dep_status != "completed": + LOG.debug("[pipeline] %s waiting for %s (status: %s)", slug, wait_for, dep_status) + continue + + # Check if step needs dispatching + if step_status == "pending": + # Mark as dispatched and trigger the service handler + LOG.info("[pipeline] Dispatching %s for batch %s (order %s)", + slug, batch_id, order["order_number"]) + + # Copy forward data from completed dependencies + forward_data = {} + if wait_for and wait_for in orders: + dep_intake = orders[wait_for]["intake_data"] + if isinstance(dep_intake, str): + dep_intake = json.loads(dep_intake) + # Forward entity name, DOT#, EIN, etc. + for key in ["entity_name", "dot_number", "mc_number", "ein", + "formation_state", "entity_number", "formation_date"]: + if dep_intake.get(key): + forward_data[key] = dep_intake[key] + + # Update order with forwarded data + mark as dispatched + cur.execute(""" + UPDATE compliance_orders + SET intake_data = intake_data || %s::jsonb + WHERE order_number = %s + """, ( + json.dumps({**forward_data, "pipeline_step_status": "dispatched"}), + order["order_number"], + )) + conn.commit() + + # Dispatch to job server + _dispatch_service(order["order_number"], slug) + + # Check if all steps are complete + all_complete = True + for step in steps: + if step["slug"] in orders: + intake = orders[step["slug"]]["intake_data"] + if isinstance(intake, str): + intake = json.loads(intake) + if intake.get("pipeline_step_status") != "completed": + all_complete = False + break + + if all_complete: + LOG.info("[pipeline] Pipeline complete for batch %s", batch_id) + # Mark the primary order as pipeline complete + first_slug = steps[0]["slug"] + if first_slug in orders: + cur.execute(""" + UPDATE compliance_orders + SET intake_data = jsonb_set( + COALESCE(intake_data, '{}'::jsonb), + '{pipeline_complete}', 'true'::jsonb + ) + WHERE order_number = %s + """, (orders[first_slug]["order_number"],)) + conn.commit() + + # Send Telegram notification + _notify_pipeline_complete(batch_id, pipeline_slug) + + +def _dispatch_service(order_number: str, service_slug: str): + """Dispatch a service to the job server for processing.""" + import urllib.request + + worker_url = os.getenv("WORKER_URL", "http://localhost:8090") + try: + data = json.dumps({ + "action": "process_compliance_service", + "order_name": order_number, + "service_slug": service_slug, + }).encode() + req = urllib.request.Request( + f"{worker_url}/jobs", + data=data, + headers={"Content-Type": "application/json"}, + ) + urllib.request.urlopen(req, timeout=10) + LOG.info("[pipeline] Dispatched %s for %s", service_slug, order_number) + except Exception as exc: + LOG.error("[pipeline] Failed to dispatch %s for %s: %s", service_slug, order_number, exc) + + +def _notify_pipeline_complete(batch_id: str, pipeline_slug: str): + """Telegram notification when a pipeline completes.""" + try: + import urllib.request + bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "") + chat_id = os.getenv("TELEGRAM_CHAT_ID", "") + if not bot_token or not chat_id: + return + + text = f"✅ Pipeline Complete\nBatch: {batch_id}\nType: {pipeline_slug}\nAll steps finished." + data = json.dumps({"chat_id": chat_id, "text": text}).encode() + req = urllib.request.Request( + f"https://api.telegram.org/bot{bot_token}/sendMessage", + data=data, + headers={"Content-Type": "application/json"}, + ) + urllib.request.urlopen(req, timeout=10) + except Exception: + pass + + +def main(): + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s %(message)s", + ) + LOG.info("Pipeline orchestrator — checking active pipelines") + check_pipelines() + + +if __name__ == "__main__": + main() diff --git a/scripts/workers/services/__init__.py b/scripts/workers/services/__init__.py index 7bd2f99..e28d759 100644 --- a/scripts/workers/services/__init__.py +++ b/scripts/workers/services/__init__.py @@ -50,6 +50,9 @@ from .mcs150_update import MCS150UpdateHandler from .boc3_filing import BOC3FilingHandler # State-level trucking compliance (IRP, IFTA, weight taxes, MCP, etc.) from .state_trucking import StateTruckingHandler +# EIN application + virtual mailbox +from .ein_application import EINApplicationHandler +from .mailbox_setup import MailboxSetupHandler SERVICE_HANDLERS: dict[str, type] = { "flsa-audit": FLSAAuditHandler, @@ -105,6 +108,14 @@ SERVICE_HANDLERS: dict[str, type] = { "dot-drug-alcohol": MCS150UpdateHandler, # admin-assisted (partner enrollment) "dot-audit-prep": MCS150UpdateHandler, # admin-assisted (document prep) "dot-full-compliance": MCS150UpdateHandler, # fans out to individual services + "usdot-reactivation": MCS150UpdateHandler, # same FMCSA submission flow + "emergency-temporary-authority": MCS150UpdateHandler, # ask.fmcsa.dot.gov type 308 + "ein-application": EINApplicationHandler, + "virtual-mailbox": MailboxSetupHandler, + "annual-report-filing": MCS150UpdateHandler, # admin-assisted + "registered-agent": MCS150UpdateHandler, # admin-assisted (NWRA/CorpTools) + "entity-reinstatement": MCS150UpdateHandler, # admin-assisted + "entity-upgrade-bundle": MCS150UpdateHandler, # pipeline orchestrator manages sequence # ── State-Level Trucking Compliance ─────────────────────────────── "irp-registration": StateTruckingHandler, "ifta-application": StateTruckingHandler, diff --git a/scripts/workers/services/mailbox_setup.py b/scripts/workers/services/mailbox_setup.py new file mode 100644 index 0000000..a6690d4 --- /dev/null +++ b/scripts/workers/services/mailbox_setup.py @@ -0,0 +1,182 @@ +"""Anytime Mailbox Setup — virtual mailbox provisioning for out-of-state formations. + +When a carrier forms in Wyoming but operates elsewhere, they need a Wyoming +mailing address for official correspondence. We use Anytime Mailbox for this. + +Setup requires USPS Form 1583 — notarized authorization for mail receipt. +The customer's photo ID (already collected during intake) is used for the 1583. + +Flow: + 1. Customer orders Wyoming formation package + 2. LLC is formed in Wyoming + 3. We open an Anytime Mailbox in Wyoming under the LLC name + 4. Generate USPS Form 1583 pre-filled with customer + LLC info + 5. Customer e-signs the 1583 via our portal + 6. Schedule online notarization session (same service as CRTC BITS) + 7. Submit notarized 1583 to Anytime Mailbox + 8. Mailbox active — address provided to customer + +Service slug: virtual-mailbox +Price: $149/yr +""" + +from __future__ import annotations + +import json +import logging +import os +from datetime import datetime + +LOG = logging.getLogger("workers.services.mailbox_setup") + + +class MailboxSetupHandler: + """Handle virtual mailbox setup orders.""" + + SERVICE_SLUG = "virtual-mailbox" + SERVICE_NAME = "Virtual Mailbox (Anytime Mailbox)" + + async def process(self, order_data: dict) -> list[str]: + """Entry point called by job_server.""" + order_number = order_data.get("order_number", order_data.get("name", "")) + return self.handle(order_data, order_number) + + def handle(self, order_data: dict, order_number: str) -> list[str]: + """Process a virtual mailbox setup order.""" + LOG.info("[%s] Processing virtual mailbox setup", order_number) + + intake = order_data.get("intake_data") or {} + if isinstance(intake, str): + intake = json.loads(intake) + + entity_name = intake.get("entity_name", order_data.get("customer_name", "")) + customer_email = order_data.get("customer_email", "") + formation_state = intake.get("formation_state", "WY") + customer_name = intake.get("signer_name", order_data.get("customer_name", "")) + + # Step 1: Create e-sign record for USPS Form 1583 + try: + import psycopg2 + conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) + cur = conn.cursor() + + cur.execute(""" + INSERT INTO esign_records ( + order_number, document_type, document_title, entity_name, + document_metadata, requires_perjury, status, + expires_at + ) VALUES (%s, %s, %s, %s, %s, FALSE, 'pending', NOW() + interval '14 days') + ON CONFLICT (order_number, document_type) + WHERE status IN ('pending', 'signed') DO NOTHING + """, ( + order_number, + "usps-1583", + "USPS Form 1583 — Authorization to Receive Mail", + entity_name, + json.dumps({ + "form_type": "usps-1583", + "mailbox_state": formation_state, + "entity_name": entity_name, + "customer_name": customer_name, + "requires_notarization": True, + "notarization_service": "online", # Same as CRTC BITS + "instructions": ( + "This form authorizes Anytime Mailbox to receive mail on behalf of " + f"{entity_name}. After you sign, we will schedule a brief online " + "notarization session (5-minute video call with a notary). Your photo " + "ID on file will be used for identity verification." + ), + }), + )) + conn.commit() + conn.close() + LOG.info("[%s] E-sign record created for USPS 1583", order_number) + except Exception as exc: + LOG.error("[%s] E-sign 1583 setup failed: %s", order_number, exc) + + # Step 2: Send e-sign link to customer + self._send_1583_email(order_number, entity_name, customer_email, customer_name) + + # Step 3: Create admin todo + try: + import psycopg2 + conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) + with conn.cursor() as cur: + cur.execute(""" + INSERT INTO admin_todos ( + title, category, priority, order_number, service_slug, + description, data, status + ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending') + """, ( + f"Mailbox Setup — {entity_name} ({formation_state})", + "provisioning", + "normal", + order_number, + self.SERVICE_SLUG, + f"Set up Anytime Mailbox for {entity_name} in {formation_state}.\n\n" + f"Steps:\n" + f"1. Customer e-signs USPS Form 1583 (link sent)\n" + f"2. Schedule online notarization session\n" + f"3. Submit notarized 1583 to Anytime Mailbox\n" + f"4. Activate mailbox, provide address to customer\n\n" + f"Customer: {customer_email}\n" + f"Photo ID: on file in MinIO (from intake)\n" + f"Notarization: use same service as CRTC BITS", + json.dumps({ + "order_number": order_number, + "entity_name": entity_name, + "formation_state": formation_state, + "customer_email": customer_email, + }), + )) + conn.commit() + conn.close() + except Exception as exc: + LOG.error("[%s] Failed to create mailbox todo: %s", order_number, exc) + + return [] + + def _send_1583_email(self, order_number, entity_name, customer_email, customer_name): + """Send e-sign link for USPS Form 1583.""" + if not customer_email: + return + try: + import smtplib + import jwt + from email.mime.text import MIMEText + + secret = os.environ.get("JWT_SECRET", os.environ.get("ADMIN_JWT_SECRET", "")) + token = jwt.encode( + {"order_id": order_number, "order_type": "usps-1583", "email": customer_email}, + secret, algorithm="HS256", + ) + domain = os.environ.get("DOMAIN", "performancewest.net") + esign_url = f"https://{domain}/portal/esign?token={token}" + + first = customer_name.split()[0] if customer_name else "there" + body = ( + f"Hi {first},\n\n" + f"Your virtual mailbox for {entity_name} is being set up. " + f"To activate it, we need your authorization on USPS Form 1583.\n\n" + f"This is a quick process:\n" + f"1. Click the link below to review and sign the form\n" + f"2. We'll schedule a brief 5-minute video call with a notary\n" + f"3. Your photo ID on file will be used for verification\n" + f"4. Once notarized, your mailbox will be active within 24 hours\n\n" + f"Sign here: {esign_url}\n\n" + f"This link expires in 14 days.\n\n" + f"Questions? Call (888) 411-0383.\n\n" + f"Performance West Inc.\n" + ) + + msg = MIMEText(body) + msg["Subject"] = f"Action Required: Sign USPS Form 1583 — {entity_name} Mailbox" + msg["From"] = "noreply@performancewest.net" + msg["To"] = customer_email + + with smtplib.SMTP("localhost", 25) as s: + s.sendmail(msg["From"], [customer_email], msg.as_string()) + + LOG.info("[%s] 1583 e-sign email sent to %s", order_number, customer_email) + except Exception as exc: + LOG.warning("[%s] Failed to send 1583 email: %s", order_number, exc)