new-site/scripts/workers/client_email_processor.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

482 lines
16 KiB
Python

"""
Client Email Processor — monitors client @company.ca mailboxes on HestiaCP
for incoming regulatory, government, and business correspondence.
Connects via IMAP to cp.carrierone.com, checks purpose-built mailboxes,
and creates ERPNext Issues/tasks/alerts based on the sender and content.
Runs as a cron job every 15 minutes.
Email addresses we create per client domain:
REGULATORY:
regulatory@ — CRTC correspondence, public notices, compliance requests
crtc@ — Specifically for CRTC registration (address on the letter)
ccts@ — CCTS complaint notifications, membership correspondence
CORPORATE:
corpadmin@ — BC Registry correspondence, annual report notices
registeredoffice@ — Official legal service, government notices (high priority)
FINANCIAL:
accounting@ — CRA tax notices, GST/HST, bank correspondence
billing@ — Payment notifications, invoice inquiries
OPERATIONS:
abuse@ — Telecom abuse reports (spam, robocall complaints) — RFC 2142 required
noc@ — Network operations, outage notifications, interconnection partners
postmaster@ — Email delivery issues, bounce handling (RFC 2142 required)
BUSINESS:
info@ — General inquiries
admin@ — Administrative catch-all
sales@ — Wholesale partner inquiries
support@ — Customer support
"""
from __future__ import annotations
import email
import email.utils
import imaplib
import json
import logging
import os
import re
import time
from datetime import datetime
from typing import Optional
# Module logger with a fixed name (independent of __name__).
LOG = logging.getLogger("workers.email_processor")

# --- HestiaCP IMAP endpoint; every value can be overridden via environment ---
IMAP_HOST = os.environ.get("HESTIA_IMAP_HOST", "cp.carrierone.com")
IMAP_PORT = int(os.environ.get("HESTIA_IMAP_PORT", "993"))
# Only the literal string "true" (any letter case) enables SSL.
IMAP_USE_SSL = os.environ.get("HESTIA_IMAP_SSL", "true").lower() == "true"

# --- Database: connection string handed to psycopg2.connect() ---
DATABASE_URL = os.environ.get("DATABASE_URL", "")
# Per-mailbox handling rules: ERPNext priority / issue type, who gets alerted,
# and a human-readable category description. One row per mailbox prefix.
_MAILBOX_FIELDS = ("priority", "issue_type", "alert_client", "alert_admin", "description")
_MAILBOX_ROWS = (
    # prefix, priority, issue_type, alert_client, alert_admin, description
    (
        "registeredoffice", "Urgent", "Legal Notice", True, True,
        "Official correspondence to the registered office — may be legal service or government notice.",
    ),
    (
        "crtc", "High", "Regulatory", True, True,
        "Correspondence from or related to CRTC registration.",
    ),
    (
        "ccts", "High", "Complaint", True, True,
        "CCTS complaint or membership correspondence. Requires timely response.",
    ),
    (
        "regulatory", "High", "Regulatory", True, False,
        "General regulatory correspondence (CRTC public notices, consultations, decisions).",
    ),
    (
        "corpadmin", "Medium", "Corporate", True, True,
        "BC Registry or corporate administration correspondence.",
    ),
    (
        "accounting", "Medium", "Financial", True, False,
        "Financial correspondence — CRA tax notices, GST/HST, bank statements.",
    ),
    (
        "billing", "Low", "Financial", True, False,
        "Billing inquiry or payment notification.",
    ),
    (
        "abuse", "High", "Abuse Report", True, True,
        "Telecom abuse report — spam, robocall complaint, or network abuse allegation. Respond promptly.",
    ),
    (
        "noc", "High", "Network Operations", True, False,
        "Network operations — outage notification, interconnection issue, or technical alert from a partner.",
    ),
    (
        "postmaster", "Low", "Technical", False, False,
        "Email delivery issue or bounce notification.",
    ),
    (
        "info", "Low", "General Inquiry", True, False,
        "General inquiry received at info@ address.",
    ),
    (
        "admin", "Low", "Administrative", True, False,
        "Administrative correspondence.",
    ),
    (
        "sales", "Medium", "Sales Inquiry", True, False,
        "Sales or wholesale partnership inquiry.",
    ),
    (
        "support", "Medium", "Support", True, False,
        "Customer support request.",
    ),
)

