# Change note (from the commit that introduced the government-fee flow):
#
# - gov_fee: add AGENCY_PROCESSING_FEE (per-service card/convenience fee passed
#   through so the customer pays the true all-in cost); estimate_gov_fee now
#   folds it into the billed total. IFTA/intrastate/UCR fees are
#   published/near-exact.
# - IRP fees can't be looked up — only the base state computes them. New
#   irp_filing.py: emails the base-state IRP unit a Schedule A/B request
#   (Reply-To the IRP filings mailbox, [PW-IRP CO-...] subject tag), and a
#   15-min cron (irp_invoice_poller) scans the mailbox for the state's invoice
#   reply, parses the exact apportioned fee, Telegram-alerts you, and bills the
#   customer the EXACT amount via a gov-fee child order + payment link. Then it
#   proceeds to ready_to_file for your final approval.
# - state_trucking gov-fee gate now routes IRP to the email/invoice path and
#   IFTA/intrastate to immediate exact-fee billing.
# - Mailbox is configurable (IRP_FILINGS_IMAP_* in app.env.j2); falls back to
#   OPS_IMAP_* filtered by the [PW-IRP] tag until a dedicated mailbox exists.
#   Telegram alerts fire on IRP submission sent, invoice received (billed), and
#   un-parseable replies (so you can read + enter the fee manually).
#
# (Source listing: Python, 925 lines, 40 KiB.)
"""
State-Level Trucking Compliance Service Handler.

Handles all state-specific motor carrier compliance services:
- IRP Registration Assistance
- IFTA Application + Decals
- IFTA Quarterly Filing
- Oregon Weight-Mile Tax Setup
- NY Highway Use Tax Registration
- KY Weight-Distance Tax Setup
- NM Weight-Distance Tax Setup
- CT Highway Use Fee Setup
- California MCP + CARB Compliance
- State DOT Registration
- Intrastate Operating Authority
- Oversize/Overweight Permit
- State Compliance Bundle
- State Clean-Truck / Emissions Compliance

All are admin-assisted: we create a todo with state-specific filing
instructions. The state_trucking_requirements table (migration 079)
provides agency names, portal URLs, and requirement details.

Pricing: $99-$599 depending on service.
"""

from __future__ import annotations

import contextlib
import json
import logging
import os
import smtplib
import tempfile
from datetime import datetime, timedelta, timezone
from email.mime.text import MIMEText

from scripts.workers.telegram_notify import notify_fulfillment_todo

# Module logger; every message below is prefixed with the order number.
LOG = logging.getLogger("workers.services.state_trucking")

# State motor carrier fulfillment lifecycle (compliance_orders.fulfillment_status,
# migration 086). These let ops and the customer portal see exactly where a state
# filing is, independent of payment_status.
FULFILLMENT_AUTHORIZATION_REQUIRED = "authorization_required"
FULFILLMENT_AUTHORIZATION_SIGNED = "authorization_signed"
FULFILLMENT_AWAITING_DELEGATION = "awaiting_customer_delegation"
FULFILLMENT_AWAITING_CREDENTIALS = "awaiting_secure_credentials"
FULFILLMENT_AWAITING_FEE_APPROVAL = "awaiting_government_fee_approval"
FULFILLMENT_AWAITING_INSURANCE = "awaiting_insurance_filing"
FULFILLMENT_READY_TO_FILE = "ready_to_file"
FULFILLMENT_FILED_WAITING_STATE = "filed_waiting_state"
FULFILLMENT_COMPLETED = "completed"

