diff --git a/infra/ansible/roles/app/templates/app.env.j2 b/infra/ansible/roles/app/templates/app.env.j2 index 956b0db..69ecfa6 100644 --- a/infra/ansible/roles/app/templates/app.env.j2 +++ b/infra/ansible/roles/app/templates/app.env.j2 @@ -106,6 +106,19 @@ RELAY_IMAP_FOLDER={{ vault_relay_imap_folder | default('INBOX') }} RELAY_FILING_CARD_ID={{ vault_relay_filing_card_id | default('') }} CRYPTO_FILING_CARD_ID={{ vault_crypto_filing_card_id | default('') }} +# ── IRP filings mailbox (state apportioned-fee invoice replies) ────────────── +# Dedicated mailbox the IRP submission Reply-To points at; the irp-invoice-poller +# cron scans it for state fee invoices and bills customers the exact amount. +# Leave the IMAP_USER blank to fall back to OPS_IMAP_* and filter by the +# [PW-IRP ...] subject tag. +IRP_FILINGS_IMAP_HOST={{ vault_irp_filings_imap_host | default(smtp_host) }} +IRP_FILINGS_IMAP_PORT={{ vault_irp_filings_imap_port | default('993') }} +IRP_FILINGS_IMAP_USER={{ vault_irp_filings_imap_user | default('') }} +IRP_FILINGS_IMAP_PASS={{ vault_irp_filings_imap_pass | default('') }} +IRP_FILINGS_IMAP_FOLDER={{ vault_irp_filings_imap_folder | default('INBOX') }} +IRP_FILINGS_FROM={{ vault_irp_filings_from | default('filings@performancewest.net') }} +IRP_SC_EMAIL={{ vault_irp_sc_email | default('MCS@scdmv.net') }} + # ── Porkbun (.ca domain registration) ──────────────────────────────────────── PORKBUN_API_KEY={{ vault_porkbun_api_key | default('') }} PORKBUN_SECRET_KEY={{ vault_porkbun_secret_key | default('') }} diff --git a/infra/ansible/roles/worker-crons/defaults/main.yml b/infra/ansible/roles/worker-crons/defaults/main.yml index 1b2ba33..755eeed 100644 --- a/infra/ansible/roles/worker-crons/defaults/main.yml +++ b/infra/ansible/roles/worker-crons/defaults/main.yml @@ -220,6 +220,16 @@ worker_crons: on_calendar: "*-*-* 16:00:00 UTC" persistent: true + # IRP apportioned-fee invoice poller — every 15 min. Scans the IRP filings + # mailbox for state replies to our IRP submissions, parses the apportioned fee, + # bills the customer the EXACT amount (gov-fee child order + payment link), and + # Telegram-alerts the operator. See scripts/workers/services/irp_filing.py. + - name: pw-irp-invoice-poller + description: Poll IRP filings mailbox for state fee invoices and bill customers + module: scripts.workers.irp_invoice_poller + on_calendar: "*-*-* *:00/15:00 UTC" + persistent: true + # Daily paper-filing batch (Standard no-login CMS filing path) — weekday # mornings 13:30 UTC (08:30 CT). Groups all signed, not-yet-mailed CMS filings # by destination agency (provider's MAC; NPI Enumerator in Fargo for NPPES) diff --git a/scripts/workers/irp_invoice_poller.py b/scripts/workers/irp_invoice_poller.py new file mode 100644 index 0000000..65ea8fb --- /dev/null +++ b/scripts/workers/irp_invoice_poller.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 +"""Cron entrypoint: poll the IRP filings mailbox for state apportioned-fee +invoice replies, bill the customer the exact amount, and Telegram-alert the +operator. See scripts/workers/services/irp_filing.py. + +Runs every ~15 minutes (worker-crons role). +""" +from __future__ import annotations + +import logging +import sys + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s %(message)s", +) + + +def main() -> int: + from scripts.workers.services.irp_filing import poll_irp_invoices + n = poll_irp_invoices() + logging.getLogger("workers.irp_invoice_poller").info("Done. Processed %s invoice(s).", n) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/workers/services/gov_fee.py b/scripts/workers/services/gov_fee.py index e04c868..721fdae 100644 --- a/scripts/workers/services/gov_fee.py +++ b/scripts/workers/services/gov_fee.py @@ -66,6 +66,25 @@ IRP_WEIGHT_MULTIPLIERS = { "_default": 1.0, } +# Agency / government payment-processing (card convenience) fees. Many state +# portals add a card convenience fee on top of the statutory fee; we pass it +# through so the customer pays the true all-in cost. Modeled as a percentage of +# the government fee plus an optional flat add-on, per service. Refine per state +# as confirmed. (pct_of_gov_fee, flat_cents) +AGENCY_PROCESSING_FEE = { + "irp-registration": (0.0, 0), # SC IRP invoice states the all-in total; no extra + "ifta-application": (0.0, 0), # decals billed at cost; no card fee on a tiny amount + "intrastate-authority": (0.0, 0), # filing fee paid by check/ACH; no card fee + "ucr-registration": (0.0, 0), # ucr.gov includes no separate convenience fee + "_default": (0.0, 0), +} + + +def agency_processing_fee_cents(slug: str, gov_fee_cents: int) -> int: + """Government/agency card-convenience fee passed through to the customer.""" + pct, flat = AGENCY_PROCESSING_FEE.get(slug, AGENCY_PROCESSING_FEE["_default"]) + return int(round(gov_fee_cents * pct / 100.0)) + flat + @dataclass class GovFeeEstimate: @@ -83,7 +102,19 @@ def _int(v, default=0) -> int: def estimate_gov_fee(slug: str, intake: dict) -> GovFeeEstimate: - """Estimate the government/state fee for an at-cost trucking service.""" + """Estimate the all-in government fee (statutory fee + any agency card/ + convenience processing fee) for an at-cost trucking service.""" + est = _estimate_gov_fee_base(slug, intake) + proc = agency_processing_fee_cents(slug, est.cents) + if proc > 0: + est.breakdown.append(f"Agency processing fee: ${proc/100:.2f}") + est.cents += proc + est.label += f" (incl. ${proc/100:.2f} agency processing fee)" + return est + + +def _estimate_gov_fee_base(slug: str, intake: dict) -> GovFeeEstimate: + """Statutory/base government fee before any agency processing surcharge.""" intake = intake or {} power_units = max(_int(intake.get("power_units"), 1), 1) base_state = (intake.get("base_state") or intake.get("address_state") or "").upper() diff --git a/scripts/workers/services/irp_filing.py b/scripts/workers/services/irp_filing.py new file mode 100644 index 0000000..d704f4f --- /dev/null +++ b/scripts/workers/services/irp_filing.py @@ -0,0 +1,289 @@ +"""IRP correspondence + invoice reconciliation. + +IRP apportioned registration fees can only be computed by the base-state IRP +office from the carrier's fleet (Schedule A/B): per-jurisdiction mileage x +registered weight x each jurisdiction's per-mile rate. There is no pre-lookup. +So for IRP we use the email/POA path (most convenient for the carrier): + + send_irp_submission(...) + Email the base-state IRP unit a Schedule A/B submission request (with the + signed POA reference), Reply-To the dedicated IRP filings mailbox, tagged + with the order number so replies are matchable. + + poll_irp_invoices() (cron, every ~15 min) + Scan the IRP filings mailbox for state replies, match them to a parent + order by the [PW-IRP CO-XXXX] subject tag, parse the apportioned fee, fire + a Telegram alert to the operator, and bill the customer the EXACT amount + via a gov-fee child order + payment link. + +Mailbox config (env), so this activates the moment a clean mailbox exists: + IRP_FILINGS_IMAP_HOST / _PORT / _USER / _PASS / _FOLDER + IRP_FILINGS_FROM the address state replies come back to (Reply-To) +Falls back to the OPS_IMAP_* mailbox when the dedicated one isn't configured; +in that case it filters by the [PW-IRP ...] subject tag so it ignores unrelated +mail. + +State IRP submission contacts (email/fax) per base state. Sourced from each +state's Motor Carrier Services / IRP page. Add states as we confirm them. +""" + +from __future__ import annotations + +import email +import imaplib +import json +import logging +import os +import re +import smtplib +from datetime import datetime, timezone +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + +from scripts.workers.telegram_notify import send_telegram + +LOG = logging.getLogger("workers.services.irp_filing") + +# ── Mailbox config ─────────────────────────────────────────────────────────── +IMAP_HOST = os.getenv("IRP_FILINGS_IMAP_HOST") or os.getenv("OPS_IMAP_HOST", "mail.performancewest.net") +IMAP_PORT = int(os.getenv("IRP_FILINGS_IMAP_PORT") or os.getenv("OPS_IMAP_PORT", "993")) +IMAP_USER = os.getenv("IRP_FILINGS_IMAP_USER") or os.getenv("OPS_IMAP_USER", "ops@performancewest.net") +IMAP_PASS = os.getenv("IRP_FILINGS_IMAP_PASS") or os.getenv("OPS_IMAP_PASS", "") +IMAP_FOLDER = os.getenv("IRP_FILINGS_IMAP_FOLDER", "INBOX") +# Address the state replies to. Defaults to the IMAP user so replies land in the +# mailbox we monitor. +FILINGS_FROM = os.getenv("IRP_FILINGS_FROM") or IMAP_USER +# True when a DEDICATED mailbox is configured (so we can process all unseen mail); +# when falling back to a shared mailbox we ONLY touch [PW-IRP ...]-tagged mail. +DEDICATED_MAILBOX = bool(os.getenv("IRP_FILINGS_IMAP_USER")) + +SMTP_HOST = os.getenv("SMTP_HOST", "co.carrierone.com") +SMTP_PORT = int(os.getenv("SMTP_PORT", "587")) +SMTP_USER = os.getenv("SMTP_USER", "noreply@performancewest.net") +SMTP_PASS = os.getenv("SMTP_PASS", "") + +# Base-state IRP submission contacts. email + optional fax. Extend as confirmed. +IRP_STATE_CONTACTS = { + "SC": {"agency": "SC DMV Motor Carrier Services", + "email": os.getenv("IRP_SC_EMAIL", "MCS@scdmv.net"), + "fax": "803-896-3713"}, + # NC, GA, FL, TX, etc. — add as confirmed. +} + +SUBJECT_TAG = "PW-IRP" # [PW-IRP CO-XXXXXXXX] in subject for reply matching +TAG_RE = re.compile(r"\[PW-IRP\s+(C[OG]-[A-Z0-9]+)\]", re.I) +# Fee patterns commonly seen on IRP invoices / replies. +FEE_RE = [ + re.compile(r"(?:total\s+(?:fees?\s+)?due|amount\s+due|total\s+amount|apportioned\s+fees?)[^$]{0,40}\$\s*([0-9][0-9,]*\.?\d{0,2})", re.I), + re.compile(r"\$\s*([0-9][0-9,]*\.\d{2})\b"), +] + + +def state_irp_contact(base_state: str) -> dict | None: + return IRP_STATE_CONTACTS.get((base_state or "").upper()) + + +def send_irp_submission(order_number: str, entity_name: str, dot_number: str, + base_state: str, intake: dict, signed_auth_note: str = "") -> bool: + """Email the base-state IRP unit a Schedule A/B submission for this carrier, + tagged for reply matching. Returns True if sent.""" + contact = state_irp_contact(base_state) + if not contact or not contact.get("email"): + LOG.warning("[%s] No IRP submission contact for base state %s", order_number, base_state) + return False + + op_states = intake.get("operating_states") or [] + if isinstance(op_states, str): + try: + op_states = json.loads(op_states) + except Exception: + op_states = [s.strip() for s in op_states.split(",") if s.strip()] + + subject = f"IRP Apportioned Registration Request — {entity_name} (USDOT {dot_number}) [{SUBJECT_TAG} {order_number}]" + body = ( + f"To {contact['agency']},\n\n" + f"On behalf of our client (signed Power of Attorney on file), we request " + f"IRP apportioned registration for the following carrier and ask that you " + f"reply with the computed apportioned fee invoice so we can remit payment.\n\n" + f"Carrier: {entity_name}\n" + f"USDOT: {dot_number}\n" + f"MC/MX/FF: {intake.get('mc_number','')}\n" + f"Base jurisdiction: {base_state}\n" + f"Power units: {intake.get('power_units','')}\n" + f"Registered weight bracket: {intake.get('gross_weight_bracket','')}\n" + f"Operating jurisdictions: {', '.join(op_states) if op_states else base_state}\n" + f"Legal/registered address: {intake.get('address_street','')}, " + f"{intake.get('address_city','')}, {intake.get('address_state','')} " + f"{intake.get('address_zip','')}\n\n" + f"{signed_auth_note or 'A signed Power of Attorney authorizing Performance West Inc. to file on the carrier behalf is available on request.'}\n\n" + f"Please reply to {FILINGS_FROM} with the apportioned fee total (including any " + f"processing fees) and any required Schedule A/B forms. Keep the subject " + f"reference [{SUBJECT_TAG} {order_number}] so we can match your reply.\n\n" + f"Thank you,\n" + f"Performance West Inc. — DOT / State Motor Carrier Compliance\n" + f"(888) 411-0383 · {FILINGS_FROM}\n" + ) + try: + msg = MIMEMultipart() + msg["From"] = SMTP_USER + msg["To"] = contact["email"] + msg["Reply-To"] = FILINGS_FROM + msg["Subject"] = subject + msg.attach(MIMEText(body, "plain")) + with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as s: + s.starttls() + if SMTP_USER and SMTP_PASS: + s.login(SMTP_USER, SMTP_PASS) + s.sendmail(SMTP_USER, [contact["email"], FILINGS_FROM], msg.as_string()) + LOG.info("[%s] IRP submission emailed to %s (%s)", order_number, contact["email"], base_state) + send_telegram( + f"📤 IRP submission sent\n{entity_name} (DOT {dot_number})\n" + f"Base state: {base_state} → {contact['email']}\n" + f"Order: {order_number}\nAwaiting the state's apportioned-fee invoice." + ) + return True + except Exception as exc: # noqa: BLE001 + LOG.error("[%s] Failed to send IRP submission: %s", order_number, exc) + return False + + +def _parse_fee_cents(text: str) -> int | None: + for pat in FEE_RE: + m = pat.search(text or "") + if m: + try: + return int(round(float(m.group(1).replace(",", "")) * 100)) + except ValueError: + continue + return None + + +def _body_text(msg) -> str: + parts = [] + if msg.is_multipart(): + for p in msg.walk(): + if p.get_content_type() == "text/plain": + try: + parts.append(p.get_payload(decode=True).decode(errors="replace")) + except Exception: + pass + else: + try: + parts.append(msg.get_payload(decode=True).decode(errors="replace")) + except Exception: + pass + return "\n".join(parts) + + +def poll_irp_invoices() -> int: + """Scan the IRP filings mailbox for state invoice replies, bill the exact fee, + and alert the operator. Returns the number of invoices processed.""" + if not IMAP_PASS: + LOG.warning("[irp-poll] No IMAP password configured (IRP_FILINGS_IMAP_PASS / OPS_IMAP_PASS) — skipping") + return 0 + + try: + from scripts.workers.services.gov_fee import ( + GovFeeEstimate, create_gov_fee_order, send_gov_fee_payment_email, + ) + except Exception as exc: # noqa: BLE001 + LOG.error("[irp-poll] gov_fee import failed: %s", exc) + return 0 + + processed = 0 + try: + m = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT, timeout=30) + m.login(IMAP_USER, IMAP_PASS) + m.select(IMAP_FOLDER) + # Unseen mail only. On a shared mailbox, restrict to our subject tag. + if DEDICATED_MAILBOX: + typ, data = m.search(None, "UNSEEN") + else: + typ, data = m.search(None, "UNSEEN", "SUBJECT", SUBJECT_TAG) + ids = data[0].split() if data and data[0] else [] + for mid in ids: + typ, md = m.fetch(mid, "(RFC822)") + if typ != "OK" or not md or not md[0]: + continue + msg = email.message_from_bytes(md[0][1]) + subject = str(email.header.make_header(email.header.decode_header(msg.get("Subject", "")))) + tag = TAG_RE.search(subject) + if not tag: + # Not an IRP reply we can match; leave it unseen on shared mailbox. + if not DEDICATED_MAILBOX: + m.store(mid, "-FLAGS", "(\\Seen)") + continue + parent_order = tag.group(1).upper() + body = _body_text(msg) + fee_cents = _parse_fee_cents(subject + "\n" + body) + sender = msg.get("From", "") + + if not fee_cents: + # Could not parse a fee — alert the operator to read it manually. + send_telegram( + f"📬 IRP reply received (no fee auto-parsed)\n" + f"Order: {parent_order}\nFrom: {sender}\n" + f"Subject: {subject}\nOpen the {IMAP_USER} mailbox to review + enter the fee." + ) + m.store(mid, "+FLAGS", "(\\Seen)") + processed += 1 + continue + + ok = _bill_parent_irp_fee(parent_order, fee_cents, sender, + create_gov_fee_order, send_gov_fee_payment_email, GovFeeEstimate) + send_telegram( + f"💵 IRP invoice received → customer billed\n" + f"Order: {parent_order}\nState fee: ${fee_cents/100:,.2f}\n" + f"From: {sender}\n" + + ("Payment link emailed to the customer." if ok else "⚠️ Could not auto-bill — check logs.") + ) + m.store(mid, "+FLAGS", "(\\Seen)") + processed += 1 + m.logout() + except Exception as exc: # noqa: BLE001 + LOG.error("[irp-poll] IMAP error: %s", exc) + LOG.info("[irp-poll] processed %s IRP invoice reply(ies)", processed) + return processed + + +def _bill_parent_irp_fee(parent_order, fee_cents, sender, + create_gov_fee_order, send_gov_fee_payment_email, GovFeeEstimate) -> bool: + """Create the exact-amount gov-fee child + email the customer the payment link.""" + try: + import psycopg2 + conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) + with conn.cursor() as cur: + cur.execute( + "SELECT customer_email, customer_name, customer_phone, service_name " + "FROM compliance_orders WHERE order_number = %s", + (parent_order,), + ) + row = cur.fetchone() + conn.close() + except Exception as exc: # noqa: BLE001 + LOG.error("[irp-poll] DB lookup failed for %s: %s", parent_order, exc) + return False + if not row: + LOG.warning("[irp-poll] No parent order %s for IRP invoice", parent_order) + return False + customer_email, customer_name, customer_phone, service_name = row + + est = GovFeeEstimate( + cents=fee_cents, + label=f"IRP apportioned registration fee (state invoice) — {sender}", + exact=True, + breakdown=[f"State IRP invoice: ${fee_cents/100:,.2f}"], + ) + child = create_gov_fee_order(parent_order, "irp-registration", est, + customer_email, customer_name or "", customer_phone or "") + if not child: + return False + send_gov_fee_payment_email(customer_email, customer_name or "", + service_name or "IRP Registration", + customer_name or "", est, child) + return True + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + print("processed:", poll_irp_invoices()) diff --git a/scripts/workers/services/state_trucking.py b/scripts/workers/services/state_trucking.py index f712ef0..9021106 100644 --- a/scripts/workers/services/state_trucking.py +++ b/scripts/workers/services/state_trucking.py @@ -714,8 +714,41 @@ class StateTruckingHandler: def _request_gov_fee_payment(self, order_number, service_slug, service_name, entity_name, customer_email, customer_phone, intake) -> bool: - """Quote the government fee, create a child payment order, and email the - customer a payment link. Returns True if the customer was asked to pay.""" + """Set up government-fee collection after authorization. + + IRP: the fee is unknown until the base state computes it, so we EMAIL the + state's IRP unit a submission request and wait for their invoice reply + (irp_filing.poll_irp_invoices bills the customer the exact amount when it + arrives). IFTA / intrastate: the fee is published/near-fixed, so we + estimate it and bill the customer immediately. Returns True if the order + should hold at awaiting_government_fee_approval.""" + # ── IRP: email the state, wait for the apportioned-fee invoice ──────── + if service_slug == "irp-registration": + try: + from scripts.workers.services.irp_filing import send_irp_submission + except Exception as exc: # noqa: BLE001 + LOG.error("[%s] irp_filing import failed: %s", order_number, exc) + return False + base_state = (intake.get("base_state") or intake.get("address_state") or "").upper() + sent = send_irp_submission(order_number, entity_name, + intake.get("dot_number", ""), base_state, intake) + if sent: + try: + notify_fulfillment_todo( + title=f"IRP submitted to {base_state} — awaiting fee invoice — {entity_name}", + order_number=order_number, + service_slug=service_slug, + priority="normal", + description=(f"IRP Schedule A/B emailed to the {base_state} IRP office.\n" + f"Waiting on their apportioned-fee invoice reply; when it " + f"arrives we auto-bill the customer the exact amount and " + f"you'll get a Telegram alert.\nCustomer: {customer_email}"), + ) + except Exception: + pass + return sent + + # ── IFTA / intrastate: published fee, bill the estimate now ─────────── try: from scripts.workers.services.gov_fee import ( estimate_gov_fee, create_gov_fee_order, send_gov_fee_payment_email,