new-site/scripts/workers/hestia_provisioner.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

447 lines
19 KiB
Python

"""
HestiaCP Domain Provisioner — sets up a new client's .ca domain with
all required email addresses, web hosting, and DNS on cp.carrierone.com.
Called as part of the Canada CRTC order fulfillment (Step 5b).

Pipeline context:
    Step 4:  BC Incorporation → BC corp number obtained
    Step 5:  .ca domain registration (Porkbun) — uses BC corp number for CIRA CPR
             CIRA registrant = filings@performancewest.net (Porkbun account owner)
             This cannot be changed via API — it stays as PW permanently
    Step 5b: THIS MODULE — creates regulatory@domain.ca and 13 other mailboxes
             Credentials stored in ERPNext Sensitive ID document
    Step 6:  CRTC letter sent FROM regulatory@domain.ca using HestiaCP SMTP

CIRA Registrant Note:
    Porkbun v3 API has no per-domain contact update endpoint. The CIRA
    registrant for all .ca domains will always be the Porkbun account owner
    (Performance West, filings@performancewest.net). The client's identity
    is established through:
      - The BC corp number (CPR category CCO)
      - regulatory@domain.ca as the CRTC contact address
      - The CRTC letter sent from regulatory@domain.ca

Creates per domain:
    - Web domain with placeholder "Canadian Telecommunications Carrier" page
    - SSL certificate (Let's Encrypt)
    - 14 email accounts (regulatory, crtc, ccts, corpadmin, registeredoffice,
      accounting, billing, abuse, noc, postmaster, info, admin, sales, support)
    - DKIM + SPF + DMARC DNS records

Uses the HestiaCP CLI API via SSH to cp.carrierone.com.
"""
from __future__ import annotations

import html
import json
import logging
import os
import re
import secrets
import shlex
import string
import subprocess
from typing import Optional
LOG = logging.getLogger("workers.hestia")

# SSH connection settings for the HestiaCP host; each one can be overridden
# through the environment of the workers container.
HESTIA_HOST = os.environ.get("HESTIA_HOST", "cp.carrierone.com")
HESTIA_PORT = int(os.environ.get("HESTIA_SSH_PORT", "22"))
HESTIA_USER = os.environ.get("HESTIA_USER", "admin")
# Path to the SSH private key on the workers container ("" = not configured).
HESTIA_KEY = os.environ.get("HESTIA_SSH_KEY", "")

# Carbonio server (co.carrierone.com): the IMAP/SMTP/webmail endpoints that are
# handed to clients together with each mailbox's credentials.
WEBMAIL_URL = "https://co.carrierone.com:6071"
SMTP_HOST = IMAP_HOST = "co.carrierone.com"
SMTP_PORT = 587
IMAP_PORT = 993

# Mailbox prefix -> human-readable purpose, one entry per account created on
# every client .ca domain. Insertion order is the creation order, so
# "regulatory" (the address used on the CRTC letter) is created first.
MAILBOXES: dict[str, str] = {
    # -- Regulatory (most important: used on the CRTC letter) ----------------
    "regulatory": "CRTC and all regulatory correspondence — listed on the CRTC notification letter",
    "crtc": "CRTC registration and renewal correspondence",
    "ccts": "CCTS complaint handling and membership",

    # -- Corporate -----------------------------------------------------------
    "corpadmin": "BC Registry, annual reports, and corporate administration",
    "registeredoffice": "Official legal service and government notices (BC BCA s.40)",

    # -- Financial -----------------------------------------------------------
    "accounting": "CRA tax notices, GST/HST, bank correspondence",
    "billing": "Payment notifications and billing inquiries",

    # -- Operations (RFC 2142 well-known role mailboxes) ---------------------
    "abuse": "Telecom abuse reports — spam, robocall, network abuse (RFC 2142 required)",
    "noc": "Network operations — outages, interconnection, technical alerts",
    "postmaster": "Email delivery issues and bounce handling (RFC 2142 required)",

    # -- Business ------------------------------------------------------------
    "info": "General inquiries",
    "admin": "Administrative catch-all",
    "sales": "Wholesale partner and sales inquiries",
    "support": "Customer support requests",
}
def generate_password(length: int = 24) -> str:
    """Return a random password of *length* characters.

    Characters are drawn independently with ``secrets.choice`` from ASCII
    letters, digits and the symbols ``!@#$%&*``. A non-positive *length*
    yields an empty string.
    """
    alphabet = string.ascii_letters + string.digits + "!@#$%&*"
    picked = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(picked)