# Map service slugs to human-readable names and filing instructions.
# Each entry carries:
#   name     - display name used in todos, emails and log lines
#   category - key used to pick the matching agency/portal columns from a
#              state_trucking_requirements row
#   steps    - ordered checklist rendered into the admin todo description
SERVICE_INFO = {
    "irp-registration": {
        "name": "IRP Registration Assistance",
        "category": "irp",
        "steps": [
            "1. Verify carrier's base state and operating states",
            "2. Determine vehicle types and weights for apportionment",
            "3. File IRP application through base state's IRP office",
            "4. Submit cab card documentation for each power unit",
            "5. Pay apportioned fees to each jurisdiction",
            "6. Send confirmation + cab cards to client",
        ],
    },
    "ifta-application": {
        "name": "IFTA Application + Decals",
        "category": "ifta",
        "steps": [
            "1. Verify carrier's base state for IFTA",
            "2. File IFTA license application through base state",
            "3. Order IFTA decals (2 per qualifying vehicle)",
            "4. Set up quarterly filing schedule",
            "5. Send IFTA license + decals to client",
        ],
    },
    "ifta-quarterly": {
        "name": "IFTA Quarterly Filing",
        "category": "ifta",
        "steps": [
            "1. Collect mileage records for the quarter by jurisdiction",
            "2. Collect fuel purchase records for the quarter",
            "3. Calculate net tax/credit per jurisdiction",
            "4. File quarterly return through base state's IFTA portal",
            "5. Pay any taxes due or process credits",
            "6. Send confirmation to client",
        ],
    },
    "or-weight-mile-tax": {
        "name": "Oregon Weight-Mile Tax Setup",
        "category": "weight_distance",
        "steps": [
            "1. Register carrier with Oregon DOT for Weight-Mile Tax",
            "2. Set up Oregon Trucking Online (OTO) account",
            "3. File for weight receipt / temporary pass if needed",
            "4. Configure monthly/quarterly reporting schedule",
            "5. File first weight-mile tax report",
            "6. Send account credentials and confirmation to client",
        ],
    },
    "ny-hut-registration": {
        "name": "NY Highway Use Tax Registration",
        "category": "weight_distance",
        "steps": [
            "1. Register carrier with NY Department of Tax and Finance",
            "2. File Form TMT-1 (Highway Use Tax Return) registration",
            "3. Obtain NYHUT certificate of registration",
            "4. Set up quarterly filing schedule",
            "5. Send certificate and filing instructions to client",
        ],
    },
    "ky-kyu-registration": {
        "name": "KY Weight-Distance Tax Setup",
        "category": "weight_distance",
        "steps": [
            "1. Register carrier with Kentucky Department of Revenue",
            "2. Obtain KYU number",
            "3. Set up quarterly reporting on KY E-file system",
            "4. File first weight-distance tax return",
            "5. Send KYU number and confirmation to client",
        ],
    },
    "nm-weight-distance": {
        "name": "NM Weight-Distance Tax Setup",
        "category": "weight_distance",
        "steps": [
            "1. Register carrier with NM Motor Vehicle Division",
            "2. Obtain annual weight-distance tax permit (per vehicle)",
            "3. Set up e-permit account",
            "4. Send permits and confirmation to client",
        ],
    },
    "ct-highway-use-fee": {
        "name": "CT Highway Use Fee Setup",
        "category": "weight_distance",
        "steps": [
            "1. Register carrier with CT Department of Revenue Services",
            "2. Set up myconneCT portal account",
            "3. File initial Highway Use Fee registration",
            "4. Set up quarterly filing schedule",
            "5. Send registration confirmation to client",
        ],
    },
    "ca-mcp-carb": {
        "name": "California MCP + CARB Compliance",
        "category": "state_permit",
        "steps": [
            "1. Obtain CA Number from California Highway Patrol (CHP)",
            "2. Apply for Motor Carrier Permit (MCP) through CA DMV",
            "3. Verify vehicle fleet meets CARB Truck & Bus Rule requirements",
            "4. File CARB compliance documentation if needed",
            "5. Set up annual MCP renewal reminders",
            "6. Send CA Number, MCP, and CARB compliance status to client",
        ],
    },
    "state-dot-registration": {
        "name": "State DOT Registration",
        "category": "state_dot",
        "steps": [
            "1. Determine which state requires separate DOT registration",
            "2. File registration application with state DOT/DMV",
            "3. Submit required documentation (insurance, USDOT, etc.)",
            "4. Obtain state registration number",
            "5. Send registration confirmation to client",
        ],
    },
    "intrastate-authority": {
        "name": "Intrastate Operating Authority",
        "category": "intrastate",
        "steps": [
            "1. Determine state-specific authority type (COA, CPCN, etc.)",
            "2. File application with state PUC/PSC/DOT",
            "3. Submit required documentation (insurance, BOC-3, financials)",
            "4. Pay state filing fees",
            "5. Monitor application status",
            "6. Send authority certificate to client when issued",
        ],
    },
    "osow-permit": {
        "name": "Oversize/Overweight Permit",
        "category": "osow",
        "steps": [
            "1. Determine permit type (single trip vs annual)",
            "2. Collect load specifications (dimensions, weight, route)",
            "3. File permit application with state DOT",
            "4. Pay permit fees",
            "5. Obtain and verify permit conditions/restrictions",
            "6. Send permit to client",
        ],
    },
    "state-trucking-bundle": {
        "name": "State Compliance Bundle",
        "category": "bundle",
        "steps": [
            "1. Review carrier's base state and operating states",
            "2. Identify all state-level obligations (IRP, IFTA, weight tax, permits)",
            "3. File IRP registration through base state",
            "4. File IFTA application through base state",
            "5. Register for any applicable weight-distance taxes",
            "6. Apply for state carrier permits where required",
            "7. Send all registrations and confirmations to client",
        ],
    },
    "state-emissions": {
        "name": "State Clean-Truck / Emissions Compliance",
        "category": "emissions",
        "steps": [
            # Step 1 is ONE string split over two literals (implicit
            # concatenation) — there is deliberately no comma between them.
            "1. Identify carrier's base/operating states with emissions programs "
            "(NY, CO, MD, NJ, MA, etc. — Advanced Clean Trucks / Clean Truck Check)",
            "2. Review fleet engine model-years against state emissions thresholds",
            "3. Register/report fleet in the applicable state emissions portal",
            "4. File any required compliance certification or fee",
            "5. Set up annual renewal/reporting reminders",
            "6. Send compliance confirmation + next-steps to client",
        ],
    },
}


