Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
306 lines
11 KiB
Python
306 lines
11 KiB
Python
"""
|
|
AI auto-correction harness using OpenCode.
|
|
|
|
On Playwright failure, captures screenshot + DOM + error,
|
|
invokes OpenCode to diagnose and fix, then retries the step.
|
|
|
|
Usage:
|
|
@ai_retry(max_retries=3, step_name="fill_director_name")
|
|
async def fill_director_name(page, data):
|
|
await page.fill("#director_name", data["director_name"])
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import base64
|
|
import functools
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Optional
|
|
|
|
# Logger shared by every helper and decorator in this module.
LOG = logging.getLogger("tests.ai_retry")

# Screenshots are written next to this module; the directory is created
# (if missing) as a side effect of importing the module.
SCREENSHOT_DIR = Path(__file__).parent.joinpath("screenshots")
SCREENSHOT_DIR.mkdir(exist_ok=True)

# JSON file in which discovered selector corrections are stored.
CORRECTIONS_FILE = Path(__file__).parent.joinpath("discovered_selectors.json")

# OpenCode executable; the OPENCODE_BIN environment variable overrides it.
OPENCODE_BIN = os.getenv("OPENCODE_BIN", "opencode")

# Accumulated corrections across the test run
_corrections: dict[str, str] = dict()
|
|
|
|
|
|
def _load_corrections() -> dict:
    """Refresh the in-memory corrections cache from CORRECTIONS_FILE.

    If the file does not exist the cache is left untouched. If the file
    cannot be read, is not valid JSON, or does not contain a JSON object,
    the problem is logged and the cache is reset to an empty dict, so that
    callers can always use the result as a mapping.

    Returns:
        The module-level corrections mapping (step name -> selector).
    """
    global _corrections
    if not CORRECTIONS_FILE.exists():
        return _corrections

    try:
        # json.dumps writes ASCII by default, so UTF-8 decoding is always safe.
        loaded = json.loads(CORRECTIONS_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError) as exc:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        LOG.warning("Ignoring unreadable corrections file %s: %s", CORRECTIONS_FILE, exc)
        loaded = {}

    if not isinstance(loaded, dict):
        # Valid JSON such as `null` or `[]` would break the .get()/item
        # assignment performed by the retry wrapper.
        LOG.warning(
            "Ignoring corrections file %s: expected a JSON object, got %s",
            CORRECTIONS_FILE,
            type(loaded).__name__,
        )
        loaded = {}

    _corrections = loaded
    return _corrections
|
|
|
|
|
|
def _save_corrections():
    """Write the in-memory corrections mapping to CORRECTIONS_FILE as indented JSON."""
    serialized = json.dumps(_corrections, indent=2)
    CORRECTIONS_FILE.write_text(serialized)
|
|
|
|
|
|
def _take_screenshot(page, step_name: str, suffix: str = "") -> Path:
    """Capture a full-page screenshot and return the path.

    The file name is ``<step_name>_<YYYYmmdd_HHMMSS><suffix>.png`` inside
    SCREENSHOT_DIR. A failed capture is only logged; the target path is
    returned either way, so the file is not guaranteed to exist.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    target = SCREENSHOT_DIR / f"{step_name}_{stamp}{suffix}.png"
    try:
        page.screenshot(path=str(target), full_page=True, timeout=10000)
    except Exception as err:
        LOG.warning("Screenshot failed for %s: %s", step_name, err)
    return target
|
|
|
|
|
|
def _capture_dom(page, max_bytes: int = 50_000) -> str:
    """Get the page HTML, truncated to at most ``max_bytes`` UTF-8 bytes.

    Args:
        page: Object exposing a ``content()`` method that returns the HTML.
        max_bytes: Upper bound on the UTF-8 encoded size of the result.
            Zero or a negative value yields an empty string.

    Returns:
        The (possibly truncated) HTML, or ``""`` when it cannot be obtained.
        A multi-byte character that would straddle the limit is dropped
        rather than returned half-encoded.
    """
    limit = max(0, max_bytes)
    try:
        html = page.content()
        # Every character is at least one byte, so the first `limit`
        # characters already contain everything that can fit.
        head = html[:limit]
        if head.isascii():
            return head
        encoded = head.encode("utf-8", errors="replace")[:limit]
        return encoded.decode("utf-8", errors="ignore")
    except Exception:
        # Deliberately best-effort: the DOM is optional diagnostic context
        # and must never turn into a second failure while handling a
        # failing step.
        return ""
