"""IRP correspondence + invoice reconciliation. IRP apportioned registration fees can only be computed by the base-state IRP office from the carrier's fleet (Schedule A/B): per-jurisdiction mileage x registered weight x each jurisdiction's per-mile rate. There is no pre-lookup. So for IRP we use the email/POA path (most convenient for the carrier): send_irp_submission(...) Email the base-state IRP unit a Schedule A/B submission request (with the signed POA reference), Reply-To the dedicated IRP filings mailbox, tagged with the order number so replies are matchable. poll_irp_invoices() (cron, every ~15 min) Scan the IRP filings mailbox for state replies, match them to a parent order by the [PW-IRP CO-XXXX] subject tag, parse the apportioned fee, fire a Telegram alert to the operator, and bill the customer the EXACT amount via a gov-fee child order + payment link. Mailbox config (env), so this activates the moment a clean mailbox exists: IRP_FILINGS_IMAP_HOST / _PORT / _USER / _PASS / _FOLDER IRP_FILINGS_FROM the address state replies come back to (Reply-To) Falls back to the OPS_IMAP_* mailbox when the dedicated one isn't configured; in that case it filters by the [PW-IRP ...] subject tag so it ignores unrelated mail. State IRP submission contacts (email/fax) per base state. Sourced from each state's Motor Carrier Services / IRP page. Add states as we confirm them. """ from __future__ import annotations import email import imaplib import json import logging import os import re import smtplib from datetime import datetime, timezone from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from scripts.workers.telegram_notify import send_telegram LOG = logging.getLogger("workers.services.irp_filing") # ── Mailbox config ─────────────────────────────────────────────────────────── IMAP_HOST = os.getenv("IRP_FILINGS_IMAP_HOST") or os.getenv("OPS_IMAP_HOST", "mail.performancewest.net") IMAP_PORT = int(os.getenv("IRP_FILINGS_IMAP_PORT") or os.getenv("OPS_IMAP_PORT", "993")) IMAP_USER = os.getenv("IRP_FILINGS_IMAP_USER") or os.getenv("OPS_IMAP_USER", "ops@performancewest.net") IMAP_PASS = os.getenv("IRP_FILINGS_IMAP_PASS") or os.getenv("OPS_IMAP_PASS", "") IMAP_FOLDER = os.getenv("IRP_FILINGS_IMAP_FOLDER", "INBOX") # Address the state replies to. Defaults to the IMAP user so replies land in the # mailbox we monitor. FILINGS_FROM = os.getenv("IRP_FILINGS_FROM") or IMAP_USER # True when a DEDICATED mailbox is configured (so we can process all unseen mail); # when falling back to a shared mailbox we ONLY touch [PW-IRP ...]-tagged mail. DEDICATED_MAILBOX = bool(os.getenv("IRP_FILINGS_IMAP_USER")) SMTP_HOST = os.getenv("SMTP_HOST", "co.carrierone.com") SMTP_PORT = int(os.getenv("SMTP_PORT", "587")) SMTP_USER = os.getenv("SMTP_USER", "noreply@performancewest.net") SMTP_PASS = os.getenv("SMTP_PASS", "") # Base-state IRP submission contacts. email + optional fax. Extend as confirmed. IRP_STATE_CONTACTS = { "SC": {"agency": "SC DMV Motor Carrier Services", "email": os.getenv("IRP_SC_EMAIL", "MCS@scdmv.net"), "fax": "803-896-3713"}, # NC, GA, FL, TX, etc. — add as confirmed. } SUBJECT_TAG = "PW-IRP" # [PW-IRP CO-XXXXXXXX] in subject for reply matching # Match either state-agency tag: IRP or intrastate authority (ISA). Capture the # tag kind + the order number so the poller knows which service to bill. TAG_RE = re.compile(r"\[PW-(IRP|ISA)\s+(C[OG]-[A-Z0-9]+)\]", re.I) TAG_SLUG = {"IRP": "irp-registration", "ISA": "intrastate-authority"} # Fee patterns commonly seen on IRP invoices / replies. FEE_RE = [ re.compile(r"(?:total\s+(?:fees?\s+)?due|amount\s+due|total\s+amount|apportioned\s+fees?)[^$]{0,40}\$\s*([0-9][0-9,]*\.?\d{0,2})", re.I), re.compile(r"\$\s*([0-9][0-9,]*\.\d{2})\b"), ] def state_irp_contact(base_state: str) -> dict | None: return IRP_STATE_CONTACTS.get((base_state or "").upper()) def _enrich_address_from_census(dot_number: str, intake: dict) -> dict: """Fill missing legal_name / address fields from the FMCSA census so the IRP submission isn't sent with a blank address. Customer-supplied values win.""" out = dict(intake or {}) need = any(not out.get(k) for k in ("legal_name", "address_street", "address_city", "address_state", "address_zip")) if not need or not dot_number: return out try: from scripts.workers.services.mcs150_update import MCS150UpdateHandler census = MCS150UpdateHandler.__new__(MCS150UpdateHandler)._fetch_carrier_record(dot_number) for k in ("legal_name", "address_street", "address_city", "address_state", "address_zip", "phone"): if not out.get(k) and census.get(k): out[k] = census[k] except Exception as exc: # noqa: BLE001 LOG.warning("[irp] census enrich failed for DOT %s: %s", dot_number, exc) return out def _download_minio(key: str) -> bytes | None: """Fetch an object's bytes from MinIO (the signed POA PDF). None on failure.""" if not key: return None try: from minio import Minio mc = Minio( f"{os.getenv('MINIO_ENDPOINT', 'minio')}:{os.getenv('MINIO_PORT', '9000')}", access_key=os.getenv("MINIO_ACCESS_KEY", ""), secret_key=os.getenv("MINIO_SECRET_KEY", ""), secure=os.getenv("MINIO_SECURE", "false").lower() == "true", ) bucket = os.getenv("MINIO_BUCKET", "performancewest") resp = mc.get_object(bucket, key) data = resp.read() resp.close() resp.release_conn() return data except Exception as exc: # noqa: BLE001 LOG.warning("[irp] could not download POA %s: %s", key, exc) return None def send_irp_submission(order_number: str, entity_name: str, dot_number: str, base_state: str, intake: dict, signed_auth_key: str = "") -> bool: """Email the base-state IRP unit a Schedule A/B submission for this carrier, with the signed POA attached, tagged for reply matching. Returns True if sent. The state will not act on a third-party filing without the signed Power of Attorney, so we REQUIRE the POA PDF and attach it; if it's missing we do not send (the caller falls back to a manual todo).""" contact = state_irp_contact(base_state) if not contact or not contact.get("email"): LOG.warning("[%s] No IRP submission contact for base state %s", order_number, base_state) return False # Attach the signed POA — required by the state to act on our behalf. poa_bytes = _download_minio(signed_auth_key) if not poa_bytes: LOG.warning("[%s] No signed POA available (key=%s) — not emailing IRP office", order_number, signed_auth_key or "(none)") return False # Backfill legal name + address from the FMCSA census if intake lacks them. intake = _enrich_address_from_census(dot_number, intake) entity_name = entity_name or intake.get("legal_name", "") op_states = intake.get("operating_states") or [] if isinstance(op_states, str): try: op_states = json.loads(op_states) except Exception: op_states = [s.strip() for s in op_states.split(",") if s.strip()] addr = ", ".join(p for p in [ intake.get("address_street", ""), intake.get("address_city", ""), f"{intake.get('address_state','')} {intake.get('address_zip','')}".strip(), ] if p.strip()) subject = f"IRP Apportioned Registration Request — {entity_name} (USDOT {dot_number}) [{SUBJECT_TAG} {order_number}]" body = ( f"To {contact['agency']},\n\n" f"On behalf of our client, and under the signed Power of Attorney attached " f"to this email, we request IRP apportioned registration for the following " f"carrier. Please reply with the computed apportioned fee invoice so we can " f"remit payment.\n\n" f"Carrier: {entity_name}\n" f"USDOT: {dot_number}\n" f"MC/MX/FF: {intake.get('mc_number','')}\n" f"Base jurisdiction: {base_state}\n" f"Power units: {intake.get('power_units','')}\n" f"Registered weight bracket: {intake.get('gross_weight_bracket','')}\n" f"Operating jurisdictions: {', '.join(op_states) if op_states else base_state}\n" f"Legal/registered address: {addr or '(see attached)'}\n\n" f"Attached: signed Power of Attorney authorizing Performance West Inc. to " f"file and remit fees on the carrier's behalf.\n\n" f"Please reply to {FILINGS_FROM} with the apportioned fee total (including any " f"processing fees) and any required Schedule A/B forms. Keep the subject " f"reference [{SUBJECT_TAG} {order_number}] so we can match your reply.\n\n" f"Thank you,\n" f"Performance West Inc. — DOT / State Motor Carrier Compliance\n" f"(888) 411-0383 · {FILINGS_FROM}\n" ) try: from email.mime.application import MIMEApplication msg = MIMEMultipart() msg["From"] = SMTP_USER msg["To"] = contact["email"] msg["Reply-To"] = FILINGS_FROM msg["Subject"] = subject msg.attach(MIMEText(body, "plain")) poa = MIMEApplication(poa_bytes, _subtype="pdf") poa.add_header("Content-Disposition", "attachment", filename=f"POA_{entity_name.replace(' ','_')}_{dot_number}.pdf") msg.attach(poa) with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as s: s.starttls() if SMTP_USER and SMTP_PASS: s.login(SMTP_USER, SMTP_PASS) s.sendmail(SMTP_USER, [contact["email"], FILINGS_FROM], msg.as_string()) LOG.info("[%s] IRP submission emailed to %s (%s) with POA attached", order_number, contact["email"], base_state) send_telegram( f"📤 IRP submission sent (POA attached)\n{entity_name} (DOT {dot_number})\n" f"Base state: {base_state} → {contact['email']}\n" f"Order: {order_number}\nAwaiting the state's apportioned-fee invoice." ) return True except Exception as exc: # noqa: BLE001 LOG.error("[%s] Failed to send IRP submission: %s", order_number, exc) return False def _parse_fee_cents(text: str) -> int | None: for pat in FEE_RE: m = pat.search(text or "") if m: try: return int(round(float(m.group(1).replace(",", "")) * 100)) except ValueError: continue return None def _body_text(msg) -> str: parts = [] if msg.is_multipart(): for p in msg.walk(): if p.get_content_type() == "text/plain": try: parts.append(p.get_payload(decode=True).decode(errors="replace")) except Exception: pass else: try: parts.append(msg.get_payload(decode=True).decode(errors="replace")) except Exception: pass return "\n".join(parts) def poll_irp_invoices() -> int: """Scan the IRP filings mailbox for state invoice replies, bill the exact fee, and alert the operator. Returns the number of invoices processed.""" if not IMAP_PASS: LOG.warning("[irp-poll] No IMAP password configured (IRP_FILINGS_IMAP_PASS / OPS_IMAP_PASS) — skipping") return 0 try: from scripts.workers.services.gov_fee import ( GovFeeEstimate, create_gov_fee_order, send_gov_fee_payment_email, ) except Exception as exc: # noqa: BLE001 LOG.error("[irp-poll] gov_fee import failed: %s", exc) return 0 processed = 0 try: m = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT, timeout=30) m.login(IMAP_USER, IMAP_PASS) m.select(IMAP_FOLDER) # Unseen mail only. On a shared mailbox, restrict to our subject tags. if DEDICATED_MAILBOX: typ, data = m.search(None, "UNSEEN") ids = data[0].split() if data and data[0] else [] else: ids = [] seen_ids = set() for tagword in ("PW-IRP", "PW-ISA"): typ, data = m.search(None, "UNSEEN", "SUBJECT", tagword) for i in (data[0].split() if data and data[0] else []): if i not in seen_ids: seen_ids.add(i) ids.append(i) for mid in ids: typ, md = m.fetch(mid, "(RFC822)") if typ != "OK" or not md or not md[0]: continue msg = email.message_from_bytes(md[0][1]) subject = str(email.header.make_header(email.header.decode_header(msg.get("Subject", "")))) tag = TAG_RE.search(subject) if not tag: # Not a state-agency reply we can match; leave unseen on shared mailbox. if not DEDICATED_MAILBOX: m.store(mid, "-FLAGS", "(\\Seen)") continue tag_kind = tag.group(1).upper() parent_order = tag.group(2).upper() slug = TAG_SLUG.get(tag_kind, "irp-registration") kind_label = "IRP" if tag_kind == "IRP" else "Intrastate authority" body = _body_text(msg) fee_cents = _parse_fee_cents(subject + "\n" + body) sender = msg.get("From", "") if not fee_cents: # Could not parse a fee — alert the operator to read it manually. send_telegram( f"📬 {kind_label} reply received (no fee auto-parsed)\n" f"Order: {parent_order}\nFrom: {sender}\n" f"Subject: {subject}\nOpen the {IMAP_USER} mailbox to review + enter the fee." ) m.store(mid, "+FLAGS", "(\\Seen)") processed += 1 continue ok = _bill_parent_fee(parent_order, slug, kind_label, fee_cents, sender, create_gov_fee_order, send_gov_fee_payment_email, GovFeeEstimate) send_telegram( f"💵 {kind_label} invoice received → customer billed\n" f"Order: {parent_order}\nState fee: ${fee_cents/100:,.2f}\n" f"From: {sender}\n" + ("Payment link emailed to the customer." if ok else "⚠️ Could not auto-bill — check logs.") ) m.store(mid, "+FLAGS", "(\\Seen)") processed += 1 m.logout() except Exception as exc: # noqa: BLE001 LOG.error("[irp-poll] IMAP error: %s", exc) LOG.info("[irp-poll] processed %s state-agency invoice reply(ies)", processed) return processed def _bill_parent_fee(parent_order, slug, kind_label, fee_cents, sender, create_gov_fee_order, send_gov_fee_payment_email, GovFeeEstimate) -> bool: """Create the exact-amount gov-fee child + email the customer the payment link.""" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) with conn.cursor() as cur: cur.execute( "SELECT customer_email, customer_name, customer_phone, service_name " "FROM compliance_orders WHERE order_number = %s", (parent_order,), ) row = cur.fetchone() conn.close() except Exception as exc: # noqa: BLE001 LOG.error("[irp-poll] DB lookup failed for %s: %s", parent_order, exc) return False if not row: LOG.warning("[irp-poll] No parent order %s for %s invoice", parent_order, kind_label) return False customer_email, customer_name, customer_phone, service_name = row est = GovFeeEstimate( cents=fee_cents, label=f"{kind_label} fee (state invoice) — {sender}", exact=True, breakdown=[f"State {kind_label} invoice: ${fee_cents/100:,.2f}"], ) child = create_gov_fee_order(parent_order, slug, est, customer_email, customer_name or "", customer_phone or "") if not child: return False send_gov_fee_payment_email(customer_email, customer_name or "", service_name or kind_label, customer_name or "", est, child) return True if __name__ == "__main__": logging.basicConfig(level=logging.INFO) print("processed:", poll_irp_invoices())