""" MCS-150 Biennial Update Service Handler. The MCS-150 is filed through the FMCSA Portal (portal.fmcsa.dot.gov) using Login.gov credentials + MFA. Since we can't automate through MFA, this is an admin-assisted service: 1. Client orders MCS-150 update 2. We collect their updated info via intake form 3. Admin logs into FMCSA Portal with client's credentials (provided by client) OR we prepare the data and walk the client through filing via screen share 4. We verify the update was accepted 5. We send confirmation with updated company snapshot Service slug: mcs150-update Price: $79 Gov fee: $0 Intake data needed: - DOT number - Legal name (confirm/update) - DBA name (confirm/update) - Principal business address - Mailing address - Phone number - Email address - Number of power units - Number of drivers - Operation type (interstate/intrastate) - Carrier operation (authorized for hire, exempt for hire, private) - Cargo types - Hazmat (Y/N) - Annual mileage + year - FMCSA Portal login credentials (Login.gov email + password) OR "I need help creating my Login.gov account" Filing approach: Option A: Client provides Login.gov credentials → admin files directly Option B: Guided filing via screen share (Zoom/Teams) → $29 upcharge Option C: We prepare a pre-filled PDF → client uploads themselves (cheapest) """ from __future__ import annotations import json import logging import os from datetime import datetime from scripts.workers.telegram_notify import notify_fulfillment_todo LOG = logging.getLogger("workers.services.mcs150_update") class MCS150UpdateHandler: """Handle MCS-150 biennial update orders.""" SERVICE_SLUG = "mcs150-update" SERVICE_NAME = "MCS-150 Biennial Update" # Services routed to this handler that actually produce an MCS-150 form # (and thus need the operational-detail intake gate). Other slugs routed # here (UCR, MC authority, audit prep, ETA, name reservation, registered # agent, annual report, etc.) are admin-assisted and do NOT file an MCS-150. MCS150_FORM_SLUGS = frozenset({ "mcs150-update", "dot-registration", "usdot-reactivation", "dot-full-compliance", }) # Admin-assisted DOT services routed to this handler that do NOT produce an # MCS-150 form, but still need a minimum set of intake fields before a human # can actually file them. Without this, a paid order would be marked # ready_to_file with empty intake (e.g. UCR with only a DOT number -- no # legal name / state / fleet size, which determine the UCR fee bracket and # the filing itself). Most of these can be DERIVED from the FMCSA census + # the order email (see _enrich_admin_assisted_intake); we only hold the order # at awaiting_intake for whatever genuinely can't be filled in. Keys are # service slugs; values are the required intake_data fields. Mirrors # REQUIRED_FIELDS in api/src/routes/compliance-orders.ts. ADMIN_ASSISTED_REQUIRED = { "ucr-registration": ["dot_number", "legal_name", "address_state", "email", "fleet_size_bracket"], "boc3-filing": ["dot_number", "legal_name", "email"], } # UCR fee tiers by total power units (FMCSA 2025 schedule). Used to derive # fleet_size_bracket from the carrier's census power-unit count so we don't # have to ask the customer for something the public record already states. # (low, high_inclusive, bracket_label) UCR_FLEET_BRACKETS = [ (0, 2, "0-2"), (3, 5, "3-5"), (6, 20, "6-20"), (21, 100, "21-100"), (101, 1000, "101-1000"), (1001, 10**9, "1001+"), ] # Human-readable names per slug for todos / notifications, so an admin-assisted # service (e.g. UCR) routed through this handler is not mislabeled as "MCS-150". SERVICE_DISPLAY_NAMES = { "mcs150-update": "MCS-150 Biennial Update", "dot-registration": "New USDOT Registration", "usdot-reactivation": "USDOT Reactivation", "dot-full-compliance": "DOT Full Compliance", "ucr-registration": "UCR Annual Registration", "boc3-filing": "BOC-3 Process Agent Filing", "mc-authority": "MC Operating Authority", "dot-audit-prep": "New Entrant Safety Audit Prep", "emergency-temporary-authority": "Emergency Temporary Authority", } def _service_label(self, slug: str) -> str: """Friendly service name for the given slug (falls back to a title-cased slug). Keeps todos/notifications accurate for non-MCS-150 services.""" return self.SERVICE_DISPLAY_NAMES.get(slug) or slug.replace("-", " ").title() async def process(self, order_data: dict) -> list[str]: """Entry point called by job_server. Delegates to handle().""" order_number = order_data.get("order_number", order_data.get("name", "")) return self.handle(order_data, order_number) def handle(self, order_data: dict, order_number: str) -> list[str]: """ Process an MCS-150 update order. Since FMCSA Portal requires Login.gov MFA, this creates an admin todo rather than automating the filing directly. """ LOG.info("[%s] Processing MCS-150 update order", order_number) intake = order_data.get("intake_data") or {} if isinstance(intake, str): intake = json.loads(intake) slug = order_data.get("service_slug", self.SERVICE_SLUG) dot_number = intake.get("dot_number", "") entity_name = intake.get("entity_name", order_data.get("customer_name", "")) customer_email = order_data.get("customer_email", "") # The client signs the perjury certification before we file. When they # sign, handle_esign_completed re-dispatches this handler with # client_approved=True so we proceed past the signing checkpoint. from scripts.workers.services.dot_esign import requires_signature, request_dot_esign needs_signature = requires_signature(slug) client_approved = bool(order_data.get("client_approved")) admin_approved = bool(order_data.get("admin_approved")) # Validate required fields if not dot_number: LOG.error("[%s] Missing DOT number in intake data", order_number) return [] # Check current MCS-150 status via FMCSA API mcs150_status = self._check_current_status(dot_number) # Enrich the intake with the carrier's CURRENT registered data from the # FMCSA carrier API. The MCS-150 biennial update re-confirms the carrier's # existing FMCSA record, so the authoritative source for the form is the # FMCSA census (legal name, address, EIN, fleet counts) -- the intake form # only collects the DOT number + any changes. Customer-provided intake # values take precedence over the census (so edits/changes win). census = self._fetch_carrier_record(dot_number) if census: merged = {**census, **{k: v for k, v in intake.items() if v not in (None, "")}} intake = merged LOG.info("[%s] Enriched intake from FMCSA census (legal_name=%s)", order_number, intake.get("legal_name")) else: LOG.warning("[%s] No FMCSA census data for DOT %s -- form may be sparse", order_number, dot_number) # Backfill the signer's printed name from the signed certification when # the intake didn't carry one. For a typed signature the client typed # their full legal name, which is exactly what belongs in the form's # "print/type name" certification field (certifyName). if not intake.get("signer_name"): signed_name = self._signed_signer_name(order_number) if signed_name: intake["signer_name"] = signed_name LOG.info("[%s] Backfilled signer_name from signed certification: %s", order_number, signed_name) # INTAKE-COMPLETENESS GATE. The FMCSA census gives us the carrier's # registered name/address/fleet, but it cannot tell us the operational # details the MCS-150 requires the filer to confirm/update: the # operation classification (Q23), cargo types (Q24), current annual # mileage (Q21 -- must be CURRENT), and a contact email. If those are # missing we must NOT fabricate a filing; instead we ask the customer to # complete a short, census-pre-filled intake and hold the order until # they do. (New-registration / reactivation flows that have not yet # signed also route through here.) # # Only applies to services that actually produce an MCS-150 form. This # handler is also the catch-all for several admin-assisted DOT services # (UCR, MC authority, audit prep, ETA) that do NOT file an MCS-150 and # must not receive the "confirm your MCS-150 details" intake email. if slug in self.MCS150_FORM_SLUGS: missing = self._missing_intake_fields(slug, intake) if missing and not client_approved and not admin_approved: self._request_intake_completion( order_number, entity_name, customer_email, dot_number, missing, slug) LOG.info("[%s] Held for customer intake completion; missing=%s", order_number, missing) return [] # INTAKE-COMPLETENESS GATE for admin-assisted (non-MCS-150) services. # These don't produce a form, but we still must not mark them # ready_to_file with empty intake (e.g. a UCR with only a DOT number). # First DERIVE everything we can from the FMCSA census + the order email, # then -- if any required field is still missing -- email the customer to # complete intake and hold the order at awaiting_intake. When all # required fields are present (derived or supplied) we fall through to the # admin review gate, which marks it ready_to_file for a human to file. if slug in self.ADMIN_ASSISTED_REQUIRED: intake = self._enrich_admin_assisted_intake(slug, intake, customer_email) order_data["intake_data"] = intake # persist derived values downstream missing = [f for f in self.ADMIN_ASSISTED_REQUIRED[slug] if intake.get(f) in (None, "", [], {})] if missing: self._persist_intake(order_number, intake) self._set_fulfillment_status(order_number, "awaiting_intake") self._request_intake_completion( order_number, entity_name, customer_email, dot_number, missing, slug) LOG.info("[%s] Admin-assisted %s held for intake; missing=%s", order_number, slug, missing) return [] # All required fields present -- persist the census-derived values so # the admin (and validation) sees a complete intake. self._persist_intake(order_number, intake) self._mark_intake_validated(order_number) LOG.info("[%s] Admin-assisted %s intake complete (derived from census)", order_number, slug) # Step 1: Fill the official MCS-150 PDF. Only services that actually file # an MCS-150 produce the form; the other admin-assisted DOT services # routed to this handler (UCR, MC authority, audit prep, ETA, name # reservation, registered agent, annual report, etc.) must NOT generate # an MCS-150 -- they are handled manually by an admin from the review # todo created below. pdf_path = None if slug in self.MCS150_FORM_SLUGS: try: from scripts.document_gen.templates.mcs150_pdf_filler import fill_mcs150 pdf_path = fill_mcs150(intake, order_number=order_number) LOG.info("[%s] Filled MCS-150 PDF: %s", order_number, pdf_path) except Exception as exc: LOG.error("[%s] PDF fill failed: %s", order_number, exc) # Step 2: Upload PDF to MinIO for storage minio_path = None pdf_url = None if pdf_path: try: from minio import Minio mc = Minio( f"{os.environ.get('MINIO_ENDPOINT', 'minio')}:{os.environ.get('MINIO_PORT', '9000')}", access_key=os.environ.get("MINIO_ACCESS_KEY", ""), secret_key=os.environ.get("MINIO_SECRET_KEY", ""), secure=False, ) bucket = os.environ.get("MINIO_BUCKET", "performancewest") minio_path = f"filings/mcs150/{order_number}/{os.path.basename(pdf_path)}" mc.fput_object(bucket, minio_path, pdf_path, content_type="application/pdf") # Generate presigned URL for fax/web submission from datetime import timedelta pdf_url = mc.presigned_get_object(bucket, minio_path, expires=timedelta(hours=2)) LOG.info("[%s] PDF uploaded to MinIO: %s", order_number, minio_path) except Exception as exc: LOG.error("[%s] MinIO upload failed: %s", order_number, exc) # If the client already signed (re-dispatched after signature), the # signed PDF that the admin reviews must reflect the form we just # (re)filled, not a stale earlier fill. Re-point the esign record at the # fresh document and re-stamp the signature onto it. if client_approved and minio_path: self._restamp_signed_form(order_number, minio_path) # Step 3: Check for photo ID photo_id_path = None if intake.get("photo_id_uploaded"): try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) cur = conn.cursor() cur.execute( "SELECT minio_paths FROM id_upload_tokens WHERE order_number = %s AND front_uploaded = TRUE ORDER BY created_at DESC LIMIT 1", (order_number,), ) row = cur.fetchone() conn.close() if row and row[0]: paths = json.loads(row[0]) if isinstance(row[0], str) else row[0] if paths.get("front"): photo_id_path = paths["front"] LOG.info("[%s] Photo ID found: %s", order_number, photo_id_path) except Exception as exc: LOG.warning("[%s] Could not retrieve photo ID: %s", order_number, exc) # Step 4: SIGNATURE GATE. If this form needs the client's signed # certification and they haven't signed yet, request the signature and # STOP before filing. We never submit a perjury certification to FMCSA # until the client has actually signed it. if needs_signature and not client_approved: request_dot_esign( order_number=order_number, slug=slug, entity_name=entity_name, customer_email=customer_email, dot_number=dot_number, document_minio_key=minio_path or "", ) LOG.info("[%s] Awaiting client signature before filing %s — not submitting yet", order_number, slug) self._create_pending_signature_todo( order_number, entity_name, dot_number, slug, minio_path, customer_email) return [minio_path] if minio_path else [] # Past this point: either no signature is required for this service, or # the client has signed (re-dispatched with client_approved=True). # Step 4b: ADMIN VERIFICATION GATE. Before we submit anything to the # government, a human verifies the prepared filing is correct (right # form, right DOT#, right data, signed by the client). We STOP here and # create an admin todo; an admin reviews the generated PDF and, when # satisfied, re-dispatches this order with admin_approved=True (via the # admin "Approve & Submit" action) to proceed to actual submission. This # prevents a wrong/auto-generated filing from being submitted to FMCSA. if not admin_approved: self._set_fulfillment_status(order_number, "ready_to_file") self._create_admin_review_todo( order_number, entity_name, dot_number, slug, minio_path, customer_email, client_signed=(needs_signature and client_approved), ) LOG.info("[%s] Prepared + signed; HELD for admin verification before FMCSA submission", order_number) return [minio_path] if minio_path else [] # Step 4c: ADMIN-ASSISTED AUTO-FILING. Some admin-assisted services have # a real automated filing path even though they don't produce an MCS-150 # form. Once an admin has approved (admin_approved=True), run the # automation here instead of just creating a manual todo. Currently: # - ucr-registration -> ucr.gov National Registration System if slug == "ucr-registration": return self._file_ucr_registration( order_number, entity_name, dot_number, intake, customer_email) # Step 5: Submit electronically (3x web → fax fallback) # GUARD: Skip actual submission in dev/test environments is_production = os.environ.get("NODE_ENV") == "production" or os.environ.get("ENV") == "production" filing_result = None if pdf_path and not is_production: LOG.info("[%s] DEV MODE — skipping actual FMCSA submission", order_number) filing_result = { "success": True, "method": "dev_skip", "attested_pdf": None, "submitted_at": datetime.now().isoformat(), "screenshot_path": None, "fax_log_id": None, "error": None, } elif pdf_path: try: import asyncio from scripts.workers.fax_sender import submit_filing loop = asyncio.new_event_loop() filing_result = loop.run_until_complete(submit_filing( original_pdf_path=pdf_path, pdf_url=pdf_url or "", photo_id_path=photo_id_path, order_number=order_number, dot_number=dot_number, mc_number=intake.get("mc_number", ""), entity_name=entity_name, document_type="MCS-150 Biennial Update", web_retries=3, web_retry_interval=600, )) loop.close() if filing_result and filing_result.get("success"): LOG.info("[%s] Filing submitted via %s", order_number, filing_result.get("method")) else: LOG.warning("[%s] Electronic filing failed: %s", order_number, filing_result.get("error") if filing_result else "unknown") except Exception as exc: LOG.error("[%s] Filing submission error: %s", order_number, exc) # Persist durable submission EVIDENCE to MinIO (confirmation screenshot # for web, attested PDF + fax log for fax) so we keep proof of every # government submission -- the submitters only write to an ephemeral # temp dir otherwise. evidence = {} if filing_result and filing_result.get("success") and filing_result.get("method") != "dev_skip": evidence = self._upload_submission_evidence(order_number, slug, filing_result) # Step 5: Update order status in database try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) cur = conn.cursor() status_data = { "mcs150_status": mcs150_status, "pdf_minio_path": minio_path, "filing_method": filing_result.get("method") if filing_result else None, "filing_success": filing_result.get("success") if filing_result else False, "fax_log_id": filing_result.get("fax_log_id") if filing_result else None, "screenshot_path": filing_result.get("screenshot_path") if filing_result else None, "submitted_at": filing_result.get("submitted_at") if filing_result else None, "attested_pdf": filing_result.get("attested_pdf") if filing_result else None, "evidence": evidence, # durable MinIO keys for proof of submission } cur.execute(""" UPDATE compliance_orders SET intake_data = jsonb_set( COALESCE(intake_data, '{}'::jsonb), '{filing_status}', %s::jsonb ) WHERE order_number = %s """, (json.dumps(status_data), order_number)) conn.commit() conn.close() except Exception as exc: LOG.error("[%s] DB update failed: %s", order_number, exc) # Step 6: Create admin todo (for manual verification + customer delivery) try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) filed_method = filing_result.get("method", "pending") if filing_result else "pending" filed_ok = filing_result.get("success", False) if filing_result else False svc_label = self._service_label(slug) is_form = slug in self.MCS150_FORM_SLUGS if is_form: todo_title = f"{svc_label} {'Filed' if filed_ok else 'Review'} — {entity_name} (DOT {dot_number})" todo_priority = "low" if filed_ok else "normal" todo_description = ( f"{svc_label} for {entity_name} (DOT {dot_number}).\n" f"Filing method: {filed_method}\n" f"Status: {'SUBMITTED — verify in 5-10 days' if filed_ok else 'NEEDS MANUAL FILING'}\n" f"Customer: {customer_email}\n" f"PDF: {minio_path or 'not generated'}" ) else: # Admin-assisted service (UCR, MC authority, etc.): no auto-filing # and no generated form. Approval means it's ready for a human to # file on the relevant government portal. todo_title = f"{svc_label} — FILE NOW — {entity_name} (DOT {dot_number})" todo_priority = "high" todo_description = ( f"{svc_label} for {entity_name} (DOT {dot_number}).\n" f"Status: APPROVED — file manually on the {svc_label} portal.\n" f"This service has no auto-generated form; submit it directly.\n" f"Customer: {customer_email}" ) with conn.cursor() as cur: cur.execute(""" INSERT INTO admin_todos ( title, category, priority, order_number, service_slug, description, data, status ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending') """, ( todo_title, "filing", todo_priority, order_number, slug, todo_description, json.dumps({ "order_number": order_number, "dot_number": dot_number, "entity_name": entity_name, "filing_result": filing_result, }), )) conn.commit() notify_fulfillment_todo( title=todo_title, order_number=order_number, service_slug=slug, priority=todo_priority, description=todo_description, ) conn.close() except Exception as exc: LOG.error("[%s] Failed to create admin todo: %s", order_number, exc) # Step 7: Send client status email self._send_status_email(order_number, entity_name, dot_number, customer_email, slug) return [minio_path] if minio_path else [] def _missing_intake_fields(self, slug: str, intake: dict) -> list: """Return the customer-required intake fields still missing for this service. These are the operational details the FMCSA census cannot supply and that the MCS-150 requires the filer to confirm/update. """ # Per-variant required operational fields. base_required = ["carrier_operation", "interstate_intrastate", "annual_miles", "email"] # Cargo only applies to carriers (not pure intermodal-equipment providers). if not intake.get("is_intermodal_equipment_provider"): base_required.append("cargo_types") # New registrations / reactivations also need the signer + address. if slug in ("dot-registration", "usdot-reactivation", "dot-full-compliance"): base_required += ["signer_name", "signer_title", "address_street", "address_city", "address_state", "address_zip"] # Hazmat safety-permit (150B) needs states of operation. if intake.get("hazmat") == "yes" and intake.get("needs_hmsp"): base_required.append("operating_states") missing = [] for f in base_required: v = intake.get(f) if v in (None, "", [], {}): missing.append(f) return missing def _request_intake_completion(self, order_number, entity_name, customer_email, dot_number, missing, slug=None): """Email the customer a census-pre-filled intake link and create a low-priority admin todo noting we're waiting on intake completion.""" domain = os.environ.get("PUBLIC_SITE_URL", "https://performancewest.net").rstrip("/") # Link to the per-service intake WIZARD (/order/?order=...), which # loads the existing paid order, pre-fills it from the FMCSA census, and # drops the payment step. Do NOT link to /order/dot-compliance -- that is # the sales/checkout page; it ignores ?order= and asks the customer to # re-select services and pay again (they can't "enter any data" there). intake_slug = slug or self.SERVICE_SLUG intake_url = f"{domain}/order/{intake_slug}?order={order_number}" # Customer email (no paper/mail mechanics; public form names are fine). try: from scripts.workers.worker_email import send_worker_email label = { "carrier_operation": "how your company operates (for-hire, private, etc.)", "interstate_intrastate": "interstate vs intrastate operation", "annual_miles": "your current annual mileage", "cargo_types": "the types of cargo you haul", "email": "a contact email address", "operating_states": "the states you operate in", "signer_name": "the name of the company officer signing", "signer_title": "the officer's title", } needed = ", ".join(label.get(m, m.replace("_", " ")) for m in missing) subject = f"Action needed: confirm your MCS-150 details (DOT {dot_number})" text = ( f"Hi,\n\n" f"We're ready to prepare the MCS-150 update for {entity_name} " f"(USDOT {dot_number}). We've pre-filled everything we can from " f"your current FMCSA record. To finish, we just need you to " f"confirm a few current details: {needed}.\n\n" f"Please review and confirm here (it takes about 2 minutes):\n" f"{intake_url}\n\n" f"Once you submit, we'll prepare your filing for your signature " f"and handle the rest.\n\n" f"Thank you,\nPerformance West" ) html = ( f"

