"""Playwright failure monitoring and alerting.

Provides:
1. alert_playwright_failure() — sends Telegram + email notification when a
   Playwright submission fails, with screenshot link
2. upload_failure_screenshot() / upload_failure_screenshot_async() — save a
   screenshot to MinIO for debugging
3. run_selector_health_check() — scheduled probe that verifies selectors are
   still valid on target portals without submitting anything

Used by all Playwright-based handlers (RMD, 499-A, CPNI, CORES, BDC, etc.)
"""
from __future__ import annotations

import argparse
import asyncio
import inspect
import logging
import os
import re
import shutil
import smtplib
import tempfile
import urllib.parse
import urllib.request
from datetime import datetime
from email.mime.text import MIMEText
from pathlib import Path
from typing import Optional

logger = logging.getLogger("workers.services.telecom.playwright_monitor")

ADMIN_EMAIL = os.environ.get("ADMIN_EMAIL", "ops@performancewest.net")
MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "performancewest")

# Anything outside this set is replaced with "_" before a value is used as part
# of a local file or directory name.
_UNSAFE_FS_CHARS = re.compile(r"[^A-Za-z0-9._-]+")


def _safe_fs_component(value: str) -> str:
    """Return *value* with characters unsafe in a file name replaced by ``_``."""
    return _UNSAFE_FS_CHARS.sub("_", str(value))


