new-site/scripts/workers/services/state_trucking.py
justin b85be726b7 feat(fulfillment): bundle/exclusion enforcement + REQUIRED_FIELDS + intake wiring (Phases 1/1.5/2)
- compliance-orders: hazmat-phmsa/state-emissions products, full REQUIRED_FIELDS
  table for all DOT/state/hazmat slugs, BUNDLE_COMPONENTS dedup + MUTUALLY_EXCLUSIVE
  enforcement on /batch (single source of truth, exported)
- checkout: empty ADMIN_ASSISTED_SLUGS (state/hazmat now get intake links)
- services/__init__: register HazmatPHMSAHandler + state-emissions handler
- state_trucking: _summarize_intake admin-todo enrichment
- Wizard: wire StateTruckingIntakeStep + step labels
2026-06-02 03:51:25 -05:00

466 lines
19 KiB
Python

"""
State-Level Trucking Compliance Service Handler.
Handles all state-specific motor carrier compliance services:
- IRP Registration Assistance
- IFTA Application + Decals
- IFTA Quarterly Filing
- Oregon Weight-Mile Tax Setup
- NY Highway Use Tax Registration
- KY Weight-Distance Tax Setup
- NM Weight-Distance Tax Setup
- CT Highway Use Fee Setup
- California MCP + CARB Compliance
- State DOT Registration
- Intrastate Operating Authority
- Oversize/Overweight Permit
- State Compliance Bundle
All are admin-assisted: we create a todo with state-specific filing
instructions. The state_trucking_requirements table (migration 079)
provides agency names, portal URLs, and requirement details.
Pricing: $99-$599 depending on service.
"""
from __future__ import annotations
import json
import logging
import os
from datetime import datetime
LOG = logging.getLogger("workers.services.state_trucking")
# Map service slugs to human-readable names and filing instructions
SERVICE_INFO = {
"irp-registration": {
"name": "IRP Registration Assistance",
"category": "irp",
"steps": [
"1. Verify carrier's base state and operating states",
"2. Determine vehicle types and weights for apportionment",
"3. File IRP application through base state's IRP office",
"4. Submit cab card documentation for each power unit",
"5. Pay apportioned fees to each jurisdiction",
"6. Send confirmation + cab cards to client",
],
},
"ifta-application": {
"name": "IFTA Application + Decals",
"category": "ifta",
"steps": [
"1. Verify carrier's base state for IFTA",
"2. File IFTA license application through base state",
"3. Order IFTA decals (2 per qualifying vehicle)",
"4. Set up quarterly filing schedule",
"5. Send IFTA license + decals to client",
],
},
"ifta-quarterly": {
"name": "IFTA Quarterly Filing",
"category": "ifta",
"steps": [
"1. Collect mileage records for the quarter by jurisdiction",
"2. Collect fuel purchase records for the quarter",
"3. Calculate net tax/credit per jurisdiction",
"4. File quarterly return through base state's IFTA portal",
"5. Pay any taxes due or process credits",
"6. Send confirmation to client",
],
},
"or-weight-mile-tax": {
"name": "Oregon Weight-Mile Tax Setup",
"category": "weight_distance",
"steps": [
"1. Register carrier with Oregon DOT for Weight-Mile Tax",
"2. Set up Oregon Trucking Online (OTO) account",
"3. File for weight receipt / temporary pass if needed",
"4. Configure monthly/quarterly reporting schedule",
"5. File first weight-mile tax report",
"6. Send account credentials and confirmation to client",
],
},
"ny-hut-registration": {
"name": "NY Highway Use Tax Registration",
"category": "weight_distance",
"steps": [
"1. Register carrier with NY Department of Tax and Finance",
"2. File Form TMT-1 (Highway Use Tax Return) registration",
"3. Obtain NYHUT certificate of registration",
"4. Set up quarterly filing schedule",
"5. Send certificate and filing instructions to client",
],
},
"ky-kyu-registration": {
"name": "KY Weight-Distance Tax Setup",
"category": "weight_distance",
"steps": [
"1. Register carrier with Kentucky Department of Revenue",
"2. Obtain KYU number",
"3. Set up quarterly reporting on KY E-file system",
"4. File first weight-distance tax return",
"5. Send KYU number and confirmation to client",
],
},
"nm-weight-distance": {
"name": "NM Weight-Distance Tax Setup",
"category": "weight_distance",
"steps": [
"1. Register carrier with NM Motor Vehicle Division",
"2. Obtain annual weight-distance tax permit (per vehicle)",
"3. Set up e-permit account",
"4. Send permits and confirmation to client",
],
},
"ct-highway-use-fee": {
"name": "CT Highway Use Fee Setup",
"category": "weight_distance",
"steps": [
"1. Register carrier with CT Department of Revenue Services",
"2. Set up myconneCT portal account",
"3. File initial Highway Use Fee registration",
"4. Set up quarterly filing schedule",
"5. Send registration confirmation to client",
],
},
"ca-mcp-carb": {
"name": "California MCP + CARB Compliance",
"category": "state_permit",
"steps": [
"1. Obtain CA Number from California Highway Patrol (CHP)",
"2. Apply for Motor Carrier Permit (MCP) through CA DMV",
"3. Verify vehicle fleet meets CARB Truck & Bus Rule requirements",
"4. File CARB compliance documentation if needed",
"5. Set up annual MCP renewal reminders",
"6. Send CA Number, MCP, and CARB compliance status to client",
],
},
"state-dot-registration": {
"name": "State DOT Registration",
"category": "state_dot",
"steps": [
"1. Determine which state requires separate DOT registration",
"2. File registration application with state DOT/DMV",
"3. Submit required documentation (insurance, USDOT, etc.)",
"4. Obtain state registration number",
"5. Send registration confirmation to client",
],
},
"intrastate-authority": {
"name": "Intrastate Operating Authority",
"category": "intrastate",
"steps": [
"1. Determine state-specific authority type (COA, CPCN, etc.)",
"2. File application with state PUC/PSC/DOT",
"3. Submit required documentation (insurance, BOC-3, financials)",
"4. Pay state filing fees",
"5. Monitor application status",
"6. Send authority certificate to client when issued",
],
},
"osow-permit": {
"name": "Oversize/Overweight Permit",
"category": "osow",
"steps": [
"1. Determine permit type (single trip vs annual)",
"2. Collect load specifications (dimensions, weight, route)",
"3. File permit application with state DOT",
"4. Pay permit fees",
"5. Obtain and verify permit conditions/restrictions",
"6. Send permit to client",
],
},
"state-trucking-bundle": {
"name": "State Compliance Bundle",
"category": "bundle",
"steps": [
"1. Review carrier's base state and operating states",
"2. Identify all state-level obligations (IRP, IFTA, weight tax, permits)",
"3. File IRP registration through base state",
"4. File IFTA application through base state",
"5. Register for any applicable weight-distance taxes",
"6. Apply for state carrier permits where required",
"7. Send all registrations and confirmations to client",
],
},
"state-emissions": {
"name": "State Clean-Truck / Emissions Compliance",
"category": "emissions",
"steps": [
"1. Identify carrier's base/operating states with emissions programs "
"(NY, CO, MD, NJ, MA, etc. — Advanced Clean Trucks / Clean Truck Check)",
"2. Review fleet engine model-years against state emissions thresholds",
"3. Register/report fleet in the applicable state emissions portal",
"4. File any required compliance certification or fee",
"5. Set up annual renewal/reporting reminders",
"6. Send compliance confirmation + next-steps to client",
],
},
}
class StateTruckingHandler:
"""Handle all state-level trucking compliance orders."""
SERVICE_SLUG = "state-trucking"
SERVICE_NAME = "State Trucking Compliance"
async def process(self, order_data: dict) -> list[str]:
"""Entry point called by job_server. Delegates to handle()."""
order_number = order_data.get("order_number", order_data.get("name", ""))
return self.handle(order_data, order_number)
def handle(self, order_data: dict, order_number: str) -> list[str]:
"""Process a state trucking compliance order."""
# Resolve the service slug — job_server may store it in order_data,
# or we can look it up from the DB via order_number.
service_slug = order_data.get("service_slug", "")
if not service_slug and order_number:
service_slug = self._resolve_slug(order_number)
info = SERVICE_INFO.get(service_slug, {})
service_name = info.get("name", service_slug)
LOG.info("[%s] Processing %s order", order_number, service_name)
intake = order_data.get("intake_data") or {}
if isinstance(intake, str):
intake = json.loads(intake)
dot_number = intake.get("dot_number", "")
entity_name = intake.get("entity_name", order_data.get("customer_name", ""))
customer_email = order_data.get("customer_email", "")
base_state = intake.get("base_state", intake.get("phy_state", ""))
operating_states = intake.get("operating_states", [])
# Slug-specific intake fields collected by StateTruckingIntakeStep.
intake_summary = self._summarize_intake(service_slug, intake)
# Look up state requirements if we have a base state
state_reqs = None
if base_state:
state_reqs = self._get_state_requirements(base_state)
# Build the admin todo
steps = info.get("steps", ["1. Review order and fulfill manually"])
# Enrich steps with state-specific agency info if available
if state_reqs:
agency_info = self._get_agency_info(info.get("category", ""), state_reqs)
if agency_info:
steps = steps + [f"Agency: {agency_info['agency']}", f"Portal: {agency_info['url']}"]
todo_data = {
"order_number": order_number,
"service": service_name,
"service_slug": service_slug,
"dot_number": dot_number,
"entity_name": entity_name,
"customer_email": customer_email,
"base_state": base_state,
"operating_states": operating_states,
"intake_data": intake,
"intake_summary": intake_summary,
"state_requirements": state_reqs,
"steps": steps,
}
# Render the slug-specific intake fields into the description.
intake_lines = ""
if intake_summary:
intake_lines = "\nFiling details:\n" + "\n".join(
f" - {k}: {v}" for k, v in intake_summary.items()
) + "\n"
try:
import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
try:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO admin_todos (
title, category, priority, order_number, service_slug,
description, data, status
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
""", (
f"{service_name}{entity_name} (DOT {dot_number})"
if dot_number else f"{service_name}{entity_name}",
"filing",
"high" if service_slug in ("ca-mcp-carb", "state-trucking-bundle") else "normal",
order_number,
service_slug,
f"Service: {service_name}\n"
f"DOT: {dot_number}\n"
f"Base state: {base_state}\n"
f"Operating states: {', '.join(operating_states) if operating_states else 'N/A'}\n"
f"Customer: {customer_email}\n"
+ intake_lines +
f"\nSteps:\n" + "\n".join(steps),
json.dumps(todo_data),
))
conn.commit()
finally:
conn.close()
LOG.info("[%s] Admin todo created for %s", order_number, service_name)
except Exception as exc:
LOG.error("[%s] Failed to create admin todo: %s", order_number, exc)
# Send status email
self._send_status_email(
order_number, service_name, entity_name, dot_number, customer_email
)
return []
# Map slug -> list of (intake_data key, human label) to surface in the todo.
_INTAKE_FIELD_MAP = {
"_common": [
("power_units", "Power units"),
("mc_number", "MC/MX/FF #"),
],
"irp-ifta": [
("fuel_type", "Fuel type"),
("gross_weight_bracket", "Gross weight bracket"),
],
"emissions": [
("ca_number", "CA #"),
("engine_model_years", "Oldest engine model year"),
],
"intrastate": [
("authority_type", "Authority type"),
("boc3_on_file", "BOC-3 on file"),
("insurance_carrier", "Insurance carrier"),
("insurance_policy", "Insurance policy #"),
],
"osow": [
("load_dimensions", "Load dimensions"),
("load_weight", "Load weight (lbs)"),
],
"hazmat": [
("hazmat_classes", "Hazmat classes"),
("bulk_packaging", "Bulk packaging"),
("small_business", "Small business"),
],
}
# Which field groups apply to each slug (mirrors the front-end ST_SECTIONS).
_SLUG_FIELD_GROUPS = {
"irp-registration": ["irp-ifta"],
"ifta-application": ["irp-ifta"],
"ifta-quarterly": ["irp-ifta"],
"or-weight-mile-tax": ["irp-ifta"],
"ny-hut-registration": ["irp-ifta"],
"ky-kyu-registration": ["irp-ifta"],
"nm-weight-distance": ["irp-ifta"],
"ct-highway-use-fee": ["irp-ifta"],
"ca-mcp-carb": ["emissions"],
"state-emissions": ["emissions"],
"state-dot-registration": [],
"intrastate-authority": ["intrastate"],
"osow-permit": ["osow"],
"state-trucking-bundle": ["irp-ifta", "emissions", "intrastate"],
"hazmat-phmsa": ["hazmat"],
}
def _summarize_intake(self, service_slug: str, intake: dict) -> dict:
"""Extract the slug-relevant intake fields into a flat label->value dict."""
groups = ["_common"] + self._SLUG_FIELD_GROUPS.get(service_slug, [])
summary: dict = {}
for group in groups:
for key, label in self._INTAKE_FIELD_MAP.get(group, []):
val = intake.get(key)
if val in (None, "", [], {}):
continue
if isinstance(val, (list, tuple)):
val = ", ".join(str(v) for v in val)
summary[label] = val
return summary
def _resolve_slug(self, order_number: str) -> str:
"""Look up the service_slug from compliance_orders by order_number."""
try:
import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
try:
with conn.cursor() as cur:
cur.execute(
"SELECT service_slug FROM compliance_orders WHERE order_number = %s",
(order_number,),
)
row = cur.fetchone()
return row[0] if row else ""
finally:
conn.close()
except Exception as exc:
LOG.warning("Could not resolve slug for %s: %s", order_number, exc)
return ""
def _get_state_requirements(self, state_code: str) -> dict | None:
"""Fetch state trucking requirements from database."""
try:
import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
try:
with conn.cursor() as cur:
cur.execute(
"SELECT * FROM state_trucking_requirements WHERE state_code = %s",
(state_code.upper(),),
)
cols = [d[0] for d in cur.description]
row = cur.fetchone()
if row:
return dict(zip(cols, row))
finally:
conn.close()
except Exception as exc:
LOG.warning("Could not fetch state requirements for %s: %s", state_code, exc)
return None
def _get_agency_info(self, category: str, reqs: dict) -> dict | None:
"""Extract the relevant agency name and URL for a service category."""
mapping = {
"irp": ("irp_agency", "irp_url"),
"ifta": ("ifta_agency", "ifta_url"),
"weight_distance": ("weight_distance_agency", "weight_distance_url"),
"state_permit": ("state_carrier_permit_agency", "state_carrier_permit_url"),
"state_dot": ("state_dot_agency", "state_dot_url"),
"intrastate": ("intrastate_authority_agency", "intrastate_authority_url"),
}
keys = mapping.get(category)
if keys and reqs.get(keys[0]):
return {"agency": reqs[keys[0]], "url": reqs.get(keys[1], "")}
return None
def _send_status_email(self, order_number, service_name, entity_name, dot_number, customer_email):
"""Send client a status email."""
if not customer_email:
return
try:
import smtplib
from email.mime.text import MIMEText
dot_line = f" (DOT# {dot_number})" if dot_number else ""
body = (
f"Hi,\n\n"
f"We've received your {service_name} order for "
f"{entity_name}{dot_line}.\n\n"
f"Order: {order_number}\n\n"
f"Our team is reviewing your information and will complete "
f"the filing typically within 1-2 business days. We'll send "
f"you a confirmation email when everything is done.\n\n"
f"Questions? Reply to this email or call (888) 411-0383.\n\n"
f"Performance West Inc.\n"
f"DOT Compliance Services\n"
)
msg = MIMEText(body)
msg["Subject"] = f"{service_name} In Progress — {entity_name}{dot_line}"
msg["From"] = "noreply@performancewest.net"
msg["To"] = customer_email
with smtplib.SMTP("localhost", 25) as s:
s.sendmail(msg["From"], [customer_email], msg.as_string())
LOG.info("[%s] Status email sent to %s", order_number, customer_email)
except Exception as exc:
LOG.warning("[%s] Failed to send status email: %s", order_number, exc)