new-site/scripts/workers/telegram_notify.py
justin b28dda7c5a feat(telegram): include a presigned PDF view link in the admin-review alert
When an MCS-150/USDOT order hits the pre-submission admin-verification gate, the
Telegram FULFILLMENT NEEDED alert now appends a presigned link to the prepared
PDF (via the public minio.performancewest.net endpoint, IP-allowlisted to admin)
so you can review the document straight from the alert before approving. Added
notify_fulfillment_todo(view_url=...) + a _presigned_view_url helper (public
endpoint + explicit region to avoid the region-probe that 403s from the worker).
2026-06-10 12:13:43 -05:00

141 lines
4.6 KiB
Python

"""Shared Telegram notifier for the fulfillment workers.
One place to send operator notifications so every worker/handler reports the
same way (and we can change transport once). Fire-and-forget: a Telegram outage
must never break fulfillment.
Env: TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID.
"""
from __future__ import annotations
import json
import logging
import os
import urllib.request
LOG = logging.getLogger("workers.telegram_notify")
def telegram_enabled() -> bool:
return bool(os.getenv("TELEGRAM_BOT_TOKEN") and os.getenv("TELEGRAM_CHAT_ID"))
def send_telegram(text: str, *, parse_mode: str | None = None) -> bool:
"""Send a Telegram message. Returns True on success, never raises.
Safe to call from any worker; no-ops (returns False) if not configured.
"""
bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "")
chat_id = os.getenv("TELEGRAM_CHAT_ID", "")
if not bot_token or not chat_id:
return False
try:
payload: dict = {"chat_id": chat_id, "text": text}
if parse_mode:
payload["parse_mode"] = parse_mode
data = json.dumps(payload).encode()
req = urllib.request.Request(
f"https://api.telegram.org/bot{bot_token}/sendMessage",
data=data,
headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req, timeout=10) # nosec - trusted Telegram API
return True
except Exception as exc: # never break fulfillment on a notify failure
LOG.warning("Telegram notify failed: %s", exc)
return False
def create_admin_todo(
*,
title: str,
order_number: str,
service_slug: str,
description: str,
data: dict | None = None,
category: str = "filing",
priority: str = "normal",
notify: bool = True,
) -> bool:
"""Insert an admin_todos row AND fire an operator Telegram alert.
Single source of truth for fulfillment-task creation so every worker both
persists the task and notifies the operator. The insert and the notify are
independent: a notify failure never blocks the task, and a DB failure is
logged (returns False) without raising.
Column order matches the shared admin_todos schema used across handlers:
(title, category, priority, order_number, service_slug, description, data, status).
"""
ok = False
try:
import psycopg2 # local import: keep module importable without psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO admin_todos (
title, category, priority, order_number, service_slug,
description, data, status
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
""",
(
title, category, priority, order_number, service_slug,
description, json.dumps(data or {}),
),
)
conn.commit()
ok = True
finally:
conn.close()
except Exception as exc:
LOG.error("create_admin_todo failed for %s/%s: %s", order_number, service_slug, exc)
return False
if notify:
notify_fulfillment_todo(
title=title,
order_number=order_number,
service_slug=service_slug,
priority=priority,
description=description,
)
return ok
def notify_fulfillment_todo(
*,
title: str,
order_number: str,
service_slug: str,
priority: str = "normal",
description: str = "",
view_url: str = "",
) -> bool:
"""Operator alert that a fulfillment task needs attention.
Called whenever an admin_todo is created so nothing waits unseen in the
queue. The description is trimmed to keep the message readable. ``view_url``
(a presigned link to the prepared document) is appended when supplied so the
operator can review the PDF directly from the alert.
"""
icon = {"high": "🔴", "urgent": "🔴", "normal": "🟡", "low": ""}.get(
(priority or "normal").lower(), "🟡"
)
desc = (description or "").strip()
if len(desc) > 600:
desc = desc[:600] + ""
text = (
f"{icon} FULFILLMENT NEEDED\n\n"
f"{title}\n"
f"Service: {service_slug}\n"
f"Order: {order_number}\n"
f"Priority: {priority}\n"
)
if desc:
text += f"\n{desc}"
if view_url:
text += f"\n\n📄 Review the prepared document:\n{view_url}"
return send_telegram(text)