"""VitalPBX Fax Sender. Sends PDFs via VitalPBX API and polls for delivery confirmation. After confirmed delivery, generates an attestation cover page and prepends it to the original document for the customer's records. Usage: from scripts.workers.fax_sender import send_fax, send_and_attest # Just send result = await send_fax(pdf_url, recipient="12023663477") # Send, wait for confirmation, then generate attested copy attested_path = await send_and_attest( pdf_url=presigned_url, original_pdf_path="/tmp/mcs150_filled.pdf", recipient="12023663477", order_number="CO-12345", dot_number="1234567", entity_name="ACME TRUCKING INC", document_type="MCS-150 Biennial Update", ) """ from __future__ import annotations import asyncio import json import logging import os from datetime import datetime, timezone import httpx LOG = logging.getLogger("workers.fax_sender") # VitalPBX configuration PBX_HOST = os.getenv("VITALPBX_HOST", "pbx.carrierone.com") PBX_API_KEY = os.getenv("VITALPBX_API_KEY", "cc50de7d854f344c596855435ce40821") PBX_TENANT = os.getenv("VITALPBX_TENANT", "VitalPBX") PBX_FAX_ID = int(os.getenv("VITALPBX_FAX_ID", "1")) # FMCSA fax number FMCSA_FAX = "12023663477" async def send_fax( pdf_url: str, recipient: str, resolution: str = "medium", retries: int = 2, retry_time: int = 300, ) -> dict: """Send a fax via VitalPBX API. Args: pdf_url: Public URL to the PDF to fax. recipient: Phone number to fax to (e.g. "12023663477"). resolution: "standard", "medium", or "high". retries: Number of retry attempts on failure. retry_time: Seconds between retries. Returns: dict with keys: success, log_id, device, status, error """ # Guard: don't fax in dev/test is_prod = os.getenv("NODE_ENV") == "production" or os.getenv("ENV") == "production" if not is_prod: LOG.info("[fax] DEV MODE — skipping fax to %s", recipient) return {"success": True, "log_id": "dev-skip", "device": "dev", "status": "skipped", "error": None} url = f"https://{PBX_HOST}/api/v2/virtual_faxes/{PBX_FAX_ID}/send" headers = { "app-key": PBX_API_KEY, "tenant": PBX_TENANT, } form_data = { "url": pdf_url, "recipients": recipient, "resolution": resolution, "retries": str(retries), "retry_time": str(retry_time), } try: async with httpx.AsyncClient(verify=False, timeout=30) as client: resp = await client.post(url, headers=headers, data=form_data) data = resp.json() if data.get("status") == "success": log_ids = data.get("data", {}).get("logs", []) log_id = log_ids[0] if log_ids else None LOG.info("[fax] Sent to %s — log_id=%s, device=%s", recipient, log_id, data["data"].get("device")) return { "success": True, "log_id": log_id, "device": data["data"].get("device"), "file": data["data"].get("file"), "status": "queued", "error": None, } else: LOG.error("[fax] API error: %s", data) return { "success": False, "log_id": None, "status": "error", "error": data.get("message", "Unknown API error"), } except Exception as exc: LOG.error("[fax] Send failed: %s", exc) return { "success": False, "log_id": None, "status": "error", "error": str(exc), } async def poll_fax_status(log_id: str | int, timeout: int = 300, interval: int = 10) -> dict: """Poll VitalPBX for fax delivery status. Args: log_id: Fax log ID from send_fax(). timeout: Max seconds to wait. interval: Seconds between polls. Returns: dict with keys: delivered, status, error, duration """ url = f"https://{PBX_HOST}/api/v2/virtual_faxes/log/{log_id}" headers = { "app-key": PBX_API_KEY, "tenant": PBX_TENANT, } start = datetime.now(timezone.utc) elapsed = 0 while elapsed < timeout: try: async with httpx.AsyncClient(verify=False, timeout=15) as client: resp = await client.get(url, headers=headers) data = resp.json() fax = data.get("data", {}) status = fax.get("status", "").lower() error = fax.get("error") if status == "sent" or (status == "sent" and error == "OK"): LOG.info("[fax] Log %s delivered in %ds", log_id, elapsed) return { "delivered": True, "status": "sent", "error": error, "duration": elapsed, } elif status == "failed": LOG.warning("[fax] Log %s failed: %s", log_id, error) return { "delivered": False, "status": "failed", "error": error, "duration": elapsed, } # Still sending/queued — keep polling except Exception as exc: LOG.debug("[fax] Poll error (will retry): %s", exc) await asyncio.sleep(interval) elapsed = int((datetime.now(timezone.utc) - start).total_seconds()) LOG.warning("[fax] Log %s timed out after %ds", log_id, timeout) return { "delivered": False, "status": "timeout", "error": f"Timed out after {timeout}s", "duration": timeout, } async def submit_filing( original_pdf_path: str, pdf_url: str = "", photo_id_path: str | None = None, order_number: str = "", dot_number: str = "", mc_number: str = "", entity_name: str = "", document_type: str = "MCS-150 Biennial Update", recipient_name: str = "Federal Motor Carrier Safety Administration (FMCSA)", web_retries: int = 3, web_retry_interval: int = 600, ) -> dict: """Submit a filing to FMCSA: try web submission first, fall back to fax. Attempts electronic submission via ask.fmcsa.dot.gov up to `web_retries` times, waiting `web_retry_interval` seconds between attempts. If all web attempts fail, falls back to faxing to FMCSA at 202-366-3477. After successful submission (either method), generates an attestation cover page with digital signature and prepends it to the original PDF. Returns: dict with keys: success, method, attested_pdf, submitted_at, screenshot_path, fax_log_id, error """ from scripts.workers.services.fmcsa_web_submitter import submit_mcs150 # ── Phase 1: Try web submission (up to 3 attempts, 10 min apart) ── for attempt in range(1, web_retries + 1): LOG.info("[filing] Web submission attempt %d/%d for DOT %s", attempt, web_retries, dot_number) web_result = await submit_mcs150( pdf_path=original_pdf_path, photo_id_path=photo_id_path, dot_number=dot_number, mc_number=mc_number, entity_name=entity_name, ) if web_result["success"]: LOG.info("[filing] Web submission succeeded for DOT %s on attempt %d", dot_number, attempt) # Generate attestation attested = await _generate_attestation( original_pdf_path=original_pdf_path, order_number=order_number, dot_number=dot_number, entity_name=entity_name, document_type=document_type, recipient_name=recipient_name, submitted_at=datetime.fromisoformat(web_result["submitted_at"]), ) return { "success": True, "method": "web", "attested_pdf": attested, "submitted_at": web_result["submitted_at"], "screenshot_path": web_result.get("screenshot_path") or web_result.get("pre_submit_screenshot"), "fax_log_id": None, "error": None, } LOG.warning("[filing] Web attempt %d failed: %s", attempt, web_result["error"]) if attempt < web_retries: LOG.info("[filing] Waiting %ds before retry...", web_retry_interval) await asyncio.sleep(web_retry_interval) # ── Phase 2: Fall back to fax ── LOG.info("[filing] All %d web attempts failed, falling back to fax for DOT %s", web_retries, dot_number) if not pdf_url: return { "success": False, "method": "none", "attested_pdf": None, "submitted_at": None, "screenshot_path": None, "fax_log_id": None, "error": "Web submission failed and no PDF URL provided for fax fallback", } fax_result = await send_and_attest( pdf_url=pdf_url, original_pdf_path=original_pdf_path, order_number=order_number, dot_number=dot_number, entity_name=entity_name, document_type=document_type, recipient_name=recipient_name, ) return { "success": fax_result["success"], "method": "fax" if fax_result["success"] else "none", "attested_pdf": fax_result.get("attested_pdf"), "submitted_at": fax_result.get("submitted_at"), "screenshot_path": None, "fax_log_id": fax_result.get("fax_log_id"), "error": fax_result.get("error"), } async def _generate_attestation( original_pdf_path: str, order_number: str, dot_number: str, entity_name: str, document_type: str, recipient_name: str, submitted_at: datetime, ) -> str | None: """Generate attestation cover page and prepend to original PDF.""" try: from scripts.document_gen.templates.filing_attestation import ( generate_attestation_page, prepend_attestation, ) attest_pdf = generate_attestation_page( order_number=order_number, dot_number=dot_number, entity_name=entity_name, document_type=document_type, submitted_at=submitted_at, recipient_name=recipient_name, ) return prepend_attestation(attest_pdf, original_pdf_path) except Exception as exc: LOG.error("[filing] Attestation generation failed: %s", exc) return None async def send_and_attest( pdf_url: str, original_pdf_path: str, recipient: str = FMCSA_FAX, order_number: str = "", dot_number: str = "", entity_name: str = "", document_type: str = "MCS-150 Biennial Update", recipient_name: str = "Federal Motor Carrier Safety Administration (FMCSA)", ) -> dict: """Send a fax, wait for delivery, then generate attested copy. Returns: dict with keys: success, attested_pdf, fax_log_id, submitted_at, error """ # 1. Send the fax send_result = await send_fax(pdf_url, recipient) if not send_result["success"]: return { "success": False, "attested_pdf": None, "fax_log_id": None, "submitted_at": None, "error": send_result["error"], } log_id = send_result["log_id"] # 2. Poll for delivery confirmation poll_result = await poll_fax_status(log_id, timeout=300, interval=10) if not poll_result["delivered"]: return { "success": False, "attested_pdf": None, "fax_log_id": log_id, "submitted_at": None, "error": f"Fax delivery failed: {poll_result['error']}", } # 3. Fax delivered — generate attestation submitted_at = datetime.now(timezone.utc) attested = await _generate_attestation( original_pdf_path=original_pdf_path, order_number=order_number, dot_number=dot_number, entity_name=entity_name, document_type=document_type, recipient_name=recipient_name, submitted_at=submitted_at, ) return { "success": True, "attested_pdf": attested, "fax_log_id": log_id, "submitted_at": submitted_at.isoformat(), "error": None if attested else "Fax sent but attestation generation failed", }