From 28b1af341d6ebe9410dbc46e5931c0319a3315a7 Mon Sep 17 00:00:00 2001 From: justin Date: Sun, 7 Jun 2026 03:17:46 -0500 Subject: [PATCH] Wire fulfillment alerts to Telegram + surface order progress in portal + even out ERPNext sync Telegram notifications: - Add shared scripts/workers/telegram_notify.py (send_telegram, notify_fulfillment_todo, create_admin_todo) so every worker alerts the operator the same way; fire-and-forget. - Fire notify_fulfillment_todo after each admin_todos insert across all 8 service handlers (9 sites) so no fulfillment task waits unseen. (Orders + quotes + tickets already notified via checkout/quotes/tickets routes.) Client portal order progress: - order-timeline: derive real per-step status from live signals (payment paid, e-signature signed, fulfillment_status) instead of a static template; add current_step to the response. - Extract pure applyLiveStatus into order-timeline-status.ts (DB-free) + unit test (api/test/test_timeline_status.ts, 8 cases). - portal /me now returns compliance_orders.fulfillment_status. - Dashboard renders a client-safe Progress badge (In progress / Action needed / Filed-awaiting-confirmation / Completed); batches show the most actionable status. No back-office mechanics exposed. ERPNext sync parity: - Create a Sales Order for formation and fcc_carrier_registration orders (previously only canada_crtc + compliance synced); write erpnext_sales_order back to each table. Non-blocking, matches existing pattern. Verified: API tsc clean, timeline unit tests 8/8, Astro build 58 pages, cms10114/ink/paper_batch Python tests still green, no mechanics leaks. --- api/src/routes/checkout.ts | 110 +++++++++++++++ api/src/routes/order-timeline-status.ts | 72 ++++++++++ api/src/routes/order-timeline.ts | 80 ++++++++--- api/src/routes/portal.ts | 1 + api/test/test_timeline_status.ts | 92 +++++++++++++ scripts/workers/services/boc3_filing.py | 37 +++-- scripts/workers/services/carrier_closeout.py | 9 ++ scripts/workers/services/ein_application.py | 9 ++ scripts/workers/services/hazmat_phmsa.py | 35 +++-- scripts/workers/services/mailbox_setup.py | 33 +++-- scripts/workers/services/mcs150_update.py | 52 +++++-- scripts/workers/services/npi_provider.py | 9 ++ scripts/workers/services/state_trucking.py | 38 ++++-- scripts/workers/telegram_notify.py | 136 +++++++++++++++++++ site/public/portal/dashboard/index.html | 66 ++++++++- 15 files changed, 706 insertions(+), 73 deletions(-) create mode 100644 api/src/routes/order-timeline-status.ts create mode 100644 api/test/test_timeline_status.ts create mode 100644 scripts/workers/telegram_notify.py diff --git a/api/src/routes/checkout.ts b/api/src/routes/checkout.ts index c669c68..b1bd421 100644 --- a/api/src/routes/checkout.ts +++ b/api/src/routes/checkout.ts @@ -93,6 +93,7 @@ async function ensureColumns(): Promise { await pool.query(`ALTER TABLE formation_orders ADD COLUMN IF NOT EXISTS stripe_session_id TEXT`); await pool.query(`ALTER TABLE formation_orders ADD COLUMN IF NOT EXISTS payment_status TEXT DEFAULT 'pending_payment'`); await pool.query(`ALTER TABLE formation_orders ADD COLUMN IF NOT EXISTS paid_at TIMESTAMPTZ`); + await pool.query(`ALTER TABLE formation_orders ADD COLUMN IF NOT EXISTS erpnext_sales_order TEXT`); await pool.query(`ALTER TABLE bundle_orders ADD COLUMN IF NOT EXISTS stripe_session_id TEXT`); await pool.query(`ALTER TABLE bundle_orders ADD COLUMN IF NOT EXISTS payment_method TEXT`); await pool.query(`ALTER TABLE bundle_orders ADD COLUMN IF NOT EXISTS surcharge_pct NUMERIC`); @@ -965,6 +966,115 @@ router.post("/api/v1/checkout/create-session", async (req, res) => { } } + // ── Create ERPNext Sales Order (US business formation) ────────────────── + if (order_type === "formation" && erpnextCustomer) { + try { + const pgOrder = orderData.order as Record; + const entityType = (pgOrder.entity_type as string) || "LLC"; + const stateCode = (pgOrder.state_code as string) || ""; + const stateFeeCents = (pgOrder.state_fee_cents as number) || 0; + + const so = (await createResource("Sales Order", { + customer: erpnextCustomer, + delivery_date: new Date(Date.now() + 30 * 86400000).toISOString().split("T")[0], + custom_external_order_id: order_id, + custom_order_type: "formation", + custom_payment_gateway: GATEWAY_LABELS[payment_method] || payment_method, + custom_surcharge_pct: surcharge_pct, + workflow_state: "Received", + items: [{ + item_code: "BUSINESS-FORMATION", + description: `${entityType} Formation${stateCode ? ` — ${stateCode}` : ""}`, + qty: 1, + rate: toDollars((pgOrder.service_fee_cents as number) || base_cents - stateFeeCents - surcharge_cents), + }, ...(stateFeeCents > 0 ? [{ + item_code: "STATE-FILING-FEE", + description: `${stateCode} state filing fee (government fee)`, + qty: 1, + rate: toDollars(stateFeeCents), + }] : []), ...(surcharge_cents > 0 ? [{ + item_code: "PAYMENT-PROCESSING-FEE", + description: `${GATEWAY_LABELS[payment_method] || payment_method} ${surcharge_pct}%`, + qty: 1, + rate: toDollars(surcharge_cents), + }] : [])], + })) as { name: string }; + + try { + await callMethod("frappe.client.submit", { doc: { doctype: "Sales Order", name: so.name } }); + } catch { /* submit may fail if workflow doesn't require it */ } + + await pool.query( + `UPDATE formation_orders SET erpnext_sales_order = $1 WHERE order_number = $2`, + [so.name, order_id], + ); + + console.log(`[checkout] Created ERPNext Sales Order ${so.name} for formation ${order_id}`); + } catch (soErr) { + console.warn("[checkout] Formation Sales Order creation failed (non-blocking):", soErr); + } + } + + // ── Create ERPNext Sales Order (FCC carrier / ISP registration) ───────── + if (order_type === "fcc_carrier_registration" && erpnextCustomer) { + try { + const pgOrder = orderData.order as Record; + const items: Array> = [{ + item_code: "FCC-CARRIER-REGISTRATION", + description: "FCC Carrier / ISP Registration", + qty: 1, + rate: toDollars((pgOrder.service_fee_cents as number) || 129900), + }]; + const formationFee = ((pgOrder.formation_fee_cents as number) || 0) + ((pgOrder.state_fee_cents as number) || 0); + if (formationFee > 0) { + items.push({ + item_code: "BUSINESS-FORMATION", + description: `Business Formation (${pgOrder.formation_state || "?"} ${((pgOrder.entity_type as string) || "LLC").toUpperCase()})`, + qty: 1, + rate: toDollars(formationFee), + }); + } + if (pgOrder.include_stir_shaken) { + items.push({ item_code: "STIR-SHAKEN", description: "STIR/SHAKEN Implementation", qty: 1, rate: toDollars(49900) }); + } + if (pgOrder.include_ocn) { + items.push({ item_code: "NECA-OCN", description: "NECA OCN Registration", qty: 1, rate: toDollars(265000) }); + } + const pucCents = (pgOrder.puc_fee_cents as number) || 0; + if (pucCents > 0) { + const stateCount = ((pgOrder.state_puc_states as string[]) || []).length; + items.push({ item_code: "STATE-PUC-REGISTRATION", description: `State PUC Registration (${stateCount} state${stateCount !== 1 ? "s" : ""})`, qty: 1, rate: toDollars(pucCents) }); + } + if (surcharge_cents > 0) { + items.push({ item_code: "PAYMENT-PROCESSING-FEE", description: `${GATEWAY_LABELS[payment_method] || payment_method} ${surcharge_pct}%`, qty: 1, rate: toDollars(surcharge_cents) }); + } + + const so = (await createResource("Sales Order", { + customer: erpnextCustomer, + delivery_date: new Date(Date.now() + 30 * 86400000).toISOString().split("T")[0], + custom_external_order_id: order_id, + custom_order_type: "fcc_carrier_registration", + custom_payment_gateway: GATEWAY_LABELS[payment_method] || payment_method, + custom_surcharge_pct: surcharge_pct, + workflow_state: "Received", + items, + })) as { name: string }; + + try { + await callMethod("frappe.client.submit", { doc: { doctype: "Sales Order", name: so.name } }); + } catch { /* submit may fail if workflow doesn't require it */ } + + await pool.query( + `UPDATE fcc_carrier_registrations SET erpnext_sales_order = $1 WHERE order_number = $2`, + [so.name, order_id], + ); + + console.log(`[checkout] Created ERPNext Sales Order ${so.name} for fcc_carrier_registration ${order_id}`); + } catch (soErr) { + console.warn("[checkout] FCC Sales Order creation failed (non-blocking):", soErr); + } + } + // ── Create Stripe Checkout Session ───────────────────────────────────── const STRIPE_PAYMENT_METHOD_MAP: Record = { card: ["card"], diff --git a/api/src/routes/order-timeline-status.ts b/api/src/routes/order-timeline-status.ts new file mode 100644 index 0000000..ebd9dfc --- /dev/null +++ b/api/src/routes/order-timeline-status.ts @@ -0,0 +1,72 @@ +/** + * Pure live-progress derivation for the order timeline. + * + * Kept free of DB/config imports so it can be unit-tested in isolation and + * reused. The route (order-timeline.ts) imports applyLiveStatus from here. + * + * We translate real signals (payment, e-signature, fulfillment_status) into + * per-step "completed | in_progress | pending" so the client portal shows true + * progress rather than a static template. + */ + +/** Step-name matchers used to find phase boundaries within any timeline. */ +export const SIGNATURE_STEP_RE = /signature|e-?sign/i; +export const FILED_STEP_RE = /filed|filing|submitted|application filed|registration/i; + +// fulfillment_status values that mean "the filing has been submitted to the agency". +export const FILED_STATUSES = new Set([ + "filed_waiting_state", + "ready_to_file", // queued for ops to file — treat prep+signature as done +]); +// fulfillment_status values that mean "fully done". +export const COMPLETED_STATUSES = new Set(["completed", "complete"]); + +export interface ProgressSignals { + paid: boolean; + signed: boolean; + fulfillmentStatus: string | null; +} + +/** + * Recompute each step's status from live signals. Returns a new step array. + * + * We resolve a "reached index" — the index of the furthest step we can prove is + * done — then mark everything up to it completed, the next in_progress, the + * rest pending. We never regress a step that the static definition already + * marked completed. + */ +export function applyLiveStatus( + steps: T[], + sig: ProgressSignals, +): T[] { + const sigIdx = steps.findIndex((s) => SIGNATURE_STEP_RE.test(s.name)); + const filedIdx = steps.findIndex((s) => FILED_STEP_RE.test(s.name)); + const confirmIdx = steps.length - 1; // last step is always the terminal one + + let reached = -1; + + // Payment confirmed → at minimum "Order Received" (index 0) is done. + if (sig.paid) reached = Math.max(reached, 0); + + // Signature captured → the signature step (and everything before it) is done. + if (sig.signed && sigIdx >= 0) reached = Math.max(reached, sigIdx); + + // Filing submitted to agency → the "Filed" step (and everything before) done. + if (sig.fulfillmentStatus && FILED_STATUSES.has(sig.fulfillmentStatus) && filedIdx >= 0) { + reached = Math.max(reached, filedIdx); + } + + // Fully completed → everything done. + if (sig.fulfillmentStatus && COMPLETED_STATUSES.has(sig.fulfillmentStatus)) { + reached = confirmIdx; + } + + return steps.map((step, i) => { + const staticallyDone = step.status === "completed"; + let status: "pending" | "in_progress" | "completed"; + if (i <= reached || staticallyDone) status = "completed"; + else if (i === reached + 1) status = "in_progress"; + else status = "pending"; + return { ...step, status }; + }); +} diff --git a/api/src/routes/order-timeline.ts b/api/src/routes/order-timeline.ts index 14801f7..6c225cf 100644 --- a/api/src/routes/order-timeline.ts +++ b/api/src/routes/order-timeline.ts @@ -9,6 +9,7 @@ import { Router, type Request, type Response } from "express"; import { pool } from "../db.js"; +import { applyLiveStatus } from "./order-timeline-status.js"; const router = Router(); @@ -163,32 +164,65 @@ const WET_SIGNATURE_SLUGS = new Set([ "provider-compliance-bundle", ]); +// ── Live-progress derivation ────────────────────────────────────────────── +// applyLiveStatus (pure, DB-free, unit-tested in order-timeline-status.ts) +// overrides each step's static status from real signals (payment, e-signature, +// fulfillment_status) so the client portal shows true progress. + router.get("/api/v1/order-timeline/:order_id", async (req: Request, res: Response) => { try { const orderId = req.params.order_id; - // Try single order first, then batch - let orders: Record[] = []; - const single = await pool.query( - "SELECT order_number, service_slug, service_name, created_at, payment_status FROM compliance_orders WHERE order_number = $1", - [orderId], - ); - if (single.rows.length > 0) { - orders = single.rows as Record[]; - } else { - // Try as batch_id - const batch = await pool.query( - "SELECT order_number, service_slug, service_name, created_at, payment_status FROM compliance_orders WHERE batch_id = $1 ORDER BY created_at", - [orderId], - ); - orders = batch.rows as Record[]; + // Try single order first, then batch. We select fulfillment_status when + // present (column may not exist on older DBs — fall back gracefully). + const COLS = "order_number, service_slug, service_name, created_at, payment_status, fulfillment_status"; + const COLS_FALLBACK = "order_number, service_slug, service_name, created_at, payment_status"; + + async function loadOrders(): Promise[]> { + const trySelect = async (cols: string) => { + const single = await pool.query( + `SELECT ${cols} FROM compliance_orders WHERE order_number = $1`, + [orderId], + ); + if (single.rows.length > 0) return single.rows as Record[]; + const batch = await pool.query( + `SELECT ${cols} FROM compliance_orders WHERE batch_id = $1 ORDER BY created_at`, + [orderId], + ); + return batch.rows as Record[]; + }; + try { + return await trySelect(COLS); + } catch { + // fulfillment_status column not present yet — retry without it. + return await trySelect(COLS_FALLBACK); + } } + const orders = await loadOrders(); + if (orders.length === 0) { res.status(404).json({ error: "Order not found." }); return; } + // Which of these orders have a signed e-signature on file? One query for + // all order_numbers in the batch. Defensive: a missing table must not break + // the timeline (clients still see estimated dates). + const orderNumbers = orders.map((o) => o.order_number as string); + const signedSet = new Set(); + try { + const sig = await pool.query( + "SELECT DISTINCT order_number FROM esign_records WHERE order_number = ANY($1) AND status = 'signed'", + [orderNumbers], + ); + for (const r of sig.rows as Record[]) { + signedSet.add(r.order_number as string); + } + } catch { + // esign_records absent — treat all as unsigned. + } + const timelines = orders.map((order) => { const slug = order.service_slug as string; const startDate = new Date(order.created_at as string); @@ -196,7 +230,7 @@ router.get("/api/v1/order-timeline/:order_id", async (req: Request, res: Respons // onward, so we always have time to produce + mail the original ink form. const isWetSig = WET_SIGNATURE_SLUGS.has(slug); let pastSignature = false; - const steps = (SERVICE_TIMELINES[slug] || DEFAULT_TIMELINE).map((step) => { + const dated = (SERVICE_TIMELINES[slug] || DEFAULT_TIMELINE).map((step) => { if (isWetSig && /signature/i.test(step.name)) { pastSignature = true; } @@ -209,11 +243,25 @@ router.get("/api/v1/order-timeline/:order_id", async (req: Request, res: Respons }; }); + // Overlay live progress so the portal reflects reality, not just dates. + const paymentStatus = String(order.payment_status || "").toLowerCase(); + const steps = applyLiveStatus(dated, { + paid: paymentStatus === "paid" || paymentStatus === "completed", + signed: signedSet.has(order.order_number as string), + fulfillmentStatus: (order.fulfillment_status as string) || null, + }); + + // The current step is the first in_progress one, else the last completed. + const inProgress = steps.find((s) => s.status === "in_progress"); + const lastCompleted = [...steps].reverse().find((s) => s.status === "completed"); + const currentStep = (inProgress || lastCompleted || steps[0]).name; + return { order_number: order.order_number, service_slug: slug, service_name: order.service_name, steps, + current_step: currentStep, estimated_completion: steps[steps.length - 1].estimated_date, }; }); diff --git a/api/src/routes/portal.ts b/api/src/routes/portal.ts index a3343a1..a241a78 100644 --- a/api/src/routes/portal.ts +++ b/api/src/routes/portal.ts @@ -46,6 +46,7 @@ router.get("/me", async (req: Request, res: Response) => { `SELECT order_number, batch_id, service_slug, service_name, service_fee_cents, discount_cents, discount_code, surcharge_cents, payment_status, payment_method, + fulfillment_status, created_at, paid_at FROM compliance_orders WHERE customer_email = $1 diff --git a/api/test/test_timeline_status.ts b/api/test/test_timeline_status.ts new file mode 100644 index 0000000..5c219ad --- /dev/null +++ b/api/test/test_timeline_status.ts @@ -0,0 +1,92 @@ +/** + * Verifies applyLiveStatus: the live-progress overlay used by the client + * portal order timeline. Run: npx tsx api/test/test_timeline_status.ts + */ +import { applyLiveStatus } from "../src/routes/order-timeline-status.js"; + +type Step = { name: string; description: string; business_days: number; status: string }; + +const npiSteps: Step[] = [ + { name: "Order Received", description: "", business_days: 0, status: "completed" }, + { name: "Document Preparation", description: "", business_days: 1, status: "pending" }, + { name: "Signature Required", description: "", business_days: 1, status: "pending" }, + { name: "Filed with CMS", description: "", business_days: 2, status: "pending" }, + { name: "CMS Confirmation", description: "", business_days: 10, status: "pending" }, +]; + +let failures = 0; +function check(label: string, got: string[], want: string[]) { + const ok = JSON.stringify(got) === JSON.stringify(want); + if (!ok) { + failures++; + console.error(`FAIL: ${label}\n got: ${got.join(",")}\n want: ${want.join(",")}`); + } else { + console.log(`ok: ${label}`); + } +} +const statuses = (s: Step[]) => s.map((x) => x.status); + +// 1. Unpaid, nothing reached → only the statically-completed step 0 is done. +check( + "unpaid", + statuses(applyLiveStatus(npiSteps, { paid: false, signed: false, fulfillmentStatus: null })), + ["completed", "pending", "pending", "pending", "pending"], +); + +// 2. Paid only → Order Received done, prep in progress +check( + "paid only", + statuses(applyLiveStatus(npiSteps, { paid: true, signed: false, fulfillmentStatus: null })), + ["completed", "in_progress", "pending", "pending", "pending"], +); + +// 3. Paid + signed → through Signature done, Filed in progress +check( + "paid+signed", + statuses(applyLiveStatus(npiSteps, { paid: true, signed: true, fulfillmentStatus: null })), + ["completed", "completed", "completed", "in_progress", "pending"], +); + +// 4. Paid + signed + filed → through Filed done, Confirmation in progress +check( + "paid+signed+filed", + statuses(applyLiveStatus(npiSteps, { paid: true, signed: true, fulfillmentStatus: "filed_waiting_state" })), + ["completed", "completed", "completed", "completed", "in_progress"], +); + +// 5. Completed → everything done +check( + "completed", + statuses(applyLiveStatus(npiSteps, { paid: true, signed: true, fulfillmentStatus: "completed" })), + ["completed", "completed", "completed", "completed", "completed"], +); + +// 6. ready_to_file without explicit signed → still advances to Filed step +check( + "ready_to_file", + statuses(applyLiveStatus(npiSteps, { paid: true, signed: false, fulfillmentStatus: "ready_to_file" })), + ["completed", "completed", "completed", "completed", "in_progress"], +); + +// 7. A timeline with no signature step (e.g. boc3) still works off paid + fulfillment +const boc3: Step[] = [ + { name: "Order Received", description: "", business_days: 0, status: "completed" }, + { name: "Process Agent Filing", description: "", business_days: 1, status: "pending" }, + { name: "FMCSA Registration", description: "", business_days: 3, status: "pending" }, +]; +check( + "boc3 paid", + statuses(applyLiveStatus(boc3, { paid: true, signed: false, fulfillmentStatus: null })), + ["completed", "in_progress", "pending"], +); +check( + "boc3 filed", + statuses(applyLiveStatus(boc3, { paid: true, signed: false, fulfillmentStatus: "filed_waiting_state" })), + ["completed", "completed", "in_progress"], // "Process Agent Filing" matches FILED_STEP_RE +); + +if (failures > 0) { + console.error(`\n${failures} test(s) failed`); + process.exit(1); +} +console.log("\nAll timeline status tests passed"); diff --git a/scripts/workers/services/boc3_filing.py b/scripts/workers/services/boc3_filing.py index bc04f8c..04957b3 100644 --- a/scripts/workers/services/boc3_filing.py +++ b/scripts/workers/services/boc3_filing.py @@ -41,6 +41,8 @@ import logging import os from datetime import datetime +from scripts.workers.telegram_notify import notify_fulfillment_todo + LOG = logging.getLogger("workers.services.boc3_filing") # Process agent partner: Registered Agents Inc / Process Agent LLC @@ -229,6 +231,20 @@ class BOC3FilingHandler: try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) + todo_title = f"BOC-3 Filing — {entity_name} (DOT {dot_number})" + todo_description = ( + f"File BOC-3 process agent designation for {entity_name}.\n" + f"DOT: {dot_number}\n" + f"MC/Docket: {docket_number}\n" + f"Type: {entity_type}\n" + f"Authority status: {boc3_status}\n" + f"Customer: {customer_email}\n\n" + + ("Recommended follow-ups (upsell-approve, not auto-charged):\n" + + "\n".join(f" - {f['title']}: {f['reason']}" + for f in recommended_followups) + "\n\n" + if recommended_followups else "") + + f"Submit to process agent partner for electronic filing with FMCSA." + ) with conn.cursor() as cur: cur.execute(""" INSERT INTO admin_todos ( @@ -236,25 +252,22 @@ class BOC3FilingHandler: description, data, status ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending') """, ( - f"BOC-3 Filing — {entity_name} (DOT {dot_number})", + todo_title, "filing", "high", order_number, self.SERVICE_SLUG, - f"File BOC-3 process agent designation for {entity_name}.\n" - f"DOT: {dot_number}\n" - f"MC/Docket: {docket_number}\n" - f"Type: {entity_type}\n" - f"Authority status: {boc3_status}\n" - f"Customer: {customer_email}\n\n" - + ("Recommended follow-ups (upsell-approve, not auto-charged):\n" - + "\n".join(f" - {f['title']}: {f['reason']}" - for f in recommended_followups) + "\n\n" - if recommended_followups else "") - + f"Submit to process agent partner for electronic filing with FMCSA.", + todo_description, json.dumps(todo_data), )) conn.commit() + notify_fulfillment_todo( + title=todo_title, + order_number=order_number, + service_slug=self.SERVICE_SLUG, + priority="high", + description=todo_description, + ) conn.close() LOG.info("[%s] Admin todo created for BOC-3 filing", order_number) except Exception as exc: diff --git a/scripts/workers/services/carrier_closeout.py b/scripts/workers/services/carrier_closeout.py index 3e43ea8..5a31b97 100644 --- a/scripts/workers/services/carrier_closeout.py +++ b/scripts/workers/services/carrier_closeout.py @@ -19,6 +19,8 @@ import json import logging import os +from scripts.workers.telegram_notify import notify_fulfillment_todo + LOG = logging.getLogger("workers.services.carrier_closeout") @@ -117,6 +119,13 @@ class CarrierCloseoutHandler: (title, "filing", priority, order_number, slug, description, json.dumps(intake)), ) conn.commit() + notify_fulfillment_todo( + title=title, + order_number=order_number, + service_slug=slug, + priority=priority, + description=description, + ) conn.close() except Exception as exc: LOG.error("[%s] Failed to create close-out todo: %s", order_number, exc) diff --git a/scripts/workers/services/ein_application.py b/scripts/workers/services/ein_application.py index 1753236..ab2594f 100644 --- a/scripts/workers/services/ein_application.py +++ b/scripts/workers/services/ein_application.py @@ -35,6 +35,8 @@ import os from datetime import datetime, timezone, timedelta from pathlib import Path +from scripts.workers.telegram_notify import notify_fulfillment_todo + LOG = logging.getLogger("workers.services.ein_application") SCREENSHOTS_DIR = Path(os.getenv("SCREENSHOTS_DIR", "/tmp/ein-screenshots")) @@ -176,6 +178,13 @@ class EINApplicationHandler: self.SERVICE_SLUG, description, json.dumps(intake), )) conn.commit() + notify_fulfillment_todo( + title=title, + order_number=order_number, + service_slug=self.SERVICE_SLUG, + priority=priority, + description=description, + ) conn.close() except Exception as exc: LOG.error("[%s] Failed to create EIN todo: %s", order_number, exc) diff --git a/scripts/workers/services/hazmat_phmsa.py b/scripts/workers/services/hazmat_phmsa.py index 52e4ea3..b28c423 100644 --- a/scripts/workers/services/hazmat_phmsa.py +++ b/scripts/workers/services/hazmat_phmsa.py @@ -34,6 +34,8 @@ import logging import os from datetime import datetime +from scripts.workers.telegram_notify import notify_fulfillment_todo + LOG = logging.getLogger("workers.services.hazmat_phmsa") # PHMSA registration fee brackets (49 CFR 107.612). The processing fee depends on @@ -107,6 +109,20 @@ class HazmatPHMSAHandler: try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) + todo_title = ( + f"PHMSA Hazmat Registration — {entity_name} (DOT {dot_number})" + if dot_number else f"PHMSA Hazmat Registration — {entity_name}" + ) + todo_description = ( + f"Service: {self.SERVICE_NAME}\n" + f"DOT: {dot_number}\n" + f"Hazmat classes: {', '.join(hazmat_classes) if hazmat_classes else 'see intake'}\n" + f"Bulk packaging: {'Yes' if bulk else 'No'}\n" + f"Small business: {'Yes' if is_small else 'No'}\n" + f"Est. gov fee: ${(fee['registration_fee_cents'] + fee['processing_fee_cents']) / 100:,.2f}\n" + f"Customer: {customer_email}\n\n" + f"Steps:\n" + "\n".join(steps) + ) try: with conn.cursor() as cur: cur.execute(""" @@ -115,23 +131,22 @@ class HazmatPHMSAHandler: description, data, status ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending') """, ( - f"PHMSA Hazmat Registration — {entity_name} (DOT {dot_number})" - if dot_number else f"PHMSA Hazmat Registration — {entity_name}", + todo_title, "filing", "high", order_number, self.SERVICE_SLUG, - f"Service: {self.SERVICE_NAME}\n" - f"DOT: {dot_number}\n" - f"Hazmat classes: {', '.join(hazmat_classes) if hazmat_classes else 'see intake'}\n" - f"Bulk packaging: {'Yes' if bulk else 'No'}\n" - f"Small business: {'Yes' if is_small else 'No'}\n" - f"Est. gov fee: ${(fee['registration_fee_cents'] + fee['processing_fee_cents']) / 100:,.2f}\n" - f"Customer: {customer_email}\n\n" - f"Steps:\n" + "\n".join(steps), + todo_description, json.dumps(todo_data), )) conn.commit() + notify_fulfillment_todo( + title=todo_title, + order_number=order_number, + service_slug=self.SERVICE_SLUG, + priority="high", + description=todo_description, + ) finally: conn.close() LOG.info("[%s] Admin todo created for PHMSA hazmat registration", order_number) diff --git a/scripts/workers/services/mailbox_setup.py b/scripts/workers/services/mailbox_setup.py index a6690d4..e976c5c 100644 --- a/scripts/workers/services/mailbox_setup.py +++ b/scripts/workers/services/mailbox_setup.py @@ -27,6 +27,8 @@ import logging import os from datetime import datetime +from scripts.workers.telegram_notify import notify_fulfillment_todo + LOG = logging.getLogger("workers.services.mailbox_setup") @@ -101,6 +103,18 @@ class MailboxSetupHandler: try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) + todo_title = f"Mailbox Setup — {entity_name} ({formation_state})" + todo_description = ( + f"Set up Anytime Mailbox for {entity_name} in {formation_state}.\n\n" + f"Steps:\n" + f"1. Customer e-signs USPS Form 1583 (link sent)\n" + f"2. Schedule online notarization session\n" + f"3. Submit notarized 1583 to Anytime Mailbox\n" + f"4. Activate mailbox, provide address to customer\n\n" + f"Customer: {customer_email}\n" + f"Photo ID: on file in MinIO (from intake)\n" + f"Notarization: use same service as CRTC BITS" + ) with conn.cursor() as cur: cur.execute(""" INSERT INTO admin_todos ( @@ -108,20 +122,12 @@ class MailboxSetupHandler: description, data, status ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending') """, ( - f"Mailbox Setup — {entity_name} ({formation_state})", + todo_title, "provisioning", "normal", order_number, self.SERVICE_SLUG, - f"Set up Anytime Mailbox for {entity_name} in {formation_state}.\n\n" - f"Steps:\n" - f"1. Customer e-signs USPS Form 1583 (link sent)\n" - f"2. Schedule online notarization session\n" - f"3. Submit notarized 1583 to Anytime Mailbox\n" - f"4. Activate mailbox, provide address to customer\n\n" - f"Customer: {customer_email}\n" - f"Photo ID: on file in MinIO (from intake)\n" - f"Notarization: use same service as CRTC BITS", + todo_description, json.dumps({ "order_number": order_number, "entity_name": entity_name, @@ -130,6 +136,13 @@ class MailboxSetupHandler: }), )) conn.commit() + notify_fulfillment_todo( + title=todo_title, + order_number=order_number, + service_slug=self.SERVICE_SLUG, + priority="normal", + description=todo_description, + ) conn.close() except Exception as exc: LOG.error("[%s] Failed to create mailbox todo: %s", order_number, exc) diff --git a/scripts/workers/services/mcs150_update.py b/scripts/workers/services/mcs150_update.py index 228d95d..c79e6e8 100644 --- a/scripts/workers/services/mcs150_update.py +++ b/scripts/workers/services/mcs150_update.py @@ -47,6 +47,8 @@ import logging import os from datetime import datetime +from scripts.workers.telegram_notify import notify_fulfillment_todo + LOG = logging.getLogger("workers.services.mcs150_update") @@ -246,6 +248,16 @@ class MCS150UpdateHandler: filed_method = filing_result.get("method", "pending") if filing_result else "pending" filed_ok = filing_result.get("success", False) if filing_result else False + todo_title = f"MCS-150 {'Filed' if filed_ok else 'Review'} — {entity_name} (DOT {dot_number})" + todo_priority = "low" if filed_ok else "normal" + todo_description = ( + f"MCS-150 for {entity_name} (DOT {dot_number}).\n" + f"Filing method: {filed_method}\n" + f"Status: {'SUBMITTED — verify in 5-10 days' if filed_ok else 'NEEDS MANUAL FILING'}\n" + f"Customer: {customer_email}\n" + f"PDF: {minio_path or 'not generated'}" + ) + with conn.cursor() as cur: cur.execute(""" INSERT INTO admin_todos ( @@ -253,16 +265,12 @@ class MCS150UpdateHandler: description, data, status ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending') """, ( - f"MCS-150 {'Filed' if filed_ok else 'Review'} — {entity_name} (DOT {dot_number})", + todo_title, "filing", - "low" if filed_ok else "normal", + todo_priority, order_number, self.SERVICE_SLUG, - f"MCS-150 for {entity_name} (DOT {dot_number}).\n" - f"Filing method: {filed_method}\n" - f"Status: {'SUBMITTED — verify in 5-10 days' if filed_ok else 'NEEDS MANUAL FILING'}\n" - f"Customer: {customer_email}\n" - f"PDF: {minio_path or 'not generated'}", + todo_description, json.dumps({ "order_number": order_number, "dot_number": dot_number, @@ -271,6 +279,13 @@ class MCS150UpdateHandler: }), )) conn.commit() + notify_fulfillment_todo( + title=todo_title, + order_number=order_number, + service_slug=self.SERVICE_SLUG, + priority=todo_priority, + description=todo_description, + ) conn.close() except Exception as exc: LOG.error("[%s] Failed to create admin todo: %s", order_number, exc) @@ -311,6 +326,14 @@ class MCS150UpdateHandler: try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) + todo_title = f"Awaiting client signature — {entity_name} (DOT {dot_number})" + todo_description = ( + f"{slug} for {entity_name} (DOT {dot_number}).\n" + f"Status: AWAITING CLIENT SIGNATURE before filing.\n" + f"Signing link emailed to {customer_email}.\n" + f"PDF: {minio_path or 'not generated'}\n" + f"Filing auto-resumes once the client signs." + ) with conn.cursor() as cur: cur.execute( """ @@ -320,18 +343,21 @@ class MCS150UpdateHandler: ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending') """, ( - f"Awaiting client signature — {entity_name} (DOT {dot_number})", + todo_title, "filing", "low", order_number, slug, - f"{slug} for {entity_name} (DOT {dot_number}).\n" - f"Status: AWAITING CLIENT SIGNATURE before filing.\n" - f"Signing link emailed to {customer_email}.\n" - f"PDF: {minio_path or 'not generated'}\n" - f"Filing auto-resumes once the client signs.", + todo_description, json.dumps({"order_number": order_number, "dot_number": dot_number, "entity_name": entity_name, "awaiting_signature": True}), ), ) conn.commit() + notify_fulfillment_todo( + title=todo_title, + order_number=order_number, + service_slug=slug, + priority="low", + description=todo_description, + ) conn.close() LOG.info("[%s] Pending-signature todo created", order_number) except Exception as exc: diff --git a/scripts/workers/services/npi_provider.py b/scripts/workers/services/npi_provider.py index 19b3dc0..a14fa65 100644 --- a/scripts/workers/services/npi_provider.py +++ b/scripts/workers/services/npi_provider.py @@ -33,6 +33,8 @@ import json import logging import os +from scripts.workers.telegram_notify import notify_fulfillment_todo + LOG = logging.getLogger("workers.services.npi_provider") # Per-slug admin todo metadata: human-readable action + the portal a human uses. @@ -411,6 +413,13 @@ class _BaseNPIHandler: ), ) conn.commit() + notify_fulfillment_todo( + title=title, + order_number=order_number, + service_slug=self.SERVICE_SLUG, + priority=priority, + description=description, + ) conn.close() except Exception as exc: LOG.error("[%s] Failed to create NPI todo: %s", order_number, exc) diff --git a/scripts/workers/services/state_trucking.py b/scripts/workers/services/state_trucking.py index 1311832..448a3e2 100644 --- a/scripts/workers/services/state_trucking.py +++ b/scripts/workers/services/state_trucking.py @@ -30,6 +30,8 @@ import logging import os from datetime import datetime +from scripts.workers.telegram_notify import notify_fulfillment_todo + LOG = logging.getLogger("workers.services.state_trucking") # State motor carrier fulfillment lifecycle (compliance_orders.fulfillment_status, @@ -331,6 +333,22 @@ class StateTruckingHandler: try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) + todo_title = ( + f"{service_name} — {entity_name} (DOT {dot_number})" + if dot_number else f"{service_name} — {entity_name}" + ) + todo_priority = ( + "high" if service_slug in ("ca-mcp-carb", "state-trucking-bundle") else "normal" + ) + todo_description = ( + f"Service: {service_name}\n" + f"DOT: {dot_number}\n" + f"Base state: {base_state}\n" + f"Operating states: {', '.join(operating_states) if operating_states else 'N/A'}\n" + f"Customer: {customer_email}\n" + + intake_lines + + f"\nSteps:\n" + "\n".join(steps) + ) try: with conn.cursor() as cur: cur.execute(""" @@ -339,22 +357,22 @@ class StateTruckingHandler: description, data, status ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending') """, ( - f"{service_name} — {entity_name} (DOT {dot_number})" - if dot_number else f"{service_name} — {entity_name}", + todo_title, "filing", - "high" if service_slug in ("ca-mcp-carb", "state-trucking-bundle") else "normal", + todo_priority, order_number, service_slug, - f"Service: {service_name}\n" - f"DOT: {dot_number}\n" - f"Base state: {base_state}\n" - f"Operating states: {', '.join(operating_states) if operating_states else 'N/A'}\n" - f"Customer: {customer_email}\n" - + intake_lines + - f"\nSteps:\n" + "\n".join(steps), + todo_description, json.dumps(todo_data), )) conn.commit() + notify_fulfillment_todo( + title=todo_title, + order_number=order_number, + service_slug=service_slug, + priority=todo_priority, + description=todo_description, + ) finally: conn.close() LOG.info("[%s] Admin todo created for %s", order_number, service_name) diff --git a/scripts/workers/telegram_notify.py b/scripts/workers/telegram_notify.py new file mode 100644 index 0000000..326d3e1 --- /dev/null +++ b/scripts/workers/telegram_notify.py @@ -0,0 +1,136 @@ +"""Shared Telegram notifier for the fulfillment workers. + +One place to send operator notifications so every worker/handler reports the +same way (and we can change transport once). Fire-and-forget: a Telegram outage +must never break fulfillment. + +Env: TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID. +""" +from __future__ import annotations + +import json +import logging +import os +import urllib.request + +LOG = logging.getLogger("workers.telegram_notify") + + +def telegram_enabled() -> bool: + return bool(os.getenv("TELEGRAM_BOT_TOKEN") and os.getenv("TELEGRAM_CHAT_ID")) + + +def send_telegram(text: str, *, parse_mode: str | None = None) -> bool: + """Send a Telegram message. Returns True on success, never raises. + + Safe to call from any worker; no-ops (returns False) if not configured. + """ + bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "") + chat_id = os.getenv("TELEGRAM_CHAT_ID", "") + if not bot_token or not chat_id: + return False + try: + payload: dict = {"chat_id": chat_id, "text": text} + if parse_mode: + payload["parse_mode"] = parse_mode + data = json.dumps(payload).encode() + req = urllib.request.Request( + f"https://api.telegram.org/bot{bot_token}/sendMessage", + data=data, + headers={"Content-Type": "application/json"}, + ) + urllib.request.urlopen(req, timeout=10) # nosec - trusted Telegram API + return True + except Exception as exc: # never break fulfillment on a notify failure + LOG.warning("Telegram notify failed: %s", exc) + return False + + +def create_admin_todo( + *, + title: str, + order_number: str, + service_slug: str, + description: str, + data: dict | None = None, + category: str = "filing", + priority: str = "normal", + notify: bool = True, +) -> bool: + """Insert an admin_todos row AND fire an operator Telegram alert. + + Single source of truth for fulfillment-task creation so every worker both + persists the task and notifies the operator. The insert and the notify are + independent: a notify failure never blocks the task, and a DB failure is + logged (returns False) without raising. + + Column order matches the shared admin_todos schema used across handlers: + (title, category, priority, order_number, service_slug, description, data, status). + """ + ok = False + try: + import psycopg2 # local import: keep module importable without psycopg2 + + conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) + try: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO admin_todos ( + title, category, priority, order_number, service_slug, + description, data, status + ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending') + """, + ( + title, category, priority, order_number, service_slug, + description, json.dumps(data or {}), + ), + ) + conn.commit() + ok = True + finally: + conn.close() + except Exception as exc: + LOG.error("create_admin_todo failed for %s/%s: %s", order_number, service_slug, exc) + return False + + if notify: + notify_fulfillment_todo( + title=title, + order_number=order_number, + service_slug=service_slug, + priority=priority, + description=description, + ) + return ok + + +def notify_fulfillment_todo( + *, + title: str, + order_number: str, + service_slug: str, + priority: str = "normal", + description: str = "", +) -> bool: + """Operator alert that a fulfillment task needs attention. + + Called whenever an admin_todo is created so nothing waits unseen in the + queue. The description is trimmed to keep the message readable. + """ + icon = {"high": "🔴", "urgent": "🔴", "normal": "🟡", "low": "⚪"}.get( + (priority or "normal").lower(), "🟡" + ) + desc = (description or "").strip() + if len(desc) > 600: + desc = desc[:600] + "…" + text = ( + f"{icon} FULFILLMENT NEEDED\n\n" + f"{title}\n" + f"Service: {service_slug}\n" + f"Order: {order_number}\n" + f"Priority: {priority}\n" + ) + if desc: + text += f"\n{desc}" + return send_telegram(text) diff --git a/site/public/portal/dashboard/index.html b/site/public/portal/dashboard/index.html index fbb6fd8..da60d14 100644 --- a/site/public/portal/dashboard/index.html +++ b/site/public/portal/dashboard/index.html @@ -165,6 +165,50 @@ return '' + c.label + ''; } + // Client-safe progress label derived from the order's fulfillment_status. + // We translate internal lifecycle values into plain-language stages and + // deliberately avoid exposing any back-office mechanics. + function fulfillmentBadge(status) { + if (!status) return ''; + var map = { + authorization_required: { bg: '#fef9c3', text: '#a16207', label: 'Action needed: signature' }, + awaiting_customer_delegation: { bg: '#fef9c3', text: '#a16207', label: 'Action needed' }, + awaiting_secure_credentials: { bg: '#fef9c3', text: '#a16207', label: 'Action needed' }, + awaiting_government_fee_approval: { bg: '#fef9c3', text: '#a16207', label: 'Action needed: approval' }, + authorization_signed: { bg: '#dbeafe', text: '#1d4ed8', label: 'In progress' }, + ready_to_file: { bg: '#dbeafe', text: '#1d4ed8', label: 'In progress' }, + awaiting_insurance_filing: { bg: '#dbeafe', text: '#1d4ed8', label: 'Awaiting insurance' }, + filed_waiting_state: { bg: '#dbeafe', text: '#1d4ed8', label: 'Filed — awaiting confirmation' }, + completed: { bg: '#dcfce7', text: '#15803d', label: 'Completed' } + }; + var c = map[status]; + if (!c) return ''; + return '' + c.label + ''; + } + + // For a batch, show the single most actionable status: anything needing the + // client's attention wins, then in-progress, then completed. + function pickBatchFulfillment(items) { + var actionNeeded = [ + 'authorization_required', 'awaiting_customer_delegation', + 'awaiting_secure_credentials', 'awaiting_government_fee_approval' + ]; + var inProgress = [ + 'authorization_signed', 'ready_to_file', + 'awaiting_insurance_filing', 'filed_waiting_state' + ]; + var statuses = items.map(function(o) { return o.fulfillment_status; }); + for (var i = 0; i < statuses.length; i++) { + if (actionNeeded.indexOf(statuses[i]) !== -1) return statuses[i]; + } + for (var j = 0; j < statuses.length; j++) { + if (inProgress.indexOf(statuses[j]) !== -1) return statuses[j]; + } + // All completed? + var allDone = statuses.length > 0 && statuses.every(function(s) { return s === 'completed'; }); + return allDone ? 'completed' : (statuses[0] || null); + } + function paymentIcon(method) { if (!method) return ''; var m = method.toLowerCase(); @@ -261,6 +305,8 @@ totalCents += (o.service_fee_cents || 0) - (o.discount_cents || 0) + (o.surcharge_cents || 0); if (names.indexOf(o.service_name) === -1) names.push(o.service_name); }); + // Surface the most actionable fulfillment status across the batch. + var batchFulfillment = pickBatchFulfillment(items); html += '
'; html += '
'; @@ -279,7 +325,15 @@ html += '' + formatCents(totalCents) + ''; html += statusBadge(first.payment_status); html += '
'; - html += '
'; + html += ''; + // Fulfillment progress row (only when we have a status to show) + if (fulfillmentBadge(batchFulfillment)) { + html += '
'; + html += 'Progress:'; + html += fulfillmentBadge(batchFulfillment); + html += '
'; + } + html += ''; }); // Render standalone orders @@ -301,7 +355,15 @@ html += '' + formatCents(totalCents) + ''; html += statusBadge(o.payment_status); html += ''; - html += ''; + html += ''; + // Fulfillment progress row (only when we have a status to show) + if (fulfillmentBadge(o.fulfillment_status)) { + html += '
'; + html += 'Progress:'; + html += fulfillmentBadge(o.fulfillment_status); + html += '
'; + } + html += ''; }); $grid.innerHTML = html;