new-site/scripts/workers/delivery_worker.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

511 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Delivery worker polls for admin-approved ("Ready") orders and emails
document links to the customer.
Run alongside the base worker:
python -m scripts.workers.delivery_worker
Environment variables (in addition to ERPNext & MinIO vars):
    SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASSWORD, SMTP_FROM
    PRESIGN_EXPIRY: presigned-URL lifetime in seconds (default 7 days)
    DELIVERY_POLL_INTERVAL: seconds between polls (default 60)
    DATABASE_URL, API_URL, PORTAL_URL, CUSTOMER_JWT_SECRET,
    SET_PASSWORD_TTL_HOURS: upsell and portal-onboarding settings
"""
from __future__ import annotations
import logging
import os
import signal
import sys
import time
from datetime import datetime, timedelta, timezone
from typing import Any
from minio import Minio
from .erpnext_client import ERPNextClient, ERPNextClientError
# --------------------------------------------------------------------------- #
# Configuration
# --------------------------------------------------------------------------- #
# Seconds to wait between polls for "Ready" orders.
# A non-integer value raises ValueError at import time.
POLL_INTERVAL = int(os.getenv("DELIVERY_POLL_INTERVAL", "60"))
# Lifetime of each presigned download URL, in seconds.
# NOTE(review): the email template text says "7 days" regardless of this
# value; keep the two in sync when overriding PRESIGN_EXPIRY.
PRESIGN_EXPIRY = int(os.getenv("PRESIGN_EXPIRY", str(7 * 24 * 3600))) # 7 days
# MinIO connection settings (host:port, credentials, bucket, TLS on/off).
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "minio:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "performancewest")
# Only the literal string "true" (case-insensitive) enables TLS.
MINIO_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true"
# Outgoing mail settings. SMTP login is skipped when SMTP_USER is empty.
SMTP_HOST = os.getenv("SMTP_HOST", "co.carrierone.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER", "")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "")
SMTP_FROM = os.getenv("SMTP_FROM", "Performance West <noreply@performancewest.net>")
# Upsell + portal onboarding
# NOTE(review): SITE_URL is not referenced in the visible part of this
# module; confirm it is still needed.
SITE_URL = os.getenv("SITE_URL", "https://performancewest.net")
# Base URL of the API that serves the recommendations endpoint.
API_URL = os.getenv("API_URL", "http://api:3001")
# Base URL of the customer portal (target of the set-password link).
PORTAL_URL = os.getenv("PORTAL_URL", "https://portal.performancewest.net")
# HS256 key used to sign portal set-password tokens.
# NOTE(review): the fallback below is a publicly known placeholder; tokens
# signed with it can be forged. Set CUSTOMER_JWT_SECRET in every deployment.
CUSTOMER_JWT_SECRET = os.getenv(
"CUSTOMER_JWT_SECRET", "changeme_long_random_string"
)
# Validity of the set-password token, in hours.
SET_PASSWORD_TTL_HOURS = int(os.getenv("SET_PASSWORD_TTL_HOURS", "72"))
# --------------------------------------------------------------------------- #
# Logging
# --------------------------------------------------------------------------- #
# Configure the root logger: INFO level, written to stdout, each record
# prefixed with timestamp, level and logger name.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
# Module-wide logger used by every function in this worker.
logger = logging.getLogger("pw.worker.delivery")
# --------------------------------------------------------------------------- #
# Graceful shutdown
# --------------------------------------------------------------------------- #
# Set to True by the signal handler; the polling loops read it between
# units of work so the process can finish the current step and exit.
_shutdown_requested = False


def _signal_handler(signum: int, _frame: Any) -> None:
    """Note that a termination signal arrived so the main loop can stop.

    Args:
        signum: Number of the signal that was delivered.
        _frame: Interrupted stack frame (unused).
    """
    global _shutdown_requested
    logger.info("Received signal %s shutting down gracefully …", signum)
    _shutdown_requested = True


# Route both termination signals through the same handler.
for _handled_signal in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_handled_signal, _signal_handler)
del _handled_signal
# --------------------------------------------------------------------------- #
# Email
# --------------------------------------------------------------------------- #
# Full HTML body of the delivery email. Placeholders: customer_name,
# order_name, file_rows (pre-rendered _FILE_ROW rows) and year.
# NOTE(review): the "7 days" text is static and does not follow PRESIGN_EXPIRY.
_EMAIL_TEMPLATE = """\
<!DOCTYPE html>
<html lang="en">
<head><meta charset="utf-8"></head>
<body style="margin:0;padding:0;font-family:Arial,Helvetica,sans-serif;background:#f4f4f4;">
<table width="100%" cellpadding="0" cellspacing="0" style="max-width:640px;margin:24px auto;background:#ffffff;border-radius:8px;overflow:hidden;">
<tr>
<td style="background:#1a2744;padding:24px 32px;">
<h1 style="margin:0;color:#ffffff;font-size:22px;">Performance West Inc.</h1>
<p style="margin:4px 0 0;color:#8fa4c8;font-size:13px;">Compliance &amp; Regulatory Services</p>
</td>
</tr>
<tr>
<td style="padding:32px;">
<p style="margin:0 0 16px;color:#333;font-size:15px;">Hello {customer_name},</p>
<p style="margin:0 0 16px;color:#333;font-size:15px;">
Your compliance deliverables for order <strong>{order_name}</strong> are ready.
The documents are available for download using the links below. These links
are valid for <strong>7 days</strong>.
</p>
<table width="100%" cellpadding="0" cellspacing="0" style="margin:0 0 24px;">
{file_rows}
</table>
<p style="margin:0 0 8px;color:#666;font-size:13px;">
If you have questions or need further assistance, reply to this email or
contact us at <a href="mailto:support@performancewest.net" style="color:#2d6cdf;">support@performancewest.net</a>.
</p>
<p style="margin:24px 0 0;color:#333;font-size:15px;">
Thank you for choosing Performance West.<br/>
— The Compliance Team
</p>
</td>
</tr>
<tr>
<td style="background:#f0f0f0;padding:16px 32px;text-align:center;">
<p style="margin:0;color:#999;font-size:11px;">
&copy; {year} Performance West Inc. &bull; Confidential
</p>
</td>
</tr>
</table>
</body>
</html>
"""
# One download button row. Placeholders: url (href target) and filename (label).
_FILE_ROW = """\
<tr>
<td style="padding:8px 0;">
<a href="{url}" style="display:inline-block;padding:10px 20px;background:#2d6cdf;color:#fff;border-radius:4px;text-decoration:none;font-size:14px;">
{filename}
</a>
</td>
</tr>
"""


def _build_email_html(
    customer_name: str,
    order_name: str,
    file_links: list[tuple[str, str]],
    *,
    upsell_html: str = "",
    portal_onboard_html: str = "",
) -> str:
    """Render the delivery email body.

    Args:
        customer_name: Name used in the greeting (HTML-escaped here).
        order_name: Order identifier shown in the body (HTML-escaped here).
        file_links: ``(filename, url)`` pairs, one download button each.
            Both parts are HTML-escaped; the URL is escaped for use inside
            a double-quoted attribute.
        upsell_html: Pre-rendered, trusted HTML inserted verbatim before
            the support paragraph. Empty string to omit.
        portal_onboard_html: Pre-rendered, trusted HTML inserted verbatim
            after ``upsell_html``. Empty string to omit.

    Returns:
        The complete HTML document as a string.
    """
    # Function-scope import, like the other lazy imports in this module.
    from html import escape

    # Escape every value that originates outside this module. Without this
    # a name such as "A & B <Ltd>" corrupts the markup, and a crafted name
    # or filename could inject arbitrary HTML. "&" in presigned URLs must be
    # written as "&amp;" inside an attribute; mail clients decode it back.
    rows = "".join(
        _FILE_ROW.format(
            url=escape(str(url), quote=True),
            filename=escape(str(filename)),
        )
        for filename, url in file_links
    )
    base = _EMAIL_TEMPLATE.format(
        customer_name=escape(str(customer_name)),
        order_name=escape(str(order_name)),
        file_rows=rows,
        year=datetime.now().year,
    )
    # Inject upsell + onboarding sections between the file list and the
    # support paragraph. Both are empty strings when not applicable, so the
    # layout has no holes. Escaped values above cannot contain "<p", so the
    # first match of the marker is always the template's own paragraph.
    if upsell_html or portal_onboard_html:
        marker = '<p style="margin:0 0 8px;color:#666;font-size:13px;">'
        base = base.replace(marker, upsell_html + portal_onboard_html + marker, 1)
    return base
# --------------------------------------------------------------------------- #
# PG lookups (recommended_slugs + portal_user_created)
# --------------------------------------------------------------------------- #
def _pg_connect():
    """Open a PostgreSQL connection using ``DATABASE_URL``.

    Best-effort: any failure while importing psycopg2 or connecting is
    logged as a warning and reported to the caller as ``None``.

    Returns:
        A psycopg2 connection, or ``None`` when no connection could be made.
    """
    try:
        import psycopg2

        dsn = os.environ.get("DATABASE_URL", "")
        connection = psycopg2.connect(dsn)
    except Exception as err:
        logger.warning("delivery: could not connect to PG: %s", err)
        return None
    return connection
def _fetch_pg_compliance_order(erpnext_sales_order: str) -> dict | None:
    """Look up the ``compliance_orders`` row linked to an ERPNext Sales Order.

    Args:
        erpnext_sales_order: ERPNext Sales Order name to match against the
            ``erpnext_sales_order`` column.

    Returns:
        A dict with ``order_number``, ``customer_email``, ``customer_name``,
        ``portal_user_created`` (bool) and ``recommended_slugs`` (list), or
        ``None`` when PostgreSQL is unreachable or no row matches.
    """
    connection = _pg_connect()
    if connection is None:
        return None

    query = (
        "SELECT order_number, customer_email, customer_name, "
        "portal_user_created, recommended_slugs "
        "FROM compliance_orders WHERE erpnext_sales_order = %s"
    )
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, (erpnext_sales_order,))
            record = cursor.fetchone()
        if not record:
            return None
        # Column order follows the SELECT list above.
        user_created = bool(record[3])
        # NULL becomes an empty list.
        # NOTE(review): assumes recommended_slugs is an array column; verify.
        slugs = list(record[4] or [])
        return {
            "order_number": record[0],
            "customer_email": record[1],
            "customer_name": record[2],
            "portal_user_created": user_created,
            "recommended_slugs": slugs,
        }
    finally:
        # Always release the connection, whether the lookup succeeded or not.
        connection.close()
# --------------------------------------------------------------------------- #
# Upsell block — "Recommended next steps" with deep-links
# --------------------------------------------------------------------------- #
def _fetch_recommendations(pg_order_number: str) -> dict | None:
    """Fetch upsell recommendations for a compliance order from the API.

    Best-effort: network, HTTP and JSON errors are logged as warnings and
    reported as ``None`` so the caller can send the email without an upsell.

    Args:
        pg_order_number: ``order_number`` of the compliance order. A falsy
            value short-circuits to ``None`` without any network call.

    Returns:
        The decoded JSON object from the recommendations endpoint, or
        ``None`` on any failure or when the payload is not a JSON object.
    """
    if not pg_order_number:
        return None
    try:
        import json as _json
        import urllib.parse
        import urllib.request

        # Percent-encode the identifier so characters such as "/", "?" or
        # "#" cannot alter the request path or query.
        order_segment = urllib.parse.quote(str(pg_order_number), safe="")
        url = (
            f"{API_URL.rstrip('/')}/api/v1/compliance-orders/"
            f"{order_segment}/recommendations"
        )
        with urllib.request.urlopen(url, timeout=15) as resp:
            payload = _json.loads(resp.read())
    except Exception as exc:
        logger.warning(
            "delivery: could not fetch recommendations for %s: %s",
            pg_order_number,
            exc,
        )
        return None
    # Callers use dict methods on the result; reject any other JSON shape
    # here instead of letting it fail later while building the email.
    if not isinstance(payload, dict):
        logger.warning(
            "delivery: unexpected recommendations payload for %s: %s",
            pg_order_number,
            type(payload).__name__,
        )
        return None
    return payload
# Wrapper for the "Recommended next steps" box. Placeholders: items
# (pre-rendered _UPSELL_ITEM entries) and bundle_cta (may be empty).
_UPSELL_TEMPLATE = """
<div style="margin:0 0 24px;padding:20px;background:#f9fafb;border:1px solid #e5e7eb;border-radius:6px;">
<h2 style="margin:0 0 8px;color:#1a2744;font-size:17px;">Recommended next steps</h2>
<p style="margin:0 0 12px;color:#444;font-size:14px;">
Your compliance checkup flagged the following items. Fix them now to bring
your FCC filings up to date:
</p>
<ul style="margin:0 0 16px 20px;padding:0;color:#333;font-size:14px;">
{items}
</ul>
{bundle_cta}
</div>
"""
# One recommended item: linked name followed by its price in dollars.
_UPSELL_ITEM = (
    '<li style="margin:0 0 6px;"><a href="{url}" style="color:#2d6cdf;">'
    '{name}</a> — ${price_dollars}</li>'
)
# Optional call-to-action button for the discounted bundle.
_UPSELL_BUNDLE_CTA = """
<div style="text-align:center;margin-top:8px;">
<a href="{bundle_url}" style="display:inline-block;padding:12px 24px;background:#1a2744;color:#fff;border-radius:4px;text-decoration:none;font-size:14px;font-weight:bold;">
Fix everything in one bundle — save {discount_pct}%
</a>
</div>
"""


def _build_upsell_html(recs: dict | None) -> str:
    """Render the "Recommended next steps" section of the delivery email.

    Args:
        recs: Recommendations payload. Keys read here: ``individual_urls``
            (list of dicts with ``checkout_url``, ``name``, ``price_cents``),
            ``bundle_eligible``, ``bundle_url`` and ``bundle_discount_pct``
            (defaults to 15 when absent).

    Returns:
        The HTML fragment, or ``""`` when ``recs`` is empty or lists no
        individual items.

    Raises:
        KeyError: If an entry in ``individual_urls`` lacks a required key.
    """
    # Function-scope import, like the other lazy imports in this module.
    from html import escape

    if not recs or not recs.get("individual_urls"):
        return ""
    # Names and URLs come from an API payload: escape them so characters
    # like "&", "<" or a double quote cannot break the markup or the href.
    items = "".join(
        _UPSELL_ITEM.format(
            url=escape(str(u["checkout_url"]), quote=True),
            name=escape(str(u["name"])),
            price_dollars=f"{u['price_cents'] / 100:.2f}",
        )
        for u in recs["individual_urls"]
    )
    bundle_cta = ""
    # The bundle button is shown only when the payload marks the order as
    # eligible and also supplies a bundle link.
    if recs.get("bundle_eligible") and recs.get("bundle_url"):
        bundle_cta = _UPSELL_BUNDLE_CTA.format(
            bundle_url=escape(str(recs["bundle_url"]), quote=True),
            discount_pct=escape(str(recs.get("bundle_discount_pct", 15))),
        )
    return _UPSELL_TEMPLATE.format(items=items, bundle_cta=bundle_cta)
# --------------------------------------------------------------------------- #
# Portal onboarding — magic link for new customers
# --------------------------------------------------------------------------- #
def _generate_set_password_token(email: str, order_number: str) -> str:
    """Sign a short-lived JWT for the portal /set-password page.

    The token carries ``email``, ``order_number``, ``purpose`` set to
    ``"set_password"``, ``iat`` and ``exp``. It is signed with HS256 using
    ``CUSTOMER_JWT_SECRET`` and expires after ``SET_PASSWORD_TTL_HOURS``.

    Args:
        email: Customer email address embedded in the token.
        order_number: Compliance order number embedded in the token.

    Returns:
        The encoded token as text.

    Raises:
        RuntimeError: If ``CUSTOMER_JWT_SECRET`` is empty or still the
            placeholder default.
        ImportError: If PyJWT is not installed.
    """
    # Refuse to sign with the well-known placeholder: anyone who knows it
    # could forge a set-password link for any customer account.
    if not CUSTOMER_JWT_SECRET or CUSTOMER_JWT_SECRET == "changeme_long_random_string":
        raise RuntimeError(
            "CUSTOMER_JWT_SECRET is unset or still the placeholder default; "
            "refusing to sign portal set-password tokens"
        )
    try:
        import jwt as _jwt
    except ImportError as exc:
        # Installing packages from inside a running worker is unsafe and
        # unreliable (no network, read-only image, wrong interpreter). Fail
        # with a clear message so the dependency is added to the image.
        raise ImportError(
            "PyJWT is required to sign portal set-password tokens; "
            "add it to the worker's dependencies"
        ) from exc
    now = datetime.now(timezone.utc)
    payload = {
        "email": email,
        "order_number": order_number,
        "purpose": "set_password",
        "iat": int(now.timestamp()),
        "exp": int((now + timedelta(hours=SET_PASSWORD_TTL_HOURS)).timestamp()),
    }
    token = _jwt.encode(payload, CUSTOMER_JWT_SECRET, algorithm="HS256")
    # PyJWT 1.x returns bytes, 2.x returns str. Normalise so the caller never
    # interpolates a "b'...'" literal into the link.
    if isinstance(token, bytes):
        token = token.decode("ascii")
    return token
# "Access your portal" box. Placeholders: url (set-password link) and
# ttl_hours (link validity shown to the customer).
_ONBOARD_TEMPLATE = """
<div style="margin:0 0 24px;padding:20px;background:#ecfdf5;border:1px solid #a7f3d0;border-radius:6px;">
<h2 style="margin:0 0 8px;color:#065f46;font-size:17px;">Access your portal</h2>
<p style="margin:0 0 12px;color:#065f46;font-size:14px;">
We've created a portal account for you. Set your password to view your
deliverables, place follow-up orders, and track your compliance status.
</p>
<div style="text-align:center;">
<a href="{url}" style="display:inline-block;padding:12px 24px;background:#059669;color:#fff;border-radius:4px;text-decoration:none;font-size:14px;font-weight:bold;">
Set your password
</a>
</div>
<p style="margin:12px 0 0;color:#065f46;font-size:12px;text-align:center;">
This link is valid for {ttl_hours} hours.
</p>
</div>
"""


def _build_portal_onboard_html(pg_order: dict | None) -> str:
    """Render the portal onboarding section with a set-password link.

    Args:
        pg_order: Compliance order dict. Keys read here:
            ``portal_user_created``, ``customer_email`` and ``order_number``.

    Returns:
        The HTML fragment, or ``""`` when there is no order, the
        ``portal_user_created`` flag is falsy, or no customer email is known.
    """
    # Guard clauses: nothing to render unless a portal user was created and
    # an address is available.
    if not pg_order:
        return ""
    if not pg_order.get("portal_user_created"):
        return ""
    recipient = pg_order.get("customer_email", "")
    if not recipient:
        return ""

    order_ref = pg_order.get("order_number", "")
    signed_token = _generate_set_password_token(recipient, order_ref)
    portal_base = PORTAL_URL.rstrip("/")
    link = f"{portal_base}/set-password?token={signed_token}"
    return _ONBOARD_TEMPLATE.format(url=link, ttl_hours=SET_PASSWORD_TTL_HOURS)
def _send_email(to_address: str, subject: str, html_body: str) -> None:
    """Send an HTML email via SMTP.

    Transport security follows ``SMTP_PORT``: implicit TLS on port 465, no
    TLS on port 25, STARTTLS on every other port. Login is performed only
    when ``SMTP_USER`` is set.

    Args:
        to_address: Recipient address.
        subject: Subject line.
        html_body: Complete HTML document used as the message body.

    Raises:
        smtplib.SMTPException: On protocol-level failures.
        OSError: On connection failures, including timeouts.
    """
    import smtplib
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText

    msg = MIMEMultipart("alternative")
    msg["Subject"] = subject
    msg["From"] = SMTP_FROM
    msg["To"] = to_address
    msg.attach(MIMEText(html_body, "html"))

    # Without a timeout a stalled mail server blocks the worker forever and
    # prevents graceful shutdown.
    timeout_seconds = 30
    # Port 465 expects TLS from the first byte; STARTTLS cannot work there.
    use_implicit_tls = SMTP_PORT == 465
    smtp_factory = smtplib.SMTP_SSL if use_implicit_tls else smtplib.SMTP

    with smtp_factory(SMTP_HOST, SMTP_PORT, timeout=timeout_seconds) as server:
        if not use_implicit_tls and SMTP_PORT != 25:
            server.starttls()
        if SMTP_USER:
            server.login(SMTP_USER, SMTP_PASSWORD)
        server.sendmail(SMTP_FROM, [to_address], msg.as_string())
    logger.info("Email sent to %s", to_address)
# --------------------------------------------------------------------------- #
# MinIO helpers
# --------------------------------------------------------------------------- #
def _get_minio_client() -> Minio:
    """Build a MinIO client from the module-level ``MINIO_*`` settings.

    Returns:
        A ``Minio`` instance for ``MINIO_ENDPOINT`` using the configured
        credentials and TLS flag.
    """
    connection_options = {
        "access_key": MINIO_ACCESS_KEY,
        "secret_key": MINIO_SECRET_KEY,
        "secure": MINIO_SECURE,
    }
    return Minio(MINIO_ENDPOINT, **connection_options)
def _generate_presigned_urls(
    minio_client: Minio,
    minio_paths: list[str],
    *,
    bucket: str | None = None,
    expiry_seconds: int | None = None,
) -> list[tuple[str, str]]:
    """Return ``(filename, presigned_url)`` tuples for stored documents.

    Accepted path forms, for bucket ``B``:
        ``minio://B/compliance/SO-00001/file.pdf``
        ``B/compliance/SO-00001/file.pdf``
        ``compliance/SO-00001/file.pdf``

    Args:
        minio_client: Client exposing ``presigned_get_object``.
        minio_paths: Stored paths in any of the forms above.
        bucket: Bucket to presign against; defaults to ``MINIO_BUCKET``.
        expiry_seconds: URL lifetime; defaults to ``PRESIGN_EXPIRY``.

    Returns:
        One tuple per input path, in input order. ``filename`` is the last
        ``/``-separated component of the object name.
    """
    bucket_name = MINIO_BUCKET if bucket is None else bucket
    lifetime = timedelta(
        seconds=PRESIGN_EXPIRY if expiry_seconds is None else expiry_seconds
    )
    bucket_prefix = f"{bucket_name}/"

    results: list[tuple[str, str]] = []
    for path in minio_paths:
        # Drop an optional "scheme://" prefix.
        _scheme, separator, remainder = path.partition("://")
        obj_name = remainder if separator else path
        # Strip the bucket only when it is the leading path component. An
        # unanchored split on "bucket/" would truncate keys that merely
        # contain that text (e.g. "archive/<bucket>/file.pdf").
        if obj_name.startswith(bucket_prefix):
            obj_name = obj_name[len(bucket_prefix):]
        url = minio_client.presigned_get_object(
            bucket_name,
            obj_name,
            expires=lifetime,
        )
        filename = obj_name.rsplit("/", 1)[-1]
        results.append((filename, url))
    return results
# --------------------------------------------------------------------------- #
# Order delivery
# --------------------------------------------------------------------------- #
def _build_optional_sections(order_name: str) -> tuple[str, str]:
    """Return ``(upsell_html, portal_onboard_html)`` for an order, best-effort.

    These sections are extras. Any failure while building one is logged
    with a traceback and that section is omitted, so delivery of the
    documents themselves is never blocked by them.
    """
    try:
        pg_order = _fetch_pg_compliance_order(order_name)
    except Exception:
        logger.exception(
            "Order %s: compliance order lookup failed (extras omitted)", order_name
        )
        return "", ""
    if not pg_order:
        return "", ""

    upsell_html = ""
    try:
        recs = _fetch_recommendations(pg_order.get("order_number", ""))
        upsell_html = _build_upsell_html(recs)
    except Exception:
        logger.exception(
            "Order %s: could not build upsell section (omitted)", order_name
        )

    portal_onboard_html = ""
    try:
        portal_onboard_html = _build_portal_onboard_html(pg_order)
    except Exception:
        logger.exception(
            "Order %s: could not build portal onboarding section (omitted)",
            order_name,
        )
    return upsell_html, portal_onboard_html


def _deliver_order(
    erp: ERPNextClient, minio_client: Minio, order_data: dict
) -> None:
    """Email the generated documents for one order and mark it Delivered.

    Steps: load the Sales Order, resolve the customer's email, presign the
    stored files, render and send the email, set the order's
    ``workflow_state`` to ``"Delivered"``, then record the deliverables on
    the Customer (non-fatal if that last update fails).

    Args:
        erp: ERPNext client used to read and update documents.
        minio_client: MinIO client used to presign download URLs.
        order_data: Queue entry; ``name`` is required, ``customer`` and
            ``customer_name`` are read when present.

    The order is skipped (logged, nothing sent) when it has no usable file
    paths or when no customer email can be found.
    """
    order_name = order_data["name"]
    customer_id = order_data.get("customer", "")
    customer_name = order_data.get("customer_name", customer_id)

    # Fetch full order to get generated files and customer email
    full_order = erp.get_resource("Sales Order", order_name)
    generated_files_raw = full_order.get("custom_generated_files", "")
    minio_paths = [
        p.strip() for p in (generated_files_raw or "").split("\n") if p.strip()
    ]
    # Also covers a field holding only blank lines, which would otherwise
    # produce an email with no download links.
    if not minio_paths:
        logger.warning("Order %s has no generated files skipping", order_name)
        return

    # Get customer email
    customer_doc = erp.get_resource("Customer", customer_id)
    email = customer_doc.get("email_id") or customer_doc.get("custom_contact_email", "")
    if not email:
        logger.error("No email found for customer %s on order %s", customer_name, order_name)
        return

    # Generate presigned URLs
    file_links = _generate_presigned_urls(minio_client, minio_paths)

    # Upsell + portal onboarding (compliance orders only); never fatal.
    upsell_html, portal_onboard_html = _build_optional_sections(order_name)

    # Build and send email
    html = _build_email_html(
        customer_name,
        order_name,
        file_links,
        upsell_html=upsell_html,
        portal_onboard_html=portal_onboard_html,
    )
    subject = f"Your compliance documents are ready {order_name}"
    _send_email(email, subject, html)

    # Update order status to Delivered.
    # NOTE(review): if this update fails after the email went out, the order
    # keeps its current state and may be emailed again on a later poll.
    # Consider an idempotency marker.
    erp.update_resource(
        "Sales Order", order_name, {"workflow_state": "Delivered"}
    )
    logger.info("Order %s delivered to %s", order_name, email)

    # Update customer record with document references
    try:
        erp.update_resource(
            "Customer",
            customer_id,
            {"custom_latest_deliverables": generated_files_raw},
        )
    except ERPNextClientError:
        logger.warning(
            "Could not update Customer record for %s (non-fatal)", customer_name
        )
# --------------------------------------------------------------------------- #
# Main loop
# --------------------------------------------------------------------------- #
def run() -> None:
    """Poll for "Ready" orders and deliver them until shutdown is requested.

    Each cycle fetches the queue, delivers every order (a failure on one
    order is logged and does not stop the others), then waits
    ``POLL_INTERVAL`` seconds in one-second steps so a shutdown signal is
    noticed promptly. The ERPNext client is closed on exit.
    """
    logger.info("Delivery worker started (PID %d). Poll interval: %ds", os.getpid(), POLL_INTERVAL)
    erp = ERPNextClient()
    minio_client = _get_minio_client()
    try:
        while not _shutdown_requested:
            try:
                ready_orders = erp.get_queued_orders(status="Ready")
                if ready_orders:
                    logger.info("Found %d ready order(s) for delivery", len(ready_orders))
                for order in ready_orders or ():
                    if _shutdown_requested:
                        break
                    try:
                        _deliver_order(erp, minio_client, order)
                    except Exception:
                        # Keep going: one bad order must not block the rest.
                        logger.exception("Error delivering order %s", order.get("name"))
            except ERPNextClientError:
                logger.exception("ERPNext API error during delivery poll")
            except Exception:
                logger.exception("Unexpected error during delivery poll")

            # Interruptible wait: re-check the shutdown flag every second.
            seconds_left = POLL_INTERVAL
            while seconds_left > 0 and not _shutdown_requested:
                time.sleep(1)
                seconds_left -= 1
    finally:
        erp.close()
        logger.info("Delivery worker stopped.")


if __name__ == "__main__":
    run()