add FMCSA confirmation email monitor: IMAP watcher links confirmations to orders, Telegram notify

This commit is contained in:
justin 2026-05-30 19:01:42 -05:00
parent b90d443667
commit a3a546abff

View file

@ -455,6 +455,224 @@ Action may be required. Check ERPNext Issues.
server.send_message(msg) server.send_message(msg)
# ══════════════════════════════════════════════════════════════════════
# FMCSA Filing Confirmation Monitor
# ══════════════════════════════════════════════════════════════════════
# Monitors ops@performancewest.net for FMCSA confirmation emails
# (forwarded from filings@performancewest.net) and links them to orders.
#
# FMCSA replies from: fmcsa_ask@mailag.cx.usg.oraclecloud.com
# Subject contains: "Question Reference #" + reference number
# Body contains: DOT number, status, timestamps
FMCSA_IMAP_HOST = os.getenv("PW_IMAP_HOST", "cp.carrierone.com")
FMCSA_IMAP_PORT = int(os.getenv("PW_IMAP_PORT", "993"))
FMCSA_IMAP_USER = os.getenv("PW_FILINGS_IMAP_USER", "ops@performancewest.net")
FMCSA_IMAP_PASS = os.getenv("PW_FILINGS_IMAP_PASS", "")
FMCSA_SENDER = "fmcsa_ask@mailag.cx.usg.oraclecloud.com"
def check_fmcsa_confirmations() -> list[dict]:
"""Check ops@ mailbox for FMCSA filing confirmations.
Returns list of parsed confirmation dicts.
"""
if not FMCSA_IMAP_PASS:
LOG.warning("[fmcsa-mail] No IMAP password configured (PW_FILINGS_IMAP_PASS)")
return []
confirmations = []
try:
imap = imaplib.IMAP4_SSL(FMCSA_IMAP_HOST, FMCSA_IMAP_PORT)
imap.login(FMCSA_IMAP_USER, FMCSA_IMAP_PASS)
imap.select("INBOX")
# Search for unread emails from FMCSA
status, msg_ids = imap.search(
None, f'(UNSEEN FROM "{FMCSA_SENDER}")'
)
if status != "OK" or not msg_ids[0]:
imap.logout()
return []
for msg_id in msg_ids[0].split():
try:
status, msg_data = imap.fetch(msg_id, "(RFC822)")
if status != "OK":
continue
raw = msg_data[0][1]
msg = email.message_from_bytes(raw)
subject = msg.get("Subject", "")
date_str = msg.get("Date", "")
# Get body
body = ""
if msg.is_multipart():
for part in msg.walk():
ct = part.get_content_type()
if ct == "text/plain":
body = part.get_payload(decode=True).decode("utf-8", errors="replace")
break
elif ct == "text/html" and not body:
body = part.get_payload(decode=True).decode("utf-8", errors="replace")
else:
body = msg.get_payload(decode=True).decode("utf-8", errors="replace")
# Extract FMCSA reference number (format: YYMMDD-NNNNNN)
ref_match = re.search(r"(\d{6}-\d{6})", subject + " " + body)
fmcsa_ref = ref_match.group(1) if ref_match else None
# Extract DOT number from body
dot_match = re.search(
r"(?:USDOT|DOT)\s*#?\s*:?\s*(\d{5,8})", body, re.IGNORECASE
)
dot_number = dot_match.group(1) if dot_match else None
# Extract status
status_match = re.search(
r"Status\s*:?\s*(Unresolved|Resolved|Waiting|Closed|Updated)",
body, re.IGNORECASE,
)
fmcsa_status = status_match.group(1) if status_match else None
confirmation = {
"fmcsa_reference": fmcsa_ref,
"dot_number": dot_number,
"subject": subject,
"status": fmcsa_status,
"date": date_str,
"body": body[:3000],
"message_id": msg.get("Message-ID", ""),
}
confirmations.append(confirmation)
# Mark as read
imap.store(msg_id, "+FLAGS", "\\Seen")
LOG.info(
"[fmcsa-mail] Confirmation: ref=%s dot=%s status=%s",
fmcsa_ref, dot_number, fmcsa_status,
)
except Exception as e:
LOG.error("[fmcsa-mail] Failed to parse email %s: %s", msg_id, e)
imap.logout()
except Exception as e:
LOG.error("[fmcsa-mail] IMAP error: %s", e)
return confirmations
def process_fmcsa_confirmation(conf: dict) -> None:
"""Link an FMCSA confirmation to the matching compliance order.
Updates the order's intake_data with the FMCSA reference number,
submission status, and confirmation email body. Also sends a Telegram
notification to admin.
"""
import psycopg2
fmcsa_ref = conf.get("fmcsa_reference")
dot_number = conf.get("dot_number")
if not dot_number and not fmcsa_ref:
LOG.warning("[fmcsa-mail] No DOT or reference to match: %s", conf.get("subject"))
return
try:
conn = psycopg2.connect(DATABASE_URL)
cur = conn.cursor()
# Find the matching MCS-150 order by DOT number
cur.execute("""
SELECT order_number, customer_email, intake_data
FROM compliance_orders
WHERE service_slug = 'mcs150-update'
AND (intake_data->>'dot_number' = %s OR intake_data->>'dot_number' = %s)
AND status IN ('processing', 'in_progress', 'filed', 'submitted')
ORDER BY created_at DESC
LIMIT 1
""", (dot_number, dot_number))
row = cur.fetchone()
if not row:
LOG.info("[fmcsa-mail] No matching order for DOT %s", dot_number)
conn.close()
return
order_number = row[0]
# Update order with FMCSA confirmation
import json
fmcsa_log = {
"fmcsa_reference": fmcsa_ref,
"fmcsa_status": conf.get("status"),
"confirmation_date": conf.get("date"),
"confirmation_subject": conf.get("subject"),
"confirmation_body": conf.get("body", "")[:1000],
}
cur.execute("""
UPDATE compliance_orders
SET intake_data = jsonb_set(
COALESCE(intake_data, '{}'::jsonb),
'{fmcsa_confirmation}',
%s::jsonb
),
status = CASE
WHEN status IN ('processing', 'in_progress') THEN 'filed'
ELSE status
END
WHERE order_number = %s
""", (json.dumps(fmcsa_log), order_number))
conn.commit()
conn.close()
LOG.info(
"[fmcsa-mail] Linked ref %s to order %s (DOT %s)",
fmcsa_ref, order_number, dot_number,
)
# Send Telegram notification
_notify_fmcsa_confirmation(order_number, dot_number, fmcsa_ref, conf.get("status"))
except Exception as e:
LOG.error("[fmcsa-mail] Failed to process confirmation: %s", e)
def _notify_fmcsa_confirmation(order_number, dot_number, fmcsa_ref, status):
"""Send Telegram notification about FMCSA confirmation."""
try:
import urllib.request
bot_token = os.getenv("TELEGRAM_BOT_TOKEN", "")
chat_id = os.getenv("TELEGRAM_CHAT_ID", "")
if not bot_token or not chat_id:
return
text = (
f"📋 FMCSA Filing Confirmed\n"
f"Order: {order_number}\n"
f"DOT#: {dot_number}\n"
f"Reference: {fmcsa_ref or 'pending'}\n"
f"Status: {status or 'received'}"
)
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
data = json.dumps({"chat_id": chat_id, "text": text}).encode()
req = urllib.request.Request(
url, data=data, headers={"Content-Type": "application/json"}
)
urllib.request.urlopen(req, timeout=10)
except Exception:
pass
def main(): def main():
"""Main processing loop — run via cron every 15 minutes.""" """Main processing loop — run via cron every 15 minutes."""
logging.basicConfig( logging.basicConfig(
@ -466,15 +684,22 @@ def main():
LOG.info("Client Email Processor — checking mailboxes") LOG.info("Client Email Processor — checking mailboxes")
LOG.info("=" * 60) LOG.info("=" * 60)
# ── FMCSA filing confirmations ──
fmcsa_confs = check_fmcsa_confirmations()
for conf in fmcsa_confs:
process_fmcsa_confirmation(conf)
if fmcsa_confs:
LOG.info("Processed %d FMCSA confirmation(s)", len(fmcsa_confs))
# ── Client domain mailboxes ──
# In production, we'd query the DB for all active client domains # In production, we'd query the DB for all active client domains
# and their IMAP credentials (stored encrypted in ERPNext Sensitive ID). # and their IMAP credentials (stored encrypted in ERPNext Sensitive ID).
# For now, this is a framework that processes any configured domains.
# TODO: Load client domains and IMAP passwords from ERPNext # TODO: Load client domains and IMAP passwords from ERPNext
# domains = get_client_domains() # domains = get_client_domains()
# For each domain, check all mailboxes in ALL_MAILBOXES # For each domain, check all mailboxes in ALL_MAILBOXES
LOG.info("Email processor framework ready. Configure client domains to begin processing.") LOG.info("Email processor complete.")
LOG.info("Mailboxes to monitor per domain: %s", ", ".join(ALL_MAILBOXES)) LOG.info("Mailboxes to monitor per domain: %s", ", ".join(ALL_MAILBOXES))