Wire fulfillment alerts to Telegram + surface order progress in portal + even out ERPNext sync

Telegram notifications:
- Add shared scripts/workers/telegram_notify.py (send_telegram, notify_fulfillment_todo,
  create_admin_todo) so every worker alerts the operator the same way; fire-and-forget.
- Fire notify_fulfillment_todo after each admin_todos insert across all 8 service
  handlers (9 sites) so no fulfillment task waits unseen.
  (Orders + quotes + tickets already notified via checkout/quotes/tickets routes.)

Client portal order progress:
- order-timeline: derive real per-step status from live signals (payment paid,
  e-signature signed, fulfillment_status) instead of a static template; add
  current_step to the response.
- Extract pure applyLiveStatus into order-timeline-status.ts (DB-free) + unit test
  (api/test/test_timeline_status.ts, 8 cases).
- portal /me now returns compliance_orders.fulfillment_status.
- Dashboard renders a client-safe Progress badge (In progress / Action needed /
  Filed-awaiting-confirmation / Completed); batches show the most actionable status.
  No back-office mechanics exposed.

ERPNext sync parity:
- Create a Sales Order for formation and fcc_carrier_registration orders (previously
  only canada_crtc + compliance synced); write erpnext_sales_order back to each table.
  Non-blocking, matches existing pattern.

Verified: API tsc clean, timeline unit tests 8/8, Astro build 58 pages,
cms10114/ink/paper_batch Python tests still green, no mechanics leaks.
This commit is contained in:
justin 2026-06-07 03:17:46 -05:00
parent 41df4d9553
commit 28b1af341d
15 changed files with 706 additions and 73 deletions

View file

@ -41,6 +41,8 @@ import logging
import os
from datetime import datetime
from scripts.workers.telegram_notify import notify_fulfillment_todo
LOG = logging.getLogger("workers.services.boc3_filing")
# Process agent partner: Registered Agents Inc / Process Agent LLC
@ -229,6 +231,20 @@ class BOC3FilingHandler:
try:
import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
todo_title = f"BOC-3 Filing — {entity_name} (DOT {dot_number})"
todo_description = (
f"File BOC-3 process agent designation for {entity_name}.\n"
f"DOT: {dot_number}\n"
f"MC/Docket: {docket_number}\n"
f"Type: {entity_type}\n"
f"Authority status: {boc3_status}\n"
f"Customer: {customer_email}\n\n"
+ ("Recommended follow-ups (upsell-approve, not auto-charged):\n"
+ "\n".join(f" - {f['title']}: {f['reason']}"
for f in recommended_followups) + "\n\n"
if recommended_followups else "")
+ f"Submit to process agent partner for electronic filing with FMCSA."
)
with conn.cursor() as cur:
cur.execute("""
INSERT INTO admin_todos (
@ -236,25 +252,22 @@ class BOC3FilingHandler:
description, data, status
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
""", (
f"BOC-3 Filing — {entity_name} (DOT {dot_number})",
todo_title,
"filing",
"high",
order_number,
self.SERVICE_SLUG,
f"File BOC-3 process agent designation for {entity_name}.\n"
f"DOT: {dot_number}\n"
f"MC/Docket: {docket_number}\n"
f"Type: {entity_type}\n"
f"Authority status: {boc3_status}\n"
f"Customer: {customer_email}\n\n"
+ ("Recommended follow-ups (upsell-approve, not auto-charged):\n"
+ "\n".join(f" - {f['title']}: {f['reason']}"
for f in recommended_followups) + "\n\n"
if recommended_followups else "")
+ f"Submit to process agent partner for electronic filing with FMCSA.",
todo_description,
json.dumps(todo_data),
))
conn.commit()
notify_fulfillment_todo(
title=todo_title,
order_number=order_number,
service_slug=self.SERVICE_SLUG,
priority="high",
description=todo_description,
)
conn.close()
LOG.info("[%s] Admin todo created for BOC-3 filing", order_number)
except Exception as exc:

View file

