new-site/scripts/workers/services/hazmat_phmsa.py
justin 28b1af341d Wire fulfillment alerts to Telegram + surface order progress in portal + even out ERPNext sync
Telegram notifications:
- Add shared scripts/workers/telegram_notify.py (send_telegram, notify_fulfillment_todo,
  create_admin_todo) so every worker alerts the operator the same way; fire-and-forget.
- Fire notify_fulfillment_todo after each admin_todos insert across all 8 service
  handlers (9 sites) so no fulfillment task waits unseen.
  (Orders + quotes + tickets already notified via checkout/quotes/tickets routes.)

Client portal order progress:
- order-timeline: derive real per-step status from live signals (payment paid,
  e-signature signed, fulfillment_status) instead of a static template; add
  current_step to the response.
- Extract pure applyLiveStatus into order-timeline-status.ts (DB-free) + unit test
  (api/test/test_timeline_status.ts, 8 cases).
- portal /me now returns compliance_orders.fulfillment_status.
- Dashboard renders a client-safe Progress badge (In progress / Action needed /
  Filed-awaiting-confirmation / Completed); batches show the most actionable status.
  No back-office mechanics exposed.

ERPNext sync parity:
- Create a Sales Order for formation and fcc_carrier_registration orders (previously
  only canada_crtc + compliance synced); write erpnext_sales_order back to each table.
  Non-blocking, matches existing pattern.

Verified: API tsc clean, timeline unit tests 8/8, Astro build 58 pages,
cms10114/ink/paper_batch Python tests still green, no mechanics leaks.
2026-06-07 03:17:46 -05:00

192 lines
8.2 KiB
Python

