Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2090 lines
91 KiB
Python
2090 lines
91 KiB
Python
"""
|
|
Worker Job Server — HTTP API that receives jobs from the Express API webhook receiver.
|
|
|
|
Instead of polling ERPNext for queued orders, this server receives webhook-triggered
|
|
job dispatches and executes them. ERPNext is the BPM orchestrator; we are the executor.
|
|
|
|
Endpoints:
|
|
POST /jobs — Receive a job dispatch
|
|
GET /health — Health check
|
|
GET /jobs/active — List currently running jobs
|
|
|
|
Jobs are executed in a background thread pool to avoid blocking the HTTP server.
|
|
Each job type maps to a handler function.
|
|
|
|
Architecture:
|
|
ERPNext workflow transition
|
|
→ ERPNext webhook
|
|
→ Express API /api/v1/webhooks/formation/*
|
|
→ HTTP POST to this server /jobs
|
|
→ Handler executes (name search, filing, EIN, doc gen, delivery)
|
|
→ Handler calls ERPNext API to update status / advance workflow
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from datetime import datetime
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
from typing import Any
|
|
|
|
LOG = logging.getLogger("workers.job_server")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
)

# HTTP port this job server listens on (default 8090).
PORT = int(os.getenv("WORKER_PORT", "8090"))
# Number of jobs that may execute concurrently in the thread pool (default 3).
MAX_WORKERS = int(os.getenv("MAX_WORKERS", "3"))

# Job tracking
# job_id -> job metadata for in-flight jobs.
active_jobs: dict[str, dict] = {}
completed_jobs: list[dict] = []  # Last 100 completed (trimming presumably done elsewhere — not visible here)

# Thread pool for job execution
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix="job")
|
|
|
|
|
|
# ==========================================================================
|
|
# Schedule gate — check if portal is open before attempting web automation
|
|
# ==========================================================================
|
|
|
|
def _check_schedule(state_code: str, order_name: str) -> dict | None:
    """
    Check if the portal for the given state/province is currently available.

    Args:
        state_code: Two-letter state/province code, or the special value
            "EIN" for the IRS online EIN application.
        order_name: ERPNext Formation Order name, used for logging and for
            writing the deferred status back.

    Returns:
        None if the portal is open (proceed), or a defer result dict
        (deferred/reason/next_open/minutes_until_open) if closed.

    Side effects:
        When closed, best-effort updates the ERPNext Formation Order to
        automation_status "Deferred" with defer_until + admin_notes; an
        ERPNext failure is logged but does not block the defer result.
    """
    from scripts.formation.portal_schedule import (
        PortalSchedule,
        BC_CORPORATE_ONLINE_SCHEDULE,
        IRS_EIN_SCHEDULE,
        US_STATE_SOS_SCHEDULE,
    )

    # Determine schedule for this state
    schedule_config = None
    if state_code == "BC":
        schedule_config = BC_CORPORATE_ONLINE_SCHEDULE
    elif state_code == "EIN":
        schedule_config = IRS_EIN_SCHEDULE
    else:
        # Try to load from state's config.py if it defines a portal_schedule
        try:
            from scripts.formation.states import get_config
            state_config = get_config(state_code)
            schedule_config = state_config.get("portal_schedule", US_STATE_SOS_SCHEDULE)
        except Exception:
            # Unknown state / missing config — fall back to the generic SOS hours.
            schedule_config = US_STATE_SOS_SCHEDULE

    schedule = PortalSchedule.from_config(schedule_config)
    available, next_open = schedule.is_available()

    if available:
        return None  # Proceed

    # Portal closed — defer the job
    next_str = next_open.isoformat() if next_open else "unknown"
    minutes = schedule.minutes_until_open() or 0

    # Try to get the holiday name for the log message
    reason = "outside business hours"
    if schedule.jurisdiction:
        try:
            from scripts.formation.holidays import holiday_name, is_holiday
            from zoneinfo import ZoneInfo

            # FIX: the original built local time via the deprecated naive
            # datetime.utcnow() + replace(tzinfo=UTC) + astimezone; an aware
            # "now" in the portal's timezone is the direct equivalent.
            tz = ZoneInfo(schedule.timezone)
            local_now = datetime.now(tz)
            hname = holiday_name(local_now.date(), jurisdiction=schedule.jurisdiction)
            if hname:
                reason = f"closed for {hname}"
        except Exception:
            # Holiday lookup is cosmetic — keep the generic reason on failure.
            pass

    LOG.info(
        "Portal %s %s — deferring %s. Next open: %s (~%d min)",
        state_code, reason, order_name, next_str, minutes,
    )

    try:
        from scripts.workers.erpnext_client import ERPNextClient
        client = ERPNextClient()
        client.update_resource("Formation Order", order_name, {
            "automation_status": "Deferred",
            "defer_until": next_str,
            "admin_notes": f"Portal {reason}. Next open: {next_str} (~{minutes} min). Will auto-retry.",
        })
    except Exception as e:
        LOG.warning("Could not update ERPNext defer status: %s", e)

    return {
        "deferred": True,
        "reason": reason,
        "next_open": next_str,
        "minutes_until_open": minutes,
    }
|
|
|
|
|
|
def _check_standard_delay(order_name: str, checkpoint: str) -> dict | None:
    """
    Insert a 3-business-day delay for standard formation orders.
    Expedited orders skip this delay.

    Args:
        order_name: ERPNext Formation Order name.
        checkpoint: Label for this delay point (e.g. "post_name_pre_filing");
            embedded in admin_notes as "[STANDARD_DELAY:<checkpoint>]" so a
            later re-dispatch can recognize the same checkpoint.

    Returns None if the order should proceed (expedited, or delay already passed),
    or a defer dict if the order should be deferred.

    Idempotency: if defer_until + automation_note matches this checkpoint and
    the time has passed, clears the defer and proceeds.

    Any failure in this check is logged and treated as "proceed" — the delay
    is best-effort, never a blocker.
    """
    try:
        from scripts.workers.erpnext_client import ERPNextClient
        client = ERPNextClient()
        order = client.get_resource("Formation Order", order_name)

        # Expedited skips all standard delays
        if order.get("expedited", False):
            return None

        from datetime import datetime as _dt, timezone
        existing_defer = order.get("defer_until")
        existing_note = order.get("admin_notes", "") or ""
        now = _dt.now(timezone.utc)

        # Already deferred at this checkpoint?
        if existing_defer and checkpoint in existing_note:
            try:
                # Accept both "Z" and "+00:00" UTC suffixes from ERPNext.
                defer_dt = _dt.fromisoformat(existing_defer.replace("Z", "+00:00"))
            except Exception:
                defer_dt = None

            if defer_dt and defer_dt <= now:
                # Delay complete — clear defer and proceed
                LOG.info("[%s] Standard delay at %s complete — proceeding", order_name, checkpoint)
                client.update_resource("Formation Order", order_name, {
                    "automation_status": "Pending",
                    "defer_until": None,
                    # Strip this checkpoint's marker from the admin notes.
                    "admin_notes": existing_note.replace(f"[STANDARD_DELAY:{checkpoint}]", "").strip(),
                })
                return None
            else:
                # Still waiting (or the stored timestamp was unparseable)
                return {
                    "deferred": True,
                    "reason": f"Standard service 3-day delay at {checkpoint}",
                    "next_open": existing_defer,
                }

        # First time hitting this checkpoint — set the defer
        from scripts.workers.business_days import business_days_from_now
        defer_until = business_days_from_now(3)
        defer_iso = defer_until.isoformat()

        LOG.info(
            "[%s] Standard service: deferring at %s for 3 business days until %s",
            order_name, checkpoint, defer_iso,
        )

        client.update_resource("Formation Order", order_name, {
            "automation_status": "Deferred",
            "defer_until": defer_iso,
            "admin_notes": f"{existing_note} [STANDARD_DELAY:{checkpoint}] 3 business day delay until {defer_iso}".strip(),
        })

        return {
            "deferred": True,
            "reason": f"Standard service 3-day delay at {checkpoint}",
            "next_open": defer_iso,
            # NOTE(review): falls back to 4320 min (= 3 calendar days) when
            # defer_until is naive; the aware branch assumes
            # business_days_from_now returns an aware datetime — confirm.
            "minutes_until_open": int((defer_until - now.replace(tzinfo=defer_until.tzinfo)).total_seconds() / 60) if hasattr(defer_until, 'tzinfo') and defer_until.tzinfo else 4320,
        }
    except Exception as exc:
        LOG.warning("[%s] Standard delay check failed (proceeding without delay): %s", order_name, exc)
        return None
|
|
|
|
|
|
# ==========================================================================
|
|
# Job Handlers — each function handles one type of automation action
|
|
# ==========================================================================
|
|
|
|
def handle_name_search(payload: dict) -> dict:
    """Search for business name availability in a state portal.

    Reads order_name / state_code / entity_name from the payload, gates on
    the portal schedule, runs the search, and writes the outcome (status,
    notes, workflow transition) back to ERPNext.
    """
    from scripts.formation.name_search import search_name_sync

    order_name = payload["order_name"]
    state_code = payload["state_code"]
    entity_name = payload["entity_name"]

    # Schedule gate — don't hit the portal outside business hours / holidays
    deferred = _check_schedule(state_code, order_name)
    if deferred is not None:
        return {"action": "name_search", **deferred}

    LOG.info("Name search: %s in %s", entity_name, state_code)
    outcome = search_name_sync(entity_name, state_code)

    # Report back to ERPNext
    from scripts.workers.erpnext_client import ERPNextClient
    erp = ERPNextClient()

    name_available = outcome.get("available")
    if name_available:
        erp.update_resource("Formation Order", order_name, {
            "automation_status": "Running",
            "admin_notes": f"Name '{entity_name}' is available in {state_code}",
        })
        erp.apply_workflow("Formation Order", order_name, "Name Available")
    else:
        # Surface up to five similar names in the failure message.
        similar = ", ".join(outcome.get("similar_names", [])[:5])
        erp.update_resource("Formation Order", order_name, {
            "automation_status": "Failed",
            "automation_error": f"Name '{entity_name}' unavailable. Similar: {similar}",
        })
        erp.apply_workflow("Formation Order", order_name, "Name Unavailable")

    return {"action": "name_search", "available": name_available, "state": state_code}
|
|
|
|
|
|
def handle_file_entity(payload: dict) -> dict:
    """File business entity through state SOS portal via Playwright.

    Flow: standard-service delay gate → fetch order from ERPNext → portal
    schedule gate → build the FormationOrder → attach Relay payment card →
    run the async state adapter → write results (and a payment record) back
    to ERPNext and advance the workflow.

    Args:
        payload: Job payload; requires "order_name".

    Returns:
        {"action": "file_entity", ...} — either defer info from one of the
        gates, or success/filing_number from the adapter result.
    """
    from scripts.formation.states import get_adapter
    order_name = payload["order_name"]

    LOG.info("Filing entity: %s", order_name)

    # Standard service delay: 3 business days after name search clears,
    # before state filing submission. Expedited skips this delay.
    standard_defer = _check_standard_delay(order_name, "post_name_pre_filing")
    if standard_defer:
        return {"action": "file_entity", **standard_defer}

    # Fetch full order data from ERPNext
    from scripts.workers.erpnext_client import ERPNextClient
    client = ERPNextClient()
    order = client.get_resource("Formation Order", order_name)

    # Schedule gate — don't hit the portal outside business hours / holidays
    defer = _check_schedule(order["state_code"], order_name)
    if defer:
        return {"action": "file_entity", **defer}

    # Load state adapter and file.
    # A dedicated event loop is created per job: handlers run on worker
    # threads, which have no default asyncio loop.
    adapter = get_adapter(order["state_code"])
    loop = asyncio.new_event_loop()
    try:
        # NOTE(review): FormationOrder/Member/EntityType look unused here —
        # presumably needed indirectly by _build_formation_order; confirm.
        from scripts.formation.base import FormationOrder, Member, EntityType
        formation_order = _build_formation_order(order)

        # Load Relay debit card for payment
        from scripts.workers.relay_integration import populate_order_payment
        if not populate_order_payment(formation_order):
            # Deliberately continue: the adapter surfaces the payment failure.
            LOG.warning("Could not load Relay card — filing will fail at payment step")

        result = loop.run_until_complete(adapter.file_entity(formation_order))
    finally:
        loop.close()

    if result.success:
        client.update_resource("Formation Order", order_name, {
            "state_filing_number": result.filing_number,
            "filed_at": datetime.utcnow().isoformat(),
            "automation_status": "Succeeded",
            "automation_attempts": (order.get("automation_attempts") or 0) + 1,
        })
        # Record the payment for Relay reconciliation
        from scripts.workers.relay_integration import record_filing_payment
        record_filing_payment(
            order_name=order_name,
            state_code=order["state_code"],
            amount_cents=order.get("state_fee_cents", 0),
            card_last4=formation_order.payment_card_number[-4:] if formation_order.payment_card_number else "0000",
            confirmation=result.confirmation_number,
        )
        # Clear card details from memory
        formation_order.payment_card_number = ""
        formation_order.payment_card_cvv = ""

        client.apply_workflow("Formation Order", order_name, "Filing Complete")
    else:
        client.update_resource("Formation Order", order_name, {
            "automation_status": "Failed",
            "automation_error": result.error_message,
            "automation_attempts": (order.get("automation_attempts") or 0) + 1,
        })
        client.apply_workflow("Formation Order", order_name, "Filing Failed")

    return {"action": "file_entity", "success": result.success, "filing_number": result.filing_number}
|
|
|
|
|
|
def handle_obtain_ein(payload: dict) -> dict:
    """Obtain EIN from IRS online application.

    Gates on the standard-service delay and the IRS availability window,
    then runs the EIN worker and records the outcome in ERPNext.
    """
    order_name = payload["order_name"]
    LOG.info("Obtaining EIN: %s", order_name)

    # Standard service delay: 3 business days after state filing confirmed,
    # before EIN application. Expedited skips this delay.
    standard_defer = _check_standard_delay(order_name, "post_filing_pre_ein")
    if standard_defer:
        return {"action": "obtain_ein", **standard_defer}

    # Schedule gate — IRS is only available Mon-Fri 7am-10pm ET, closed federal holidays
    schedule_defer = _check_schedule("EIN", order_name)
    if schedule_defer:
        return {"action": "obtain_ein", **schedule_defer}

    from scripts.workers.erpnext_client import ERPNextClient
    erp = ERPNextClient()
    order_doc = erp.get_resource("Formation Order", order_name)

    from scripts.formation.ein_worker import obtain_ein_sync
    ein_result = obtain_ein_sync(order_doc)

    succeeded = ein_result.get("success")
    if succeeded:
        erp.update_resource("Formation Order", order_name, {
            "automation_status": "Succeeded",
        })
        # Store EIN in Sensitive ID doctype (encrypted)
        erp.create_resource("Sensitive ID", {
            "customer": order_doc.get("customer"),
            "id_type": "EIN",
            "encrypted_value": ein_result["ein"],
            "notes": f"Obtained for {order_doc.get('entity_name')}",
        })
        erp.apply_workflow("Formation Order", order_name, "EIN Obtained")
    else:
        erp.update_resource("Formation Order", order_name, {
            "automation_status": "Failed",
            "automation_error": ein_result.get("error", "EIN obtainment failed"),
        })
        erp.apply_workflow("Formation Order", order_name, "EIN Failed")

    return {"action": "obtain_ein", "success": succeeded}
|
|
|
|
|
|
def handle_generate_docs(payload: dict) -> dict:
    """Generate documents (operating agreement, etc.) and upload to MinIO.

    Args:
        payload: Job payload; requires "order_name".

    Returns:
        {"action": "generate_docs", "files": [...]} listing the generated
        filenames, or a defer dict if the standard-service delay applies.
    """
    order_name = payload["order_name"]
    LOG.info("Generating documents: %s", order_name)

    # Standard service delay: 3 business days after EIN obtained,
    # before document generation + delivery. Expedited skips this delay.
    standard_defer = _check_standard_delay(order_name, "post_ein_pre_docs")
    if standard_defer:
        return {"action": "generate_docs", **standard_defer}

    from scripts.workers.erpnext_client import ERPNextClient
    from scripts.document_gen import DocxBuilder, convert_to_pdf, MinioStorage

    client = ERPNextClient()
    order = client.get_resource("Formation Order", order_name)
    storage = MinioStorage()

    generated_files = []
    order_number = order.get("order_number", order_name)

    # Generate operating agreement if requested
    if order.get("include_operating_agreement"):
        try:
            builder = DocxBuilder("operating-agreement")
            builder.set_variables({
                "entity_name": order.get("entity_name", ""),
                # NOTE(review): the two-letter state_code is passed as
                # "state_name" — confirm the template expects the code
                # rather than the full state name.
                "state_name": order.get("state_code", ""),
                "formation_date": datetime.utcnow().strftime("%B %d, %Y"),
                "management_type": order.get("management_type", "Member Managed"),
                "purpose": order.get("purpose", "Any lawful business activity"),
                "principal_address": order.get("principal_address", ""),
                "registered_agent_name": "Northwest Registered Agent LLC",
                "members": order.get("members_json") or [],
            }).fill().add_disclaimer()

            docx_path = f"/app/data/documents/{order_number}/operating-agreement.docx"
            builder.save(docx_path)
            pdf_path = convert_to_pdf(docx_path)

            # Upload to MinIO
            storage.upload(docx_path, f"formations/{order_number}/operating-agreement.docx")
            storage.upload(str(pdf_path), f"formations/{order_number}/operating-agreement.pdf")
            generated_files.extend(["operating-agreement.docx", "operating-agreement.pdf"])
        except Exception as e:
            # Best-effort: a generation failure is logged but does not block
            # the workflow advance below.
            LOG.error("Operating agreement generation failed: %s", e)

    # Advance workflow to Review (admin reviews before delivery)
    client.apply_workflow("Formation Order", order_name, "Docs Ready")

    return {"action": "generate_docs", "files": generated_files}
|
|
|
|
|
|
def handle_deliver(payload: dict) -> dict:
    """Email documents to customer and mark as delivered.

    Handles two order families:
      * Compliance orders (order_number starts with "CO-"): ERPNext Sales
        Order, documents under compliance/{order_number}/, customer email
        looked up in PG compliance_orders (ERPNext Customer as fallback).
      * Formation orders: Formation Order doctype, documents under
        formations/{order_number}/, email taken from the order itself.

    Only PDFs are delivered, via 7-day presigned MinIO URLs.

    Args:
        payload: Job payload; requires "order_name", optionally "order_number".

    Returns:
        {"action": "deliver", ...} with email + document count, or an
        "error" key when no documents / no email could be found.
    """
    order_name = payload["order_name"]
    order_number = payload.get("order_number", "")
    LOG.info("Delivering: %s (%s)", order_name, order_number)

    from scripts.workers.erpnext_client import ERPNextClient
    from scripts.document_gen import MinioStorage

    client = ERPNextClient()
    storage = MinioStorage()

    # Determine if this is a compliance order (Sales Order) or formation order
    is_compliance = order_number.startswith("CO-") if order_number else False

    if is_compliance:
        # Compliance order — documents are in compliance/{order_number}/
        order = client.get_resource("Sales Order", order_name)
        prefix = f"compliance/{order_number}/"

        # Get customer email from PG compliance_orders
        customer_email = ""
        customer_name = ""
        try:
            import psycopg2
            # NOTE(review): conn is not closed if the query raises (the broad
            # except masks it) — consider try/finally.
            conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
            cur = conn.cursor()
            cur.execute(
                "SELECT customer_email, customer_name FROM compliance_orders WHERE order_number = %s",
                (order_number,),
            )
            row = cur.fetchone()
            if row:
                customer_email = row[0]
                customer_name = row[1]
            cur.close()
            conn.close()
        except Exception as exc:
            LOG.warning("Could not look up compliance order email: %s", exc)

        if not customer_email:
            # Fallback to ERPNext customer
            cust = order.get("customer", "")
            if cust:
                try:
                    cust_doc = client.get_resource("Customer", cust)
                    customer_email = cust_doc.get("email_id", "")
                    customer_name = cust_doc.get("customer_name", "")
                except Exception:
                    pass
    else:
        # Formation order
        order = client.get_resource("Formation Order", order_name)
        order_number = order.get("order_number", order_name)
        prefix = f"formations/{order_number}/"
        customer_email = order.get("customer_email", "")
        customer_name = order.get("customer_name", "")

    # Get presigned URLs for all documents (PDF only for delivery)
    doc_objects = storage.list_objects(prefix)
    doc_urls = {}
    for obj in doc_objects:
        filename = obj.split("/")[-1]
        if filename.endswith(".pdf"):
            url = storage.get_url(obj, expires_hours=168)  # 7 days
            doc_urls[filename] = url

    if not doc_urls:
        LOG.warning("No PDF documents found for %s at %s", order_name, prefix)
        return {"action": "deliver", "error": "no documents found"}

    if not customer_email:
        LOG.error("No customer email for %s", order_name)
        return {"action": "deliver", "error": "no customer email"}

    # Build and send delivery email
    from scripts.workers.delivery_worker import _build_email_html, _send_email
    file_links = [(name, url) for name, url in doc_urls.items()]
    html = _build_email_html(customer_name, order_number, file_links)
    # NOTE(review): subject says "compliance documents" even for formation
    # orders — confirm this wording is intended for both families.
    subject = f"Your compliance documents are ready — {order_number}"
    _send_email(customer_email, subject, html)

    # Advance workflow
    if is_compliance:
        try:
            client.set_value("Sales Order", order_name, "workflow_state", "Delivered")
        except Exception as exc:
            LOG.warning("Could not advance workflow for %s: %s", order_name, exc)
    else:
        client.update_resource("Formation Order", order_name, {
            "delivered_at": datetime.utcnow().isoformat(),
        })
        client.apply_workflow("Formation Order", order_name, "Deliver")

    LOG.info("Delivered %s to %s (%d documents)", order_name, customer_email, len(doc_urls))
    return {"action": "deliver", "email": customer_email, "documents": len(doc_urls)}
|
|
|
|
|
|
def handle_send_to_attorney(payload: dict) -> dict:
    """Send documents to partner attorney for legal review."""
    order_name = payload["order_name"]
    LOG.info("Sending to attorney: %s", order_name)

    # For now, just log — attorney sending is semi-manual
    from scripts.workers.erpnext_client import ERPNextClient

    ERPNextClient().update_resource(
        "Formation Order",
        order_name,
        {"admin_notes": "Sent to partner attorney for review. Waiting for legal opinion letter."},
    )

    return {"action": "send_to_attorney", "status": "sent"}
|
|
|
|
|
|
def _get_crtc_handler():
    """Lazily import and instantiate the Canada CRTC handler.

    The import is kept inside the function, matching the lazy-import style
    used by the other handlers in this module.
    """
    from scripts.workers.services.canada_crtc import CanadaCRTCHandler
    return CanadaCRTCHandler()
|
|
|
|
|
|
def handle_register_ca_domain(payload: dict) -> dict:
    """CRTC: Customer confirmed domain choice — resume pipeline from Step 5.

    Called by the /api/v1/canada-crtc/domain-confirm endpoint after the
    customer selects their .ca domain in the portal.

    Args:
        payload: Webhook payload; reads "order_number" and "domain".

    Returns:
        Summary dict; contains "error" when no linked Sales Order exists.
    """
    order_number = payload.get("order_number", "")
    domain = payload.get("domain", "")
    LOG.info("[CRTC] Domain confirmed: %s for %s — resuming pipeline", domain, order_number)

    # Re-run the full pipeline — it will see ca_domain is now set and
    # pick up from Step 5 (domain registration), skipping completed steps.
    from scripts.workers.erpnext_client import ERPNextClient
    client = ERPNextClient()

    # Find the Sales Order linked to this order.
    # FIX: close the PG connection even when the query raises (the original
    # leaked it on exception); dropped the redundant local `import os`
    # (already imported at module level).
    import psycopg2
    import psycopg2.extras
    pg = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
    try:
        cur = pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cur.execute(
            "SELECT erpnext_sales_order FROM canada_crtc_orders WHERE order_number = %s",
            (order_number,),
        )
        row = cur.fetchone()
    finally:
        pg.close()

    if not row or not row.get("erpnext_sales_order"):
        LOG.error("No Sales Order found for %s", order_number)
        return {"action": "register_ca_domain", "error": "No Sales Order", "order": order_number}

    so_name = row["erpnext_sales_order"]
    order = client.get_resource("Sales Order", so_name)

    handler = _get_crtc_handler()
    # Dedicated loop: worker threads have no default asyncio loop.
    loop = asyncio.new_event_loop()
    try:
        # Return value intentionally unused — the pipeline reports its own
        # progress as it runs.
        loop.run_until_complete(handler.process(order))
    finally:
        loop.close()

    return {"action": "register_ca_domain", "domain": domain, "order": order_number}
|
|
|
|
|
|
def handle_register_awaiting_funds(payload: dict) -> dict:
    """CRTC: Record reservation and begin Relay deposit monitoring."""
    order_name = payload["order_name"]
    LOG.info("[CRTC] Awaiting funds: %s", order_name)

    from scripts.workers.relay_integration import start_deposit_monitoring

    # Kick off the deposit monitor for this order.
    start_deposit_monitoring(order_name)

    return {
        "action": "register_awaiting_funds",
        "order": order_name,
        "status": "monitoring",
    }
|
|
|
|
|
|
def handle_file_bc_incorporation(payload: dict) -> dict:
    """CRTC: File BC incorporation by creating a CA Filing Request in ERPNext.

    The frappe_ca_registry app handles the actual COLIN automation.
    This handler creates the CA Filing Request DocType from the order data,
    which triggers the Frappe app's background job.

    Args:
        payload: Job payload; reads "order_name" (ERPNext Sales Order) and
            "order_number" (key into PG canada_crtc_orders).

    Returns:
        Summary dict with "filing_name" on success or "error" on failure.
    """
    order_name = payload["order_name"]
    order_number = payload.get("order_number", "")
    LOG.info("[CRTC] Creating CA Filing Request for: %s (%s)", order_name, order_number)

    from scripts.workers.erpnext_client import ERPNextClient
    client = ERPNextClient()

    # Load director data from PG (has the split name fields).
    # FIX: close the connection even when the query raises (the original
    # leaked it); dropped the redundant local `import json` and `import os`
    # (both available at module level).
    import psycopg2
    import psycopg2.extras
    pg = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
    try:
        cur = pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cur.execute("SELECT * FROM canada_crtc_orders WHERE order_number = %s", (order_number,))
        pg_order = cur.fetchone()
    finally:
        pg.close()

    if not pg_order:
        LOG.error("PG order not found: %s", order_number)
        return {"action": "file_bc_incorporation", "error": f"Order {order_number} not found"}

    # Parse director address JSON if needed
    director_addr = {}
    try:
        raw = pg_order.get("director_address", "")
        if raw and raw.startswith("{"):
            director_addr = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        pass

    # Determine company type for the DocType
    ct = pg_order.get("company_type", "numbered")
    doctype_company_type = {
        "numbered": "Numbered",
        "numbered_tradename": "Numbered + Trade Name",
        "named": "Named",
    }.get(ct, "Numbered")

    # Director street line.
    # FIX: the original used an error-prone `cond and a or b` chain; this
    # if/else is its exact equivalent.
    if pg_order.get("director_first_name"):
        director_street = (
            director_addr.get("street")
            or pg_order.get("director_address", "")
            or director_addr.get("street", "")
        )
    else:
        director_street = director_addr.get("street", "")

    # Create the CA Filing Request in ERPNext
    filing_data = {
        "doctype": "CA Filing Request",
        "filing_type": "Named Incorporation" if ct == "named" else "Numbered Incorporation",
        "province": "BC",
        "sales_order": order_name,
        "external_order_id": order_number,
        "company_type": doctype_company_type,
        # NOTE(review): company_name_choice1 is mapped into the reservation
        # number field — confirm this is intentional upstream.
        "name_reservation_number": pg_order.get("company_name_choice1") or "",
        "trade_name": pg_order.get("trade_name") or "",
        # Director from split fields (preferred) or parsed JSON
        "director_first_name": pg_order.get("director_first_name") or director_addr.get("first_name", ""),
        "director_middle_name": pg_order.get("director_middle_name") or "",
        "director_last_name": pg_order.get("director_last_name") or director_addr.get("last_name", ""),
        "director_address": director_street,
        "director_address2": director_addr.get("street2", ""),
        "director_city": director_addr.get("city", ""),
        "director_province": director_addr.get("province", ""),
        "director_postal": director_addr.get("postal", ""),
        "director_country": director_addr.get("country", "US"),
        # Mailing address
        "director_mailing_different": pg_order.get("director_mailing_different", False),
        # Additional directors
        "additional_directors_json": json.dumps(pg_order.get("additional_directors") or []),
        # Registered office from mailbox address
        "office_address": (pg_order.get("mailbox_address") or "329 Howe St, Vancouver, BC V6C 3N2").split(",")[0].strip(),
        # Notification email goes to PW
        "notification_email": "filings@performancewest.net",
    }

    # Parse mailing address if present
    if pg_order.get("director_mailing_different") and pg_order.get("director_mailing_address"):
        try:
            mail_addr = json.loads(pg_order["director_mailing_address"])
            filing_data["director_mailing_street"] = mail_addr.get("street", "")
            filing_data["director_mailing_city"] = mail_addr.get("city", "")
            filing_data["director_mailing_province"] = mail_addr.get("province", "")
            filing_data["director_mailing_postal"] = mail_addr.get("postal", "")
            filing_data["director_mailing_country"] = mail_addr.get("country", "")
        except (json.JSONDecodeError, TypeError):
            pass

    try:
        result = client.create_resource("CA Filing Request", filing_data)
        filing_name = result.get("name", "")
        LOG.info("[CRTC] CA Filing Request created: %s", filing_name)

        # Submit it (triggers the Frappe app's background job)
        client.call_method("frappe.client.submit", {
            "doc": {"doctype": "CA Filing Request", "name": filing_name},
        })
        LOG.info("[CRTC] CA Filing Request submitted: %s", filing_name)

        return {"action": "file_bc_incorporation", "filing_name": filing_name, "order": order_name}

    except Exception as e:
        LOG.error("[CRTC] Failed to create CA Filing Request: %s", e)
        return {"action": "file_bc_incorporation", "error": str(e), "order": order_name}
|
|
|
|
|
|
def handle_generate_crtc_docs(payload: dict) -> dict:
    """CRTC: Generate CRTC notification letter + corporate binder."""
    order_name = payload["order_name"]
    LOG.info("[CRTC] Generating docs: %s", order_name)

    from scripts.workers.erpnext_client import ERPNextClient

    sales_order = ERPNextClient().get_resource("Sales Order", order_name)
    crtc = _get_crtc_handler()

    # Run the async generator on a private event loop, closing it either way.
    event_loop = asyncio.new_event_loop()
    try:
        generated = event_loop.run_until_complete(crtc.generate_documents(sales_order))
    finally:
        event_loop.close()

    return {"action": "generate_crtc_docs", "order": order_name, "files": [str(f) for f in generated]}
|
|
|
|
|
|
def handle_notify_admin_review(payload: dict) -> dict:
    """CRTC: Notify admin that order is ready for review before shipping."""
    order_name = payload["order_name"]
    LOG.info("[CRTC] Notifying admin for review: %s", order_name)
    # The ERPNext Email Notification handles the actual email —
    # this handler just logs the event and could do additional checks.
    return {
        "action": "notify_admin_review",
        "order": order_name,
        "status": "notified",
    }
|
|
|
|
|
|
def handle_ship_binder(payload: dict) -> dict:
    """CRTC: Trigger physical binder printing + shipping instructions."""
    order_name = payload["order_name"]
    LOG.info("[CRTC] Ship binder: %s", order_name)

    from scripts.workers.erpnext_client import ERPNextClient

    sales_order = ERPNextClient().get_resource("Sales Order", order_name)
    crtc = _get_crtc_handler()

    # Run the async shipper on a private event loop, closing it either way.
    event_loop = asyncio.new_event_loop()
    try:
        event_loop.run_until_complete(crtc.ship_binder(sales_order))
    finally:
        event_loop.close()

    return {"action": "ship_binder", "order": order_name, "status": "shipped"}
|
|
|
|
|
|
def handle_mark_delivered(payload: dict) -> dict:
    """CRTC: Mark order as delivered and start commission holdback clock.

    The actual workflow-state change happens in ERPNext; this handler only
    starts the 14-day Relay commission holdback (best-effort, non-fatal).

    Args:
        payload: Job payload; requires "order_name".
    """
    order_name = payload["order_name"]
    LOG.info("[CRTC] Marking delivered: %s", order_name)

    from scripts.workers.relay_integration import start_commission_holdback

    # FIX: removed an ERPNextClient that was instantiated but never used.
    # Start 14-day commission holdback
    try:
        start_commission_holdback(order_name)
    except Exception as e:
        LOG.warning("[CRTC] Commission holdback failed (non-fatal): %s", e)

    return {"action": "mark_delivered", "order": order_name, "status": "delivered"}
|
|
|
|
|
|
def _pw_email_html(headline: str, body_paragraphs: list[str], cta_text: str = "", cta_url: str = "") -> str:
|
|
"""Build a branded HTML email matching the campaign template style."""
|
|
logo = "https://performancewest.net/images/logo.png"
|
|
body_html = "".join(
|
|
f'<p style="margin:0 0 14px;font-family:Arial,sans-serif;font-size:15px;color:#2d3748;line-height:1.7;">{p}</p>'
|
|
for p in body_paragraphs
|
|
)
|
|
cta_block = ""
|
|
if cta_text and cta_url:
|
|
cta_block = (
|
|
'<table width="100%" cellpadding="0" cellspacing="0" border="0"><tr><td align="center">'
|
|
'<table cellpadding="0" cellspacing="0" border="0" style="margin:20px auto;"><tr>'
|
|
f'<td style="background:#059669;border-radius:4px;"><a href="{cta_url}" style="display:inline-block;padding:12px 28px;font-family:Arial,sans-serif;font-size:14px;font-weight:700;color:#ffffff;text-decoration:none;">{cta_text}</a></td>'
|
|
'</tr></table></td></tr></table>'
|
|
)
|
|
return (
|
|
'<!DOCTYPE html><html><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width,initial-scale=1"></head>'
|
|
'<body style="margin:0;padding:0;background:#eef0f3;">'
|
|
'<table width="100%" cellpadding="0" cellspacing="0" border="0" style="background:#eef0f3;padding:20px 0;"><tr><td align="center">'
|
|
'<table width="620" cellpadding="0" cellspacing="0" border="0" style="width:620px;max-width:620px;background:#ffffff;border-radius:8px;overflow:hidden;box-shadow:0 2px 8px rgba(0,0,0,0.08);">'
|
|
'<tr><td>'
|
|
# Header
|
|
'<table width="100%" cellpadding="0" cellspacing="0" border="0"><tr><td style="background:#1a2744;padding:18px 40px 14px;">'
|
|
'<table cellpadding="0" cellspacing="0" border="0"><tr>'
|
|
f'<td style="vertical-align:middle;padding-right:12px;"><img src="{logo}" width="90" alt="Performance West" style="display:block;width:90px;height:auto;"></td>'
|
|
'<td style="vertical-align:middle;border-left:1px solid #2d4e78;padding-left:12px;"><span style="color:#8fa8d0;font-family:Arial,sans-serif;font-size:11px;letter-spacing:1.5px;text-transform:uppercase;">Telecom Compliance</span></td>'
|
|
'</tr></table></td></tr></table>'
|
|
'<table width="100%" cellpadding="0" cellspacing="0" border="0"><tr><td style="background:#059669;height:3px;font-size:0;"> </td></tr></table>'
|
|
'<table width="100%" cellpadding="0" cellspacing="0" border="0"><tr><td style="padding:24px 40px 28px;">'
|
|
f'<h1 style="margin:0;font-family:Arial,sans-serif;font-size:22px;font-weight:700;color:#ffffff;line-height:1.3;">{headline}</h1>'
|
|
'</td></tr></table>'
|
|
# Body
|
|
f'<table width="100%" cellpadding="0" cellspacing="0" border="0"><tr><td style="padding:28px 40px;background:#ffffff;">{body_html}{cta_block}</td></tr></table>'
|
|
# Footer
|
|
'<table width="100%" cellpadding="0" cellspacing="0" border="0"><tr><td style="background:#f4f5f7;padding:16px 40px;border-top:1px solid #e8ecf0;text-align:center;">'
|
|
f'<img src="{logo}" width="60" alt="Performance West" style="display:block;margin:0 auto 8px;width:60px;height:auto;opacity:0.5;">'
|
|
'<p style="margin:0;font-family:Arial,sans-serif;font-size:11px;color:#9ca3af;">Performance West Inc. · <a href="https://performancewest.net" style="color:#9ca3af;">performancewest.net</a> · 1-888-411-0383</p>'
|
|
'</td></tr></table>'
|
|
'</td></tr></table></td></tr></table></body></html>'
|
|
)
|
|
|
|
|
|
def _send_instant_delivery(
    *, customer_email: str, customer_name: str, order_number: str,
    service_name: str, minio_paths: list[str], storage,
):
    """Email generated PDFs to the customer immediately after payment.

    Only sends PDF files — DOCX files are internal (for authority
    submission or admin review). Sends fancy branded HTML email.

    Args:
        customer_email: Recipient address.
        customer_name: Full name; the first whitespace token is used as the
            greeting ("there" when empty).
        order_number: Order identifier used in the subject line and logs.
        service_name: Human-readable service name for subject/body.
        minio_paths: Object-store keys of generated files; non-PDF keys are
            filtered out.
        storage: Object exposing ``download(remote_path, local_path)``
            (MinIO wrapper).

    Returns:
        None. Returns early (with a log line) when there are no PDFs or
        SMTP credentials are not configured. smtplib errors propagate to
        the caller.
    """
    import smtplib
    import tempfile
    from email import encoders
    from email.mime.base import MIMEBase
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.utils import parseaddr
    from pathlib import Path

    # Filter to PDFs only — DOCX is for internal/authority use
    pdf_paths = [p for p in minio_paths if p.lower().endswith(".pdf")]
    if not pdf_paths:
        LOG.info("No PDFs to deliver for %s (only DOCX generated)", order_number)
        return

    smtp_host = os.environ.get("SMTP_HOST", "co.carrierone.com")
    smtp_port = int(os.environ.get("SMTP_PORT", "587"))
    smtp_user = os.environ.get("SMTP_USER", "")
    smtp_pass = os.environ.get("SMTP_PASS", "")
    from_email = os.environ.get("SMTP_FROM", "Performance West <noreply@performancewest.net>")

    if not smtp_user or not smtp_pass:
        LOG.warning("SMTP not configured — skipping instant delivery for %s", order_number)
        return

    name = customer_name.split()[0] if customer_name else "there"
    file_list = "".join(
        f'<li style="margin-bottom:6px;font-size:14px;">{Path(p).name}</li>'
        for p in pdf_paths
    )

    body_parts = [
        f"Hi {name},",
        f"Your <strong>{service_name}</strong> documents for order <strong>{order_number}</strong> are attached.",
        f"<ul style=\"margin:8px 0 14px;padding-left:20px;\">{file_list}</ul>",
        "These documents were generated automatically upon payment confirmation. "
        "If your order includes an FCC portal submission (e.g., ECFS upload for CPNI, "
        "or RMD registration), our team will handle that separately and send you a "
        "<strong>confirmation email with the filing confirmation number</strong> once submitted.",
        "If anything in the documents is incorrect, don't worry — just reply "
        "to this email or contact us at <a href=\"mailto:info@performancewest.net\" style=\"color:#059669;\">info@performancewest.net</a> "
        "and we'll fix it right away. A corrected submission will replace the original at no additional charge.",
        "<span style=\"font-size:13px;color:#6b7280;\">This fee is tax deductible as an ordinary business expense under IRC § 162.</span>",
        "— Performance West Compliance Team",
    ]

    html = _pw_email_html(
        headline=f"Your {service_name} documents are ready",
        body_paragraphs=body_parts,
        cta_text="Check My FCC Compliance Status",
        cta_url="https://performancewest.net/tools/fcc-compliance-check",
    )

    msg = MIMEMultipart("mixed")
    msg["From"] = from_email
    msg["To"] = customer_email
    msg["Subject"] = f"Your {service_name} documents are ready \u2014 Order {order_number}"
    msg["Reply-To"] = "info@performancewest.net"
    msg.attach(MIMEText(html, "html"))

    # Download PDFs from MinIO and attach. Best-effort: a single missing
    # attachment should not block the delivery email.
    with tempfile.TemporaryDirectory() as tmpdir:
        for remote_path in pdf_paths:
            local_path = os.path.join(tmpdir, Path(remote_path).name)
            try:
                storage.download(remote_path, local_path)
                part = MIMEBase("application", "pdf")
                with open(local_path, "rb") as f:
                    part.set_payload(f.read())
                encoders.encode_base64(part)
                part.add_header(
                    "Content-Disposition",
                    f'attachment; filename="{Path(remote_path).name}"',
                )
                msg.attach(part)
            except Exception as exc:
                LOG.warning("Could not attach %s: %s", remote_path, exc)

    # RFC 5321: the MAIL FROM envelope sender must be a bare address, but
    # SMTP_FROM defaults to a "Display Name <addr>" form. Strip the display
    # name so strict servers don't reject the envelope. (The pretty form is
    # still used in the From: header above.)
    envelope_from = parseaddr(from_email)[1] or from_email

    with smtplib.SMTP(smtp_host, smtp_port) as server:
        # Port 25 is assumed to be a plaintext/internal relay; everything
        # else is upgraded via STARTTLS before authenticating.
        if smtp_port != 25:
            server.starttls()
        server.login(smtp_user, smtp_pass)
        server.sendmail(envelope_from, [customer_email], msg.as_string())
|
|
|
|
|
|
def _send_filing_confirmation(
    *, customer_email: str, customer_name: str, order_number: str,
    service_name: str, confirmation_number: str, authority: str,
    minio_paths: list[str], storage,
):
    """Email the customer after a filing is submitted to an authority.

    Sent after ECFS upload (CPNI), RMD portal submission, etc.
    Includes the confirmation number and attaches the confirmation PDF.

    Args:
        customer_email: Recipient address.
        customer_name: Full name; first whitespace token used as greeting.
        order_number: Order identifier, used in logs.
        service_name: Human-readable service name for subject/body.
        confirmation_number: Authority-issued confirmation number, shown
            prominently in the email and in the subject line.
        authority: Name of the regulator the filing went to (e.g. "FCC").
        minio_paths: Object-store keys; only ``.pdf`` keys are attached.
        storage: Object exposing ``download(remote_path, local_path)``.

    Returns:
        None. Returns early (with a log line) when SMTP credentials are not
        configured. smtplib errors propagate to the caller.
    """
    import smtplib
    import tempfile
    from email import encoders
    from email.mime.base import MIMEBase
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.utils import parseaddr
    from pathlib import Path

    smtp_host = os.environ.get("SMTP_HOST", "co.carrierone.com")
    smtp_port = int(os.environ.get("SMTP_PORT", "587"))
    smtp_user = os.environ.get("SMTP_USER", "")
    smtp_pass = os.environ.get("SMTP_PASS", "")
    from_email = os.environ.get("SMTP_FROM", "Performance West <noreply@performancewest.net>")

    if not smtp_user or not smtp_pass:
        LOG.warning("SMTP not configured — skipping filing confirmation for %s", order_number)
        return

    name = customer_name.split()[0] if customer_name else "there"

    body_parts = [
        f"Hi {name},",
        f"Your <strong>{service_name}</strong> has been successfully submitted to the <strong>{authority}</strong>.",
        f'<table width="100%" cellpadding="0" cellspacing="0" border="0" style="margin:12px 0;"><tr>'
        f'<td style="background:#f0fdf4;border:1px solid #86efac;border-radius:8px;padding:16px 20px;text-align:center;">'
        f'<p style="margin:0 0 4px;font-family:Arial,sans-serif;font-size:12px;color:#065f46;text-transform:uppercase;letter-spacing:1px;">Confirmation Number</p>'
        f'<p style="margin:0;font-family:\'Courier New\',monospace;font-size:22px;font-weight:700;color:#059669;">{confirmation_number}</p>'
        f'</td></tr></table>',
        "Save this confirmation number for your records. The confirmation page is attached as a PDF.",
        "If you need to reference this filing in the future, you can use this confirmation number "
        f"with the {authority} to verify your submission status.",
        "If anything needs to be corrected, reply to this email and we'll file an amendment at no additional charge.",
        "<span style=\"font-size:13px;color:#6b7280;\">This fee is tax deductible as an ordinary business expense under IRC § 162.</span>",
        "— Performance West Compliance Team",
    ]

    html = _pw_email_html(
        headline=f"\u2705 {service_name} Filed Successfully",
        body_paragraphs=body_parts,
    )

    msg = MIMEMultipart("mixed")
    msg["From"] = from_email
    msg["To"] = customer_email
    msg["Subject"] = f"\u2705 Filed: {service_name} \u2014 Confirmation {confirmation_number}"
    msg["Reply-To"] = "info@performancewest.net"
    msg.attach(MIMEText(html, "html"))

    # Attach confirmation PDFs (best-effort; a failed download is logged
    # and skipped rather than blocking the confirmation email).
    pdf_paths = [p for p in minio_paths if p.lower().endswith(".pdf")]
    if pdf_paths:
        with tempfile.TemporaryDirectory() as tmpdir:
            for remote_path in pdf_paths:
                local_path = os.path.join(tmpdir, Path(remote_path).name)
                try:
                    storage.download(remote_path, local_path)
                    part = MIMEBase("application", "pdf")
                    with open(local_path, "rb") as f:
                        part.set_payload(f.read())
                    encoders.encode_base64(part)
                    part.add_header(
                        "Content-Disposition",
                        f'attachment; filename="{Path(remote_path).name}"',
                    )
                    msg.attach(part)
                except Exception as exc:
                    LOG.warning("Could not attach %s: %s", remote_path, exc)

    # RFC 5321: the MAIL FROM envelope sender must be a bare address, but
    # SMTP_FROM defaults to a "Display Name <addr>" form. Strip the display
    # name so strict servers don't reject the envelope.
    envelope_from = parseaddr(from_email)[1] or from_email

    with smtplib.SMTP(smtp_host, smtp_port) as server:
        # Port 25 is assumed plaintext/internal; anything else gets STARTTLS.
        if smtp_port != 25:
            server.starttls()
        server.login(smtp_user, smtp_pass)
        server.sendmail(envelope_from, [customer_email], msg.as_string())

    LOG.info("Filing confirmation sent to %s for %s (conf: %s)",
             customer_email, order_number, confirmation_number)
|
|
|
|
|
|
def handle_process_compliance_service(payload: dict) -> dict:
    """Process a compliance service order (FLSA audit, CCPA audit, FCC checkup, etc.).

    End-to-end pipeline for one paid order:
      1. Load the Sales Order from ERPNext (falls back to a minimal dict).
      2. Enrich it from the Postgres ``compliance_orders`` row (intake data,
         filing flags, customer contact info).
      3. For FCC services, attach the ``telecom_entities`` row (resolved by
         entity id or, failing that, by FRN from intake data).
      4. Dispatch to the matching service handler (async) to generate files.
      5. Upload generated files to MinIO, record them on the Sales Order,
         and instant-deliver PDFs for no-vendor-fee services.
      6. Book-keeping: clear the one-shot auto-filing override, stamp the
         entity's last checkup time, advance the workflow to Review, create
         compliance-calendar deadlines, and raise a ToDo for any 499-A
         foreign-qualification add-on.

    Every side-effect step is wrapped in its own try/except so a partial
    failure (ERPNext down, PG unreachable) degrades rather than aborts.

    Args:
        payload: Job payload; requires ``order_name`` (ERPNext Sales Order
            name) and optionally ``order_number`` / ``service_slug``.

    Returns:
        ``{"action": "process_compliance_service", "files": [...]}`` with the
        MinIO keys of generated files, or an ``"error"`` key when no handler
        exists for the service slug.
    """
    order_name = payload["order_name"]
    order_number = payload.get("order_number", "")
    service_slug = payload.get("service_slug", "")
    LOG.info("Processing compliance service: %s — %s (slug: %s)", order_name, order_number, service_slug)

    from scripts.workers.services import SERVICE_HANDLERS
    from scripts.workers.erpnext_client import ERPNextClient
    from scripts.document_gen import MinioStorage

    client = ERPNextClient()
    # Try to load the Sales Order from ERPNext — may fail if ERPNext is
    # unreachable or the order doesn't have a linked Sales Order (e.g.,
    # orders created directly via the API without ERPNext integration).
    try:
        order = client.get_resource("Sales Order", order_name)
    except Exception as exc:
        LOG.warning("Could not load Sales Order %s from ERPNext: %s — building from PG", order_name, exc)
        order = {"name": order_name, "order_number": order_number}
    storage = MinioStorage()

    # Resolve service_slug, entity_id, and filing metadata from PG compliance_orders
    pg_entity_id = None
    if order_number:
        try:
            import psycopg2
            import psycopg2.extras
            import json as _json
            conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
            cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            cur.execute(
                """
                SELECT service_slug, telecom_entity_id,
                       customer_email, customer_name, customer_phone,
                       intake_data,
                       filing_mode, form_year_override,
                       revises_order_number, revised_reason,
                       prior_confirmation_number,
                       waive_deminimis_exemption, waive_deminimis_reason,
                       multi_year_filings, multi_year_discount_pct,
                       deminimis_result_is_exempt,
                       deminimis_estimated_contrib_cents
                FROM compliance_orders WHERE order_number = %s
                """,
                (order_number,),
            )
            row = cur.fetchone()
            if row:
                if not service_slug:
                    service_slug = row["service_slug"]
                pg_entity_id = row["telecom_entity_id"]
                # Merge the full compliance_orders row into order_data so the
                # handler can read filing_mode / intake_data / waive flags etc.
                # intake_data comes back as a dict from RealDictCursor + JSONB.
                intake = row.get("intake_data")
                if isinstance(intake, str):
                    try: intake = _json.loads(intake)
                    except Exception: intake = {}
                order["intake_data"] = intake or {}
                order["customer_email"] = row.get("customer_email")
                order["customer_name"] = row.get("customer_name")
                order["customer_phone"] = row.get("customer_phone")
                # NOTE(review): "batch_id" is not in the SELECT column list
                # above, so this is always None — add the column to the query
                # if compliance_orders has it; confirm against the schema.
                order["batch_id"] = row.get("batch_id")
                order["filing_mode"] = row.get("filing_mode")
                order["form_year_override"] = row.get("form_year_override")
                order["revises_order_number"] = row.get("revises_order_number")
                order["revised_reason"] = row.get("revised_reason")
                order["prior_confirmation_number"] = row.get("prior_confirmation_number")
                order["waive_deminimis_exemption"] = row.get("waive_deminimis_exemption")
                order["waive_deminimis_reason"] = row.get("waive_deminimis_reason")
                order["multi_year_filings"] = row.get("multi_year_filings")
                order["multi_year_discount_pct"] = row.get("multi_year_discount_pct")
                order["deminimis_result_is_exempt"] = row.get("deminimis_result_is_exempt")
                order["deminimis_estimated_contrib_cents"] = row.get("deminimis_estimated_contrib_cents")
            cur.close()
            conn.close()
        except Exception as exc:
            LOG.warning("Could not look up compliance order: %s", exc)

    handler_cls = SERVICE_HANDLERS.get(service_slug)
    if not handler_cls:
        LOG.error("No handler for service: %s", service_slug)
        return {"action": "process_compliance_service", "error": f"No handler for {service_slug}"}

    # Debug: log what entity data the handler will receive
    # NOTE(review): order["entity"] is only attached further below (telecom
    # entity lookup), so unless ERPNext supplied it this logs "MISSING" here.
    entity_preview = order.get("entity", {})
    LOG.info(
        "Dispatching %s for %s — entity=%s (legal_name=%s, frn=%s)",
        service_slug, order_number,
        "present" if entity_preview else "MISSING",
        entity_preview.get("legal_name", "??"),
        entity_preview.get("frn", "??"),
    )

    # For FCC compliance services: look up the telecom entity and pass as order_data.
    # FCC_SERVICE_SLUGS covers the diagnostic checkup plus every remediation
    # handler (RMD, CPNI, 499-A, STIR/SHAKEN, BDC, full bundle, D.C. agent).
    # If no entity ID but we have an FRN in intake_data, look up entity by FRN
    from scripts.workers.services import FCC_SERVICE_SLUGS
    if service_slug in FCC_SERVICE_SLUGS and not pg_entity_id:
        frn = (order.get("intake_data") or {}).get("frn")
        if frn:
            try:
                import psycopg2
                conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
                cur = conn.cursor()
                cur.execute("SELECT id FROM telecom_entities WHERE frn = %s LIMIT 1", (frn,))
                row = cur.fetchone()
                if row:
                    pg_entity_id = row[0]
                    LOG.info("Resolved FRN %s → entity_id %s", frn, pg_entity_id)
                cur.close()
                conn.close()
            except Exception as exc:
                LOG.warning("Could not resolve FRN %s to entity: %s", frn, exc)

    # Attach the full telecom_entities row as order["entity"], with values
    # coerced to JSON-serializable forms for the handler.
    if service_slug in FCC_SERVICE_SLUGS and pg_entity_id:
        try:
            import psycopg2
            import json as _json
            conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
            cur = conn.cursor()
            cur.execute(
                "SELECT * FROM telecom_entities WHERE id = %s",
                (pg_entity_id,),
            )
            columns = [desc[0] for desc in cur.description] if cur.description else []
            row = cur.fetchone()
            if row and columns:
                entity_dict = dict(zip(columns, row))
                # Convert non-serializable types (dates → ISO strings,
                # memoryview/bytea → text)
                for k, v in entity_dict.items():
                    if hasattr(v, "isoformat"):
                        entity_dict[k] = v.isoformat()
                    elif isinstance(v, memoryview):
                        entity_dict[k] = bytes(v).decode("utf-8", errors="replace")
                # Parse JSONB carrier_metadata if it's a string
                if isinstance(entity_dict.get("carrier_metadata"), str):
                    try:
                        entity_dict["carrier_metadata"] = _json.loads(entity_dict["carrier_metadata"])
                    except Exception:
                        pass
                order["entity"] = entity_dict
            cur.close()
            conn.close()
        except Exception as exc:
            LOG.warning("Could not look up telecom entity for %s: %s", order_number, exc)

    # Run compliance checks if FCC entity has an FRN
    if service_slug == "fcc-compliance-checkup" and order.get("entity", {}).get("frn"):
        try:
            import urllib.request
            import json as _json
            frn = order["entity"]["frn"]
            api_url = os.environ.get("API_URL", "http://api:3001")
            req = urllib.request.Request(f"{api_url}/api/v1/fcc/lookup?frn={frn}")
            with urllib.request.urlopen(req, timeout=30) as resp:
                order["compliance_checks"] = _json.loads(resp.read())
        except Exception as exc:
            LOG.warning("Could not run compliance checks for FRN %s: %s",
                        order.get("entity", {}).get("frn"), exc)

    handler = handler_cls()
    # Final entity check before dispatch
    ent = order.get("entity", {})
    LOG.info(
        "Handler %s ready for %s — entity keys=%s legal_name=%s",
        handler_cls.__name__, order_number,
        list(ent.keys())[:5] if ent else "NONE",
        ent.get("legal_name", "??"),
    )
    # Handlers are async; jobs run in worker threads, so create a private
    # event loop for this job and always close it.
    loop = asyncio.new_event_loop()
    try:
        file_paths = loop.run_until_complete(handler.process(order))
    finally:
        loop.close()

    # Upload files to MinIO and track paths
    effective_order_number = order_number or order.get("name", order_name)
    minio_paths = []
    for fp in file_paths:
        from pathlib import Path
        p = Path(fp)
        remote = f"compliance/{effective_order_number}/{p.name}"
        storage.upload(fp, remote)
        minio_paths.append(remote)

    # Store generated file paths on the Sales Order for delivery worker
    try:
        client.set_value(
            "Sales Order", order_name,
            "custom_generated_files", "\n".join(minio_paths),
        )
    except Exception as exc:
        LOG.warning("Could not store generated files on %s: %s", order_name, exc)

    # ── Instant delivery for no-vendor-fee services ────────────────────
    # Services that generate documents without needing to pay a vendor
    # (no portal submission fees, no state fees) can deliver immediately
    # on successful payment. The customer gets their documents right away.
    INSTANT_DELIVERY_SLUGS = {
        "cpni-certification",  # generates cert letter + procedure statement
        "calea-ssi",  # generates SSI plan document
        "fcc-compliance-checkup",  # generates diagnostic report
        "dc-agent",  # generates acceptance letter (NWRA is separate)
        # Non-FCC doc-gen services
        "flsa-audit", "contractor-classification", "handbook-review",
        "policy-development", "ccpa-audit", "privacy-policy",
        "data-mapping", "breach-response", "consent-audit",
        "dnc-compliance", "campaign-review",
    }
    if service_slug in INSTANT_DELIVERY_SLUGS and minio_paths:
        customer_email = order.get("customer_email")
        customer_name = order.get("customer_name", "")
        if customer_email:
            try:
                _send_instant_delivery(
                    customer_email=customer_email,
                    customer_name=customer_name,
                    order_number=effective_order_number,
                    service_name=handler_cls.SERVICE_NAME if hasattr(handler_cls, "SERVICE_NAME") else service_slug,
                    minio_paths=minio_paths,
                    storage=storage,
                )
                LOG.info("Instant delivery sent to %s for %s (%s)",
                         customer_email, effective_order_number, service_slug)
            except Exception as exc:
                LOG.warning("Instant delivery failed for %s: %s", effective_order_number, exc)

    # Clear the per-order auto-filing override so the approval cannot be
    # silently replayed — each admin approval is one-shot.
    try:
        if order.get("custom_auto_filing_override"):
            client.set_value("Sales Order", order_name, "custom_auto_filing_override", 0)
    except Exception as exc:
        LOG.warning("Could not clear auto_filing_override on %s: %s", order_name, exc)

    # Update telecom entity with last checkup timestamp
    if pg_entity_id and service_slug == "fcc-compliance-checkup":
        try:
            import psycopg2
            conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
            cur = conn.cursor()
            cur.execute(
                "UPDATE telecom_entities SET last_compliance_checkup = NOW() WHERE id = %s",
                (pg_entity_id,),
            )
            conn.commit()
            cur.close()
            conn.close()
        except Exception as exc:
            LOG.warning("Could not update entity checkup timestamp: %s", exc)

    # Advance workflow to Review after processing
    try:
        client.set_value("Sales Order", order_name, "workflow_state", "Review")
        LOG.info("Advanced %s to Review", order_name)
    except Exception as exc:
        LOG.warning("Could not advance workflow for %s: %s", order_name, exc)

    # ── Create compliance calendar deadlines for the customer ───────────
    # After a filing is completed, create recurring deadline reminders
    # in the ERPNext Compliance Deadline doctype.
    try:
        entity_name = ent.get("legal_name", "")
        customer_email = order.get("customer_email", "")
        frn = ent.get("frn", "")

        # Map service slugs to their next deadline(s).
        # Most filings are annual (single month/day). BDC is semi-annual.
        # Each entry may have a single (month, day) or a list of (month, day) pairs.
        DEADLINE_MAP = {
            "rmd-filing": {"title": "RMD Annual Recertification", "type": "RMD Recertification",
                           "deadlines": [(3, 1)], "notes": "Log into FCC RMD portal with MFA, verify all info, click Recertify."},
            "cpni-certification": {"title": "CPNI Annual Certification", "type": "CPNI Certification",
                                   "deadlines": [(3, 1)], "notes": "File CPNI cert at FCC ECFS under WC Docket 06-36."},
            "fcc-499a": {"title": "Form 499-A Annual Filing", "type": "499-A Annual",
                         "deadlines": [(4, 1)], "notes": "File at USAC E-File portal. Reports prior calendar year revenue."},
            "fcc-499a-499q": {"title": "Form 499-A Annual Filing", "type": "499-A Annual",
                              "deadlines": [(4, 1)], "notes": "File at USAC E-File portal. Reports prior calendar year revenue."},
            "calea-ssi": {"title": "CALEA SSI Plan Review", "type": "Other",
                          "deadlines": [(1, 15)], "notes": "Annual review of CALEA System Security & Integrity plan."},
            # BDC — semi-annual (June 1 and December 1)
            "bdc-filing": {"title": "BDC Broadband & Voice Filing", "type": "BDC Filing",
                           "deadlines": [(6, 1), (12, 1)], "notes": "File broadband deployment and voice subscription data at FCC BDC portal."},
            "bdc-broadband": {"title": "BDC Broadband Deployment Filing", "type": "BDC Filing",
                              "deadlines": [(6, 1), (12, 1)], "notes": "File broadband deployment data at FCC BDC portal."},
            "bdc-voice": {"title": "BDC Voice Subscription Filing", "type": "BDC Filing",
                          "deadlines": [(6, 1), (12, 1)], "notes": "File voice subscription data (formerly Form 477) at FCC BDC portal."},
            # State PUC — most states require annual report, varies by state
            "state-puc": {"title": "State PUC Annual Report", "type": "PUC Filing",
                          "deadlines": [(3, 31)], "notes": "File annual report with state PUC/PSC. Due date varies by state — check state requirements."},
        }

        if service_slug in DEADLINE_MAP and entity_name:
            dl = DEADLINE_MAP[service_slug]
            from datetime import date
            now = date.today()

            # Find the ERPNext Customer for this email (shared across all deadlines)
            erpnext_customer = ""
            try:
                customers = client.get_list("Customer", filters={"email_id": customer_email}, fields=["name"], limit=1)
                if customers:
                    erpnext_customer = customers[0]["name"]
            except Exception:
                pass

            # Create a deadline for each (month, day) pair
            for month, day in dl["deadlines"]:
                # If this year's occurrence has already passed (or is today),
                # schedule next year's.
                next_year = now.year + 1 if now.month > month or (now.month == month and now.day >= day) else now.year
                deadline_date = f"{next_year}-{month:02d}-{day:02d}"

                try:
                    client.create_resource("Compliance Deadline", {
                        "title": f"{dl['title']} — {entity_name}",
                        "deadline_date": deadline_date,
                        "filing_type": dl["type"],
                        "entity_name": entity_name,
                        "frn": frn,
                        "status": "Upcoming",
                        "notes": dl["notes"],
                        "linked_order": effective_order_number,
                        "customer": erpnext_customer or None,
                        "published": 1,
                        "route": f"compliance-deadlines/{service_slug}-{next_year}-{month:02d}-{frn}",
                    })
                    LOG.info("Created compliance deadline: %s for %s (due %s)", dl["title"], entity_name, deadline_date)
                except Exception as exc:
                    LOG.warning("Could not create compliance deadline for %s: %s", deadline_date, exc)
    except Exception as exc:
        LOG.warning("Compliance calendar creation failed: %s", exc)

    # ── Foreign qualification add-on from 499-A intake ────────────────
    # If the 499-A intake wizard detected missing foreign qualifications
    # and the client opted in, create a companion foreign-qualification
    # order for those states.
    try:
        fq_states = (order.get("intake_data") or {}).get("foreign_qual_add_on_states") or []
        if fq_states and isinstance(fq_states, list) and len(fq_states) > 0:
            entity_name = ent.get("legal_name", "")
            customer_email = order.get("customer_email", "")
            LOG.info(
                "Creating foreign qual add-on for %s: %d states (%s)",
                effective_order_number, len(fq_states), ", ".join(fq_states),
            )
            # Create an admin todo for the foreign qual filing
            try:
                client.create_resource("ToDo", {
                    "description": (
                        f"[foreign-qualification] {effective_order_number}\n\n"
                        f"499-A intake detected {len(fq_states)} state(s) missing "
                        f"foreign corporation registration.\n"
                        f"Entity: {entity_name}\n"
                        f"States: {', '.join(fq_states)}\n"
                        f"Customer: {customer_email}\n\n"
                        f"Client opted in during 499-A intake at $99/state.\n"
                        f"Create a foreign-qualification-multi order for these states "
                        f"or contact the client to finalize."
                    ),
                    "priority": "Medium",
                    "role": "Accounting Advisor",
                })
            except Exception as exc:
                LOG.warning("Could not create foreign qual admin ToDo: %s", exc)
    except Exception as exc:
        LOG.warning("Foreign qual add-on processing failed: %s", exc)

    return {"action": "process_compliance_service", "files": minio_paths}
|
|
|
|
|
|
# ==========================================================================
|
|
# Job Dispatch Map
|
|
# ==========================================================================
|
|
|
|
def handle_scrape_amb_units(data: dict) -> dict:
    """Scrape available mailbox unit numbers from an AMB location page.

    Args:
        data: Job payload with ``location_url`` (required) and ``order_id``
            (echoed back for correlation).

    Returns:
        ``{"units": [...], "order_id": ...}`` on success, or
        ``{"error": "location_url required", "units": []}`` when the
        payload is missing the URL.
    """
    location_url = data.get("location_url", "")
    order_id = data.get("order_id", "")
    # Validate before importing the Playwright-backed portal adapter so a
    # bad payload fails fast without heavyweight imports.
    if not location_url:
        return {"error": "location_url required", "units": []}

    from scripts.formation.states.bc.adapter import BCPortal

    LOG.info("[scrape_amb_units] Scraping units from %s for %s", location_url, order_id)
    portal = BCPortal()
    # Jobs execute in worker threads where no event loop exists; asyncio.run
    # creates and tears down a fresh loop. (The previous
    # get_event_loop()/run_until_complete + RuntimeError fallback is
    # deprecated since Python 3.10 and left an unclosed loop behind when it
    # did succeed on the main thread.)
    units = asyncio.run(portal.scrape_available_units(location_url))

    LOG.info("[scrape_amb_units] Found %d units", len(units))
    return {"units": units, "order_id": order_id}
|
|
|
|
|
|
def handle_purchase_client_selections(data: dict) -> dict:
    """Purchase mailbox + DID based on client portal selections.

    Called after client confirms their choices in /portal/setup.
    1. If has_own_ca_address is False: Playwright → AMB signup with selected unit
    2. Flowroute API → purchase selected DID
    3. Update PG order with results
    4. Advance ERPNext workflow

    Args:
        data: Job payload with ``order_id`` (required), ``selected_unit``,
            ``selected_did``, and ``has_own_ca_address``.

    Returns:
        A results dict keyed by step (``mailbox``, ``did``); partial
        failures are recorded under those keys or a top-level ``error``
        rather than raised.
    """
    order_id = data.get("order_id", "")
    selected_unit = data.get("selected_unit")
    selected_did = data.get("selected_did", "")
    has_own_address = data.get("has_own_ca_address", False)

    # Validate before importing heavyweight project deps so a bad payload
    # fails fast.
    if not order_id:
        return {"error": "order_id required"}

    import psycopg2
    from scripts.formation.states.bc.adapter import BCPortal
    from scripts.workers.services.flowroute import purchase_did, configure_routing

    LOG.info("[purchase_selections] Starting for %s: unit=%s did=%s own_addr=%s",
             order_id, selected_unit, selected_did, has_own_address)

    conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
    results = {"order_id": order_id, "mailbox": None, "did": None}

    try:
        # Get order details
        with conn.cursor() as cur:
            cur.execute("""
                SELECT order_number, payment_method, amb_location_slug, entity_name,
                       customer_name, customer_email, customer_phone,
                       director_name, director_address, erpnext_sales_order
                FROM canada_crtc_orders WHERE order_number = %s
            """, (order_id,))
            row = cur.fetchone()
        if not row:
            return {"error": "Order not found"}

        cols = ["order_number", "payment_method", "amb_location_slug", "entity_name",
                "customer_name", "customer_email", "customer_phone",
                "director_name", "director_address", "erpnext_sales_order"]
        order = dict(zip(cols, row))

        # ── Step 1: Mailbox signup (if needed) ──────────────────────────────
        if not has_own_address and selected_unit:
            # Get location URL
            with conn.cursor() as cur:
                cur.execute("SELECT provider_url FROM amb_locations WHERE slug = %s", (order["amb_location_slug"],))
                loc_row = cur.fetchone()

            if loc_row:
                location_url = loc_row[0]
                portal = BCPortal()

                # Build a minimal FormationOrder-like object — the portal
                # adapter only reads these attributes.
                class MinimalOrder:
                    def __init__(self, o):
                        self.order_id = o["order_number"]
                        self.entity_name = o["entity_name"] or ""
                        self.regulatory_contact_name = o["customer_name"] or ""
                        self.regulatory_contact_phone = o["customer_phone"] or ""
                        self.principal_address = ""
                        self.principal_city = ""
                        self.principal_state = ""
                        self.principal_zip = ""
                        self.mailing_address = ""
                        self.mailing_city = ""
                        self.mailing_state = ""
                        self.mailing_zip = ""
                        self.members = []

                mini_order = MinimalOrder(order)

                # asyncio.run creates (and closes) a fresh event loop — jobs
                # run in worker threads where no loop exists, so the old
                # get_event_loop()/RuntimeError fallback dance is unnecessary
                # and deprecated since Python 3.10.
                mb_result = asyncio.run(
                    portal.signup_with_unit(mini_order, selected_unit, location_url)
                )

                results["mailbox"] = mb_result
                LOG.info("[purchase_selections] Mailbox result: %s", mb_result.get("success"))

                if mb_result.get("success"):
                    with conn.cursor() as cur:
                        cur.execute(
                            "UPDATE canada_crtc_orders SET mailbox_address = %s WHERE order_number = %s",
                            (f"{location_url} Unit {selected_unit}", order_id),
                        )
                    conn.commit()

        # ── Step 2: DID purchase ────────────────────────────────────────────
        if selected_did:
            try:
                did_result = purchase_did(selected_did)
                results["did"] = {"success": True, "did": selected_did, "result": str(did_result)[:200]}
                LOG.info("[purchase_selections] DID %s purchased", selected_did)

                # Configure routing (default "later" — client sets up in portal)
                try:
                    configure_routing(selected_did, "later")
                except Exception as route_err:
                    LOG.warning("[purchase_selections] DID routing config failed: %s", route_err)

                with conn.cursor() as cur:
                    cur.execute(
                        "UPDATE canada_crtc_orders SET client_selected_did = %s WHERE order_number = %s",
                        (selected_did, order_id),
                    )
                conn.commit()

            except Exception as did_err:
                LOG.error("[purchase_selections] DID purchase failed: %s", did_err)
                results["did"] = {"success": False, "error": str(did_err)}

        # ── Step 3: Advance workflow ────────────────────────────────────────
        # Advance from "Client Selection" → "Mailbox Setup" (or "Incorporation" if own address)
        so_name = order.get("erpnext_sales_order")
        if so_name:
            try:
                from scripts.workers.erpnext_client import ERPNextClient
                erp = ERPNextClient()
                target_action = "Client Confirmed Own Address" if has_own_address else "Client Confirmed"
                erp.apply_workflow("Sales Order", so_name, target_action)
                LOG.info("[purchase_selections] Workflow advanced via '%s'", target_action)
            except Exception as wf_err:
                LOG.error("[purchase_selections] Workflow advance failed: %s", wf_err)

    except Exception as e:
        LOG.error("[purchase_selections] Error: %s", e)
        results["error"] = str(e)
    finally:
        conn.close()

    return results
|
|
|
|
|
|
def handle_presign(payload: dict) -> dict:
    """Generate a MinIO presigned URL for a given object key.

    Used by the portal-esign.ts API route to get a PDF preview URL
    without adding a MinIO npm package to the API container.
    Also supports upload (PUT) presign for reseller cert + ICC uploads.

    Payload: { key: str, expires: int (seconds, default 3600),
               method: "GET" | "PUT" (default GET) }
    Returns: { url: str, bucket: str, key: str, method: str } on success,
             or { error: str } for invalid input or MinIO failure.
    """
    key = payload.get("key", "")
    method = str(payload.get("method", "GET")).upper()
    if not key:
        return {"error": "key required"}
    if method not in ("GET", "PUT"):
        return {"error": "method must be GET or PUT"}
    # A malformed "expires" previously raised an unhandled ValueError —
    # validate it like the other inputs and return a clean error dict.
    try:
        expires = int(payload.get("expires", 3600))
    except (TypeError, ValueError):
        return {"error": "expires must be an integer"}

    try:
        from datetime import timedelta
        from minio import Minio as _Minio
        mc = _Minio(
            f"{os.getenv('MINIO_ENDPOINT', 'minio')}:{os.getenv('MINIO_PORT', '9000')}",
            access_key=os.getenv("MINIO_ACCESS_KEY", ""),
            secret_key=os.getenv("MINIO_SECRET_KEY", ""),
            secure=os.getenv("MINIO_SECURE", "false").lower() == "true",
        )
        bucket = os.getenv("MINIO_BUCKET", "performancewest")
        # Ensure the bucket exists (first-run idempotent).
        if not mc.bucket_exists(bucket):
            mc.make_bucket(bucket)
        expires_td = timedelta(seconds=expires)
        if method == "PUT":
            url = mc.presigned_put_object(bucket, key, expires=expires_td)
        else:
            url = mc.presigned_get_object(bucket, key, expires=expires_td)
        return {"url": url, "bucket": bucket, "key": key, "method": method}
    except Exception as exc:
        LOG.error("[presign] Failed to generate presigned URL for %s: %s", key, exc)
        return {"error": str(exc)}
|
|
|
|
|
|
def handle_resume_crtc_pipeline(payload: dict) -> dict:
    """Resume the CRTC pipeline after an async pause (eSign, domain selection, etc.).

    Called by the portal-esign.ts API route after the client signs the letter,
    or by the domain-confirm endpoint after domain selection.

    Payload: { order_id: str, step: str }
    step values:
        "post_esign"  — client signed; resume at Step 7 (binder compilation)
        "post_domain" — client confirmed domain; resume at Step 5b (HestiaCP)
    """
    order_id = payload.get("order_id", "")
    step = payload.get("step", "post_esign")
    if not order_id:
        return {"error": "order_id required"}

    LOG.info("[resume_crtc] Resuming CRTC pipeline for order %s at step %s", order_id, step)

    try:
        # Look up which ERPNext Sales Order is linked to this CRTC order.
        db = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        with db.cursor() as cursor:
            cursor.execute(
                "SELECT erpnext_sales_order FROM canada_crtc_orders WHERE order_number = %s",
                (order_id,),
            )
            record = cursor.fetchone()
        db.close()

        if not record or not record[0]:
            return {"error": f"No ERPNext Sales Order found for {order_id}"}

        sales_order = record[0]

        # Re-dispatch as a generate_crtc_docs job (which runs the full pipeline
        # and is idempotent — already-completed steps are skipped via their
        # custom_*_at timestamps)
        result = handle_generate_crtc_docs({"order": sales_order, "step_resume": step})
        LOG.info("[resume_crtc] Pipeline resumed for %s: %s", order_id, result)
        return {"success": True, "order_id": order_id, "step": step, "result": result}

    except Exception as exc:
        LOG.error("[resume_crtc] Error resuming pipeline for %s: %s", order_id, exc, exc_info=True)
        return {"error": str(exc)}
|
|
|
|
|
|
def handle_renewal_payment(payload: dict) -> dict:
    """Handle a renewal invoice payment event.

    Called by the Express API webhook when a renewal Sales Invoice is paid.
    Triggers immediate completion of linked compliance entries and re-calendaring.

    Payload: {invoice: "SINV-00123"}
    """
    invoice = payload.get("invoice", "")
    if not invoice:
        return {"error": "Missing invoice name"}

    LOG.info("[renewal] Processing payment for invoice: %s", invoice)
    try:
        # Delegate the actual compliance/calendar work to the renewal worker.
        from scripts.workers.renewal_worker import handle_renewal_payment as _process
        _process(invoice)
    except Exception as exc:
        LOG.error("[renewal] Error processing payment for %s: %s", invoice, exc)
        return {"error": str(exc)}
    return {"success": True, "invoice": invoice}
|
|
|
|
|
|
def handle_run_renewal_check(payload: dict) -> dict:
    """Run the full renewal check cycle on demand.

    Can be triggered manually from admin or by the daily cron.
    Payload: {} (no parameters needed)
    """
    LOG.info("[renewal] Running on-demand renewal check")
    try:
        # The renewal worker's main() performs the whole check cycle.
        from scripts.workers.renewal_worker import main as renewal_main
        renewal_main()
    except Exception as exc:
        LOG.error("[renewal] Renewal check failed: %s", exc)
        return {"error": str(exc)}
    return {"success": True}
|
|
|
|
|
|
def handle_check_red_light(payload: dict) -> dict:
    """Check FCC CORES red light status for an FRN.

    Payload: { "frn": "0027160886" }
    Returns the scraper's result dict, or a status="unknown" record on failure.
    """
    frn = payload.get("frn", "")
    if not frn:
        return {"success": False, "error": "Missing frn"}

    LOG.info("[check_red_light] Checking FRN %s", frn)
    try:
        import asyncio
        from scripts.workers.fcc_cores_scraper import check_red_light

        # The scraper is async; drive it to completion synchronously.
        return asyncio.run(check_red_light(frn))
    except Exception as exc:
        LOG.error("[check_red_light] Failed: %s", exc)
        return {"frn": frn, "status": "unknown", "detail": str(exc)}
|
|
|
|
|
|
def handle_import_499a(payload: dict) -> dict:
    """Import Form 499-A data from USAC E-File.

    Payload: { "filer_id": "812345" }
    Returns the extracted entity info, service categories, and revenue lines.
    """
    filer_id = payload.get("filer_id", "")
    if not filer_id:
        return {"success": False, "error": "Missing filer_id"}

    LOG.info("[import_499a] Starting import for filer %s", filer_id)
    try:
        import asyncio
        from scripts.workers.usac_import import import_499a

        # The USAC importer is async; run it to completion here.
        return asyncio.run(import_499a(filer_id))
    except Exception as exc:
        LOG.error("[import_499a] Failed: %s", exc)
        return {"success": False, "error": str(exc)}
|
|
|
|
|
|
# Dispatch table: job "action" name → handler function. Every handler takes the
# webhook payload dict and returns a result dict. Insertion order matters: it is
# surfaced verbatim in do_POST's "Unknown action" error and the startup banner.
JOB_HANDLERS = {
    "name_search": handle_name_search,
    "file_entity": handle_file_entity,
    "obtain_ein": handle_obtain_ein,
    "generate_docs": handle_generate_docs,
    "deliver": handle_deliver,
    "send_to_attorney": handle_send_to_attorney,
    "process_compliance_service": handle_process_compliance_service,
    # Canada CRTC pipeline actions (dispatched by ERPNext webhooks)
    "register_ca_domain": handle_register_ca_domain,
    "register_awaiting_funds": handle_register_awaiting_funds,
    "file_bc_incorporation": handle_file_bc_incorporation,
    "generate_crtc_docs": handle_generate_crtc_docs,
    "notify_admin_review": handle_notify_admin_review,
    "ship_binder": handle_ship_binder,
    "mark_delivered": handle_mark_delivered,
    # Portal client selection actions
    "scrape_amb_units": handle_scrape_amb_units,
    "purchase_client_selections": handle_purchase_client_selections,
    # eSign / MinIO helpers
    "presign": handle_presign,
    "resume_crtc_pipeline": handle_resume_crtc_pipeline,
    # Compliance calendar renewal
    "renewal_payment": handle_renewal_payment,
    "run_renewal_check": handle_run_renewal_check,
    # FCC 499-A import from USAC
    "import_499a": handle_import_499a,
    # FCC CORES red light check
    "check_red_light": handle_check_red_light,
}
|
|
|
|
|
|
def _build_formation_order(order: dict):
    """Convert an ERPNext Formation Order dict to our FormationOrder dataclass.

    Accepts `members_json` either as an already-parsed list or as a JSON
    string (ERPNext may deliver child-table blobs as serialized text) and
    normalizes it into Member records. Unrecognized entity types fall back
    to LLC.
    """
    from scripts.formation.base import FormationOrder, Member, EntityType

    members_raw = order.get("members_json") or []
    if isinstance(members_raw, str):
        # Uses the module-level json import; the previous function-local
        # `import json` was redundant.
        members_raw = json.loads(members_raw)

    members = [
        Member(
            name=m.get("name", ""),
            address=m.get("address", ""),
            city=m.get("city", ""),
            state=m.get("state", ""),
            zip_code=m.get("zip", ""),
            title=m.get("title", "Member"),
            ownership_pct=m.get("ownership_pct", 0),
        )
        for m in members_raw
    ]

    entity_map = {"LLC": EntityType.LLC, "Corporation": EntityType.CORPORATION, "S-Corp": EntityType.S_CORP}

    return FormationOrder(
        order_id=order.get("name", ""),
        state_code=order.get("state_code", ""),
        entity_type=entity_map.get(order.get("entity_type", ""), EntityType.LLC),
        entity_name=order.get("entity_name", ""),
        entity_name_alt=order.get("entity_name_alt", ""),
        management_type=order.get("management_type", "member_managed"),
        purpose=order.get("purpose", "Any lawful business activity"),
        members=members,
        principal_address=order.get("principal_address", ""),
        expedited=bool(order.get("expedited")),
    )
|
|
|
|
|
|
# ==========================================================================
|
|
# HTTP Server
|
|
# ==========================================================================
|
|
|
|
class JobHandler(BaseHTTPRequestHandler):
    """HTTP handler for the worker job API.

    Routes:
        GET  /health        — liveness probe with active-job count
        GET  /jobs/active   — currently running jobs + last 10 completed
        POST /jobs          — enqueue an async job (202 Accepted; runs in pool)
        POST /entity-status — synchronous entity status lookup via state portal
        POST /rmd-preview   — synchronous RMD certification preview PDF
    """

    def do_GET(self):
        # Read-only endpoints: health probe and job introspection.
        if self.path == "/health":
            self._respond(200, {
                "status": "ok",
                "service": "performancewest-workers",
                "active_jobs": len(active_jobs),
                "max_workers": MAX_WORKERS,
            })
        elif self.path == "/jobs/active":
            self._respond(200, {
                "active": list(active_jobs.values()),
                "recent_completed": completed_jobs[-10:],
            })
        else:
            self._respond(404, {"error": "Not found"})

    def do_POST(self):
        """Dispatch POST routes; /jobs runs async in the thread pool, the
        /entity-status and /rmd-preview routes respond synchronously."""
        if self.path == "/entity-status":
            # Synchronous entity status lookup via state portal adapter
            self._handle_entity_status()
            return

        if self.path == "/rmd-preview":
            self._handle_rmd_preview()
            return

        if self.path != "/jobs":
            self._respond(404, {"error": "Not found"})
            return

        content_length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(content_length)

        try:
            payload = json.loads(body)
        except json.JSONDecodeError:
            self._respond(400, {"error": "Invalid JSON"})
            return

        # The action must be one of the registered JOB_HANDLERS keys.
        action = payload.get("action")
        if not action or action not in JOB_HANDLERS:
            self._respond(400, {"error": f"Unknown action: {action}. Available: {list(JOB_HANDLERS.keys())}"})
            return

        # Create job
        job_id = str(uuid.uuid4())[:8]
        job_record = {
            "id": job_id,
            "action": action,
            "order_name": payload.get("order_name", ""),
            "status": "queued",
            "started_at": datetime.utcnow().isoformat(),
        }
        active_jobs[job_id] = job_record

        # Execute in background thread
        def run_job():
            job_record["status"] = "running"
            try:
                handler = JOB_HANDLERS[action]
                result = handler(payload)
                job_record["status"] = "completed"
                job_record["result"] = result
                LOG.info("Job %s (%s) completed: %s", job_id, action, result.get("action"))
            except Exception as e:
                job_record["status"] = "failed"
                job_record["error"] = str(e)
                LOG.error("Job %s (%s) failed: %s", job_id, action, e, exc_info=True)
            finally:
                # Move the record from active to the bounded (100) completed list.
                job_record["finished_at"] = datetime.utcnow().isoformat()
                active_jobs.pop(job_id, None)
                completed_jobs.append(job_record)
                if len(completed_jobs) > 100:
                    completed_jobs.pop(0)

        executor.submit(run_job)

        # 202 Accepted: the caller polls GET /jobs/active for completion.
        self._respond(202, {"job_id": job_id, "action": action, "status": "queued"})

    def _handle_entity_status(self):
        """Synchronous entity status lookup via Playwright state adapter.
        POST /entity-status {entity_name, state_code}
        Returns entity status from the state SOS portal (cached 24h).
        """
        content_length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(content_length)
        try:
            payload = json.loads(body)
        except json.JSONDecodeError:
            self._respond(400, {"error": "Invalid JSON"})
            return

        # NOTE(review): .get(..., "") only guards a *missing* key — an explicit
        # JSON null would make .strip() raise; confirm callers never send null.
        entity_name = payload.get("entity_name", "").strip()
        state_code = payload.get("state_code", "").strip().upper()

        if not entity_name or not state_code:
            self._respond(400, {"error": "entity_name and state_code required"})
            return

        try:
            import asyncio
            from scripts.formation.name_search import search_name

            # A fresh event loop per request since we run inside pool threads.
            # NOTE(review): the loop is not closed if run_until_complete raises;
            # the outer except turns that into a soft "not found" response.
            loop = asyncio.new_event_loop()
            result = loop.run_until_complete(search_name(entity_name, state_code))
            loop.close()
            # search_name returns a NameSearchResult dataclass — convert to dict
            if hasattr(result, "__dict__"):
                result = vars(result)
            elif hasattr(result, "_asdict"):
                result = result._asdict()

            if result.get("available") is False:
                # Name is taken = entity exists (active)
                self._respond(200, {
                    "found": True,
                    "entity_name": entity_name.upper(),
                    "entity_number": result.get("entity_number"),
                    "entity_type": result.get("entity_type"),
                    "status": "ACTIVE",
                    "formation_date": result.get("formation_date"),
                    "registered_agent": result.get("registered_agent"),
                    "state": state_code,
                    "source": "live_search",
                })
            elif result.get("available") is True:
                # Name is available = no entity with this name
                self._respond(200, {"found": False, "state": state_code})
            else:
                # Inconclusive
                self._respond(200, {"found": False, "state": state_code, "note": "Search inconclusive"})

        except Exception as e:
            # Search failures respond 200 with found=False so the caller can
            # degrade gracefully instead of treating this as a server error.
            LOG.warning("Entity status search failed for %s in %s: %s", entity_name, state_code, e)
            self._respond(200, {"found": False, "state": state_code, "error": str(e)})

    def _handle_rmd_preview(self):
        """Generate an RMD certification preview PDF from client-provided data.
        POST /rmd-preview {order_number, entity: {...}}
        Returns {pdf_url} with a presigned MinIO URL for the generated PDF.
        """
        content_length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(content_length)
        try:
            payload = json.loads(body)
        except json.JSONDecodeError:
            self._respond(400, {"error": "Invalid JSON"})
            return

        entity = payload.get("entity", {})
        order_number = payload.get("order_number", "PREVIEW")

        # legal_name and frn are the only hard-required entity fields.
        if not entity.get("legal_name") or not entity.get("frn"):
            self._respond(400, {"error": "legal_name and frn required"})
            return

        try:
            import tempfile
            from scripts.document_gen.templates.rmd_letter_generator import generate_rmd_letter

            # Build the DOCX into a per-request temp dir, cleaned up below.
            work_dir = tempfile.mkdtemp(prefix="rmd_preview_")
            date_str = datetime.now().strftime("%Y%m%d")
            docx_path = os.path.join(work_dir, f"rmd_preview_{order_number}_{date_str}.docx")

            generate_rmd_letter(
                entity_name=entity.get("legal_name", ""),
                dba_name=entity.get("dba_name", ""),
                frn=entity.get("frn", ""),
                rmd_number=entity.get("rmd_number", ""),
                filer_id_499=entity.get("filer_id_499", ""),
                address_street=entity.get("address_street", ""),
                address_city=entity.get("address_city", ""),
                address_state=entity.get("address_state", ""),
                address_zip=entity.get("address_zip", ""),
                contact_name=entity.get("contact_name", ""),
                contact_title=entity.get("contact_title", ""),
                contact_email=entity.get("contact_email", ""),
                contact_phone=entity.get("contact_phone", ""),
                ceo_name=entity.get("ceo_name", ""),
                ceo_title=entity.get("ceo_title", "Chief Executive Officer"),
                carrier_category=entity.get("carrier_category", "interconnected_voip"),
                infra_type=entity.get("infra_type", "facilities"),
                is_wholesale=entity.get("is_wholesale", False),
                is_gateway_provider=entity.get("is_gateway_provider", False),
                is_international_only=False,
                uses_ucaas_provider=False,
                carrier_metadata={},
                stir_shaken_status=entity.get("stir_shaken_status", "complete_implementation"),
                stir_shaken_cert_authority=entity.get("stir_shaken_cert_authority", ""),
                upstream_provider_name=entity.get("upstream_provider_name", ""),
                upstream_provider_frn="",
                output_path=docx_path,
            )

            # Convert to PDF
            # NOTE(review): return code is not checked; failure is detected
            # indirectly by the os.path.exists() check on the expected PDF.
            import subprocess
            subprocess.run(
                ["libreoffice", "--headless", "--convert-to", "pdf", "--outdir", work_dir, docx_path],
                capture_output=True, timeout=30,
            )
            pdf_path = docx_path.replace(".docx", ".pdf")

            if not os.path.exists(pdf_path):
                self._respond(500, {"error": "PDF conversion failed"})
                return

            # Upload to MinIO and get presigned URL
            from scripts.document_gen.minio_client import MinioStorage
            storage = MinioStorage()
            remote_key = f"previews/{order_number}/rmd_preview_{date_str}.pdf"
            storage.upload(pdf_path, remote_key)
            pdf_url = storage.get_url(remote_key, expires_hours=24)

            self._respond(200, {"pdf_url": pdf_url, "generated": True})

            # Cleanup (after responding, so the client is not kept waiting)
            import shutil
            shutil.rmtree(work_dir, ignore_errors=True)

        except Exception as exc:
            LOG.error("RMD preview generation failed: %s", exc, exc_info=True)
            self._respond(500, {"error": f"Preview generation failed: {exc}"})

    def _respond(self, status: int, body: dict):
        """Write a JSON response with the given HTTP status."""
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps(body).encode())

    def log_message(self, format, *args):
        # Suppress default HTTP logging, we use our own
        pass
|
|
|
|
|
|
|
|
# ==========================================================================
|
|
# Deferred job auto-retry — polls ERPNext for Deferred orders past defer_until
|
|
# ==========================================================================
|
|
|
|
# Seconds between scans for deferred Formation Orders whose defer_until has
# passed (see _deferred_retry_loop). Overridable via env var.
DEFER_POLL_INTERVAL = int(os.getenv("DEFER_POLL_INTERVAL", "300"))  # 5 min
|
|
|
|
def _deferred_retry_loop():
    """Background thread: polls for deferred Formation Orders ready to retry.

    Every DEFER_POLL_INTERVAL seconds, fetches Formation Orders with
    automation_status=Deferred whose defer_until has passed, resets them to
    Pending in ERPNext, and re-dispatches a file_entity job through the shared
    thread pool. Runs forever; any iteration error is logged and the loop
    continues on the next tick.
    """
    LOG.info("Deferred retry loop started (poll every %ds)", DEFER_POLL_INTERVAL)
    while True:
        time.sleep(DEFER_POLL_INTERVAL)
        try:
            from scripts.workers.erpnext_client import ERPNextClient
            client = ERPNextClient()

            # Fetch Formation Orders with automation_status=Deferred and defer_until <= now
            now_iso = datetime.utcnow().isoformat()
            orders = client.get_resource("Formation Order", filters=[
                ["automation_status", "=", "Deferred"],
                ["defer_until", "<=", now_iso],
            ], fields=["name", "state_code", "entity_name", "include_ein"])

            if not orders:
                continue

            LOG.info("Found %d deferred orders ready to retry", len(orders))

            for order in orders:
                order_name = order["name"]
                state_code = order.get("state_code", "")

                # Reset status to Pending so the retry handler knows it's fresh
                client.update_resource("Formation Order", order_name, {
                    "automation_status": "Pending",
                    "defer_until": None,
                    "admin_notes": f"Auto-retrying after deferral (was deferred until {now_iso})",
                })

                # Re-dispatch the filing job via the thread pool
                payload = {"order_name": order_name}
                job_id = str(uuid.uuid4())[:8]
                job_record = {
                    "id": job_id,
                    "action": "file_entity",
                    "order_name": order_name,
                    "status": "queued",
                    # FIX: use an ISO-8601 string like do_POST does. This was
                    # a raw time.time() float, so /jobs/active mixed timestamp
                    # formats between webhook and deferred-retry jobs.
                    "started_at": datetime.utcnow().isoformat(),
                    "source": "deferred_retry",
                }
                active_jobs[job_id] = job_record

                # Default-arg binding captures per-order values, avoiding the
                # late-binding-closure trap inside this loop.
                def _run_deferred(jid=job_id, jr=job_record, p=payload):
                    jr["status"] = "running"
                    try:
                        result = handle_file_entity(p)
                        jr["status"] = "completed"
                        jr["result"] = result
                        LOG.info("Deferred job %s completed: %s", jid, result)
                    except Exception as e:
                        jr["status"] = "failed"
                        jr["error"] = str(e)
                        LOG.error("Deferred job %s failed: %s", jid, e, exc_info=True)
                    finally:
                        # Move record to the bounded (100) completed list,
                        # mirroring do_POST's run_job bookkeeping.
                        jr["finished_at"] = datetime.utcnow().isoformat()
                        active_jobs.pop(jid, None)
                        completed_jobs.append(jr)
                        if len(completed_jobs) > 100:
                            completed_jobs.pop(0)

                executor.submit(_run_deferred)
                LOG.info("Re-dispatched deferred order %s (state: %s)", order_name, state_code)

        except Exception as e:
            LOG.error("Deferred retry loop error: %s", e, exc_info=True)
|
|
|
|
|
|
def main():
    """Entry point: log the startup banner, start the deferred-retry thread,
    then serve HTTP until interrupted."""
    banner = "=" * 60
    LOG.info(banner)
    LOG.info("Performance West Worker Job Server")
    LOG.info(" Port: %d", PORT)
    LOG.info(" Max workers: %d", MAX_WORKERS)
    LOG.info(" Available actions: %s", list(JOB_HANDLERS.keys()))
    LOG.info(" Deferred retry poll: every %ds", DEFER_POLL_INTERVAL)
    LOG.info(banner)

    # Background auto-retry for deferred Formation Orders (daemon: dies with us).
    threading.Thread(
        target=_deferred_retry_loop, daemon=True, name="deferred-retry"
    ).start()

    server = HTTPServer(("0.0.0.0", PORT), JobHandler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Drain in-flight jobs before closing the listening socket.
        LOG.info("Shutting down...")
        executor.shutdown(wait=True)
        server.server_close()
|
|
|
|
|
|
# Script entry point: run the job server only when executed directly.
if __name__ == "__main__":
    main()
|