""" Client Email Processor — monitors client @company.ca mailboxes on HestiaCP for incoming regulatory, government, and business correspondence. Connects via IMAP to cp.carrierone.com, checks purpose-built mailboxes, and creates ERPNext Issues/tasks/alerts based on the sender and content. Runs as a cron job every 15 minutes. Email addresses we create per client domain: REGULATORY: regulatory@ — CRTC correspondence, public notices, compliance requests crtc@ — Specifically for CRTC registration (address on the letter) ccts@ — CCTS complaint notifications, membership correspondence CORPORATE: corpadmin@ — BC Registry correspondence, annual report notices registeredoffice@ — Official legal service, government notices (high priority) FINANCIAL: accounting@ — CRA tax notices, GST/HST, bank correspondence billing@ — Payment notifications, invoice inquiries OPERATIONS: abuse@ — Telecom abuse reports (spam, robocall complaints) — RFC required noc@ — Network operations, outage notifications, interconnection partners postmaster@ — Email delivery issues, bounce handling (RFC 2142 required) BUSINESS: info@ — General inquiries admin@ — Administrative catch-all sales@ — Wholesale partner inquiries support@ — Customer support """ from __future__ import annotations import email import email.utils import imaplib import json import logging import os import re import time from datetime import datetime from typing import Optional LOG = logging.getLogger("workers.email_processor") # HestiaCP IMAP configuration IMAP_HOST = os.getenv("HESTIA_IMAP_HOST", "cp.carrierone.com") IMAP_PORT = int(os.getenv("HESTIA_IMAP_PORT", "993")) IMAP_USE_SSL = os.getenv("HESTIA_IMAP_SSL", "true").lower() == "true" # Database DATABASE_URL = os.getenv("DATABASE_URL", "") # Priority mapping: which mailboxes get which priority in ERPNext MAILBOX_CONFIG = { "registeredoffice": { "priority": "Urgent", "issue_type": "Legal Notice", "alert_client": True, "alert_admin": True, "description": "Official correspondence 
to the registered office — may be legal service or government notice.", }, "crtc": { "priority": "High", "issue_type": "Regulatory", "alert_client": True, "alert_admin": True, "description": "Correspondence from or related to CRTC registration.", }, "ccts": { "priority": "High", "issue_type": "Complaint", "alert_client": True, "alert_admin": True, "description": "CCTS complaint or membership correspondence. Requires timely response.", }, "regulatory": { "priority": "High", "issue_type": "Regulatory", "alert_client": True, "alert_admin": False, "description": "General regulatory correspondence (CRTC public notices, consultations, decisions).", }, "corpadmin": { "priority": "Medium", "issue_type": "Corporate", "alert_client": True, "alert_admin": True, "description": "BC Registry or corporate administration correspondence.", }, "accounting": { "priority": "Medium", "issue_type": "Financial", "alert_client": True, "alert_admin": False, "description": "Financial correspondence — CRA tax notices, GST/HST, bank statements.", }, "billing": { "priority": "Low", "issue_type": "Financial", "alert_client": True, "alert_admin": False, "description": "Billing inquiry or payment notification.", }, "abuse": { "priority": "High", "issue_type": "Abuse Report", "alert_client": True, "alert_admin": True, "description": "Telecom abuse report — spam, robocall complaint, or network abuse allegation. 
Respond promptly.", }, "noc": { "priority": "High", "issue_type": "Network Operations", "alert_client": True, "alert_admin": False, "description": "Network operations — outage notification, interconnection issue, or technical alert from a partner.", }, "postmaster": { "priority": "Low", "issue_type": "Technical", "alert_client": False, "alert_admin": False, "description": "Email delivery issue or bounce notification.", }, "info": { "priority": "Low", "issue_type": "General Inquiry", "alert_client": True, "alert_admin": False, "description": "General inquiry received at info@ address.", }, "admin": { "priority": "Low", "issue_type": "Administrative", "alert_client": True, "alert_admin": False, "description": "Administrative correspondence.", }, "sales": { "priority": "Medium", "issue_type": "Sales Inquiry", "alert_client": True, "alert_admin": False, "description": "Sales or wholesale partnership inquiry.", }, "support": { "priority": "Medium", "issue_type": "Support", "alert_client": True, "alert_admin": False, "description": "Customer support request.", }, } # All mailbox prefixes we create per client domain ALL_MAILBOXES = list(MAILBOX_CONFIG.keys()) def get_client_domains() -> list[dict]: """Get all active client domains from the database.""" import psycopg2 conn = psycopg2.connect(DATABASE_URL) try: cur = conn.cursor() cur.execute(""" SELECT order_number, customer_email, customer_name, company_name_final FROM canada_crtc_orders WHERE status IN ('delivered', 'crtc_registration', 'ccts_registration', 'banking_setup', 'binder_compilation', 'phone_ready', 'domain_ready', 'review', 'incorporation') AND order_number IS NOT NULL """) rows = cur.fetchall() domains = [] for row in rows: # Extract the .ca domain from the order (would be stored in a field) # For now, derive from company name or order data domains.append({ "order_number": row[0], "customer_email": row[1], "customer_name": row[2], "company_name": row[3], }) return domains finally: conn.close() def 
check_mailbox( domain: str, mailbox_prefix: str, imap_password: str, ) -> list[dict]: """Connect to IMAP and fetch unread emails from a specific mailbox. Args: domain: The client's .ca domain (e.g., "valleyinternet.ca") mailbox_prefix: The mailbox prefix (e.g., "crtc", "abuse", "noc") imap_password: IMAP password for this mailbox Returns: List of parsed email dicts """ email_address = f"{mailbox_prefix}@{domain}" emails = [] try: if IMAP_USE_SSL: imap = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT) else: imap = imaplib.IMAP4(IMAP_HOST, IMAP_PORT) imap.login(email_address, imap_password) imap.select("INBOX") # Search for unread emails status, message_ids = imap.search(None, "UNSEEN") if status != "OK" or not message_ids[0]: imap.logout() return [] for msg_id in message_ids[0].split(): try: status, msg_data = imap.fetch(msg_id, "(RFC822)") if status != "OK": continue raw_email = msg_data[0][1] msg = email.message_from_bytes(raw_email) # Parse email from_addr = email.utils.parseaddr(msg.get("From", ""))[1] from_name = email.utils.parseaddr(msg.get("From", ""))[0] subject = msg.get("Subject", "(no subject)") date_str = msg.get("Date", "") message_id = msg.get("Message-ID", "") # Get body (prefer plain text) body = "" if msg.is_multipart(): for part in msg.walk(): if part.get_content_type() == "text/plain": body = part.get_payload(decode=True).decode("utf-8", errors="replace") break elif part.get_content_type() == "text/html" and not body: body = part.get_payload(decode=True).decode("utf-8", errors="replace") else: body = msg.get_payload(decode=True).decode("utf-8", errors="replace") emails.append({ "mailbox": mailbox_prefix, "domain": domain, "to": email_address, "from_addr": from_addr, "from_name": from_name, "subject": subject, "body": body[:5000], # Truncate very long emails "date": date_str, "message_id": message_id, }) # Mark as seen (already done by IMAP fetch with UNSEEN filter) imap.store(msg_id, "+FLAGS", "\\Seen") except Exception as e: LOG.error("Failed to parse 
email %s in %s@%s: %s", msg_id, mailbox_prefix, domain, e) imap.logout() except imaplib.IMAP4.error as e: LOG.error("IMAP error for %s@%s: %s", mailbox_prefix, domain, e) except Exception as e: LOG.error("Connection error for %s@%s: %s", mailbox_prefix, domain, e) return emails def process_email(email_data: dict, order_info: dict) -> None: """Process a single incoming email — create ERPNext Issue and alert client. Args: email_data: Parsed email dict from check_mailbox() order_info: Client order info (order_number, customer_email, customer_name) """ mailbox = email_data["mailbox"] config = MAILBOX_CONFIG.get(mailbox, MAILBOX_CONFIG["info"]) LOG.info( "Processing email: %s@%s from=%s subject='%s' priority=%s", mailbox, email_data["domain"], email_data["from_addr"], email_data["subject"][:60], config["priority"], ) # Create ERPNext Issue try: from scripts.workers.erpnext_client import ERPNextClient client = ERPNextClient() issue_subject = f"[{mailbox.upper()}@{email_data['domain']}] {email_data['subject']}" issue_description = ( f"**Incoming email processed automatically**\n\n" f"**Mailbox:** {email_data['to']}\n" f"**From:** {email_data['from_name']} <{email_data['from_addr']}>\n" f"**Date:** {email_data['date']}\n" f"**Subject:** {email_data['subject']}\n" f"**Category:** {config['description']}\n\n" f"---\n\n" f"{email_data['body'][:3000]}" ) client.create_resource("Issue", { "subject": issue_subject[:140], "description": issue_description, "issue_type": config.get("issue_type", "General Inquiry"), "priority": config["priority"], }) LOG.info("Created ERPNext Issue: %s", issue_subject[:80]) except Exception as e: LOG.error("Failed to create ERPNext Issue: %s", e) # Alert client via email (forward the original email context) if config.get("alert_client") and order_info.get("customer_email"): try: _send_client_alert( to_email=order_info["customer_email"], customer_name=order_info.get("customer_name", ""), mailbox=mailbox, domain=email_data["domain"], 
from_addr=email_data["from_addr"], subject=email_data["subject"], body_preview=email_data["body"][:500], priority=config["priority"], ) except Exception as e: LOG.error("Failed to send client alert: %s", e) # Alert admin for high-priority items if config.get("alert_admin"): try: _send_admin_alert( mailbox=mailbox, domain=email_data["domain"], from_addr=email_data["from_addr"], subject=email_data["subject"], priority=config["priority"], order_number=order_info.get("order_number", ""), ) except Exception as e: LOG.error("Failed to send admin alert: %s", e) def _send_client_alert( to_email: str, customer_name: str, mailbox: str, domain: str, from_addr: str, subject: str, body_preview: str, priority: str, ): """Send an alert email to the client about incoming correspondence.""" import smtplib from email.mime.text import MIMEText smtp_host = os.getenv("SMTP_HOST", "") smtp_port = int(os.getenv("SMTP_PORT", "587")) smtp_user = os.getenv("SMTP_USER", "") smtp_pass = os.getenv("SMTP_PASS", "") smtp_from = os.getenv("SMTP_FROM", "noreply@performancewest.net") if not smtp_host: LOG.warning("SMTP not configured, skipping client alert") return priority_emoji = {"Urgent": "🚨", "High": "⚠️", "Medium": "📬", "Low": "📧"}.get(priority, "📧") body = f"""Hi {customer_name or 'there'}, {priority_emoji} New correspondence received at {mailbox}@{domain} From: {from_addr} Subject: {subject} Priority: {priority} Preview: {body_preview} --- This email was received at your {mailbox}@{domain} mailbox and has been logged in your Performance West client portal. You can view the full message and respond through your portal. If this requires urgent attention, please log in to your portal or contact us. Performance West Inc. 
performancewest.net """ msg = MIMEText(body) msg["Subject"] = f"{priority_emoji} [{priority}] Mail received at {mailbox}@{domain}: {subject[:60]}" msg["From"] = smtp_from msg["To"] = to_email with smtplib.SMTP(smtp_host, smtp_port) as server: server.starttls() server.login(smtp_user, smtp_pass) server.send_message(msg) LOG.info("Client alert sent to %s for %s@%s", to_email, mailbox, domain) def _send_admin_alert( mailbox: str, domain: str, from_addr: str, subject: str, priority: str, order_number: str, ): """Send an alert to admin for high-priority correspondence.""" import smtplib from email.mime.text import MIMEText smtp_host = os.getenv("SMTP_HOST", "") smtp_port = int(os.getenv("SMTP_PORT", "587")) smtp_user = os.getenv("SMTP_USER", "") smtp_pass = os.getenv("SMTP_PASS", "") smtp_from = os.getenv("SMTP_FROM", "noreply@performancewest.net") admin_email = "ops@performancewest.net" if not smtp_host: return body = f"""[{priority}] Correspondence at {mailbox}@{domain} Order: {order_number} From: {from_addr} Subject: {subject} Mailbox: {mailbox}@{domain} Action may be required. Check ERPNext Issues. """ msg = MIMEText(body) msg["Subject"] = f"[PW Admin] {priority} mail at {mailbox}@{domain}: {subject[:50]}" msg["From"] = smtp_from msg["To"] = admin_email with smtplib.SMTP(smtp_host, smtp_port) as server: server.starttls() server.login(smtp_user, smtp_pass) server.send_message(msg) # ══════════════════════════════════════════════════════════════════════ # FMCSA Filing Confirmation Monitor # ══════════════════════════════════════════════════════════════════════ # Monitors ops@performancewest.net for FMCSA confirmation emails # (forwarded from filings@performancewest.net) and links them to orders. 
#
# FMCSA replies from: fmcsa_ask@mailag.cx.usg.oraclecloud.com
# Subject contains: "Question Reference #" + reference number
# Body contains: DOT number, status, timestamps

FMCSA_IMAP_HOST = os.getenv("PW_IMAP_HOST", "cp.carrierone.com")
FMCSA_IMAP_PORT = int(os.getenv("PW_IMAP_PORT", "993"))
FMCSA_IMAP_USER = os.getenv("PW_FILINGS_IMAP_USER", "ops@performancewest.net")
FMCSA_IMAP_PASS = os.getenv("PW_FILINGS_IMAP_PASS", "")
FMCSA_SENDER = "fmcsa_ask@mailag.cx.usg.oraclecloud.com"

# FMCSA reference number (format: YYMMDD-NNNNNN)
_FMCSA_REF_RE = re.compile(r"(\d{6}-\d{6})")
# DOT number as written in the confirmation body
_FMCSA_DOT_RE = re.compile(r"(?:USDOT|DOT)\s*#?\s*:?\s*(\d{5,8})", re.IGNORECASE)
# Ticket status line in the confirmation body
_FMCSA_STATUS_RE = re.compile(
    r"Status\s*:?\s*(Unresolved|Resolved|Waiting|Closed|Updated)",
    re.IGNORECASE,
)


def _fmcsa_decode_part(part) -> str:
    """Decode one non-multipart MIME part to text using its declared charset."""
    payload = part.get_payload(decode=True)
    if payload is None:  # get_payload(decode=True) may return None
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:  # charset label unknown to Python
        return payload.decode("utf-8", errors="replace")


def _fmcsa_message_body(msg) -> str:
    """Return the first text/plain part, else the first text/html part, else ""."""
    if not msg.is_multipart():
        return _fmcsa_decode_part(msg)

    html_fallback = ""
    for part in msg.walk():
        content_type = part.get_content_type()
        if content_type == "text/plain":
            return _fmcsa_decode_part(part)
        if content_type == "text/html" and not html_fallback:
            html_fallback = _fmcsa_decode_part(part)
    return html_fallback


def _parse_fmcsa_message(msg) -> dict:
    """Extract reference number, DOT number and status from one FMCSA email.

    Fields that cannot be found are returned as ``None``.
    """
    subject = msg.get("Subject", "")
    body = _fmcsa_message_body(msg)

    # The reference may appear in either the subject or the body.
    ref_match = _FMCSA_REF_RE.search(subject + " " + body)
    dot_match = _FMCSA_DOT_RE.search(body)
    status_match = _FMCSA_STATUS_RE.search(body)

    return {
        "fmcsa_reference": ref_match.group(1) if ref_match else None,
        "dot_number": dot_match.group(1) if dot_match else None,
        "subject": subject,
        "status": status_match.group(1) if status_match else None,
        "date": msg.get("Date", ""),
        "body": body[:3000],
        "message_id": msg.get("Message-ID", ""),
    }


def check_fmcsa_confirmations() -> list[dict]:
    """Check ops@ mailbox for FMCSA filing confirmations.

    Returns an empty list without connecting when PW_FILINGS_IMAP_PASS is
    not configured. IMAP errors are logged, not raised, and the session is
    always logged out.

    Returns list of parsed confirmation dicts.
    """
    if not FMCSA_IMAP_PASS:
        LOG.warning("[fmcsa-mail] No IMAP password configured (PW_FILINGS_IMAP_PASS)")
        return []

    confirmations: list[dict] = []
    imap = None

    try:
        imap = imaplib.IMAP4_SSL(FMCSA_IMAP_HOST, FMCSA_IMAP_PORT)
        imap.login(FMCSA_IMAP_USER, FMCSA_IMAP_PASS)
        imap.select("INBOX")

        # Search for unread emails from FMCSA
        status, msg_ids = imap.search(
            None, f'(UNSEEN FROM "{FMCSA_SENDER}")'
        )
        if status != "OK" or not msg_ids[0]:
            return []

        for msg_id in msg_ids[0].split():
            try:
                status, msg_data = imap.fetch(msg_id, "(RFC822)")
                if status != "OK":
                    continue

                confirmation = _parse_fmcsa_message(
                    email.message_from_bytes(msg_data[0][1])
                )
                confirmations.append(confirmation)

                # Mark as read
                imap.store(msg_id, "+FLAGS", "\\Seen")

                LOG.info(
                    "[fmcsa-mail] Confirmation: ref=%s dot=%s status=%s",
                    confirmation["fmcsa_reference"],
                    confirmation["dot_number"],
                    confirmation["status"],
                )

            except Exception as e:
                # One bad message must not stop the rest of the batch.
                LOG.error("[fmcsa-mail] Failed to parse email %s: %s", msg_id, e)

    except Exception as e:
        LOG.error("[fmcsa-mail] IMAP error: %s", e)
    finally:
        if imap is not None:
            try:
                imap.logout()
            except Exception as e:
                LOG.debug("[fmcsa-mail] IMAP logout failed: %s", e)

    return confirmations


def process_fmcsa_confirmation(conf: dict) -> None:
    """Link an FMCSA confirmation to the matching compliance order.

    Orders are matched by DOT number only. On a match the order's
    intake_data gets an ``fmcsa_confirmation`` entry (reference number,
    status, date, subject, first 1000 characters of the body), orders in
    'processing' or 'in_progress' move to 'filed', and a Telegram
    notification is sent to admin.

    Database errors are logged, not raised, and the connection is always
    closed.

    Args:
        conf: Confirmation dict from check_fmcsa_confirmations()
    """
    fmcsa_ref = conf.get("fmcsa_reference")
    dot_number = conf.get("dot_number")

    if not dot_number and not fmcsa_ref:
        LOG.warning("[fmcsa-mail] No DOT or reference to match: %s", conf.get("subject"))
        return

    if not dot_number:
        # The lookup below keys on DOT number only, so a reference-only
        # confirmation can never match; skip the database round trip.
        LOG.info(
            "[fmcsa-mail] Confirmation ref %s has no DOT number; cannot match an order",
            fmcsa_ref,
        )
        return

    import psycopg2

    try:
        conn = psycopg2.connect(DATABASE_URL)
        try:
            cur = conn.cursor()

            # Find the matching MCS-150 order by DOT number.
            # NOTE(review): this predicate was previously repeated twice with
            # the same key and value; confirm whether a second intake_data
            # key was meant to be checked as well.
            cur.execute("""
                SELECT order_number, customer_email, intake_data
                FROM compliance_orders
                WHERE service_slug = 'mcs150-update'
                  AND intake_data->>'dot_number' = %s
                  AND status IN ('processing', 'in_progress', 'filed', 'submitted')
                ORDER BY created_at DESC
                LIMIT 1
            """, (dot_number,))
            row = cur.fetchone()

            if not row:
                LOG.info("[fmcsa-mail] No matching order for DOT %s", dot_number)
                return

            order_number = row[0]

            # Update order with FMCSA confirmation
            fmcsa_log = {
                "fmcsa_reference": fmcsa_ref,
                "fmcsa_status": conf.get("status"),
                "confirmation_date": conf.get("date"),
                "confirmation_subject": conf.get("subject"),
                "confirmation_body": conf.get("body", "")[:1000],
            }

            cur.execute("""
                UPDATE compliance_orders
                SET intake_data = jsonb_set(
                    COALESCE(intake_data, '{}'::jsonb),
                    '{fmcsa_confirmation}',
                    %s::jsonb
                ),
                status = CASE
                    WHEN status IN ('processing', 'in_progress') THEN 'filed'
                    ELSE status
                END
                WHERE order_number = %s
            """, (json.dumps(fmcsa_log), order_number))
            conn.commit()
        finally:
            conn.close()

        LOG.info(
            "[fmcsa-mail] Linked ref %s to order %s (DOT %s)",
            fmcsa_ref, order_number, dot_number,
        )

        # Send Telegram notification
        _notify_fmcsa_confirmation(order_number, dot_number, fmcsa_ref, conf.get("status"))

    except Exception as e:
        LOG.error("[fmcsa-mail] Failed to process confirmation: %s", e)


def _notify_fmcsa_confirmation(order_number, dot_number, fmcsa_ref, status):
    """Send Telegram notification about FMCSA confirmation.

    Best-effort: does nothing when TELEGRAM_BOT_TOKEN or TELEGRAM_CHAT_ID is
    unset, and never raises. Failures are logged as warnings.
    """
    try:
        import urllib.request

        bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "")
        chat_id = os.getenv("TELEGRAM_CHAT_ID", "")
        if not bot_token or not chat_id:
            return

        text = (
            f"📋 FMCSA Filing Confirmed\n"
            f"Order: {order_number}\n"
            f"DOT#: {dot_number}\n"
            f"Reference: {fmcsa_ref or 'pending'}\n"
            f"Status: {status or 'received'}"
        )

        url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
        data = json.dumps({"chat_id": chat_id, "text": text}).encode()
        req = urllib.request.Request(
            url, data=data, headers={"Content-Type": "application/json"}
        )
        # Close the HTTP response instead of leaving it to the garbage collector.
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception as e:
        # Only the exception is logged; the URL (which embeds the bot token)
        # is deliberately kept out of the log message.
        LOG.warning("[fmcsa-mail] Telegram notification failed: %s", e)


def main():
    """Main processing loop — run via cron every 15 minutes."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )

    LOG.info("=" * 60)
    LOG.info("Client Email Processor — checking mailboxes")
    LOG.info("=" * 60)

    # ── FMCSA filing confirmations ──
    fmcsa_confs = check_fmcsa_confirmations()
    for conf in fmcsa_confs:
        process_fmcsa_confirmation(conf)
    if fmcsa_confs:
        LOG.info("Processed %d FMCSA confirmation(s)", len(fmcsa_confs))

    # ── Client domain mailboxes ──
    # In production, we'd query the DB for all active client domains
    # and their IMAP credentials (stored encrypted in ERPNext Sensitive ID).

    # TODO: Load client domains and IMAP passwords from ERPNext
    # domains = get_client_domains()
    # For each domain, check all mailboxes in ALL_MAILBOXES

    LOG.info("Email processor complete.")
    LOG.info("Mailboxes to monitor per domain: %s", ", ".join(ALL_MAILBOXES))


if __name__ == "__main__":
    main()