def _event_loop_is_running() -> bool:
    """Return True when called from inside a running asyncio event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def _prepare_screenshot_target(
    order_number: str,
    service_slug: str,
    work_dir: Optional[str],
) -> tuple[str, str, Optional[str]]:
    """Work out where a failure screenshot is written and uploaded.

    Returns ``(local_path, minio_key, created_dir)``. ``created_dir`` is the
    temporary directory created here when the caller supplied no ``work_dir``
    (so it can be cleaned up later), otherwise ``None``.
    """
    created_dir = None
    if not work_dir:
        # The order number is sanitised because a path separator in the prefix
        # makes mkdtemp() fail, which would lose the screenshot entirely.
        created_dir = tempfile.mkdtemp(
            prefix=f"pw_fail_{_safe_fs_component(order_number)}_"
        )
        work_dir = created_dir

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = (
        f"playwright_error_{_safe_fs_component(service_slug)}_{timestamp}.png"
    )
    local_path = os.path.join(work_dir, filename)
    minio_key = f"compliance/{order_number}/errors/{filename}"
    return local_path, minio_key, created_dir


def _discard_temp_dir(created_dir: Optional[str]) -> None:
    """Remove a temp directory created by this module; no-op for ``None``."""
    if created_dir:
        shutil.rmtree(created_dir, ignore_errors=True)


def _upload_screenshot(
    local_path: str,
    minio_key: str,
    created_dir: Optional[str],
) -> Optional[str]:
    """Upload ``local_path`` to MinIO under ``minio_key``.

    Returns the key on success and ``None`` on any failure (including the
    MinIO client not being importable). A temp directory created by this
    module is removed after a successful upload; on failure the local file is
    kept and its path is logged so it can still be retrieved.
    """
    try:
        # Imported lazily: the MinIO client is a project module that may not
        # be present in every environment that imports this file.
        from scripts.document_gen.minio_client import MinIOStorage

        storage = MinIOStorage()
        storage.upload(local_path, minio_key)
    except Exception as exc:
        logger.warning("MinIO screenshot upload failed: %s", exc)
        logger.info("Failure screenshot kept locally at %s", local_path)
        return None

    logger.info("Failure screenshot uploaded: %s", minio_key)
    _discard_temp_dir(created_dir)
    return minio_key


def upload_failure_screenshot(
    page,
    order_number: str,
    service_slug: str,
    work_dir: Optional[str] = None,
) -> Optional[str]:
    """Take a full-page screenshot and upload to MinIO.

    Works with a page whose ``screenshot`` is a plain callable, and with one
    whose ``screenshot`` is a coroutine function as long as no event loop is
    currently running (the coroutine is driven on the loop returned by
    ``asyncio.get_event_loop()``). From inside a running loop use
    ``upload_failure_screenshot_async`` instead.

    Args:
        page: Object exposing ``screenshot(path=..., full_page=...)``.
        order_number: Order identifier; used in the MinIO key and temp dir name.
        service_slug: Service identifier; used in the screenshot file name.
        work_dir: Directory for the local file. A temp directory is created
            when omitted.

    Returns:
        The MinIO object key (path) or None if capture or upload failed.
        This function never raises.
    """
    created_dir = None
    try:
        is_async = inspect.iscoroutinefunction(getattr(page, "screenshot", None))
        if is_async and _event_loop_is_running():
            # run_until_complete() cannot be used from inside a running loop;
            # bail out before creating a coroutine that would never be awaited.
            logger.warning(
                "Screenshot capture failed: %s",
                "event loop is already running — "
                "use upload_failure_screenshot_async() from async code",
            )
            return None

        local_path, minio_key, created_dir = _prepare_screenshot_target(
            order_number, service_slug, work_dir
        )
        if is_async:
            loop = asyncio.get_event_loop()
            loop.run_until_complete(
                page.screenshot(path=local_path, full_page=True)
            )
        else:
            page.screenshot(path=local_path, full_page=True)
    except Exception as exc:
        logger.warning("Screenshot capture failed: %s", exc)
        _discard_temp_dir(created_dir)
        return None

    return _upload_screenshot(local_path, minio_key, created_dir)


async def upload_failure_screenshot_async(
    page,
    order_number: str,
    service_slug: str,
    work_dir: Optional[str] = None,
) -> Optional[str]:
    """Async version — take screenshot and upload to MinIO.

    Awaits ``page.screenshot(...)`` and then uploads the file. Parameters and
    return value match ``upload_failure_screenshot``; this coroutine never
    raises. The upload call itself is synchronous.
    """
    created_dir = None
    try:
        local_path, minio_key, created_dir = _prepare_screenshot_target(
            order_number, service_slug, work_dir
        )
        await page.screenshot(path=local_path, full_page=True)
    except Exception as exc:
        logger.warning("Async screenshot capture failed: %s", exc)
        _discard_temp_dir(created_dir)
        return None

    return _upload_screenshot(local_path, minio_key, created_dir)


def alert_playwright_failure(
    order_number: str,
    service_slug: str,
    service_name: str,
    entity_name: str,
    error: Exception,
    screenshot_key: Optional[str] = None,
    portal_url: str = "",
) -> None:
    """Send Telegram + email alert when a Playwright submission fails.

    This fires immediately so ops knows a client order is stuck.

    Args:
        order_number: Order identifier shown in the alert.
        service_slug: Service identifier shown next to the service name.
        service_name: Human-readable service name.
        entity_name: Client entity the order belongs to.
        error: The exception that caused the failure; its text is truncated
            to 300 characters.
        screenshot_key: MinIO key of the failure screenshot, if one exists.
        portal_url: URL of the portal where the submission failed.
    """
    # Some exceptions stringify to "" (e.g. a bare TimeoutError()); fall back
    # to repr() so the alert always names the failure.
    error_msg = (str(error) or repr(error))[:300]

    screenshot_info = ""
    if screenshot_key:
        minio_url = os.environ.get(
            "MINIO_CONSOLE_URL", "https://minio-console.performancewest.net"
        )
        screenshot_info = (
            f"\nScreenshot: {minio_url}/browser/{MINIO_BUCKET}/{screenshot_key}"
        )

    alert_body = (
        f"🔴 PLAYWRIGHT FAILURE\n\n"
        f"Service: {service_name} ({service_slug})\n"
        f"Order: {order_number}\n"
        f"Entity: {entity_name}\n"
        f"Portal: {portal_url}\n\n"
        f"Error: {error_msg}\n"
        f"{screenshot_info}\n\n"
        f"The client's order is stuck. Either:\n"
        f"1. Fix the selector and re-dispatch the order\n"
        f"2. File manually at the portal URL above"
    )

    # Telegram alert
    _send_telegram_alert(alert_body)

    # Email alert
    _send_email_alert(
        subject=f"[PLAYWRIGHT FAIL] {service_name} — {entity_name} ({order_number})",
        body=alert_body,
    )


def _send_telegram_alert(message: str) -> None:
    """Send alert to Telegram bot.

    Reads ``TELEGRAM_BOT_TOKEN`` and ``TELEGRAM_CHAT_ID`` from the environment
    and skips silently (debug log) when either is missing. Best-effort: any
    error is logged as a warning and swallowed.
    """
    try:
        bot_token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
        chat_id = os.environ.get("TELEGRAM_CHAT_ID", "")
        if not bot_token or not chat_id:
            logger.debug("Telegram not configured — skipping alert")
            return

        # Truncate for Telegram's 4096 char limit
        msg = message[:4000]
        data = urllib.parse.urlencode({
            "chat_id": chat_id,
            "text": msg,
            "parse_mode": "",
        }).encode()

        req = urllib.request.Request(
            f"https://api.telegram.org/bot{bot_token}/sendMessage",
            data=data,
            method="POST",
        )
        # The context manager closes the HTTP response instead of leaking it.
        with urllib.request.urlopen(req, timeout=10):
            pass
        logger.info("Telegram playwright failure alert sent")
    except Exception as exc:
        logger.warning("Telegram alert failed: %s", exc)


def _send_email_alert(subject: str, body: str) -> None:
    """Send failure alert email to admin.

    SMTP settings come from ``SMTP_HOST``, ``SMTP_PORT``, ``SMTP_USER``,
    ``SMTP_PASS`` and ``SMTP_FROM``. The email is skipped when no user or
    password is configured. Best-effort: any error is logged as a warning and
    swallowed.
    """
    try:
        smtp_host = os.environ.get("SMTP_HOST", "co.carrierone.com")
        smtp_port = int(os.environ.get("SMTP_PORT", "587"))
        smtp_user = os.environ.get("SMTP_USER", "")
        smtp_pass = os.environ.get("SMTP_PASS", "")

        if not smtp_user or not smtp_pass:
            logger.debug("SMTP not configured — skipping email alert")
            return

        msg = MIMEText(body)
        # NOTE(review): this default carries a display name but no address
        # part — confirm the intended sender address and set SMTP_FROM.
        msg["From"] = os.environ.get("SMTP_FROM", "Performance West ")
        msg["To"] = ADMIN_EMAIL
        msg["Subject"] = subject

        with smtplib.SMTP(smtp_host, smtp_port, timeout=15) as s:
            s.starttls()
            s.login(smtp_user, smtp_pass)
            s.send_message(msg)

        logger.info("Playwright failure email sent to %s", ADMIN_EMAIL)
    except Exception as exc:
        logger.warning("Failure email send failed: %s", exc)


# ═══════════════════════════════════════════════════════════════════════════
# Proactive Selector Health Check
# ═══════════════════════════════════════════════════════════════════════════

# Portal definitions — each portal has a URL, expected selectors, and a
# login state file. The health check navigates to the portal and verifies
# all critical selectors exist without submitting anything.
PORTAL_CHECKS = [
    {
        "name": "FCC RMD Portal",
        "url": "https://fccprod.servicenowservices.com/rmd",
        "storage_state": "/app/data/rmd_session.json",
        "selectors": [
            'text="File Certification"',
            'input[name="frn"]',
        ],
        "service_slugs": ["rmd-filing"],
    },
    {
        "name": "USAC E-File",
        "url": "https://forms.universalservice.org",
        "storage_state": "/app/data/usac_session.json",
        "selectors": [
            'text="Form 499-A"',
            'text="Form 499-Q"',
        ],
        "service_slugs": ["fcc-499a", "fcc-499a-zero", "fcc-499a-499q", "fcc-499q"],
    },
    {
        "name": "FCC CPNI (ECFS)",
        "url": "https://www.fcc.gov/ecfs/search/search-filings",
        "storage_state": None,
        "selectors": [
            'input[id*="search"], input[name*="search"]',
        ],
        "service_slugs": ["cpni-certification"],
    },
]


async def _find_missing_selectors(browser, portal: dict) -> list[str]:
    """Open ``portal`` in a fresh context and return selectors with no match.

    A selector whose lookup raises is treated as missing. The browser context
    is always closed, including when navigation fails.
    """
    context_kwargs = {}
    state_file = portal.get("storage_state")
    if state_file:
        context_kwargs["storage_state"] = state_file

    context = await browser.new_context(**context_kwargs)
    try:
        page = await context.new_page()
        await page.goto(portal["url"], timeout=30000)

        missing = []
        for selector in portal["selectors"]:
            try:
                if await page.locator(selector).count() == 0:
                    missing.append(selector)
            except Exception:
                missing.append(selector)
        return missing
    finally:
        # Close errors must not mask the check result (or the real error).
        try:
            await context.close()
        except Exception as exc:
            logger.debug(
                "Health check: closing context for %s failed: %s",
                portal["name"], exc,
            )


async def run_selector_health_check(dry_run: bool = False) -> list[dict]:
    """Proactively check all portal selectors are still valid.

    Called by a daily/weekly cron. Portals whose session file does not exist
    are skipped. Returns an empty list when Playwright is not installed.

    Args:
        dry_run: When True, missing selectors are logged and returned but no
            Telegram/email alert is sent.

    Returns:
        A list of failed checks. A portal with missing selectors yields a dict
        with ``portal``, ``url``, ``missing_selectors`` and ``affects`` (and
        triggers a Telegram + email alert unless ``dry_run``). A portal whose
        check raised yields a dict with ``portal``, ``url``, ``error`` and
        ``affects``; those are logged but not alerted.
    """
    failures = []

    try:
        from playwright.async_api import async_playwright
    except ImportError:
        logger.warning("Playwright not available — skipping health check")
        return []

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            for portal in PORTAL_CHECKS:
                name = portal["name"]
                url = portal["url"]
                state_file = portal.get("storage_state")

                # Skip if no session file exists (can't access authenticated portals)
                if state_file and not os.path.exists(state_file):
                    logger.info("Health check: %s — no session file, skipping", name)
                    continue

                try:
                    missing = await _find_missing_selectors(browser, portal)
                except Exception as exc:
                    logger.warning("Health check error for %s: %s", name, exc)
                    failures.append({
                        "portal": name,
                        "url": url,
                        "error": str(exc),
                        "affects": portal["service_slugs"],
                    })
                    continue

                if not missing:
                    logger.info("Health check OK: %s — all selectors present", name)
                    continue

                failures.append({
                    "portal": name,
                    "url": url,
                    "missing_selectors": missing,
                    "affects": portal["service_slugs"],
                })
                logger.warning(
                    "Health check FAILED: %s — missing selectors: %s",
                    name, missing,
                )

                if not dry_run:
                    alert_body = (
                        f"⚠️ PORTAL UI CHANGE DETECTED\n\n"
                        f"Portal: {name}\n"
                        f"URL: {url}\n"
                        f"Missing selectors:\n"
                        + "\n".join(f" • {s}" for s in missing)
                        + f"\n\nAffected services: {', '.join(portal['service_slugs'])}\n\n"
                        f"Playwright automation will FAIL for these services until "
                        f"selectors are updated. Check the portal for UI changes."
                    )
                    _send_telegram_alert(alert_body)
                    _send_email_alert(
                        subject=f"[SELECTOR ALERT] {name} — UI change detected",
                        body=alert_body,
                    )
        finally:
            await browser.close()

    return failures


def main():
    """CLI entrypoint for selector health check cron.

    Accepts ``--dry-run`` to check without alerting, and prints a one-line
    summary plus one line per failed portal.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )

    parser = argparse.ArgumentParser(description="Playwright selector health check")
    parser.add_argument("--dry-run", action="store_true", help="Check but don't alert")
    args = parser.parse_args()

    failures = asyncio.run(run_selector_health_check(dry_run=args.dry_run))

    if failures:
        print(f"FAILED: {len(failures)} portal(s) have selector issues")
        for f in failures:
            print(f" {f['portal']}: {f.get('missing_selectors', f.get('error', '?'))}")
    else:
        print("ALL OK: All portal selectors are valid")


if __name__ == "__main__":
    main()