"""Shared Telegram notifier for the fulfillment workers. One place to send operator notifications so every worker/handler reports the same way (and we can change transport once). Fire-and-forget: a Telegram outage must never break fulfillment. Env: TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID. """ from __future__ import annotations import json import logging import os import urllib.request LOG = logging.getLogger("workers.telegram_notify") def telegram_enabled() -> bool: return bool(os.getenv("TELEGRAM_BOT_TOKEN") and os.getenv("TELEGRAM_CHAT_ID")) def send_telegram(text: str, *, parse_mode: str | None = None) -> bool: """Send a Telegram message. Returns True on success, never raises. Safe to call from any worker; no-ops (returns False) if not configured. """ bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "") chat_id = os.getenv("TELEGRAM_CHAT_ID", "") if not bot_token or not chat_id: return False try: payload: dict = {"chat_id": chat_id, "text": text} if parse_mode: payload["parse_mode"] = parse_mode data = json.dumps(payload).encode() req = urllib.request.Request( f"https://api.telegram.org/bot{bot_token}/sendMessage", data=data, headers={"Content-Type": "application/json"}, ) urllib.request.urlopen(req, timeout=10) # nosec - trusted Telegram API return True except Exception as exc: # never break fulfillment on a notify failure LOG.warning("Telegram notify failed: %s", exc) return False def create_admin_todo( *, title: str, order_number: str, service_slug: str, description: str, data: dict | None = None, category: str = "filing", priority: str = "normal", notify: bool = True, ) -> bool: """Insert an admin_todos row AND fire an operator Telegram alert. Single source of truth for fulfillment-task creation so every worker both persists the task and notifies the operator. The insert and the notify are independent: a notify failure never blocks the task, and a DB failure is logged (returns False) without raising. Column order matches the shared admin_todos schema used across handlers: (title, category, priority, order_number, service_slug, description, data, status). """ ok = False try: import psycopg2 # local import: keep module importable without psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) try: with conn.cursor() as cur: cur.execute( """ INSERT INTO admin_todos ( title, category, priority, order_number, service_slug, description, data, status ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending') """, ( title, category, priority, order_number, service_slug, description, json.dumps(data or {}), ), ) conn.commit() ok = True finally: conn.close() except Exception as exc: LOG.error("create_admin_todo failed for %s/%s: %s", order_number, service_slug, exc) return False if notify: notify_fulfillment_todo( title=title, order_number=order_number, service_slug=service_slug, priority=priority, description=description, ) return ok def notify_fulfillment_todo( *, title: str, order_number: str, service_slug: str, priority: str = "normal", description: str = "", ) -> bool: """Operator alert that a fulfillment task needs attention. Called whenever an admin_todo is created so nothing waits unseen in the queue. The description is trimmed to keep the message readable. """ icon = {"high": "🔴", "urgent": "🔴", "normal": "🟡", "low": "⚪"}.get( (priority or "normal").lower(), "🟡" ) desc = (description or "").strip() if len(desc) > 600: desc = desc[:600] + "…" text = ( f"{icon} FULFILLMENT NEEDED\n\n" f"{title}\n" f"Service: {service_slug}\n" f"Order: {order_number}\n" f"Priority: {priority}\n" ) if desc: text += f"\n{desc}" return send_telegram(text)