Intrastate operating authority is state-specific + application-based like IRP, so
it reuses the same email/POA + invoice-reconciliation flow:
- intrastate_filing.send_intrastate_submission: emails the state PSC/PUC the
authority application with the signed POA attached (subject tag [PW-ISA CO-..]),
reusing irp_filing's MinIO download + census enrich helpers.
- The shared poller (irp_invoice_poller) now matches BOTH [PW-IRP] and [PW-ISA]
tags, parses the fee, Telegram-alerts, and bills the customer the exact amount
with the correct service slug.
- state_trucking gov-fee gate routes intrastate-authority to the PSC/PUC email
path; if no submission email is configured for the base state it falls back
to a manual todo (safe default — no emailing guessed agency addresses).
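Each state's submission address comes from a per-state ISA_<ST>_EMAIL environment
variable, which stays blank until the exact agency address has been verified.
SC, GA and TX are scaffolded. The customer still sees only an exact-fee payment
link, and the operator only has to approve the final filing.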
372 lines
16 KiB
Python
372 lines
16 KiB
Python
"""IRP correspondence + invoice reconciliation.
|
|
|
|
IRP apportioned registration fees can only be computed by the base-state IRP
|
|
office from the carrier's fleet (Schedule A/B): per-jurisdiction mileage x
|
|
registered weight x each jurisdiction's per-mile rate. There is no pre-lookup.
|
|
So for IRP we use the email/POA path (most convenient for the carrier):
|
|
|
|
send_irp_submission(...)
|
|
Email the base-state IRP unit a Schedule A/B submission request (with the
|
|
signed POA reference), Reply-To the dedicated IRP filings mailbox, tagged
|
|
with the order number so replies are matchable.
|
|
|
|
poll_irp_invoices() (cron, every ~15 min)
|
|
Scan the IRP filings mailbox for state replies, match them to a parent
|
|
order by the [PW-IRP CO-XXXX] or [PW-ISA CO-XXXX] subject tag, parse the fee, fire
|
|
a Telegram alert to the operator, and bill the customer the EXACT amount
|
|
via a gov-fee child order + payment link.
|
|
|
|
Mailbox config (env), so this activates the moment a clean mailbox exists:
|
|
IRP_FILINGS_IMAP_HOST / _PORT / _USER / _PASS / _FOLDER
|
|
IRP_FILINGS_FROM the address state replies come back to (Reply-To)
|
|
Falls back to the OPS_IMAP_* mailbox when the dedicated one isn't configured;
|
|
in that case it filters by the [PW-IRP ...] / [PW-ISA ...] subject tags so it ignores unrelated
|
|
mail.
|
|
|
|
State IRP submission contacts (email/fax) per base state. Sourced from each
|
|
state's Motor Carrier Services / IRP page. Add states as we confirm them.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import email
|
|
import imaplib
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import smtplib
|
|
from datetime import datetime, timezone
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
|
|
from scripts.workers.telegram_notify import send_telegram
|
|
|
|
# Module-level logger shared by every helper in this file.
LOG = logging.getLogger("workers.services.irp_filing")

# ── Mailbox config ───────────────────────────────────────────────────────────
# Each IMAP setting prefers the dedicated IRP_FILINGS_IMAP_* variable and falls
# back to the shared OPS_IMAP_* one; an empty dedicated value counts as unset.
IMAP_HOST = (
    os.environ.get("IRP_FILINGS_IMAP_HOST")
    or os.environ.get("OPS_IMAP_HOST", "mail.performancewest.net")
)
IMAP_PORT = int(
    os.environ.get("IRP_FILINGS_IMAP_PORT")
    or os.environ.get("OPS_IMAP_PORT", "993")
)
IMAP_USER = (
    os.environ.get("IRP_FILINGS_IMAP_USER")
    or os.environ.get("OPS_IMAP_USER", "ops@performancewest.net")
)
IMAP_PASS = (
    os.environ.get("IRP_FILINGS_IMAP_PASS")
    or os.environ.get("OPS_IMAP_PASS", "")
)
# The folder has no OPS_* fallback: it is INBOX unless explicitly overridden.
IMAP_FOLDER = os.environ.get("IRP_FILINGS_IMAP_FOLDER", "INBOX")

# Reply-To address handed to the state. When unset it is the IMAP user, so the
# state's replies arrive in the mailbox the poller reads.
FILINGS_FROM = os.environ.get("IRP_FILINGS_FROM") or IMAP_USER

# Set only when a dedicated IRP mailbox user is configured. The poller then
# reads all unseen mail; on the shared fallback mailbox it restricts itself to
# messages carrying one of our subject tags.
DEDICATED_MAILBOX = bool(os.environ.get("IRP_FILINGS_IMAP_USER"))

# Outbound SMTP settings used to send submissions to the state.
SMTP_HOST = os.environ.get("SMTP_HOST", "co.carrierone.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
SMTP_USER = os.environ.get("SMTP_USER", "noreply@performancewest.net")
SMTP_PASS = os.environ.get("SMTP_PASS", "")
|
|
|
|
# Submission contacts for each base state's IRP office, keyed by two-letter
# state code. Every entry carries the agency name, a submission email
# (overridable per state through the environment) and an optional fax number.
# Only add a state once its contact details have been confirmed.
IRP_STATE_CONTACTS = {
    "SC": {
        "agency": "SC DMV Motor Carrier Services",
        "email": os.environ.get("IRP_SC_EMAIL", "MCS@scdmv.net"),
        "fax": "803-896-3713",
    },
    # NC, GA, FL, TX, etc. — add as confirmed.
}
|
|
|
|
# Tag embedded in outbound IRP subjects, e.g. "[PW-IRP CO-XXXXXXXX]", so that
# the state's reply can be matched back to its order.
SUBJECT_TAG = "PW-IRP"

# Recognises both state-agency tags: IRP and intrastate authority (ISA).
# Group 1 is the tag kind and group 2 the order number, which together tell the
# poller which order to bill and under which service.
TAG_RE = re.compile(r"\[PW-(IRP|ISA)\s+(C[OG]-[A-Z0-9]+)\]", re.IGNORECASE)

# Tag kind -> service slug used when creating the gov-fee child order.
TAG_SLUG = {
    "IRP": "irp-registration",
    "ISA": "intrastate-authority",
}

# Fee patterns, tried in order. The first needs a "total due"-style label in
# front of the dollar amount; the second is a fallback that accepts any dollar
# amount written with two decimals.
FEE_RE = [
    re.compile(
        r"(?:total\s+(?:fees?\s+)?due|amount\s+due|total\s+amount|apportioned\s+fees?)[^$]{0,40}\$\s*([0-9][0-9,]*\.?\d{0,2})",
        re.IGNORECASE,
    ),
    re.compile(r"\$\s*([0-9][0-9,]*\.\d{2})\b"),
]
|
|
|
|
|
|
def state_irp_contact(base_state: str) -> dict | None:
    """Look up the IRP submission contact for a base state.

    The state code is upper-cased before the lookup, so the match is
    case-insensitive; a falsy ``base_state`` is looked up as an empty string.
    Returns the contact dict from ``IRP_STATE_CONTACTS`` or ``None`` when the
    state has no entry.
    """
    state_code = (base_state or "").upper()
    return IRP_STATE_CONTACTS.get(state_code)
|
|
|
|
|
|
def _enrich_address_from_census(dot_number: str, intake: dict) -> dict:
|
|
"""Fill missing legal_name / address fields from the FMCSA census so the IRP
|
|
submission isn't sent with a blank address. Customer-supplied values win."""
|
|
out = dict(intake or {})
|
|
need = any(not out.get(k) for k in ("legal_name", "address_street", "address_city",
|
|
"address_state", "address_zip"))
|
|
if not need or not dot_number:
|
|
return out
|
|
try:
|
|
from scripts.workers.services.mcs150_update import MCS150UpdateHandler
|
|
census = MCS150UpdateHandler.__new__(MCS150UpdateHandler)._fetch_carrier_record(dot_number)
|
|
for k in ("legal_name", "address_street", "address_city", "address_state",
|
|
"address_zip", "phone"):
|
|
if not out.get(k) and census.get(k):
|
|
out[k] = census[k]
|
|
except Exception as exc: # noqa: BLE001
|
|
LOG.warning("[irp] census enrich failed for DOT %s: %s", dot_number, exc)
|
|
return out
|
|
|
|
|
|
def _download_minio(key: str) -> bytes | None:
|
|
"""Fetch an object's bytes from MinIO (the signed POA PDF). None on failure."""
|
|
if not key:
|
|
return None
|
|
try:
|
|
from minio import Minio
|
|
mc = Minio(
|
|
f"{os.getenv('MINIO_ENDPOINT', 'minio')}:{os.getenv('MINIO_PORT', '9000')}",
|
|
access_key=os.getenv("MINIO_ACCESS_KEY", ""),
|
|
secret_key=os.getenv("MINIO_SECRET_KEY", ""),
|
|
secure=os.getenv("MINIO_SECURE", "false").lower() == "true",
|
|
)
|
|
bucket = os.getenv("MINIO_BUCKET", "performancewest")
|
|
resp = mc.get_object(bucket, key)
|
|
data = resp.read()
|
|
resp.close()
|
|
resp.release_conn()
|
|
return data
|
|
except Exception as exc: # noqa: BLE001
|
|
LOG.warning("[irp] could not download POA %s: %s", key, exc)
|
|
return None
|
|
|
|
|
|
def _irp_text(value) -> str:
    """Render an intake value for the email; ``None`` becomes an empty string."""
    return "" if value is None else str(value)


def _irp_header_safe(value) -> str:
    """Collapse whitespace runs (including CR/LF) so a value is safe in a mail header."""
    return " ".join(_irp_text(value).split())


def _irp_operating_states(raw) -> list[str]:
    """Normalise ``operating_states`` (list, JSON text or comma-separated text) to a list of strings."""
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except ValueError:
            raw = raw.split(",")
    if isinstance(raw, str):
        # JSON text that decoded to a single string, e.g. '"SC"'.
        raw = [raw]
    if not isinstance(raw, (list, tuple, set)):
        # Anything else (number, dict, None) cannot be rendered as a list.
        return []
    cleaned = (_irp_text(item).strip() for item in raw)
    return [item for item in cleaned if item]


def send_irp_submission(order_number: str, entity_name: str, dot_number: str,
                        base_state: str, intake: dict, signed_auth_key: str = "") -> bool:
    """Email the base-state IRP unit a Schedule A/B submission for this carrier.

    The signed Power of Attorney is attached and the subject carries the
    ``[PW-IRP <order>]`` tag so the state's reply can be matched to the order.
    The state will not act on a third-party filing without the POA, so when the
    POA cannot be downloaded nothing is sent and the caller is expected to fall
    back to a manual todo.

    Args:
        order_number: Parent order number, embedded in the subject tag.
        entity_name: Carrier name; falls back to the intake/census legal name.
        dot_number: USDOT number, also used for the census backfill.
        base_state: Base jurisdiction code used to pick the state contact.
        intake: Customer intake answers (address, fleet and operating states).
        signed_auth_key: MinIO object key of the signed POA PDF.

    Returns:
        ``True`` once the email has been handed to the SMTP server, ``False``
        when there is no contact, no POA, or sending failed. A failure of the
        follow-up operator notification does not change the result, because
        the submission has already gone out at that point.
    """
    from email.mime.application import MIMEApplication

    contact = state_irp_contact(base_state)
    if not contact or not contact.get("email"):
        LOG.warning("[%s] No IRP submission contact for base state %s", order_number, base_state)
        return False

    # Attach the signed POA — required by the state to act on our behalf.
    poa_bytes = _download_minio(signed_auth_key)
    if not poa_bytes:
        LOG.warning("[%s] No signed POA available (key=%s) — not emailing IRP office",
                    order_number, signed_auth_key or "(none)")
        return False

    # Backfill legal name + address from the FMCSA census if intake lacks them.
    intake = _enrich_address_from_census(dot_number, intake)
    entity_name = entity_name or intake.get("legal_name") or ""

    op_states = _irp_operating_states(intake.get("operating_states"))

    # Intake fields may be present but None, so normalise before stripping.
    state_zip = (
        f"{_irp_text(intake.get('address_state'))} {_irp_text(intake.get('address_zip'))}"
    ).strip()
    addr_parts = [
        _irp_text(intake.get("address_street")).strip(),
        _irp_text(intake.get("address_city")).strip(),
        state_zip,
    ]
    addr = ", ".join(part for part in addr_parts if part)

    # Header values must never carry CR/LF taken from customer-supplied data.
    safe_name = _irp_header_safe(entity_name)
    safe_dot = _irp_header_safe(dot_number)

    subject = f"IRP Apportioned Registration Request — {safe_name} (USDOT {safe_dot}) [{SUBJECT_TAG} {order_number}]"
    body = (
        f"To {contact['agency']},\n\n"
        f"On behalf of our client, and under the signed Power of Attorney attached "
        f"to this email, we request IRP apportioned registration for the following "
        f"carrier. Please reply with the computed apportioned fee invoice so we can "
        f"remit payment.\n\n"
        f"Carrier: {entity_name}\n"
        f"USDOT: {dot_number}\n"
        f"MC/MX/FF: {_irp_text(intake.get('mc_number'))}\n"
        f"Base jurisdiction: {base_state}\n"
        f"Power units: {_irp_text(intake.get('power_units'))}\n"
        f"Registered weight bracket: {_irp_text(intake.get('gross_weight_bracket'))}\n"
        f"Operating jurisdictions: {', '.join(op_states) if op_states else base_state}\n"
        f"Legal/registered address: {addr or '(see attached)'}\n\n"
        f"Attached: signed Power of Attorney authorizing Performance West Inc. to "
        f"file and remit fees on the carrier's behalf.\n\n"
        f"Please reply to {FILINGS_FROM} with the apportioned fee total (including any "
        f"processing fees) and any required Schedule A/B forms. Keep the subject "
        f"reference [{SUBJECT_TAG} {order_number}] so we can match your reply.\n\n"
        f"Thank you,\n"
        f"Performance West Inc. — DOT / State Motor Carrier Compliance\n"
        f"(888) 411-0383 · {FILINGS_FROM}\n"
    )

    try:
        msg = MIMEMultipart()
        msg["From"] = SMTP_USER
        msg["To"] = contact["email"]
        msg["Reply-To"] = FILINGS_FROM
        msg["Subject"] = subject
        msg.attach(MIMEText(body, "plain"))
        poa = MIMEApplication(poa_bytes, _subtype="pdf")
        poa.add_header("Content-Disposition", "attachment",
                       filename=f"POA_{safe_name.replace(' ', '_')}_{safe_dot}.pdf")
        msg.attach(poa)
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as s:
            s.starttls()
            if SMTP_USER and SMTP_PASS:
                s.login(SMTP_USER, SMTP_PASS)
            # The filings mailbox receives a copy alongside the state contact.
            s.sendmail(SMTP_USER, [contact["email"], FILINGS_FROM], msg.as_string())
    except Exception as exc:  # noqa: BLE001
        LOG.error("[%s] Failed to send IRP submission: %s", order_number, exc)
        return False

    LOG.info("[%s] IRP submission emailed to %s (%s) with POA attached",
             order_number, contact["email"], base_state)

    # The email is already out: a notification failure must not be reported as
    # a failed submission, or the caller would file the same request again.
    try:
        send_telegram(
            f"📤 IRP submission sent (POA attached)\n{entity_name} (DOT {dot_number})\n"
            f"Base state: {base_state} → {contact['email']}\n"
            f"Order: {order_number}\nAwaiting the state's apportioned-fee invoice."
        )
    except Exception as exc:  # noqa: BLE001
        LOG.warning("[%s] IRP submission sent but Telegram alert failed: %s", order_number, exc)
    return True
|
|
|
|
|
|
def _parse_fee_cents(text: str) -> int | None:
    """Extract a dollar amount from *text* and return it in whole cents.

    The patterns in ``FEE_RE`` are tried in order and the first one that yields
    a parseable number wins. Thousands separators are dropped before the
    conversion. Returns ``None`` when no pattern produces a usable amount; a
    falsy *text* is treated as an empty string.
    """
    haystack = text or ""
    for pattern in FEE_RE:
        match = pattern.search(haystack)
        if match is None:
            continue
        digits = match.group(1).replace(",", "")
        try:
            dollars = float(digits)
        except ValueError:
            # Not a number after all; fall through to the next pattern.
            continue
        return int(round(dollars * 100))
    return None
|
|
|
|
|
|
def _body_text(msg) -> str:
|
|
parts = []
|
|
if msg.is_multipart():
|
|
for p in msg.walk():
|
|
if p.get_content_type() == "text/plain":
|
|
try:
|
|
parts.append(p.get_payload(decode=True).decode(errors="replace"))
|
|
except Exception:
|
|
pass
|
|
else:
|
|
try:
|
|
parts.append(msg.get_payload(decode=True).decode(errors="replace"))
|
|
except Exception:
|
|
pass
|
|
return "\n".join(parts)
|
|
|
|
|
|
def _process_state_reply(raw_message, own_sender, create_gov_fee_order,
                         send_gov_fee_payment_email, GovFeeEstimate) -> str:
    """Handle one fetched message and report what happened to it.

    Returns ``"unmatched"`` when the subject has no state-agency tag,
    ``"own_copy"`` when the message is the copy of our own outbound
    submission, and ``"processed"`` when the operator was alerted (and, if a
    fee could be parsed, the customer was billed).
    """
    from email.header import decode_header, make_header
    from email.utils import parseaddr

    msg = email.message_from_bytes(raw_message)
    subject = str(make_header(decode_header(msg.get("Subject", ""))))
    tag = TAG_RE.search(subject)
    if not tag:
        return "unmatched"

    sender = msg.get("From", "")
    # Submissions are also delivered to the filings mailbox and carry the same
    # subject tag. They are not state replies, so do not alert or bill on them.
    if own_sender and parseaddr(str(sender))[1].lower() == own_sender:
        return "own_copy"

    tag_kind = tag.group(1).upper()
    parent_order = tag.group(2).upper()
    slug = TAG_SLUG.get(tag_kind, "irp-registration")
    kind_label = "IRP" if tag_kind == "IRP" else "Intrastate authority"
    fee_cents = _parse_fee_cents(subject + "\n" + _body_text(msg))

    if not fee_cents:
        # Could not parse a fee — alert the operator to read it manually.
        send_telegram(
            f"📬 {kind_label} reply received (no fee auto-parsed)\n"
            f"Order: {parent_order}\nFrom: {sender}\n"
            f"Subject: {subject}\nOpen the {IMAP_USER} mailbox to review + enter the fee."
        )
        return "processed"

    ok = _bill_parent_fee(parent_order, slug, kind_label, fee_cents, sender,
                          create_gov_fee_order, send_gov_fee_payment_email, GovFeeEstimate)
    send_telegram(
        f"💵 {kind_label} invoice received → customer billed\n"
        f"Order: {parent_order}\nState fee: ${fee_cents/100:,.2f}\n"
        f"From: {sender}\n"
        + ("Payment link emailed to the customer." if ok else "⚠️ Could not auto-bill — check logs.")
    )
    return "processed"


def poll_irp_invoices() -> int:
    """Scan the filings mailbox for state invoice replies and bill the exact fee.

    Unseen messages are matched to their parent order through the
    ``[PW-IRP ...]`` / ``[PW-ISA ...]`` subject tag. When a fee can be parsed
    the customer is billed through a gov-fee child order; otherwise the
    operator is asked to review the message manually. On a shared mailbox only
    tagged messages are searched, and messages that turn out not to match are
    flagged unseen again.

    A failure on one message is logged and reported to the operator, then
    polling continues with the next message. The failed message stays flagged
    as seen so that a partially completed billing is never repeated.

    Returns:
        The number of state replies processed (billed or sent for manual
        review). ``0`` when no IMAP password is configured or the gov-fee
        helpers cannot be imported.
    """
    if not IMAP_PASS:
        LOG.warning("[irp-poll] No IMAP password configured (IRP_FILINGS_IMAP_PASS / OPS_IMAP_PASS) — skipping")
        return 0

    try:
        from scripts.workers.services.gov_fee import (
            GovFeeEstimate, create_gov_fee_order, send_gov_fee_payment_email,
        )
    except Exception as exc:  # noqa: BLE001
        LOG.error("[irp-poll] gov_fee import failed: %s", exc)
        return 0

    own_sender = (SMTP_USER or "").strip().lower()
    processed = 0
    conn = None
    try:
        conn = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT, timeout=30)
        conn.login(IMAP_USER, IMAP_PASS)
        conn.select(IMAP_FOLDER)

        # Unseen mail only. On a shared mailbox, restrict to our subject tags.
        if DEDICATED_MAILBOX:
            typ, data = conn.search(None, "UNSEEN")
            ids = data[0].split() if data and data[0] else []
        else:
            ids = []
            for tagword in ("PW-IRP", "PW-ISA"):
                typ, data = conn.search(None, "UNSEEN", "SUBJECT", tagword)
                for found in (data[0].split() if data and data[0] else []):
                    if found not in ids:
                        ids.append(found)

        for mid in ids:
            # Fetching RFC822 flags the message as seen on the server.
            typ, md = conn.fetch(mid, "(RFC822)")
            if typ != "OK" or not md or not isinstance(md[0], tuple):
                continue
            try:
                outcome = _process_state_reply(
                    md[0][1], own_sender,
                    create_gov_fee_order, send_gov_fee_payment_email, GovFeeEstimate,
                )
            except Exception:  # noqa: BLE001
                # Keep the remaining replies flowing. The message stays seen so
                # it cannot be billed twice; the operator is told to look at it.
                LOG.exception("[irp-poll] failed to process message %s", mid)
                try:
                    send_telegram(
                        f"⚠️ A state-agency reply could not be processed automatically.\n"
                        f"Open the {IMAP_USER} mailbox to review it and check the worker logs."
                    )
                except Exception as alert_exc:  # noqa: BLE001
                    LOG.warning("[irp-poll] failure alert could not be sent: %s", alert_exc)
                continue

            if outcome == "unmatched":
                # Not a state-agency reply we can match; leave unseen on shared mailbox.
                if not DEDICATED_MAILBOX:
                    conn.store(mid, "-FLAGS", "(\\Seen)")
                continue

            conn.store(mid, "+FLAGS", "(\\Seen)")
            if outcome == "processed":
                processed += 1
    except Exception as exc:  # noqa: BLE001
        LOG.error("[irp-poll] IMAP error: %s", exc)
    finally:
        # Always release the server session, including after an error.
        if conn is not None:
            try:
                conn.logout()
            except Exception as exc:  # noqa: BLE001
                LOG.debug("[irp-poll] IMAP logout failed: %s", exc)
    LOG.info("[irp-poll] processed %s state-agency invoice reply(ies)", processed)
    return processed
|
|
|
|
|
|
def _bill_parent_fee(parent_order, slug, kind_label, fee_cents, sender,
                     create_gov_fee_order, send_gov_fee_payment_email, GovFeeEstimate) -> bool:
    """Create the exact-amount gov-fee child order and email the payment link.

    Args:
        parent_order: Order number taken from the reply's subject tag.
        slug: Service slug the child order is created under.
        kind_label: Human-readable label ("IRP" / "Intrastate authority").
        fee_cents: Fee parsed from the state's reply, in cents.
        sender: From header of the state's reply, recorded in the fee label.
        create_gov_fee_order, send_gov_fee_payment_email, GovFeeEstimate:
            The gov-fee helpers, passed in by the poller that imported them.

    Returns:
        ``True`` when the child order was created and the payment email was
        requested; ``False`` when the parent order could not be looked up or
        the child order was not created.
    """
    try:
        import psycopg2

        conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT customer_email, customer_name, customer_phone, service_name "
                    "FROM compliance_orders WHERE order_number = %s",
                    (parent_order,),
                )
                row = cur.fetchone()
        finally:
            # Close the connection even when the query fails, so it cannot leak.
            conn.close()
    except Exception as exc:  # noqa: BLE001
        LOG.error("[irp-poll] DB lookup failed for %s: %s", parent_order, exc)
        return False

    if not row:
        LOG.warning("[irp-poll] No parent order %s for %s invoice", parent_order, kind_label)
        return False
    customer_email, customer_name, customer_phone, service_name = row

    est = GovFeeEstimate(
        cents=fee_cents,
        label=f"{kind_label} fee (state invoice) — {sender}",
        exact=True,
        breakdown=[f"State {kind_label} invoice: ${fee_cents/100:,.2f}"],
    )
    child = create_gov_fee_order(parent_order, slug, est,
                                 customer_email, customer_name or "", customer_phone or "")
    if not child:
        return False
    send_gov_fee_payment_email(customer_email, customer_name or "",
                               service_name or kind_label,
                               customer_name or "", est, child)
    return True
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual run: poll once with INFO logging and print how many replies were handled.
    logging.basicConfig(level=logging.INFO)
    processed_count = poll_irp_invoices()
    print("processed:", processed_count)
|