new-site/scripts/workers/client_email_processor.py

707 lines
24 KiB
Python

"""
Client Email Processor — monitors client @company.ca mailboxes on HestiaCP
for incoming regulatory, government, and business correspondence.
Connects via IMAP to cp.carrierone.com, checks purpose-built mailboxes,
and creates ERPNext Issues/tasks/alerts based on the sender and content.
Runs as a cron job every 15 minutes.
Email addresses we create per client domain:
REGULATORY:
regulatory@ — CRTC correspondence, public notices, compliance requests
crtc@ — Specifically for CRTC registration (address on the letter)
ccts@ — CCTS complaint notifications, membership correspondence
CORPORATE:
corpadmin@ — BC Registry correspondence, annual report notices
registeredoffice@ — Official legal service, government notices (high priority)
FINANCIAL:
accounting@ — CRA tax notices, GST/HST, bank correspondence
billing@ — Payment notifications, invoice inquiries
OPERATIONS:
abuse@ — Telecom abuse reports (spam, robocall complaints) — RFC required
noc@ — Network operations, outage notifications, interconnection partners
postmaster@ — Email delivery issues, bounce handling (RFC 2142 required)
BUSINESS:
info@ — General inquiries
admin@ — Administrative catch-all
sales@ — Wholesale partner inquiries
support@ — Customer support
"""
from __future__ import annotations
import email
import email.utils
import imaplib
import json
import logging
import os
import re
import time
from datetime import datetime
from typing import Optional
LOG = logging.getLogger("workers.email_processor")
# HestiaCP IMAP configuration
IMAP_HOST = os.getenv("HESTIA_IMAP_HOST", "cp.carrierone.com")
IMAP_PORT = int(os.getenv("HESTIA_IMAP_PORT", "993"))
IMAP_USE_SSL = os.getenv("HESTIA_IMAP_SSL", "true").lower() == "true"
# Database
DATABASE_URL = os.getenv("DATABASE_URL", "")
# Priority mapping: which mailboxes get which priority in ERPNext
MAILBOX_CONFIG = {
"registeredoffice": {
"priority": "Urgent",
"issue_type": "Legal Notice",
"alert_client": True,
"alert_admin": True,
"description": "Official correspondence to the registered office — may be legal service or government notice.",
},
"crtc": {
"priority": "High",
"issue_type": "Regulatory",
"alert_client": True,
"alert_admin": True,
"description": "Correspondence from or related to CRTC registration.",
},
"ccts": {
"priority": "High",
"issue_type": "Complaint",
"alert_client": True,
"alert_admin": True,
"description": "CCTS complaint or membership correspondence. Requires timely response.",
},
"regulatory": {
"priority": "High",
"issue_type": "Regulatory",
"alert_client": True,
"alert_admin": False,
"description": "General regulatory correspondence (CRTC public notices, consultations, decisions).",
},
"corpadmin": {
"priority": "Medium",
"issue_type": "Corporate",
"alert_client": True,
"alert_admin": True,
"description": "BC Registry or corporate administration correspondence.",
},
"accounting": {
"priority": "Medium",
"issue_type": "Financial",
"alert_client": True,
"alert_admin": False,
"description": "Financial correspondence — CRA tax notices, GST/HST, bank statements.",
},
"billing": {
"priority": "Low",
"issue_type": "Financial",
"alert_client": True,
"alert_admin": False,
"description": "Billing inquiry or payment notification.",
},
"abuse": {
"priority": "High",
"issue_type": "Abuse Report",
"alert_client": True,
"alert_admin": True,
"description": "Telecom abuse report — spam, robocall complaint, or network abuse allegation. Respond promptly.",
},
"noc": {
"priority": "High",
"issue_type": "Network Operations",
"alert_client": True,
"alert_admin": False,
"description": "Network operations — outage notification, interconnection issue, or technical alert from a partner.",
},
"postmaster": {
"priority": "Low",
"issue_type": "Technical",
"alert_client": False,
"alert_admin": False,
"description": "Email delivery issue or bounce notification.",
},
"info": {
"priority": "Low",
"issue_type": "General Inquiry",
"alert_client": True,
"alert_admin": False,
"description": "General inquiry received at info@ address.",
},
"admin": {
"priority": "Low",
"issue_type": "Administrative",
"alert_client": True,
"alert_admin": False,
"description": "Administrative correspondence.",
},
"sales": {
"priority": "Medium",
"issue_type": "Sales Inquiry",
"alert_client": True,
"alert_admin": False,
"description": "Sales or wholesale partnership inquiry.",
},
"support": {
"priority": "Medium",
"issue_type": "Support",
"alert_client": True,
"alert_admin": False,
"description": "Customer support request.",
},
}
# All mailbox prefixes we create per client domain
ALL_MAILBOXES = list(MAILBOX_CONFIG.keys())
def get_client_domains() -> list[dict]:
"""Get all active client domains from the database."""
import psycopg2
conn = psycopg2.connect(DATABASE_URL)
try:
cur = conn.cursor()
cur.execute("""
SELECT order_number, customer_email, customer_name, company_name_final
FROM canada_crtc_orders
WHERE status IN ('delivered', 'crtc_registration', 'ccts_registration', 'banking_setup', 'binder_compilation', 'phone_ready', 'domain_ready', 'review', 'incorporation')
AND order_number IS NOT NULL
""")
rows = cur.fetchall()
domains = []
for row in rows:
# Extract the .ca domain from the order (would be stored in a field)
# For now, derive from company name or order data
domains.append({
"order_number": row[0],
"customer_email": row[1],
"customer_name": row[2],
"company_name": row[3],
})
return domains
finally:
conn.close()
def check_mailbox(
domain: str,
mailbox_prefix: str,
imap_password: str,
) -> list[dict]:
"""Connect to IMAP and fetch unread emails from a specific mailbox.
Args:
domain: The client's .ca domain (e.g., "valleyinternet.ca")
mailbox_prefix: The mailbox prefix (e.g., "crtc", "abuse", "noc")
imap_password: IMAP password for this mailbox
Returns:
List of parsed email dicts
"""
email_address = f"{mailbox_prefix}@{domain}"
emails = []
try:
if IMAP_USE_SSL:
imap = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)
else:
imap = imaplib.IMAP4(IMAP_HOST, IMAP_PORT)
imap.login(email_address, imap_password)
imap.select("INBOX")
# Search for unread emails
status, message_ids = imap.search(None, "UNSEEN")
if status != "OK" or not message_ids[0]:
imap.logout()
return []
for msg_id in message_ids[0].split():
try:
status, msg_data = imap.fetch(msg_id, "(RFC822)")
if status != "OK":
continue
raw_email = msg_data[0][1]
msg = email.message_from_bytes(raw_email)
# Parse email
from_addr = email.utils.parseaddr(msg.get("From", ""))[1]
from_name = email.utils.parseaddr(msg.get("From", ""))[0]
subject = msg.get("Subject", "(no subject)")
date_str = msg.get("Date", "")
message_id = msg.get("Message-ID", "")
# Get body (prefer plain text)
body = ""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True).decode("utf-8", errors="replace")
break
elif part.get_content_type() == "text/html" and not body:
body = part.get_payload(decode=True).decode("utf-8", errors="replace")
else:
body = msg.get_payload(decode=True).decode("utf-8", errors="replace")
emails.append({
"mailbox": mailbox_prefix,
"domain": domain,
"to": email_address,
"from_addr": from_addr,
"from_name": from_name,
"subject": subject,
"body": body[:5000], # Truncate very long emails
"date": date_str,
"message_id": message_id,
})
# Mark as seen (already done by IMAP fetch with UNSEEN filter)
imap.store(msg_id, "+FLAGS", "\\Seen")
except Exception as e:
LOG.error("Failed to parse email %s in %s@%s: %s", msg_id, mailbox_prefix, domain, e)
imap.logout()
except imaplib.IMAP4.error as e:
LOG.error("IMAP error for %s@%s: %s", mailbox_prefix, domain, e)
except Exception as e:
LOG.error("Connection error for %s@%s: %s", mailbox_prefix, domain, e)
return emails
def process_email(email_data: dict, order_info: dict) -> None:
"""Process a single incoming email — create ERPNext Issue and alert client.
Args:
email_data: Parsed email dict from check_mailbox()
order_info: Client order info (order_number, customer_email, customer_name)
"""
mailbox = email_data["mailbox"]
config = MAILBOX_CONFIG.get(mailbox, MAILBOX_CONFIG["info"])
LOG.info(
"Processing email: %s@%s from=%s subject='%s' priority=%s",
mailbox, email_data["domain"], email_data["from_addr"],
email_data["subject"][:60], config["priority"],
)
# Create ERPNext Issue
try:
from scripts.workers.erpnext_client import ERPNextClient
client = ERPNextClient()
issue_subject = f"[{mailbox.upper()}@{email_data['domain']}] {email_data['subject']}"
issue_description = (
f"**Incoming email processed automatically**\n\n"
f"**Mailbox:** {email_data['to']}\n"
f"**From:** {email_data['from_name']} <{email_data['from_addr']}>\n"
f"**Date:** {email_data['date']}\n"
f"**Subject:** {email_data['subject']}\n"
f"**Category:** {config['description']}\n\n"
f"---\n\n"
f"{email_data['body'][:3000]}"
)
client.create_resource("Issue", {
"subject": issue_subject[:140],
"description": issue_description,
"issue_type": config.get("issue_type", "General Inquiry"),
"priority": config["priority"],
})
LOG.info("Created ERPNext Issue: %s", issue_subject[:80])
except Exception as e:
LOG.error("Failed to create ERPNext Issue: %s", e)
# Alert client via email (forward the original email context)
if config.get("alert_client") and order_info.get("customer_email"):
try:
_send_client_alert(
to_email=order_info["customer_email"],
customer_name=order_info.get("customer_name", ""),
mailbox=mailbox,
domain=email_data["domain"],
from_addr=email_data["from_addr"],
subject=email_data["subject"],
body_preview=email_data["body"][:500],
priority=config["priority"],
)
except Exception as e:
LOG.error("Failed to send client alert: %s", e)
# Alert admin for high-priority items
if config.get("alert_admin"):
try:
_send_admin_alert(
mailbox=mailbox,
domain=email_data["domain"],
from_addr=email_data["from_addr"],
subject=email_data["subject"],
priority=config["priority"],
order_number=order_info.get("order_number", ""),
)
except Exception as e:
LOG.error("Failed to send admin alert: %s", e)
def _send_client_alert(
to_email: str,
customer_name: str,
mailbox: str,
domain: str,
from_addr: str,
subject: str,
body_preview: str,
priority: str,
):
"""Send an alert email to the client about incoming correspondence."""
import smtplib
from email.mime.text import MIMEText
smtp_host = os.getenv("SMTP_HOST", "")
smtp_port = int(os.getenv("SMTP_PORT", "587"))
smtp_user = os.getenv("SMTP_USER", "")
smtp_pass = os.getenv("SMTP_PASS", "")
smtp_from = os.getenv("SMTP_FROM", "noreply@performancewest.net")
if not smtp_host:
LOG.warning("SMTP not configured, skipping client alert")
return
priority_emoji = {"Urgent": "🚨", "High": "⚠️", "Medium": "📬", "Low": "📧"}.get(priority, "📧")
body = f"""Hi {customer_name or 'there'},
{priority_emoji} New correspondence received at {mailbox}@{domain}
From: {from_addr}
Subject: {subject}
Priority: {priority}
Preview:
{body_preview}
---
This email was received at your {mailbox}@{domain} mailbox and has been logged in your Performance West client portal. You can view the full message and respond through your portal.
If this requires urgent attention, please log in to your portal or contact us.
Performance West Inc.
performancewest.net
"""
msg = MIMEText(body)
msg["Subject"] = f"{priority_emoji} [{priority}] Mail received at {mailbox}@{domain}: {subject[:60]}"
msg["From"] = smtp_from
msg["To"] = to_email
with smtplib.SMTP(smtp_host, smtp_port) as server:
server.starttls()
server.login(smtp_user, smtp_pass)
server.send_message(msg)
LOG.info("Client alert sent to %s for %s@%s", to_email, mailbox, domain)
def _send_admin_alert(
mailbox: str,
domain: str,
from_addr: str,
subject: str,
priority: str,
order_number: str,
):
"""Send an alert to admin for high-priority correspondence."""
import smtplib
from email.mime.text import MIMEText
smtp_host = os.getenv("SMTP_HOST", "")
smtp_port = int(os.getenv("SMTP_PORT", "587"))
smtp_user = os.getenv("SMTP_USER", "")
smtp_pass = os.getenv("SMTP_PASS", "")
smtp_from = os.getenv("SMTP_FROM", "noreply@performancewest.net")
admin_email = "ops@performancewest.net"
if not smtp_host:
return
body = f"""[{priority}] Correspondence at {mailbox}@{domain}
Order: {order_number}
From: {from_addr}
Subject: {subject}
Mailbox: {mailbox}@{domain}
Action may be required. Check ERPNext Issues.
"""
msg = MIMEText(body)
msg["Subject"] = f"[PW Admin] {priority} mail at {mailbox}@{domain}: {subject[:50]}"
msg["From"] = smtp_from
msg["To"] = admin_email
with smtplib.SMTP(smtp_host, smtp_port) as server:
server.starttls()
server.login(smtp_user, smtp_pass)
server.send_message(msg)
# ══════════════════════════════════════════════════════════════════════
# FMCSA Filing Confirmation Monitor
# ══════════════════════════════════════════════════════════════════════
# Monitors ops@performancewest.net for FMCSA confirmation emails
# (forwarded from filings@performancewest.net) and links them to orders.
#
# FMCSA replies from: fmcsa_ask@mailag.cx.usg.oraclecloud.com
# Subject contains: "Question Reference #" + reference number
# Body contains: DOT number, status, timestamps
FMCSA_IMAP_HOST = os.getenv("PW_IMAP_HOST", "cp.carrierone.com")
FMCSA_IMAP_PORT = int(os.getenv("PW_IMAP_PORT", "993"))
FMCSA_IMAP_USER = os.getenv("PW_FILINGS_IMAP_USER", "ops@performancewest.net")
FMCSA_IMAP_PASS = os.getenv("PW_FILINGS_IMAP_PASS", "")
FMCSA_SENDER = "fmcsa_ask@mailag.cx.usg.oraclecloud.com"
def check_fmcsa_confirmations() -> list[dict]:
"""Check ops@ mailbox for FMCSA filing confirmations.
Returns list of parsed confirmation dicts.
"""
if not FMCSA_IMAP_PASS:
LOG.warning("[fmcsa-mail] No IMAP password configured (PW_FILINGS_IMAP_PASS)")
return []
confirmations = []
try:
imap = imaplib.IMAP4_SSL(FMCSA_IMAP_HOST, FMCSA_IMAP_PORT)
imap.login(FMCSA_IMAP_USER, FMCSA_IMAP_PASS)
imap.select("INBOX")
# Search for unread emails from FMCSA
status, msg_ids = imap.search(
None, f'(UNSEEN FROM "{FMCSA_SENDER}")'
)
if status != "OK" or not msg_ids[0]:
imap.logout()
return []
for msg_id in msg_ids[0].split():
try:
status, msg_data = imap.fetch(msg_id, "(RFC822)")
if status != "OK":
continue
raw = msg_data[0][1]
msg = email.message_from_bytes(raw)
subject = msg.get("Subject", "")
date_str = msg.get("Date", "")
# Get body
body = ""
if msg.is_multipart():
for part in msg.walk():
ct = part.get_content_type()
if ct == "text/plain":
body = part.get_payload(decode=True).decode("utf-8", errors="replace")
break
elif ct == "text/html" and not body:
body = part.get_payload(decode=True).decode("utf-8", errors="replace")
else:
body = msg.get_payload(decode=True).decode("utf-8", errors="replace")
# Extract FMCSA reference number (format: YYMMDD-NNNNNN)
ref_match = re.search(r"(\d{6}-\d{6})", subject + " " + body)
fmcsa_ref = ref_match.group(1) if ref_match else None
# Extract DOT number from body
dot_match = re.search(
r"(?:USDOT|DOT)\s*#?\s*:?\s*(\d{5,8})", body, re.IGNORECASE
)
dot_number = dot_match.group(1) if dot_match else None
# Extract status
status_match = re.search(
r"Status\s*:?\s*(Unresolved|Resolved|Waiting|Closed|Updated)",
body, re.IGNORECASE,
)
fmcsa_status = status_match.group(1) if status_match else None
confirmation = {
"fmcsa_reference": fmcsa_ref,
"dot_number": dot_number,
"subject": subject,
"status": fmcsa_status,
"date": date_str,
"body": body[:3000],
"message_id": msg.get("Message-ID", ""),
}
confirmations.append(confirmation)
# Mark as read
imap.store(msg_id, "+FLAGS", "\\Seen")
LOG.info(
"[fmcsa-mail] Confirmation: ref=%s dot=%s status=%s",
fmcsa_ref, dot_number, fmcsa_status,
)
except Exception as e:
LOG.error("[fmcsa-mail] Failed to parse email %s: %s", msg_id, e)
imap.logout()
except Exception as e:
LOG.error("[fmcsa-mail] IMAP error: %s", e)
return confirmations
def process_fmcsa_confirmation(conf: dict) -> None:
"""Link an FMCSA confirmation to the matching compliance order.
Updates the order's intake_data with the FMCSA reference number,
submission status, and confirmation email body. Also sends a Telegram
notification to admin.
"""
import psycopg2
fmcsa_ref = conf.get("fmcsa_reference")
dot_number = conf.get("dot_number")
if not dot_number and not fmcsa_ref:
LOG.warning("[fmcsa-mail] No DOT or reference to match: %s", conf.get("subject"))
return
try:
conn = psycopg2.connect(DATABASE_URL)
cur = conn.cursor()
# Find the matching MCS-150 order by DOT number
cur.execute("""
SELECT order_number, customer_email, intake_data
FROM compliance_orders
WHERE service_slug = 'mcs150-update'
AND (intake_data->>'dot_number' = %s OR intake_data->>'dot_number' = %s)
AND status IN ('processing', 'in_progress', 'filed', 'submitted')
ORDER BY created_at DESC
LIMIT 1
""", (dot_number, dot_number))
row = cur.fetchone()
if not row:
LOG.info("[fmcsa-mail] No matching order for DOT %s", dot_number)
conn.close()
return
order_number = row[0]
# Update order with FMCSA confirmation
import json
fmcsa_log = {
"fmcsa_reference": fmcsa_ref,
"fmcsa_status": conf.get("status"),
"confirmation_date": conf.get("date"),
"confirmation_subject": conf.get("subject"),
"confirmation_body": conf.get("body", "")[:1000],
}
cur.execute("""
UPDATE compliance_orders
SET intake_data = jsonb_set(
COALESCE(intake_data, '{}'::jsonb),
'{fmcsa_confirmation}',
%s::jsonb
),
status = CASE
WHEN status IN ('processing', 'in_progress') THEN 'filed'
ELSE status
END
WHERE order_number = %s
""", (json.dumps(fmcsa_log), order_number))
conn.commit()
conn.close()
LOG.info(
"[fmcsa-mail] Linked ref %s to order %s (DOT %s)",
fmcsa_ref, order_number, dot_number,
)
# Send Telegram notification
_notify_fmcsa_confirmation(order_number, dot_number, fmcsa_ref, conf.get("status"))
except Exception as e:
LOG.error("[fmcsa-mail] Failed to process confirmation: %s", e)
def _notify_fmcsa_confirmation(order_number, dot_number, fmcsa_ref, status):
"""Send Telegram notification about FMCSA confirmation."""
try:
import urllib.request
bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "")
chat_id = os.getenv("TELEGRAM_CHAT_ID", "")
if not bot_token or not chat_id:
return
text = (
f"📋 FMCSA Filing Confirmed\n"
f"Order: {order_number}\n"
f"DOT#: {dot_number}\n"
f"Reference: {fmcsa_ref or 'pending'}\n"
f"Status: {status or 'received'}"
)
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
data = json.dumps({"chat_id": chat_id, "text": text}).encode()
req = urllib.request.Request(
url, data=data, headers={"Content-Type": "application/json"}
)
urllib.request.urlopen(req, timeout=10)
except Exception:
pass
def main():
"""Main processing loop — run via cron every 15 minutes."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
)
LOG.info("=" * 60)
LOG.info("Client Email Processor — checking mailboxes")
LOG.info("=" * 60)
# ── FMCSA filing confirmations ──
fmcsa_confs = check_fmcsa_confirmations()
for conf in fmcsa_confs:
process_fmcsa_confirmation(conf)
if fmcsa_confs:
LOG.info("Processed %d FMCSA confirmation(s)", len(fmcsa_confs))
# ── Client domain mailboxes ──
# In production, we'd query the DB for all active client domains
# and their IMAP credentials (stored encrypted in ERPNext Sensitive ID).
# TODO: Load client domains and IMAP passwords from ERPNext
# domains = get_client_domains()
# For each domain, check all mailboxes in ALL_MAILBOXES
LOG.info("Email processor complete.")
LOG.info("Mailboxes to monitor per domain: %s", ", ".join(ALL_MAILBOXES))
if __name__ == "__main__":
main()