diff --git a/scripts/workers/client_email_processor.py b/scripts/workers/client_email_processor.py index 6b77bf5..7bcd694 100644 --- a/scripts/workers/client_email_processor.py +++ b/scripts/workers/client_email_processor.py @@ -455,6 +455,224 @@ Action may be required. Check ERPNext Issues. server.send_message(msg) +# ══════════════════════════════════════════════════════════════════════ +# FMCSA Filing Confirmation Monitor +# ══════════════════════════════════════════════════════════════════════ +# Monitors ops@performancewest.net for FMCSA confirmation emails +# (forwarded from filings@performancewest.net) and links them to orders. +# +# FMCSA replies from: fmcsa_ask@mailag.cx.usg.oraclecloud.com +# Subject contains: "Question Reference #" + reference number +# Body contains: DOT number, status, timestamps + +FMCSA_IMAP_HOST = os.getenv("PW_IMAP_HOST", "cp.carrierone.com") +FMCSA_IMAP_PORT = int(os.getenv("PW_IMAP_PORT", "993")) +FMCSA_IMAP_USER = os.getenv("PW_FILINGS_IMAP_USER", "ops@performancewest.net") +FMCSA_IMAP_PASS = os.getenv("PW_FILINGS_IMAP_PASS", "") +FMCSA_SENDER = "fmcsa_ask@mailag.cx.usg.oraclecloud.com" + + +def check_fmcsa_confirmations() -> list[dict]: + """Check ops@ mailbox for FMCSA filing confirmations. + + Returns list of parsed confirmation dicts. + """ + if not FMCSA_IMAP_PASS: + LOG.warning("[fmcsa-mail] No IMAP password configured (PW_FILINGS_IMAP_PASS)") + return [] + + confirmations = [] + + try: + imap = imaplib.IMAP4_SSL(FMCSA_IMAP_HOST, FMCSA_IMAP_PORT) + imap.login(FMCSA_IMAP_USER, FMCSA_IMAP_PASS) + imap.select("INBOX") + + # Search for unread emails from FMCSA + status, msg_ids = imap.search( + None, f'(UNSEEN FROM "{FMCSA_SENDER}")' + ) + if status != "OK" or not msg_ids[0]: + imap.logout() + return [] + + for msg_id in msg_ids[0].split(): + try: + status, msg_data = imap.fetch(msg_id, "(RFC822)") + if status != "OK": + continue + + raw = msg_data[0][1] + msg = email.message_from_bytes(raw) + + subject = msg.get("Subject", "") + date_str = msg.get("Date", "") + + # Get body + body = "" + if msg.is_multipart(): + for part in msg.walk(): + ct = part.get_content_type() + if ct == "text/plain": + body = part.get_payload(decode=True).decode("utf-8", errors="replace") + break + elif ct == "text/html" and not body: + body = part.get_payload(decode=True).decode("utf-8", errors="replace") + else: + body = msg.get_payload(decode=True).decode("utf-8", errors="replace") + + # Extract FMCSA reference number (format: YYMMDD-NNNNNN) + ref_match = re.search(r"(\d{6}-\d{6})", subject + " " + body) + fmcsa_ref = ref_match.group(1) if ref_match else None + + # Extract DOT number from body + dot_match = re.search( + r"(?:USDOT|DOT)\s*#?\s*:?\s*(\d{5,8})", body, re.IGNORECASE + ) + dot_number = dot_match.group(1) if dot_match else None + + # Extract status + status_match = re.search( + r"Status\s*:?\s*(Unresolved|Resolved|Waiting|Closed|Updated)", + body, re.IGNORECASE, + ) + fmcsa_status = status_match.group(1) if status_match else None + + confirmation = { + "fmcsa_reference": fmcsa_ref, + "dot_number": dot_number, + "subject": subject, + "status": fmcsa_status, + "date": date_str, + "body": body[:3000], + "message_id": msg.get("Message-ID", ""), + } + confirmations.append(confirmation) + + # Mark as read + imap.store(msg_id, "+FLAGS", "\\Seen") + + LOG.info( + "[fmcsa-mail] Confirmation: ref=%s dot=%s status=%s", + fmcsa_ref, dot_number, fmcsa_status, + ) + + except Exception as e: + LOG.error("[fmcsa-mail] Failed to parse email %s: %s", msg_id, e) + + imap.logout() + + except Exception as e: + LOG.error("[fmcsa-mail] IMAP error: %s", e) + + return confirmations + + +def process_fmcsa_confirmation(conf: dict) -> None: + """Link an FMCSA confirmation to the matching compliance order. + + Updates the order's intake_data with the FMCSA reference number, + submission status, and confirmation email body. Also sends a Telegram + notification to admin. + """ + import psycopg2 + + fmcsa_ref = conf.get("fmcsa_reference") + dot_number = conf.get("dot_number") + + if not dot_number and not fmcsa_ref: + LOG.warning("[fmcsa-mail] No DOT or reference to match: %s", conf.get("subject")) + return + + try: + conn = psycopg2.connect(DATABASE_URL) + cur = conn.cursor() + + # Find the matching MCS-150 order by DOT number + cur.execute(""" + SELECT order_number, customer_email, intake_data + FROM compliance_orders + WHERE service_slug = 'mcs150-update' + AND (intake_data->>'dot_number' = %s OR intake_data->>'dot_number' = %s) + AND status IN ('processing', 'in_progress', 'filed', 'submitted') + ORDER BY created_at DESC + LIMIT 1 + """, (dot_number, dot_number)) + + row = cur.fetchone() + if not row: + LOG.info("[fmcsa-mail] No matching order for DOT %s", dot_number) + conn.close() + return + + order_number = row[0] + + # Update order with FMCSA confirmation + import json + fmcsa_log = { + "fmcsa_reference": fmcsa_ref, + "fmcsa_status": conf.get("status"), + "confirmation_date": conf.get("date"), + "confirmation_subject": conf.get("subject"), + "confirmation_body": conf.get("body", "")[:1000], + } + + cur.execute(""" + UPDATE compliance_orders + SET intake_data = jsonb_set( + COALESCE(intake_data, '{}'::jsonb), + '{fmcsa_confirmation}', + %s::jsonb + ), + status = CASE + WHEN status IN ('processing', 'in_progress') THEN 'filed' + ELSE status + END + WHERE order_number = %s + """, (json.dumps(fmcsa_log), order_number)) + + conn.commit() + conn.close() + + LOG.info( + "[fmcsa-mail] Linked ref %s to order %s (DOT %s)", + fmcsa_ref, order_number, dot_number, + ) + + # Send Telegram notification + _notify_fmcsa_confirmation(order_number, dot_number, fmcsa_ref, conf.get("status")) + + except Exception as e: + LOG.error("[fmcsa-mail] Failed to process confirmation: %s", e) + + +def _notify_fmcsa_confirmation(order_number, dot_number, fmcsa_ref, status): + """Send Telegram notification about FMCSA confirmation.""" + try: + import urllib.request + bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "") + chat_id = os.getenv("TELEGRAM_CHAT_ID", "") + if not bot_token or not chat_id: + return + + text = ( + f"📋 FMCSA Filing Confirmed\n" + f"Order: {order_number}\n" + f"DOT#: {dot_number}\n" + f"Reference: {fmcsa_ref or 'pending'}\n" + f"Status: {status or 'received'}" + ) + + url = f"https://api.telegram.org/bot{bot_token}/sendMessage" + data = json.dumps({"chat_id": chat_id, "text": text}).encode() + req = urllib.request.Request( + url, data=data, headers={"Content-Type": "application/json"} + ) + urllib.request.urlopen(req, timeout=10) + except Exception: + pass + + def main(): """Main processing loop — run via cron every 15 minutes.""" logging.basicConfig( @@ -466,15 +684,22 @@ def main(): LOG.info("Client Email Processor — checking mailboxes") LOG.info("=" * 60) + # ── FMCSA filing confirmations ── + fmcsa_confs = check_fmcsa_confirmations() + for conf in fmcsa_confs: + process_fmcsa_confirmation(conf) + if fmcsa_confs: + LOG.info("Processed %d FMCSA confirmation(s)", len(fmcsa_confs)) + + # ── Client domain mailboxes ── # In production, we'd query the DB for all active client domains # and their IMAP credentials (stored encrypted in ERPNext Sensitive ID). - # For now, this is a framework that processes any configured domains. # TODO: Load client domains and IMAP passwords from ERPNext # domains = get_client_domains() # For each domain, check all mailboxes in ALL_MAILBOXES - LOG.info("Email processor framework ready. Configure client domains to begin processing.") + LOG.info("Email processor complete.") LOG.info("Mailboxes to monitor per domain: %s", ", ".join(ALL_MAILBOXES))