|
|
|
|
|
|
def _ask_opencode(
    prompt: str,
    screenshot_path: Optional[Path] = None,
    timeout: float = 120,
) -> str:
    """
    Invoke OpenCode CLI to analyze a failure and return a corrected selector.

    The CLI is run as ``OPENCODE_BIN --print <prompt>`` from the directory
    three levels above this file. When ``screenshot_path`` exists on disk,
    its location is appended to the prompt.

    Args:
        prompt: Text describing the failure.
        screenshot_path: Optional screenshot to mention in the prompt.
        timeout: Seconds to wait for the CLI before giving up.

    Returns:
        The raw, stripped stdout of OpenCode. An empty string is returned
        when the process times out or cannot be started; this function never
        raises for those cases because it is called while another exception
        is already being handled.
    """
    # Build the prompt with screenshot reference if available
    full_prompt = prompt
    if screenshot_path and screenshot_path.exists():
        full_prompt += f"\n\nScreenshot saved at: {screenshot_path}"

    project_root = Path(__file__).parent.parent.parent
    try:
        result = subprocess.run(
            [OPENCODE_BIN, "--print", full_prompt],
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=str(project_root),
        )
    except subprocess.TimeoutExpired:
        LOG.error("OpenCode timed out after %gs", timeout)
        return ""
    except FileNotFoundError:
        LOG.error("OpenCode binary not found at %s", OPENCODE_BIN)
        return ""
    except (OSError, ValueError) as exc:
        # e.g. PermissionError, "Argument list too long" for a huge prompt,
        # or "embedded null byte" when the captured DOM contains NUL.
        LOG.error("OpenCode could not be started: %s", exc)
        return ""

    if result.returncode != 0:
        LOG.warning("OpenCode returned %d: %s", result.returncode, result.stderr[:200])
    return result.stdout.strip()
|
|
|
|
|
|
def _extract_selector_from_response(response: str) -> Optional[str]:
    """
    Parse OpenCode's response for a CSS/XPath selector.

    Candidates are examined in order of appearance:

    1. Code spans: inline `` `...` `` spans and the individual lines of
       fenced blocks (with or without a language tag).
    2. Double-quoted strings that start with ``#``, ``.`` or ``[``.

    A code-span candidate is accepted when it starts with ``#``, ``.``,
    ``[``, ``/`` or ``xpath=``, or when it starts with a known HTML tag name
    that is followed by a non-identifier character (so ``a[href]`` matches
    but ``await page.fill`` and ``aria-label`` do not).

    Returns:
        The first accepted selector, or ``None`` when nothing usable is found
        (including for an empty response).
    """
    import re

    if not response:
        return None

    symbol_prefixes = ('#', '.', '[', '/', 'xpath=')
    known_tags = frozenset((
        'a', 'article', 'aside', 'button', 'div', 'fieldset', 'form',
        'iframe', 'img', 'input', 'label', 'legend', 'li', 'option',
        'section', 'select', 'span', 'table', 'tbody', 'td', 'textarea',
        'th', 'tr', 'ul',
    ))
    # A whole leading identifier: it must not continue into more word
    # characters or a hyphen, otherwise "a" would match "await".
    leading_tag = re.compile(r'[A-Za-z][A-Za-z0-9]*(?![\w-])')

    def looks_like_selector(text: str) -> bool:
        if text.startswith(symbol_prefixes):
            return True
        tag = leading_tag.match(text)
        return tag is not None and tag.group(0).lower() in known_tags

    # Fenced blocks are tried first at any position so that the optional
    # language tag (```css) is not mistaken for part of the selector.
    code_span = re.compile(r'```[\w-]*[ \t]*\r?\n(.*?)```|`([^`]+)`', re.DOTALL)
    for span in code_span.finditer(response):
        fenced, inline = span.group(1), span.group(2)
        candidates = fenced.splitlines() if fenced is not None else [inline]
        for candidate in candidates:
            candidate = candidate.strip()
            if looks_like_selector(candidate):
                return candidate

    # Look for quoted selectors in text. The pattern already guarantees the
    # leading '#', '.' or '[', so only the minimum length needs checking.
    for quoted in re.findall(r'"([#.\[][^"]+)"', response):
        if len(quoted) > 2:
            return quoted

    return None