class StateTruckingHandler:
|
|
"""Handle all state-level trucking compliance orders."""
|
|
|
|
SERVICE_SLUG = "state-trucking"
|
|
SERVICE_NAME = "State Trucking Compliance"
|
|
|
|
# At-cost services that collect the government/state fee from the customer
|
|
# after authorization (separate from our service fee). See gov_fee.py.
|
|
GOV_FEE_SERVICES = frozenset({
|
|
"irp-registration",
|
|
"ifta-application",
|
|
"intrastate-authority",
|
|
})
|
|
|
|
async def process(self, order_data: dict) -> list[str]:
|
|
"""Entry point called by job_server. Delegates to handle()."""
|
|
order_number = order_data.get("order_number", order_data.get("name", ""))
|
|
return self.handle(order_data, order_number)
|
|
|
|
def handle(self, order_data: dict, order_number: str) -> list[str]:
|
|
"""Process a state trucking compliance order."""
|
|
# Resolve the service slug — job_server may store it in order_data,
|
|
# or we can look it up from the DB via order_number.
|
|
service_slug = order_data.get("service_slug", "")
|
|
if not service_slug and order_number:
|
|
service_slug = self._resolve_slug(order_number)
|
|
info = SERVICE_INFO.get(service_slug, {})
|
|
service_name = info.get("name", service_slug)
|
|
|
|
LOG.info("[%s] Processing %s order", order_number, service_name)
|
|
|
|
intake = order_data.get("intake_data") or {}
|
|
if isinstance(intake, str):
|
|
intake = json.loads(intake)
|
|
|
|
dot_number = intake.get("dot_number", "")
|
|
entity_name = intake.get("entity_name", order_data.get("customer_name", ""))
|
|
customer_email = order_data.get("customer_email", "")
|
|
customer_phone = order_data.get("customer_phone", "") or intake.get("phone", "")
|
|
base_state = intake.get("base_state", intake.get("phy_state", ""))
|
|
operating_states = intake.get("operating_states", [])
|
|
|
|
# ── Authorization gate ───────────────────────────────────────────
|
|
# Every state portal filing legally requires the customer's signed
|
|
# "Limited Authorization to File State Motor Carrier Compliance
|
|
# Documents" before we touch a state portal/tax system. On the first
|
|
# run we generate that authorization, email the signing link, and
|
|
# PAUSE. The pipeline resumes here with client_approved=True once the
|
|
# customer signs (handle_esign_completed re-dispatches us).
|
|
client_approved = bool(order_data.get("client_approved"))
|
|
signed_auth_key = None
|
|
if not client_approved:
|
|
requested = self._request_authorization(
|
|
order_number=order_number,
|
|
service_slug=service_slug,
|
|
service_name=service_name,
|
|
entity_name=entity_name,
|
|
customer_email=customer_email,
|
|
dot_number=dot_number,
|
|
mc_number=str(intake.get("mc_number", "")),
|
|
fein_last4=str(intake.get("fein_last4", "")),
|
|
base_state=base_state,
|
|
operating_states=operating_states,
|
|
signer_name=str(intake.get("signer_name", order_data.get("customer_name", ""))),
|
|
signer_title=str(intake.get("signer_title", "")),
|
|
)
|
|
if requested:
|
|
self._set_fulfillment_status(order_number, FULFILLMENT_AUTHORIZATION_REQUIRED)
|
|
LOG.info(
|
|
"[%s] Authorization requested — pipeline PAUSED pending signature",
|
|
order_number,
|
|
)
|
|
return []
|
|
# If we could not request a signature (e.g. no email), fall through
|
|
# and create the admin todo so ops can chase the authorization.
|
|
LOG.warning(
|
|
"[%s] Could not request authorization e-sign — proceeding to admin todo",
|
|
order_number,
|
|
)
|
|
else:
|
|
signed_auth_key = self._signed_authorization_key(order_number)
|
|
self._set_fulfillment_status(order_number, FULFILLMENT_AUTHORIZATION_SIGNED)
|
|
LOG.info("[%s] Authorization signed (%s) — proceeding to filing", order_number, signed_auth_key)
|
|
|
|
# ── Government fee gate ──────────────────────────────────────────────
|
|
# At-cost services (IRP, IFTA, intrastate) collect only our service fee
|
|
# at checkout; the state fee is variable and billed at cost. Once
|
|
# authorization is signed, auto-quote the gov fee, create a child payment
|
|
# order, email the customer a payment link (all methods + surcharges),
|
|
# and HOLD at awaiting_government_fee_approval until they pay. The
|
|
# gov-fee child's payment webhook re-dispatches us with
|
|
# gov_fee_paid=True, at which point we fall through to filing.
|
|
if (service_slug in self.GOV_FEE_SERVICES
|
|
and not order_data.get("gov_fee_paid")
|
|
and not self._gov_fee_settled(order_number)):
|
|
if self._request_gov_fee_payment(order_number, service_slug, service_name,
|
|
entity_name, customer_email, customer_phone, intake):
|
|
self._set_fulfillment_status(order_number, FULFILLMENT_AWAITING_FEE_APPROVAL)
|
|
LOG.info("[%s] Gov fee quoted — held pending customer payment", order_number)
|
|
return []
|
|
LOG.warning("[%s] Could not set up gov-fee payment — proceeding to manual todo", order_number)
|
|
|
|
# Slug-specific intake fields collected by StateTruckingIntakeStep.
|
|
intake_summary = self._summarize_intake(service_slug, intake)
|
|
|
|
# Look up state requirements if we have a base state
|
|
state_reqs = None
|
|
if base_state:
|
|
state_reqs = self._get_state_requirements(base_state)
|
|
|
|
# Build the admin todo
|
|
steps = info.get("steps", ["1. Review order and fulfill manually"])
|
|
|
|
# Enrich steps with state-specific agency info if available
|
|
if state_reqs:
|
|
agency_info = self._get_agency_info(info.get("category", ""), state_reqs)
|
|
if agency_info:
|
|
steps = steps + [f"Agency: {agency_info['agency']}", f"Portal: {agency_info['url']}"]
|
|
|
|
todo_data = {
|
|
"order_number": order_number,
|
|
"service": service_name,
|
|
"service_slug": service_slug,
|
|
"dot_number": dot_number,
|
|
"entity_name": entity_name,
|
|
"customer_email": customer_email,
|
|
"base_state": base_state,
|
|
"operating_states": operating_states,
|
|
"intake_data": intake,
|
|
"intake_summary": intake_summary,
|
|
"state_requirements": state_reqs,
|
|
"steps": steps,
|
|
"signed_authorization_minio_key": signed_auth_key,
|
|
}
|
|
|
|
# Render the slug-specific intake fields into the description.
|
|
intake_lines = ""
|
|
if intake_summary:
|
|
intake_lines = "\nFiling details:\n" + "\n".join(
|
|
f" - {k}: {v}" for k, v in intake_summary.items()
|
|
) + "\n"
|
|
|
|
try:
|
|
import psycopg2
|
|
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
|
|
todo_title = (
|
|
f"{service_name} — {entity_name} (DOT {dot_number})"
|
|
if dot_number else f"{service_name} — {entity_name}"
|
|
)
|
|
todo_priority = (
|
|
"high" if service_slug in ("ca-mcp-carb", "state-trucking-bundle") else "normal"
|
|
)
|
|
todo_description = (
|
|
f"Service: {service_name}\n"
|
|
f"DOT: {dot_number}\n"
|
|
f"Base state: {base_state}\n"
|
|
f"Operating states: {', '.join(operating_states) if operating_states else 'N/A'}\n"
|
|
f"Customer: {customer_email}\n"
|
|
+ intake_lines +
|
|
f"\nSteps:\n" + "\n".join(steps)
|
|
)
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute("""
|
|
INSERT INTO admin_todos (
|
|
title, category, priority, order_number, service_slug,
|
|
description, data, status
|
|
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
|
|
""", (
|
|
todo_title,
|
|
"filing",
|
|
todo_priority,
|
|
order_number,
|
|
service_slug,
|
|
todo_description,
|
|
json.dumps(todo_data),
|
|
))
|
|
conn.commit()
|
|
notify_fulfillment_todo(
|
|
title=todo_title,
|
|
order_number=order_number,
|
|
service_slug=service_slug,
|
|
priority=todo_priority,
|
|
description=todo_description,
|
|
)
|
|
finally:
|
|
conn.close()
|
|
LOG.info("[%s] Admin todo created for %s", order_number, service_name)
|
|
# Authorization is in hand and the filing task is queued for ops.
|
|
self._set_fulfillment_status(order_number, FULFILLMENT_READY_TO_FILE)
|
|
except Exception as exc:
|
|
LOG.error("[%s] Failed to create admin todo: %s", order_number, exc)
|
|
|
|
# Send status email
|
|
self._send_status_email(
|
|
order_number, service_name, entity_name, dot_number, customer_email
|
|
)
|
|
|
|
return []
|
|
|
|
# Map slug -> list of (intake_data key, human label) to surface in the todo.
|
|
_INTAKE_FIELD_MAP = {
|
|
"_common": [
|
|
("power_units", "Power units"),
|
|
("mc_number", "MC/MX/FF #"),
|
|
],
|
|
"irp-ifta": [
|
|
("fuel_type", "Fuel type"),
|
|
("gross_weight_bracket", "Gross weight bracket"),
|
|
],
|
|
"emissions": [
|
|
("ca_number", "CA #"),
|
|
("engine_model_years", "Oldest engine model year"),
|
|
],
|
|
"intrastate": [
|
|
("authority_type", "Authority type"),
|
|
("boc3_on_file", "BOC-3 on file"),
|
|
("insurance_carrier", "Insurance carrier"),
|
|
("insurance_policy", "Insurance policy #"),
|
|
],
|
|
"osow": [
|
|
("load_dimensions", "Load dimensions"),
|
|
("load_weight", "Load weight (lbs)"),
|
|
],
|
|
"hazmat": [
|
|
("hazmat_classes", "Hazmat classes"),
|
|
("bulk_packaging", "Bulk packaging"),
|
|
("small_business", "Small business"),
|
|
],
|
|
}
|
|
|
|
# Which field groups apply to each slug (mirrors the front-end ST_SECTIONS).
|
|
_SLUG_FIELD_GROUPS = {
|
|
"irp-registration": ["irp-ifta"],
|
|
"ifta-application": ["irp-ifta"],
|
|
"ifta-quarterly": ["irp-ifta"],
|
|
"or-weight-mile-tax": ["irp-ifta"],
|
|
"ny-hut-registration": ["irp-ifta"],
|
|
"ky-kyu-registration": ["irp-ifta"],
|
|
"nm-weight-distance": ["irp-ifta"],
|
|
"ct-highway-use-fee": ["irp-ifta"],
|
|
"ca-mcp-carb": ["emissions"],
|
|
"state-emissions": ["emissions"],
|
|
"state-dot-registration": [],
|
|
"intrastate-authority": ["intrastate"],
|
|
"osow-permit": ["osow"],
|
|
"state-trucking-bundle": ["irp-ifta", "emissions", "intrastate"],
|
|
"hazmat-phmsa": ["hazmat"],
|
|
}
|
|
|
|
def _summarize_intake(self, service_slug: str, intake: dict) -> dict:
|
|
"""Extract the slug-relevant intake fields into a flat label->value dict."""
|
|
groups = ["_common"] + self._SLUG_FIELD_GROUPS.get(service_slug, [])
|
|
summary: dict = {}
|
|
for group in groups:
|
|
for key, label in self._INTAKE_FIELD_MAP.get(group, []):
|
|
val = intake.get(key)
|
|
if val in (None, "", [], {}):
|
|
continue
|
|
if isinstance(val, (list, tuple)):
|
|
val = ", ".join(str(v) for v in val)
|
|
summary[label] = val
|
|
return summary
|
|
|
|
def _resolve_slug(self, order_number: str) -> str:
|
|
"""Look up the service_slug from compliance_orders by order_number."""
|
|
try:
|
|
import psycopg2
|
|
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"SELECT service_slug FROM compliance_orders WHERE order_number = %s",
|
|
(order_number,),
|
|
)
|
|
row = cur.fetchone()
|
|
return row[0] if row else ""
|
|
finally:
|
|
conn.close()
|
|
except Exception as exc:
|
|
LOG.warning("Could not resolve slug for %s: %s", order_number, exc)
|
|
return ""
|
|
|
|
def _get_state_requirements(self, state_code: str) -> dict | None:
|
|
"""Fetch state trucking requirements from database."""
|
|
try:
|
|
import psycopg2
|
|
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"SELECT * FROM state_trucking_requirements WHERE state_code = %s",
|
|
(state_code.upper(),),
|
|
)
|
|
cols = [d[0] for d in cur.description]
|
|
row = cur.fetchone()
|
|
if row:
|
|
return dict(zip(cols, row))
|
|
finally:
|
|
conn.close()
|
|
except Exception as exc:
|
|
LOG.warning("Could not fetch state requirements for %s: %s", state_code, exc)
|
|
return None
|
|
|
|
def _get_agency_info(self, category: str, reqs: dict) -> dict | None:
|
|
"""Extract the relevant agency name and URL for a service category."""
|
|
mapping = {
|
|
"irp": ("irp_agency", "irp_url"),
|
|
"ifta": ("ifta_agency", "ifta_url"),
|
|
"weight_distance": ("weight_distance_agency", "weight_distance_url"),
|
|
"state_permit": ("state_carrier_permit_agency", "state_carrier_permit_url"),
|
|
"state_dot": ("state_dot_agency", "state_dot_url"),
|
|
"intrastate": ("intrastate_authority_agency", "intrastate_authority_url"),
|
|
}
|
|
keys = mapping.get(category)
|
|
if keys and reqs.get(keys[0]):
|
|
return {"agency": reqs[keys[0]], "url": reqs.get(keys[1], "")}
|
|
return None
|
|
|
|
# ── Authorization (signed "Limited Authorization to File") ──────────
|
|
|
|
# document_type used in esign_records for the state-trucking authorization.
|
|
AUTH_DOCUMENT_TYPE = "state-trucking-authorization"
|
|
|
|
def _request_authorization(
|
|
self,
|
|
*,
|
|
order_number: str,
|
|
service_slug: str,
|
|
service_name: str,
|
|
entity_name: str,
|
|
customer_email: str,
|
|
dot_number: str = "",
|
|
mc_number: str = "",
|
|
fein_last4: str = "",
|
|
base_state: str = "",
|
|
operating_states: list | None = None,
|
|
signer_name: str = "",
|
|
signer_title: str = "",
|
|
) -> bool:
|
|
"""Generate + upload the authorization PDF and email the signing link.
|
|
|
|
Returns True if a signing request was created (pipeline should pause),
|
|
False if it could not be requested (e.g. no customer email) so the
|
|
caller can fall back to an admin todo.
|
|
"""
|
|
if not customer_email:
|
|
return False
|
|
|
|
# If a record already exists (pending or signed), don't duplicate.
|
|
existing = self._authorization_status(order_number)
|
|
if existing == "signed":
|
|
return False # already signed — proceed to filing
|
|
# (pending records are upserted/refreshed below)
|
|
|
|
try:
|
|
from .state_trucking_authorization import build_state_trucking_authorization
|
|
from .signature_stamper import signature_box # noqa: F401 (ensures module import)
|
|
except Exception as exc:
|
|
LOG.error("[%s] Authorization generator unavailable: %s", order_number, exc)
|
|
return False
|
|
|
|
try:
|
|
pdf_bytes, anchors = build_state_trucking_authorization(
|
|
order_number=order_number,
|
|
entity_name=entity_name,
|
|
service_name=service_name,
|
|
dot_number=dot_number,
|
|
mc_number=mc_number,
|
|
fein_last4=fein_last4,
|
|
base_state=base_state,
|
|
operating_states=operating_states or [],
|
|
signer_name=signer_name,
|
|
signer_title=signer_title,
|
|
)
|
|
except Exception as exc:
|
|
LOG.error("[%s] Failed to render authorization PDF: %s", order_number, exc)
|
|
return False
|
|
|
|
# Upload the unsigned authorization to MinIO.
|
|
document_key = f"compliance/{order_number}/state_trucking_authorization.pdf"
|
|
try:
|
|
import tempfile
|
|
try:
|
|
from scripts.document_gen.minio_client import MinioStorage
|
|
except ImportError:
|
|
from document_gen.minio_client import MinioStorage # type: ignore
|
|
storage = MinioStorage()
|
|
with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tf:
|
|
tf.write(pdf_bytes)
|
|
tf.flush()
|
|
storage.upload(tf.name, document_key, content_type="application/pdf")
|
|
except Exception as exc:
|
|
LOG.error("[%s] Failed to upload authorization PDF: %s", order_number, exc)
|
|
return False
|
|
|
|
# Create the esign record (with the signature anchors so the stamper can
|
|
# place the signature on the line) and email the signing link.
|
|
try:
|
|
import json
|
|
import psycopg2
|
|
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO esign_records (
|
|
order_number, document_type, document_title, entity_name,
|
|
document_minio_key, document_metadata, signature_anchors,
|
|
requires_perjury, status, expires_at
|
|
) VALUES (%s, %s, %s, %s, %s, %s, %s, FALSE, 'pending',
|
|
NOW() + INTERVAL '14 days')
|
|
ON CONFLICT (order_number, document_type)
|
|
WHERE status IN ('pending', 'signed')
|
|
DO UPDATE SET
|
|
document_title = EXCLUDED.document_title,
|
|
entity_name = EXCLUDED.entity_name,
|
|
document_minio_key = EXCLUDED.document_minio_key,
|
|
document_metadata = EXCLUDED.document_metadata,
|
|
signature_anchors = EXCLUDED.signature_anchors,
|
|
expires_at = EXCLUDED.expires_at,
|
|
updated_at = NOW()
|
|
""",
|
|
(
|
|
order_number,
|
|
self.AUTH_DOCUMENT_TYPE,
|
|
f"Authorization to File: {service_name}",
|
|
entity_name,
|
|
document_key,
|
|
json.dumps({"service_slug": service_slug, "dot_number": dot_number}),
|
|
json.dumps(anchors),
|
|
),
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
except Exception as exc:
|
|
LOG.error("[%s] Failed to create authorization esign record: %s", order_number, exc)
|
|
return False
|
|
|
|
# Email the signing link (JWT signed with CUSTOMER_JWT_SECRET).
|
|
try:
|
|
self._send_authorization_email(
|
|
order_number=order_number,
|
|
service_name=service_name,
|
|
entity_name=entity_name,
|
|
customer_email=customer_email,
|
|
)
|
|
except Exception as exc:
|
|
LOG.warning("[%s] Authorization email failed (record exists): %s", order_number, exc)
|
|
|
|
return True
|
|
|
|
def _set_fulfillment_status(self, order_number: str, status: str) -> None:
|
|
"""Advance compliance_orders.fulfillment_status (best-effort).
|
|
|
|
``status`` must be one of the FULFILLMENT_* constants. Non-fatal: a
|
|
failure here never blocks the filing pipeline.
|
|
"""
|
|
try:
|
|
import psycopg2
|
|
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""UPDATE compliance_orders
|
|
SET fulfillment_status = %s, fulfillment_status_at = NOW(),
|
|
updated_at = NOW()
|
|
WHERE order_number = %s""",
|
|
(status, order_number),
|
|
)
|
|
conn.commit()
|
|
finally:
|
|
conn.close()
|
|
except Exception as exc:
|
|
LOG.warning("[%s] Could not set fulfillment_status=%s: %s", order_number, status, exc)
|
|
|
|
def _gov_fee_settled(self, order_number: str) -> bool:
|
|
"""True if a gov-fee child order for this parent is already paid."""
|
|
try:
|
|
import psycopg2
|
|
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""SELECT 1 FROM compliance_orders
|
|
WHERE parent_order_number = %s AND payment_status = 'paid'
|
|
LIMIT 1""",
|
|
(order_number,),
|
|
)
|
|
return cur.fetchone() is not None
|
|
finally:
|
|
conn.close()
|
|
except Exception as exc: # noqa: BLE001
|
|
LOG.warning("[%s] Could not check gov-fee settlement: %s", order_number, exc)
|
|
return False
|
|
|
|
def _request_gov_fee_payment(self, order_number, service_slug, service_name,
|
|
entity_name, customer_email, customer_phone, intake) -> bool:
|
|
"""Set up government-fee collection after authorization.
|
|
|
|
IRP: the fee is unknown until the base state computes it, so we EMAIL the
|
|
state's IRP unit a submission request and wait for their invoice reply
|
|
(irp_filing.poll_irp_invoices bills the customer the exact amount when it
|
|
arrives). IFTA / intrastate: the fee is published/near-fixed, so we
|
|
estimate it and bill the customer immediately. Returns True if the order
|
|
should hold at awaiting_government_fee_approval."""
|
|
# ── IRP: email the state, wait for the apportioned-fee invoice ────────
|
|
if service_slug == "irp-registration":
|
|
try:
|
|
from scripts.workers.services.irp_filing import send_irp_submission
|
|
except Exception as exc: # noqa: BLE001
|
|
LOG.error("[%s] irp_filing import failed: %s", order_number, exc)
|
|
return False
|
|
base_state = (intake.get("base_state") or intake.get("address_state") or "").upper()
|
|
sent = send_irp_submission(order_number, entity_name,
|
|
intake.get("dot_number", ""), base_state, intake)
|
|
if sent:
|
|
try:
|
|
notify_fulfillment_todo(
|
|
title=f"IRP submitted to {base_state} — awaiting fee invoice — {entity_name}",
|
|
order_number=order_number,
|
|
service_slug=service_slug,
|
|
priority="normal",
|
|
description=(f"IRP Schedule A/B emailed to the {base_state} IRP office.\n"
|
|
f"Waiting on their apportioned-fee invoice reply; when it "
|
|
f"arrives we auto-bill the customer the exact amount and "
|
|
f"you'll get a Telegram alert.\nCustomer: {customer_email}"),
|
|
)
|
|
except Exception:
|
|
pass
|
|
return sent
|
|
|
|
# ── IFTA / intrastate: published fee, bill the estimate now ───────────
|
|
try:
|
|
from scripts.workers.services.gov_fee import (
|
|
estimate_gov_fee, create_gov_fee_order, send_gov_fee_payment_email,
|
|
)
|
|
except Exception as exc: # noqa: BLE001
|
|
LOG.error("[%s] gov_fee import failed: %s", order_number, exc)
|
|
return False
|
|
|
|
est = estimate_gov_fee(service_slug, intake)
|
|
if est.cents <= 0:
|
|
LOG.info("[%s] No government fee for %s — skipping fee gate", order_number, service_slug)
|
|
return False
|
|
|
|
child = create_gov_fee_order(
|
|
order_number, service_slug, est,
|
|
customer_email, entity_name, customer_phone,
|
|
)
|
|
if not child:
|
|
return False
|
|
|
|
# Notify the customer + ops.
|
|
send_gov_fee_payment_email(customer_email, entity_name, service_name,
|
|
entity_name, est, child)
|
|
try:
|
|
notify_fulfillment_todo(
|
|
title=f"{service_name} — gov fee billed ({'${:,.2f}'.format(est.cents/100)}) — {entity_name}",
|
|
order_number=order_number,
|
|
service_slug=service_slug,
|
|
priority="normal",
|
|
description=(f"Auto-quoted government fee for {service_name}.\n"
|
|
f"Amount: ${est.cents/100:,.2f} ({'exact' if est.exact else 'estimate, at cost'})\n"
|
|
f"Payment order: {child}\n"
|
|
f"Customer: {customer_email}\n"
|
|
f"Held at awaiting_government_fee_approval until paid; then auto-files."),
|
|
)
|
|
except Exception:
|
|
pass
|
|
return True
|
|
|
|
def _authorization_status(self, order_number: str) -> str | None:
|
|
"""Return the current authorization esign status: 'signed', 'pending', or None."""
|
|
try:
|
|
import psycopg2
|
|
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""SELECT status FROM esign_records
|
|
WHERE order_number = %s AND document_type = %s
|
|
AND status IN ('pending', 'signed')
|
|
ORDER BY created_at DESC LIMIT 1""",
|
|
(order_number, self.AUTH_DOCUMENT_TYPE),
|
|
)
|
|
row = cur.fetchone()
|
|
return row[0] if row else None
|
|
finally:
|
|
conn.close()
|
|
except Exception:
|
|
return None
|
|
|
|
def _signed_authorization_key(self, order_number: str) -> str | None:
|
|
"""Return the MinIO key of the stamped, signed authorization PDF if available."""
|
|
try:
|
|
import psycopg2
|
|
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
"""SELECT signed_document_minio_key, document_minio_key
|
|
FROM esign_records
|
|
WHERE order_number = %s AND document_type = %s
|
|
AND status = 'signed'
|
|
ORDER BY signed_at DESC NULLS LAST LIMIT 1""",
|
|
(order_number, self.AUTH_DOCUMENT_TYPE),
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
return None
|
|
return row[0] or row[1]
|
|
finally:
|
|
conn.close()
|
|
except Exception:
|
|
return None
|
|
|
|
def _send_authorization_email(
|
|
self, *, order_number, service_name, entity_name, customer_email
|
|
):
|
|
"""Email the customer a link to review and sign the authorization."""
|
|
import os as _os
|
|
import smtplib
|
|
from datetime import datetime, timedelta, timezone
|
|
from email.mime.text import MIMEText
|
|
|
|
try:
|
|
import jwt as pyjwt
|
|
except ImportError: # pragma: no cover
|
|
import PyJWT as pyjwt # type: ignore
|
|
|
|
secret = _os.environ.get("CUSTOMER_JWT_SECRET", "changeme_long_random_string")
|
|
domain = _os.environ.get("DOMAIN", "performancewest.net")
|
|
token = pyjwt.encode(
|
|
{
|
|
"order_id": order_number,
|
|
"order_type": self.AUTH_DOCUMENT_TYPE,
|
|
"email": customer_email,
|
|
"exp": datetime.now(timezone.utc) + timedelta(days=14),
|
|
},
|
|
secret,
|
|
algorithm="HS256",
|
|
)
|
|
sign_url = f"https://{domain}/portal/esign?token={token}"
|
|
body = (
|
|
f"Hi,\n\n"
|
|
f"To complete your {service_name} order for {entity_name}, we need your "
|
|
f"signed authorization before we can file with the state on your behalf.\n\n"
|
|
f"Please review and sign here:\n{sign_url}\n\n"
|
|
f"This authorization lets Performance West prepare and submit your state "
|
|
f"motor carrier filing, communicate with the agency, and remit government "
|
|
f"fees you provide. Government fees, taxes, card processing fees, decals, "
|
|
f"permits, bonds, and insurance filing costs are pass-through charges and "
|
|
f"may be billed separately if not known at checkout.\n\n"
|
|
f"This link expires in 14 days.\n\n"
|
|
f"Order: {order_number}\n"
|
|
f"Questions? Call (888) 411-0383.\n\n"
|
|
f"Performance West Inc.\nDOT / State Motor Carrier Compliance\n"
|
|
)
|
|
msg = MIMEText(body)
|
|
msg["Subject"] = f"Action Required: Sign Your Authorization — {service_name}"
|
|
msg["From"] = "noreply@performancewest.net"
|
|
msg["To"] = customer_email
|
|
import os as _smtp_os
|
|
with smtplib.SMTP(_smtp_os.getenv("SMTP_HOST", "co.carrierone.com"), int(_smtp_os.getenv("SMTP_PORT", "587")), timeout=30) as s:
|
|
s.starttls()
|
|
_u, _p = _smtp_os.getenv("SMTP_USER", ""), _smtp_os.getenv("SMTP_PASS", "")
|
|
if _u and _p:
|
|
s.login(_u, _p)
|
|
s.sendmail(msg["From"], [customer_email], msg.as_string())
|
|
|
|
def _send_status_email(self, order_number, service_name, entity_name, dot_number, customer_email):
|
|
"""Send client a status email."""
|
|
try:
|
|
import smtplib
|
|
from email.mime.text import MIMEText
|
|
|
|
dot_line = f" (DOT# {dot_number})" if dot_number else ""
|
|
body = (
|
|
f"Hi,\n\n"
|
|
f"We've received your {service_name} order for "
|
|
f"{entity_name}{dot_line}.\n\n"
|
|
f"Order: {order_number}\n\n"
|
|
f"Our team is reviewing your information and will complete "
|
|
f"the filing typically within 1-2 business days. We'll send "
|
|
f"you a confirmation email when everything is done.\n\n"
|
|
f"Questions? Reply to this email or call (888) 411-0383.\n\n"
|
|
f"Performance West Inc.\n"
|
|
f"DOT Compliance Services\n"
|
|
)
|
|
|
|
msg = MIMEText(body)
|
|
msg["Subject"] = f"{service_name} In Progress — {entity_name}{dot_line}"
|
|
msg["From"] = "noreply@performancewest.net"
|
|
msg["To"] = customer_email
|
|
|
|
import os as _smtp_os
|
|
with smtplib.SMTP(_smtp_os.getenv("SMTP_HOST", "co.carrierone.com"), int(_smtp_os.getenv("SMTP_PORT", "587")), timeout=30) as s:
|
|
s.starttls()
|
|
_u, _p = _smtp_os.getenv("SMTP_USER", ""), _smtp_os.getenv("SMTP_PASS", "")
|
|
if _u and _p:
|
|
s.login(_u, _p)
|
|
s.sendmail(msg["From"], [customer_email], msg.as_string())
|
|
|
|
LOG.info("[%s] Status email sent to %s", order_number, customer_email)
|
|
except Exception as exc:
|
|
LOG.warning("[%s] Failed to send status email: %s", order_number, exc)
|