new-site/scripts/tests/ai_retry.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

306 lines
11 KiB
Python

"""
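AI auto-correction harness using OpenCode.

On Playwright failure, captures a screenshot, the DOM, and the error,
invokes OpenCode to diagnose and fix the problem, then retries the step.

Usage (steps must be synchronous: the wrapper calls the step directly and
never awaits it, so a failure inside an ``async def`` step is not retried).
On a retry the wrapper passes any known correction as the
``_corrected_selector`` keyword argument:

    @ai_retry(max_retries=3, step_name="fill_director_name")
    def fill_director_name(page, data, _corrected_selector=None):
        selector = _corrected_selector or "#director_name"
        page.fill(selector, data["director_name"])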
"""
from __future__ import annotations
import asyncio
import base64
import functools
import json
import logging
import os
import subprocess
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Optional
# Module-level logger used by every helper and wrapper in this file.
LOG = logging.getLogger("tests.ai_retry")
# Directory next to this file where step screenshots are written.
# NOTE: it is created as an import-time side effect.
SCREENSHOT_DIR = Path(__file__).parent / "screenshots"
SCREENSHOT_DIR.mkdir(exist_ok=True)
# JSON file next to this file in which discovered selectors are persisted
# (written by _save_corrections, read back by _load_corrections).
CORRECTIONS_FILE = Path(__file__).parent / "discovered_selectors.json"
# OpenCode executable; override with the OPENCODE_BIN environment variable.
OPENCODE_BIN = os.environ.get("OPENCODE_BIN", "opencode")
# Accumulated corrections across the test run (step name -> corrected selector)
_corrections: dict[str, str] = {}
def _load_corrections() -> dict:
    """Reload persisted selector corrections into the module-level cache.

    When ``CORRECTIONS_FILE`` exists, its contents replace ``_corrections``.
    If the file cannot be read, is not valid JSON, or does not hold a JSON
    object, the cache is reset to an empty dict and a warning is logged.
    When the file does not exist the in-memory cache is left untouched.

    Returns:
        The (possibly replaced) module-level ``_corrections`` dict.
    """
    global _corrections
    if not CORRECTIONS_FILE.exists():
        return _corrections

    try:
        loaded = json.loads(CORRECTIONS_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        LOG.warning("Ignoring unreadable corrections file %s: %s", CORRECTIONS_FILE, exc)
        loaded = {}

    # Callers use ``_corrections.get(...)``; anything but a JSON object
    # (e.g. a list or a bare string) would break them later.
    if not isinstance(loaded, dict):
        LOG.warning(
            "Ignoring corrections file %s: expected a JSON object, got %s",
            CORRECTIONS_FILE,
            type(loaded).__name__,
        )
        loaded = {}

    _corrections = loaded
    return _corrections
def _save_corrections() -> None:
    """Persist the in-memory ``_corrections`` dict to ``CORRECTIONS_FILE``.

    The JSON is written to a sibling temporary file and then moved over the
    target, so an interrupted run cannot leave a truncated corrections file.

    Saving is best-effort: this function is called while a step failure is
    being handled, so an ``OSError`` here is logged instead of raised to
    avoid masking the step's own exception.
    """
    payload = json.dumps(_corrections, indent=2)
    tmp_path = CORRECTIONS_FILE.with_name(CORRECTIONS_FILE.name + ".tmp")
    try:
        tmp_path.write_text(payload, encoding="utf-8")
        tmp_path.replace(CORRECTIONS_FILE)
    except OSError as exc:
        LOG.warning("Could not save corrections to %s: %s", CORRECTIONS_FILE, exc)
def _take_screenshot(page, step_name: str, suffix: str = "") -> Path:
    """Capture a full-page screenshot and return the path it was written to.

    Args:
        page: Object exposing a synchronous
            ``screenshot(path=..., full_page=..., timeout=...)`` method.
        step_name: Step label used as the file-name prefix. Any run of
            characters other than word characters, ``.`` and ``-`` is
            replaced by ``_`` so that labels containing spaces, ``/`` or
            ``:`` still give a single valid file inside ``SCREENSHOT_DIR``.
        suffix: Text appended after the timestamp (e.g. ``"_pass"``).

    Returns:
        The target path. The path is returned even when the capture failed
        (a warning is logged), so callers must not assume the file exists.
    """
    import re

    safe_name = re.sub(r"[^\w.-]+", "_", step_name)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    path = SCREENSHOT_DIR / f"{safe_name}_{ts}{suffix}.png"
    try:
        page.screenshot(path=str(path), full_page=True, timeout=10000)
    except Exception as e:
        # Best-effort: a failed capture must never fail the step itself.
        LOG.warning("Screenshot failed for %s: %s", step_name, e)
    return path
def _capture_dom(page, max_bytes: int = 50_000) -> str:
    """Return the page's HTML cut to at most ``max_bytes`` characters.

    Any exception raised while reading or slicing the content results in
    an empty string.
    """
    snapshot = ""
    try:
        snapshot = page.content()[:max_bytes]
    except Exception:
        snapshot = ""
    return snapshot
def _ask_opencode(
    prompt: str,
    screenshot_path: Optional[Path] = None,
    timeout: float = 120,
) -> str:
    """Invoke the OpenCode CLI on a failure description and return its output.

    Runs ``OPENCODE_BIN --print <prompt>`` from the directory three levels
    above this file.

    Args:
        prompt: Failure description passed to OpenCode.
        screenshot_path: If given and the file exists, its location is
            appended to the prompt.
        timeout: Seconds to wait for OpenCode before giving up.

    Returns:
        OpenCode's stripped stdout. An empty string is returned when the
        binary is missing or cannot be started, when the prompt cannot be
        passed as an argument, or when the call times out. A non-zero exit
        status is logged but the captured stdout is still returned.
    """
    # Build the prompt with screenshot reference if available
    full_prompt = prompt
    if screenshot_path and screenshot_path.exists():
        full_prompt += f"\n\nScreenshot saved at: {screenshot_path}"

    project_root = Path(__file__).parent.parent.parent
    try:
        result = subprocess.run(
            [OPENCODE_BIN, "--print", full_prompt],
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=str(project_root),
        )
    except subprocess.TimeoutExpired:
        LOG.error("OpenCode timed out after %ss", timeout)
        return ""
    except FileNotFoundError:
        LOG.error("OpenCode binary not found at %s", OPENCODE_BIN)
        return ""
    except (OSError, ValueError) as exc:
        # e.g. PermissionError, argument list too long, or a NUL byte in the
        # prompt; none of these should abort the retry loop that called us.
        LOG.error("OpenCode could not be started (%s): %s", OPENCODE_BIN, exc)
        return ""

    if result.returncode != 0:
        LOG.warning("OpenCode returned %d: %s", result.returncode, (result.stderr or "")[:200])
    return (result.stdout or "").strip()
def _extract_selector_from_response(response: str) -> Optional[str]:
    """Pull a CSS/XPath selector out of OpenCode's free-text response.

    Candidates are examined in this order and the first match wins:

    1. Each line of every fenced code block (three backticks, optional
       language tag such as ``css``).
    2. Every inline single-backtick span.
    3. Double-quoted strings that start with ``#``, ``.`` or ``[`` and are
       longer than two characters.

    A backticked candidate counts as a selector when it starts with ``#``,
    ``.``, ``[``, ``/`` or ``xpath=``, or with one of the tag names
    input/button/select/textarea/a/div/span/table used as a whole token
    (so ``a[href]`` matches but ``await page.fill`` does not).

    Returns:
        The selector string, or ``None`` when nothing usable was found.
    """
    import re

    if not response:
        return None

    # The tag name must end at a selector delimiter, not continue as a word.
    tag_prefix = re.compile(
        r"(?:input|button|select|textarea|a|div|span|table)(?![\w-])"
    )
    fence = re.compile(r"```[^\n`]*\n(.*?)```", re.DOTALL)

    def looks_like_selector(text: str) -> bool:
        if text.startswith(("#", ".", "[", "/", "xpath=")):
            return True
        return tag_prefix.match(text) is not None

    candidates: list = []
    for body in fence.findall(response):
        candidates.extend(line.strip() for line in body.splitlines())
    # Drop the fenced blocks before scanning for inline spans so the fence
    # markers (and a language tag) are not misread as an inline snippet.
    remainder = fence.sub(" ", response)
    candidates.extend(span.strip() for span in re.findall(r"`([^`]+)`", remainder))

    for candidate in candidates:
        if candidate and looks_like_selector(candidate):
            return candidate

    # Look for quoted selectors in text
    for quoted in re.findall(r'"([#.\[][^"]+)"', response):
        if len(quoted) > 2:
            return quoted
    return None
class StepResult:
    """Outcome record for a single test step.

    Tracks the step name, pass/fail state, number of attempts, any AI
    corrections and screenshots collected, the last error text, and
    wall-clock timing.
    """

    def __init__(self, step_name: str):
        self.step_name = step_name
        # Timing: the clock starts when the record is created.
        self.started_at = time.time()
        self.finished_at: Optional[float] = None
        # Outcome fields; filled in as the step runs.
        self.attempts = 0
        self.success = False
        self.error: Optional[str] = None
        self.screenshots: list[str] = []
        self.ai_corrections: list[dict] = []

    def finish(self, success: bool, error: str = ""):
        """Stamp the end time and record the outcome.

        The error text is stored only for a failed step; a successful
        step always ends with an empty error string.
        """
        self.finished_at = time.time()
        self.success = success
        self.error = "" if success else error

    @property
    def duration(self) -> float:
        """Seconds elapsed, rounded to 2 places; measured to now if unfinished."""
        stop = self.finished_at if self.finished_at else time.time()
        return round(stop - self.started_at, 2)

    def to_dict(self) -> dict:
        """Return a plain-dict summary; screenshot entries are converted with ``str``."""
        summary: dict = {}
        summary["step"] = self.step_name
        summary["success"] = self.success
        summary["attempts"] = self.attempts
        summary["ai_corrections"] = self.ai_corrections
        summary["screenshots"] = list(map(str, self.screenshots))
        summary["error"] = self.error
        summary["duration_s"] = self.duration
        return summary
# Global test run results: the ai_retry and step wrappers append one
# StepResult per wrapped call; read it through get_run_results().
_run_results: list[StepResult] = []
def get_run_results() -> list[StepResult]:
    """Return the module-level list of step results.

    The list object itself is returned, not a copy.
    """
    return _run_results
def _accepts_corrected_selector(func: Callable) -> bool:
    """Return True when *func* can be called with a ``_corrected_selector`` keyword."""
    import inspect

    try:
        params = inspect.signature(func).parameters.values()
    except (TypeError, ValueError):
        # No introspectable signature: keep the historical behaviour and inject.
        return True
    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    return any(
        p.kind is inspect.Parameter.VAR_KEYWORD
        or (p.name == "_corrected_selector" and p.kind in keyword_kinds)
        for p in params
    )


def ai_retry(max_retries: int = 3, step_name: str = "", use_opus: bool = False):
    """
    Decorator that wraps a Playwright step with AI-powered auto-correction.

    The step is called synchronously. The page is taken from the first
    positional argument, or from the ``page`` keyword when there are no
    positional arguments.

    On failure:
    1. Takes a screenshot (when the page supports it) and captures the DOM
    2. Asks OpenCode to diagnose the issue
    3. If OpenCode returns a corrected selector, stores it and retries
    4. Saves every correction to the corrections file for future runs

    On a retry (never on the first attempt) a known correction is passed to
    the step as the ``_corrected_selector`` keyword argument, but only when
    the step's signature can accept it; otherwise the step is simply re-run.

    Args:
        max_retries: Max AI-assisted retry attempts (negative values are
            treated as 0, so the step always runs at least once)
        step_name: Human-readable step name for logging/screenshots;
            defaults to the function's name
        use_opus: If True, instructs OpenCode to use Opus model for analysis

    Raises:
        Whatever the step raised on its final attempt.
    """
    # A negative budget would make the attempt loop empty and skip the step.
    retry_budget = max(0, max_retries)

    def decorator(func: Callable):
        import inspect

        func_name = getattr(func, "__name__", type(func).__name__)
        name = step_name or func_name
        can_inject = _accepts_corrected_selector(func)
        if inspect.iscoroutinefunction(func):
            # Calling a coroutine function only creates a coroutine object; any
            # Playwright error surfaces later, outside this wrapper.
            LOG.warning(
                "[%s] is a coroutine function; ai_retry calls it without awaiting, "
                "so failures raised inside it will not be retried",
                name,
            )

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = StepResult(name)
            _run_results.append(result)

            # Check if we have a prior correction for this step
            _load_corrections()
            prior_fix = _corrections.get(name)

            page = args[0] if args else kwargs.get("page")
            can_snapshot = bool(page) and hasattr(page, "screenshot")

            for attempt in range(1 + retry_budget):
                result.attempts = attempt + 1
                try:
                    # On retry with correction, inject it via kwargs
                    if attempt > 0 and prior_fix and can_inject:
                        kwargs["_corrected_selector"] = prior_fix
                    ret = func(*args, **kwargs)
                except Exception as exc:
                    err_msg = str(exc)
                    LOG.warning("[%s] FAIL attempt %d: %s", name, attempt + 1, err_msg[:200])

                    # Take failure screenshot; stays None when there is no usable page
                    ss = None
                    if can_snapshot:
                        ss = _take_screenshot(page, name, f"_fail_{attempt}")
                        result.screenshots.append(ss)

                    if attempt == retry_budget:
                        result.finish(False, err_msg)
                        LOG.error("[%s] FAILED after %d attempts", name, attempt + 1)
                        raise

                    # Ask OpenCode for help
                    dom = _capture_dom(page)
                    model_hint = "Use Opus for this analysis." if use_opus else ""
                    shot_ref = ss if ss is not None else "(not captured)"
                    prompt = (
                        f"{model_hint}\n"
                        f"A Playwright E2E test step '{name}' failed.\n"
                        f"Error: {err_msg}\n"
                        f"Function: {func_name}\n"
                        f"The page screenshot is at {shot_ref}\n\n"
                        f"Page HTML (first 30KB):\n```html\n{dom[:30000]}\n```\n\n"
                        f"Analyze the page and return the correct CSS selector "
                        f"for this step. Return ONLY the selector string in backticks."
                    )
                    LOG.info("[%s] Asking OpenCode for correction...", name)
                    response = _ask_opencode(prompt, ss)
                    corrected = _extract_selector_from_response(response)
                    if corrected:
                        LOG.info("[%s] AI correction: %s", name, corrected)
                        prior_fix = corrected
                        _corrections[name] = corrected
                        _save_corrections()
                        result.ai_corrections.append({
                            "attempt": attempt + 1,
                            "original_error": err_msg[:200],
                            "corrected_selector": corrected,
                        })
                        if not can_inject:
                            LOG.warning(
                                "[%s] step does not accept _corrected_selector; "
                                "retrying without injecting the correction",
                                name,
                            )
                    else:
                        LOG.warning("[%s] OpenCode did not return a usable selector", name)
                else:
                    # Success — take a success screenshot
                    if can_snapshot:
                        result.screenshots.append(_take_screenshot(page, name, "_pass"))
                    result.finish(True)
                    LOG.info("[%s] PASS (attempt %d)", name, attempt + 1)
                    return ret

        return wrapper

    return decorator
def step(step_name: str, use_opus: bool = False):
    """
    Lightweight decorator for steps that do not need AI retry.

    Runs the step once, records a StepResult in the global run results,
    takes a "_pass" or "_fail" screenshot when the page (first positional
    argument, else the ``page`` keyword) has a ``screenshot`` attribute,
    logs the outcome, and re-raises any exception from the step.
    ``use_opus`` is accepted but not used by this decorator.
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            label = step_name or func.__name__
            outcome = StepResult(label)
            _run_results.append(outcome)
            outcome.attempts = 1

            def snapshot(tag: str) -> None:
                # Same page lookup for the pass and the fail path.
                target = args[0] if args else kwargs.get("page")
                if target and hasattr(target, 'screenshot'):
                    shot = _take_screenshot(target, label, tag)
                    outcome.screenshots.append(shot)

            try:
                value = func(*args, **kwargs)
                snapshot("_pass")
                outcome.finish(True)
                LOG.info("[%s] PASS", label)
                return value
            except Exception as exc:
                snapshot("_fail")
                message = str(exc)
                outcome.finish(False, message)
                LOG.error("[%s] FAIL: %s", label, message[:200])
                raise

        return wrapper

    return decorator