|
|
|
|
|
|
class StepResult:
    """Record of one test step: outcome, attempt count, artefacts and timing."""

    def __init__(self, step_name: str):
        self.step_name = step_name

        # Outcome, filled in by finish().
        self.success = False
        self.error: Optional[str] = None

        # Bookkeeping updated by the decorators while the step runs.
        self.attempts = 0
        self.ai_corrections: list[dict] = []
        self.screenshots: list[str] = []

        # Wall-clock timestamps taken with time.time().
        self.started_at = time.time()
        self.finished_at: Optional[float] = None

    def finish(self, success: bool, error: str = ""):
        """Mark the step as finished; the error text is kept only on failure."""
        self.finished_at = time.time()
        self.success = success
        self.error = "" if success else error

    @property
    def duration(self) -> float:
        """Seconds since the step started, up to finish() or now, rounded to 2 places."""
        stop = self.finished_at if self.finished_at else time.time()
        return round(stop - self.started_at, 2)

    def to_dict(self) -> dict:
        """Return a plain-dict summary of the step (screenshots as strings)."""
        summary = dict(
            step=self.step_name,
            success=self.success,
            attempts=self.attempts,
            ai_corrections=self.ai_corrections,
            screenshots=list(map(str, self.screenshots)),
            error=self.error,
            duration_s=self.duration,
        )
        return summary
|
|
|
|
|
|
# Global test run results
_run_results: list[StepResult] = list()


def get_run_results() -> list[StepResult]:
    """Return the module-level list of step results (the live list, not a copy)."""
    return _run_results
|
|
|
|
|
|
def _build_correction_prompt(
    name: str,
    func_name: str,
    err_msg: str,
    screenshot: Optional[Path],
    dom: str,
    use_opus: bool,
) -> str:
    """Assemble the diagnosis prompt sent to OpenCode for a failed step."""
    model_hint = "Use Opus for this analysis." if use_opus else ""
    if screenshot is not None:
        screenshot_line = f"The page screenshot is at {screenshot}\n\n"
    else:
        screenshot_line = "No page screenshot is available.\n\n"
    return (
        f"{model_hint}\n"
        f"A Playwright E2E test step '{name}' failed.\n"
        f"Error: {err_msg}\n"
        f"Function: {func_name}\n"
        f"{screenshot_line}"
        f"Page HTML (first 30KB):\n```html\n{dom[:30000]}\n```\n\n"
        f"Analyze the page and return the correct CSS selector "
        f"for this step. Return ONLY the selector string in backticks."
    )


