""" AI auto-correction harness using OpenCode. On Playwright failure, captures screenshot + DOM + error, invokes OpenCode to diagnose and fix, then retries the step. Usage: @ai_retry(max_retries=3, step_name="fill_director_name") async def fill_director_name(page, data): await page.fill("#director_name", data["director_name"]) """ from __future__ import annotations import asyncio import base64 import functools import json import logging import os import subprocess import time from datetime import datetime from pathlib import Path from typing import Any, Callable, Optional LOG = logging.getLogger("tests.ai_retry") SCREENSHOT_DIR = Path(__file__).parent / "screenshots" SCREENSHOT_DIR.mkdir(exist_ok=True) CORRECTIONS_FILE = Path(__file__).parent / "discovered_selectors.json" OPENCODE_BIN = os.environ.get("OPENCODE_BIN", "opencode") # Accumulated corrections across the test run _corrections: dict[str, str] = {} def _load_corrections() -> dict: global _corrections if CORRECTIONS_FILE.exists(): try: _corrections = json.loads(CORRECTIONS_FILE.read_text()) except Exception: _corrections = {} return _corrections def _save_corrections(): CORRECTIONS_FILE.write_text(json.dumps(_corrections, indent=2)) def _take_screenshot(page, step_name: str, suffix: str = "") -> Path: """Capture a full-page screenshot and return the path.""" ts = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{step_name}_{ts}{suffix}.png" path = SCREENSHOT_DIR / filename try: page.screenshot(path=str(path), full_page=True, timeout=10000) except Exception as e: LOG.warning("Screenshot failed for %s: %s", step_name, e) return path return path def _capture_dom(page, max_bytes: int = 50_000) -> str: """Get the page HTML, truncated.""" try: html = page.content() return html[:max_bytes] except Exception: return "" def _ask_opencode(prompt: str, screenshot_path: Optional[Path] = None) -> str: """ Invoke OpenCode CLI to analyze a failure and return a corrected selector. Returns the raw text output from OpenCode. """ # Build the prompt with screenshot reference if available full_prompt = prompt if screenshot_path and screenshot_path.exists(): full_prompt += f"\n\nScreenshot saved at: {screenshot_path}" try: result = subprocess.run( [OPENCODE_BIN, "--print", full_prompt], capture_output=True, text=True, timeout=120, cwd=str(Path(__file__).parent.parent.parent), # project root ) output = result.stdout.strip() if result.returncode != 0: LOG.warning("OpenCode returned %d: %s", result.returncode, result.stderr[:200]) return output except subprocess.TimeoutExpired: LOG.error("OpenCode timed out after 120s") return "" except FileNotFoundError: LOG.error("OpenCode binary not found at %s", OPENCODE_BIN) return "" def _extract_selector_from_response(response: str) -> Optional[str]: """ Parse OpenCode's response for a CSS/XPath selector. Looks for quoted strings or code blocks containing selectors. """ import re # Look for selectors in code blocks code_blocks = re.findall(r'`([^`]+)`', response) for block in code_blocks: block = block.strip() # CSS selector patterns if block.startswith(('#', '.', '[', 'input', 'button', 'select', 'textarea', 'a', 'div', 'span', 'table')): return block # XPath if block.startswith(('/', 'xpath=')): return block # Look for quoted selectors in text quoted = re.findall(r'"([#.\[][^"]+)"', response) for q in quoted: if len(q) > 2 and any(c in q for c in '#.['): return q return None class StepResult: """Result of a test step with metadata.""" def __init__(self, step_name: str): self.step_name = step_name self.success = False self.attempts = 0 self.ai_corrections: list[dict] = [] self.screenshots: list[str] = [] self.error: Optional[str] = None self.started_at = time.time() self.finished_at: Optional[float] = None def finish(self, success: bool, error: str = ""): self.success = success self.error = error if not success else "" self.finished_at = time.time() @property def duration(self) -> float: end = self.finished_at or time.time() return round(end - self.started_at, 2) def to_dict(self) -> dict: return { "step": self.step_name, "success": self.success, "attempts": self.attempts, "ai_corrections": self.ai_corrections, "screenshots": [str(s) for s in self.screenshots], "error": self.error, "duration_s": self.duration, } # Global test run results _run_results: list[StepResult] = [] def get_run_results() -> list[StepResult]: return _run_results def ai_retry(max_retries: int = 3, step_name: str = "", use_opus: bool = False): """ Decorator that wraps a Playwright step with AI-powered auto-correction. On failure: 1. Takes screenshot + captures DOM 2. Asks OpenCode to diagnose the issue 3. If OpenCode returns a corrected selector, retries with it 4. Logs all corrections for future runs Args: max_retries: Max AI-assisted retry attempts step_name: Human-readable step name for logging/screenshots use_opus: If True, instructs OpenCode to use Opus model for analysis """ def decorator(func: Callable): @functools.wraps(func) def wrapper(*args, **kwargs): name = step_name or func.__name__ result = StepResult(name) _run_results.append(result) # Check if we have a prior correction for this step _load_corrections() prior_fix = _corrections.get(name) for attempt in range(1 + max_retries): result.attempts = attempt + 1 try: # On retry with correction, inject it via kwargs if attempt > 0 and prior_fix: kwargs["_corrected_selector"] = prior_fix ret = func(*args, **kwargs) # Success — take a success screenshot page = args[0] if args else kwargs.get("page") if page and hasattr(page, 'screenshot'): ss = _take_screenshot(page, name, "_pass") result.screenshots.append(ss) result.finish(True) LOG.info("[%s] PASS (attempt %d)", name, attempt + 1) return ret except Exception as exc: err_msg = str(exc) LOG.warning("[%s] FAIL attempt %d: %s", name, attempt + 1, err_msg[:200]) # Take failure screenshot page = args[0] if args else kwargs.get("page") if page and hasattr(page, 'screenshot'): ss = _take_screenshot(page, name, f"_fail_{attempt}") result.screenshots.append(ss) if attempt < max_retries: # Ask OpenCode for help dom = _capture_dom(page) model_hint = "Use Opus for this analysis." if use_opus else "" prompt = ( f"{model_hint}\n" f"A Playwright E2E test step '{name}' failed.\n" f"Error: {err_msg}\n" f"Function: {func.__name__}\n" f"The page screenshot is at {ss}\n\n" f"Page HTML (first 30KB):\n```html\n{dom[:30000]}\n```\n\n" f"Analyze the page and return the correct CSS selector " f"for this step. Return ONLY the selector string in backticks." ) LOG.info("[%s] Asking OpenCode for correction...", name) response = _ask_opencode(prompt, ss) corrected = _extract_selector_from_response(response) if corrected: LOG.info("[%s] AI correction: %s", name, corrected) prior_fix = corrected _corrections[name] = corrected _save_corrections() result.ai_corrections.append({ "attempt": attempt + 1, "original_error": err_msg[:200], "corrected_selector": corrected, }) else: LOG.warning("[%s] OpenCode did not return a usable selector", name) if attempt == max_retries: result.finish(False, err_msg) LOG.error("[%s] FAILED after %d attempts", name, attempt + 1) raise return wrapper return decorator def step(step_name: str, use_opus: bool = False): """ Simpler decorator for steps that don't need AI retry — just captures screenshots and logs results. """ def decorator(func: Callable): @functools.wraps(func) def wrapper(*args, **kwargs): name = step_name or func.__name__ result = StepResult(name) _run_results.append(result) result.attempts = 1 try: ret = func(*args, **kwargs) page = args[0] if args else kwargs.get("page") if page and hasattr(page, 'screenshot'): ss = _take_screenshot(page, name, "_pass") result.screenshots.append(ss) result.finish(True) LOG.info("[%s] PASS", name) return ret except Exception as exc: page = args[0] if args else kwargs.get("page") if page and hasattr(page, 'screenshot'): ss = _take_screenshot(page, name, "_fail") result.screenshots.append(ss) result.finish(False, str(exc)) LOG.error("[%s] FAIL: %s", name, str(exc)[:200]) raise return wrapper return decorator