# Expand the table into {prefix: {field: value, ...}} in declaration order.
MAILBOX_CONFIG = {
    prefix: dict(zip(_MAILBOX_FIELDS, values))
    for prefix, *values in _MAILBOX_ROWS
}

# Every mailbox prefix created per client domain, in declaration order.
ALL_MAILBOXES = list(MAILBOX_CONFIG)
def get_client_domains() -> list[dict]:
    """Load the order/contact record of every active client from the database.

    Selects rows from ``canada_crtc_orders`` whose status is one of the
    listed workflow states and whose order number is set.

    Returns:
        One dict per row with the keys ``order_number``, ``customer_email``,
        ``customer_name`` and ``company_name``.
    """
    import psycopg2

    field_names = ("order_number", "customer_email", "customer_name", "company_name")
    query = (
        "\n"
        "SELECT order_number, customer_email, customer_name, company_name_final\n"
        "FROM canada_crtc_orders\n"
        "WHERE status IN ('delivered', 'crtc_registration', 'ccts_registration', 'banking_setup', 'binder_compilation', 'phone_ready', 'domain_ready', 'review', 'incorporation')\n"
        "AND order_number IS NOT NULL\n"
    )

    connection = psycopg2.connect(DATABASE_URL)
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        # NOTE: the .ca domain itself is not selected yet (it would be stored
        # in a field); for now it has to be derived from the company name or
        # other order data by the caller.
        return [dict(zip(field_names, record)) for record in cursor.fetchall()]
    finally:
        connection.close()
def _decode_mime_header(raw_value) -> str:
    """Return a header value as single-line text, decoding RFC 2047 encoded-words.

    Accepts a ``str`` or the ``email.header.Header`` object the legacy parser
    returns for headers that contain raw 8-bit bytes. Falls back to
    ``str(raw_value)`` when the value is too malformed to decode.
    """
    from email.errors import MessageError
    from email.header import decode_header, make_header

    if not raw_value:
        return ""
    try:
        decoded = str(make_header(decode_header(raw_value)))
    except (LookupError, UnicodeError, ValueError, MessageError):
        decoded = str(raw_value)
    # Folded headers keep their line breaks; collapse every whitespace run so
    # downstream consumers (issue titles, alert subjects) get one clean line.
    return " ".join(decoded.split())