def _redact_hestia_cmd(cmd: str) -> str:
    """Return *cmd* with a mailbox password masked, for use in log lines only."""
    parts = cmd.split()
    # This module issues "v-add-mail-account USER DOMAIN ACCOUNT PASSWORD", so
    # everything from the fifth token onwards is secret material.
    if len(parts) >= 5 and parts[0] == "v-add-mail-account":
        return " ".join(parts[:4] + ["********"])
    return cmd


def run_hestia_cmd(cmd: str, timeout: int = 30) -> tuple[bool, str]:
    """Execute a HestiaCP CLI command on the HestiaCP host via SSH.

    The command string is handed to ``ssh`` as the remote command, so it is
    interpreted by the remote user's shell: callers are responsible for
    quoting any argument that may contain shell metacharacters.

    This function never raises; every failure mode is reported through the
    return value and logged. Passwords of ``v-add-mail-account`` commands are
    masked in log output.

    Args:
        cmd: The v-* command to run (e.g. "v-add-domain admin example.ca")
        timeout: Overall timeout for the ssh subprocess, in seconds

    Returns:
        (success, output) — ``success`` is True when ssh exited with status 0;
        ``output`` is the stripped stdout, or stderr when stdout is empty.
        On local failures ``output`` is "HESTIA_SSH_KEY not configured",
        "timeout", or the text of the exception.
    """
    loggable = _redact_hestia_cmd(cmd)

    if not HESTIA_KEY:
        LOG.error("HESTIA_SSH_KEY is not set — cannot run HestiaCP command: %s", loggable)
        return False, "HESTIA_SSH_KEY not configured"

    ssh_cmd = [
        "ssh",
        "-i", HESTIA_KEY,
        "-p", str(HESTIA_PORT),
        # NOTE(review): host-key verification is disabled, which leaves this
        # connection (and the mailbox passwords sent over it) open to MITM.
        # Consider pinning the host key via a known_hosts file in the
        # container; left unchanged here so existing deployments keep working.
        "-o", "StrictHostKeyChecking=no",
        "-o", "ConnectTimeout=10",
        f"{HESTIA_USER}@{HESTIA_HOST}",
        cmd,
    ]

    try:
        result = subprocess.run(
            ssh_cmd, capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        LOG.error("HestiaCP command timed out (%ds): %s", timeout, loggable)
        return False, "timeout"
    except Exception as exc:
        # Deliberately broad: callers rely on a (bool, str) result and never
        # expect an exception (e.g. ssh binary missing, invalid argument).
        LOG.error("SSH error running HestiaCP command: %s", exc)
        return False, str(exc)

    success = result.returncode == 0
    output = result.stdout.strip() or result.stderr.strip()
    if not success:
        LOG.error(
            "HestiaCP command failed [rc=%d]: %s — %s",
            result.returncode, loggable, output,
        )
    return success, output
# Conservative hostname check: dot-separated LDH labels, at least two labels.
# The domain ends up in shell commands and in a filesystem path, so anything
# outside this shape is rejected before a single command is issued.
_DOMAIN_RE = re.compile(
    r"(?=.{1,253}\Z)(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z0-9-]{2,63}",
    re.IGNORECASE,
)


def _redirect_page(redirect_url: str) -> str:
    """Render a meta-refresh page that forwards the visitor to *redirect_url*."""
    target = html.escape(redirect_url, quote=True)
    return (
        f'<html><head><meta http-equiv="refresh" content="0;url={target}">'
        "</head></html>"
    )


def _placeholder_page(client_name: str) -> str:
    """Render the minimal placeholder page (no PW branding — the client is the carrier)."""
    name = html.escape(client_name)
    return (
        "<!DOCTYPE html>"
        '<html lang="en">'
        "<head>"
        '<meta charset="UTF-8">'
        '<meta name="viewport" content="width=device-width,initial-scale=1">'
        f"<title>{name}</title>"
        "<style>"
        "body{font-family:system-ui,sans-serif;display:flex;align-items:center;"
        "justify-content:center;min-height:100vh;margin:0;background:#0f1d2e;color:#fff}"
        ".c{text-align:center;max-width:500px;padding:2rem}"
        "h1{font-size:1.5rem;margin-bottom:0.5rem}"
        "p{color:#8daed3;font-size:0.9rem}"
        "</style>"
        "</head>"
        "<body>"
        '<div class="c">'
        f"<h1>{name}</h1>"
        "<p>Canadian Telecommunications Carrier</p>"
        '<p style="margin-top:2rem;font-size:0.75rem;color:#4a78ad">CRTC Registered</p>'
        "</div>"
        "</body>"
        "</html>"
    )


def _write_index_page(domain: str, page: str) -> tuple[bool, str]:
    """Write *page* as the domain's public_html/index.html on the HestiaCP host."""
    path = f"/home/{HESTIA_USER}/web/{domain}/public_html/index.html"
    # printf with a fixed format plus shlex-quoted data is safe for any page
    # content (quotes, '$', '#', ...), unlike echo '...'.
    return run_hestia_cmd(f"printf '%s\\n' {shlex.quote(page)} > {shlex.quote(path)}")


def provision_domain(
    domain: str,
    client_name: str,
    redirect_url: str = "",
    order_number: str = "",
) -> dict:
    """Provision a complete .ca domain on HestiaCP.

    Steps:
        1. Add web domain
        2. Request Let's Encrypt SSL
        3. Deploy placeholder page (or redirect)
        4. Add mail domain
        5. Enable DKIM
        6. Create all 14 mailboxes with random passwords
        7. Request mail SSL
        8. Return credentials dict

    Every step is best-effort: a failure is recorded in ``errors`` and the
    remaining steps still run. Only an invalid *domain* aborts early, before
    any command is sent. All values interpolated into remote commands are
    shell-quoted, and values embedded in HTML are HTML-escaped.

    Args:
        domain: The .ca domain (e.g. "valleyinternet.ca")
        client_name: Client's legal company name (for the placeholder page)
        redirect_url: If set, redirect the domain to this URL instead of placeholder
        order_number: For logging context

    Returns:
        {
            "success": bool,                # True when at least one mailbox was created
            "domain": str,
            "mailboxes": {
                prefix: {
                    "email": str,           # e.g. "regulatory@valleyinternet.ca"
                    "password": str,        # random 24-char password
                    "description": str,
                    "smtp_host": str,       # co.carrierone.com
                    "smtp_port": int,       # 587
                    "imap_host": str,       # co.carrierone.com
                    "imap_port": int,       # 993
                    "webmail": str,         # https://co.carrierone.com:6071
                }
            },
            "regulatory_email": str,        # regulatory@domain.ca
            "regulatory_password": str,     # password for that account ("" if not created)
            "errors": [str],
        }
    """
    tag = f"[{order_number}] " if order_number else ""
    domain = (domain or "").strip()

    if not _DOMAIN_RE.fullmatch(domain):
        LOG.error("%sRefusing to provision invalid domain name: %r", tag, domain)
        return {
            "success": False,
            "domain": domain,
            "mailboxes": {},
            "regulatory_email": f"regulatory@{domain}",
            "regulatory_password": "",
            "errors": [f"Invalid domain name: {domain!r}"],
        }

    LOG.info("%sProvisioning domain: %s for '%s'", tag, domain, client_name)
    errors: list[str] = []
    mailbox_credentials: dict = {}
    q_user = shlex.quote(HESTIA_USER)
    q_domain = shlex.quote(domain)

    # ── Step 1: Add web domain ────────────────────────────────────────────
    # NOTE(review): in upstream HestiaCP, v-add-domain appears to create the
    # web, DNS *and* mail domain in one go, which would make steps 4 and 5
    # report "already exists" — confirm against the deployed Hestia version.
    LOG.info("%sStep 1: Adding web domain %s", tag, domain)
    ok, out = run_hestia_cmd(f"v-add-domain {q_user} {q_domain}")
    if not ok:
        # Non-fatal — continue to set up email even if web fails
        errors.append(f"Failed to add web domain: {out}")

    # ── Step 2: Request Let's Encrypt SSL for web ─────────────────────────
    LOG.info("%sStep 2: Requesting SSL certificate for %s", tag, domain)
    ok, out = run_hestia_cmd(f"v-add-letsencrypt-domain {q_user} {q_domain}", timeout=60)
    if not ok:
        errors.append(f"Web SSL setup failed (DNS propagation may still be pending): {out}")

    # ── Step 3: Deploy placeholder page or redirect ───────────────────────
    if redirect_url:
        LOG.info("%sStep 3: Setting redirect → %s", tag, redirect_url)
        page = _redirect_page(redirect_url)
    else:
        LOG.info("%sStep 3: Deploying placeholder page", tag)
        page = _placeholder_page(client_name)
    ok, out = _write_index_page(domain, page)
    if not ok:
        errors.append(f"Failed to deploy index page: {out}")

    # ── Step 4: Add mail domain ───────────────────────────────────────────
    LOG.info("%sStep 4: Adding mail domain %s", tag, domain)
    ok, out = run_hestia_cmd(f"v-add-mail-domain {q_user} {q_domain}")
    if not ok:
        errors.append(f"Failed to add mail domain: {out}")

    # ── Step 5: Enable DKIM ───────────────────────────────────────────────
    LOG.info("%sStep 5: Enabling DKIM for %s", tag, domain)
    ok, out = run_hestia_cmd(f"v-add-mail-domain-dkim {q_user} {q_domain}")
    if not ok:
        errors.append(f"DKIM setup failed: {out}")

    # ── Step 6: Create all mailboxes ──────────────────────────────────────
    LOG.info("%sStep 6: Creating %d mailboxes for %s", tag, len(MAILBOXES), domain)
    for prefix, description in MAILBOXES.items():
        password = generate_password()
        email_addr = f"{prefix}@{domain}"
        # The password alphabet contains shell metacharacters (& $ * # !);
        # unquoted, the remote shell would split or expand it and the account
        # would end up with a different password than the one recorded here.
        ok, out = run_hestia_cmd(
            f"v-add-mail-account {q_user} {q_domain} "
            f"{shlex.quote(prefix)} {shlex.quote(password)}"
        )
        if ok:
            mailbox_credentials[prefix] = {
                "email": email_addr,
                "password": password,
                "description": description,
                "smtp_host": SMTP_HOST,
                "smtp_port": SMTP_PORT,
                "imap_host": IMAP_HOST,
                "imap_port": IMAP_PORT,
                "webmail": WEBMAIL_URL,
            }
            LOG.info("%s  Created: %s", tag, email_addr)
        else:
            errors.append(f"Failed to create {email_addr}: {out}")
            LOG.error("%s  Failed: %s — %s", tag, email_addr, out)

    # ── Step 7: Request mail SSL ──────────────────────────────────────────
    LOG.info("%sStep 7: Requesting mail SSL for %s", tag, domain)
    ok, out = run_hestia_cmd(
        f"v-add-letsencrypt-domain {q_user} {q_domain} '' yes", timeout=60
    )
    if not ok:
        # Non-critical — mail works without dedicated TLS cert if MX points to
        # the Carbonio server which has its own wildcard cert
        LOG.warning("%sMail SSL note (non-fatal): %s", tag, out)

    # ── Result ────────────────────────────────────────────────────────────
    created = len(mailbox_credentials)
    success = created > 0  # At least some mailboxes created
    if created == len(MAILBOXES) and not errors:
        status = "COMPLETE"
    elif success:
        status = "PARTIAL"
    else:
        status = "FAILED"
    LOG.info(
        "%sDomain provisioning %s: %s — %d/%d mailboxes created, %d errors",
        tag, status, domain, created, len(MAILBOXES), len(errors),
    )

    regulatory = mailbox_credentials.get("regulatory", {})
    return {
        "success": success,
        "domain": domain,
        "mailboxes": mailbox_credentials,
        "regulatory_email": regulatory.get("email", f"regulatory@{domain}"),
        "regulatory_password": regulatory.get("password", ""),
        "errors": errors,
    }
def deprovision_domain(domain: str) -> bool:
    """Remove a domain and all its mailboxes from HestiaCP.

    Used when an order is cancelled or a client stops service.

    Args:
        domain: The domain to delete (e.g. "valleyinternet.ca")

    Returns:
        True when the remote v-delete-domain command succeeded; False on any
        failure, including an empty *domain* (no command is sent in that case).
    """
    domain = (domain or "").strip()
    if not domain:
        LOG.error("Cannot deprovision: empty domain name")
        return False

    LOG.info("Deprovisioning domain: %s", domain)
    # Quote both arguments: this is a destructive command interpreted by the
    # remote shell, so a stray metacharacter must never be able to extend it.
    ok, out = run_hestia_cmd(
        f"v-delete-domain {shlex.quote(HESTIA_USER)} {shlex.quote(domain)}"
    )
    if not ok:
        LOG.error("Failed to deprovision domain %s: %s", domain, out)
    return ok
# ─────────────────────────────────────────────────────────────────────────────
# HestiaProvisioner — class interface used by the CRTC pipeline
# ─────────────────────────────────────────────────────────────────────────────
class HestiaProvisioner:
    """Async-compatible wrapper around the HestiaCP provisioner functions.

    The CRTC pipeline (canada_crtc.py) calls this as:

        provisioner = HestiaProvisioner()
        result = await provisioner.provision_domain(
            domain=ca_domain,
            client_name=company_display,
            order_number=order_number,
        )

    After success, call store_credentials_to_erpnext() to persist the
    mailbox passwords in an ERPNext "Sensitive ID" document. Any encryption
    at rest is expected to be handled on the ERPNext side; this class only
    submits the plain-text block (TODO confirm the DocType's storage policy).
    """

    async def provision_domain(
        self,
        domain: str,
        client_name: str = "",
        order_number: str = "",
        redirect_url: str = "",
    ) -> dict:
        """Provision a .ca domain without blocking the event loop.

        Runs the synchronous module-level provision_domain() in the default
        executor. When *client_name* is empty the domain itself is used as
        the display name. Returns that function's result dict unchanged.
        """
        import asyncio

        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine; get_event_loop() is deprecated for this use.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            lambda: provision_domain(
                domain=domain,
                client_name=client_name or domain,
                redirect_url=redirect_url,
                order_number=order_number,
            ),
        )

    async def store_credentials_to_erpnext(
        self,
        erp,  # ERPNextClient instance
        order_number: str,
        domain: str,
        mailboxes: dict,
        sales_order_name: str = "",
    ) -> str:
        """Store all mailbox credentials in an ERPNext Sensitive ID document.

        Args:
            erp: Client object exposing ``create_resource(doctype, data)``
            order_number: Order reference stored on the document and used in logs
            domain: The provisioned domain the credentials belong to
            mailboxes: The ``"mailboxes"`` mapping returned by provision_domain()
            sales_order_name: Optional ERPNext Sales Order to link

        Returns:
            The ERPNext Sensitive ID document name, or "" when *mailboxes* is
            empty or the ERPNext call fails.
        """
        if not mailboxes:
            return ""

        # Build a human-readable plain-text block, one paragraph per mailbox
        credentials_text = "\n\n".join(
            f"{cred['email']}\n"
            f"  Password: {cred['password']}\n"
            f"  IMAP: {cred['imap_host']}:{cred['imap_port']}\n"
            f"  SMTP: {cred['smtp_host']}:{cred['smtp_port']}\n"
            f"  Purpose: {cred['description']}"
            for cred in mailboxes.values()
        )

        try:
            # NOTE(review): create_resource is called synchronously here, so it
            # is presumably a blocking call — confirm it is not a coroutine.
            doc = erp.create_resource("Sensitive ID", {
                "title": f"Email credentials — {domain}",
                "order_number": order_number,
                "sales_order": sales_order_name,
                "id_type": "Email Credentials",
                "holder_name": domain,
                "sensitive_data": credentials_text,
                "notes": (
                    f"HestiaCP mailboxes for {domain}\n"
                    f"Server: {HESTIA_HOST}\n"
                    f"Webmail: {WEBMAIL_URL}\n"
                    f"Created by: hestia_provisioner.py"
                ),
            })
            doc_name = doc.get("name", "")
            LOG.info("[%s] Credentials stored in Sensitive ID: %s", order_number, doc_name)
            return doc_name
        except Exception as exc:
            # Best-effort by design: a storage failure must not abort the
            # pipeline, so it is logged and reported as "".
            LOG.error("[%s] Failed to store credentials in ERPNext: %s", order_number, exc)
            return ""

    async def deprovision_domain(self, domain: str) -> bool:
        """Deprovision a domain asynchronously (runs deprovision_domain() in the default executor)."""
        import asyncio

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: deprovision_domain(domain))
# ─────────────────────────────────────────────────────────────────────────────
# CLI entry point
# ─────────────────────────────────────────────────────────────────────────────
def _main(argv: list[str]) -> int:
    """Run the CLI and return the process exit status.

    Usage:
        provision   <domain> ['Company Name']  — prints the result dict as JSON
        deprovision <domain>

    Returns 0 on success and 1 on bad usage, an unknown action, or when the
    requested operation itself failed, so shell callers can detect failures.
    """
    if len(argv) < 3:
        print("Usage:")
        print("  python -m scripts.workers.hestia_provisioner provision example.ca 'Company Name'")
        print("  python -m scripts.workers.hestia_provisioner deprovision example.ca")
        return 1

    action = argv[1]
    domain_arg = argv[2]

    if action == "provision":
        # The company name is optional; fall back to the domain itself.
        client_name_arg = argv[3] if len(argv) > 3 else domain_arg
        result = provision_domain(domain_arg, client_name_arg)
        print(json.dumps(result, indent=2, default=str))
        return 0 if result.get("success") else 1

    if action == "deprovision":
        ok = deprovision_domain(domain_arg)
        print(f"Deprovision {'succeeded' if ok else 'failed'}")
        return 0 if ok else 1

    print(f"Unknown action: {action}")
    return 1


if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.INFO)
    sys.exit(_main(sys.argv))