@ -19,6 +19,8 @@ import json
import logging
import os
from scripts.workers.telegram_notify import notify_fulfillment_todo
LOG = logging.getLogger("workers.services.carrier_closeout")
@ -117,6 +119,13 @@ class CarrierCloseoutHandler:
(title, "filing", priority, order_number, slug, description, json.dumps(intake)),
)
conn.commit()
notify_fulfillment_todo(
title=title,
order_number=order_number,
service_slug=slug,
priority=priority,
description=description,
)
conn.close()
except Exception as exc:
LOG.error("[%s] Failed to create close-out todo: %s", order_number, exc)

View file

@ -35,6 +35,8 @@ import os
from datetime import datetime, timezone, timedelta
from pathlib import Path
from scripts.workers.telegram_notify import notify_fulfillment_todo
LOG = logging.getLogger("workers.services.ein_application")
SCREENSHOTS_DIR = Path(os.getenv("SCREENSHOTS_DIR", "/tmp/ein-screenshots"))
@ -176,6 +178,13 @@ class EINApplicationHandler:
self.SERVICE_SLUG, description, json.dumps(intake),
))
conn.commit()
notify_fulfillment_todo(
title=title,
order_number=order_number,
service_slug=self.SERVICE_SLUG,
priority=priority,
description=description,
)
conn.close()
except Exception as exc:
LOG.error("[%s] Failed to create EIN todo: %s", order_number, exc)

View file

@ -34,6 +34,8 @@ import logging
import os
from datetime import datetime
from scripts.workers.telegram_notify import notify_fulfillment_todo
LOG = logging.getLogger("workers.services.hazmat_phmsa")
# PHMSA registration fee brackets (49 CFR 107.612). The processing fee depends on
@ -107,6 +109,20 @@ class HazmatPHMSAHandler:
try:
import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
todo_title = (
f"PHMSA Hazmat Registration — {entity_name} (DOT {dot_number})"
if dot_number else f"PHMSA Hazmat Registration — {entity_name}"
)
todo_description = (
f"Service: {self.SERVICE_NAME}\n"
f"DOT: {dot_number}\n"
f"Hazmat classes: {', '.join(hazmat_classes) if hazmat_classes else 'see intake'}\n"
f"Bulk packaging: {'Yes' if bulk else 'No'}\n"
f"Small business: {'Yes' if is_small else 'No'}\n"
f"Est. gov fee: ${(fee['registration_fee_cents'] + fee['processing_fee_cents']) / 100:,.2f}\n"
f"Customer: {customer_email}\n\n"
f"Steps:\n" + "\n".join(steps)
)
try:
with conn.cursor() as cur:
cur.execute("""
@ -115,23 +131,22 @@ class HazmatPHMSAHandler:
description, data, status
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
""", (
f"PHMSA Hazmat Registration — {entity_name} (DOT {dot_number})"
if dot_number else f"PHMSA Hazmat Registration — {entity_name}",
todo_title,
"filing",
"high",
order_number,
self.SERVICE_SLUG,
f"Service: {self.SERVICE_NAME}\n"
f"DOT: {dot_number}\n"
f"Hazmat classes: {', '.join(hazmat_classes) if hazmat_classes else 'see intake'}\n"
f"Bulk packaging: {'Yes' if bulk else 'No'}\n"
f"Small business: {'Yes' if is_small else 'No'}\n"
f"Est. gov fee: ${(fee['registration_fee_cents'] + fee['processing_fee_cents']) / 100:,.2f}\n"
f"Customer: {customer_email}\n\n"
f"Steps:\n" + "\n".join(steps),
todo_description,
json.dumps(todo_data),
))
conn.commit()
notify_fulfillment_todo(
title=todo_title,
order_number=order_number,
service_slug=self.SERVICE_SLUG,
priority="high",
description=todo_description,
)
finally:
conn.close()
LOG.info("[%s] Admin todo created for PHMSA hazmat registration", order_number)

View file

