new-site/scripts/tests/e2e_crtc_order.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

778 lines
33 KiB
Python

"""
E2E integration test: CRTC order form → Stripe Checkout → verify PG + ERPNext.
Tests all 3 company types:
1. Numbered (fastest, no name search)
2. Numbered + Trade Name
3. Named (name reservation)
Also validates:
- Porkbun API connectivity + .ca domain availability
- Flowroute API connectivity + Canadian DID search
- ERPNext Sales Order created after payment
Usage:
# Full test (all 3 types):
python -m scripts.tests.e2e_crtc_order
# Single type, skip providers:
python -m scripts.tests.e2e_crtc_order --type numbered --skip-providers
# Keep browser visible:
python -m scripts.tests.e2e_crtc_order --type numbered --headed
Environment:
See .env.test for required variables.
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
# Load the test environment from the .env.test file sitting next to this
# module (when present) so the os.environ lookups further down can see it.
env_path = Path(__file__).with_name(".env.test")
if env_path.exists():
    load_dotenv(env_path)
from playwright.sync_api import sync_playwright, Page
from scripts.tests.test_data import make_test_order
from scripts.tests.ai_retry import (
ai_retry, step, get_run_results, _take_screenshot, SCREENSHOT_DIR,
)
# Root logging goes to stdout at INFO and above; LOG is this module's logger.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
)
LOG = logging.getLogger("tests.e2e_crtc")

# Site and API under test; both can be overridden through the environment.
SITE_URL = os.getenv("SITE_URL", "https://performancewest.net")
API_URL = os.getenv("API_URL", "https://api.performancewest.net")
# ──────────────────────────────────────────────────────────────
# Phase 1: Order form (Playwright)
# ──────────────────────────────────────────────────────────────
@ai_retry(max_retries=2, step_name="01_load_order_page")
def step_load_order_page(page: Page, data: dict, _corrected_selector: str = ""):
    """Open the CRTC order form in test mode and wait for the form element."""
    order_url = f"{SITE_URL}/order/canada-crtc?test_mode=1"
    page.goto(order_url, wait_until="networkidle", timeout=30000)
    # The page counts as loaded once the form container is in the DOM.
    page.wait_for_selector("#crtc-form", timeout=10000)
    LOG.info("Order form loaded at %s", page.url)
@ai_retry(max_retries=2, step_name="02_company_type")
def step_company_type(page: Page, data: dict, _corrected_selector: str = ""):
    """Form step 1: pick the company type and fill the fields it reveals.

    ``_corrected_selector``, when non-empty, replaces the default selector for
    the company-type label.
    """
    ctype = data["company_type"]

    # The radio inputs are sr-only, so the click targets the wrapping label.
    default_selector = f'label:has(input[name="company_type"][value="{ctype}"])'
    type_label = page.locator(_corrected_selector or default_selector).first

    # Scroll first so the sticky nav is out of the way, then force the click.
    type_label.scroll_into_view_if_needed(timeout=5000)
    page.wait_for_timeout(300)
    type_label.click(timeout=5000, force=True)
    page.wait_for_timeout(500)

    if ctype == "named":
        page.wait_for_selector("#named-fields", state="visible", timeout=3000)
        for field in ("name_choice_1", "name_choice_2", "name_choice_3"):
            page.fill(f'input[name="{field}"]', data.get(field, ""))
        page.select_option('select[name="legal_ending"]', data.get("legal_ending", "Ltd."))
    elif ctype == "numbered_tradename":
        page.wait_for_selector("#trade-name-fields", state="visible", timeout=3000)
        page.fill('input[name="trade_name"]', data["trade_name"])

    next_button = page.locator('button:has-text("Next")').last
    next_button.scroll_into_view_if_needed()
    next_button.click(timeout=5000, force=True)
    page.wait_for_timeout(500)
    LOG.info("Company type selected: %s", ctype)
@ai_retry(max_retries=2, step_name="03_director_info")
def step_director_info(page: Page, data: dict, _corrected_selector: str = ""):
    """Form step 2: fill the director's name, address, province and citizenship.

    Args:
        page: Playwright page currently showing the director step.
        data: Test order. Reads the ``director_*`` keys; ``director_middle_name``,
            ``director_street2`` and ``director_citizenship`` are optional.
        _corrected_selector: Not used by this step.

    Values written through in-page JavaScript are handed over as the
    ``page.evaluate`` argument instead of being spliced into the script text,
    so a name such as "O'Brien" cannot break the script.
    """
    # Playwright only passes the argument when the expression is a function;
    # the list is destructured into (element id, value).
    set_value_js = "([id, value]) => { document.getElementById(id).value = value; }"

    # Form uses split first/middle/last fields
    page.fill('input[name="director_first_name"]', data["director_first_name"])
    if data.get("director_middle_name"):
        page.fill('input[name="director_middle_name"]', data["director_middle_name"])
    page.fill('input[name="director_last_name"]', data["director_last_name"])
    page.select_option('select[name="director_country"]', data["director_country"])

    # Address fields stay hidden until a country has been selected.
    page.wait_for_selector('#director_address_fields:not(.hidden)', timeout=5000)
    page.wait_for_timeout(500)  # extra wait for province dropdown JS to populate
    page.fill('input[name="director_street"]', data["director_street"])
    if data.get("director_street2"):
        page.fill('input[name="director_street2"]', data["director_street2"])
    page.fill('input[name="director_city"]', data["director_city"])
    page.fill('input[name="director_postal"]', data["director_postal"])

    # Province is a custom select -> hidden field combo: either a visible
    # <select> (synced to the hidden #director_province via onchange) or a
    # free-text input; if neither is visible the hidden field is set directly.
    province = data["director_province"]
    prov_select = page.locator('#director_province_select')
    prov_text = page.locator('#director_province_text')
    if prov_select.is_visible():
        prov_select.select_option(province)
        # Trigger onchange to sync to hidden field
        prov_select.dispatch_event("change")
    else:
        if prov_text.is_visible():
            prov_text.fill(province)
        page.evaluate(set_value_js, ["director_province", province])
    LOG.info("Province set to: %s", province)

    # Hidden director_name field: first/middle/last joined by single spaces,
    # skipping an absent middle name so no double space is produced.
    name_parts = (
        data["director_first_name"],
        data.get("director_middle_name", ""),
        data["director_last_name"],
    )
    full_name = " ".join(part.strip() for part in name_parts if part and part.strip())
    page.evaluate(set_value_js, ["director_name", full_name])

    if data.get("director_citizenship"):
        cit = page.locator('select[name="director_citizenship"]')
        cit.scroll_into_view_if_needed()
        cit.select_option(data["director_citizenship"])

    next_button = page.locator('button:has-text("Next")').last
    next_button.scroll_into_view_if_needed()
    next_button.click(timeout=5000, force=True)
    page.wait_for_timeout(500)
    LOG.info("Director info filled")
@ai_retry(max_retries=2, step_name="04_telecom_details")
def step_telecom_details(page: Page, data: dict, _corrected_selector: str = ""):
    """Form step 3: describe the telecom service and the regulatory contact."""
    # The description textarea becoming visible signals that step 3 is shown.
    page.wait_for_selector('textarea[name="service_description"]:visible', timeout=5000)
    page.fill('textarea[name="service_description"]', data["service_description"])
    page.fill('input[name="geographic_coverage"]', data["geographic_coverage"])

    if data.get("include_bits"):
        bits_box = page.locator('#include_bits')
        box_present = bits_box.count() > 0
        if box_present and not bits_box.is_checked():
            bits_box.check()

    for field in ("reg_contact_name", "reg_contact_email", "reg_contact_phone"):
        page.fill(f'input[name="{field}"]', data[field])

    next_button = page.locator('button:has-text("Next")').last
    next_button.scroll_into_view_if_needed()
    next_button.click(timeout=5000, force=True)
    page.wait_for_timeout(500)
    LOG.info("Telecom details filled")
@ai_retry(max_retries=2, step_name="05_identity_verification")
def step_identity_verification(page: Page, data: dict, _corrected_selector: str = ""):
    """Form step 4: move past identity verification without completing it.

    The Stripe Identity flow is not exercised; the step only clicks the last
    matching Next/Continue/Skip button, if any, to reach step 5.
    NOTE(review): relies on the API accepting orders without an identity
    session in test mode — confirm against the API's test-mode bypass.
    """
    LOG.info("Identity step — skipping in test mode (API bypass active)")
    advance_buttons = page.locator(
        '#btn-next, button:has-text("Next"), button:has-text("Continue"), button:has-text("Skip")'
    )
    if advance_buttons.count() == 0:
        LOG.warning("No Next button found on identity step")
    else:
        target = advance_buttons.last
        target.scroll_into_view_if_needed()
        target.click(timeout=5000, force=True)
        page.wait_for_timeout(500)
    LOG.info("Advanced past identity step")
@ai_retry(max_retries=2, step_name="06_review_and_submit")
def step_review_and_submit(page: Page, data: dict, _corrected_selector: str = ""):
    """Form step 5 (Review & Submit): contact details, consent, then submit.

    On success the order number shown on the success panel is stored under
    both ``data["order_number"]`` and ``data["order_id"]``.

    Raises:
        AssertionError: if ``#submit-status`` is visible with text containing
            "error" or "wrong", or if the order number is empty or still "--".
    """
    page.wait_for_selector('#step-5:not(.hidden)', timeout=5000)
    page.wait_for_timeout(500)
    LOG.info("Review & Submit page loaded")

    # Contact fields live on step 5 rather than on the earlier steps.
    name_input = page.locator('#customer_name')
    name_input.scroll_into_view_if_needed(timeout=5000)
    name_input.fill(data["customer_name"])
    page.locator('#customer_email').fill(data["customer_email"])
    phone = data.get("customer_phone")
    if phone:
        page.locator('#customer_phone').fill(phone)

    # First pass: tick whichever consent/terms checkbox matches, if any.
    any_consent = page.locator(
        'input[type="checkbox"][name="consent"], input[type="checkbox"][id*="consent"], input[type="checkbox"][id*="terms"]'
    )
    if any_consent.count() > 0:
        first_box = any_consent.first
        if not first_box.is_checked():
            first_box.check()

    # Second pass: #consent specifically — required by validateStep(5).
    consent_box = page.locator('#consent')
    consent_box.wait_for(state="visible", timeout=5000)
    if not consent_box.is_checked():
        consent_box.scroll_into_view_if_needed()
        consent_box.check()
    page.wait_for_timeout(300)
    LOG.info("Consent checked: %s", consent_box.is_checked())

    # On step 5 the #btn-next button acts as "Submit Order".
    submit_button = page.locator('#btn-next')
    submit_button.scroll_into_view_if_needed(timeout=5000)
    LOG.info("Clicking Submit Order button")
    submit_button.click(timeout=10000)

    # Give the submission a moment, then look for an error reported in-page.
    page.wait_for_timeout(5000)
    status_box = page.locator('#submit-status')
    if status_box.is_visible():
        status_text = status_box.text_content()
        LOG.info("Submit status text: %s", status_text)
        lowered = (status_text or "").lower()
        if "error" in lowered or "wrong" in lowered:
            raise AssertionError(f"Order submission failed: {status_text}")

    # Debug aids: button state and a screenshot taken right after submitting.
    button_text = submit_button.text_content()
    button_disabled = submit_button.is_disabled()
    LOG.info("Button state: text='%s' disabled=%s", button_text, button_disabled)
    _take_screenshot(page, "06_after_submit", "")

    page.wait_for_selector("#step-success:not(.hidden)", timeout=30000)
    page.wait_for_timeout(500)

    # "--" is treated as the placeholder shown before a number is assigned.
    order_number = page.locator("#success-order-number").text_content(timeout=5000).strip()
    if order_number in ("", "--"):
        raise AssertionError("Order number not populated in step-success")
    data["order_number"] = order_number
    data["order_id"] = order_number
    LOG.info("Order submitted — order number: %s", order_number)
@ai_retry(max_retries=2, step_name="07_proceed_to_payment")
def step_proceed_to_payment(page: Page, data: dict, _corrected_selector: str = ""):
    """Click "Proceed to Payment" and wait for the redirect to Stripe Checkout.

    Args:
        page: Playwright page showing the order success panel.
        data: Test order; ``data["checkout_url"]`` is set to the checkout URL.
        _corrected_selector: Not used by this step.
    """
    # Select the card payment method when the choice is offered.
    card_radio = page.locator('input[name="payment_method_choice"][value="card"]')
    if card_radio.count() > 0:
        card_radio.check()

    pay_btn = page.locator('#btn-proceed-payment')
    pay_btn.scroll_into_view_if_needed()
    pay_btn.click(timeout=5000, force=True)

    # Wait for the redirect to Stripe's hosted checkout. A predicate is used
    # because in Playwright URL globs "*" does not match "/", so the pattern
    # "*checkout.stripe.com*" can never match "https://checkout.stripe.com/...".
    page.wait_for_url(lambda url: "checkout.stripe.com" in url, timeout=30000)
    data["checkout_url"] = page.url
    LOG.info("Redirected to Stripe Checkout: %s", page.url[:80])
# ──────────────────────────────────────────────────────────────
# Phase 2: Stripe Checkout (hosted page at checkout.stripe.com)
# ──────────────────────────────────────────────────────────────
@ai_retry(max_retries=3, step_name="07_stripe_checkout_fill", use_opus=True)
def step_enter_payment(page: Page, data: dict, _corrected_selector: str = ""):
    """Fill the Stripe Checkout page with the test card and submit the payment.

    Every field is located on the top-level page; no frame locator is used.
    Email, cardholder name and postal code are filled only when a matching
    input exists, while card number, expiry and CVC are always filled.
    """

    def fill_if_present(selector: str, key: str) -> bool:
        """Fill the first match of *selector* with data[key]; report if one existed."""
        field = page.locator(selector)
        if field.count() > 0:
            field.first.fill(data[key])
            return True
        return False

    # Let the checkout page finish rendering before touching any field.
    page.wait_for_load_state("networkidle", timeout=20000)
    LOG.info("Stripe Checkout page loaded")

    email_css = 'input[type="email"], input[name="email"], input[placeholder*="email" i]'
    if fill_if_present(email_css, "customer_email"):
        LOG.info("Filled email: %s", data["customer_email"])

    # Card number: try the broad selector list first.
    card_css = ", ".join([
        'input[name="cardNumber"]',
        'input[data-elements-stable-field-name="cardNumber"]',
        'input[autocomplete="cc-number"]',
        'input[placeholder*="1234" i]',
    ])
    card_number = page.locator(card_css)
    if card_number.count() == 0:
        # Nothing yet: wait longer for a card input (or the card tab) to
        # appear, then fall back to the narrower selector pair.
        page.wait_for_selector(
            'input[name="cardNumber"], input[autocomplete="cc-number"], [data-testid="card-tab"]',
            timeout=15000,
        )
        card_number = page.locator('input[name="cardNumber"], input[autocomplete="cc-number"]')
    card_number.first.fill(data["card_number"])
    LOG.info("Filled card number")

    # Expiry is typed as MM/YY built from the two data fields.
    expiry_css = ", ".join([
        'input[name="cardExpiry"]',
        'input[autocomplete="cc-exp"]',
        'input[placeholder*="MM" i]',
    ])
    page.locator(expiry_css).first.fill(f'{data["card_exp_month"]}/{data["card_exp_year"]}')

    cvc_css = ", ".join([
        'input[name="cardCvc"]',
        'input[autocomplete="cc-csc"]',
        'input[placeholder*="CVC" i]',
        'input[placeholder*="CVV" i]',
    ])
    page.locator(cvc_css).first.fill(data["card_cvv"])

    # Name on card and ZIP/postal code are not always shown.
    fill_if_present('input[name="billingName"], input[autocomplete="cc-name"]', "customer_name")
    zip_css = ", ".join([
        'input[name="postalCode"]',
        'input[autocomplete="postal-code"]',
        'input[placeholder*="ZIP" i]',
    ])
    fill_if_present(zip_css, "card_zip")
    LOG.info("All payment fields filled — submitting")

    submit_css = ", ".join([
        'button:has-text("Pay")',
        'button:has-text("Subscribe")',
        'button[type="submit"]',
        '[data-testid="hosted-payment-submit-button"]',
    ])
    page.locator(submit_css).first.click(timeout=10000)
    LOG.info("Payment submitted")
@step(step_name="08_payment_success")
def step_payment_success(page: Page, data: dict):
    """Wait for Stripe to redirect back to the site's success page.

    Args:
        page: Playwright page that has just submitted the Stripe payment.
        data: Test order. ``data["session_id"]`` is set from the URL query;
            ``data["order_id"]`` / ``data["order_number"]`` are updated only
            when the URL carries an ``order_id``, so an order number captured
            by an earlier step is not replaced with ``None``.
    """
    import urllib.parse

    # Stripe redirects to our success_url after payment
    # success_url = {DOMAIN}/order/success?session_id={CHECKOUT_SESSION_ID}&order_id=...
    page.wait_for_url(f"{SITE_URL}/order/success*", timeout=60000)
    page.wait_for_load_state("networkidle", timeout=15000)
    LOG.info("Success page loaded: %s", page.url)

    # Extract session_id and order_id from the URL query string.
    params = urllib.parse.parse_qs(urllib.parse.urlparse(page.url).query)
    data["session_id"] = params.get("session_id", [None])[0]
    url_order_id = params.get("order_id", [None])[0]
    if url_order_id:
        data["order_id"] = url_order_id
        data["order_number"] = url_order_id
    else:
        LOG.warning("Success URL has no order_id — keeping previously captured value: %s",
                    data.get("order_id"))
    LOG.info("Session ID: %s | Order ID: %s", data["session_id"], data.get("order_id"))

    # The success page JS polls /api/v1/checkout/session/:session_id
    # Wait a few seconds for it to process and show the confirmation
    page.wait_for_timeout(5000)

    # Look for any confirmation marker. This must be a CSS selector list using
    # the :text() pseudo-class: a "text=a, text=b" string is handed whole to
    # the text engine as one literal phrase and would never match. :text() is
    # case-insensitive, so "confirmed" also covers "Confirmed".
    confirmed = page.locator(
        ':text("confirmed"), '
        ':text("Thank you"), '
        ':text("Order received"), '
        ".success, "
        "[data-status='confirmed']"
    )
    if confirmed.count() > 0:
        LOG.info("Success page shows order confirmation")
    else:
        LOG.warning("Success page loaded but no explicit confirmation text found — may still be processing")
    _take_screenshot(page, "08_payment_success", "")
# ──────────────────────────────────────────────────────────────
# Phase 3+4: Provider API tests
# ──────────────────────────────────────────────────────────────
@step(step_name="09_porkbun_api")
def step_porkbun_api(page: Page, data: dict):
    """Ping the Porkbun API, then check availability of ``data["test_domain"]``.

    The availability flag is stored in ``data["domain_available"]``.
    """
    from scripts.tests.providers.porkbun_client import PorkbunClient

    porkbun = PorkbunClient()
    assert porkbun.ping(), "Porkbun API ping failed"

    domain = data["test_domain"]
    lookup = porkbun.check_availability(domain)
    LOG.info(
        "Porkbun: %s available=%s price=%s",
        domain,
        lookup["available"],
        lookup.get("price"),
    )
    data["domain_available"] = lookup["available"]
@step(step_name="10_flowroute_api")
def step_flowroute_api(page: Page, data: dict):
    """Ping the Flowroute API, then search for DIDs starting with "1604".

    When any DIDs come back, their numbers are stored in
    ``data["available_dids"]``.
    """
    from scripts.tests.providers.flowroute_client import FlowrouteClient

    flowroute = FlowrouteClient()
    assert flowroute.ping(), "Flowroute API ping failed"

    found = flowroute.search_available_dids(starts_with="1604", limit=3)
    LOG.info("Flowroute: %d Vancouver DIDs available", len(found))
    for entry in found[:3]:
        LOG.info("  %s (%s) $%s/mo", entry["did"], entry["rate_center"], entry["monthly_cost"])
    if found:
        data["available_dids"] = [entry["did"] for entry in found]
# ──────────────────────────────────────────────────────────────
# Phase 5: Verification (PG + ERPNext)
# ──────────────────────────────────────────────────────────────
@step(step_name="11_verify_pg_order")
def step_verify_pg_order(page: Page, data: dict):
    """Verify the order row in PostgreSQL has the expected fields and is paid.

    Args:
        page: Unused; present for the common step signature.
        data: Test order. Reads ``order_id``, ``company_type``,
            ``customer_email`` and (for ``numbered_tradename``) ``trade_name``.
            Writes ``order_number``, ``order_id``, ``erpnext_sales_order`` and
            ``stripe_session_id`` from the row that was found.

    Raises:
        AssertionError: if no row is found or a field check fails.
    """
    import psycopg2
    import psycopg2.extras

    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    # Close the connection even when the query itself raises.
    try:
        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        # Look up by order_id from URL, or fall back to most recent test email
        if data.get("order_id"):
            cur.execute(
                "SELECT * FROM canada_crtc_orders WHERE order_number = %s",
                (data["order_id"],)
            )
        else:
            # No parameters are passed, so the "%" here is a plain LIKE wildcard.
            cur.execute(
                "SELECT * FROM canada_crtc_orders "
                "WHERE customer_email LIKE 'testcarrier+%@performancewest.net' "
                "ORDER BY created_at DESC LIMIT 1"
            )
        order = cur.fetchone()
    finally:
        conn.close()

    if not order:
        raise AssertionError(f"Order {data.get('order_id')} not found in canada_crtc_orders")

    data["order_number"] = order["order_number"]
    data["order_id"] = order["order_number"]
    LOG.info("PG order: %s | company_type=%s | total=$%.2f | payment=%s | status=%s",
             order["order_number"], order["company_type"],
             (order["total_cents"] or 0) / 100,
             order["payment_status"], order["status"])

    # ── Field assertions ──────────────────────────────────────
    assert order["company_type"] == data["company_type"], \
        f"company_type: got {order['company_type']}, want {data['company_type']}"
    assert order["payment_status"] == "paid", \
        f"payment_status is '{order['payment_status']}' — webhook may not have fired yet"
    assert order["stripe_session_id"], \
        "stripe_session_id is empty — checkout session was not recorded"
    assert order["customer_email"] == data["customer_email"], \
        f"customer_email mismatch: {order['customer_email']}"
    assert order["total_cents"] and order["total_cents"] > 0, \
        "total_cents is 0 — order pricing was not calculated"
    if data["company_type"] == "numbered_tradename":
        assert order.get("trade_name") == data["trade_name"], \
            f"trade_name: got {order.get('trade_name')}, want {data['trade_name']}"
    LOG.info("PG order verification PASSED")

    # Store for ERPNext check
    data["erpnext_sales_order"] = order.get("erpnext_sales_order")
    data["stripe_session_id"] = order.get("stripe_session_id")
@step(step_name="12_verify_erpnext")
def step_verify_erpnext(page: Page, data: dict):
    """Verify that an ERPNext Sales Order exists for the order and has a total.

    The check is skipped (with a warning) when the API credentials are not
    set, when there is no order id, or when ERPNext cannot be reached.

    Args:
        page: Unused; present for the common step signature.
        data: Test order. Reads ``order_id`` / ``order_number`` and
            ``erpnext_sales_order``; writes ``erpnext_sales_order``.

    Raises:
        AssertionError: if no Sales Order shows up after polling, or the one
            found has no name or a zero ``grand_total``.
    """
    import urllib.error
    import urllib.parse
    import urllib.request

    erpnext_url = os.environ.get("ERPNEXT_URL", "http://erpnext:8000")
    erpnext_key = os.environ.get("ERPNEXT_API_KEY", "")
    erpnext_secret = os.environ.get("ERPNEXT_API_SECRET", "")
    if not erpnext_key or not erpnext_secret:
        LOG.warning("ERPNEXT_API_KEY/SECRET not set — skipping ERPNext verification")
        return

    order_id = data.get("order_id") or data.get("order_number")
    if not order_id:
        LOG.warning("No order_id to look up in ERPNext")
        return

    headers = {
        "Authorization": f"token {erpnext_key}:{erpnext_secret}",
        "Content-Type": "application/json",
    }
    # "Sales Order" contains a space, so the path must be percent-encoded:
    # http.client rejects raw spaces with InvalidURL, which is not a URLError
    # and would therefore escape the handlers below.
    resource_url = f"{erpnext_url}/api/resource/{urllib.parse.quote('Sales Order')}"

    # Poll ERPNext for Sales Order by custom_external_order_id
    # Give it up to 30s for the webhook to have processed
    so_name = data.get("erpnext_sales_order")
    if not so_name:
        LOG.info("Sales Order not in PG yet — polling ERPNext directly...")
        # JSON-encode the filter/field lists and percent-encode the query so
        # quotes, brackets and the order id are always transmitted safely.
        query = urllib.parse.urlencode(
            {
                "filters": json.dumps(
                    [["Sales Order", "custom_external_order_id", "=", order_id]]
                ),
                "fields": json.dumps(
                    ["name", "workflow_state", "status", "grand_total", "customer"]
                ),
                "limit": 1,
            },
            quote_via=urllib.parse.quote,
        )
        req = urllib.request.Request(f"{resource_url}?{query}", headers=headers)
        try:
            for attempt in range(6):  # 30s total
                with urllib.request.urlopen(req, timeout=10) as resp:
                    result = json.loads(resp.read())
                records = result.get("data", [])
                if records:
                    so_name = records[0]["name"]
                    break
                if attempt < 5:
                    LOG.info("  ERPNext: no Sales Order yet — waiting 5s (attempt %d/6)", attempt + 1)
                    time.sleep(5)
        except urllib.error.URLError as e:
            LOG.warning("ERPNext unreachable from test runner (%s) — skipping", e)
            return

    if not so_name:
        raise AssertionError(
            f"No ERPNext Sales Order found for order {order_id} after 30s — "
            "checkout webhook may have failed"
        )

    # Fetch the Sales Order details (document name percent-encoded as well).
    detail_url = f"{resource_url}/{urllib.parse.quote(str(so_name), safe='')}"
    req = urllib.request.Request(detail_url, headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            so = json.loads(resp.read()).get("data", {})
    except urllib.error.URLError as e:
        LOG.warning("Could not fetch Sales Order details: %s", e)
        return

    LOG.info(
        "ERPNext Sales Order: %s | state=%s | status=%s | total=%.2f | customer=%s",
        so.get("name"), so.get("workflow_state"), so.get("status"),
        float(so.get("grand_total") or 0),
        so.get("customer"),
    )
    assert so.get("name"), "Sales Order has no name"
    assert so.get("grand_total") and float(so["grand_total"]) > 0, \
        "Sales Order grand_total is 0"

    # The workflow state is informational only: listed states are logged as
    # acceptable, anything else produces a warning but does not fail the step.
    wf_state = so.get("workflow_state", "")
    if wf_state in ("Received", "Awaiting Funds", "In Progress"):
        LOG.info("ERPNext workflow state '%s' — acceptable", wf_state)
    else:
        LOG.warning("Unexpected ERPNext workflow state: %s", wf_state)

    data["erpnext_sales_order"] = so.get("name")
    LOG.info("ERPNext verification PASSED — Sales Order %s", so.get("name"))
# ──────────────────────────────────────────────────────────────
# Test report
# ──────────────────────────────────────────────────────────────
def generate_report(company_type: str):
    """Collect the run's results into a timestamped report directory.

    Existing screenshots are moved into ``runs/<company_type>_<timestamp>``
    next to this module, the step results are written to ``results.json``,
    and a summary is logged. Returns the report directory path.
    """
    results = get_run_results()
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_dir = Path(__file__).parent / "runs" / f"{company_type}_{stamp}"
    report_dir.mkdir(parents=True, exist_ok=True)

    # Move every screenshot that still exists on disk into the report folder.
    for result in results:
        for shot in map(Path, result.screenshots):
            if shot.exists():
                shot.rename(report_dir / shot.name)

    serialised = [result.to_dict() for result in results]
    (report_dir / "results.json").write_text(json.dumps(serialised, indent=2))

    passed = sum(bool(result.success) for result in results)
    failed = sum(not result.success for result in results)
    ai_fixes = sum(len(result.ai_corrections) for result in results)
    total_time = sum(result.duration for result in results)

    banner = "=" * 60
    LOG.info("")
    LOG.info(banner)
    LOG.info("TEST REPORT: %s", company_type)
    LOG.info(banner)
    LOG.info("  Passed: %d / %d  |  AI corrections: %d  |  Time: %.1fs",
             passed, passed + failed, ai_fixes, total_time)
    LOG.info("  Report: %s", report_dir)
    LOG.info(banner)

    # One line per step, plus a truncated error line for failures.
    for result in results:
        verdict = "PASS" if result.success else "FAIL"
        plural = "" if result.attempts == 1 else "s"
        fix_note = f", {len(result.ai_corrections)} AI fix" if result.ai_corrections else ""
        LOG.info("  [%s] %s (%.1fs, %d attempt%s%s)",
                 verdict, result.step_name, result.duration, result.attempts,
                 plural, fix_note)
        if not result.success and result.error:
            LOG.info("         └─ %s", result.error[:120])
    return report_dir
# ──────────────────────────────────────────────────────────────
# Cleanup
# ──────────────────────────────────────────────────────────────
def cleanup(data: dict):
    """Remove test data from PostgreSQL (does not touch ERPNext).

    Deletes every order and customer whose email matches the test pattern
    ``testcarrier+%@performancewest.net``. The director and address tables
    are cleaned on a best-effort basis because they may not exist yet.

    Args:
        data: Test order; not read, kept for the existing call signature.
    """
    import psycopg2

    pat = "testcarrier+%@performancewest.net"
    optional_tables = ("customer_directors", "customer_addresses")

    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    # Close the connection even when a statement fails.
    try:
        cur = conn.cursor()
        cur.execute("DELETE FROM canada_crtc_orders WHERE customer_email LIKE %s", (pat,))
        deleted = cur.rowcount

        # A failed statement aborts the whole PostgreSQL transaction, so each
        # optional delete runs inside a savepoint that can be rolled back
        # without losing the order delete or blocking the statements after it.
        for table in optional_tables:
            cur.execute("SAVEPOINT optional_cleanup")
            try:
                # Table names come from the fixed tuple above, never from input.
                cur.execute(
                    f"DELETE FROM {table} WHERE customer_id IN "
                    "(SELECT id FROM customers WHERE email LIKE %s)",
                    (pat,),
                )
            except psycopg2.Error as exc:
                cur.execute("ROLLBACK TO SAVEPOINT optional_cleanup")
                LOG.info("Cleanup: skipped %s (%s)", table, str(exc).strip())
            else:
                cur.execute("RELEASE SAVEPOINT optional_cleanup")

        cur.execute("DELETE FROM customers WHERE email LIKE %s", (pat,))
        conn.commit()
    finally:
        conn.close()
    LOG.info("Cleanup: deleted %d test order(s)", deleted)
# ──────────────────────────────────────────────────────────────
# Main orchestrator
# ──────────────────────────────────────────────────────────────
def run_test(
    company_type: str,
    skip_providers: bool = False,
    headless: bool = True,
    do_cleanup: bool = True,
):
    """Run the full E2E flow for one company type and write its report.

    Args:
        company_type: "numbered", "numbered_tradename" or "named".
        skip_providers: Skip the Porkbun/Flowroute API steps.
        headless: Run Chromium without a visible window.
        do_cleanup: Delete the test rows from PostgreSQL afterwards; pass
            ``False`` to leave them in place for inspection.

    Returns:
        The report directory produced by ``generate_report``.

    A failing step aborts the remaining steps but is not re-raised: the
    failure is logged, a screenshot is attempted, and the report is still
    generated.
    """
    LOG.info("Starting E2E test: company_type=%s headless=%s", company_type, headless)
    data = make_test_order()
    data["company_type"] = company_type
    if company_type == "numbered_tradename":
        data["trade_name"] = "Pacific Telecom"
        data["add_trade_name"] = True
    elif company_type == "named":
        data["name_choice_1"] = "Pacific Telecom Solutions Ltd."
        data["name_choice_2"] = "Western Digital Communications Inc."
        data["name_choice_3"] = "Cascade Voice Networks Corp."
        data["legal_ending"] = "Ltd."

    def _is_tracked(url: str) -> bool:
        """Return True for URLs whose traffic is worth logging."""
        return "api.performancewest" in url or "canada-crtc" in url

    def _on_console(msg):
        # Only browser errors and warnings are forwarded to the log.
        if msg.type in ("error", "warning"):
            LOG.warning("[browser] %s: %s", msg.type, msg.text)

    def _on_dialog(dialog):
        # Auto-dismiss alert dialogs so they cannot block the flow.
        LOG.warning("[dialog] %s: %s", dialog.type, dialog.message)
        dialog.dismiss()

    def _on_request(request):
        if _is_tracked(request.url):
            LOG.info("[request] %s %s", request.method, request.url)

    def _on_response(response):
        if _is_tracked(response.url):
            LOG.info("[response] %s %s → %d",
                     response.request.method, response.url, response.status)

    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=headless,
            args=["--disable-blink-features=AutomationControlled"],
        )
        context = browser.new_context(
            viewport={"width": 1440, "height": 900},
            user_agent=(
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
            ),
        )
        page = context.new_page()
        page.on("console", _on_console)
        page.on("dialog", _on_dialog)
        page.on("request", _on_request)
        page.on("response", _on_response)

        try:
            # Phase 1 — Order form
            step_load_order_page(page, data)
            step_company_type(page, data)
            step_director_info(page, data)
            step_telecom_details(page, data)
            step_identity_verification(page, data)
            step_review_and_submit(page, data)
            # Phase 2 — Stripe Checkout hosted page
            step_proceed_to_payment(page, data)
            step_enter_payment(page, data)
            step_payment_success(page, data)
            # Phase 3 — Provider APIs
            if not skip_providers:
                step_porkbun_api(page, data)
                step_flowroute_api(page, data)
            # Phase 4 — Verification
            step_verify_pg_order(page, data)
            step_verify_erpnext(page, data)
        except Exception as exc:
            LOG.error("Test aborted: %s", exc)
            # A failing screenshot (e.g. page already closed) must not stop
            # the report and cleanup below from running.
            try:
                _take_screenshot(page, "ABORT", "_fatal")
            except Exception as shot_exc:
                LOG.warning("Could not capture abort screenshot: %s", shot_exc)
        finally:
            browser.close()

    report_dir = generate_report(company_type)
    if do_cleanup:
        try:
            cleanup(data)
        except Exception as ce:
            LOG.warning("Cleanup failed: %s", ce)
    else:
        LOG.info("Cleanup skipped — test order left in DB for inspection")
    return report_dir


def main():
    """Parse the command line, run the selected company types, set exit status.

    Exits with status 1 when any recorded step result is unsuccessful, so the
    script can be used as a gate in automation.
    """
    parser = argparse.ArgumentParser(description="E2E CRTC order test")
    parser.add_argument("--type", choices=["numbered", "numbered_tradename", "named", "all"],
                        default="numbered", help="Company type to test (default: numbered)")
    parser.add_argument("--skip-providers", action="store_true",
                        help="Skip Porkbun/Flowroute API tests")
    parser.add_argument("--headed", action="store_true",
                        help="Run browser in headed (visible) mode")
    parser.add_argument("--no-cleanup", action="store_true",
                        help="Skip cleanup — leave test order in DB for inspection")
    args = parser.parse_args()

    types = ["numbered", "numbered_tradename", "named"] if args.type == "all" else [args.type]
    any_failed = False
    for ct in types:
        LOG.info("\n" + "=" * 60)
        LOG.info("RUNNING: %s", ct)
        LOG.info("=" * 60)
        run_test(
            ct,
            skip_providers=args.skip_providers,
            headless=not args.headed,
            do_cleanup=not args.no_cleanup,
        )
        # NOTE(review): assumes get_run_results() still returns this run's
        # step results here, as generate_report reads them the same way.
        if any(not r.success for r in get_run_results()):
            any_failed = True

    if any_failed:
        sys.exit(1)


if __name__ == "__main__":
    main()