def _decode_text_part(part: "email.message.Message") -> str:
    """Decode one non-multipart MIME part to text using its declared charset."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # Unknown or misspelled charset label — fall back to UTF-8.
        return payload.decode("utf-8", errors="replace")


def _extract_body(msg: "email.message.Message") -> str:
    """Pick the message body: first text/plain part, else first non-empty text/html."""
    if not msg.is_multipart():
        return _decode_text_part(msg)
    html_fallback = ""
    for part in msg.walk():
        # An attached .txt/.html file is not the message body.
        if part.get_content_disposition() == "attachment":
            continue
        content_type = part.get_content_type()
        if content_type == "text/plain":
            return _decode_text_part(part)
        if content_type == "text/html" and not html_fallback:
            html_fallback = _decode_text_part(part)
    return html_fallback


def check_mailbox(
    domain: str,
    mailbox_prefix: str,
    imap_password: str,
) -> list[dict]:
    """Connect to IMAP and fetch unread emails from a specific mailbox.

    Messages are fetched without touching their flags and are flagged
    ``\\Seen`` only after they were parsed successfully, so a message that
    fails to parse stays unread and is retried on the next run. The IMAP
    session is always logged out, including on errors. Connection and IMAP
    errors are logged, not raised.

    Args:
        domain: The client's .ca domain (e.g., "valleyinternet.ca")
        mailbox_prefix: The mailbox prefix (e.g., "crtc", "abuse", "noc")
        imap_password: IMAP password for this mailbox

    Returns:
        List of parsed email dicts with the keys ``mailbox``, ``domain``,
        ``to``, ``from_addr``, ``from_name``, ``subject``, ``body`` (at most
        5000 characters), ``date`` and ``message_id``. Empty when nothing is
        unread or the mailbox could not be read.
    """
    email_address = f"{mailbox_prefix}@{domain}"
    emails = []
    connect = imaplib.IMAP4_SSL if IMAP_USE_SSL else imaplib.IMAP4
    try:
        # The context manager logs out on every exit path (early return,
        # per-message failures, IMAP errors).
        with connect(IMAP_HOST, IMAP_PORT) as imap:
            imap.login(email_address, imap_password)
            imap.select("INBOX")
            # Search for unread emails
            status, message_ids = imap.search(None, "UNSEEN")
            if status != "OK" or not message_ids[0]:
                return []
            for msg_id in message_ids[0].split():
                try:
                    # BODY.PEEK[] returns the full message like RFC822 but
                    # does not set \Seen as a side effect of the fetch.
                    status, msg_data = imap.fetch(msg_id, "(BODY.PEEK[])")
                    if status != "OK":
                        continue
                    msg = email.message_from_bytes(msg_data[0][1])
                    # Split name/address first, then decode the display name:
                    # a decoded name may itself contain commas or brackets.
                    from_name, from_addr = email.utils.parseaddr(str(msg.get("From", "")))
                    parsed = {
                        "mailbox": mailbox_prefix,
                        "domain": domain,
                        "to": email_address,
                        "from_addr": from_addr,
                        "from_name": _decode_mime_header(from_name),
                        "subject": _decode_mime_header(msg.get("Subject")) or "(no subject)",
                        "body": _extract_body(msg)[:5000],  # Truncate very long emails
                        "date": str(msg.get("Date", "")),
                        "message_id": str(msg.get("Message-ID", "")),
                    }
                    # Flag as read only now that parsing succeeded.
                    store_status, _ = imap.store(msg_id, "+FLAGS", "\\Seen")
                    if store_status != "OK":
                        LOG.warning(
                            "Could not flag message %s in %s as seen; it may be processed again",
                            msg_id, email_address,
                        )
                    emails.append(parsed)
                except Exception as e:
                    # Per-message boundary: one bad message must not stop the rest.
                    LOG.error("Failed to parse email %s in %s@%s: %s", msg_id, mailbox_prefix, domain, e)
    except imaplib.IMAP4.error as e:
        LOG.error("IMAP error for %s@%s: %s", mailbox_prefix, domain, e)
    except Exception as e:
        LOG.error("Connection error for %s@%s: %s", mailbox_prefix, domain, e)
    return emails
def process_email(email_data: dict, order_info: dict) -> None:
    """Turn one parsed email into an ERPNext Issue plus client/admin alerts.

    Issue creation, the client alert and the admin alert each run inside
    their own ``try`` block: a failure in one step is logged and the
    remaining steps still run.

    Args:
        email_data: Parsed email dict from check_mailbox()
        order_info: Client order info (order_number, customer_email, customer_name)
    """
    mailbox = email_data["mailbox"]
    # Unknown prefixes fall back to the rules of the general "info" mailbox.
    config = MAILBOX_CONFIG.get(mailbox, MAILBOX_CONFIG["info"])
    domain = email_data["domain"]
    sender = email_data["from_addr"]
    subject = email_data["subject"]

    LOG.info(
        "Processing email: %s@%s from=%s subject='%s' priority=%s",
        mailbox, domain, sender, subject[:60], config["priority"],
    )
    priority = config["priority"]

    # Step 1: record the email as an ERPNext Issue.
    try:
        from scripts.workers.erpnext_client import ERPNextClient

        erp = ERPNextClient()
        issue_subject = f"[{mailbox.upper()}@{domain}] {subject}"
        description_lines = [
            "**Incoming email processed automatically**",
            "",
            f"**Mailbox:** {email_data['to']}",
            f"**From:** {email_data['from_name']} <{sender}>",
            f"**Date:** {email_data['date']}",
            f"**Subject:** {subject}",
            f"**Category:** {config['description']}",
            "",
            "---",
            "",
            f"{email_data['body'][:3000]}",
        ]
        erp.create_resource("Issue", {
            "subject": issue_subject[:140],
            "description": "\n".join(description_lines),
            "issue_type": config.get("issue_type", "General Inquiry"),
            "priority": priority,
        })
        LOG.info("Created ERPNext Issue: %s", issue_subject[:80])
    except Exception as e:
        LOG.error("Failed to create ERPNext Issue: %s", e)

    # Step 2: notify the client, when the mailbox rules ask for it and an
    # address is on file.
    if config.get("alert_client") and order_info.get("customer_email"):
        try:
            _send_client_alert(
                to_email=order_info["customer_email"],
                customer_name=order_info.get("customer_name", ""),
                mailbox=mailbox,
                domain=domain,
                from_addr=sender,
                subject=subject,
                body_preview=email_data["body"][:500],
                priority=priority,
            )
        except Exception as e:
            LOG.error("Failed to send client alert: %s", e)

    # Step 3: notify the admin for mailboxes flagged with alert_admin.
    if config.get("alert_admin"):
        try:
            _send_admin_alert(
                mailbox=mailbox,
                domain=domain,
                from_addr=sender,
                subject=subject,
                priority=priority,
                order_number=order_info.get("order_number", ""),
            )
        except Exception as e:
            LOG.error("Failed to send admin alert: %s", e)
def _send_client_alert(
    to_email: str,
    customer_name: str,
    mailbox: str,
    domain: str,
    from_addr: str,
    subject: str,
    body_preview: str,
    priority: str,
):
    """Send an alert email to the client about incoming correspondence.

    SMTP settings are read from ``SMTP_HOST``, ``SMTP_PORT``, ``SMTP_USER``,
    ``SMTP_PASS`` and ``SMTP_FROM``. When ``SMTP_HOST`` is unset the alert is
    skipped with a warning. Authentication is attempted only when
    ``SMTP_USER`` is set.

    Args:
        to_email: Recipient address (the client's contact email).
        customer_name: Name used in the greeting; "there" when empty.
        mailbox: Mailbox prefix that received the message (e.g. "crtc").
        domain: Client domain the mailbox belongs to.
        from_addr: Sender address of the incoming message.
        subject: Subject of the incoming message (untrusted input).
        body_preview: Leading excerpt of the incoming message body.
        priority: Priority label; selects the emoji shown in the alert.

    Raises:
        smtplib.SMTPException, OSError: If the message cannot be delivered.
    """
    import smtplib
    from email.mime.text import MIMEText

    smtp_host = os.getenv("SMTP_HOST", "")
    smtp_port = int(os.getenv("SMTP_PORT", "587"))
    smtp_user = os.getenv("SMTP_USER", "")
    smtp_pass = os.getenv("SMTP_PASS", "")
    smtp_from = os.getenv("SMTP_FROM", "noreply@performancewest.net")
    if not smtp_host:
        LOG.warning("SMTP not configured, skipping client alert")
        return

    # The subject comes from an external sender and may contain folded or
    # injected line breaks, which are not valid inside a header value.
    # Collapse all whitespace runs to single spaces before reusing it.
    safe_subject = " ".join((subject or "").split())

    priority_emoji = {"Urgent": "🚨", "High": "⚠️", "Medium": "📬", "Low": "📧"}.get(priority, "📧")
    body = f"""Hi {customer_name or 'there'},
{priority_emoji} New correspondence received at {mailbox}@{domain}
From: {from_addr}
Subject: {safe_subject}
Priority: {priority}
Preview:
{body_preview}
---
This email was received at your {mailbox}@{domain} mailbox and has been logged in your Performance West client portal. You can view the full message and respond through your portal.
If this requires urgent attention, please log in to your portal or contact us.
Performance West Inc.
performancewest.net
"""
    msg = MIMEText(body)
    msg["Subject"] = f"{priority_emoji} [{priority}] Mail received at {mailbox}@{domain}: {safe_subject[:60]}"
    msg["From"] = smtp_from
    msg["To"] = to_email

    # A timeout keeps a stalled SMTP server from blocking the whole run.
    with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
        server.starttls()
        if smtp_user:
            server.login(smtp_user, smtp_pass)
        server.send_message(msg)
    LOG.info("Client alert sent to %s for %s@%s", to_email, mailbox, domain)
def _send_admin_alert(
    mailbox: str,
    domain: str,
    from_addr: str,
    subject: str,
    priority: str,
    order_number: str,
):
    """Send an alert to admin for high-priority correspondence.

    The alert goes to ops@performancewest.net. SMTP settings are read from
    ``SMTP_HOST``, ``SMTP_PORT``, ``SMTP_USER``, ``SMTP_PASS`` and
    ``SMTP_FROM``. When ``SMTP_HOST`` is unset the alert is skipped with a
    warning. Authentication is attempted only when ``SMTP_USER`` is set.

    Args:
        mailbox: Mailbox prefix that received the message (e.g. "abuse").
        domain: Client domain the mailbox belongs to.
        from_addr: Sender address of the incoming message.
        subject: Subject of the incoming message (untrusted input).
        priority: Priority label shown in the alert.
        order_number: Order number of the affected client.

    Raises:
        smtplib.SMTPException, OSError: If the message cannot be delivered.
    """
    import smtplib
    from email.mime.text import MIMEText

    smtp_host = os.getenv("SMTP_HOST", "")
    smtp_port = int(os.getenv("SMTP_PORT", "587"))
    smtp_user = os.getenv("SMTP_USER", "")
    smtp_pass = os.getenv("SMTP_PASS", "")
    smtp_from = os.getenv("SMTP_FROM", "noreply@performancewest.net")
    admin_email = "ops@performancewest.net"
    if not smtp_host:
        # Log the skip so a missing SMTP setup is visible for admin alerts too.
        LOG.warning("SMTP not configured, skipping admin alert")
        return

    # The subject comes from an external sender and may contain folded or
    # injected line breaks, which are not valid inside a header value.
    # Collapse all whitespace runs to single spaces before reusing it.
    safe_subject = " ".join((subject or "").split())

    body = f"""[{priority}] Correspondence at {mailbox}@{domain}
Order: {order_number}
From: {from_addr}
Subject: {safe_subject}
Mailbox: {mailbox}@{domain}
Action may be required. Check ERPNext Issues.
"""
    msg = MIMEText(body)
    msg["Subject"] = f"[PW Admin] {priority} mail at {mailbox}@{domain}: {safe_subject[:50]}"
    msg["From"] = smtp_from
    msg["To"] = admin_email

    # A timeout keeps a stalled SMTP server from blocking the whole run.
    with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
        server.starttls()
        if smtp_user:
            server.login(smtp_user, smtp_pass)
        server.send_message(msg)
def main():
    """Entry point for the cron job (scheduled every 15 minutes).

    At present this only configures logging and reports which mailbox
    prefixes would be monitored; the per-domain polling loop is still to be
    wired up (see the TODO below).
    """
    logging.basicConfig(
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
        level=logging.INFO,
    )

    banner = "=" * 60
    LOG.info(banner)
    LOG.info("Client Email Processor — checking mailboxes")
    LOG.info(banner)

    # TODO: Load client domains (get_client_domains()) and their IMAP
    # passwords from ERPNext, where the credentials are to be stored encrypted
    # (ERPNext Sensitive ID), then check every prefix in ALL_MAILBOXES for
    # each domain. Until then this is only a framework.
    LOG.info("Email processor framework ready. Configure client domains to begin processing.")
    LOG.info("Mailboxes to monitor per domain: %s", ", ".join(ALL_MAILBOXES))


if __name__ == "__main__":
    main()