@ -27,6 +27,8 @@ import logging
import os
from datetime import datetime
from scripts.workers.telegram_notify import notify_fulfillment_todo
LOG = logging.getLogger("workers.services.mailbox_setup")
@ -101,6 +103,18 @@ class MailboxSetupHandler:
try:
import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
todo_title = f"Mailbox Setup — {entity_name} ({formation_state})"
todo_description = (
f"Set up Anytime Mailbox for {entity_name} in {formation_state}.\n\n"
f"Steps:\n"
f"1. Customer e-signs USPS Form 1583 (link sent)\n"
f"2. Schedule online notarization session\n"
f"3. Submit notarized 1583 to Anytime Mailbox\n"
f"4. Activate mailbox, provide address to customer\n\n"
f"Customer: {customer_email}\n"
f"Photo ID: on file in MinIO (from intake)\n"
f"Notarization: use same service as CRTC BITS"
)
with conn.cursor() as cur:
cur.execute("""
INSERT INTO admin_todos (
@ -108,20 +122,12 @@ class MailboxSetupHandler:
description, data, status
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
""", (
f"Mailbox Setup — {entity_name} ({formation_state})",
todo_title,
"provisioning",
"normal",
order_number,
self.SERVICE_SLUG,
f"Set up Anytime Mailbox for {entity_name} in {formation_state}.\n\n"
f"Steps:\n"
f"1. Customer e-signs USPS Form 1583 (link sent)\n"
f"2. Schedule online notarization session\n"
f"3. Submit notarized 1583 to Anytime Mailbox\n"
f"4. Activate mailbox, provide address to customer\n\n"
f"Customer: {customer_email}\n"
f"Photo ID: on file in MinIO (from intake)\n"
f"Notarization: use same service as CRTC BITS",
todo_description,
json.dumps({
"order_number": order_number,
"entity_name": entity_name,
@ -130,6 +136,13 @@ class MailboxSetupHandler:
}),
))
conn.commit()
notify_fulfillment_todo(
title=todo_title,
order_number=order_number,
service_slug=self.SERVICE_SLUG,
priority="normal",
description=todo_description,
)
conn.close()
except Exception as exc:
LOG.error("[%s] Failed to create mailbox todo: %s", order_number, exc)

View file