def ai_retry(max_retries: int = 3, step_name: str = "", use_opus: bool = False):
    """
    Decorator that wraps a Playwright step with AI-powered auto-correction.

    The step is called synchronously; its page is taken from the first
    positional argument, or from the ``page`` keyword.

    On failure:
      1. Takes screenshot + captures DOM
      2. Asks OpenCode to diagnose the issue
      3. If OpenCode returns a corrected selector, retries with it
      4. Logs all corrections for future runs

    The correction is handed to the step as the ``_corrected_selector``
    keyword argument, and only on retries (never on the first attempt). It is
    injected only when the step's signature can accept it; otherwise the step
    is simply re-run unchanged.

    Args:
        max_retries: Max AI-assisted retry attempts (negative values are
            treated as 0, i.e. the step runs exactly once)
        step_name: Human-readable step name for logging/screenshots
        use_opus: If True, instructs OpenCode to use Opus model for analysis

    Raises:
        Whatever the step raised on its final attempt.
    """
    import inspect  # function-scope import: only needed while decorating

    # A negative budget would otherwise skip the step and return None.
    retries = max(0, max_retries)

    def decorator(func: Callable):
        # Decide once whether the step can receive the injected keyword.
        # Passing it to a step that does not declare it would make every
        # retry fail with TypeError instead of re-running the step.
        try:
            parameters = list(inspect.signature(func).parameters.values())
        except (TypeError, ValueError):
            # Signature not introspectable: keep injecting, as before.
            accepts_correction = True
        else:
            keyword_kinds = (
                inspect.Parameter.POSITIONAL_OR_KEYWORD,
                inspect.Parameter.KEYWORD_ONLY,
            )
            accepts_correction = any(
                p.kind is inspect.Parameter.VAR_KEYWORD
                or (p.name == "_corrected_selector" and p.kind in keyword_kinds)
                for p in parameters
            )

        if inspect.iscoroutinefunction(func):
            LOG.warning(
                "[%s] is a coroutine function; ai_retry calls steps synchronously, "
                "so errors raised when the coroutine is awaited are not retried",
                step_name or func.__name__,
            )

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            name = step_name or func.__name__
            result = StepResult(name)
            _run_results.append(result)

            # Check if we have a prior correction for this step
            _load_corrections()
            prior_fix = _corrections.get(name)

            page = args[0] if args else kwargs.get("page")
            can_capture = bool(page) and hasattr(page, 'screenshot')

            for attempt in range(retries + 1):
                result.attempts = attempt + 1

                # On retry with correction, inject it via kwargs
                if attempt > 0 and prior_fix and accepts_correction:
                    kwargs["_corrected_selector"] = prior_fix

                try:
                    ret = func(*args, **kwargs)
                except Exception as exc:
                    err_msg = str(exc)
                    LOG.warning("[%s] FAIL attempt %d: %s", name, attempt + 1, err_msg[:200])

                    # Take failure screenshot. `ss` stays None when there is
                    # no page, so the code below never reads an unbound name.
                    ss: Optional[Path] = None
                    if can_capture:
                        ss = _take_screenshot(page, name, f"_fail_{attempt}")
                        result.screenshots.append(ss)

                    if attempt >= retries:
                        result.finish(False, err_msg)
                        LOG.error("[%s] FAILED after %d attempts", name, attempt + 1)
                        raise

                    # Ask OpenCode for help
                    prompt = _build_correction_prompt(
                        name, func.__name__, err_msg, ss, _capture_dom(page), use_opus
                    )
                    LOG.info("[%s] Asking OpenCode for correction...", name)
                    response = _ask_opencode(prompt, ss)
                    corrected = _extract_selector_from_response(response)

                    if not corrected:
                        LOG.warning("[%s] OpenCode did not return a usable selector", name)
                        continue

                    LOG.info("[%s] AI correction: %s", name, corrected)
                    prior_fix = corrected
                    _corrections[name] = corrected
                    try:
                        _save_corrections()
                    except OSError as save_err:
                        # Persisting is a convenience for later runs; it must
                        # not abort the retry of the current step.
                        LOG.warning("[%s] Could not persist correction: %s", name, save_err)
                    result.ai_corrections.append({
                        "attempt": attempt + 1,
                        "original_error": err_msg[:200],
                        "corrected_selector": corrected,
                    })
                    if not accepts_correction:
                        LOG.warning(
                            "[%s] step does not accept _corrected_selector; "
                            "retrying without injecting the correction",
                            name,
                        )
                else:
                    # Success — take a success screenshot. This runs outside
                    # the try so a bookkeeping problem cannot re-run a step
                    # that already passed.
                    if can_capture:
                        result.screenshots.append(_take_screenshot(page, name, "_pass"))

                    result.finish(True)
                    LOG.info("[%s] PASS (attempt %d)", name, attempt + 1)
                    return ret

        return wrapper
    return decorator
|
|
|
|
|
|
def step(step_name: str, use_opus: bool = False):
    """
    Lightweight decorator for steps that do not need AI retry.

    Runs the step once, records a StepResult in the global run results,
    takes a ``_pass`` or ``_fail`` screenshot when a page is available, and
    re-raises any exception from the step. ``use_opus`` is accepted but not
    used by this decorator.
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            label = step_name or func.__name__
            outcome = StepResult(label)
            outcome.attempts = 1
            _run_results.append(outcome)

            def snap(suffix):
                # The page is the first positional argument, else the "page" keyword.
                target = args[0] if args else kwargs.get("page")
                if target and hasattr(target, 'screenshot'):
                    outcome.screenshots.append(_take_screenshot(target, label, suffix))

            try:
                value = func(*args, **kwargs)
                snap("_pass")
                outcome.finish(True)
                LOG.info("[%s] PASS", label)
                return value
            except Exception as exc:
                snap("_fail")
                outcome.finish(False, str(exc))
                LOG.error("[%s] FAIL: %s", label, str(exc)[:200])
                raise

        return wrapper
    return decorator
|