"""
Hazmat / PHMSA Registration Service Handler.
Carriers that transport placardable quantities of hazardous materials must
register annually with PHMSA (Pipeline and Hazardous Materials Safety
Administration) under 49 CFR Part 107 Subpart G. This is separate from the
USDOT/FMCSA registration and from any HM safety permit.
Service slug: hazmat-phmsa
Price: $149 (admin-assisted)
Gov fee: PHMSA registration fee (varies by carrier size; ~$25 + $250-$3,000
processing fee depending on revenue/size) — billed at cost.
This is admin-assisted: we collect the carrier's hazmat profile via the intake
form, then file the PHMSA registration (Form via https://hazmatonline.phmsa.dot.gov)
on the carrier's behalf and send the registration certificate.
Intake data needed:
- DOT number
- Legal name / DBA
- Business address + contact
- EIN
- Hazmat classes / divisions transported
- Whether they transport in bulk packaging
- Estimated annual gross revenue (drives the PHMSA fee bracket)
- Number of employees
- Whether a small business (SBA size standard)
"""
from __future__ import annotations
import json
import logging
import os
from datetime import datetime
from scripts.workers.telegram_notify import notify_fulfillment_todo
LOG = logging.getLogger("workers.services.hazmat_phmsa")
# PHMSA registration fee brackets (49 CFR 107.612). The processing fee depends on
# whether the registrant qualifies as a small business / not-for-profit.
PHMSA_FEE_INFO = {
"small_business": {"registration_fee_cents": 2500, "processing_fee_cents": 25000}, # $25 + $250
"not_small": {"registration_fee_cents": 2500, "processing_fee_cents": 300000}, # $25 + $3,000
"portal": "https://hazmatonline.phmsa.dot.gov",
"regulation": "49 CFR Part 107 Subpart G",
}
class HazmatPHMSAHandler:
"""Handle PHMSA hazmat registration orders (admin-assisted)."""
SERVICE_SLUG = "hazmat-phmsa"
SERVICE_NAME = "PHMSA Hazmat Registration"
async def process(self, order_data: dict) -> list[str]:
"""Entry point called by job_server. Delegates to handle()."""
order_number = order_data.get("order_number", order_data.get("name", ""))
return self.handle(order_data, order_number)
def handle(self, order_data: dict, order_number: str) -> list[str]:
"""Process a PHMSA hazmat registration order."""
LOG.info("[%s] Processing PHMSA hazmat registration order", order_number)
intake = order_data.get("intake_data") or {}
if isinstance(intake, str):
intake = json.loads(intake)
dot_number = intake.get("dot_number", "")
entity_name = intake.get("entity_name", intake.get("legal_name",
order_data.get("customer_name", "")))
customer_email = order_data.get("customer_email", "")
# Determine fee bracket from small-business flag.
is_small = bool(intake.get("small_business"))
fee = PHMSA_FEE_INFO["small_business" if is_small else "not_small"]
hazmat_classes = intake.get("hazmat_classes", [])
bulk = bool(intake.get("bulk_packaging"))
steps = [
f"1. Log into PHMSA portal: {PHMSA_FEE_INFO['portal']}",
"2. Start a new registration (or renewal) under 49 CFR Part 107 Subpart G",
"3. Enter carrier identity (legal name, DOT#, EIN, address)",
f"4. Enter hazmat classes/divisions: {', '.join(hazmat_classes) if hazmat_classes else 'PER INTAKE'}",
f"5. Indicate bulk packaging: {'YES' if bulk else 'NO'}",
f"6. Select fee bracket: {'small business ($25 + $250)' if is_small else 'standard ($25 + $3,000)'}",
"7. Pay the PHMSA registration + processing fee (billed to client at cost)",
"8. Download the Certificate of Registration",
"9. Send certificate + registration number to client; set renewal reminder (annual)",
]
todo_data = {
"order_number": order_number,
"service": self.SERVICE_NAME,
"service_slug": self.SERVICE_SLUG,
"dot_number": dot_number,
"entity_name": entity_name,
"customer_email": customer_email,
"hazmat_classes": hazmat_classes,
"bulk_packaging": bulk,
"small_business": is_small,
"estimated_gov_fee_cents": fee["registration_fee_cents"] + fee["processing_fee_cents"],
"intake_data": intake,
"steps": steps,
}
try:
import psycopg2
conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
todo_title = (
f"PHMSA Hazmat Registration — {entity_name} (DOT {dot_number})"
if dot_number else f"PHMSA Hazmat Registration — {entity_name}"
)
todo_description = (
f"Service: {self.SERVICE_NAME}\n"
f"DOT: {dot_number}\n"
f"Hazmat classes: {', '.join(hazmat_classes) if hazmat_classes else 'see intake'}\n"
f"Bulk packaging: {'Yes' if bulk else 'No'}\n"
f"Small business: {'Yes' if is_small else 'No'}\n"
f"Est. gov fee: ${(fee['registration_fee_cents'] + fee['processing_fee_cents']) / 100:,.2f}\n"
f"Customer: {customer_email}\n\n"
f"Steps:\n" + "\n".join(steps)
)
try:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO admin_todos (
title, category, priority, order_number, service_slug,
description, data, status
) VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending')
""", (
todo_title,
"filing",
"high",
order_number,
self.SERVICE_SLUG,
todo_description,
json.dumps(todo_data),
))
conn.commit()
notify_fulfillment_todo(
title=todo_title,
order_number=order_number,
service_slug=self.SERVICE_SLUG,
priority="high",
description=todo_description,
)
finally:
conn.close()
LOG.info("[%s] Admin todo created for PHMSA hazmat registration", order_number)
except Exception as exc:
LOG.error("[%s] Failed to create admin todo: %s", order_number, exc)
self._send_status_email(order_number, entity_name, dot_number, customer_email)
return []
def _send_status_email(self, order_number, entity_name, dot_number, customer_email):
"""Send the client a status email."""
if not customer_email:
return
try:
import smtplib
from email.mime.text import MIMEText
dot_line = f" (DOT# {dot_number})" if dot_number else ""
body = (
f"Hi,\n\n"
f"We've received your PHMSA Hazmat Registration order for "
f"{entity_name}{dot_line}.\n\n"
f"Order: {order_number}\n\n"
f"Our team will prepare and file your PHMSA registration "
f"(49 CFR Part 107) and send you the Certificate of Registration "
f"once complete, typically within 1-2 business days. The PHMSA "
f"government fee is billed at cost and depends on your business size.\n\n"
f"Questions? Reply to this email or call (888) 411-0383.\n\n"
f"Performance West Inc.\n"
f"DOT Compliance Services\n"
)
msg = MIMEText(body)
msg["Subject"] = f"PHMSA Hazmat Registration In Progress — {entity_name}{dot_line}"
msg["From"] = "noreply@performancewest.net"
msg["To"] = customer_email
with smtplib.SMTP("localhost", 25) as s:
s.sendmail(msg["From"], [customer_email], msg.as_string())
LOG.info("[%s] Status email sent to %s", order_number, customer_email)
except Exception as exc:
LOG.warning("[%s] Failed to send status email: %s", order_number, exc)