Closes the data gap for orders that bypass the full intake (e.g. the DOT compliance-remediation pipeline) and for all MCS-150 variants:

- Worker intake-completeness gate (mcs150_update): before filling, check the customer-required operational fields the FMCSA census cannot supply (operation classification, cargo, CURRENT annual mileage, email; plus signer/address for new-registration/reactivation, and states-of-operation for 150B hazmat). If any are missing, email the customer a census-pre-filled intake link and hold the order at fulfillment_status='awaiting_intake' with an admin todo, instead of fabricating a blank filing. The existing intake PUT endpoint already re-dispatches the worker on submit, so filing auto-resumes.
- Intake wizard (Wizard.astro): when resuming ?order=CO-xxx for a DOT/MCS order, seed still-empty fields from the FMCSA census (name/address/fleet/interstate) so the customer only confirms the operational details.
- /api/v1/dot/census now also returns total_drivers + a normalized carrier_operation_code for the prefill.
- MCS150Step.astro extended to collect every field the filler needs across all variants: mailing address, cdl_drivers, primary_vehicle_type, reason_for_filing, usdot_revoked, cell/fax, hazmat-safety-permit block (needs_hmsp, operating states, security plan), and intermodal-equipment provider counts; all prefill from intake_data.

verify_mcs150_variants.py covers 150/150B/150C end-to-end (ALL PASS).
844 lines
40 KiB
Python
844 lines
40 KiB
Python
"""
|
|
MCS-150 Biennial Update Service Handler.
|
|
|
|
The MCS-150 is filed through the FMCSA Portal (portal.fmcsa.dot.gov) using
|
|
Login.gov credentials + MFA. Since we can't automate through MFA, this is
|
|
an admin-assisted service:
|
|
|
|
1. Client orders MCS-150 update
|
|
2. We collect their updated info via intake form
|
|
3. Admin logs into FMCSA Portal with client's credentials (provided by client)
|
|
OR we prepare the data and walk the client through filing via screen share
|
|
4. We verify the update was accepted
|
|
5. We send confirmation with updated company snapshot
|
|
|
|
Service slug: mcs150-update
|
|
Price: $79
|
|
Gov fee: $0
|
|
|
|
Intake data needed:
|
|
- DOT number
|
|
- Legal name (confirm/update)
|
|
- DBA name (confirm/update)
|
|
- Principal business address
|
|
- Mailing address
|
|
- Phone number
|
|
- Email address
|
|
- Number of power units
|
|
- Number of drivers
|
|
- Operation type (interstate/intrastate)
|
|
- Carrier operation (authorized for hire, exempt for hire, private)
|
|
- Cargo types
|
|
- Hazmat (Y/N)
|
|
- Annual mileage + year
|
|
- FMCSA Portal login credentials (Login.gov email + password)
|
|
OR "I need help creating my Login.gov account"
|
|
|
|
Filing approach:
|
|
Option A: Client provides Login.gov credentials → admin files directly
|
|
Option B: Guided filing via screen share (Zoom/Teams) → $29 upcharge
|
|
Option C: We prepare a pre-filled PDF → client uploads themselves (cheapest)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from datetime import datetime
|
|
|
|
from scripts.workers.telegram_notify import notify_fulfillment_todo
|
|
|
|
LOG = logging.getLogger("workers.services.mcs150_update")
|
|
|
|
|
|
class MCS150UpdateHandler:
|
|
"""Handle MCS-150 biennial update orders."""
|
|
|
|
SERVICE_SLUG = "mcs150-update"
|
|
SERVICE_NAME = "MCS-150 Biennial Update"
|
|
|
|
async def process(self, order_data: dict) -> list[str]:
    """Entry point called by job_server; delegates to ``handle()``.

    ``handle()`` is synchronous: it performs blocking database, HTTP and
    MinIO calls and, on the submission path, drives its own event loop with
    ``run_until_complete``. Calling it inline from this coroutine would
    stall the caller's event loop and make that nested
    ``run_until_complete`` raise ``RuntimeError`` (a loop is already running
    on this thread). The work is therefore handed to the default executor
    and awaited.

    Args:
        order_data: Order payload. The order number is read from
            ``order_number``, falling back to ``name``.

    Returns:
        Whatever ``handle()`` returns for this order.
    """
    import asyncio

    # `or` (not a .get default) so an explicit None/"" order_number still
    # falls back to the "name" key.
    order_number = order_data.get("order_number") or order_data.get("name") or ""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self.handle, order_data, order_number)
|
|
|
|
def handle(self, order_data: dict, order_number: str) -> list[str]:
    """Advance an MCS-150 order through its checkpoints, up to submission.

    Each dispatch runs until the first unsatisfied checkpoint, then returns
    and relies on a later re-dispatch to continue:

    1. Intake-completeness gate: when customer-required fields are missing
       (and neither approval flag is set), request intake completion.
    2. Fill the MCS-150 PDF and upload it to MinIO.
    3. Signature gate: when ``requires_signature(slug)`` is true and
       ``client_approved`` is not set, request the e-signature.
    4. Admin verification gate: when ``admin_approved`` is not set, mark the
       order ``ready_to_file`` and create a review todo.
    5. Submit (skipped outside production), persist evidence and filing
       status, create the follow-up admin todo and email the client.

    Every external step is best-effort: failures are logged and the run
    continues with whatever was produced.

    NOTE: step 5 drives a private event loop with ``run_until_complete``, so
    this method must not run on a thread whose event loop is already running.

    Args:
        order_data: Order payload. Keys read here: ``intake_data`` (dict or
            JSON string), ``service_slug``, ``customer_name``,
            ``customer_email``, ``client_approved``, ``admin_approved``.
        order_number: Order identifier used for logging, storage paths and
            database lookups.

    Returns:
        ``[minio_path]`` when the filled PDF was uploaded, otherwise ``[]``.
    """
    LOG.info("[%s] Processing MCS-150 update order", order_number)

    intake = order_data.get("intake_data") or {}
    if isinstance(intake, str):
        try:
            intake = json.loads(intake)
        except ValueError as exc:
            # A corrupt payload must not crash the worker; with no usable
            # intake the DOT-number check below rejects the order.
            LOG.error("[%s] intake_data is not valid JSON: %s", order_number, exc)
            intake = {}
    if not isinstance(intake, dict):
        LOG.error("[%s] intake_data is not a JSON object (got %s)",
                  order_number, type(intake).__name__)
        intake = {}

    slug = order_data.get("service_slug", self.SERVICE_SLUG)
    dot_number = intake.get("dot_number", "")
    entity_name = intake.get("entity_name", order_data.get("customer_name", ""))
    customer_email = order_data.get("customer_email", "")

    # The client signs the perjury certification before we file. When they
    # sign, handle_esign_completed re-dispatches this handler with
    # client_approved=True so we proceed past the signing checkpoint.
    from scripts.workers.services.dot_esign import requires_signature, request_dot_esign
    needs_signature = requires_signature(slug)
    client_approved = bool(order_data.get("client_approved"))
    admin_approved = bool(order_data.get("admin_approved"))

    # Validate required fields
    if not dot_number:
        LOG.error("[%s] Missing DOT number in intake data", order_number)
        return []

    # Check current MCS-150 status via FMCSA API
    mcs150_status = self._check_current_status(dot_number)

    # Enrich the intake with the carrier's CURRENT registered data from the
    # FMCSA carrier API. The MCS-150 biennial update re-confirms the carrier's
    # existing FMCSA record, so the authoritative source for the form is the
    # FMCSA census (legal name, address, EIN, fleet counts) -- the intake form
    # only collects the DOT number + any changes. Customer-provided intake
    # values take precedence over the census (so edits/changes win).
    census = self._fetch_carrier_record(dot_number)
    if census:
        provided = {k: v for k, v in intake.items() if v not in (None, "")}
        intake = {**census, **provided}
        LOG.info("[%s] Enriched intake from FMCSA census (legal_name=%s)",
                 order_number, intake.get("legal_name"))
    else:
        LOG.warning("[%s] No FMCSA census data for DOT %s -- form may be sparse",
                    order_number, dot_number)

    # Backfill the signer's printed name from the signed certification when
    # the intake didn't carry one. For a typed signature the client typed
    # their full legal name, which is exactly what belongs in the form's
    # "print/type name" certification field (certifyName).
    if not intake.get("signer_name"):
        signed_name = self._signed_signer_name(order_number)
        if signed_name:
            intake["signer_name"] = signed_name
            LOG.info("[%s] Backfilled signer_name from signed certification: %s",
                     order_number, signed_name)

    # INTAKE-COMPLETENESS GATE. The FMCSA census gives us the carrier's
    # registered name/address/fleet, but it cannot tell us the operational
    # details the MCS-150 requires the filer to confirm/update: the
    # operation classification (Q23), cargo types (Q24), current annual
    # mileage (Q21 -- must be CURRENT), and a contact email. If those are
    # missing we must NOT fabricate a filing; instead we ask the customer to
    # complete a short, census-pre-filled intake and hold the order until
    # they do. (New-registration / reactivation flows that have not yet
    # signed also route through here.)
    missing = self._missing_intake_fields(slug, intake)
    if missing and not client_approved and not admin_approved:
        self._request_intake_completion(
            order_number, entity_name, customer_email, dot_number, missing)
        LOG.info("[%s] Held for customer intake completion; missing=%s",
                 order_number, missing)
        return []

    # Step 1: Fill the official MCS-150 PDF
    pdf_path = None
    try:
        from scripts.document_gen.templates.mcs150_pdf_filler import fill_mcs150
        pdf_path = fill_mcs150(intake, order_number=order_number)
        LOG.info("[%s] Filled MCS-150 PDF: %s", order_number, pdf_path)
    except Exception as exc:
        LOG.error("[%s] PDF fill failed: %s", order_number, exc)

    # Step 2: Upload PDF to MinIO for storage
    minio_path = None
    pdf_url = None
    if pdf_path:
        try:
            from datetime import timedelta
            from minio import Minio
            mc = Minio(
                f"{os.environ.get('MINIO_ENDPOINT', 'minio')}:{os.environ.get('MINIO_PORT', '9000')}",
                access_key=os.environ.get("MINIO_ACCESS_KEY", ""),
                secret_key=os.environ.get("MINIO_SECRET_KEY", ""),
                # Same opt-in switch the evidence uploader reads; still
                # plain HTTP unless MINIO_SECURE=true.
                secure=os.environ.get("MINIO_SECURE", "false").lower() == "true",
            )
            bucket = os.environ.get("MINIO_BUCKET", "performancewest")
            minio_path = f"filings/mcs150/{order_number}/{os.path.basename(pdf_path)}"
            mc.fput_object(bucket, minio_path, pdf_path,
                           content_type="application/pdf")
            # Generate presigned URL for fax/web submission
            pdf_url = mc.presigned_get_object(bucket, minio_path, expires=timedelta(hours=2))
            LOG.info("[%s] PDF uploaded to MinIO: %s", order_number, minio_path)
        except Exception as exc:
            LOG.error("[%s] MinIO upload failed: %s", order_number, exc)

    # If the client already signed (re-dispatched after signature), the
    # signed PDF that the admin reviews must reflect the form we just
    # (re)filled, not a stale earlier fill. Re-point the esign record at the
    # fresh document and re-stamp the signature onto it.
    if client_approved and minio_path:
        self._restamp_signed_form(order_number, minio_path)

    # Step 3: Check for photo ID
    photo_id_path = None
    if intake.get("photo_id_uploaded"):
        try:
            import psycopg2
            conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
            try:
                with conn.cursor() as cur:
                    cur.execute(
                        "SELECT minio_paths FROM id_upload_tokens WHERE order_number = %s AND front_uploaded = TRUE ORDER BY created_at DESC LIMIT 1",
                        (order_number,),
                    )
                    row = cur.fetchone()
            finally:
                # Close even when the query raises, so a failed lookup does
                # not leak the connection.
                conn.close()
            if row and row[0]:
                paths = json.loads(row[0]) if isinstance(row[0], str) else row[0]
                if paths.get("front"):
                    photo_id_path = paths["front"]
                    LOG.info("[%s] Photo ID found: %s", order_number, photo_id_path)
        except Exception as exc:
            LOG.warning("[%s] Could not retrieve photo ID: %s", order_number, exc)

    # Step 4: SIGNATURE GATE. If this form needs the client's signed
    # certification and they haven't signed yet, request the signature and
    # STOP before filing. We never submit a perjury certification to FMCSA
    # until the client has actually signed it.
    if needs_signature and not client_approved:
        request_dot_esign(
            order_number=order_number,
            slug=slug,
            entity_name=entity_name,
            customer_email=customer_email,
            dot_number=dot_number,
            document_minio_key=minio_path or "",
        )
        LOG.info("[%s] Awaiting client signature before filing %s — not submitting yet",
                 order_number, slug)
        self._create_pending_signature_todo(
            order_number, entity_name, dot_number, slug, minio_path, customer_email)
        return [minio_path] if minio_path else []

    # Past this point: either no signature is required for this service, or
    # the client has signed (re-dispatched with client_approved=True).

    # Step 4b: ADMIN VERIFICATION GATE. Before we submit anything to the
    # government, a human verifies the prepared filing is correct (right
    # form, right DOT#, right data, signed by the client). We STOP here and
    # create an admin todo; an admin reviews the generated PDF and, when
    # satisfied, re-dispatches this order with admin_approved=True (via the
    # admin "Approve & Submit" action) to proceed to actual submission. This
    # prevents a wrong/auto-generated filing from being submitted to FMCSA.
    if not admin_approved:
        self._set_fulfillment_status(order_number, "ready_to_file")
        self._create_admin_review_todo(
            order_number, entity_name, dot_number, slug, minio_path, customer_email,
            client_signed=(needs_signature and client_approved),
        )
        LOG.info("[%s] Prepared + signed; HELD for admin verification before FMCSA submission",
                 order_number)
        return [minio_path] if minio_path else []

    # Step 5: Submit electronically (3x web → fax fallback)
    # GUARD: Skip actual submission in dev/test environments
    is_production = os.environ.get("NODE_ENV") == "production" or os.environ.get("ENV") == "production"
    filing_result = None
    if pdf_path and not is_production:
        LOG.info("[%s] DEV MODE — skipping actual FMCSA submission", order_number)
        filing_result = {
            "success": True,
            "method": "dev_skip",
            "attested_pdf": None,
            "submitted_at": datetime.now().isoformat(),
            "screenshot_path": None,
            "fax_log_id": None,
            "error": None,
        }
    elif pdf_path:
        try:
            import asyncio
            from scripts.workers.fax_sender import submit_filing

            loop = asyncio.new_event_loop()
            try:
                filing_result = loop.run_until_complete(submit_filing(
                    original_pdf_path=pdf_path,
                    pdf_url=pdf_url or "",
                    photo_id_path=photo_id_path,
                    order_number=order_number,
                    dot_number=dot_number,
                    mc_number=intake.get("mc_number", ""),
                    entity_name=entity_name,
                    document_type="MCS-150 Biennial Update",
                    web_retries=3,
                    web_retry_interval=600,
                ))
            finally:
                # Release the private loop even when the submission raises.
                loop.close()

            if filing_result and filing_result.get("success"):
                LOG.info("[%s] Filing submitted via %s", order_number, filing_result.get("method"))
            else:
                LOG.warning("[%s] Electronic filing failed: %s", order_number,
                            filing_result.get("error") if filing_result else "unknown")
        except Exception as exc:
            LOG.error("[%s] Filing submission error: %s", order_number, exc)

    # Persist durable submission EVIDENCE to MinIO (confirmation screenshot
    # for web, attested PDF + fax log for fax) so we keep proof of every
    # government submission -- the submitters only write to an ephemeral
    # temp dir otherwise.
    evidence = {}
    if filing_result and filing_result.get("success") and filing_result.get("method") != "dev_skip":
        evidence = self._upload_submission_evidence(order_number, slug, filing_result)

    # Step 6: Update order status in database
    try:
        import psycopg2
        status_data = {
            "mcs150_status": mcs150_status,
            "pdf_minio_path": minio_path,
            "filing_method": filing_result.get("method") if filing_result else None,
            "filing_success": filing_result.get("success") if filing_result else False,
            "fax_log_id": filing_result.get("fax_log_id") if filing_result else None,
            "screenshot_path": filing_result.get("screenshot_path") if filing_result else None,
            "submitted_at": filing_result.get("submitted_at") if filing_result else None,
            "attested_pdf": filing_result.get("attested_pdf") if filing_result else None,
            "evidence": evidence,  # durable MinIO keys for proof of submission
        }
        conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE compliance_orders SET intake_data = jsonb_set(
                        COALESCE(intake_data, '{}'::jsonb),
                        '{filing_status}', %s::jsonb
                    ) WHERE order_number = %s
                """, (json.dumps(status_data), order_number))
            conn.commit()
        finally:
            conn.close()
    except Exception as exc:
        LOG.error("[%s] DB update failed: %s", order_number, exc)

    # Step 7: Create admin todo (for manual verification + customer delivery)
    try:
        import psycopg2
        filed_method = filing_result.get("method", "pending") if filing_result else "pending"
        filed_ok = filing_result.get("success", False) if filing_result else False

        todo_title = f"MCS-150 {'Filed' if filed_ok else 'Review'} — {entity_name} (DOT {dot_number})"
        todo_priority = "low" if filed_ok else "normal"
        todo_description = (
            f"MCS-150 for {entity_name} (DOT {dot_number}).\n"
            f"Filing method: {filed_method}\n"
            f"Status: {'SUBMITTED — verify in 5-10 days' if filed_ok else 'NEEDS MANUAL FILING'}\n"
            f"Customer: {customer_email}\n"
            f"PDF: {minio_path or 'not generated'}"
        )

        conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO admin_todos (
                        title, category, priority, order_number, service_slug,
                        description, data, status
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
                """, (
                    todo_title,
                    "filing",
                    todo_priority,
                    order_number,
                    # NOTE(review): the other todo writers store the order's
                    # actual `slug`; this one always stores SERVICE_SLUG --
                    # confirm which one the admin UI expects.
                    self.SERVICE_SLUG,
                    todo_description,
                    json.dumps({
                        "order_number": order_number,
                        "dot_number": dot_number,
                        "entity_name": entity_name,
                        "filing_result": filing_result,
                    }),
                ))
            conn.commit()
        finally:
            conn.close()
        notify_fulfillment_todo(
            title=todo_title,
            order_number=order_number,
            service_slug=self.SERVICE_SLUG,
            priority=todo_priority,
            description=todo_description,
        )
    except Exception as exc:
        LOG.error("[%s] Failed to create admin todo: %s", order_number, exc)

    # Step 8: Send client status email
    self._send_status_email(order_number, entity_name, dot_number, customer_email)

    return [minio_path] if minio_path else []
|
|
|
|
def _missing_intake_fields(self, slug: str, intake: dict) -> list:
    """Return the customer-required intake fields still missing for ``slug``.

    These are the operational details the FMCSA census cannot supply and
    that the MCS-150 requires the filer to confirm/update.

    Requirements:
        * always: ``carrier_operation``, ``interstate_intrastate``,
          ``annual_miles``, ``email``;
        * ``cargo_types`` unless ``is_intermodal_equipment_provider`` is set;
        * signer name/title and street/city/state/zip for the
          new-registration / reactivation / full-compliance slugs;
        * ``operating_states`` when ``hazmat`` is yes and ``needs_hmsp`` is set.

    Flags may arrive as real booleans or as form strings, so an explicit
    negative such as ``"no"`` or ``"false"`` counts as not set. A field
    counts as missing when it is ``None``, an empty or whitespace-only
    string, or an empty list/dict.

    Args:
        slug: Service slug of the order.
        intake: Merged intake data (census values plus customer answers).

    Returns:
        Missing field names, in requirement order; empty when complete.
    """
    negatives = ("", "no", "n", "false", "0", "off")

    def is_set(value) -> bool:
        # Plain truthiness would read the string "no" as True.
        if isinstance(value, str):
            return value.strip().lower() not in negatives
        return bool(value)

    def is_blank(value) -> bool:
        if isinstance(value, str):
            return not value.strip()
        return value is None or value == [] or value == {}

    # Per-variant required operational fields.
    required = ["carrier_operation", "interstate_intrastate",
                "annual_miles", "email"]
    # Cargo only applies to carriers (not pure intermodal-equipment providers).
    if not is_set(intake.get("is_intermodal_equipment_provider")):
        required.append("cargo_types")
    # New registrations / reactivations also need the signer + address.
    if slug in ("dot-registration", "usdot-reactivation", "dot-full-compliance"):
        required += ["signer_name", "signer_title", "address_street",
                     "address_city", "address_state", "address_zip"]
    # Hazmat safety-permit (150B) needs states of operation.
    hazmat = intake.get("hazmat")
    hauls_hazmat = hazmat is True or (
        isinstance(hazmat, str) and hazmat.strip().lower() in ("yes", "y", "true")
    )
    if hauls_hazmat and is_set(intake.get("needs_hmsp")):
        required.append("operating_states")

    return [field for field in required if is_blank(intake.get(field))]
|
|
|
|
def _request_intake_completion(self, order_number, entity_name,
                               customer_email, dot_number, missing):
    """Ask the customer to complete intake and hold the order meanwhile.

    Three best-effort steps, each logging (not raising) on failure:

    1. Email the customer a link to the intake form for this order, listing
       the missing details in plain language.
    2. Set ``fulfillment_status='awaiting_intake'``.
    3. Insert a normal-priority ``intake`` admin todo recording the missing
       fields and whether the link was actually emailed.

    Args:
        order_number: Order identifier; also the ``order`` query parameter
            of the intake link.
        entity_name: Carrier name shown in the email and the todo.
        customer_email: Recipient; when empty no email is sent.
        dot_number: USDOT number shown in the email and the todo.
        missing: Field names returned by ``_missing_intake_fields``.
    """
    from html import escape
    from urllib.parse import quote

    domain = os.environ.get("PUBLIC_SITE_URL", "https://performancewest.net").rstrip("/")
    # Quote the order number so an unexpected character cannot break the link.
    intake_url = f"{domain}/order/dot-compliance?order={quote(str(order_number), safe='')}"

    # Customer email (no paper/mail mechanics; public form names are fine).
    emailed = False
    if not customer_email:
        LOG.warning("[%s] No customer email on file; intake link not sent",
                    order_number)
    else:
        try:
            from scripts.workers.worker_email import send_worker_email
            label = {
                "carrier_operation": "how your company operates (for-hire, private, etc.)",
                "interstate_intrastate": "interstate vs intrastate operation",
                "annual_miles": "your current annual mileage",
                "cargo_types": "the types of cargo you haul",
                "email": "a contact email address",
                "operating_states": "the states you operate in",
                "signer_name": "the name of the company officer signing",
                "signer_title": "the officer's title",
            }
            needed = ", ".join(label.get(m, m.replace("_", " ")) for m in missing)
            subject = f"Action needed: confirm your MCS-150 details (DOT {dot_number})"
            text = (
                f"Hi,\n\n"
                f"We're ready to prepare the MCS-150 update for {entity_name} "
                f"(USDOT {dot_number}). We've pre-filled everything we can from "
                f"your current FMCSA record. To finish, we just need you to "
                f"confirm a few current details: {needed}.\n\n"
                f"Please review and confirm here (it takes about 2 minutes):\n"
                f"{intake_url}\n\n"
                f"Once you submit, we'll prepare your filing for your signature "
                f"and handle the rest.\n\n"
                f"Thank you,\nPerformance West"
            )
            # Carrier names routinely contain "&" or "<"; escape every
            # interpolated value so the HTML part stays well-formed.
            safe_name = escape(str(entity_name))
            safe_dot = escape(str(dot_number))
            safe_needed = escape(needed)
            safe_url = escape(intake_url, quote=True)
            html_body = (
                f"<p>Hi,</p>"
                f"<p>We're ready to prepare the MCS-150 update for "
                f"<strong>{safe_name}</strong> (USDOT {safe_dot}). We've "
                f"pre-filled everything we can from your current FMCSA record. "
                f"To finish, we just need you to confirm a few current details: "
                f"<strong>{safe_needed}</strong>.</p>"
                f"<p><a href=\"{safe_url}\">Review and confirm your details</a> "
                f"(about 2 minutes).</p>"
                f"<p>Once you submit, we'll prepare your filing for your "
                f"signature and handle the rest.</p>"
                f"<p>Thank you,<br>Performance West</p>"
            )
            send_worker_email(customer_email, subject, html_body, text=text,
                              cc="justin@performancewest.net")
            # Reached only when send_worker_email returned without raising.
            emailed = True
        except Exception as exc:
            LOG.warning("[%s] Could not send intake-completion email: %s",
                        order_number, exc)

    # Hold the order + admin todo.
    self._set_fulfillment_status(order_number, "awaiting_intake")
    if emailed:
        link_note = (f"A census-pre-filled intake link was emailed to "
                     f"{customer_email}.\n")
    else:
        # Tell the admin the customer has NOT been contacted.
        link_note = ("The intake link could NOT be emailed (no customer email "
                     "on file or the send failed) -- contact the customer "
                     "directly.\n")
    try:
        import psycopg2
        conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """INSERT INTO admin_todos (
                        title, category, priority, order_number, service_slug,
                        description, data, status
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')""",
                    (
                        f"Awaiting MCS-150 intake -- {entity_name} (DOT {dot_number})",
                        "intake", "normal", order_number, self.SERVICE_SLUG,
                        (f"Order {order_number} is missing customer-required intake "
                         f"fields: {', '.join(missing)}.\n"
                         f"{link_note}"
                         f"Intake link: {intake_url}\n"
                         f"Filing auto-resumes once the customer submits."),
                        json.dumps({"order_number": order_number, "dot_number": dot_number,
                                    "entity_name": entity_name, "awaiting_intake": True,
                                    "missing_fields": missing}),
                    ),
                )
            conn.commit()
        finally:
            # Close even when the insert fails, so the connection is not leaked.
            conn.close()
    except Exception as exc:
        LOG.warning("[%s] Could not create intake-pending todo: %s",
                    order_number, exc)
|
|
|
|
def _restamp_signed_form(self, order_number: str, document_key: str) -> None:
    """Re-attach the client's signature to a freshly filled form.

    Finds the most recently signed esign record for ``order_number``, points
    it at ``document_key``, clears its stored signed-document key, and then
    calls ``stamp_esign_document`` for that record. Returns without doing
    anything when the order has no signed record. Any failure is logged as a
    warning and swallowed.
    """
    find_signed_sql = """SELECT id FROM esign_records
                       WHERE order_number = %s AND status = 'signed'
                       ORDER BY signed_at DESC LIMIT 1"""
    repoint_sql = """UPDATE esign_records
                          SET document_minio_key = %s,
                              signed_document_minio_key = NULL,
                              updated_at = NOW()
                        WHERE id = %s"""
    try:
        import psycopg2
        connection = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        try:
            with connection.cursor() as cursor:
                cursor.execute(find_signed_sql, (order_number,))
                latest = cursor.fetchone()
                if not latest:
                    return
                record_id = latest[0]
                # Clearing the stale signed key lets the stamper regenerate
                # it against the fresh document.
                cursor.execute(repoint_sql, (document_key, record_id))
            connection.commit()
        finally:
            connection.close()

        from scripts.workers.services.esign_stamp import stamp_esign_document
        stamped_key = stamp_esign_document(int(record_id))
        if stamped_key:
            LOG.info("[%s] Re-stamped signature onto fresh form -> %s",
                     order_number, stamped_key)
    except Exception as exc:
        LOG.warning("[%s] Re-stamp of signed form failed: %s", order_number, exc)
|
|
|
|
def _signed_signer_name(self, order_number: str) -> str:
    """Look up the name the client typed when signing for this order.

    Reads ``signature_data`` from the most recently signed esign record of
    type ``typed`` and returns it stripped. Returns ``""`` when there is no
    such record, the stored value is empty, or the lookup fails (failures
    are logged as warnings).
    """
    lookup_sql = """SELECT signature_data FROM esign_records
                       WHERE order_number = %s AND status = 'signed'
                         AND signature_type = 'typed'
                       ORDER BY signed_at DESC LIMIT 1"""
    try:
        import psycopg2
        connection = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        try:
            with connection.cursor() as cursor:
                cursor.execute(lookup_sql, (order_number,))
                record = cursor.fetchone()
        finally:
            connection.close()
        if not record:
            return ""
        return (record[0] or "").strip()
    except Exception as exc:
        LOG.warning("[%s] Could not read signed signer name: %s", order_number, exc)
        return ""
|
|
|
|
def _fetch_fmcsa_carrier(self, dot_number: str) -> dict:
    """Fetch the raw FMCSA carrier census record for a DOT number.

    Issues a GET to the FMCSA QCMobile ``carriers/<dot>`` endpoint with the
    key from ``FMCSA_API_KEY`` and a 10-second timeout, then returns the
    ``content.carrier`` object of the JSON response.

    Args:
        dot_number: USDOT number. It is stripped and URL-quoted before being
            placed in the request path.

    Returns:
        The carrier dict, or ``{}`` when no API key is configured, the
        request fails, or the response carries no carrier object (for
        example a null ``content``).
    """
    api_key = os.environ.get("FMCSA_API_KEY", "")
    if not api_key:
        return {}
    try:
        import urllib.parse
        import urllib.request

        # Quote both values so stray whitespace or reserved characters
        # cannot alter the request path or query string.
        dot_segment = urllib.parse.quote(str(dot_number).strip(), safe="")
        key_param = urllib.parse.quote(api_key, safe="")
        url = f"https://mobile.fmcsa.dot.gov/qc/services/carriers/{dot_segment}?webKey={key_param}"
        req = urllib.request.Request(url, headers={"Accept": "application/json"})
        with urllib.request.urlopen(req, timeout=10) as resp:
            data = json.loads(resp.read())
        # Walk the payload defensively: a null/absent "content" is a normal
        # "no record" answer, not an error worth a warning.
        content = data.get("content") if isinstance(data, dict) else None
        carrier = content.get("carrier") if isinstance(content, dict) else None
        return carrier if isinstance(carrier, dict) else {}
    except Exception as exc:
        LOG.warning("FMCSA carrier fetch failed for %s: %s", dot_number, exc)
        return {}
|
|
|
|
def _fetch_carrier_record(self, dot_number: str) -> dict:
    """Map the carrier's FMCSA census record onto intake keys.

    Fetches the raw record via ``_fetch_fmcsa_carrier`` and translates it to
    the key names used for intake data, so the form can be pre-filled with
    the carrier's registered details. Values are stringified, and keys whose
    value ends up empty are left out.

    Returns:
        The mapped record, or ``{}`` when no census record is available.
    """
    carrier = self._fetch_fmcsa_carrier(dot_number)
    if not carrier:
        return {}

    # (intake key, raw census value), in output order.
    raw_fields = (
        ("legal_name", carrier.get("legalName")),
        ("dba_name", carrier.get("dbaName")),
        ("dot_number", carrier.get("dotNumber") or dot_number),
        ("ein", carrier.get("ein")),
        ("address_street", carrier.get("phyStreet")),
        ("address_city", carrier.get("phyCity")),
        ("address_state", carrier.get("phyState")),
        ("address_zip", carrier.get("phyZipcode")),
        ("phone", carrier.get("telephone") or carrier.get("phone")),
        ("power_units", carrier.get("totalPowerUnits")),
        ("drivers", carrier.get("totalDrivers")),
    )
    record: dict = {}
    for intake_key, raw_value in raw_fields:
        text = "" if raw_value is None else str(raw_value)
        if text:
            record[intake_key] = text

    # Interstate vs intrastate (form Q26/27). The census exposes this as
    # carrierOperation.carrierOperationCode: A=Interstate, B=Intrastate
    # Hazmat, C=Intrastate Non-Hazmat.
    operation = carrier.get("carrierOperation") or {}
    operation_code = (operation.get("carrierOperationCode") or "").strip().upper()
    scope_by_code = {
        "A": "interstate",
        "B": "intrastate_hazmat",
        "C": "intrastate_non_hazmat",
    }
    if operation_code in scope_by_code:
        record["interstate_intrastate"] = scope_by_code[operation_code]

    # Passenger carrier flag (form Q29 passenger compliance cert).
    passenger_flag = carrier.get("isPassengerCarrier")
    if passenger_flag is not None and str(passenger_flag).upper() == "Y":
        record["is_passenger_carrier"] = "yes"

    return record
|
|
|
|
def _check_current_status(self, dot_number: str) -> str:
    """Summarize the carrier's current FMCSA status as one line of text.

    Reads ``statusCode``, ``allowedToOperate`` and ``mcs150Outdated`` from
    the census record returned by ``_fetch_fmcsa_carrier``; a field absent
    from the record is shown as ``?``. Returns a fixed fallback sentence when
    no record is available.
    """
    carrier = self._fetch_fmcsa_carrier(dot_number)
    if carrier:
        status = carrier.get("statusCode", "?")
        allowed = carrier.get("allowedToOperate", "?")
        outdated = carrier.get("mcs150Outdated", "?")
        return f"Status: {status}, Allowed: {allowed}, MCS-150 Outdated: {outdated}"
    return "Could not check FMCSA status"
|
|
|
|
def _create_pending_signature_todo(self, order_number, entity_name, dot_number,
                                   slug, minio_path, customer_email):
    """Create a low-priority admin todo noting we await the client's signature.

    Inserts a ``filing`` todo for the order and then sends the fulfillment
    notification. Both steps are best-effort and log a warning on failure;
    a notification failure is reported separately so it is not mistaken for
    a failed insert.

    Args:
        order_number: Order identifier stored on the todo.
        entity_name: Carrier name used in the title and description.
        dot_number: USDOT number used in the title and description.
        slug: Service slug stored on the todo.
        minio_path: Key of the prepared PDF, or falsy when none was generated.
        customer_email: Address the signing link was sent to.
    """
    todo_title = f"Awaiting client signature — {entity_name} (DOT {dot_number})"
    todo_description = (
        f"{slug} for {entity_name} (DOT {dot_number}).\n"
        f"Status: AWAITING CLIENT SIGNATURE before filing.\n"
        f"Signing link emailed to {customer_email}.\n"
        f"PDF: {minio_path or 'not generated'}\n"
        f"Filing auto-resumes once the client signs."
    )
    try:
        import psycopg2
        conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO admin_todos (
                        title, category, priority, order_number, service_slug,
                        description, data, status
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
                    """,
                    (
                        todo_title,
                        "filing", "low", order_number, slug,
                        todo_description,
                        json.dumps({"order_number": order_number, "dot_number": dot_number,
                                    "entity_name": entity_name, "awaiting_signature": True}),
                    ),
                )
            conn.commit()
        finally:
            # Close even when the insert fails, so the connection is not leaked.
            conn.close()
        LOG.info("[%s] Pending-signature todo created", order_number)
    except Exception as exc:
        LOG.warning("[%s] Failed to create pending-signature todo: %s", order_number, exc)
        return

    # Notify only after the todo is committed and the connection released.
    try:
        notify_fulfillment_todo(
            title=todo_title,
            order_number=order_number,
            service_slug=slug,
            priority="low",
            description=todo_description,
        )
    except Exception as exc:
        LOG.warning("[%s] Pending-signature todo created but notification failed: %s",
                    order_number, exc)
|
|
|
|
def _set_fulfillment_status(self, order_number, status):
    """Persist ``fulfillment_status`` on the compliance order.

    Updates ``fulfillment_status`` and stamps ``fulfillment_status_at`` with
    the database's current time for the row matching ``order_number``
    (e.g. 'ready_to_file' = prepared + signed, held for admin verification).
    Best-effort: a failure is logged as a warning and swallowed.

    Args:
        order_number: Order whose status is updated.
        status: New fulfillment status value.
    """
    try:
        import psycopg2
        conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "UPDATE compliance_orders SET fulfillment_status=%s, "
                    "fulfillment_status_at=now() WHERE order_number=%s",
                    (status, order_number),
                )
            conn.commit()
        finally:
            # Close even when the update fails, so the connection is not leaked.
            conn.close()
    except Exception as exc:
        LOG.warning("[%s] Failed to set fulfillment_status=%s: %s", order_number, status, exc)
|
|
|
|
def _create_admin_review_todo(self, order_number, entity_name, dot_number,
                              slug, minio_path, customer_email, client_signed):
    """High-priority admin todo: verify the prepared filing BEFORE submission.

    The order is held at fulfillment_status='ready_to_file'. An admin opens
    the generated PDF (minio_path), confirms it is correct (right form, DOT#,
    data, signature), and then re-dispatches with admin_approved=True via the
    admin 'Approve & Submit' action to proceed to actual FMCSA submission.

    Inserts the todo, then sends the fulfillment notification with a
    presigned link to the PDF. Both steps are best-effort and log a warning
    on failure; a notification failure is reported separately so it is not
    mistaken for a failed insert.

    Args:
        order_number: Order identifier stored on the todo.
        entity_name: Carrier name used in the title and description.
        dot_number: USDOT number used in the title and description.
        slug: Service slug stored on the todo.
        minio_path: Key of the prepared PDF, or falsy when none was generated.
        customer_email: Customer contact shown in the description.
        client_signed: Whether the client's signature is on the filing.
    """
    sig = "client SIGNED" if client_signed else "no signature required"
    todo_title = f"VERIFY before filing — {entity_name} (DOT {dot_number})"
    todo_description = (
        f"{slug} for {entity_name} (DOT {dot_number}).\n"
        f"Status: READY TO FILE — held for admin verification ({sig}).\n"
        f"ACTION: review the prepared PDF, confirm it is correct, then\n"
        f"approve & submit (re-dispatch with admin_approved=true).\n"
        f"We do NOT submit to FMCSA until you approve.\n"
        f"PDF: {minio_path or 'not generated'}\n"
        f"Customer: {customer_email}"
    )
    try:
        import psycopg2
        conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO admin_todos (
                        title, category, priority, order_number, service_slug,
                        description, data, status
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
                    """,
                    (
                        todo_title,
                        "filing", "high", order_number, slug,
                        todo_description,
                        json.dumps({"order_number": order_number, "dot_number": dot_number,
                                    "entity_name": entity_name, "awaiting_admin_review": True,
                                    "client_signed": client_signed, "pdf_minio_path": minio_path}),
                    ),
                )
            conn.commit()
        finally:
            # Close even when the insert fails, so the connection is not leaked.
            conn.close()
        LOG.info("[%s] Admin-review (pre-submission) todo created", order_number)
    except Exception as exc:
        LOG.warning("[%s] Failed to create admin-review todo: %s", order_number, exc)
        return

    # Notify only after the todo is committed and the connection released.
    try:
        notify_fulfillment_todo(
            title=todo_title,
            order_number=order_number,
            service_slug=slug,
            priority="high",
            description=todo_description,
            view_url=self._presigned_view_url(minio_path),
        )
    except Exception as exc:
        LOG.warning("[%s] Admin-review todo created but notification failed: %s",
                    order_number, exc)
|
|
|
|
@staticmethod
def _presigned_view_url(minio_path, days=7):
    """Build a presigned, browser-openable URL for a MinIO object.

    The URL is signed against the PUBLIC endpoint (``MINIO_PUBLIC_ENDPOINT``,
    default minio.performancewest.net) over HTTPS, so operator Telegram
    alerts can link directly to the prepared PDF for review. The public host
    is IP-allowlisted at nginx, so the link only opens from an allowlisted
    office/admin IP -- which is the intended audience.

    Args:
        minio_path: Object key inside the configured bucket.
        days: Lifetime of the link, in days.

    Returns:
        The presigned URL, or ``""`` when ``minio_path`` is falsy or signing
        fails (the failure is logged as a warning).
    """
    if not minio_path:
        return ""
    try:
        from datetime import timedelta
        from minio import Minio

        env = os.environ
        client = Minio(
            env.get("MINIO_PUBLIC_ENDPOINT", "minio.performancewest.net"),
            access_key=env.get("MINIO_ACCESS_KEY", ""),
            secret_key=env.get("MINIO_SECRET_KEY", ""),
            secure=True,
            region=env.get("MINIO_REGION", "us-east-1"),
        )
        bucket_name = env.get("MINIO_BUCKET", "performancewest")
        lifetime = timedelta(days=days)
        return client.presigned_get_object(bucket_name, minio_path, expires=lifetime)
    except Exception as exc:  # noqa: BLE001
        LOG.warning("Could not presign view URL for %s: %s", minio_path, exc)
        return ""
|
|
|
|
@staticmethod
def _upload_submission_evidence(order_number, slug, filing_result):
    """Persist proof of a government submission to MinIO.

    The submitters write screenshots to an ephemeral temp dir; this copies
    the proof into MinIO under ``filings/<slug>/<order>/evidence/`` and
    returns the keys to store on the order.

    Each artifact is handled independently, so one failed upload no longer
    discards the rest. The fax log id needs no upload and is recorded before
    MinIO is touched, so it survives a storage outage.

    Args:
        order_number: Order identifier, used in the object path and logs.
        slug: Service slug, used in the object path.
        filing_result: Result dict from the submitter. Keys read:
            ``screenshot_path`` (falling back to ``pre_submit_screenshot``),
            ``attested_pdf`` and ``fax_log_id``.

    Returns:
        Dict with any of ``fax_log_id``, ``confirmation_screenshot`` and
        ``attested_pdf_minio``; empty when nothing could be recorded.
    """
    evidence = {}
    if filing_result.get("fax_log_id"):
        evidence["fax_log_id"] = filing_result["fax_log_id"]

    try:
        from minio import Minio
        mc = Minio(
            f"{os.environ.get('MINIO_ENDPOINT', 'minio')}:{os.environ.get('MINIO_PORT', '9000')}",
            access_key=os.environ.get("MINIO_ACCESS_KEY", ""),
            secret_key=os.environ.get("MINIO_SECRET_KEY", ""),
            secure=os.environ.get("MINIO_SECURE", "false").lower() == "true",
        )
    except Exception as exc:
        LOG.warning("[%s] Failed to persist submission evidence: %s", order_number, exc)
        return evidence

    bucket = os.environ.get("MINIO_BUCKET", "performancewest")
    base = f"filings/{slug}/{order_number}/evidence"
    # (evidence key, local file, object name, content type)
    artifacts = (
        ("confirmation_screenshot",
         filing_result.get("screenshot_path") or filing_result.get("pre_submit_screenshot"),
         "fmcsa_confirmation.png", "image/png"),
        ("attested_pdf_minio",
         filing_result.get("attested_pdf"),
         "attested_filing.pdf", "application/pdf"),
    )
    for evidence_key, local_path, name, content_type in artifacts:
        try:
            if not (local_path and os.path.exists(local_path)):
                continue
            object_key = f"{base}/{name}"
            mc.fput_object(bucket, object_key, local_path, content_type=content_type)
        except Exception as exc:
            # Keep going: the remaining artifacts are still worth saving.
            LOG.warning("[%s] Failed to persist submission evidence %s: %s",
                        order_number, name, exc)
            continue
        evidence[evidence_key] = object_key

    if evidence:
        LOG.info("[%s] Submission evidence saved to MinIO: %s",
                 order_number, list(evidence.keys()))
    return evidence
|
|
|
|
def _send_status_email(self, order_number, entity_name, dot_number, customer_email):
    """Email the client a plain-text notice that their update is in progress.

    Does nothing when ``customer_email`` is empty. Delivery is best-effort:
    any failure while building or sending the message is logged as a warning
    and swallowed.

    SMTP settings come from the environment (``SMTP_HOST``, ``SMTP_PORT``,
    ``SMTP_USER``, ``SMTP_PASS``). The session is upgraded with STARTTLS and
    logs in only when both a user and a password are configured.
    """
    if not customer_email:
        return
    try:
        import smtplib
        from email.mime.text import MIMEText

        message = MIMEText(
            f"Hi,\n\n"
            f"We've received your MCS-150 biennial update order for "
            f"{entity_name} (DOT# {dot_number}).\n\n"
            f"Order: {order_number}\n\n"
            f"Our team will review your intake information and complete "
            f"the filing within 1-2 business days. We'll send you a "
            f"confirmation with your updated company snapshot once it's done.\n\n"
            f"If we need your FMCSA Portal login credentials, we'll reach "
            f"out via a separate secure email.\n\n"
            f"Questions? Reply to this email or call (888) 411-0383.\n\n"
            f"Performance West Inc.\n"
            f"DOT Compliance Services\n"
        )
        message["Subject"] = f"MCS-150 Update In Progress — {entity_name} (DOT {dot_number})"
        message["From"] = "noreply@performancewest.net"
        message["To"] = customer_email

        smtp_host = os.getenv("SMTP_HOST", "co.carrierone.com")
        smtp_port = int(os.getenv("SMTP_PORT", "587"))
        smtp_user = os.getenv("SMTP_USER", "")
        smtp_pass = os.getenv("SMTP_PASS", "")

        with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as smtp:
            smtp.starttls()
            if smtp_user and smtp_pass:
                smtp.login(smtp_user, smtp_pass)
            smtp.sendmail(message["From"], [customer_email], message.as_string())

        LOG.info("[%s] Status email sent to %s", order_number, customer_email)
    except Exception as exc:
        LOG.warning("[%s] Failed to send status email: %s", order_number, exc)
|