Hi,

" f"

We're ready to prepare the MCS-150 update for " f"{entity_name} (USDOT {dot_number}). We've " f"pre-filled everything we can from your current FMCSA record. " f"To finish, we just need you to confirm a few current details: " f"{needed}.

" f"

Review and confirm your details " f"(about 2 minutes).

" f"

Once you submit, we'll prepare your filing for your " f"signature and handle the rest.

" f"

Thank you,
Performance West

" ) if customer_email: send_worker_email(customer_email, subject, html, text=text, cc="justin@performancewest.net") except Exception as exc: LOG.warning("[%s] Could not send intake-completion email: %s", order_number, exc) # Hold the order + admin todo. self._set_fulfillment_status(order_number, "awaiting_intake") try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) with conn.cursor() as cur: cur.execute( """INSERT INTO admin_todos ( title, category, priority, order_number, service_slug, description, data, status ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')""", ( f"Awaiting MCS-150 intake -- {entity_name} (DOT {dot_number})", "intake", "normal", order_number, self.SERVICE_SLUG, (f"Order {order_number} is missing customer-required intake " f"fields: {', '.join(missing)}.\n" f"A census-pre-filled intake link was emailed to " f"{customer_email or 'the customer'}.\n" f"Intake link: {intake_url}\n" f"Filing auto-resumes once the customer submits."), json.dumps({"order_number": order_number, "dot_number": dot_number, "entity_name": entity_name, "awaiting_intake": True, "missing_fields": missing}), ), ) conn.commit() conn.close() except Exception as exc: LOG.warning("[%s] Could not create intake-pending todo: %s", order_number, exc) def _restamp_signed_form(self, order_number: str, document_key: str) -> None: """Point the signed esign record at ``document_key`` (the freshly filled form) and re-stamp the signature onto it, so the signed PDF an admin reviews reflects the current, complete form.""" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) try: with conn.cursor() as cur: cur.execute( """SELECT id FROM esign_records WHERE order_number = %s AND status = 'signed' ORDER BY signed_at DESC LIMIT 1""", (order_number,), ) row = cur.fetchone() if not row: return rec_id = row[0] # Re-point at the fresh document and clear the stale signed # key so the stamper regenerates it. cur.execute( """UPDATE esign_records SET document_minio_key = %s, signed_document_minio_key = NULL, updated_at = NOW() WHERE id = %s""", (document_key, rec_id), ) conn.commit() finally: conn.close() from scripts.workers.services.esign_stamp import stamp_esign_document signed_key = stamp_esign_document(int(rec_id)) if signed_key: LOG.info("[%s] Re-stamped signature onto fresh form -> %s", order_number, signed_key) except Exception as exc: LOG.warning("[%s] Re-stamp of signed form failed: %s", order_number, exc) def _signed_signer_name(self, order_number: str) -> str: """Return the full name the client typed when signing the perjury certification for this order (empty if no typed signature on file).""" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) try: with conn.cursor() as cur: cur.execute( """SELECT signature_data FROM esign_records WHERE order_number = %s AND status = 'signed' AND signature_type = 'typed' ORDER BY signed_at DESC LIMIT 1""", (order_number,), ) row = cur.fetchone() return (row[0] or "").strip() if row else "" finally: conn.close() except Exception as exc: LOG.warning("[%s] Could not read signed signer name: %s", order_number, exc) return "" def _fetch_fmcsa_carrier(self, dot_number: str) -> dict: """Fetch the raw FMCSA carrier census record for a DOT number.""" try: import urllib.request api_key = os.environ.get("FMCSA_API_KEY", "") if not api_key: return {} url = f"https://mobile.fmcsa.dot.gov/qc/services/carriers/{dot_number}?webKey={api_key}" req = urllib.request.Request(url, headers={"Accept": "application/json"}) with urllib.request.urlopen(req, timeout=10) as resp: data = json.loads(resp.read()) return data.get("content", {}).get("carrier", {}) or {} except Exception as exc: LOG.warning("FMCSA carrier fetch failed for %s: %s", dot_number, exc) return {} def _fetch_carrier_record(self, dot_number: str) -> dict: """Return the carrier's current FMCSA data mapped to the intake keys the PDF filler expects (so the biennial-update form is pre-filled with the carrier's existing registered data). Empty dict if unavailable. """ c = self._fetch_fmcsa_carrier(dot_number) if not c: return {} def s(v): return "" if v is None else str(v) out: dict = { "legal_name": s(c.get("legalName")), "dba_name": s(c.get("dbaName")), "dot_number": s(c.get("dotNumber") or dot_number), "ein": s(c.get("ein")), "address_street": s(c.get("phyStreet")), "address_city": s(c.get("phyCity")), "address_state": s(c.get("phyState")), "address_zip": s(c.get("phyZipcode")), "phone": s(c.get("telephone") or c.get("phone")), "power_units": s(c.get("totalPowerUnits")), "drivers": s(c.get("totalDrivers")), } # Interstate vs intrastate (form Q26/27). The census exposes this as # carrierOperation.carrierOperationCode: A=Interstate, B=Intrastate # Hazmat, C=Intrastate Non-Hazmat. op = c.get("carrierOperation") or {} code = (op.get("carrierOperationCode") or "").strip().upper() if code == "A": out["interstate_intrastate"] = "interstate" elif code == "B": out["interstate_intrastate"] = "intrastate_hazmat" elif code == "C": out["interstate_intrastate"] = "intrastate_non_hazmat" # Passenger carrier flag (form Q29 passenger compliance cert). if s(c.get("isPassengerCarrier")).upper() == "Y": out["is_passenger_carrier"] = "yes" return {k: v for k, v in out.items() if v} def _check_current_status(self, dot_number: str) -> str: """Check current MCS-150 status via FMCSA API.""" c = self._fetch_fmcsa_carrier(dot_number) if not c: return "Could not check FMCSA status" return ( f"Status: {c.get('statusCode', '?')}, " f"Allowed: {c.get('allowedToOperate', '?')}, " f"MCS-150 Outdated: {c.get('mcs150Outdated', '?')}" ) def _create_pending_signature_todo(self, order_number, entity_name, dot_number, slug, minio_path, customer_email): """Low-priority admin todo noting we're waiting on the client's signature.""" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) todo_title = f"Awaiting client signature — {entity_name} (DOT {dot_number})" todo_description = ( f"{slug} for {entity_name} (DOT {dot_number}).\n" f"Status: AWAITING CLIENT SIGNATURE before filing.\n" f"Signing link emailed to {customer_email}.\n" f"PDF: {minio_path or 'not generated'}\n" f"Filing auto-resumes once the client signs." ) with conn.cursor() as cur: cur.execute( """ INSERT INTO admin_todos ( title, category, priority, order_number, service_slug, description, data, status ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending') """, ( todo_title, "filing", "low", order_number, slug, todo_description, json.dumps({"order_number": order_number, "dot_number": dot_number, "entity_name": entity_name, "awaiting_signature": True}), ), ) conn.commit() notify_fulfillment_todo( title=todo_title, order_number=order_number, service_slug=slug, priority="low", description=todo_description, ) conn.close() LOG.info("[%s] Pending-signature todo created", order_number) except Exception as exc: LOG.warning("[%s] Failed to create pending-signature todo: %s", order_number, exc) def _set_fulfillment_status(self, order_number, status): """Persist the fulfillment_status on the compliance order (e.g. 'ready_to_file' = prepared + signed, held for admin verification).""" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) with conn.cursor() as cur: cur.execute( "UPDATE compliance_orders SET fulfillment_status=%s, " "fulfillment_status_at=now() WHERE order_number=%s", (status, order_number), ) conn.commit() conn.close() except Exception as exc: LOG.warning("[%s] Failed to set fulfillment_status=%s: %s", order_number, status, exc) def _enrich_admin_assisted_intake(self, slug: str, intake: dict, customer_email: str) -> dict: """Fill an admin-assisted service's required intake from data we already have: the FMCSA census (legal_name, address_state, power_units) and the order's customer_email. Customer-supplied values always win. Returns a new merged dict; never raises. """ out = dict(intake or {}) dot = out.get("dot_number", "") try: census = self._fetch_carrier_record(dot) if dot else {} except Exception as exc: # noqa: BLE001 LOG.warning("[enrich] census fetch failed for DOT %s: %s", dot, exc) census = {} # Census fills only what the customer didn't already provide. for k in ("legal_name", "address_state", "address_city", "address_street", "address_zip", "power_units", "ein", "phone"): if not out.get(k) and census.get(k): out[k] = census[k] # Contact email: fall back to the order's customer email. if not out.get("email") and customer_email: out["email"] = customer_email # UCR fee bracket: derive from power units when not supplied. if slug == "ucr-registration" and not out.get("fleet_size_bracket"): pu = out.get("power_units") or census.get("power_units") try: n = int(str(pu).strip()) except (TypeError, ValueError): n = None if n is not None: for lo, hi, label in self.UCR_FLEET_BRACKETS: if lo <= n <= hi: out["fleet_size_bracket"] = label break return out def _persist_intake(self, order_number: str, intake: dict): """Write the (possibly census-enriched) intake_data back to the order so the admin view, validation, and any later re-dispatch see it.""" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) with conn.cursor() as cur: cur.execute( "UPDATE compliance_orders SET intake_data=%s, updated_at=now() " "WHERE order_number=%s", (json.dumps(intake), order_number), ) conn.commit() conn.close() except Exception as exc: # noqa: BLE001 LOG.warning("[%s] Failed to persist intake_data: %s", order_number, exc) def _mark_intake_validated(self, order_number: str): """Flag intake as complete so the order stops getting reminder nudges and the admin sees a green 'intake complete'. Only call once every required field is present (derived or supplied).""" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) with conn.cursor() as cur: cur.execute( "UPDATE compliance_orders SET intake_data_validated=TRUE, " "validation_errors=NULL, updated_at=now() WHERE order_number=%s", (order_number,), ) conn.commit() conn.close() except Exception as exc: # noqa: BLE001 LOG.warning("[%s] Failed to mark intake validated: %s", order_number, exc) def _file_ucr_registration(self, order_number, entity_name, dot_number, intake, customer_email) -> list: """Run the UCR.gov Playwright automation for an approved UCR order, then persist the result. On success -> completed (+ confirmation + screenshot evidence). On CAPTCHA / fee-mismatch / failure -> ready_to_file with a high-priority 'file manually' todo so a human takes over.""" import asyncio LOG.info("[%s] Auto-filing UCR registration via ucr.gov", order_number) # Resolve the customer's payment method so we charge the right card. payment_method = "card" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) with conn.cursor() as cur: cur.execute("SELECT payment_method FROM compliance_orders WHERE order_number=%s", (order_number,)) row = cur.fetchone() if row and row[0]: payment_method = row[0] conn.close() except Exception as exc: # noqa: BLE001 LOG.warning("[%s] Could not read payment_method (default card): %s", order_number, exc) try: from scripts.workers.services.ucr_playwright import UCRRegistration except Exception as exc: # noqa: BLE001 LOG.error("[%s] UCR adapter import failed: %s", order_number, exc) self._set_fulfillment_status(order_number, "ready_to_file") self._create_admin_review_todo( order_number, entity_name, dot_number, "ucr-registration", None, customer_email, client_signed=False) return [] data = { "dot_number": dot_number, "legal_name": intake.get("legal_name", entity_name), "power_units": intake.get("power_units", ""), "email": intake.get("email") or customer_email, "phone": intake.get("phone", ""), "address_street": intake.get("address_street", ""), "address_city": intake.get("address_city", ""), "address_state": intake.get("address_state", ""), "address_zip": intake.get("address_zip", ""), } try: adapter = UCRRegistration() loop = asyncio.new_event_loop() try: result = loop.run_until_complete( adapter.file_ucr(data, order_number=order_number, payment_method=payment_method)) finally: loop.close() except Exception as exc: # noqa: BLE001 LOG.error("[%s] UCR automation crashed: %s", order_number, exc) self._set_fulfillment_status(order_number, "ready_to_file") self._create_admin_review_todo( order_number, entity_name, dot_number, "ucr-registration", None, customer_email, client_signed=False) return [] # Upload the confirmation screenshot to MinIO as durable evidence. evidence = {} if result.screenshot_path and os.path.exists(result.screenshot_path): try: from minio import Minio mc = Minio( f"{os.environ.get('MINIO_ENDPOINT', 'minio')}:{os.environ.get('MINIO_PORT', '9000')}", access_key=os.environ.get("MINIO_ACCESS_KEY", ""), secret_key=os.environ.get("MINIO_SECRET_KEY", ""), secure=os.environ.get("MINIO_SECURE", "false").lower() == "true", ) bucket = os.environ.get("MINIO_BUCKET", "performancewest") key = f"filings/ucr-registration/{order_number}/evidence/ucr_confirmation.png" mc.fput_object(bucket, key, result.screenshot_path, content_type="image/png") evidence["confirmation_screenshot"] = key except Exception as exc: # noqa: BLE001 LOG.warning("[%s] Could not upload UCR screenshot: %s", order_number, exc) # Persist filing_status on the order. filing_status = { "filing_method": "ucr_gov_web", "filing_success": bool(result.success), "submitted_at": datetime.now(timezone.utc).isoformat() if result.success else None, "manual_confirmation": result.confirmation_number or None, "fee_paid_usd": result.fee_paid_usd or None, "dry_run": bool(result.dry_run), "evidence": evidence, "error": result.error or None, } try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) with conn.cursor() as cur: cur.execute( "UPDATE compliance_orders SET intake_data = jsonb_set(" "COALESCE(intake_data,'{}'::jsonb), '{filing_status}', %s::jsonb), " "updated_at=now() WHERE order_number=%s", (json.dumps(filing_status), order_number), ) conn.commit() conn.close() except Exception as exc: # noqa: BLE001 LOG.error("[%s] Failed to persist UCR filing_status: %s", order_number, exc) svc_label = self._service_label("ucr-registration") if result.success: self._set_fulfillment_status(order_number, "completed") conf = result.confirmation_number or "(see receipt screenshot)" self._notify_todo( order_number, "ucr-registration", f"{svc_label} FILED — {entity_name} (DOT {dot_number})", "low", (f"{svc_label} for {entity_name} (DOT {dot_number}).\n" f"Status: FILED on ucr.gov{' (DEV dry-run)' if result.dry_run else ''}.\n" f"Confirmation: {conf}\n" f"Fee paid: ${result.fee_paid_usd:.2f}\n" f"Customer: {customer_email}"), ) self._send_status_email(order_number, entity_name, dot_number, customer_email, "ucr-registration") LOG.info("[%s] UCR filed (completed). conf=%s", order_number, conf) else: # Could not auto-file (CAPTCHA, fee mismatch, error) -> back to the # manual-filing queue with a clear, high-priority todo. self._set_fulfillment_status(order_number, "ready_to_file") reason = "CAPTCHA — needs manual filing" if result.captcha_hit else (result.error or "automation could not confirm") self._notify_todo( order_number, "ucr-registration", f"{svc_label} — FILE MANUALLY — {entity_name} (DOT {dot_number})", "high", (f"{svc_label} for {entity_name} (DOT {dot_number}).\n" f"Status: AUTO-FILE FAILED — {reason}.\n" f"ACTION: file manually on ucr.gov, then mark the order completed " f"in the admin dashboard (enter the confirmation #).\n" f"Customer: {customer_email}"), ) LOG.warning("[%s] UCR auto-file failed: %s", order_number, reason) return [] def _notify_todo(self, order_number, slug, title, priority, description): """Create an admin_todos row + Telegram fulfillment notification.""" try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) with conn.cursor() as cur: cur.execute( """INSERT INTO admin_todos (title, category, priority, order_number, service_slug, description, data, status) VALUES (%s, 'filing', %s, %s, %s, %s, %s, 'pending')""", (title, priority, order_number, slug, description, json.dumps({"order_number": order_number})), ) conn.commit() conn.close() except Exception as exc: # noqa: BLE001 LOG.error("[%s] Failed to create todo: %s", order_number, exc) try: notify_fulfillment_todo(title=title, order_number=order_number, service_slug=slug, priority=priority, description=description) except Exception as exc: # noqa: BLE001 LOG.warning("[%s] Telegram notify failed: %s", order_number, exc) def _create_admin_review_todo(self, order_number, entity_name, dot_number, slug, minio_path, customer_email, client_signed): """High-priority admin todo: verify the prepared filing BEFORE submission. The order is held at fulfillment_status='ready_to_file'. An admin opens the generated PDF (minio_path), confirms it is correct (right form, DOT#, data, signature), and then re-dispatches with admin_approved=True via the admin 'Approve & Submit' action to proceed to actual FMCSA submission. """ try: import psycopg2 conn = psycopg2.connect(os.environ.get("DATABASE_URL", "")) sig = "client SIGNED" if client_signed else "no signature required" svc_label = self._service_label(slug) todo_title = f"VERIFY before filing — {svc_label} — {entity_name} (DOT {dot_number})" if slug in self.MCS150_FORM_SLUGS: todo_description = ( f"{svc_label} for {entity_name} (DOT {dot_number}).\n" f"Status: READY TO FILE — held for admin verification ({sig}).\n" f"ACTION: review the prepared PDF, confirm it is correct, then\n" f"approve & submit (re-dispatch with admin_approved=true).\n" f"We do NOT submit to FMCSA until you approve.\n" f"PDF: {minio_path or 'not generated'}\n" f"Customer: {customer_email}" ) else: # Admin-assisted service: no generated form to review; approval # just clears it for a human to file on the government portal. todo_description = ( f"{svc_label} for {entity_name} (DOT {dot_number}).\n" f"Status: READY TO FILE — held for admin verification.\n" f"ACTION: confirm the order details, then approve in the admin\n" f"dashboard. This service has no auto-generated form; you file\n" f"it manually on the {svc_label} portal after approving.\n" f"Customer: {customer_email}" ) with conn.cursor() as cur: cur.execute( """ INSERT INTO admin_todos ( title, category, priority, order_number, service_slug, description, data, status ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending') """, ( todo_title, "filing", "high", order_number, slug, todo_description, json.dumps({"order_number": order_number, "dot_number": dot_number, "entity_name": entity_name, "awaiting_admin_review": True, "client_signed": client_signed, "pdf_minio_path": minio_path}), ), ) conn.commit() notify_fulfillment_todo( title=todo_title, order_number=order_number, service_slug=slug, priority="high", description=todo_description, view_url=self._presigned_view_url(minio_path), ) conn.close() LOG.info("[%s] Admin-review (pre-submission) todo created", order_number) except Exception as exc: LOG.warning("[%s] Failed to create admin-review todo: %s", order_number, exc) @staticmethod def _presigned_view_url(minio_path, days=7): """Presigned, browser-openable URL to a MinIO object via the PUBLIC endpoint (minio.performancewest.net), so operator Telegram alerts can link directly to the prepared PDF for review. Returns "" on failure. The public host is IP-allowlisted at nginx, so the link only opens from an allowlisted office/admin IP -- which is the intended audience. """ if not minio_path: return "" try: from minio import Minio from datetime import timedelta pub = os.environ.get("MINIO_PUBLIC_ENDPOINT", "minio.performancewest.net") mc = Minio( pub, access_key=os.environ.get("MINIO_ACCESS_KEY", ""), secret_key=os.environ.get("MINIO_SECRET_KEY", ""), secure=True, region=os.environ.get("MINIO_REGION", "us-east-1"), ) bucket = os.environ.get("MINIO_BUCKET", "performancewest") return mc.presigned_get_object(bucket, minio_path, expires=timedelta(days=days)) except Exception as exc: # noqa: BLE001 LOG.warning("Could not presign view URL for %s: %s", minio_path, exc) return "" @staticmethod def _upload_submission_evidence(order_number, slug, filing_result): """Persist submission proof (confirmation screenshot for web, attested PDF for fax) to MinIO so we keep durable evidence of every government submission. The submitters write screenshots to an ephemeral temp dir; we copy the proof into MinIO under filings///evidence/ and return the MinIO keys to store on the order. """ evidence = {} try: from minio import Minio mc = Minio( f"{os.environ.get('MINIO_ENDPOINT', 'minio')}:{os.environ.get('MINIO_PORT', '9000')}", access_key=os.environ.get("MINIO_ACCESS_KEY", ""), secret_key=os.environ.get("MINIO_SECRET_KEY", ""), secure=os.environ.get("MINIO_SECURE", "false").lower() == "true", ) bucket = os.environ.get("MINIO_BUCKET", "performancewest") base = f"filings/{slug}/{order_number}/evidence" def _put(local_path, name, content_type): if local_path and os.path.exists(local_path): key = f"{base}/{name}" mc.fput_object(bucket, key, local_path, content_type=content_type) return key return None shot = filing_result.get("screenshot_path") or filing_result.get("pre_submit_screenshot") k = _put(shot, "fmcsa_confirmation.png", "image/png") if k: evidence["confirmation_screenshot"] = k att = filing_result.get("attested_pdf") k = _put(att, "attested_filing.pdf", "application/pdf") if k: evidence["attested_pdf_minio"] = k if filing_result.get("fax_log_id"): evidence["fax_log_id"] = filing_result["fax_log_id"] if evidence: LOG.info("[%s] Submission evidence saved to MinIO: %s", order_number, list(evidence.keys())) except Exception as exc: LOG.warning("[%s] Failed to persist submission evidence: %s", order_number, exc) return evidence def _send_status_email(self, order_number, entity_name, dot_number, customer_email, slug=None): """Send client an email that we're working on their order.""" if not customer_email: return try: import smtplib from email.mime.text import MIMEText svc_label = self._service_label(slug) if slug else "MCS-150 Biennial Update" body = ( f"Hi,\n\n" f"We've received your {svc_label} order for " f"{entity_name} (DOT# {dot_number}).\n\n" f"Order: {order_number}\n\n" f"Our team will review your information and complete " f"the filing within 1-2 business days. We'll send you a " f"confirmation once it's done.\n\n" f"If we need any additional information or portal credentials, " f"we'll reach out via a separate secure email.\n\n" f"Questions? Reply to this email or call (888) 411-0383.\n\n" f"Performance West Inc.\n" f"DOT Compliance Services\n" ) msg = MIMEText(body) msg["Subject"] = f"{svc_label} In Progress — {entity_name} (DOT {dot_number})" msg["From"] = "noreply@performancewest.net" msg["To"] = customer_email import os as _smtp_os with smtplib.SMTP(_smtp_os.getenv("SMTP_HOST", "co.carrierone.com"), int(_smtp_os.getenv("SMTP_PORT", "587")), timeout=30) as s: s.starttls() _u, _p = _smtp_os.getenv("SMTP_USER", ""), _smtp_os.getenv("SMTP_PASS", "") if _u and _p: s.login(_u, _p) s.sendmail(msg["From"], [customer_email], msg.as_string()) LOG.info("[%s] Status email sent to %s", order_number, customer_email) except Exception as exc: LOG.warning("[%s] Failed to send status email: %s", order_number, exc)