Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
511 lines
18 KiB
Python
511 lines
18 KiB
Python
"""Delivery worker – polls for admin-approved ("Ready") orders and emails
|
||
document links to the customer.
|
||
|
||
Run alongside the base worker:
|
||
|
||
python -m scripts.workers.delivery_worker
|
||
|
||
Environment variables (in addition to ERPNext & MinIO vars):
|
||
SMTP_HOST, SMTP_PORT, SMTP_USER, SMTP_PASSWORD, SMTP_FROM
|
||
PRESIGN_EXPIRY – presigned-URL lifetime in seconds (default 7 days)
|
||
DELIVERY_POLL_INTERVAL – seconds between polls (default 60)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
import signal
|
||
import sys
|
||
import time
|
||
from datetime import datetime, timedelta, timezone
|
||
from typing import Any
|
||
|
||
from minio import Minio
|
||
|
||
from .erpnext_client import ERPNextClient, ERPNextClientError
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Configuration
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
POLL_INTERVAL = int(os.getenv("DELIVERY_POLL_INTERVAL", "60"))
|
||
PRESIGN_EXPIRY = int(os.getenv("PRESIGN_EXPIRY", str(7 * 24 * 3600))) # 7 days
|
||
|
||
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "minio:9000")
|
||
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "")
|
||
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "")
|
||
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "performancewest")
|
||
MINIO_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true"
|
||
|
||
SMTP_HOST = os.getenv("SMTP_HOST", "co.carrierone.com")
|
||
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
|
||
SMTP_USER = os.getenv("SMTP_USER", "")
|
||
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "")
|
||
SMTP_FROM = os.getenv("SMTP_FROM", "Performance West <noreply@performancewest.net>")
|
||
|
||
# Upsell + portal onboarding
|
||
SITE_URL = os.getenv("SITE_URL", "https://performancewest.net")
|
||
API_URL = os.getenv("API_URL", "http://api:3001")
|
||
PORTAL_URL = os.getenv("PORTAL_URL", "https://portal.performancewest.net")
|
||
CUSTOMER_JWT_SECRET = os.getenv(
|
||
"CUSTOMER_JWT_SECRET", "changeme_long_random_string"
|
||
)
|
||
SET_PASSWORD_TTL_HOURS = int(os.getenv("SET_PASSWORD_TTL_HOURS", "72"))
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Logging
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
||
handlers=[logging.StreamHandler(sys.stdout)],
|
||
)
|
||
logger = logging.getLogger("pw.worker.delivery")
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Graceful shutdown
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
_shutdown_requested = False
|
||
|
||
|
||
def _signal_handler(signum: int, _frame: Any) -> None:
|
||
global _shutdown_requested
|
||
logger.info("Received signal %s – shutting down gracefully …", signum)
|
||
_shutdown_requested = True
|
||
|
||
|
||
signal.signal(signal.SIGTERM, _signal_handler)
|
||
signal.signal(signal.SIGINT, _signal_handler)
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Email
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
_EMAIL_TEMPLATE = """\
|
||
<!DOCTYPE html>
|
||
<html lang="en">
|
||
<head><meta charset="utf-8"></head>
|
||
<body style="margin:0;padding:0;font-family:Arial,Helvetica,sans-serif;background:#f4f4f4;">
|
||
<table width="100%" cellpadding="0" cellspacing="0" style="max-width:640px;margin:24px auto;background:#ffffff;border-radius:8px;overflow:hidden;">
|
||
<tr>
|
||
<td style="background:#1a2744;padding:24px 32px;">
|
||
<h1 style="margin:0;color:#ffffff;font-size:22px;">Performance West Inc.</h1>
|
||
<p style="margin:4px 0 0;color:#8fa4c8;font-size:13px;">Compliance & Regulatory Services</p>
|
||
</td>
|
||
</tr>
|
||
<tr>
|
||
<td style="padding:32px;">
|
||
<p style="margin:0 0 16px;color:#333;font-size:15px;">Hello {customer_name},</p>
|
||
<p style="margin:0 0 16px;color:#333;font-size:15px;">
|
||
Your compliance deliverables for order <strong>{order_name}</strong> are ready.
|
||
The documents are available for download using the links below. These links
|
||
are valid for <strong>7 days</strong>.
|
||
</p>
|
||
<table width="100%" cellpadding="0" cellspacing="0" style="margin:0 0 24px;">
|
||
{file_rows}
|
||
</table>
|
||
<p style="margin:0 0 8px;color:#666;font-size:13px;">
|
||
If you have questions or need further assistance, reply to this email or
|
||
contact us at <a href="mailto:support@performancewest.net" style="color:#2d6cdf;">support@performancewest.net</a>.
|
||
</p>
|
||
<p style="margin:24px 0 0;color:#333;font-size:15px;">
|
||
Thank you for choosing Performance West.<br/>
|
||
— The Compliance Team
|
||
</p>
|
||
</td>
|
||
</tr>
|
||
<tr>
|
||
<td style="background:#f0f0f0;padding:16px 32px;text-align:center;">
|
||
<p style="margin:0;color:#999;font-size:11px;">
|
||
© {year} Performance West Inc. • Confidential
|
||
</p>
|
||
</td>
|
||
</tr>
|
||
</table>
|
||
</body>
|
||
</html>
|
||
"""
|
||
|
||
_FILE_ROW = """\
|
||
<tr>
|
||
<td style="padding:8px 0;">
|
||
<a href="{url}" style="display:inline-block;padding:10px 20px;background:#2d6cdf;color:#fff;border-radius:4px;text-decoration:none;font-size:14px;">
|
||
{filename}
|
||
</a>
|
||
</td>
|
||
</tr>
|
||
"""
|
||
|
||
|
||
def _build_email_html(
|
||
customer_name: str,
|
||
order_name: str,
|
||
file_links: list[tuple[str, str]],
|
||
*,
|
||
upsell_html: str = "",
|
||
portal_onboard_html: str = "",
|
||
) -> str:
|
||
rows = "".join(
|
||
_FILE_ROW.format(url=url, filename=filename)
|
||
for filename, url in file_links
|
||
)
|
||
# Inject upsell + onboarding sections between the file list and the
|
||
# sign-off paragraph. Both sections are empty strings when not
|
||
# applicable, so no layout holes on returning customers with a clean
|
||
# compliance posture.
|
||
base = _EMAIL_TEMPLATE.format(
|
||
customer_name=customer_name,
|
||
order_name=order_name,
|
||
file_rows=rows,
|
||
year=datetime.now().year,
|
||
)
|
||
if upsell_html or portal_onboard_html:
|
||
marker = '<p style="margin:0 0 8px;color:#666;font-size:13px;">'
|
||
base = base.replace(marker, upsell_html + portal_onboard_html + marker, 1)
|
||
return base
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# PG lookups (recommended_slugs + portal_user_created)
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
|
||
def _pg_connect():
|
||
try:
|
||
import psycopg2
|
||
|
||
return psycopg2.connect(os.environ.get("DATABASE_URL", ""))
|
||
except Exception as exc:
|
||
logger.warning("delivery: could not connect to PG: %s", exc)
|
||
return None
|
||
|
||
|
||
def _fetch_pg_compliance_order(erpnext_sales_order: str) -> dict | None:
|
||
"""Read the compliance_orders row for an ERPNext Sales Order name."""
|
||
conn = _pg_connect()
|
||
if conn is None:
|
||
return None
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT order_number, customer_email, customer_name, "
|
||
"portal_user_created, recommended_slugs "
|
||
"FROM compliance_orders WHERE erpnext_sales_order = %s",
|
||
(erpnext_sales_order,),
|
||
)
|
||
row = cur.fetchone()
|
||
if not row:
|
||
return None
|
||
return {
|
||
"order_number": row[0],
|
||
"customer_email": row[1],
|
||
"customer_name": row[2],
|
||
"portal_user_created": bool(row[3]),
|
||
"recommended_slugs": list(row[4] or []),
|
||
}
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Upsell block — "Recommended next steps" with deep-links
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
|
||
def _fetch_recommendations(pg_order_number: str) -> dict | None:
|
||
"""Call the API's recommendations endpoint — returns {slugs, individual_urls, bundle_url, ...}."""
|
||
if not pg_order_number:
|
||
return None
|
||
try:
|
||
import urllib.request
|
||
import json as _json
|
||
|
||
url = (
|
||
f"{API_URL.rstrip('/')}/api/v1/compliance-orders/"
|
||
f"{pg_order_number}/recommendations"
|
||
)
|
||
with urllib.request.urlopen(url, timeout=15) as resp:
|
||
return _json.loads(resp.read())
|
||
except Exception as exc:
|
||
logger.warning(
|
||
"delivery: could not fetch recommendations for %s: %s",
|
||
pg_order_number,
|
||
exc,
|
||
)
|
||
return None
|
||
|
||
|
||
_UPSELL_TEMPLATE = """
|
||
<div style="margin:0 0 24px;padding:20px;background:#f9fafb;border:1px solid #e5e7eb;border-radius:6px;">
|
||
<h2 style="margin:0 0 8px;color:#1a2744;font-size:17px;">Recommended next steps</h2>
|
||
<p style="margin:0 0 12px;color:#444;font-size:14px;">
|
||
Your compliance checkup flagged the following items. Fix them now to bring
|
||
your FCC filings up to date:
|
||
</p>
|
||
<ul style="margin:0 0 16px 20px;padding:0;color:#333;font-size:14px;">
|
||
{items}
|
||
</ul>
|
||
{bundle_cta}
|
||
</div>
|
||
"""
|
||
|
||
_UPSELL_ITEM = (
|
||
'<li style="margin:0 0 6px;"><a href="{url}" style="color:#2d6cdf;">'
|
||
'{name}</a> — ${price_dollars}</li>'
|
||
)
|
||
|
||
_UPSELL_BUNDLE_CTA = """
|
||
<div style="text-align:center;margin-top:8px;">
|
||
<a href="{bundle_url}" style="display:inline-block;padding:12px 24px;background:#1a2744;color:#fff;border-radius:4px;text-decoration:none;font-size:14px;font-weight:bold;">
|
||
Fix everything in one bundle — save {discount_pct}%
|
||
</a>
|
||
</div>
|
||
"""
|
||
|
||
|
||
def _build_upsell_html(recs: dict | None) -> str:
|
||
if not recs or not recs.get("individual_urls"):
|
||
return ""
|
||
items = "".join(
|
||
_UPSELL_ITEM.format(
|
||
url=u["checkout_url"],
|
||
name=u["name"],
|
||
price_dollars=f"{u['price_cents']/100:.2f}",
|
||
)
|
||
for u in recs["individual_urls"]
|
||
)
|
||
bundle_cta = ""
|
||
if recs.get("bundle_eligible") and recs.get("bundle_url"):
|
||
bundle_cta = _UPSELL_BUNDLE_CTA.format(
|
||
bundle_url=recs["bundle_url"],
|
||
discount_pct=recs.get("bundle_discount_pct", 15),
|
||
)
|
||
return _UPSELL_TEMPLATE.format(items=items, bundle_cta=bundle_cta)
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Portal onboarding — magic link for new customers
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
|
||
def _generate_set_password_token(email: str, order_number: str) -> str:
|
||
"""Sign a short-lived JWT for the portal /set-password page."""
|
||
try:
|
||
import jwt as _jwt
|
||
except ImportError:
|
||
import subprocess
|
||
|
||
subprocess.run(["pip", "install", "PyJWT"], check=True, capture_output=True)
|
||
import jwt as _jwt
|
||
|
||
now = datetime.now(timezone.utc)
|
||
payload = {
|
||
"email": email,
|
||
"order_number": order_number,
|
||
"purpose": "set_password",
|
||
"iat": int(now.timestamp()),
|
||
"exp": int((now + timedelta(hours=SET_PASSWORD_TTL_HOURS)).timestamp()),
|
||
}
|
||
return _jwt.encode(payload, CUSTOMER_JWT_SECRET, algorithm="HS256")
|
||
|
||
|
||
_ONBOARD_TEMPLATE = """
|
||
<div style="margin:0 0 24px;padding:20px;background:#ecfdf5;border:1px solid #a7f3d0;border-radius:6px;">
|
||
<h2 style="margin:0 0 8px;color:#065f46;font-size:17px;">Access your portal</h2>
|
||
<p style="margin:0 0 12px;color:#065f46;font-size:14px;">
|
||
We've created a portal account for you. Set your password to view your
|
||
deliverables, place follow-up orders, and track your compliance status.
|
||
</p>
|
||
<div style="text-align:center;">
|
||
<a href="{url}" style="display:inline-block;padding:12px 24px;background:#059669;color:#fff;border-radius:4px;text-decoration:none;font-size:14px;font-weight:bold;">
|
||
Set your password
|
||
</a>
|
||
</div>
|
||
<p style="margin:12px 0 0;color:#065f46;font-size:12px;text-align:center;">
|
||
This link is valid for {ttl_hours} hours.
|
||
</p>
|
||
</div>
|
||
"""
|
||
|
||
|
||
def _build_portal_onboard_html(pg_order: dict | None) -> str:
|
||
if not pg_order or not pg_order.get("portal_user_created"):
|
||
return ""
|
||
email = pg_order.get("customer_email", "")
|
||
order_number = pg_order.get("order_number", "")
|
||
if not email:
|
||
return ""
|
||
token = _generate_set_password_token(email, order_number)
|
||
url = f"{PORTAL_URL.rstrip('/')}/set-password?token={token}"
|
||
return _ONBOARD_TEMPLATE.format(url=url, ttl_hours=SET_PASSWORD_TTL_HOURS)
|
||
|
||
|
||
def _send_email(to_address: str, subject: str, html_body: str) -> None:
|
||
"""Send an HTML email via SMTP."""
|
||
import smtplib
|
||
from email.mime.multipart import MIMEMultipart
|
||
from email.mime.text import MIMEText
|
||
|
||
msg = MIMEMultipart("alternative")
|
||
msg["Subject"] = subject
|
||
msg["From"] = SMTP_FROM
|
||
msg["To"] = to_address
|
||
msg.attach(MIMEText(html_body, "html"))
|
||
|
||
with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
|
||
if SMTP_PORT != 25:
|
||
server.starttls()
|
||
if SMTP_USER:
|
||
server.login(SMTP_USER, SMTP_PASSWORD)
|
||
server.sendmail(SMTP_FROM, [to_address], msg.as_string())
|
||
|
||
logger.info("Email sent to %s", to_address)
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# MinIO helpers
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
|
||
def _get_minio_client() -> Minio:
|
||
return Minio(
|
||
MINIO_ENDPOINT,
|
||
access_key=MINIO_ACCESS_KEY,
|
||
secret_key=MINIO_SECRET_KEY,
|
||
secure=MINIO_SECURE,
|
||
)
|
||
|
||
|
||
def _generate_presigned_urls(
|
||
minio_client: Minio, minio_paths: list[str]
|
||
) -> list[tuple[str, str]]:
|
||
"""Return list of (filename, presigned_url) tuples."""
|
||
results: list[tuple[str, str]] = []
|
||
for path in minio_paths:
|
||
# path format: "minio://bucket/compliance/SO-00001/file.pdf"
|
||
obj_name = path.split(f"{MINIO_BUCKET}/", 1)[-1] if MINIO_BUCKET in path else path
|
||
url = minio_client.presigned_get_object(
|
||
MINIO_BUCKET,
|
||
obj_name,
|
||
expires=timedelta(seconds=PRESIGN_EXPIRY),
|
||
)
|
||
filename = obj_name.rsplit("/", 1)[-1]
|
||
results.append((filename, url))
|
||
return results
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Order delivery
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
|
||
def _deliver_order(
|
||
erp: ERPNextClient, minio_client: Minio, order_data: dict
|
||
) -> None:
|
||
order_name = order_data["name"]
|
||
customer_name = order_data.get("customer_name", order_data.get("customer", ""))
|
||
|
||
# Fetch full order to get generated files and customer email
|
||
full_order = erp.get_resource("Sales Order", order_name)
|
||
generated_files_raw = full_order.get("custom_generated_files", "")
|
||
if not generated_files_raw:
|
||
logger.warning("Order %s has no generated files – skipping", order_name)
|
||
return
|
||
|
||
minio_paths = [p.strip() for p in generated_files_raw.split("\n") if p.strip()]
|
||
|
||
# Get customer email
|
||
customer_doc = erp.get_resource("Customer", order_data.get("customer", ""))
|
||
email = customer_doc.get("email_id") or customer_doc.get("custom_contact_email", "")
|
||
if not email:
|
||
logger.error("No email found for customer %s on order %s", customer_name, order_name)
|
||
return
|
||
|
||
# Generate presigned URLs
|
||
file_links = _generate_presigned_urls(minio_client, minio_paths)
|
||
|
||
# ── Upsell + portal onboarding (compliance orders only) ─────────────
|
||
pg_order = _fetch_pg_compliance_order(order_name)
|
||
upsell_html = ""
|
||
portal_onboard_html = ""
|
||
if pg_order:
|
||
recs = _fetch_recommendations(pg_order.get("order_number", ""))
|
||
upsell_html = _build_upsell_html(recs)
|
||
portal_onboard_html = _build_portal_onboard_html(pg_order)
|
||
|
||
# Build and send email
|
||
html = _build_email_html(
|
||
customer_name,
|
||
order_name,
|
||
file_links,
|
||
upsell_html=upsell_html,
|
||
portal_onboard_html=portal_onboard_html,
|
||
)
|
||
subject = f"Your compliance documents are ready – {order_name}"
|
||
_send_email(email, subject, html)
|
||
|
||
# Update order status to Delivered
|
||
erp.update_resource(
|
||
"Sales Order", order_name, {"workflow_state": "Delivered"}
|
||
)
|
||
logger.info("Order %s delivered to %s", order_name, email)
|
||
|
||
# Update customer record with document references
|
||
try:
|
||
erp.update_resource(
|
||
"Customer",
|
||
order_data.get("customer", ""),
|
||
{"custom_latest_deliverables": generated_files_raw},
|
||
)
|
||
except ERPNextClientError:
|
||
logger.warning(
|
||
"Could not update Customer record for %s (non-fatal)", customer_name
|
||
)
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Main loop
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
|
||
def run() -> None:
|
||
logger.info("Delivery worker started (PID %d). Poll interval: %ds", os.getpid(), POLL_INTERVAL)
|
||
|
||
erp = ERPNextClient()
|
||
minio_client = _get_minio_client()
|
||
|
||
try:
|
||
while not _shutdown_requested:
|
||
try:
|
||
orders = erp.get_queued_orders(status="Ready")
|
||
if orders:
|
||
logger.info("Found %d ready order(s) for delivery", len(orders))
|
||
|
||
for order in orders:
|
||
if _shutdown_requested:
|
||
break
|
||
try:
|
||
_deliver_order(erp, minio_client, order)
|
||
except Exception:
|
||
logger.exception("Error delivering order %s", order.get("name"))
|
||
|
||
except ERPNextClientError:
|
||
logger.exception("ERPNext API error during delivery poll")
|
||
except Exception:
|
||
logger.exception("Unexpected error during delivery poll")
|
||
|
||
for _ in range(POLL_INTERVAL):
|
||
if _shutdown_requested:
|
||
break
|
||
time.sleep(1)
|
||
|
||
finally:
|
||
erp.close()
|
||
logger.info("Delivery worker stopped.")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
run()
|