Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
778 lines
33 KiB
Python
778 lines
33 KiB
Python
"""
|
|
E2E integration test: CRTC order form → Stripe Checkout → verify PG + ERPNext.
|
|
|
|
Tests all 3 company types:
|
|
1. Numbered (fastest, no name search)
|
|
2. Numbered + Trade Name
|
|
3. Named (name reservation)
|
|
|
|
Also validates:
|
|
- Porkbun API connectivity + .ca domain availability
|
|
- Flowroute API connectivity + Canadian DID search
|
|
- ERPNext Sales Order created after payment
|
|
|
|
Usage:
|
|
# Full test (all 3 types):
|
|
python -m scripts.tests.e2e_crtc_order
|
|
|
|
# Single type, skip providers:
|
|
python -m scripts.tests.e2e_crtc_order --type numbered --skip-providers
|
|
|
|
# Keep browser visible:
|
|
python -m scripts.tests.e2e_crtc_order --type numbered --headed
|
|
|
|
Environment:
|
|
See .env.test for required variables.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
# Load test env
|
|
env_path = Path(__file__).parent / ".env.test"
|
|
if env_path.exists():
|
|
load_dotenv(env_path)
|
|
|
|
from playwright.sync_api import sync_playwright, Page
|
|
from scripts.tests.test_data import make_test_order
|
|
from scripts.tests.ai_retry import (
|
|
ai_retry, step, get_run_results, _take_screenshot, SCREENSHOT_DIR,
|
|
)
|
|
|
|
LOG = logging.getLogger("tests.e2e_crtc")
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
|
|
stream=sys.stdout,
|
|
)
|
|
|
|
SITE_URL = os.environ.get("SITE_URL", "https://performancewest.net")
|
|
API_URL = os.environ.get("API_URL", "https://api.performancewest.net")
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# Phase 1: Order form (Playwright)
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
@ai_retry(max_retries=2, step_name="01_load_order_page")
|
|
def step_load_order_page(page: Page, data: dict, _corrected_selector: str = ""):
|
|
"""Navigate to CRTC order form and verify it loads."""
|
|
page.goto(f"{SITE_URL}/order/canada-crtc?test_mode=1", wait_until="networkidle", timeout=30000)
|
|
page.wait_for_selector("#crtc-form", timeout=10000)
|
|
LOG.info("Order form loaded at %s", page.url)
|
|
|
|
|
|
@ai_retry(max_retries=2, step_name="02_company_type")
|
|
def step_company_type(page: Page, data: dict, _corrected_selector: str = ""):
|
|
"""Step 1: Select company type."""
|
|
ctype = data["company_type"]
|
|
# Radio inputs are sr-only — click the parent label, not the hidden input
|
|
sel = _corrected_selector or f'label:has(input[name="company_type"][value="{ctype}"])'
|
|
# Scroll into view first to get past sticky nav, then force-click the label
|
|
el = page.locator(sel).first
|
|
el.scroll_into_view_if_needed(timeout=5000)
|
|
page.wait_for_timeout(300)
|
|
el.click(timeout=5000, force=True)
|
|
page.wait_for_timeout(500)
|
|
|
|
if ctype == "numbered_tradename":
|
|
page.wait_for_selector("#trade-name-fields", state="visible", timeout=3000)
|
|
page.fill('input[name="trade_name"]', data["trade_name"])
|
|
elif ctype == "named":
|
|
page.wait_for_selector("#named-fields", state="visible", timeout=3000)
|
|
page.fill('input[name="name_choice_1"]', data.get("name_choice_1", ""))
|
|
page.fill('input[name="name_choice_2"]', data.get("name_choice_2", ""))
|
|
page.fill('input[name="name_choice_3"]', data.get("name_choice_3", ""))
|
|
page.select_option('select[name="legal_ending"]', data.get("legal_ending", "Ltd."))
|
|
|
|
page.locator('button:has-text("Next")').last.scroll_into_view_if_needed()
|
|
page.locator('button:has-text("Next")').last.click(timeout=5000, force=True)
|
|
page.wait_for_timeout(500)
|
|
LOG.info("Company type selected: %s", ctype)
|
|
|
|
|
|
@ai_retry(max_retries=2, step_name="03_director_info")
|
|
def step_director_info(page: Page, data: dict, _corrected_selector: str = ""):
|
|
"""Step 2: Fill director information (split name fields)."""
|
|
# Form uses split first/middle/last fields
|
|
page.fill('input[name="director_first_name"]', data["director_first_name"])
|
|
if data.get("director_middle_name"):
|
|
page.fill('input[name="director_middle_name"]', data["director_middle_name"])
|
|
page.fill('input[name="director_last_name"]', data["director_last_name"])
|
|
|
|
page.select_option('select[name="director_country"]', data["director_country"])
|
|
# Wait for address fields to become visible (hidden until country selected)
|
|
page.wait_for_selector('#director_address_fields:not(.hidden)', timeout=5000)
|
|
page.wait_for_timeout(500) # extra wait for province dropdown JS to populate
|
|
|
|
page.fill('input[name="director_street"]', data["director_street"])
|
|
if data.get("director_street2"):
|
|
page.fill('input[name="director_street2"]', data["director_street2"])
|
|
page.fill('input[name="director_city"]', data["director_city"])
|
|
page.fill('input[name="director_postal"]', data["director_postal"])
|
|
|
|
# Province is a custom select→hidden field combo.
|
|
# For US/CA/AU/GB: select from #director_province_select (visible after country change),
|
|
# which syncs value into the hidden #director_province via onchange.
|
|
prov_select = page.locator('#director_province_select')
|
|
prov_text = page.locator('#director_province_text')
|
|
prov_hidden = page.locator('#director_province')
|
|
|
|
if prov_select.is_visible():
|
|
prov_select.select_option(data["director_province"])
|
|
# Trigger onchange to sync to hidden field
|
|
prov_select.dispatch_event("change")
|
|
elif prov_text.is_visible():
|
|
prov_text.fill(data["director_province"])
|
|
# For free-text: directly set hidden field value via JS
|
|
page.evaluate(
|
|
"document.getElementById('director_province').value = arguments[0]",
|
|
data["director_province"]
|
|
)
|
|
else:
|
|
# Fallback: set hidden field directly
|
|
page.evaluate(
|
|
"document.getElementById('director_province').value = arguments[0]",
|
|
data["director_province"]
|
|
)
|
|
|
|
LOG.info("Province set to: %s", data["director_province"])
|
|
|
|
# Set the hidden director_name field (concat of first/middle/last — used by submitOrder)
|
|
full_name = f"{data['director_first_name']} {data.get('director_middle_name', '')} {data['director_last_name']}".replace(" ", " ").strip()
|
|
page.evaluate(f"document.getElementById('director_name').value = '{full_name}'")
|
|
|
|
if data.get("director_citizenship"):
|
|
cit = page.locator('select[name="director_citizenship"]')
|
|
cit.scroll_into_view_if_needed()
|
|
cit.select_option(data["director_citizenship"])
|
|
|
|
page.locator('button:has-text("Next")').last.scroll_into_view_if_needed()
|
|
page.locator('button:has-text("Next")').last.click(timeout=5000, force=True)
|
|
page.wait_for_timeout(500)
|
|
LOG.info("Director info filled")
|
|
|
|
|
|
@ai_retry(max_retries=2, step_name="04_telecom_details")
|
|
def step_telecom_details(page: Page, data: dict, _corrected_selector: str = ""):
|
|
"""Step 3: Fill telecom service details."""
|
|
# Wait for step 3 to be visible
|
|
page.wait_for_selector('textarea[name="service_description"]:visible', timeout=5000)
|
|
page.fill('textarea[name="service_description"]', data["service_description"])
|
|
page.fill('input[name="geographic_coverage"]', data["geographic_coverage"])
|
|
|
|
if data.get("include_bits"):
|
|
bits_cb = page.locator('#include_bits')
|
|
if bits_cb.count() > 0 and not bits_cb.is_checked():
|
|
bits_cb.check()
|
|
|
|
page.fill('input[name="reg_contact_name"]', data["reg_contact_name"])
|
|
page.fill('input[name="reg_contact_email"]', data["reg_contact_email"])
|
|
page.fill('input[name="reg_contact_phone"]', data["reg_contact_phone"])
|
|
|
|
page.locator('button:has-text("Next")').last.scroll_into_view_if_needed()
|
|
page.locator('button:has-text("Next")').last.click(timeout=5000, force=True)
|
|
page.wait_for_timeout(500)
|
|
LOG.info("Telecom details filled")
|
|
|
|
|
|
@ai_retry(max_retries=2, step_name="05_identity_verification")
|
|
def step_identity_verification(page: Page, data: dict, _corrected_selector: str = ""):
|
|
"""Step 4: Identity verification.
|
|
|
|
In test mode (sk_test_ key) the API accepts orders without identity verification.
|
|
We skip the Stripe Identity flow entirely and click Next to advance to step 5.
|
|
"""
|
|
# Step 4 is identity — just advance past it in test mode
|
|
# The API has a test mode bypass that accepts null identity_session_id
|
|
LOG.info("Identity step — skipping in test mode (API bypass active)")
|
|
|
|
next_btn = page.locator('#btn-next, button:has-text("Next"), button:has-text("Continue"), button:has-text("Skip")')
|
|
if next_btn.count() > 0:
|
|
next_btn.last.scroll_into_view_if_needed()
|
|
next_btn.last.click(timeout=5000, force=True)
|
|
page.wait_for_timeout(500)
|
|
else:
|
|
LOG.warning("No Next button found on identity step")
|
|
|
|
LOG.info("Advanced past identity step")
|
|
|
|
|
|
@ai_retry(max_retries=2, step_name="06_review_and_submit")
|
|
def step_review_and_submit(page: Page, data: dict, _corrected_selector: str = ""):
|
|
"""Step 5 (Review & Submit): fill contact info, accept terms, then submit."""
|
|
# Wait for step 5 panel to become visible
|
|
page.wait_for_selector('#step-5:not(.hidden)', timeout=5000)
|
|
page.wait_for_timeout(500)
|
|
LOG.info("Review & Submit page loaded")
|
|
|
|
# Fill contact info (these fields are in step 5, not earlier steps)
|
|
page.locator('#customer_name').scroll_into_view_if_needed(timeout=5000)
|
|
page.locator('#customer_name').fill(data["customer_name"])
|
|
page.locator('#customer_email').fill(data["customer_email"])
|
|
if data.get("customer_phone"):
|
|
page.locator('#customer_phone').fill(data["customer_phone"])
|
|
|
|
# Accept terms checkbox if present
|
|
consent = page.locator('input[type="checkbox"][name="consent"], input[type="checkbox"][id*="consent"], input[type="checkbox"][id*="terms"]')
|
|
if consent.count() > 0 and not consent.first.is_checked():
|
|
consent.first.check()
|
|
|
|
# Check consent checkbox — required by validateStep(5)
|
|
consent = page.locator('#consent')
|
|
consent.wait_for(state="visible", timeout=5000)
|
|
if not consent.is_checked():
|
|
consent.scroll_into_view_if_needed()
|
|
consent.check()
|
|
page.wait_for_timeout(300)
|
|
LOG.info("Consent checked: %s", consent.is_checked())
|
|
|
|
# Submit — click the btn-next button (which is "Submit Order" on step 5)
|
|
submit_btn = page.locator('#btn-next')
|
|
submit_btn.scroll_into_view_if_needed(timeout=5000)
|
|
LOG.info("Clicking Submit Order button")
|
|
submit_btn.click(timeout=10000)
|
|
|
|
# Wait a bit, then check what happened
|
|
page.wait_for_timeout(5000)
|
|
|
|
# Check for JS error in submit-status
|
|
submit_status = page.locator('#submit-status')
|
|
if submit_status.is_visible():
|
|
error_text = submit_status.text_content()
|
|
LOG.info("Submit status text: %s", error_text)
|
|
if error_text and ("error" in error_text.lower() or "wrong" in error_text.lower()):
|
|
raise AssertionError(f"Order submission failed: {error_text}")
|
|
|
|
# Check if btn-next text changed (debugging)
|
|
btn_text = page.locator('#btn-next').text_content()
|
|
LOG.info("Button state: text='%s' disabled=%s", btn_text, page.locator('#btn-next').is_disabled())
|
|
|
|
# Take screenshot for debugging
|
|
_take_screenshot(page, "06_after_submit", "")
|
|
|
|
page.wait_for_selector("#step-success:not(.hidden)", timeout=30000)
|
|
page.wait_for_timeout(500)
|
|
|
|
# Capture the order number from the success screen
|
|
order_number_el = page.locator("#success-order-number")
|
|
order_number = order_number_el.text_content(timeout=5000).strip()
|
|
if not order_number or order_number == "--":
|
|
raise AssertionError("Order number not populated in step-success")
|
|
|
|
data["order_number"] = order_number
|
|
data["order_id"] = order_number
|
|
LOG.info("Order submitted — order number: %s", order_number)
|
|
|
|
|
|
@ai_retry(max_retries=2, step_name="07_proceed_to_payment")
|
|
def step_proceed_to_payment(page: Page, data: dict, _corrected_selector: str = ""):
|
|
"""Click Proceed to Payment → wait for redirect to checkout.stripe.com."""
|
|
# Select card payment method
|
|
card_radio = page.locator('input[name="payment_method_choice"][value="card"]')
|
|
if card_radio.count() > 0:
|
|
card_radio.check()
|
|
|
|
pay_btn = page.locator('#btn-proceed-payment')
|
|
pay_btn.scroll_into_view_if_needed()
|
|
pay_btn.click(timeout=5000, force=True)
|
|
|
|
# Wait for redirect to Stripe hosted checkout
|
|
page.wait_for_url("*checkout.stripe.com*", timeout=30000)
|
|
data["checkout_url"] = page.url
|
|
LOG.info("Redirected to Stripe Checkout: %s", page.url[:80])
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# Phase 2: Stripe Checkout (hosted page at checkout.stripe.com)
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
@ai_retry(max_retries=3, step_name="07_stripe_checkout_fill", use_opus=True)
|
|
def step_enter_payment(page: Page, data: dict, _corrected_selector: str = ""):
|
|
"""Fill Stripe Checkout hosted page with test card details.
|
|
|
|
Stripe Checkout (hosted) renders card fields directly in the page DOM,
|
|
not inside an iframe like Stripe Elements. The fields are standard inputs
|
|
inside a Stripe-controlled iframe at checkout.stripe.com.
|
|
"""
|
|
# Wait for the checkout page to fully render
|
|
page.wait_for_load_state("networkidle", timeout=20000)
|
|
LOG.info("Stripe Checkout page loaded")
|
|
|
|
# Email field (Stripe Checkout always asks for email)
|
|
email_input = page.locator('input[type="email"], input[name="email"], input[placeholder*="email" i]')
|
|
if email_input.count() > 0:
|
|
email_input.first.fill(data["customer_email"])
|
|
LOG.info("Filled email: %s", data["customer_email"])
|
|
|
|
# Card number — Stripe Checkout uses a direct input (not an iframe)
|
|
# Selectors for Stripe Checkout card fields
|
|
card_number = page.locator(
|
|
'input[name="cardNumber"], '
|
|
'input[data-elements-stable-field-name="cardNumber"], '
|
|
'input[autocomplete="cc-number"], '
|
|
'input[placeholder*="1234" i]'
|
|
)
|
|
|
|
if card_number.count() == 0:
|
|
# Stripe Checkout renders inside a cross-origin iframe at checkout.stripe.com
|
|
# The fields are accessible since we're already ON checkout.stripe.com
|
|
# Try waiting longer for them to appear
|
|
page.wait_for_selector(
|
|
'input[name="cardNumber"], '
|
|
'input[autocomplete="cc-number"], '
|
|
'[data-testid="card-tab"]',
|
|
timeout=15000,
|
|
)
|
|
card_number = page.locator('input[name="cardNumber"], input[autocomplete="cc-number"]')
|
|
|
|
card_number.first.fill(data["card_number"])
|
|
LOG.info("Filled card number")
|
|
|
|
# Expiry
|
|
expiry = page.locator(
|
|
'input[name="cardExpiry"], '
|
|
'input[autocomplete="cc-exp"], '
|
|
'input[placeholder*="MM" i]'
|
|
)
|
|
expiry.first.fill(f'{data["card_exp_month"]}/{data["card_exp_year"]}')
|
|
|
|
# CVC
|
|
cvc = page.locator(
|
|
'input[name="cardCvc"], '
|
|
'input[autocomplete="cc-csc"], '
|
|
'input[placeholder*="CVC" i], '
|
|
'input[placeholder*="CVV" i]'
|
|
)
|
|
cvc.first.fill(data["card_cvv"])
|
|
|
|
# Name on card (Stripe Checkout sometimes shows this)
|
|
name_field = page.locator('input[name="billingName"], input[autocomplete="cc-name"]')
|
|
if name_field.count() > 0:
|
|
name_field.first.fill(data["customer_name"])
|
|
|
|
# ZIP / postal
|
|
zip_field = page.locator(
|
|
'input[name="postalCode"], '
|
|
'input[autocomplete="postal-code"], '
|
|
'input[placeholder*="ZIP" i]'
|
|
)
|
|
if zip_field.count() > 0:
|
|
zip_field.first.fill(data["card_zip"])
|
|
|
|
LOG.info("All payment fields filled — submitting")
|
|
|
|
# Submit button
|
|
pay_btn = page.locator(
|
|
'button:has-text("Pay"), '
|
|
'button:has-text("Subscribe"), '
|
|
'button[type="submit"], '
|
|
'[data-testid="hosted-payment-submit-button"]'
|
|
)
|
|
pay_btn.first.click(timeout=10000)
|
|
LOG.info("Payment submitted")
|
|
|
|
|
|
@step(step_name="08_payment_success")
|
|
def step_payment_success(page: Page, data: dict):
|
|
"""Wait for Stripe to redirect back to our success page."""
|
|
# Stripe redirects to our success_url after payment
|
|
# success_url = {DOMAIN}/order/success?session_id={CHECKOUT_SESSION_ID}&order_id=...
|
|
page.wait_for_url(f"{SITE_URL}/order/success*", timeout=60000)
|
|
page.wait_for_load_state("networkidle", timeout=15000)
|
|
LOG.info("Success page loaded: %s", page.url)
|
|
|
|
# Extract session_id and order_id from URL
|
|
import urllib.parse
|
|
params = urllib.parse.parse_qs(urllib.parse.urlparse(page.url).query)
|
|
data["session_id"] = params.get("session_id", [None])[0]
|
|
data["order_id"] = params.get("order_id", [None])[0]
|
|
data["order_number"] = data["order_id"]
|
|
|
|
LOG.info("Session ID: %s | Order ID: %s", data["session_id"], data["order_id"])
|
|
|
|
# The success page JS polls /api/v1/checkout/session/:session_id
|
|
# Wait a few seconds for it to process and show the confirmation
|
|
page.wait_for_timeout(5000)
|
|
|
|
# Verify some confirmation text is present
|
|
confirmed = page.locator(
|
|
"text=confirmed, "
|
|
"text=Confirmed, "
|
|
"text=Thank you, "
|
|
"text=Order received, "
|
|
".success, "
|
|
"[data-status='confirmed']"
|
|
)
|
|
if confirmed.count() > 0:
|
|
LOG.info("Success page shows order confirmation")
|
|
else:
|
|
LOG.warning("Success page loaded but no explicit confirmation text found — may still be processing")
|
|
|
|
_take_screenshot(page, "08_payment_success", "")
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# Phase 3+4: Provider API tests
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
@step(step_name="09_porkbun_api")
|
|
def step_porkbun_api(page: Page, data: dict):
|
|
"""Test Porkbun API: ping + check .ca domain availability."""
|
|
from scripts.tests.providers.porkbun_client import PorkbunClient
|
|
client = PorkbunClient()
|
|
assert client.ping(), "Porkbun API ping failed"
|
|
result = client.check_availability(data["test_domain"])
|
|
LOG.info("Porkbun: %s available=%s price=%s",
|
|
data["test_domain"], result["available"], result.get("price"))
|
|
data["domain_available"] = result["available"]
|
|
|
|
|
|
@step(step_name="10_flowroute_api")
|
|
def step_flowroute_api(page: Page, data: dict):
|
|
"""Test Flowroute API: ping + search Canadian DIDs."""
|
|
from scripts.tests.providers.flowroute_client import FlowrouteClient
|
|
client = FlowrouteClient()
|
|
assert client.ping(), "Flowroute API ping failed"
|
|
dids = client.search_available_dids(starts_with="1604", limit=3)
|
|
LOG.info("Flowroute: %d Vancouver DIDs available", len(dids))
|
|
for d in dids[:3]:
|
|
LOG.info(" %s (%s) $%s/mo", d["did"], d["rate_center"], d["monthly_cost"])
|
|
if dids:
|
|
data["available_dids"] = [d["did"] for d in dids]
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# Phase 5: Verification (PG + ERPNext)
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
@step(step_name="11_verify_pg_order")
|
|
def step_verify_pg_order(page: Page, data: dict):
|
|
"""Verify order exists in PostgreSQL with correct fields and payment status."""
|
|
import psycopg2, psycopg2.extras
|
|
|
|
conn = psycopg2.connect(os.environ["DATABASE_URL"])
|
|
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
|
|
|
# Look up by order_id from URL, or fall back to most recent test email
|
|
if data.get("order_id"):
|
|
cur.execute(
|
|
"SELECT * FROM canada_crtc_orders WHERE order_number = %s",
|
|
(data["order_id"],)
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"SELECT * FROM canada_crtc_orders "
|
|
"WHERE customer_email LIKE 'testcarrier+%@performancewest.net' "
|
|
"ORDER BY created_at DESC LIMIT 1"
|
|
)
|
|
|
|
order = cur.fetchone()
|
|
conn.close()
|
|
|
|
if not order:
|
|
raise AssertionError(f"Order {data.get('order_id')} not found in canada_crtc_orders")
|
|
|
|
data["order_number"] = order["order_number"]
|
|
data["order_id"] = order["order_number"]
|
|
|
|
LOG.info("PG order: %s | company_type=%s | total=$%.2f | payment=%s | status=%s",
|
|
order["order_number"], order["company_type"],
|
|
(order["total_cents"] or 0) / 100,
|
|
order["payment_status"], order["status"])
|
|
|
|
# ── Field assertions ──────────────────────────────────────
|
|
assert order["company_type"] == data["company_type"], \
|
|
f"company_type: got {order['company_type']}, want {data['company_type']}"
|
|
assert order["payment_status"] == "paid", \
|
|
f"payment_status is '{order['payment_status']}' — webhook may not have fired yet"
|
|
assert order["stripe_session_id"], \
|
|
"stripe_session_id is empty — checkout session was not recorded"
|
|
assert order["customer_email"] == data["customer_email"], \
|
|
f"customer_email mismatch: {order['customer_email']}"
|
|
assert order["total_cents"] and order["total_cents"] > 0, \
|
|
"total_cents is 0 — order pricing was not calculated"
|
|
|
|
if data["company_type"] == "numbered_tradename":
|
|
assert order.get("trade_name") == data["trade_name"], \
|
|
f"trade_name: got {order.get('trade_name')}, want {data['trade_name']}"
|
|
|
|
LOG.info("PG order verification PASSED")
|
|
|
|
# Store for ERPNext check
|
|
data["erpnext_sales_order"] = order.get("erpnext_sales_order")
|
|
data["stripe_session_id"] = order.get("stripe_session_id")
|
|
|
|
|
|
@step(step_name="12_verify_erpnext")
|
|
def step_verify_erpnext(page: Page, data: dict):
|
|
"""Verify ERPNext Sales Order was created and is in correct workflow state."""
|
|
import urllib.request, urllib.error
|
|
|
|
erpnext_url = os.environ.get("ERPNEXT_URL", "http://erpnext:8000")
|
|
erpnext_key = os.environ.get("ERPNEXT_API_KEY", "")
|
|
erpnext_secret = os.environ.get("ERPNEXT_API_SECRET", "")
|
|
|
|
if not erpnext_key or not erpnext_secret:
|
|
LOG.warning("ERPNEXT_API_KEY/SECRET not set — skipping ERPNext verification")
|
|
return
|
|
|
|
order_id = data.get("order_id") or data.get("order_number")
|
|
if not order_id:
|
|
LOG.warning("No order_id to look up in ERPNext")
|
|
return
|
|
|
|
# Poll ERPNext for Sales Order by custom_external_order_id
|
|
# Give it up to 30s for the webhook to have processed
|
|
so_name = data.get("erpnext_sales_order")
|
|
if not so_name:
|
|
LOG.info("Sales Order not in PG yet — polling ERPNext directly...")
|
|
url = (
|
|
f"{erpnext_url}/api/resource/Sales Order"
|
|
f"?filters=[[\"Sales Order\",\"custom_external_order_id\",\"=\",\"{order_id}\"]]"
|
|
f"&fields=[\"name\",\"workflow_state\",\"status\",\"grand_total\",\"customer\"]"
|
|
f"&limit=1"
|
|
)
|
|
req = urllib.request.Request(url, headers={
|
|
"Authorization": f"token {erpnext_key}:{erpnext_secret}",
|
|
"Content-Type": "application/json",
|
|
})
|
|
try:
|
|
for attempt in range(6): # 30s total
|
|
with urllib.request.urlopen(req, timeout=10) as resp:
|
|
result = json.loads(resp.read())
|
|
records = result.get("data", [])
|
|
if records:
|
|
so_name = records[0]["name"]
|
|
break
|
|
if attempt < 5:
|
|
LOG.info(" ERPNext: no Sales Order yet — waiting 5s (attempt %d/6)", attempt+1)
|
|
time.sleep(5)
|
|
except urllib.error.URLError as e:
|
|
LOG.warning("ERPNext unreachable from test runner (%s) — skipping", e)
|
|
return
|
|
|
|
if not so_name:
|
|
raise AssertionError(
|
|
f"No ERPNext Sales Order found for order {order_id} after 30s — "
|
|
"checkout webhook may have failed"
|
|
)
|
|
|
|
# Fetch the Sales Order details
|
|
url = f"{erpnext_url}/api/resource/Sales Order/{so_name}"
|
|
req = urllib.request.Request(url, headers={
|
|
"Authorization": f"token {erpnext_key}:{erpnext_secret}",
|
|
"Content-Type": "application/json",
|
|
})
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=10) as resp:
|
|
so = json.loads(resp.read()).get("data", {})
|
|
except urllib.error.URLError as e:
|
|
LOG.warning("Could not fetch Sales Order details: %s", e)
|
|
return
|
|
|
|
LOG.info(
|
|
"ERPNext Sales Order: %s | state=%s | status=%s | total=%.2f | customer=%s",
|
|
so.get("name"), so.get("workflow_state"), so.get("status"),
|
|
float(so.get("grand_total") or 0),
|
|
so.get("customer"),
|
|
)
|
|
|
|
assert so.get("name"), "Sales Order has no name"
|
|
assert so.get("grand_total") and float(so["grand_total"]) > 0, \
|
|
"Sales Order grand_total is 0"
|
|
# Workflow should have advanced past 'Received' if payment confirmed
|
|
wf_state = so.get("workflow_state", "")
|
|
if wf_state in ("Received", "Awaiting Funds", "In Progress"):
|
|
LOG.info("ERPNext workflow state '%s' — acceptable", wf_state)
|
|
else:
|
|
LOG.warning("Unexpected ERPNext workflow state: %s", wf_state)
|
|
|
|
data["erpnext_sales_order"] = so.get("name")
|
|
LOG.info("ERPNext verification PASSED — Sales Order %s", so.get("name"))
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# Test report
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
def generate_report(company_type: str):
|
|
results = get_run_results()
|
|
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
report_dir = Path(__file__).parent / "runs" / f"{company_type}_{ts}"
|
|
report_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
for r in results:
|
|
for ss in r.screenshots:
|
|
ss_path = Path(ss)
|
|
if ss_path.exists():
|
|
ss_path.rename(report_dir / ss_path.name)
|
|
|
|
(report_dir / "results.json").write_text(json.dumps([r.to_dict() for r in results], indent=2))
|
|
|
|
passed = sum(1 for r in results if r.success)
|
|
failed = sum(1 for r in results if not r.success)
|
|
ai_fixes = sum(len(r.ai_corrections) for r in results)
|
|
total_time = sum(r.duration for r in results)
|
|
|
|
LOG.info("")
|
|
LOG.info("=" * 60)
|
|
LOG.info("TEST REPORT: %s", company_type)
|
|
LOG.info("=" * 60)
|
|
LOG.info(" Passed: %d / %d | AI corrections: %d | Time: %.1fs",
|
|
passed, passed + failed, ai_fixes, total_time)
|
|
LOG.info(" Report: %s", report_dir)
|
|
LOG.info("=" * 60)
|
|
|
|
for r in results:
|
|
status = "PASS" if r.success else "FAIL"
|
|
LOG.info(" [%s] %s (%.1fs, %d attempt%s%s)",
|
|
status, r.step_name, r.duration, r.attempts,
|
|
"s" if r.attempts != 1 else "",
|
|
f", {len(r.ai_corrections)} AI fix" if r.ai_corrections else "")
|
|
if not r.success and r.error:
|
|
LOG.info(" └─ %s", r.error[:120])
|
|
|
|
return report_dir
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# Cleanup
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
def cleanup(data: dict):
|
|
"""Remove test data from PostgreSQL (does not touch ERPNext)."""
|
|
import psycopg2
|
|
conn = psycopg2.connect(os.environ["DATABASE_URL"])
|
|
cur = conn.cursor()
|
|
pat = "testcarrier+%@performancewest.net"
|
|
|
|
cur.execute("DELETE FROM canada_crtc_orders WHERE customer_email LIKE %s", (pat,))
|
|
deleted = cur.rowcount
|
|
try:
|
|
cur.execute("""DELETE FROM customer_directors WHERE customer_id IN
|
|
(SELECT id FROM customers WHERE email LIKE %s)""", (pat,))
|
|
cur.execute("""DELETE FROM customer_addresses WHERE customer_id IN
|
|
(SELECT id FROM customers WHERE email LIKE %s)""", (pat,))
|
|
except Exception:
|
|
pass # Tables may not exist yet
|
|
cur.execute("DELETE FROM customers WHERE email LIKE %s", (pat,))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
LOG.info("Cleanup: deleted %d test order(s)", deleted)
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# Main orchestrator
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
def run_test(company_type: str, skip_providers: bool = False, headless: bool = True):
|
|
LOG.info("Starting E2E test: company_type=%s headless=%s", company_type, headless)
|
|
|
|
data = make_test_order()
|
|
data["company_type"] = company_type
|
|
|
|
if company_type == "numbered_tradename":
|
|
data["trade_name"] = "Pacific Telecom"
|
|
data["add_trade_name"] = True
|
|
elif company_type == "named":
|
|
data["name_choice_1"] = "Pacific Telecom Solutions Ltd."
|
|
data["name_choice_2"] = "Western Digital Communications Inc."
|
|
data["name_choice_3"] = "Cascade Voice Networks Corp."
|
|
data["legal_ending"] = "Ltd."
|
|
|
|
with sync_playwright() as p:
|
|
browser = p.chromium.launch(
|
|
headless=headless,
|
|
args=["--disable-blink-features=AutomationControlled"],
|
|
)
|
|
context = browser.new_context(
|
|
viewport={"width": 1440, "height": 900},
|
|
user_agent=(
|
|
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
|
|
"(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
|
|
),
|
|
)
|
|
page = context.new_page()
|
|
|
|
# Log browser console errors and relevant network activity
|
|
page.on("console", lambda msg: LOG.warning("[browser] %s: %s", msg.type, msg.text) if msg.type in ("error", "warning") else None)
|
|
# Auto-dismiss alert dialogs and log them
|
|
page.on("dialog", lambda d: (LOG.warning("[dialog] %s: %s", d.type, d.message), d.dismiss()))
|
|
# Log API calls
|
|
page.on("request", lambda r: LOG.info("[request] %s %s", r.method, r.url) if "api.performancewest" in r.url or "canada-crtc" in r.url else None)
|
|
page.on("response", lambda r: LOG.info("[response] %s %s → %d", r.request.method, r.url, r.status) if "api.performancewest" in r.url or "canada-crtc" in r.url else None)
|
|
|
|
try:
|
|
# Phase 1 — Order form
|
|
step_load_order_page(page, data)
|
|
step_company_type(page, data)
|
|
step_director_info(page, data)
|
|
step_telecom_details(page, data)
|
|
step_identity_verification(page, data)
|
|
step_review_and_submit(page, data)
|
|
|
|
# Phase 2 — Stripe Checkout hosted page
|
|
step_proceed_to_payment(page, data)
|
|
step_enter_payment(page, data)
|
|
step_payment_success(page, data)
|
|
|
|
# Phase 3 — Provider APIs
|
|
if not skip_providers:
|
|
step_porkbun_api(page, data)
|
|
step_flowroute_api(page, data)
|
|
|
|
# Phase 4 — Verification
|
|
step_verify_pg_order(page, data)
|
|
step_verify_erpnext(page, data)
|
|
|
|
except Exception as exc:
|
|
LOG.error("Test aborted: %s", exc)
|
|
_take_screenshot(page, "ABORT", "_fatal")
|
|
|
|
finally:
|
|
browser.close()
|
|
|
|
report_dir = generate_report(company_type)
|
|
|
|
try:
|
|
cleanup(data)
|
|
except Exception as ce:
|
|
LOG.warning("Cleanup failed: %s", ce)
|
|
|
|
return report_dir
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="E2E CRTC order test")
|
|
parser.add_argument("--type", choices=["numbered", "numbered_tradename", "named", "all"],
|
|
default="numbered", help="Company type to test (default: numbered)")
|
|
parser.add_argument("--skip-providers", action="store_true",
|
|
help="Skip Porkbun/Flowroute API tests")
|
|
parser.add_argument("--headed", action="store_true",
|
|
help="Run browser in headed (visible) mode")
|
|
parser.add_argument("--no-cleanup", action="store_true",
|
|
help="Skip cleanup — leave test order in DB for inspection")
|
|
args = parser.parse_args()
|
|
|
|
types = ["numbered", "numbered_tradename", "named"] if args.type == "all" else [args.type]
|
|
|
|
for ct in types:
|
|
LOG.info("\n" + "=" * 60)
|
|
LOG.info("RUNNING: %s", ct)
|
|
LOG.info("=" * 60)
|
|
run_test(ct, skip_providers=args.skip_providers, headless=not args.headed)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|