""" State-Level Trucking Compliance Service Handler. Handles all state-specific motor carrier compliance services: - IRP Registration Assistance - IFTA Application + Decals - IFTA Quarterly Filing - Oregon Weight-Mile Tax Setup - NY Highway Use Tax Registration - KY Weight-Distance Tax Setup - NM Weight-Distance Tax Setup - CT Highway Use Fee Setup - California MCP + CARB Compliance - State DOT Registration - Intrastate Operating Authority - Oversize/Overweight Permit - State Compliance Bundle All are admin-assisted: we create a todo with state-specific filing instructions. The state_trucking_requirements table (migration 079) provides agency names, portal URLs, and requirement details. Pricing: $99-$599 depending on service. """ from __future__ import annotations import json import logging import os from datetime import datetime from scripts.workers.telegram_notify import notify_fulfillment_todo LOG = logging.getLogger("workers.services.state_trucking") # State motor carrier fulfillment lifecycle (compliance_orders.fulfillment_status, # migration 086). These let ops and the customer portal see exactly where a state # filing is, independent of payment_status. FULFILLMENT_AUTHORIZATION_REQUIRED = "authorization_required" FULFILLMENT_AUTHORIZATION_SIGNED = "authorization_signed" FULFILLMENT_AWAITING_DELEGATION = "awaiting_customer_delegation" FULFILLMENT_AWAITING_CREDENTIALS = "awaiting_secure_credentials" FULFILLMENT_AWAITING_FEE_APPROVAL = "awaiting_government_fee_approval" FULFILLMENT_AWAITING_INSURANCE = "awaiting_insurance_filing" FULFILLMENT_READY_TO_FILE = "ready_to_file" FULFILLMENT_FILED_WAITING_STATE = "filed_waiting_state" FULFILLMENT_COMPLETED = "completed" # Map service slugs to human-readable names and filing instructions SERVICE_INFO = { "irp-registration": { "name": "IRP Registration Assistance", "category": "irp", "steps": [ "1. Verify carrier's base state and operating states", "2. Determine vehicle types and weights for apportionment", "3. File IRP application through base state's IRP office", "4. Submit cab card documentation for each power unit", "5. Pay apportioned fees to each jurisdiction", "6. Send confirmation + cab cards to client", ], }, "ifta-application": { "name": "IFTA Application + Decals", "category": "ifta", "steps": [ "1. Verify carrier's base state for IFTA", "2. File IFTA license application through base state", "3. Order IFTA decals (2 per qualifying vehicle)", "4. Set up quarterly filing schedule", "5. Send IFTA license + decals to client", ], }, "ifta-quarterly": { "name": "IFTA Quarterly Filing", "category": "ifta", "steps": [ "1. Collect mileage records for the quarter by jurisdiction", "2. Collect fuel purchase records for the quarter", "3. Calculate net tax/credit per jurisdiction", "4. File quarterly return through base state's IFTA portal", "5. Pay any taxes due or process credits", "6. Send confirmation to client", ], }, "or-weight-mile-tax": { "name": "Oregon Weight-Mile Tax Setup", "category": "weight_distance", "steps": [ "1. Register carrier with Oregon DOT for Weight-Mile Tax", "2. Set up Oregon Trucking Online (OTO) account", "3. File for weight receipt / temporary pass if needed", "4. Configure monthly/quarterly reporting schedule", "5. File first weight-mile tax report", "6. Send account credentials and confirmation to client", ], }, "ny-hut-registration": { "name": "NY Highway Use Tax Registration", "category": "weight_distance", "steps": [ "1. Register carrier with NY Department of Tax and Finance", "2. File Form TMT-1 (Highway Use Tax Return) registration", "3. Obtain NYHUT certificate of registration", "4. Set up quarterly filing schedule", "5. Send certificate and filing instructions to client", ], }, "ky-kyu-registration": { "name": "KY Weight-Distance Tax Setup", "category": "weight_distance", "steps": [ "1. Register carrier with Kentucky Department of Revenue", "2. Obtain KYU number", "3. Set up quarterly reporting on KY E-file system", "4. File first weight-distance tax return", "5. Send KYU number and confirmation to client", ], }, "nm-weight-distance": { "name": "NM Weight-Distance Tax Setup", "category": "weight_distance", "steps": [ "1. Register carrier with NM Motor Vehicle Division", "2. Obtain annual weight-distance tax permit (per vehicle)", "3. Set up e-permit account", "4. Send permits and confirmation to client", ], }, "ct-highway-use-fee": { "name": "CT Highway Use Fee Setup", "category": "weight_distance", "steps": [ "1. Register carrier with CT Department of Revenue Services", "2. Set up myconneCT portal account", "3. File initial Highway Use Fee registration", "4. Set up quarterly filing schedule", "5. Send registration confirmation to client", ], }, "ca-mcp-carb": { "name": "California MCP + CARB Compliance", "category": "state_permit", "steps": [ "1. Obtain CA Number from California Highway Patrol (CHP)", "2. Apply for Motor Carrier Permit (MCP) through CA DMV", "3. Verify vehicle fleet meets CARB Truck & Bus Rule requirements", "4. File CARB compliance documentation if needed", "5. Set up annual MCP renewal reminders", "6. Send CA Number, MCP, and CARB compliance status to client", ], }, "state-dot-registration": { "name": "State DOT Registration", "category": "state_dot", "steps": [ "1. Determine which state requires separate DOT registration", "2. File registration application with state DOT/DMV", "3. Submit required documentation (insurance, USDOT, etc.)", "4. Obtain state registration number", "5. Send registration confirmation to client", ], }, "intrastate-authority": { "name": "Intrastate Operating Authority", "category": "intrastate", "steps": [ "1. Determine state-specific authority type (COA, CPCN, etc.)", "2. File application with state PUC/PSC/DOT", "3. Submit required documentation (insurance, BOC-3, financials)", "4. Pay state filing fees", "5. Monitor application status", "6. Send authority certificate to client when issued", ], }, "osow-permit": { "name": "Oversize/Overweight Permit", "category": "osow", "steps": [ "1. Determine permit type (single trip vs annual)", "2. Collect load specifications (dimensions, weight, route)", "3. File permit application with state DOT", "4. Pay permit fees", "5. Obtain and verify permit conditions/restrictions", "6. Send permit to client", ], }, "state-trucking-bundle": { "name": "State Compliance Bundle", "category": "bundle", "steps": [ "1. Review carrier's base state and operating states", "2. Identify all state-level obligations (IRP, IFTA, weight tax, permits)", "3. File IRP registration through base state", "4. File IFTA application through base state", "5. Register for any applicable weight-distance taxes", "6. Apply for state carrier permits where required", "7. Send all registrations and confirmations to client", ], }, "state-emissions": { "name": "State Clean-Truck / Emissions Compliance", "category": "emissions", "steps": [ "1. Identify carrier's base/operating states with emissions programs " "(NY, CO, MD, NJ, MA, etc. — Advanced Clean Trucks / Clean Truck Check)", "2. Review fleet engine model-years against state emissions thresholds", "3. Register/report fleet in the applicable state emissions portal", "4. File any required compliance certification or fee", "5. Set up annual renewal/reporting reminders", "6. Send compliance confirmation + next-steps to client", ], }, } class StateTruckingHandler: """Handle all state-level trucking compliance orders.""" SERVICE_SLUG = "state-trucking" SERVICE_NAME = "State Trucking Compliance" # At-cost services that collect the government/state fee from the customer # after authorization (separate from our service fee). See gov_fee.py. GOV_FEE_SERVICES = frozenset({ "irp-registration", "ifta-application", "intrastate-authority", }) async def process(self, order_data: dict) -> list[str]: """Entry point called by job_server. Delegates to handle().""" order_number = order_data.get("order_number", order_data.get("name", "")) return self.handle(order_data, order_number) def handle(self, order_data: dict, order_number: str) -> list[str]: """Process a state trucking compliance order.""" # Resolve the service slug — job_server may store it in order_data, # or we can look it up from the DB via order_number. service_slug = order_data.get("service_slug", "") if not service_slug and order_number: service_slug = self._resolve_slug(order_number) info = SERVICE_INFO.get(service_slug, {}) service_name = info.get("name", service_slug) LOG.info("[%s] Processing %s order", order_number, service_name) intake = order_data.get("intake_data") or {} if isinstance(intake, str): intake = json.loads(intake) dot_number = intake.get("dot_number", "") entity_name = intake.get("entity_name", order_data.get("customer_name", "")) customer_email = order_data.get("customer_email", "") customer_phone = order_data.get("customer_phone", "") or intake.get("phone", "") base_state = intake.get("base_state", intake.get("phy_state", "")) operating_states = intake.get("operating_states", []) # ── Authorization gate ─────────────────────────────────────────── # Every state portal filing legally requires the customer's signed # "Limited Authorization to File State Motor Carrier Compliance # Documents" before we touch a state portal/tax system. On the first # run we generate that authorization, email the signing link, and # PAUSE. The pipeline resumes here with client_approved=True once the # customer signs (handle_esign_completed re-dispatches us). client_approved = bool(order_data.get("client_approved")) signed_auth_key = None if not client_approved: requested = self._request_authorization( order_number=order_number, service_slug=service_slug, service_name=service_name, entity_name=entity_name, customer_email=customer_email, dot_number=dot_number, mc_number=str(intake.get("mc_number", "")), fein_last4=str(intake.get("fein_last4", "")), base_state=base_state, operating_states=operating_states, signer_name=str(intake.get("signer_name", order_data.get("customer_name", ""))), signer_title=str(intake.get("signer_title", "")), ) if requested: self._set_fulfillment_status(order_number, FULFILLMENT_AUTHORIZATION_REQUIRED) LOG.info( "[%s] Authorization requested — pipeline PAUSED pending signature", order_number, ) return [] # If we could not request a signature (e.g. no email), fall through # and create the admin todo so ops can chase the authorization. LOG.warning( "[%s] Could not request authorization e-sign — proceeding to admin todo", order_number, ) else: signed_auth_key = self._signed_authorization_key(order_number) self._set_fulfillment_status(order_number, FULFILLMENT_AUTHORIZATION_SIGNED) LOG.info("[%s] Authorization signed (%s) — proceeding to filing", order_number, signed_auth_key) # ── Government fee gate ────────────────────────────────────────────── # At-cost services (IRP, IFTA, intrastate) collect only our service fee # at checkout; the state fee is variable and billed at cost. Once # authorization is signed, auto-quote the gov fee, create a child payment # order, email the customer a payment link (all methods + surcharges), # and HOLD at awaiting_government_fee_approval until they pay. The # gov-fee child's payment webhook re-dispatches us with # gov_fee_paid=True, at which point we fall through to filing. if (service_slug in self.GOV_FEE_SERVICES and not order_data.get("gov_fee_paid") and not self._gov_fee_settled(order_number)): if self._request_gov_fee_payment(order_number, service_slug, service_name, entity_name, customer_email, customer_phone, intake, signed_auth_key): self._set_fulfillment_status(order_number, FULFILLMENT_AWAITING_FEE_APPROVAL) LOG.info("[%s] Gov fee quoted — held pending customer payment", order_number) return [] LOG.warning("[%s] Could not set up gov-fee payment — proceeding to manual todo", order_number) # Slug-specific intake fields collected by StateTruckingIntakeStep. intake_summary = self._summarize_intake(service_slug, intake) # Look up state requirements if we have a base state state_reqs = None if base_state: state_reqs = self._get_state_requirements(base_state) # Build the admin todo steps = info.get("steps", ["1. Review order and fulfill manually"]) # Enrich steps with state-specific agency info if available if state_reqs: agency_info = self._get_agency_info(info.get("category", ""), state_reqs) if agency_info: steps = steps + [f"Agency: {agency_info['agency']}", f"Portal: {agency_info['url']}"] todo_data = { "order_number": order_number, "service": service_name, "service_slug": service_slug, "dot_number": dot_number, "entity_name": entity_name, "customer_email": customer_email, "base_state": base_state, "operating_states": operating_states, "intake_data": intake, "intake_summary": intake_summary, "state_requirements": state_reqs, "steps": steps, "signed_authorization_minio_key": signed_auth_key, } # Render the slug-specific intake fields into the description. intake_lines = "" if intake_summary: intake_lines = "\nFiling details:\n" + "\n".join( f" - {k}: {v}" for k, v in intake_summary.items() ) + "\n" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) todo_title = ( f"{service_name} — {entity_name} (DOT {dot_number})" if dot_number else f"{service_name} — {entity_name}" ) todo_priority = ( "high" if service_slug in ("ca-mcp-carb", "state-trucking-bundle") else "normal" ) todo_description = ( f"Service: {service_name}\n" f"DOT: {dot_number}\n" f"Base state: {base_state}\n" f"Operating states: {', '.join(operating_states) if operating_states else 'N/A'}\n" f"Customer: {customer_email}\n" + intake_lines + f"\nSteps:\n" + "\n".join(steps) ) try: with conn.cursor() as cur: cur.execute(""" INSERT INTO admin_todos ( title, category, priority, order_number, service_slug, description, data, status ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending') """, ( todo_title, "filing", todo_priority, order_number, service_slug, todo_description, json.dumps(todo_data, default=str), )) conn.commit() notify_fulfillment_todo( title=todo_title, order_number=order_number, service_slug=service_slug, priority=todo_priority, description=todo_description, ) finally: conn.close() LOG.info("[%s] Admin todo created for %s", order_number, service_name) # Authorization is in hand and the filing task is queued for ops. self._set_fulfillment_status(order_number, FULFILLMENT_READY_TO_FILE) except Exception as exc: LOG.error("[%s] Failed to create admin todo: %s", order_number, exc) # Send status email self._send_status_email( order_number, service_name, entity_name, dot_number, customer_email ) return [] # Map slug -> list of (intake_data key, human label) to surface in the todo. _INTAKE_FIELD_MAP = { "_common": [ ("power_units", "Power units"), ("mc_number", "MC/MX/FF #"), ], "irp-ifta": [ ("fuel_type", "Fuel type"), ("gross_weight_bracket", "Gross weight bracket"), ], "emissions": [ ("ca_number", "CA #"), ("engine_model_years", "Oldest engine model year"), ], "intrastate": [ ("authority_type", "Authority type"), ("boc3_on_file", "BOC-3 on file"), ("insurance_carrier", "Insurance carrier"), ("insurance_policy", "Insurance policy #"), ], "osow": [ ("load_dimensions", "Load dimensions"), ("load_weight", "Load weight (lbs)"), ], "hazmat": [ ("hazmat_classes", "Hazmat classes"), ("bulk_packaging", "Bulk packaging"), ("small_business", "Small business"), ], } # Which field groups apply to each slug (mirrors the front-end ST_SECTIONS). _SLUG_FIELD_GROUPS = { "irp-registration": ["irp-ifta"], "ifta-application": ["irp-ifta"], "ifta-quarterly": ["irp-ifta"], "or-weight-mile-tax": ["irp-ifta"], "ny-hut-registration": ["irp-ifta"], "ky-kyu-registration": ["irp-ifta"], "nm-weight-distance": ["irp-ifta"], "ct-highway-use-fee": ["irp-ifta"], "ca-mcp-carb": ["emissions"], "state-emissions": ["emissions"], "state-dot-registration": [], "intrastate-authority": ["intrastate"], "osow-permit": ["osow"], "state-trucking-bundle": ["irp-ifta", "emissions", "intrastate"], "hazmat-phmsa": ["hazmat"], } def _summarize_intake(self, service_slug: str, intake: dict) -> dict: """Extract the slug-relevant intake fields into a flat label->value dict.""" groups = ["_common"] + self._SLUG_FIELD_GROUPS.get(service_slug, []) summary: dict = {} for group in groups: for key, label in self._INTAKE_FIELD_MAP.get(group, []): val = intake.get(key) if val in (None, "", [], {}): continue if isinstance(val, (list, tuple)): val = ", ".join(str(v) for v in val) summary[label] = val return summary def _resolve_slug(self, order_number: str) -> str: """Look up the service_slug from compliance_orders by order_number.""" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) try: with conn.cursor() as cur: cur.execute( "SELECT service_slug FROM compliance_orders WHERE order_number = %s", (order_number,), ) row = cur.fetchone() return row[0] if row else "" finally: conn.close() except Exception as exc: LOG.warning("Could not resolve slug for %s: %s", order_number, exc) return "" def _get_state_requirements(self, state_code: str) -> dict | None: """Fetch state trucking requirements from database.""" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) try: with conn.cursor() as cur: cur.execute( "SELECT * FROM state_trucking_requirements WHERE state_code = %s", (state_code.upper(),), ) cols = [d[0] for d in cur.description] row = cur.fetchone() if row: return dict(zip(cols, row)) finally: conn.close() except Exception as exc: LOG.warning("Could not fetch state requirements for %s: %s", state_code, exc) return None def _get_agency_info(self, category: str, reqs: dict) -> dict | None: """Extract the relevant agency name and URL for a service category.""" mapping = { "irp": ("irp_agency", "irp_url"), "ifta": ("ifta_agency", "ifta_url"), "weight_distance": ("weight_distance_agency", "weight_distance_url"), "state_permit": ("state_carrier_permit_agency", "state_carrier_permit_url"), "state_dot": ("state_dot_agency", "state_dot_url"), "intrastate": ("intrastate_authority_agency", "intrastate_authority_url"), } keys = mapping.get(category) if keys and reqs.get(keys[0]): return {"agency": reqs[keys[0]], "url": reqs.get(keys[1], "")} return None # ── Authorization (signed "Limited Authorization to File") ────────── # document_type used in esign_records for the state-trucking authorization. AUTH_DOCUMENT_TYPE = "state-trucking-authorization" def _request_authorization( self, *, order_number: str, service_slug: str, service_name: str, entity_name: str, customer_email: str, dot_number: str = "", mc_number: str = "", fein_last4: str = "", base_state: str = "", operating_states: list | None = None, signer_name: str = "", signer_title: str = "", ) -> bool: """Generate + upload the authorization PDF and email the signing link. Returns True if a signing request was created (pipeline should pause), False if it could not be requested (e.g. no customer email) so the caller can fall back to an admin todo. """ if not customer_email: return False # If a record already exists (pending or signed), don't duplicate. existing = self._authorization_status(order_number) if existing == "signed": return False # already signed — proceed to filing # (pending records are upserted/refreshed below) try: from .state_trucking_authorization import build_state_trucking_authorization from .signature_stamper import signature_box # noqa: F401 (ensures module import) except Exception as exc: LOG.error("[%s] Authorization generator unavailable: %s", order_number, exc) return False try: pdf_bytes, anchors = build_state_trucking_authorization( order_number=order_number, entity_name=entity_name, service_name=service_name, dot_number=dot_number, mc_number=mc_number, fein_last4=fein_last4, base_state=base_state, operating_states=operating_states or [], signer_name=signer_name, signer_title=signer_title, ) except Exception as exc: LOG.error("[%s] Failed to render authorization PDF: %s", order_number, exc) return False # Upload the unsigned authorization to MinIO. document_key = f"compliance/{order_number}/state_trucking_authorization.pdf" try: import tempfile try: from scripts.document_gen.minio_client import MinioStorage except ImportError: from document_gen.minio_client import MinioStorage # type: ignore storage = MinioStorage() with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tf: tf.write(pdf_bytes) tf.flush() storage.upload(tf.name, document_key, content_type="application/pdf") except Exception as exc: LOG.error("[%s] Failed to upload authorization PDF: %s", order_number, exc) return False # Create the esign record (with the signature anchors so the stamper can # place the signature on the line) and email the signing link. try: import json import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) try: with conn.cursor() as cur: cur.execute( """ INSERT INTO esign_records ( order_number, document_type, document_title, entity_name, document_minio_key, document_metadata, signature_anchors, requires_perjury, status, expires_at ) VALUES (%s, %s, %s, %s, %s, %s, %s, FALSE, 'pending', NOW() + INTERVAL '14 days') ON CONFLICT (order_number, document_type) WHERE status IN ('pending', 'signed') DO UPDATE SET document_title = EXCLUDED.document_title, entity_name = EXCLUDED.entity_name, document_minio_key = EXCLUDED.document_minio_key, document_metadata = EXCLUDED.document_metadata, signature_anchors = EXCLUDED.signature_anchors, expires_at = EXCLUDED.expires_at, updated_at = NOW() """, ( order_number, self.AUTH_DOCUMENT_TYPE, f"Authorization to File: {service_name}", entity_name, document_key, json.dumps({"service_slug": service_slug, "dot_number": dot_number}), json.dumps(anchors), ), ) conn.commit() finally: conn.close() except Exception as exc: LOG.error("[%s] Failed to create authorization esign record: %s", order_number, exc) return False # Email the signing link (JWT signed with CUSTOMER_JWT_SECRET). try: self._send_authorization_email( order_number=order_number, service_name=service_name, entity_name=entity_name, customer_email=customer_email, ) except Exception as exc: LOG.warning("[%s] Authorization email failed (record exists): %s", order_number, exc) return True def _set_fulfillment_status(self, order_number: str, status: str) -> None: """Advance compliance_orders.fulfillment_status (best-effort). ``status`` must be one of the FULFILLMENT_* constants. Non-fatal: a failure here never blocks the filing pipeline. """ try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) try: with conn.cursor() as cur: cur.execute( """UPDATE compliance_orders SET fulfillment_status = %s, fulfillment_status_at = NOW(), updated_at = NOW() WHERE order_number = %s""", (status, order_number), ) conn.commit() finally: conn.close() except Exception as exc: LOG.warning("[%s] Could not set fulfillment_status=%s: %s", order_number, status, exc) def _gov_fee_settled(self, order_number: str) -> bool: """True if a gov-fee child order for this parent is already paid.""" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) try: with conn.cursor() as cur: cur.execute( """SELECT 1 FROM compliance_orders WHERE parent_order_number = %s AND payment_status = 'paid' LIMIT 1""", (order_number,), ) return cur.fetchone() is not None finally: conn.close() except Exception as exc: # noqa: BLE001 LOG.warning("[%s] Could not check gov-fee settlement: %s", order_number, exc) return False def _handle_sc_coc_gate(self, order_number, entity_name, customer_email, customer_phone, intake) -> bool: """SC intrastate Certificate of Compliance insurance gate. SC for-hire property carriers register via the SCDMV COC, which requires their INSURER to file a Form E (liability). We ask the carrier a one-click yes/no first: - no answer yet -> email the question, HOLD (return True) - answered 'no' -> broker referral is in progress, HOLD (return True) - answered 'yes' -> insurance confirmed, PROCEED to bill the $25 COC fee (return False so the caller falls through to fee billing) """ answer = str(intake.get("sc_coc_insurance", "")).lower() if answer == "yes": LOG.info("[%s] SC COC: insurance confirmed — proceeding to $25 COC fee", order_number) return False # proceed to bill the COC fee if answer == "no": # Broker referral opened (by the response endpoint). Hold until ops # gets them covered, then they re-answer yes. LOG.info("[%s] SC COC: carrier needs insurance — broker referral, holding", order_number) try: notify_fulfillment_todo( title=f"SC COC — carrier needs insurance broker — {entity_name}", order_number=order_number, service_slug="intrastate-authority", priority="normal", description=(f"{entity_name} answered NO to the SC intrastate Form E " f"insurance question. A broker-referral ticket was opened.\n" f"Connect them with an SC intrastate liability insurer who can " f"file a Form E with SCDMV, then they re-confirm and we file the " f"$25 Certificate of Compliance.\nCustomer: {customer_email}"), ) except Exception: pass return True # hold # No answer yet: send the yes/no insurance question (once). try: from scripts.workers.services.sc_coc_filing import send_insurance_question_email except Exception as exc: # noqa: BLE001 LOG.error("[%s] sc_coc_filing import failed: %s", order_number, exc) return True # hold safely rather than mis-bill already = str(intake.get("sc_coc_insurance_asked", "")).lower() in ("1", "true", "yes") if not already: send_insurance_question_email(order_number, customer_email, intake.get("customer_name", ""), entity_name, intake) self._mark_intake_flag(order_number, "sc_coc_insurance_asked", "true") try: notify_fulfillment_todo( title=f"SC COC — insurance question sent — {entity_name}", order_number=order_number, service_slug="intrastate-authority", priority="low", description=(f"Asked {entity_name} whether their insurer can file a Form E " f"for SC intrastate. When they confirm YES we bill the $25 SCDMV " f"Certificate of Compliance fee and file; if NO we open a broker " f"referral.\nCustomer: {customer_email}"), ) except Exception: pass return True # hold pending their answer def _mark_intake_flag(self, order_number: str, key: str, value: str) -> None: """Best-effort set intake_data[key]=value on the order (no schema change).""" try: import json as _json import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) with conn.cursor() as cur: cur.execute("SELECT intake_data FROM compliance_orders WHERE order_number = %s", (order_number,)) row = cur.fetchone() if not row: conn.close() return data = row[0] or {} if isinstance(data, str): data = _json.loads(data) data[key] = value cur.execute("UPDATE compliance_orders SET intake_data = %s, updated_at = now() " "WHERE order_number = %s", (_json.dumps(data), order_number)) conn.commit() conn.close() except Exception as exc: # noqa: BLE001 LOG.warning("[%s] Could not set intake flag %s: %s", order_number, key, exc) def _request_gov_fee_payment(self, order_number, service_slug, service_name, entity_name, customer_email, customer_phone, intake, signed_auth_key=None) -> bool: """Set up government-fee collection after authorization. IRP: the fee is unknown until the base state computes it, so we EMAIL the state's IRP unit a submission request and wait for their invoice reply (irp_filing.poll_irp_invoices bills the customer the exact amount when it arrives). IFTA / intrastate: the fee is published/near-fixed, so we estimate it and bill the customer immediately. Returns True if the order should hold at awaiting_government_fee_approval.""" # ── IRP: email the state, wait for the apportioned-fee invoice ──────── if service_slug == "irp-registration": try: from scripts.workers.services.irp_filing import send_irp_submission except Exception as exc: # noqa: BLE001 LOG.error("[%s] irp_filing import failed: %s", order_number, exc) return False base_state = (intake.get("base_state") or intake.get("address_state") or "").upper() sent = send_irp_submission(order_number, entity_name, intake.get("dot_number", ""), base_state, intake, signed_auth_key=signed_auth_key) if sent: try: notify_fulfillment_todo( title=f"IRP submitted to {base_state} — awaiting fee invoice — {entity_name}", order_number=order_number, service_slug=service_slug, priority="normal", description=(f"IRP Schedule A/B emailed to the {base_state} IRP office.\n" f"Waiting on their apportioned-fee invoice reply; when it " f"arrives we auto-bill the customer the exact amount and " f"you'll get a Telegram alert.\nCustomer: {customer_email}"), ) except Exception: pass return sent # ── Intrastate authority ────────────────────────────────────────────── # Per-state: figure out what the carrier actually needs and route to it. # SC for-hire PROPERTY carriers register via the SCDMV Certificate of # Compliance (COC) — gated on a Form E liability filing by their insurer. # We email a yes/no insurance question first; once they confirm (or we've # referred them to a broker) we bill the exact $25 COC fee + file. Other # states fall through to the PSC/PUC email path or a manual todo. if service_slug == "intrastate-authority": base_state = (intake.get("base_state") or intake.get("address_state") or "").upper() if base_state == "SC": if self._handle_sc_coc_gate(order_number, entity_name, customer_email, customer_phone, intake): return True # hold pending insurance answer / broker referral # else: insurance confirmed -> fall through to bill the $25 COC fee else: # Non-SC: email the state PSC/PUC if we have a submission contact; # otherwise fall back to a manual todo. try: from scripts.workers.services.intrastate_filing import ( send_intrastate_submission, state_isa_contact, ) except Exception as exc: # noqa: BLE001 LOG.error("[%s] intrastate_filing import failed: %s", order_number, exc) return False if not state_isa_contact(base_state): LOG.info("[%s] No intrastate email contact for %s — manual todo", order_number, base_state) return False sent = send_intrastate_submission(order_number, entity_name, intake.get("dot_number", ""), base_state, intake, signed_auth_key=signed_auth_key) if sent: try: notify_fulfillment_todo( title=f"Intrastate authority submitted to {base_state} PSC — awaiting fee — {entity_name}", order_number=order_number, service_slug=service_slug, priority="normal", description=(f"Intrastate authority application emailed to the {base_state} " f"PSC/PUC with the signed POA.\nWaiting on their requirements + " f"fee invoice; when it arrives we auto-bill the customer the exact " f"amount and you'll get a Telegram alert.\nCustomer: {customer_email}"), ) except Exception: pass return sent # ── IFTA / intrastate: published fee, bill the estimate now ─────────── try: from scripts.workers.services.gov_fee import ( estimate_gov_fee, create_gov_fee_order, send_gov_fee_payment_email, ) except Exception as exc: # noqa: BLE001 LOG.error("[%s] gov_fee import failed: %s", order_number, exc) return False est = estimate_gov_fee(service_slug, intake) if est.cents <= 0: LOG.info("[%s] No government fee for %s — skipping fee gate", order_number, service_slug) return False child = create_gov_fee_order( order_number, service_slug, est, customer_email, entity_name, customer_phone, ) if not child: return False # Notify the customer + ops. send_gov_fee_payment_email(customer_email, entity_name, service_name, entity_name, est, child) try: notify_fulfillment_todo( title=f"{service_name} — gov fee billed ({'${:,.2f}'.format(est.cents/100)}) — {entity_name}", order_number=order_number, service_slug=service_slug, priority="normal", description=(f"Auto-quoted government fee for {service_name}.\n" f"Amount: ${est.cents/100:,.2f} ({'exact' if est.exact else 'estimate, at cost'})\n" f"Payment order: {child}\n" f"Customer: {customer_email}\n" f"Held at awaiting_government_fee_approval until paid; then auto-files."), ) except Exception: pass return True def _authorization_status(self, order_number: str) -> str | None: """Return the current authorization esign status: 'signed', 'pending', or None.""" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) try: with conn.cursor() as cur: cur.execute( """SELECT status FROM esign_records WHERE order_number = %s AND document_type = %s AND status IN ('pending', 'signed') ORDER BY created_at DESC LIMIT 1""", (order_number, self.AUTH_DOCUMENT_TYPE), ) row = cur.fetchone() return row[0] if row else None finally: conn.close() except Exception: return None def _signed_authorization_key(self, order_number: str) -> str | None: """Return the MinIO key of the stamped, signed authorization PDF if available.""" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) try: with conn.cursor() as cur: cur.execute( """SELECT signed_document_minio_key, document_minio_key FROM esign_records WHERE order_number = %s AND document_type = %s AND status = 'signed' ORDER BY signed_at DESC NULLS LAST LIMIT 1""", (order_number, self.AUTH_DOCUMENT_TYPE), ) row = cur.fetchone() if not row: return None return row[0] or row[1] finally: conn.close() except Exception: return None def _send_authorization_email( self, *, order_number, service_name, entity_name, customer_email ): """Email the customer a link to review and sign the authorization.""" import os as _os import smtplib from datetime import datetime, timedelta, timezone from email.mime.text import MIMEText try: import jwt as pyjwt except ImportError: # pragma: no cover import PyJWT as pyjwt # type: ignore secret = _os.environ.get("CUSTOMER_JWT_SECRET", "changeme_long_random_string") domain = _os.environ.get("DOMAIN", "performancewest.net") token = pyjwt.encode( { "order_id": order_number, "order_type": self.AUTH_DOCUMENT_TYPE, "email": customer_email, "exp": datetime.now(timezone.utc) + timedelta(days=14), }, secret, algorithm="HS256", ) sign_url = f"https://{domain}/portal/esign?token={token}" body = ( f"Hi,\n\n" f"To complete your {service_name} order for {entity_name}, we need your " f"signed authorization before we can file with the state on your behalf.\n\n" f"Please review and sign here:\n{sign_url}\n\n" f"This authorization lets Performance West prepare and submit your state " f"motor carrier filing, communicate with the agency, and remit government " f"fees you provide. Government fees, taxes, card processing fees, decals, " f"permits, bonds, and insurance filing costs are pass-through charges and " f"may be billed separately if not known at checkout.\n\n" f"This link expires in 14 days.\n\n" f"Order: {order_number}\n" f"Questions? Call (888) 411-0383.\n\n" f"Performance West Inc.\nDOT / State Motor Carrier Compliance\n" ) msg = MIMEText(body) msg["Subject"] = f"Action Required: Sign Your Authorization — {service_name}" msg["From"] = "noreply@performancewest.net" msg["To"] = customer_email import os as _smtp_os with smtplib.SMTP(_smtp_os.getenv("SMTP_HOST", "co.carrierone.com"), int(_smtp_os.getenv("SMTP_PORT", "587")), timeout=30) as s: s.starttls() _u, _p = _smtp_os.getenv("SMTP_USER", ""), _smtp_os.getenv("SMTP_PASS", "") if _u and _p: s.login(_u, _p) s.sendmail(msg["From"], [customer_email], msg.as_string()) def _send_status_email(self, order_number, service_name, entity_name, dot_number, customer_email): """Send client a status email.""" try: import smtplib from email.mime.text import MIMEText dot_line = f" (DOT# {dot_number})" if dot_number else "" body = ( f"Hi,\n\n" f"We've received your {service_name} order for " f"{entity_name}{dot_line}.\n\n" f"Order: {order_number}\n\n" f"Our team is reviewing your information and will complete " f"the filing typically within 1-2 business days. We'll send " f"you a confirmation email when everything is done.\n\n" f"Questions? Reply to this email or call (888) 411-0383.\n\n" f"Performance West Inc.\n" f"DOT Compliance Services\n" ) msg = MIMEText(body) msg["Subject"] = f"{service_name} In Progress — {entity_name}{dot_line}" msg["From"] = "noreply@performancewest.net" msg["To"] = customer_email import os as _smtp_os with smtplib.SMTP(_smtp_os.getenv("SMTP_HOST", "co.carrierone.com"), int(_smtp_os.getenv("SMTP_PORT", "587")), timeout=30) as s: s.starttls() _u, _p = _smtp_os.getenv("SMTP_USER", ""), _smtp_os.getenv("SMTP_PASS", "") if _u and _p: s.login(_u, _p) s.sendmail(msg["From"], [customer_email], msg.as_string()) LOG.info("[%s] Status email sent to %s", order_number, customer_email) except Exception as exc: LOG.warning("[%s] Failed to send status email: %s", order_number, exc)