@ -47,6 +47,8 @@ import logging
import os
from datetime import datetime
from scripts.workers.telegram_notify import notify_fulfillment_todo
LOG = logging.getLogger("workers.services.mcs150_update")
@ -246,6 +248,16 @@ class MCS150UpdateHandler:
filed_method = filing_result.get("method", "pending") if filing_result else "pending"
filed_ok = filing_result.get("success", False) if filing_result else False
todo_title = f"MCS-150 {'Filed' if filed_ok else 'Review'}{entity_name} (DOT {dot_number})"
todo_priority = "low" if filed_ok else "normal"
todo_description = (
f"MCS-150 for {entity_name} (DOT {dot_number}).\n"
f"Filing method: {filed_method}\n"
f"Status: {'SUBMITTED — verify in 5-10 days' if filed_ok else 'NEEDS MANUAL FILING'}\n"
f"Customer: {customer_email}\n"
f"PDF: {minio_path or 'not generated'}"
)
with conn.cursor() as cur:
cur.execute("""
INSERT INTO admin_todos (
@ -253,16 +265,12 @@ class MCS150UpdateHandler:
description, data, status
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
""", (
f"MCS-150 {'Filed' if filed_ok else 'Review'}{entity_name} (DOT {dot_number})",
todo_title,
"filing",
"low" if filed_ok else "normal",
todo_priority,
order_number,
self.SERVICE_SLUG,
f"MCS-150 for {entity_name} (DOT {dot_number}).\n"
f"Filing method: {filed_method}\n"
f"Status: {'SUBMITTED — verify in 5-10 days' if filed_ok else 'NEEDS MANUAL FILING'}\n"
f"Customer: {customer_email}\n"
f"PDF: {minio_path or 'not generated'}",
todo_description,
json.dumps({
"order_number": order_number,
"dot_number": dot_number,
@ -271,6 +279,13 @@ class MCS150UpdateHandler:
}),
))
conn.commit()
notify_fulfillment_todo(
title=todo_title,
order_number=order_number,
service_slug=self.SERVICE_SLUG,
priority=todo_priority,
description=todo_description,
)
conn.close()
except Exception as exc:
LOG.error("[%s] Failed to create admin todo: %s", order_number, exc)
@ -311,6 +326,14 @@ class MCS150UpdateHandler:
try:
import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
todo_title = f"Awaiting client signature — {entity_name} (DOT {dot_number})"
todo_description = (
f"{slug} for {entity_name} (DOT {dot_number}).\n"
f"Status: AWAITING CLIENT SIGNATURE before filing.\n"
f"Signing link emailed to {customer_email}.\n"
f"PDF: {minio_path or 'not generated'}\n"
f"Filing auto-resumes once the client signs."
)
with conn.cursor() as cur:
cur.execute(
"""
@ -320,18 +343,21 @@ class MCS150UpdateHandler:
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
""",
(
f"Awaiting client signature — {entity_name} (DOT {dot_number})",
todo_title,
"filing", "low", order_number, slug,
f"{slug} for {entity_name} (DOT {dot_number}).\n"
f"Status: AWAITING CLIENT SIGNATURE before filing.\n"
f"Signing link emailed to {customer_email}.\n"
f"PDF: {minio_path or 'not generated'}\n"
f"Filing auto-resumes once the client signs.",
todo_description,
json.dumps({"order_number": order_number, "dot_number": dot_number,
"entity_name": entity_name, "awaiting_signature": True}),
),
)
conn.commit()
notify_fulfillment_todo(
title=todo_title,
order_number=order_number,
service_slug=slug,
priority="low",
description=todo_description,
)
conn.close()
LOG.info("[%s] Pending-signature todo created", order_number)
except Exception as exc:

View file

@ -33,6 +33,8 @@ import json
import logging
import os
from scripts.workers.telegram_notify import notify_fulfillment_todo
LOG = logging.getLogger("workers.services.npi_provider")
# Per-slug admin todo metadata: human-readable action + the portal a human uses.
@ -411,6 +413,13 @@ class _BaseNPIHandler:
),
)
conn.commit()
notify_fulfillment_todo(
title=title,
order_number=order_number,
service_slug=self.SERVICE_SLUG,
priority=priority,
description=description,
)
conn.close()
except Exception as exc:
LOG.error("[%s] Failed to create NPI todo: %s", order_number, exc)

View file

@ -30,6 +30,8 @@ import logging
import os
from datetime import datetime
from scripts.workers.telegram_notify import notify_fulfillment_todo
LOG = logging.getLogger("workers.services.state_trucking")
# State motor carrier fulfillment lifecycle (compliance_orders.fulfillment_status,
@ -331,6 +333,22 @@ class StateTruckingHandler:
try:
import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
todo_title = (
f"{service_name}{entity_name} (DOT {dot_number})"
if dot_number else f"{service_name}{entity_name}"
)
todo_priority = (
"high" if service_slug in ("ca-mcp-carb", "state-trucking-bundle") else "normal"
)
todo_description = (
f"Service: {service_name}\n"
f"DOT: {dot_number}\n"
f"Base state: {base_state}\n"
f"Operating states: {', '.join(operating_states) if operating_states else 'N/A'}\n"
f"Customer: {customer_email}\n"
+ intake_lines +
f"\nSteps:\n" + "\n".join(steps)
)
try:
with conn.cursor() as cur:
cur.execute("""
@ -339,22 +357,22 @@ class StateTruckingHandler:
description, data, status
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
""", (
f"{service_name}{entity_name} (DOT {dot_number})"
if dot_number else f"{service_name}{entity_name}",
todo_title,
"filing",
"high" if service_slug in ("ca-mcp-carb", "state-trucking-bundle") else "normal",
todo_priority,
order_number,
service_slug,
f"Service: {service_name}\n"
f"DOT: {dot_number}\n"
f"Base state: {base_state}\n"
f"Operating states: {', '.join(operating_states) if operating_states else 'N/A'}\n"
f"Customer: {customer_email}\n"
+ intake_lines +
f"\nSteps:\n" + "\n".join(steps),
todo_description,
json.dumps(todo_data),
))
conn.commit()
notify_fulfillment_todo(
title=todo_title,
order_number=order_number,
service_slug=service_slug,
priority=todo_priority,
description=todo_description,
)
finally:
conn.close()
LOG.info("[%s] Admin todo created for %s", order_number, service_name)

