new-site/scripts/workers/telegram_notify.py
justin 28b1af341d Wire fulfillment alerts to Telegram + surface order progress in portal + even out ERPNext sync
Telegram notifications:
- Add shared scripts/workers/telegram_notify.py (send_telegram, notify_fulfillment_todo,
  create_admin_todo) so every worker alerts the operator the same way; fire-and-forget.
- Fire notify_fulfillment_todo after each admin_todos insert across all 8 service
  handlers (9 sites) so no fulfillment task waits unseen.
  (Orders + quotes + tickets already notified via checkout/quotes/tickets routes.)

Client portal order progress:
- order-timeline: derive real per-step status from live signals (payment paid,
  e-signature signed, fulfillment_status) instead of a static template; add
  current_step to the response.
- Extract pure applyLiveStatus into order-timeline-status.ts (DB-free) + unit test
  (api/test/test_timeline_status.ts, 8 cases).
- portal /me now returns compliance_orders.fulfillment_status.
- Dashboard renders a client-safe Progress badge (In progress / Action needed /
  Filed-awaiting-confirmation / Completed); batches show the most actionable status.
  No back-office mechanics exposed.

ERPNext sync parity:
- Create a Sales Order for formation and fcc_carrier_registration orders (previously
  only canada_crtc + compliance synced); write erpnext_sales_order back to each table.
  Non-blocking, matches existing pattern.

Verified: API tsc clean, timeline unit tests 8/8, Astro build 58 pages,
cms10114/ink/paper_batch Python tests still green, no mechanics leaks.
2026-06-07 03:17:46 -05:00

"""Shared Telegram notifier for the fulfillment workers.

One place to send operator notifications so every worker/handler reports the
same way (and we can change transport once). Fire-and-forget: a Telegram outage
must never break fulfillment.

Env: TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID.
"""
from __future__ import annotations

import json
import logging
import os
import urllib.request

LOG = logging.getLogger("workers.telegram_notify")


def telegram_enabled() -> bool:
    return bool(os.getenv("TELEGRAM_BOT_TOKEN") and os.getenv("TELEGRAM_CHAT_ID"))


def send_telegram(text: str, *, parse_mode: str | None = None) -> bool:
    """Send a Telegram message. Returns True on success, never raises.

    Safe to call from any worker; no-ops (returns False) if not configured.
    """
    bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "")
    chat_id = os.getenv("TELEGRAM_CHAT_ID", "")
    if not bot_token or not chat_id:
        return False
    try:
        payload: dict = {"chat_id": chat_id, "text": text}
        if parse_mode:
            payload["parse_mode"] = parse_mode
        data = json.dumps(payload).encode()
        req = urllib.request.Request(
            f"https://api.telegram.org/bot{bot_token}/sendMessage",
            data=data,
            headers={"Content-Type": "application/json"},
        )
        urllib.request.urlopen(req, timeout=10)  # nosec - trusted Telegram API
        return True
    except Exception as exc:  # never break fulfillment on a notify failure
        LOG.warning("Telegram notify failed: %s", exc)
        return False


def create_admin_todo(
    *,
    title: str,
    order_number: str,
    service_slug: str,
    description: str,
    data: dict | None = None,
    category: str = "filing",
    priority: str = "normal",
    notify: bool = True,
) -> bool:
    """Insert an admin_todos row AND fire an operator Telegram alert.

    Single source of truth for fulfillment-task creation so every worker both
    persists the task and notifies the operator. The insert and the notify are
    independent: a notify failure never blocks the task, and a DB failure is
    logged (returns False) without raising.

    Column order matches the shared admin_todos schema used across handlers:
    (title, category, priority, order_number, service_slug, description, data, status).
    """
    ok = False
    try:
        import psycopg2  # local import: keep module importable without psycopg2

        conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO admin_todos (
                        title, category, priority, order_number, service_slug,
                        description, data, status
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
                    """,
                    (
                        title, category, priority, order_number, service_slug,
                        description, json.dumps(data or {}),
                    ),
                )
            conn.commit()
            ok = True
        finally:
            conn.close()
    except Exception as exc:
        LOG.error("create_admin_todo failed for %s/%s: %s", order_number, service_slug, exc)
        return False
    if notify:
        notify_fulfillment_todo(
            title=title,
            order_number=order_number,
            service_slug=service_slug,
            priority=priority,
            description=description,
        )
    return ok


def notify_fulfillment_todo(
    *,
    title: str,
    order_number: str,
    service_slug: str,
    priority: str = "normal",
    description: str = "",
) -> bool:
    """Operator alert that a fulfillment task needs attention.

    Called whenever an admin_todo is created so nothing waits unseen in the
    queue. The description is trimmed to keep the message readable.
    """
    # Unknown priorities fall back to the "normal" icon.
    icon = {"high": "🔴", "urgent": "🔴", "normal": "🟡", "low": ""}.get(
        (priority or "normal").lower(), "🟡"
    )
    desc = (description or "").strip()
    if len(desc) > 600:
        # Mark the cut so the operator can tell the description was trimmed.
        desc = desc[:600] + "…"
    text = (
        f"{icon} FULFILLMENT NEEDED\n\n"
        f"{title}\n"
        f"Service: {service_slug}\n"
        f"Order: {order_number}\n"
        f"Priority: {priority}\n"
    )
    if desc:
        text += f"\n{desc}"
    return send_telegram(text)
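
The fire-and-forget contract described in the module docstring (return a bool, never raise) could also be expressed as a reusable wrapper. The sketch below is illustrative only and is not part of this module: `fire_and_forget`, `flaky_notify`, `working_notify`, and the logger name are hypothetical, and the two transports do no real I/O.

```python
import functools
import logging
from typing import Callable

LOG = logging.getLogger("workers.notify_sketch")  # hypothetical logger name


def fire_and_forget(func: Callable[..., object]) -> Callable[..., bool]:
    """Wrap a notifier so it reports success as a bool and never raises."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs) -> bool:
        try:
            func(*args, **kwargs)
            return True
        except Exception as exc:  # a notify failure must not reach the caller
            LOG.warning("%s failed: %s", func.__name__, exc)
            return False

    return wrapper


@fire_and_forget
def flaky_notify(text: str) -> None:
    """Hypothetical transport that always fails, standing in for an outage."""
    raise ConnectionError("simulated outage")


@fire_and_forget
def working_notify(text: str) -> None:
    """Hypothetical transport that succeeds without doing any I/O."""


outage_result = flaky_notify("order ready")   # exception is logged, not raised
ok_result = working_notify("order ready")
```

Callers can branch on the returned bool if they care about delivery, or ignore it entirely, which is how a handler would treat `send_telegram` under the contract above.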