View file

@ -0,0 +1,136 @@
"""Shared Telegram notifier for the fulfillment workers.
One place to send operator notifications so every worker/handler reports the
same way (and we can change transport once). Fire-and-forget: a Telegram outage
must never break fulfillment.
Env: TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID.
"""
from __future__ import annotations
import json
import logging
import os
import urllib.request
LOG = logging.getLogger("workers.telegram_notify")
def telegram_enabled() -> bool:
return bool(os.getenv("TELEGRAM_BOT_TOKEN") and os.getenv("TELEGRAM_CHAT_ID"))
def send_telegram(text: str, *, parse_mode: str | None = None) -> bool:
"""Send a Telegram message. Returns True on success, never raises.
Safe to call from any worker; no-ops (returns False) if not configured.
"""
bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "")
chat_id = os.getenv("TELEGRAM_CHAT_ID", "")
if not bot_token or not chat_id:
return False
try:
payload: dict = {"chat_id": chat_id, "text": text}
if parse_mode:
payload["parse_mode"] = parse_mode
data = json.dumps(payload).encode()
req = urllib.request.Request(
f"https://api.telegram.org/bot{bot_token}/sendMessage",
data=data,
headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req, timeout=10) # nosec - trusted Telegram API
return True
except Exception as exc: # never break fulfillment on a notify failure
LOG.warning("Telegram notify failed: %s", exc)
return False
def create_admin_todo(
*,
title: str,
order_number: str,
service_slug: str,
description: str,
data: dict | None = None,
category: str = "filing",
priority: str = "normal",
notify: bool = True,
) -> bool:
"""Insert an admin_todos row AND fire an operator Telegram alert.
Single source of truth for fulfillment-task creation so every worker both
persists the task and notifies the operator. The insert and the notify are
independent: a notify failure never blocks the task, and a DB failure is
logged (returns False) without raising.
Column order matches the shared admin_todos schema used across handlers:
(title, category, priority, order_number, service_slug, description, data, status).
"""
ok = False
try:
import psycopg2 # local import: keep module importable without psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO admin_todos (
title, category, priority, order_number, service_slug,
description, data, status
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
""",
(
title, category, priority, order_number, service_slug,
description, json.dumps(data or {}),
),
)
conn.commit()
ok = True
finally:
conn.close()
except Exception as exc:
LOG.error("create_admin_todo failed for %s/%s: %s", order_number, service_slug, exc)
return False
if notify:
notify_fulfillment_todo(
title=title,
order_number=order_number,
service_slug=service_slug,
priority=priority,
description=description,
)
return ok
def notify_fulfillment_todo(
*,
title: str,
order_number: str,
service_slug: str,
priority: str = "normal",
description: str = "",
) -> bool:
"""Operator alert that a fulfillment task needs attention.
Called whenever an admin_todo is created so nothing waits unseen in the
queue. The description is trimmed to keep the message readable.
"""
icon = {"high": "🔴", "urgent": "🔴", "normal": "🟡", "low": ""}.get(
(priority or "normal").lower(), "🟡"
)
desc = (description or "").strip()
if len(desc) > 600:
desc = desc[:600] + ""
text = (
f"{icon} FULFILLMENT NEEDED\n\n"
f"{title}\n"
f"Service: {service_slug}\n"
f"Order: {order_number}\n"
f"Priority: {priority}\n"
)
if desc:
text += f"\n{desc}"
return send_telegram(text)