new-site/scripts/workers/job_server.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

2090 lines
91 KiB
Python

"""
Worker Job Server — HTTP API that receives jobs from the Express API webhook receiver.
Instead of polling ERPNext for queued orders, this server receives webhook-triggered
job dispatches and executes them. ERPNext is the BPM orchestrator; we are the executor.
Endpoints:
POST /jobs — Receive a job dispatch
GET /health — Health check
GET /jobs/active — List currently running jobs
Jobs are executed in a background thread pool to avoid blocking the HTTP server.
Each job type maps to a handler function.
Architecture:
ERPNext workflow transition
→ ERPNext webhook
→ Express API /api/v1/webhooks/formation/*
→ HTTP POST to this server /jobs
→ Handler executes (name search, filing, EIN, doc gen, delivery)
→ Handler calls ERPNext API to update status / advance workflow
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Any
# Root logging is configured at import time with a single timestamped format.
logging.basicConfig(
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    level=logging.INFO,
)
LOG = logging.getLogger("workers.job_server")

# Listen port and pool size; both can be overridden through the environment.
PORT = int(os.environ.get("WORKER_PORT", "8090"))
MAX_WORKERS = int(os.environ.get("MAX_WORKERS", "3"))

# Job tracking. Both containers start empty; whatever fills and trims them is
# outside this section (the "last 100" cap is not enforced here).
active_jobs: dict[str, dict] = {}
completed_jobs: list[dict] = []  # Last 100 completed

# Thread pool for job execution; worker threads are named with the "job" prefix.
executor = ThreadPoolExecutor(thread_name_prefix="job", max_workers=MAX_WORKERS)
# ==========================================================================
# Schedule gate — check if portal is open before attempting web automation
# ==========================================================================
def _check_schedule(state_code: str, order_name: str) -> dict | None:
    """
    Check if the portal for the given state/province is currently available.

    Schedule selection:
      * ``"BC"``  → ``BC_CORPORATE_ONLINE_SCHEDULE``
      * ``"EIN"`` → ``IRS_EIN_SCHEDULE``
      * anything else → the ``portal_schedule`` entry of that state's config,
        falling back to ``US_STATE_SOS_SCHEDULE`` when the entry is missing or
        the config cannot be loaded.

    Args:
        state_code: Jurisdiction code, or the pseudo-code ``"EIN"`` for the IRS.
        order_name: ERPNext Formation Order name; updated when the job is deferred.

    Returns:
        ``None`` if the portal is open (proceed). Otherwise a dict with keys
        ``deferred`` (True), ``reason``, ``next_open`` (ISO string or
        ``"unknown"``) and ``minutes_until_open``.

    Side effects:
        When closed, tries to set ``automation_status``/``defer_until``/
        ``admin_notes`` on the Formation Order. This is best effort: a failure
        is logged and the defer dict is still returned.
    """
    from scripts.formation.portal_schedule import (
        PortalSchedule,
        BC_CORPORATE_ONLINE_SCHEDULE,
        IRS_EIN_SCHEDULE,
        US_STATE_SOS_SCHEDULE,
    )

    # Determine schedule for this state
    if state_code == "BC":
        schedule_config = BC_CORPORATE_ONLINE_SCHEDULE
    elif state_code == "EIN":
        schedule_config = IRS_EIN_SCHEDULE
    else:
        # Try to load from state's config.py if it defines a portal_schedule
        try:
            from scripts.formation.states import get_config
            state_config = get_config(state_code)
            schedule_config = state_config.get("portal_schedule", US_STATE_SOS_SCHEDULE)
        except Exception as exc:
            # Deliberate fallback: an unknown/broken state config uses the generic schedule.
            LOG.debug("No portal schedule for %s (%s) — using default SOS schedule", state_code, exc)
            schedule_config = US_STATE_SOS_SCHEDULE

    schedule = PortalSchedule.from_config(schedule_config)
    available, next_open = schedule.is_available()
    if available:
        return None  # Proceed

    # Portal closed — defer the job
    next_str = next_open.isoformat() if next_open else "unknown"
    minutes = schedule.minutes_until_open() or 0

    # Try to get the holiday name for the log message
    reason = "outside business hours"
    if schedule.jurisdiction:
        try:
            from scripts.formation.holidays import holiday_name
            from zoneinfo import ZoneInfo

            # "Today" must be evaluated in the portal's own timezone.
            local_today = datetime.now(ZoneInfo(schedule.timezone)).date()
            hname = holiday_name(local_today, jurisdiction=schedule.jurisdiction)
            if hname:
                reason = f"closed for {hname}"
        except Exception as exc:
            # The holiday name is cosmetic; keep the generic reason but leave a trace.
            LOG.debug("Holiday lookup failed for %s: %s", state_code, exc)

    LOG.info(
        "Portal %s %s — deferring %s. Next open: %s (~%d min)",
        state_code, reason, order_name, next_str, minutes,
    )

    try:
        from scripts.workers.erpnext_client import ERPNextClient
        client = ERPNextClient()
        client.update_resource("Formation Order", order_name, {
            "automation_status": "Deferred",
            "defer_until": next_str,
            "admin_notes": f"Portal {reason}. Next open: {next_str} (~{minutes} min). Will auto-retry.",
        })
    except Exception as e:
        LOG.warning("Could not update ERPNext defer status: %s", e)

    return {
        "deferred": True,
        "reason": reason,
        "next_open": next_str,
        "minutes_until_open": minutes,
    }
def _check_standard_delay(order_name: str, checkpoint: str) -> dict | None:
    """
    Insert a 3-business-day delay for standard formation orders.

    Expedited orders skip this delay.

    Args:
        order_name: ERPNext Formation Order name.
        checkpoint: Label of the pipeline stage requesting the delay; it is
            embedded in ``admin_notes`` as ``[STANDARD_DELAY:<checkpoint>]``.

    Returns:
        ``None`` if the order should proceed (expedited, delay already passed,
        or the check itself failed), or a defer dict (``deferred``, ``reason``,
        ``next_open`` and, on first deferral, ``minutes_until_open``).

    Idempotency: if ``defer_until`` is set and ``admin_notes`` mentions this
    checkpoint, the stored time is compared with now. Once it has passed, the
    defer is cleared and the order proceeds.

    NOTE(review): once the defer is cleared nothing records that this
    checkpoint was already served, so a later retry of the same job looks like
    a first visit and is deferred again. Confirm whether retries can happen.
    """
    try:
        from datetime import datetime as _dt, timezone
        from scripts.workers.erpnext_client import ERPNextClient

        client = ERPNextClient()
        order = client.get_resource("Formation Order", order_name)

        # Expedited skips all standard delays
        if order.get("expedited", False):
            return None

        existing_defer = order.get("defer_until")
        existing_note = order.get("admin_notes", "") or ""
        now = _dt.now(timezone.utc)

        # Already deferred at this checkpoint?
        if existing_defer and checkpoint in existing_note:
            if isinstance(existing_defer, _dt):
                defer_dt = existing_defer
            else:
                try:
                    defer_dt = _dt.fromisoformat(existing_defer.replace("Z", "+00:00"))
                except Exception:
                    # NOTE(review): an unparseable value is treated as "still waiting".
                    defer_dt = None

            # A naive timestamp cannot be compared with the aware `now`; the
            # resulting TypeError used to fall into the outer handler and
            # skip the delay. Assumes naive values are UTC — TODO confirm.
            if defer_dt is not None and defer_dt.tzinfo is None:
                defer_dt = defer_dt.replace(tzinfo=timezone.utc)

            if defer_dt and defer_dt <= now:
                # Delay complete — clear defer and proceed
                LOG.info("[%s] Standard delay at %s complete — proceeding", order_name, checkpoint)
                client.update_resource("Formation Order", order_name, {
                    "automation_status": "Pending",
                    "defer_until": None,
                    "admin_notes": existing_note.replace(f"[STANDARD_DELAY:{checkpoint}]", "").strip(),
                })
                return None

            # Still waiting
            return {
                "deferred": True,
                "reason": f"Standard service 3-day delay at {checkpoint}",
                "next_open": existing_defer,
            }

        # First time hitting this checkpoint — set the defer
        from scripts.workers.business_days import business_days_from_now

        defer_until = business_days_from_now(3)
        defer_iso = defer_until.isoformat()
        LOG.info(
            "[%s] Standard service: deferring at %s for 3 business days until %s",
            order_name, checkpoint, defer_iso,
        )
        client.update_resource("Formation Order", order_name, {
            "automation_status": "Deferred",
            "defer_until": defer_iso,
            "admin_notes": f"{existing_note} [STANDARD_DELAY:{checkpoint}] 3 business day delay until {defer_iso}".strip(),
        })

        # Aware datetimes subtract correctly across timezones. For a naive
        # value, fall back to the nominal 3 calendar days (4320 minutes).
        if getattr(defer_until, "tzinfo", None) is not None:
            minutes_until_open = int((defer_until - now).total_seconds() / 60)
        else:
            minutes_until_open = 4320

        return {
            "deferred": True,
            "reason": f"Standard service 3-day delay at {checkpoint}",
            "next_open": defer_iso,
            "minutes_until_open": minutes_until_open,
        }
    except Exception as exc:
        # Best effort by design: a broken delay check must not block the order.
        LOG.warning("[%s] Standard delay check failed (proceeding without delay): %s", order_name, exc)
        return None
# ==========================================================================
# Job Handlers — each function handles one type of automation action
# ==========================================================================
def handle_name_search(payload: dict) -> dict:
    """Search a state portal for business-name availability and report to ERPNext.

    Reads ``order_name``, ``state_code`` and ``entity_name`` from *payload*.
    If the schedule gate reports the portal closed, the defer dict is returned
    (tagged with the action) and no search is made. Otherwise the Formation
    Order is updated and the "Name Available" / "Name Unavailable" workflow
    action is applied.
    """
    from scripts.formation.name_search import search_name_sync

    order_name = payload["order_name"]
    state_code = payload["state_code"]
    entity_name = payload["entity_name"]

    # Schedule gate — don't hit the portal outside business hours / holidays
    gate = _check_schedule(state_code, order_name)
    if gate:
        return {"action": "name_search", **gate}

    LOG.info("Name search: %s in %s", entity_name, state_code)
    outcome = search_name_sync(entity_name, state_code)

    # Report back to ERPNext
    from scripts.workers.erpnext_client import ERPNextClient
    erp = ERPNextClient()

    if outcome.get("available"):
        fields = {
            "automation_status": "Running",
            "admin_notes": f"Name '{entity_name}' is available in {state_code}",
        }
        transition = "Name Available"
    else:
        # Only the first five similar names are surfaced in the error text.
        similar = ", ".join(outcome.get("similar_names", [])[:5])
        fields = {
            "automation_status": "Failed",
            "automation_error": f"Name '{entity_name}' unavailable. Similar: {similar}",
        }
        transition = "Name Unavailable"

    erp.update_resource("Formation Order", order_name, fields)
    erp.apply_workflow("Formation Order", order_name, transition)

    return {"action": "name_search", "available": outcome.get("available"), "state": state_code}
def handle_file_entity(payload: dict) -> dict:
    """File business entity through state SOS portal via Playwright.

    Flow:
      1. Standard-service delay gate (``post_name_pre_filing``).
      2. Fetch the Formation Order from ERPNext.
      3. Portal schedule gate for the order's ``state_code``.
      4. Load the payment card, run the state adapter's ``file_entity``.
      5. Record the result on the order and apply "Filing Complete" or
         "Filing Failed".

    Args:
        payload: Job payload; must contain ``order_name``.

    Returns:
        A defer dict tagged with ``action`` when gated, otherwise
        ``{"action", "success", "filing_number"}``.

    Card number and CVV are wiped from the in-memory order as soon as the
    adapter returns, on failure paths and exceptions too, not only on success.
    """
    from scripts.formation.states import get_adapter

    order_name = payload["order_name"]
    LOG.info("Filing entity: %s", order_name)

    # Standard service delay: 3 business days after name search clears,
    # before state filing submission. Expedited skips this delay.
    standard_defer = _check_standard_delay(order_name, "post_name_pre_filing")
    if standard_defer:
        return {"action": "file_entity", **standard_defer}

    # Fetch full order data from ERPNext
    from scripts.workers.erpnext_client import ERPNextClient
    client = ERPNextClient()
    order = client.get_resource("Formation Order", order_name)

    # Schedule gate — don't hit the portal outside business hours / holidays
    defer = _check_schedule(order["state_code"], order_name)
    if defer:
        return {"action": "file_entity", **defer}

    from datetime import timezone
    from scripts.workers.relay_integration import populate_order_payment, record_filing_payment

    # Load state adapter and file
    adapter = get_adapter(order["state_code"])
    formation_order = _build_formation_order(order)
    card_last4 = "0000"

    loop = asyncio.new_event_loop()
    try:
        # Load Relay debit card for payment
        if not populate_order_payment(formation_order):
            LOG.warning("Could not load Relay card — filing will fail at payment step")
        # Keep only the last four digits for reconciliation before the wipe below.
        card_number = getattr(formation_order, "payment_card_number", "") or ""
        if card_number:
            card_last4 = card_number[-4:]
        result = loop.run_until_complete(adapter.file_entity(formation_order))
    finally:
        loop.close()
        # Clear card details from memory regardless of outcome
        formation_order.payment_card_number = ""
        formation_order.payment_card_cvv = ""

    attempts = (order.get("automation_attempts") or 0) + 1

    if result.success:
        client.update_resource("Formation Order", order_name, {
            "state_filing_number": result.filing_number,
            # Naive UTC ISO string — same format the field received before.
            "filed_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            "automation_status": "Succeeded",
            "automation_attempts": attempts,
        })
        # Record the payment for Relay reconciliation. The filing is already
        # paid and accepted here, so a bookkeeping failure must not stop the
        # workflow from advancing (a retried job would file and pay twice).
        try:
            record_filing_payment(
                order_name=order_name,
                state_code=order["state_code"],
                amount_cents=order.get("state_fee_cents") or 0,
                card_last4=card_last4,
                confirmation=result.confirmation_number,
            )
        except Exception:
            LOG.exception("Could not record filing payment for %s — reconcile manually", order_name)
        client.apply_workflow("Formation Order", order_name, "Filing Complete")
    else:
        client.update_resource("Formation Order", order_name, {
            "automation_status": "Failed",
            "automation_error": result.error_message,
            "automation_attempts": attempts,
        })
        client.apply_workflow("Formation Order", order_name, "Filing Failed")

    return {"action": "file_entity", "success": result.success, "filing_number": result.filing_number}
def handle_obtain_ein(payload: dict) -> dict:
    """Run the IRS online EIN application for a Formation Order.

    Two gates run first, and either one returns a defer dict tagged with the
    action: the standard-service delay (``post_filing_pre_ein``) and the
    ``"EIN"`` portal schedule. On success the order is marked Succeeded, the
    EIN is stored in a "Sensitive ID" document and "EIN Obtained" is applied.
    On failure the error is recorded and "EIN Failed" is applied.
    """
    order_name = payload["order_name"]
    LOG.info("Obtaining EIN: %s", order_name)

    # Standard service delay: 3 business days after state filing confirmed,
    # before EIN application. Expedited skips this delay.
    delay_gate = _check_standard_delay(order_name, "post_filing_pre_ein")
    if delay_gate:
        return {"action": "obtain_ein", **delay_gate}

    # Schedule gate for the IRS portal (keyed by the pseudo state code "EIN")
    schedule_gate = _check_schedule("EIN", order_name)
    if schedule_gate:
        return {"action": "obtain_ein", **schedule_gate}

    from scripts.workers.erpnext_client import ERPNextClient
    erp = ERPNextClient()
    order = erp.get_resource("Formation Order", order_name)

    from scripts.formation.ein_worker import obtain_ein_sync
    outcome = obtain_ein_sync(order)

    if not outcome.get("success"):
        erp.update_resource("Formation Order", order_name, {
            "automation_status": "Failed",
            "automation_error": outcome.get("error", "EIN obtainment failed"),
        })
        erp.apply_workflow("Formation Order", order_name, "EIN Failed")
    else:
        erp.update_resource("Formation Order", order_name, {
            "automation_status": "Succeeded",
        })
        # Store EIN in Sensitive ID doctype.
        # NOTE(review): the status is set before the EIN is persisted; if this
        # create call fails the order reads Succeeded with no stored EIN.
        sensitive_id = {
            "customer": order.get("customer"),
            "id_type": "EIN",
            "encrypted_value": outcome["ein"],
            "notes": f"Obtained for {order.get('entity_name')}",
        }
        erp.create_resource("Sensitive ID", sensitive_id)
        erp.apply_workflow("Formation Order", order_name, "EIN Obtained")

    return {"action": "obtain_ein", "success": outcome.get("success")}
def handle_generate_docs(payload: dict) -> dict:
    """Generate documents (operating agreement, etc.) and upload to MinIO.

    Args:
        payload: Job payload; must contain ``order_name``.

    Returns:
        A defer dict tagged with ``action`` when the standard-service delay
        (``post_ein_pre_docs``) applies, otherwise
        ``{"action": "generate_docs", "files": [...]}`` listing the file names
        that were generated and uploaded.

    The "Docs Ready" workflow action is applied even when generation failed
    (the failure is logged with a traceback and ``files`` is empty).
    NOTE(review): confirm that advancing with no documents is intended.
    """
    from datetime import timezone

    order_name = payload["order_name"]
    LOG.info("Generating documents: %s", order_name)

    # Standard service delay: 3 business days after EIN obtained,
    # before document generation + delivery. Expedited skips this delay.
    standard_defer = _check_standard_delay(order_name, "post_ein_pre_docs")
    if standard_defer:
        return {"action": "generate_docs", **standard_defer}

    from scripts.workers.erpnext_client import ERPNextClient
    from scripts.document_gen import DocxBuilder, convert_to_pdf, MinioStorage

    client = ERPNextClient()
    order = client.get_resource("Formation Order", order_name)
    storage = MinioStorage()
    generated_files = []
    order_number = order.get("order_number", order_name)

    # Generate operating agreement if requested
    if order.get("include_operating_agreement"):
        try:
            builder = DocxBuilder("operating-agreement")
            builder.set_variables({
                "entity_name": order.get("entity_name", ""),
                "state_name": order.get("state_code", ""),
                # UTC "today", formatted for the document (e.g. "April 27, 2026").
                "formation_date": datetime.now(timezone.utc).strftime("%B %d, %Y"),
                "management_type": order.get("management_type", "Member Managed"),
                "purpose": order.get("purpose", "Any lawful business activity"),
                "principal_address": order.get("principal_address", ""),
                "registered_agent_name": "Northwest Registered Agent LLC",
                "members": order.get("members_json") or [],
            }).fill().add_disclaimer()

            docx_path = f"/app/data/documents/{order_number}/operating-agreement.docx"
            builder.save(docx_path)
            pdf_path = convert_to_pdf(docx_path)

            # Upload to MinIO
            storage.upload(docx_path, f"formations/{order_number}/operating-agreement.docx")
            storage.upload(str(pdf_path), f"formations/{order_number}/operating-agreement.pdf")
            generated_files.extend(["operating-agreement.docx", "operating-agreement.pdf"])
        except Exception as e:
            # Best effort by design; keep the traceback so the failure is diagnosable.
            LOG.exception("Operating agreement generation failed: %s", e)

    # Advance workflow to Review (admin reviews before delivery)
    client.apply_workflow("Formation Order", order_name, "Docs Ready")
    return {"action": "generate_docs", "files": generated_files}
def handle_deliver(payload: dict) -> dict:
    """Email documents to customer and mark as delivered.

    An ``order_number`` starting with ``"CO-"`` is treated as a compliance
    order (ERPNext Sales Order, documents under ``compliance/<order_number>/``).
    Anything else is a Formation Order (documents under
    ``formations/<order_number>/``).

    Args:
        payload: Job payload; must contain ``order_name`` and may contain
            ``order_number``.

    Returns:
        ``{"action": "deliver", "email", "documents"}`` on success, or
        ``{"action": "deliver", "error": ...}`` when no PDF or no customer
        email could be found (nothing is sent in that case).
    """
    from datetime import timezone

    order_name = payload["order_name"]
    order_number = payload.get("order_number", "")
    LOG.info("Delivering: %s (%s)", order_name, order_number)

    from scripts.workers.erpnext_client import ERPNextClient
    from scripts.document_gen import MinioStorage

    client = ERPNextClient()
    storage = MinioStorage()

    # Determine if this is a compliance order (Sales Order) or formation order
    is_compliance = order_number.startswith("CO-") if order_number else False

    if is_compliance:
        # Compliance order — documents are in compliance/{order_number}/
        order = client.get_resource("Sales Order", order_name)
        prefix = f"compliance/{order_number}/"

        # Get customer email from PG compliance_orders
        customer_email = ""
        customer_name = ""
        try:
            import psycopg2

            conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
            try:
                with conn.cursor() as cur:
                    cur.execute(
                        "SELECT customer_email, customer_name FROM compliance_orders WHERE order_number = %s",
                        (order_number,),
                    )
                    row = cur.fetchone()
            finally:
                # Always release the connection, even when the query fails.
                conn.close()
            if row:
                # Columns may be NULL; normalise to empty strings.
                customer_email = row[0] or ""
                customer_name = row[1] or ""
        except Exception as exc:
            LOG.warning("Could not look up compliance order email: %s", exc)

        if not customer_email:
            # Fallback to ERPNext customer
            cust = order.get("customer", "")
            if cust:
                try:
                    cust_doc = client.get_resource("Customer", cust)
                    customer_email = cust_doc.get("email_id", "")
                    customer_name = cust_doc.get("customer_name", "")
                except Exception as exc:
                    LOG.warning("Could not look up ERPNext customer %s: %s", cust, exc)
    else:
        # Formation order
        order = client.get_resource("Formation Order", order_name)
        order_number = order.get("order_number", order_name)
        prefix = f"formations/{order_number}/"
        customer_email = order.get("customer_email", "")
        customer_name = order.get("customer_name", "")

    # Get presigned URLs for all documents (PDF only for delivery)
    doc_urls = {}
    for obj in storage.list_objects(prefix):
        filename = obj.split("/")[-1]
        if filename.endswith(".pdf"):
            doc_urls[filename] = storage.get_url(obj, expires_hours=168)  # 7 days

    if not doc_urls:
        LOG.warning("No PDF documents found for %s at %s", order_name, prefix)
        return {"action": "deliver", "error": "no documents found"}

    if not customer_email:
        LOG.error("No customer email for %s", order_name)
        return {"action": "deliver", "error": "no customer email"}

    # Build and send delivery email
    from scripts.workers.delivery_worker import _build_email_html, _send_email

    file_links = list(doc_urls.items())
    html = _build_email_html(customer_name, order_number, file_links)
    subject = f"Your compliance documents are ready — {order_number}"
    _send_email(customer_email, subject, html)

    # Advance workflow
    if is_compliance:
        try:
            client.set_value("Sales Order", order_name, "workflow_state", "Delivered")
        except Exception as exc:
            LOG.warning("Could not advance workflow for %s: %s", order_name, exc)
    else:
        client.update_resource("Formation Order", order_name, {
            # Naive UTC ISO string — same format the field received before.
            "delivered_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
        })
        client.apply_workflow("Formation Order", order_name, "Deliver")

    LOG.info("Delivered %s to %s (%d documents)", order_name, customer_email, len(doc_urls))
    return {"action": "deliver", "email": customer_email, "documents": len(doc_urls)}
def handle_send_to_attorney(payload: dict) -> dict:
    """Flag a Formation Order as sent to the partner attorney.

    Nothing is transmitted here: the handler only writes an ``admin_notes``
    entry on the order (attorney sending is semi-manual for now).
    """
    order_name = payload["order_name"]
    LOG.info("Sending to attorney: %s", order_name)

    from scripts.workers.erpnext_client import ERPNextClient

    note = {
        "admin_notes": "Sent to partner attorney for review. Waiting for legal opinion letter.",
    }
    ERPNextClient().update_resource("Formation Order", order_name, note)

    return {"action": "send_to_attorney", "status": "sent"}
def _get_crtc_handler():
    """Build a fresh Canada CRTC handler; the import is deferred until the first call."""
    from scripts.workers.services.canada_crtc import CanadaCRTCHandler as handler_cls

    handler = handler_cls()
    return handler
def handle_register_ca_domain(payload: dict) -> dict:
    """CRTC: Customer confirmed domain choice — resume pipeline from Step 5.

    Called by the /api/v1/canada-crtc/domain-confirm endpoint after the
    customer selects their .ca domain in the portal.

    Args:
        payload: May contain ``order_number`` and ``domain``. ``domain`` is
            only logged and echoed back; the pipeline itself is driven from
            the Sales Order.

    Returns:
        ``{"action", "domain", "order"}`` after the pipeline ran, or
        ``{"action", "error": "No Sales Order", "order"}`` when the order has
        no linked Sales Order in ``canada_crtc_orders``.
    """
    import psycopg2
    import psycopg2.extras

    order_number = payload.get("order_number", "")
    domain = payload.get("domain", "")
    LOG.info("[CRTC] Domain confirmed: %s for %s — resuming pipeline", domain, order_number)

    # Re-run the full pipeline — it will see ca_domain is now set and
    # pick up from Step 5 (domain registration), skipping completed steps.
    from scripts.workers.erpnext_client import ERPNextClient
    client = ERPNextClient()

    # Find the Sales Order linked to this order
    pg = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
    try:
        with pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(
                "SELECT erpnext_sales_order FROM canada_crtc_orders WHERE order_number = %s",
                (order_number,),
            )
            row = cur.fetchone()
    finally:
        # Release the connection even when the query raises.
        pg.close()

    if not row or not row.get("erpnext_sales_order"):
        LOG.error("No Sales Order found for %s", order_number)
        return {"action": "register_ca_domain", "error": "No Sales Order", "order": order_number}

    so_name = row["erpnext_sales_order"]
    order = client.get_resource("Sales Order", so_name)
    handler = _get_crtc_handler()

    loop = asyncio.new_event_loop()
    try:
        # The pipeline's return value is not used; only completion matters here.
        loop.run_until_complete(handler.process(order))
    finally:
        loop.close()

    return {"action": "register_ca_domain", "domain": domain, "order": order_number}
def handle_register_awaiting_funds(payload: dict) -> dict:
    """CRTC: start Relay deposit monitoring for the order named in *payload*."""
    order_name = payload["order_name"]
    LOG.info("[CRTC] Awaiting funds: %s", order_name)

    from scripts.workers.relay_integration import start_deposit_monitoring as begin_monitoring

    begin_monitoring(order_name)

    outcome = {
        "action": "register_awaiting_funds",
        "order": order_name,
        "status": "monitoring",
    }
    return outcome
def _crtc_json_object(raw: Any) -> dict:
    """Return *raw* as a dict: dicts pass through, JSON-object strings are parsed, anything else gives {}."""
    if isinstance(raw, dict):
        return raw
    if isinstance(raw, str) and raw.lstrip().startswith("{"):
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            return {}
        return parsed if isinstance(parsed, dict) else {}
    return {}


def handle_file_bc_incorporation(payload: dict) -> dict:
    """CRTC: File BC incorporation by creating a CA Filing Request in ERPNext.

    The frappe_ca_registry app handles the actual COLIN automation.
    This handler creates the CA Filing Request DocType from the order data,
    which triggers the Frappe app's background job.

    Args:
        payload: Must contain ``order_name`` (the Sales Order); may contain
            ``order_number`` (key into the ``canada_crtc_orders`` table).

    Returns:
        ``{"action", "filing_name", "order"}`` on success, or
        ``{"action", "error", ...}`` when the PG order is missing or the
        create/submit call fails.

    Address columns are accepted both as JSON text and as already-decoded
    dicts (the latter is what a JSON/JSONB column yields — TODO confirm the
    column types).
    """
    import psycopg2
    import psycopg2.extras

    order_name = payload["order_name"]
    order_number = payload.get("order_number", "")
    LOG.info("[CRTC] Creating CA Filing Request for: %s (%s)", order_name, order_number)

    from scripts.workers.erpnext_client import ERPNextClient
    client = ERPNextClient()

    # Load director data from PG (has the split name fields)
    pg = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
    try:
        with pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("SELECT * FROM canada_crtc_orders WHERE order_number = %s", (order_number,))
            pg_order = cur.fetchone()
    finally:
        # Release the connection even when the query raises.
        pg.close()

    if not pg_order:
        LOG.error("PG order not found: %s", order_number)
        return {"action": "file_bc_incorporation", "error": f"Order {order_number} not found"}

    # Parse director address JSON if needed
    raw_address = pg_order.get("director_address", "")
    director_addr = _crtc_json_object(raw_address)

    # Street line: prefer the parsed "street". When the split name fields are
    # populated and no street was parsed, fall back to the raw address text.
    director_street = director_addr.get("street") or ""
    if not director_street and pg_order.get("director_first_name") and isinstance(raw_address, str):
        director_street = raw_address

    # Determine company type for the DocType
    ct = pg_order.get("company_type", "numbered")
    doctype_company_type = {
        "numbered": "Numbered",
        "numbered_tradename": "Numbered + Trade Name",
        "named": "Named",
    }.get(ct, "Numbered")

    # Create the CA Filing Request in ERPNext
    filing_data = {
        "doctype": "CA Filing Request",
        "filing_type": "Named Incorporation" if ct == "named" else "Numbered Incorporation",
        "province": "BC",
        "sales_order": order_name,
        "external_order_id": order_number,
        "company_type": doctype_company_type,
        "name_reservation_number": pg_order.get("company_name_choice1") or "",
        "trade_name": pg_order.get("trade_name") or "",
        # Director from split fields (preferred) or parsed JSON
        "director_first_name": pg_order.get("director_first_name") or director_addr.get("first_name", ""),
        "director_middle_name": pg_order.get("director_middle_name") or "",
        "director_last_name": pg_order.get("director_last_name") or director_addr.get("last_name", ""),
        "director_address": director_street,
        "director_address2": director_addr.get("street2", ""),
        "director_city": director_addr.get("city", ""),
        "director_province": director_addr.get("province", ""),
        "director_postal": director_addr.get("postal", ""),
        "director_country": director_addr.get("country", "US"),
        # Mailing address
        "director_mailing_different": pg_order.get("director_mailing_different", False),
        # Additional directors
        "additional_directors_json": json.dumps(pg_order.get("additional_directors") or []),
        # Registered office from mailbox address (first comma-separated part only)
        "office_address": (pg_order.get("mailbox_address") or "329 Howe St, Vancouver, BC V6C 3N2").split(",")[0].strip(),
        # Notification email goes to PW
        "notification_email": "filings@performancewest.net",
    }

    # Parse mailing address if present
    if pg_order.get("director_mailing_different") and pg_order.get("director_mailing_address"):
        mail_addr = _crtc_json_object(pg_order["director_mailing_address"])
        if mail_addr:
            filing_data["director_mailing_street"] = mail_addr.get("street", "")
            filing_data["director_mailing_city"] = mail_addr.get("city", "")
            filing_data["director_mailing_province"] = mail_addr.get("province", "")
            filing_data["director_mailing_postal"] = mail_addr.get("postal", "")
            filing_data["director_mailing_country"] = mail_addr.get("country", "")

    try:
        result = client.create_resource("CA Filing Request", filing_data)
        filing_name = result.get("name", "")
        LOG.info("[CRTC] CA Filing Request created: %s", filing_name)

        # Submit it (triggers the Frappe app's background job)
        client.call_method("frappe.client.submit", {
            "doc": {"doctype": "CA Filing Request", "name": filing_name},
        })
        LOG.info("[CRTC] CA Filing Request submitted: %s", filing_name)

        return {"action": "file_bc_incorporation", "filing_name": filing_name, "order": order_name}
    except Exception as e:
        LOG.error("[CRTC] Failed to create CA Filing Request: %s", e)
        return {"action": "file_bc_incorporation", "error": str(e), "order": order_name}
def handle_generate_crtc_docs(payload: dict) -> dict:
    """CRTC: run the handler's document generation for a Sales Order.

    Returns the action, the order name and the generated items rendered with
    ``str()`` under ``files``.
    """
    order_name = payload["order_name"]
    LOG.info("[CRTC] Generating docs: %s", order_name)

    from scripts.workers.erpnext_client import ERPNextClient

    erp = ERPNextClient()
    sales_order = erp.get_resource("Sales Order", order_name)
    crtc = _get_crtc_handler()

    # The handler API is async; drive it on a private loop owned by this job.
    event_loop = asyncio.new_event_loop()
    try:
        generated = event_loop.run_until_complete(crtc.generate_documents(sales_order))
    finally:
        event_loop.close()

    file_names = list(map(str, generated))
    return {"action": "generate_crtc_docs", "order": order_name, "files": file_names}
def handle_notify_admin_review(payload: dict) -> dict:
    """CRTC: log that an order is ready for admin review before shipping.

    No email is sent from here; per the original note, the ERPNext Email
    Notification handles the actual email.
    """
    order_name = payload["order_name"]
    LOG.info("[CRTC] Notifying admin for review: %s", order_name)

    outcome = {
        "action": "notify_admin_review",
        "order": order_name,
        "status": "notified",
    }
    return outcome
def handle_ship_binder(payload: dict) -> dict:
    """CRTC: run the handler's ``ship_binder`` step for a Sales Order.

    The step's own return value is discarded; status "shipped" is reported
    whenever it completes without raising.
    """
    order_name = payload["order_name"]
    LOG.info("[CRTC] Ship binder: %s", order_name)

    from scripts.workers.erpnext_client import ERPNextClient

    sales_order = ERPNextClient().get_resource("Sales Order", order_name)
    crtc = _get_crtc_handler()

    # The handler API is async; drive it on a private loop owned by this job.
    event_loop = asyncio.new_event_loop()
    try:
        event_loop.run_until_complete(crtc.ship_binder(sales_order))
    finally:
        event_loop.close()

    return {"action": "ship_binder", "order": order_name, "status": "shipped"}
def handle_mark_delivered(payload: dict) -> dict:
    """CRTC: start the commission holdback clock and report the order delivered.

    A holdback failure is logged and does not fail the job.
    NOTE(review): an ERPNext client is constructed but no order field is
    written here — confirm the "delivered" state is recorded elsewhere.
    """
    order_name = payload["order_name"]
    LOG.info("[CRTC] Marking delivered: %s", order_name)

    from scripts.workers.erpnext_client import ERPNextClient
    from scripts.workers.relay_integration import start_commission_holdback

    # Kept from the original even though the instance is not used afterwards.
    client = ERPNextClient()

    # Start 14-day commission holdback
    try:
        start_commission_holdback(order_name)
    except Exception as holdback_error:
        LOG.warning("[CRTC] Commission holdback failed (non-fatal): %s", holdback_error)

    outcome = {"action": "mark_delivered", "order": order_name, "status": "delivered"}
    return outcome
def _pw_email_html(headline: str, body_paragraphs: list[str], cta_text: str = "", cta_url: str = "") -> str:
"""Build a branded HTML email matching the campaign template style."""
logo = "https://performancewest.net/images/logo.png"
body_html = "".join(
f'<p style="margin:0 0 14px;font-family:Arial,sans-serif;font-size:15px;color:#2d3748;line-height:1.7;">{p}</p>'
for p in body_paragraphs
)
cta_block = ""
if cta_text and cta_url:
cta_block = (
'<table width="100%" cellpadding="0" cellspacing="0" border="0"><tr><td align="center">'
'<table cellpadding="0" cellspacing="0" border="0" style="margin:20px auto;"><tr>'
f'<td style="background:#059669;border-radius:4px;"><a href="{cta_url}" style="display:inline-block;padding:12px 28px;font-family:Arial,sans-serif;font-size:14px;font-weight:700;color:#ffffff;text-decoration:none;">{cta_text}</a></td>'
'</tr></table></td></tr></table>'
)
return (
'<!DOCTYPE html><html><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width,initial-scale=1"></head>'
'<body style="margin:0;padding:0;background:#eef0f3;">'
'<table width="100%" cellpadding="0" cellspacing="0" border="0" style="background:#eef0f3;padding:20px 0;"><tr><td align="center">'
'<table width="620" cellpadding="0" cellspacing="0" border="0" style="width:620px;max-width:620px;background:#ffffff;border-radius:8px;overflow:hidden;box-shadow:0 2px 8px rgba(0,0,0,0.08);">'
'<tr><td>'
# Header
'<table width="100%" cellpadding="0" cellspacing="0" border="0"><tr><td style="background:#1a2744;padding:18px 40px 14px;">'
'<table cellpadding="0" cellspacing="0" border="0"><tr>'
f'<td style="vertical-align:middle;padding-right:12px;"><img src="{logo}" width="90" alt="Performance West" style="display:block;width:90px;height:auto;"></td>'
'<td style="vertical-align:middle;border-left:1px solid #2d4e78;padding-left:12px;"><span style="color:#8fa8d0;font-family:Arial,sans-serif;font-size:11px;letter-spacing:1.5px;text-transform:uppercase;">Telecom Compliance</span></td>'
'</tr></table></td></tr></table>'
'<table width="100%" cellpadding="0" cellspacing="0" border="0"><tr><td style="background:#059669;height:3px;font-size:0;">&nbsp;</td></tr></table>'
'<table width="100%" cellpadding="0" cellspacing="0" border="0"><tr><td style="padding:24px 40px 28px;">'
f'<h1 style="margin:0;font-family:Arial,sans-serif;font-size:22px;font-weight:700;color:#ffffff;line-height:1.3;">{headline}</h1>'
'</td></tr></table>'
# Body
f'<table width="100%" cellpadding="0" cellspacing="0" border="0"><tr><td style="padding:28px 40px;background:#ffffff;">{body_html}{cta_block}</td></tr></table>'
# Footer
'<table width="100%" cellpadding="0" cellspacing="0" border="0"><tr><td style="background:#f4f5f7;padding:16px 40px;border-top:1px solid #e8ecf0;text-align:center;">'
f'<img src="{logo}" width="60" alt="Performance West" style="display:block;margin:0 auto 8px;width:60px;height:auto;opacity:0.5;">'
'<p style="margin:0;font-family:Arial,sans-serif;font-size:11px;color:#9ca3af;">Performance West Inc. &middot; <a href="https://performancewest.net" style="color:#9ca3af;">performancewest.net</a> &middot; 1-888-411-0383</p>'
'</td></tr></table>'
'</td></tr></table></td></tr></table></body></html>'
)
def _send_instant_delivery(
    *, customer_email: str, customer_name: str, order_number: str,
    service_name: str, minio_paths: list[str], storage,
):
    """Email generated PDFs to the customer immediately after payment.

    Only PDF files are sent — DOCX files are internal (for authority
    submission or admin review). The message is a branded HTML email built
    with ``_pw_email_html``; each PDF is downloaded through ``storage`` into a
    temporary directory and attached.

    Args:
        customer_email: Recipient address.
        customer_name: Customer's full name. Its first word is used in the
            greeting; an empty or blank name falls back to "there".
        order_number: Order reference shown in the subject and body.
        service_name: Service label shown in the subject, headline and body.
        minio_paths: Object keys of the generated files; non-PDF keys are
            ignored.
        storage: Object exposing ``download(remote_path, local_path)``.

    Returns:
        None. Nothing is sent when there are no PDFs or when SMTP_USER /
        SMTP_PASS are not set (both cases are logged).

    Raises:
        Whatever ``smtplib`` raises while connecting, authenticating or
        sending; per-attachment download failures are logged and skipped.
    """
    import smtplib
    import tempfile
    from email import encoders
    from email.mime.base import MIMEBase
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from html import escape
    from pathlib import Path

    # Filter to PDFs only — DOCX is for internal/authority use
    pdf_paths = [p for p in minio_paths if p.lower().endswith(".pdf")]
    if not pdf_paths:
        LOG.info("No PDFs to deliver for %s (only DOCX generated)", order_number)
        return

    smtp_host = os.environ.get("SMTP_HOST", "co.carrierone.com")
    smtp_port = int(os.environ.get("SMTP_PORT", "587"))
    smtp_user = os.environ.get("SMTP_USER", "")
    smtp_pass = os.environ.get("SMTP_PASS", "")
    from_email = os.environ.get("SMTP_FROM", "Performance West <noreply@performancewest.net>")
    if not smtp_user or not smtp_pass:
        LOG.warning("SMTP not configured — skipping instant delivery for %s", order_number)
        return

    # A whitespace-only name splits to [], so guard before taking the first
    # word. The name is customer-supplied and lands in HTML, so escape it.
    name_parts = customer_name.split() if customer_name else []
    name = escape(name_parts[0]) if name_parts else "there"

    # File names are escaped as well: they are rendered as HTML text.
    file_list = "".join(
        f'<li style="margin-bottom:6px;font-size:14px;">{escape(Path(p).name)}</li>'
        for p in pdf_paths
    )
    body_parts = [
        f"Hi {name},",
        f"Your <strong>{service_name}</strong> documents for order <strong>{order_number}</strong> are attached.",
        f"<ul style=\"margin:8px 0 14px;padding-left:20px;\">{file_list}</ul>",
        "These documents were generated automatically upon payment confirmation. "
        "If your order includes an FCC portal submission (e.g., ECFS upload for CPNI, "
        "or RMD registration), our team will handle that separately and send you a "
        "<strong>confirmation email with the filing confirmation number</strong> once submitted.",
        "If anything in the documents is incorrect, don't worry &mdash; just reply "
        "to this email or contact us at <a href=\"mailto:info@performancewest.net\" style=\"color:#059669;\">info@performancewest.net</a> "
        "and we'll fix it right away. A corrected submission will replace the original at no additional charge.",
        "<span style=\"font-size:13px;color:#6b7280;\">This fee is tax deductible as an ordinary business expense under IRC &sect; 162.</span>",
        "&mdash; Performance West Compliance Team",
    ]
    html_body = _pw_email_html(
        headline=f"Your {service_name} documents are ready",
        body_paragraphs=body_parts,
        cta_text="Check My FCC Compliance Status",
        cta_url="https://performancewest.net/tools/fcc-compliance-check",
    )

    msg = MIMEMultipart("mixed")
    msg["From"] = from_email
    msg["To"] = customer_email
    msg["Subject"] = f"Your {service_name} documents are ready \u2014 Order {order_number}"
    msg["Reply-To"] = "info@performancewest.net"
    msg.attach(MIMEText(html_body, "html"))

    # Download PDFs from MinIO and attach
    with tempfile.TemporaryDirectory() as tmpdir:
        for remote_path in pdf_paths:
            filename = Path(remote_path).name
            local_path = os.path.join(tmpdir, filename)
            try:
                storage.download(remote_path, local_path)
                with open(local_path, "rb") as f:
                    part = MIMEBase("application", "pdf")
                    part.set_payload(f.read())
                encoders.encode_base64(part)
                # Let the email package quote/encode the filename parameter
                # instead of hand-building the header value.
                part.add_header("Content-Disposition", "attachment", filename=filename)
                msg.attach(part)
            except Exception as exc:
                # Best-effort: one bad attachment must not block the email.
                LOG.warning("Could not attach %s: %s", remote_path, exc)

    # The timeout keeps a stalled SMTP server from pinning this worker forever.
    with smtplib.SMTP(smtp_host, smtp_port, timeout=60) as server:
        if smtp_port != 25:
            server.starttls()
        server.login(smtp_user, smtp_pass)
        server.sendmail(from_email, [customer_email], msg.as_string())
def _send_filing_confirmation(
    *, customer_email: str, customer_name: str, order_number: str,
    service_name: str, confirmation_number: str, authority: str,
    minio_paths: list[str], storage,
):
    """Email the customer after a filing is submitted to an authority.

    Sent after ECFS upload (CPNI), RMD portal submission, etc. The email
    shows the confirmation number and attaches every PDF listed in
    ``minio_paths`` (downloaded through ``storage``).

    Args:
        customer_email: Recipient address.
        customer_name: Customer's full name. Its first word is used in the
            greeting; an empty or blank name falls back to "there".
        order_number: Order reference, used in log messages.
        service_name: Service label shown in the subject, headline and body.
        confirmation_number: Filing confirmation number shown in the body
            and subject.
        authority: Name of the authority the filing was submitted to.
        minio_paths: Object keys of candidate attachments; non-PDF keys are
            ignored.
        storage: Object exposing ``download(remote_path, local_path)``.

    Returns:
        None. Nothing is sent when SMTP_USER / SMTP_PASS are not set.

    Raises:
        Whatever ``smtplib`` raises while connecting, authenticating or
        sending; per-attachment download failures are logged and skipped.
    """
    import smtplib
    import tempfile
    from email import encoders
    from email.mime.base import MIMEBase
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from html import escape
    from pathlib import Path

    smtp_host = os.environ.get("SMTP_HOST", "co.carrierone.com")
    smtp_port = int(os.environ.get("SMTP_PORT", "587"))
    smtp_user = os.environ.get("SMTP_USER", "")
    smtp_pass = os.environ.get("SMTP_PASS", "")
    from_email = os.environ.get("SMTP_FROM", "Performance West <noreply@performancewest.net>")
    if not smtp_user or not smtp_pass:
        LOG.warning("SMTP not configured — skipping filing confirmation for %s", order_number)
        return

    # A whitespace-only name splits to [], so guard before taking the first
    # word. The name is customer-supplied and lands in HTML, so escape it.
    name_parts = customer_name.split() if customer_name else []
    name = escape(name_parts[0]) if name_parts else "there"
    # The confirmation number originates outside this code; escape it for the
    # HTML body only (the Subject header below uses the raw value).
    confirmation_html = escape(str(confirmation_number))

    body_parts = [
        f"Hi {name},",
        f"Your <strong>{service_name}</strong> has been successfully submitted to the <strong>{authority}</strong>.",
        f'<table width="100%" cellpadding="0" cellspacing="0" border="0" style="margin:12px 0;"><tr>'
        f'<td style="background:#f0fdf4;border:1px solid #86efac;border-radius:8px;padding:16px 20px;text-align:center;">'
        f'<p style="margin:0 0 4px;font-family:Arial,sans-serif;font-size:12px;color:#065f46;text-transform:uppercase;letter-spacing:1px;">Confirmation Number</p>'
        f'<p style="margin:0;font-family:\'Courier New\',monospace;font-size:22px;font-weight:700;color:#059669;">{confirmation_html}</p>'
        f'</td></tr></table>',
        "Save this confirmation number for your records. The confirmation page is attached as a PDF.",
        "If you need to reference this filing in the future, you can use this confirmation number "
        f"with the {authority} to verify your submission status.",
        "If anything needs to be corrected, reply to this email and we'll file an amendment at no additional charge.",
        "<span style=\"font-size:13px;color:#6b7280;\">This fee is tax deductible as an ordinary business expense under IRC &sect; 162.</span>",
        "&mdash; Performance West Compliance Team",
    ]
    html_body = _pw_email_html(
        headline=f"\u2705 {service_name} Filed Successfully",
        body_paragraphs=body_parts,
    )

    msg = MIMEMultipart("mixed")
    msg["From"] = from_email
    msg["To"] = customer_email
    msg["Subject"] = f"\u2705 Filed: {service_name} \u2014 Confirmation {confirmation_number}"
    msg["Reply-To"] = "info@performancewest.net"
    msg.attach(MIMEText(html_body, "html"))

    # Attach confirmation PDFs
    pdf_paths = [p for p in minio_paths if p.lower().endswith(".pdf")]
    if pdf_paths:
        with tempfile.TemporaryDirectory() as tmpdir:
            for remote_path in pdf_paths:
                filename = Path(remote_path).name
                local_path = os.path.join(tmpdir, filename)
                try:
                    storage.download(remote_path, local_path)
                    with open(local_path, "rb") as f:
                        part = MIMEBase("application", "pdf")
                        part.set_payload(f.read())
                    encoders.encode_base64(part)
                    # Let the email package quote/encode the filename
                    # parameter instead of hand-building the header value.
                    part.add_header("Content-Disposition", "attachment", filename=filename)
                    msg.attach(part)
                except Exception as exc:
                    # Best-effort: one bad attachment must not block the email.
                    LOG.warning("Could not attach %s: %s", remote_path, exc)

    # The timeout keeps a stalled SMTP server from pinning this worker forever.
    with smtplib.SMTP(smtp_host, smtp_port, timeout=60) as server:
        if smtp_port != 25:
            server.starttls()
        server.login(smtp_user, smtp_pass)
        server.sendmail(from_email, [customer_email], msg.as_string())
    LOG.info("Filing confirmation sent to %s for %s (conf: %s)",
             customer_email, order_number, confirmation_number)
# Services that generate documents without needing to pay a vendor (no portal
# submission fees, no state fees); their PDFs are emailed as soon as
# processing finishes.
_INSTANT_DELIVERY_SLUGS = frozenset({
    "cpni-certification",       # generates cert letter + procedure statement
    "calea-ssi",                # generates SSI plan document
    "fcc-compliance-checkup",   # generates diagnostic report
    "dc-agent",                 # generates acceptance letter (NWRA is separate)
    # Non-FCC doc-gen services
    "flsa-audit", "contractor-classification", "handbook-review",
    "policy-development", "ccpa-audit", "privacy-policy",
    "data-mapping", "breach-response", "consent-audit",
    "dnc-compliance", "campaign-review",
})

# Maps service slugs to their next deadline(s).
# Most filings are annual (single month/day). BDC is semi-annual.
# "deadlines" is a list of (month, day) pairs.
_COMPLIANCE_DEADLINE_MAP = {
    "rmd-filing": {"title": "RMD Annual Recertification", "type": "RMD Recertification",
                   "deadlines": [(3, 1)], "notes": "Log into FCC RMD portal with MFA, verify all info, click Recertify."},
    "cpni-certification": {"title": "CPNI Annual Certification", "type": "CPNI Certification",
                           "deadlines": [(3, 1)], "notes": "File CPNI cert at FCC ECFS under WC Docket 06-36."},
    "fcc-499a": {"title": "Form 499-A Annual Filing", "type": "499-A Annual",
                 "deadlines": [(4, 1)], "notes": "File at USAC E-File portal. Reports prior calendar year revenue."},
    "fcc-499a-499q": {"title": "Form 499-A Annual Filing", "type": "499-A Annual",
                      "deadlines": [(4, 1)], "notes": "File at USAC E-File portal. Reports prior calendar year revenue."},
    "calea-ssi": {"title": "CALEA SSI Plan Review", "type": "Other",
                  "deadlines": [(1, 15)], "notes": "Annual review of CALEA System Security & Integrity plan."},
    # BDC — semi-annual (June 1 and December 1)
    "bdc-filing": {"title": "BDC Broadband & Voice Filing", "type": "BDC Filing",
                   "deadlines": [(6, 1), (12, 1)], "notes": "File broadband deployment and voice subscription data at FCC BDC portal."},
    "bdc-broadband": {"title": "BDC Broadband Deployment Filing", "type": "BDC Filing",
                      "deadlines": [(6, 1), (12, 1)], "notes": "File broadband deployment data at FCC BDC portal."},
    "bdc-voice": {"title": "BDC Voice Subscription Filing", "type": "BDC Filing",
                  "deadlines": [(6, 1), (12, 1)], "notes": "File voice subscription data (formerly Form 477) at FCC BDC portal."},
    # State PUC — most states require annual report, varies by state
    "state-puc": {"title": "State PUC Annual Report", "type": "PUC Filing",
                  "deadlines": [(3, 31)], "notes": "File annual report with state PUC/PSC. Due date varies by state — check state requirements."},
}

# compliance_orders columns copied verbatim onto the order dict.
# NOTE(review): "batch_id" is not part of the SELECT in _pcs_merge_pg_order,
# so it is always copied as None — confirm whether the column exists before
# adding it to the query.
_COMPLIANCE_ORDER_FIELDS = (
    "customer_email", "customer_name", "customer_phone",
    "batch_id",
    "filing_mode", "form_year_override",
    "revises_order_number", "revised_reason",
    "prior_confirmation_number",
    "waive_deminimis_exemption", "waive_deminimis_reason",
    "multi_year_filings", "multi_year_discount_pct",
    "deminimis_result_is_exempt",
    "deminimis_estimated_contrib_cents",
)


def _pcs_merge_pg_order(order: dict, order_number: str, service_slug: str) -> tuple:
    """Merge the compliance_orders row for *order_number* into *order*.

    Returns ``(service_slug, telecom_entity_id)``; the incoming slug is kept
    when it is non-empty, and the entity id is None when no row is found or
    the lookup fails (failures are logged, never raised).
    """
    entity_id = None
    try:
        import psycopg2
        import psycopg2.extras

        conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        try:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(
                    """
                    SELECT service_slug, telecom_entity_id,
                           customer_email, customer_name, customer_phone,
                           intake_data,
                           filing_mode, form_year_override,
                           revises_order_number, revised_reason,
                           prior_confirmation_number,
                           waive_deminimis_exemption, waive_deminimis_reason,
                           multi_year_filings, multi_year_discount_pct,
                           deminimis_result_is_exempt,
                           deminimis_estimated_contrib_cents
                    FROM compliance_orders WHERE order_number = %s
                    """,
                    (order_number,),
                )
                row = cur.fetchone()
        finally:
            # Close even when the query raises, so connections do not leak.
            conn.close()

        if row:
            if not service_slug:
                service_slug = row["service_slug"]
            entity_id = row["telecom_entity_id"]
            # Merge the row into the order so the handler can read
            # filing_mode / intake_data / waive flags etc. intake_data may
            # arrive already decoded or as a JSON string.
            intake = row.get("intake_data")
            if isinstance(intake, str):
                try:
                    intake = json.loads(intake)
                except ValueError:
                    intake = {}
            order["intake_data"] = intake or {}
            for field in _COMPLIANCE_ORDER_FIELDS:
                order[field] = row.get(field)
    except Exception as exc:
        LOG.warning("Could not look up compliance order: %s", exc)
    return service_slug, entity_id


def _pcs_entity_id_for_frn(frn):
    """Return the telecom_entities id matching *frn*, or None if not found/failed."""
    try:
        import psycopg2

        conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT id FROM telecom_entities WHERE frn = %s LIMIT 1", (frn,))
                row = cur.fetchone()
        finally:
            conn.close()
        if row:
            LOG.info("Resolved FRN %s → entity_id %s", frn, row[0])
            return row[0]
    except Exception as exc:
        LOG.warning("Could not resolve FRN %s to entity: %s", frn, exc)
    return None


def _pcs_json_safe(value):
    """Convert date-like and memoryview column values to plain strings."""
    if hasattr(value, "isoformat"):
        return value.isoformat()
    if isinstance(value, memoryview):
        return bytes(value).decode("utf-8", errors="replace")
    return value


def _pcs_load_entity(entity_id, order_number: str):
    """Load the telecom_entities row for *entity_id* as a dict, or None on miss/failure."""
    try:
        import psycopg2

        conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT * FROM telecom_entities WHERE id = %s", (entity_id,))
                columns = [desc[0] for desc in cur.description] if cur.description else []
                row = cur.fetchone()
        finally:
            conn.close()
        if not (row and columns):
            return None

        entity = {col: _pcs_json_safe(val) for col, val in zip(columns, row)}
        # Parse JSONB carrier_metadata if it's a string
        metadata = entity.get("carrier_metadata")
        if isinstance(metadata, str):
            try:
                entity["carrier_metadata"] = json.loads(metadata)
            except ValueError:
                pass  # leave the raw string in place
        return entity
    except Exception as exc:
        LOG.warning("Could not look up telecom entity for %s: %s", order_number, exc)
        return None


def _pcs_fetch_compliance_checks(frn):
    """Call the API's FCC lookup endpoint for *frn* and return the decoded JSON."""
    import urllib.request

    api_url = os.environ.get("API_URL", "http://api:3001")
    req = urllib.request.Request(f"{api_url}/api/v1/fcc/lookup?frn={frn}")
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())


def _pcs_touch_checkup_timestamp(entity_id) -> None:
    """Set telecom_entities.last_compliance_checkup to NOW() for *entity_id*."""
    import psycopg2

    conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
    try:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE telecom_entities SET last_compliance_checkup = NOW() WHERE id = %s",
                (entity_id,),
            )
        conn.commit()
    finally:
        conn.close()


def _pcs_create_deadlines(client, service_slug: str, order: dict, ent: dict,
                          effective_order_number: str) -> None:
    """Create one ERPNext "Compliance Deadline" per scheduled date for the service."""
    from datetime import date

    schedule = _COMPLIANCE_DEADLINE_MAP.get(service_slug)
    entity_name = ent.get("legal_name", "")
    if schedule is None or not entity_name:
        return

    customer_email = order.get("customer_email") or ""
    frn = ent.get("frn", "")
    today = date.today()

    # Find the ERPNext Customer for this email (shared across all deadlines).
    # Skip the lookup without an email: filtering on an empty email_id could
    # match an unrelated customer whose email is blank.
    erpnext_customer = ""
    if customer_email:
        try:
            customers = client.get_list(
                "Customer", filters={"email_id": customer_email}, fields=["name"], limit=1,
            )
            if customers:
                erpnext_customer = customers[0]["name"]
        except Exception as exc:
            LOG.warning("Could not look up ERPNext Customer for %s: %s", customer_email, exc)

    # Create a deadline for each (month, day) pair
    for month, day in schedule["deadlines"]:
        # A date that is today or already past this year rolls to next year.
        due_year = today.year + 1 if (today.month, today.day) >= (month, day) else today.year
        deadline_date = f"{due_year}-{month:02d}-{day:02d}"
        try:
            client.create_resource("Compliance Deadline", {
                "title": f"{schedule['title']} — {entity_name}",
                "deadline_date": deadline_date,
                "filing_type": schedule["type"],
                "entity_name": entity_name,
                "frn": frn,
                "status": "Upcoming",
                "notes": schedule["notes"],
                "linked_order": effective_order_number,
                "customer": erpnext_customer or None,
                "published": 1,
                "route": f"compliance-deadlines/{service_slug}-{due_year}-{month:02d}-{frn}",
            })
            LOG.info("Created compliance deadline: %s for %s (due %s)",
                     schedule["title"], entity_name, deadline_date)
        except Exception as exc:
            LOG.warning("Could not create compliance deadline for %s: %s", deadline_date, exc)


def _pcs_create_foreign_qual_todo(client, order: dict, ent: dict,
                                  effective_order_number: str) -> None:
    """Create an admin ToDo when the 499-A intake opted into foreign qualification."""
    fq_states = (order.get("intake_data") or {}).get("foreign_qual_add_on_states") or []
    if not (isinstance(fq_states, list) and fq_states):
        return

    entity_name = ent.get("legal_name", "")
    customer_email = order.get("customer_email", "")
    LOG.info(
        "Creating foreign qual add-on for %s: %d states (%s)",
        effective_order_number, len(fq_states), ", ".join(fq_states),
    )
    # Create an admin todo for the foreign qual filing
    try:
        client.create_resource("ToDo", {
            "description": (
                f"[foreign-qualification] {effective_order_number}\n\n"
                f"499-A intake detected {len(fq_states)} state(s) missing "
                f"foreign corporation registration.\n"
                f"Entity: {entity_name}\n"
                f"States: {', '.join(fq_states)}\n"
                f"Customer: {customer_email}\n\n"
                f"Client opted in during 499-A intake at $99/state.\n"
                f"Create a foreign-qualification-multi order for these states "
                f"or contact the client to finalize."
            ),
            "priority": "Medium",
            "role": "Accounting Advisor",
        })
    except Exception as exc:
        LOG.warning("Could not create foreign qual admin ToDo: %s", exc)


def handle_process_compliance_service(payload: dict) -> dict:
    """Process a compliance service order (FLSA audit, CCPA audit, FCC checkup, etc.).

    Steps, in order:
      1. Load the Sales Order from ERPNext (falls back to a stub dict).
      2. Merge the PG ``compliance_orders`` row and resolve the service slug.
      3. For FCC services, attach the telecom entity (and, for the checkup,
         the API's compliance-check result) to the order.
      4. Run the service handler and upload its files to object storage.
      5. Best-effort follow-ups: record file paths on the Sales Order, email
         instant-delivery PDFs, clear the auto-filing override, stamp the
         entity, advance the workflow to "Review", create compliance
         deadlines, and raise a foreign-qualification ToDo.

    Args:
        payload: Job payload. ``order_name`` is required; ``order_number``
            and ``service_slug`` are optional.

    Returns:
        ``{"action": ..., "files": [object keys]}`` on success, or
        ``{"action": ..., "error": ...}`` when no handler matches the slug.

    Raises:
        KeyError: if ``order_name`` is missing from the payload.
        Exceptions from the handler's ``process`` or from ``storage.upload``
        propagate; every follow-up step only logs its failures.
    """
    order_name = payload["order_name"]
    order_number = payload.get("order_number", "")
    service_slug = payload.get("service_slug", "")
    LOG.info("Processing compliance service: %s — %s (slug: %s)", order_name, order_number, service_slug)

    from pathlib import Path

    from scripts.workers.services import SERVICE_HANDLERS
    from scripts.workers.erpnext_client import ERPNextClient
    from scripts.document_gen import MinioStorage

    client = ERPNextClient()
    # Try to load the Sales Order from ERPNext — may fail if ERPNext is
    # unreachable or the order doesn't have a linked Sales Order (e.g.,
    # orders created directly via the API without ERPNext integration).
    try:
        order = client.get_resource("Sales Order", order_name)
    except Exception as exc:
        LOG.warning("Could not load Sales Order %s from ERPNext: %s — building from PG", order_name, exc)
        order = {"name": order_name, "order_number": order_number}
    storage = MinioStorage()

    # Resolve service_slug, entity_id, and filing metadata from PG compliance_orders
    pg_entity_id = None
    if order_number:
        service_slug, pg_entity_id = _pcs_merge_pg_order(order, order_number, service_slug)

    handler_cls = SERVICE_HANDLERS.get(service_slug)
    if not handler_cls:
        LOG.error("No handler for service: %s", service_slug)
        return {"action": "process_compliance_service", "error": f"No handler for {service_slug}"}

    # Debug: log what entity data the order carries before the PG lookup.
    # "or {}" also covers an "entity" key that is present but None.
    entity_preview = order.get("entity") or {}
    LOG.info(
        "Dispatching %s for %s — entity=%s (legal_name=%s, frn=%s)",
        service_slug, order_number,
        "present" if entity_preview else "MISSING",
        entity_preview.get("legal_name", "??"),
        entity_preview.get("frn", "??"),
    )

    # For FCC compliance services: look up the telecom entity and pass as order_data.
    # If no entity ID but we have an FRN in intake_data, look up entity by FRN.
    from scripts.workers.services import FCC_SERVICE_SLUGS
    is_fcc_service = service_slug in FCC_SERVICE_SLUGS
    if is_fcc_service and not pg_entity_id:
        intake_frn = (order.get("intake_data") or {}).get("frn")
        if intake_frn:
            resolved_id = _pcs_entity_id_for_frn(intake_frn)
            if resolved_id is not None:
                pg_entity_id = resolved_id
    if is_fcc_service and pg_entity_id:
        entity = _pcs_load_entity(pg_entity_id, order_number)
        if entity is not None:
            order["entity"] = entity

    # Run compliance checks if FCC entity has an FRN
    checkup_frn = (order.get("entity") or {}).get("frn")
    if service_slug == "fcc-compliance-checkup" and checkup_frn:
        try:
            order["compliance_checks"] = _pcs_fetch_compliance_checks(checkup_frn)
        except Exception as exc:
            LOG.warning("Could not run compliance checks for FRN %s: %s", checkup_frn, exc)

    handler = handler_cls()
    # Final entity check before dispatch
    ent = order.get("entity") or {}
    LOG.info(
        "Handler %s ready for %s — entity keys=%s legal_name=%s",
        handler_cls.__name__, order_number,
        list(ent.keys())[:5] if ent else "NONE",
        ent.get("legal_name", "??"),
    )

    # handler.process is a coroutine; drive it on a private event loop.
    loop = asyncio.new_event_loop()
    try:
        file_paths = loop.run_until_complete(handler.process(order))
    finally:
        loop.close()

    # Upload files to MinIO and track paths. A handler returning None is
    # treated as "no files" instead of crashing the loop.
    effective_order_number = order_number or order.get("name", order_name)
    minio_paths = []
    for fp in file_paths or []:
        remote = f"compliance/{effective_order_number}/{Path(fp).name}"
        storage.upload(fp, remote)
        minio_paths.append(remote)

    # Store generated file paths on the Sales Order for delivery worker
    try:
        client.set_value(
            "Sales Order", order_name,
            "custom_generated_files", "\n".join(minio_paths),
        )
    except Exception as exc:
        LOG.warning("Could not store generated files on %s: %s", order_name, exc)

    # ── Instant delivery for no-vendor-fee services ────────────────────
    if service_slug in _INSTANT_DELIVERY_SLUGS and minio_paths:
        customer_email = order.get("customer_email")
        if customer_email:
            try:
                _send_instant_delivery(
                    customer_email=customer_email,
                    customer_name=order.get("customer_name") or "",
                    order_number=effective_order_number,
                    service_name=getattr(handler_cls, "SERVICE_NAME", service_slug),
                    minio_paths=minio_paths,
                    storage=storage,
                )
                LOG.info("Instant delivery sent to %s for %s (%s)",
                         customer_email, effective_order_number, service_slug)
            except Exception as exc:
                LOG.warning("Instant delivery failed for %s: %s", effective_order_number, exc)

    # Clear the per-order auto-filing override so the approval cannot be
    # silently replayed — each admin approval is one-shot.
    try:
        if order.get("custom_auto_filing_override"):
            client.set_value("Sales Order", order_name, "custom_auto_filing_override", 0)
    except Exception as exc:
        LOG.warning("Could not clear auto_filing_override on %s: %s", order_name, exc)

    # Update telecom entity with last checkup timestamp
    if pg_entity_id and service_slug == "fcc-compliance-checkup":
        try:
            _pcs_touch_checkup_timestamp(pg_entity_id)
        except Exception as exc:
            LOG.warning("Could not update entity checkup timestamp: %s", exc)

    # Advance workflow to Review after processing
    try:
        client.set_value("Sales Order", order_name, "workflow_state", "Review")
        LOG.info("Advanced %s to Review", order_name)
    except Exception as exc:
        LOG.warning("Could not advance workflow for %s: %s", order_name, exc)

    # ── Create compliance calendar deadlines for the customer ───────────
    try:
        _pcs_create_deadlines(client, service_slug, order, ent, effective_order_number)
    except Exception as exc:
        LOG.warning("Compliance calendar creation failed: %s", exc)

    # ── Foreign qualification add-on from 499-A intake ────────────────
    # If the 499-A intake wizard detected missing foreign qualifications
    # and the client opted in, raise an admin ToDo for those states.
    try:
        _pcs_create_foreign_qual_todo(client, order, ent, effective_order_number)
    except Exception as exc:
        LOG.warning("Foreign qual add-on processing failed: %s", exc)

    return {"action": "process_compliance_service", "files": minio_paths}
# ==========================================================================
# Job Dispatch Map
# ==========================================================================
def handle_scrape_amb_units(data: dict) -> dict:
    """Scrape available mailbox unit numbers from an AMB location page.

    Args:
        data: Job payload. ``location_url`` is required; ``order_id`` is
            optional and only echoed back / logged.

    Returns:
        ``{"units": [...], "order_id": ...}`` on success, or
        ``{"error": "location_url required", "units": []}`` when the URL is
        missing.

    Raises:
        Whatever ``BCPortal.scrape_available_units`` raises.
    """
    location_url = data.get("location_url", "")
    order_id = data.get("order_id", "")
    if not location_url:
        return {"error": "location_url required", "units": []}

    # Imported only once the payload is valid, so a bad request never pays
    # for (or fails on) loading the portal adapter.
    from scripts.formation.states.bc.adapter import BCPortal

    LOG.info("[scrape_amb_units] Scraping units from %s for %s", location_url, order_id)
    portal = BCPortal()
    # asyncio.run gives the coroutine a fresh loop and runs it exactly once.
    # The previous get_event_loop()/except RuntimeError fallback re-ran the
    # whole scrape whenever the scrape itself raised RuntimeError.
    units = asyncio.run(portal.scrape_available_units(location_url))
    LOG.info("[scrape_amb_units] Found %d units", len(units))
    return {"units": units, "order_id": order_id}
class _MinimalCrtcOrder:
    """Minimal FormationOrder-like object built from a canada_crtc_orders row."""

    def __init__(self, o):
        self.order_id = o["order_number"]
        self.entity_name = o["entity_name"] or ""
        self.regulatory_contact_name = o["customer_name"] or ""
        self.regulatory_contact_phone = o["customer_phone"] or ""
        self.principal_address = ""
        self.principal_city = ""
        self.principal_state = ""
        self.principal_zip = ""
        self.mailing_address = ""
        self.mailing_city = ""
        self.mailing_state = ""
        self.mailing_zip = ""
        self.members = []


def handle_purchase_client_selections(data: dict) -> dict:
    """Purchase mailbox + DID based on client portal selections.

    Called after client confirms their choices in /portal/setup.
      1. If has_own_ca_address is False: Playwright → AMB signup with selected unit
      2. Flowroute API → purchase selected DID
      3. Update PG order with results
      4. Advance ERPNext workflow

    Args:
        data: Job payload with ``order_id`` (required), ``selected_unit``,
            ``selected_did`` and ``has_own_ca_address``.

    Returns:
        A dict with ``order_id``, ``mailbox`` and ``did`` results (plus
        ``error`` if an unexpected exception occurred), or a bare
        ``{"error": ...}`` when the order id is missing or unknown.
    """
    order_id = data.get("order_id", "")
    selected_unit = data.get("selected_unit")
    selected_did = data.get("selected_did", "")
    has_own_address = data.get("has_own_ca_address", False)
    if not order_id:
        return {"error": "order_id required"}

    import psycopg2
    from scripts.formation.states.bc.adapter import BCPortal
    from scripts.workers.services.flowroute import purchase_did, configure_routing

    LOG.info("[purchase_selections] Starting for %s: unit=%s did=%s own_addr=%s",
             order_id, selected_unit, selected_did, has_own_address)

    order_columns = ["order_number", "payment_method", "amb_location_slug", "entity_name",
                     "customer_name", "customer_email", "customer_phone",
                     "director_name", "director_address", "erpnext_sales_order"]

    conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
    results = {"order_id": order_id, "mailbox": None, "did": None}
    try:
        # Get order details
        with conn.cursor() as cur:
            cur.execute("""
                SELECT order_number, payment_method, amb_location_slug, entity_name,
                       customer_name, customer_email, customer_phone,
                       director_name, director_address, erpnext_sales_order
                FROM canada_crtc_orders WHERE order_number = %s
            """, (order_id,))
            row = cur.fetchone()
        if not row:
            return {"error": "Order not found"}
        order = dict(zip(order_columns, row))

        # ── Step 1: Mailbox signup (if needed) ──────────────────────────────
        if not has_own_address and selected_unit:
            # Get location URL
            with conn.cursor() as cur:
                cur.execute("SELECT provider_url FROM amb_locations WHERE slug = %s",
                            (order["amb_location_slug"],))
                loc_row = cur.fetchone()
            if loc_row:
                location_url = loc_row[0]
                portal = BCPortal()
                mini_order = _MinimalCrtcOrder(order)
                # Run the signup exactly once on a fresh loop. The previous
                # get_event_loop()/except RuntimeError fallback re-submitted
                # the signup whenever the signup itself raised RuntimeError.
                mb_result = asyncio.run(
                    portal.signup_with_unit(mini_order, selected_unit, location_url)
                )
                results["mailbox"] = mb_result
                LOG.info("[purchase_selections] Mailbox result: %s", mb_result.get("success"))
                if mb_result.get("success"):
                    with conn.cursor() as cur:
                        cur.execute(
                            "UPDATE canada_crtc_orders SET mailbox_address = %s WHERE order_number = %s",
                            (f"{location_url} Unit {selected_unit}", order_id),
                        )
                    conn.commit()
            else:
                LOG.warning("[purchase_selections] No AMB location for slug %s; mailbox signup skipped",
                            order["amb_location_slug"])

        # ── Step 2: DID purchase ────────────────────────────────────────────
        if selected_did:
            try:
                did_result = purchase_did(selected_did)
                results["did"] = {"success": True, "did": selected_did, "result": str(did_result)[:200]}
                LOG.info("[purchase_selections] DID %s purchased", selected_did)
                # Configure routing (default "later" — client sets up in portal)
                try:
                    configure_routing(selected_did, "later")
                except Exception as route_err:
                    LOG.warning("[purchase_selections] DID routing config failed: %s", route_err)
                with conn.cursor() as cur:
                    cur.execute(
                        "UPDATE canada_crtc_orders SET client_selected_did = %s WHERE order_number = %s",
                        (selected_did, order_id),
                    )
                conn.commit()
            except Exception as did_err:
                LOG.error("[purchase_selections] DID purchase failed: %s", did_err)
                results["did"] = {"success": False, "error": str(did_err)}

        # ── Step 3: Advance workflow ────────────────────────────────────────
        # Advance from "Client Selection" → "Mailbox Setup" (or "Incorporation" if own address)
        # NOTE(review): the workflow advances even when the mailbox or DID
        # step above reported failure — confirm this is intended.
        so_name = order.get("erpnext_sales_order")
        if so_name:
            try:
                from scripts.workers.erpnext_client import ERPNextClient
                erp = ERPNextClient()
                target_action = "Client Confirmed Own Address" if has_own_address else "Client Confirmed"
                erp.apply_workflow("Sales Order", so_name, target_action)
                LOG.info("[purchase_selections] Workflow advanced via '%s'", target_action)
            except Exception as wf_err:
                LOG.error("[purchase_selections] Workflow advance failed: %s", wf_err)
    except Exception as e:
        LOG.error("[purchase_selections] Error: %s", e)
        results["error"] = str(e)
    finally:
        conn.close()
    return results
def handle_presign(payload: dict) -> dict:
    """Generate a MinIO presigned URL for a given object key.

    Used by the portal-esign.ts API route to get a PDF preview URL
    without adding a MinIO npm package to the API container.
    Also supports upload (PUT) presign for reseller cert + ICC uploads.

    Payload: { key: str, expires: int (seconds, default 3600),
               method: "GET" | "PUT" (default GET) }

    Returns:
        ``{"url", "bucket", "key", "method"}`` on success, otherwise
        ``{"error": str}`` — including for a missing key, an unsupported
        method, a non-integer ``expires``, or any MinIO failure.
    """
    key = payload.get("key", "")
    method = str(payload.get("method", "GET")).upper()
    if not key:
        return {"error": "key required"}
    if method not in ("GET", "PUT"):
        return {"error": "method must be GET or PUT"}
    # A malformed or null "expires" is reported like every other bad input
    # instead of escaping as an uncaught ValueError/TypeError.
    try:
        expires = int(payload.get("expires", 3600))
    except (TypeError, ValueError):
        return {"error": "expires must be an integer number of seconds"}

    try:
        from datetime import timedelta
        from minio import Minio as _Minio

        mc = _Minio(
            f"{os.getenv('MINIO_ENDPOINT', 'minio')}:{os.getenv('MINIO_PORT', '9000')}",
            access_key=os.getenv("MINIO_ACCESS_KEY", ""),
            secret_key=os.getenv("MINIO_SECRET_KEY", ""),
            secure=os.getenv("MINIO_SECURE", "false").lower() == "true",
        )
        bucket = os.getenv("MINIO_BUCKET", "performancewest")
        # Ensure the bucket exists (first-run idempotent).
        if not mc.bucket_exists(bucket):
            mc.make_bucket(bucket)
        expires_td = timedelta(seconds=expires)
        if method == "PUT":
            url = mc.presigned_put_object(bucket, key, expires=expires_td)
        else:
            url = mc.presigned_get_object(bucket, key, expires=expires_td)
        return {"url": url, "bucket": bucket, "key": key, "method": method}
    except Exception as exc:
        LOG.error("[presign] Failed to generate presigned URL for %s: %s", key, exc)
        return {"error": str(exc)}
def handle_resume_crtc_pipeline(payload: dict) -> dict:
    """Resume the CRTC pipeline after an async pause (eSign, domain selection, etc.).

    Called by the portal-esign.ts API route after the client signs the letter,
    or by the domain-confirm endpoint after domain selection.

    Payload: { order_id: str, step: str }
    step values:
        "post_esign"   — client signed; resume at Step 7 (binder compilation)
        "post_domain"  — client confirmed domain; resume at Step 5b (HestiaCP)

    Returns:
        ``{"success": True, "order_id", "step", "result"}`` when the pipeline
        was re-dispatched, otherwise ``{"error": str}``.
    """
    order_id = payload.get("order_id", "")
    step = payload.get("step", "post_esign")
    if not order_id:
        return {"error": "order_id required"}

    LOG.info("[resume_crtc] Resuming CRTC pipeline for order %s at step %s", order_id, step)
    try:
        # psycopg2 is imported per-function in this module; this handler used
        # it without importing it, so every call failed with NameError.
        import psycopg2

        # Fetch the ERPNext Sales Order name for this order
        conn = psycopg2.connect(os.environ.get("DATABASE_URL", ""))
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT erpnext_sales_order FROM canada_crtc_orders WHERE order_number = %s",
                    (order_id,),
                )
                row = cur.fetchone()
        finally:
            # Close even when the query raises, so the connection never leaks.
            conn.close()

        if not row or not row[0]:
            return {"error": f"No ERPNext Sales Order found for {order_id}"}
        so_name = row[0]

        # Re-dispatch as a generate_crtc_docs job (which runs the full pipeline
        # and is idempotent — already-completed steps are skipped via their
        # custom_*_at timestamps)
        result = handle_generate_crtc_docs({"order": so_name, "step_resume": step})
        LOG.info("[resume_crtc] Pipeline resumed for %s: %s", order_id, result)
        return {"success": True, "order_id": order_id, "step": step, "result": result}
    except Exception as exc:
        LOG.error("[resume_crtc] Error resuming pipeline for %s: %s", order_id, exc, exc_info=True)
        return {"error": str(exc)}
def handle_renewal_payment(payload: dict) -> dict:
    """Process the payment of a renewal Sales Invoice.

    Invoked by the Express API webhook once a renewal invoice is paid; the
    actual work is delegated to ``renewal_worker.handle_renewal_payment``.

    Payload: {invoice: "SINV-00123"}

    Returns ``{"success": True, "invoice": ...}`` on success, or
    ``{"error": ...}`` when the invoice name is missing or processing fails.
    """
    invoice = payload.get("invoice", "")
    if not invoice:
        return {"error": "Missing invoice name"}

    LOG.info("[renewal] Processing payment for invoice: %s", invoice)
    try:
        from scripts.workers.renewal_worker import handle_renewal_payment as process_invoice
        process_invoice(invoice)
    except Exception as err:
        LOG.error("[renewal] Error processing payment for %s: %s", invoice, err)
        return {"error": str(err)}
    return {"success": True, "invoice": invoice}
def handle_run_renewal_check(payload: dict) -> dict:
    """Run one full renewal check cycle on demand.

    May be triggered manually from admin or by the daily cron. The payload
    is not read; the work is delegated to ``renewal_worker.main``.

    Returns ``{"success": True}``, or ``{"error": ...}`` if the cycle raised.
    """
    LOG.info("[renewal] Running on-demand renewal check")
    try:
        from scripts.workers.renewal_worker import main as run_cycle
        run_cycle()
    except Exception as err:
        LOG.error("[renewal] Renewal check failed: %s", err)
        return {"error": str(err)}
    return {"success": True}
def handle_check_red_light(payload: dict) -> dict:
    """Look up the FCC CORES red light status of an FRN.

    Payload: { "frn": "0027160886" }

    Returns the scraper's result dict; ``{"success": False, "error": ...}``
    when no FRN is supplied; or ``{"frn", "status": "unknown", "detail"}``
    when the check raises.
    """
    frn = payload.get("frn", "")
    if frn:
        LOG.info("[check_red_light] Checking FRN %s", frn)
        try:
            from scripts.workers.fcc_cores_scraper import check_red_light
            return asyncio.run(check_red_light(frn))
        except Exception as err:
            LOG.error("[check_red_light] Failed: %s", err)
            return {"frn": frn, "status": "unknown", "detail": str(err)}
    return {"success": False, "error": "Missing frn"}
def handle_import_499a(payload: dict) -> dict:
    """Import Form 499-A data from USAC E-File.

    Payload: { "filer_id": "812345" }

    Returns the extracted entity info, service categories, and revenue lines
    as produced by ``usac_import.import_499a``. On a missing filer ID or any
    failure the result is ``{"success": False, "error": <message>}``.
    """
    filer_id = payload.get("filer_id", "")
    if not filer_id:
        return {"success": False, "error": "Missing filer_id"}

    LOG.info("[import_499a] Starting import for filer %s", filer_id)
    try:
        # Imported lazily so the importer module is only loaded when needed.
        from scripts.workers.usac_import import import_499a

        # The importer is a coroutine; drive it to completion on a fresh
        # event loop owned by this call.
        return asyncio.run(import_499a(filer_id))
    except Exception as exc:
        # Keep the traceback: the caller only ever sees str(exc).
        LOG.error("[import_499a] Failed: %s", exc, exc_info=True)
        return {"success": False, "error": str(exc)}
# Dispatch table: maps the "action" field of a POST /jobs payload to the
# handler that executes it. Each handler takes the decoded payload dict.
# The key order is visible to users: it is what the "Unknown action" error
# response and the startup log list as the available actions.
JOB_HANDLERS = {
    "name_search": handle_name_search,
    "file_entity": handle_file_entity,
    "obtain_ein": handle_obtain_ein,
    "generate_docs": handle_generate_docs,
    "deliver": handle_deliver,
    "send_to_attorney": handle_send_to_attorney,
    "process_compliance_service": handle_process_compliance_service,
    # Canada CRTC pipeline actions (dispatched by ERPNext webhooks)
    "register_ca_domain": handle_register_ca_domain,
    "register_awaiting_funds": handle_register_awaiting_funds,
    "file_bc_incorporation": handle_file_bc_incorporation,
    "generate_crtc_docs": handle_generate_crtc_docs,
    "notify_admin_review": handle_notify_admin_review,
    "ship_binder": handle_ship_binder,
    "mark_delivered": handle_mark_delivered,
    # Portal client selection actions
    "scrape_amb_units": handle_scrape_amb_units,
    "purchase_client_selections": handle_purchase_client_selections,
    # eSign / MinIO helpers
    "presign": handle_presign,
    "resume_crtc_pipeline": handle_resume_crtc_pipeline,
    # Compliance calendar renewal
    "renewal_payment": handle_renewal_payment,
    "run_renewal_check": handle_run_renewal_check,
    # FCC 499-A import from USAC
    "import_499a": handle_import_499a,
    # FCC CORES red light check
    "check_red_light": handle_check_red_light,
}
def _build_formation_order(order: dict):
    """Translate an ERPNext Formation Order record into a FormationOrder.

    ``members_json`` is accepted either as an already-decoded list or as a
    JSON string. Any field absent from ``order`` (or from a member entry)
    takes the default spelled out below; an unrecognised ``entity_type``
    label is treated as an LLC.
    """
    from scripts.formation.base import EntityType, FormationOrder, Member

    raw_members = order.get("members_json") or []
    if isinstance(raw_members, str):
        raw_members = json.loads(raw_members)

    def _to_member(entry):
        # One ERPNext member entry -> one Member; note the "zip" -> zip_code rename.
        return Member(
            name=entry.get("name", ""),
            address=entry.get("address", ""),
            city=entry.get("city", ""),
            state=entry.get("state", ""),
            zip_code=entry.get("zip", ""),
            title=entry.get("title", "Member"),
            ownership_pct=entry.get("ownership_pct", 0),
        )

    member_list = [_to_member(entry) for entry in raw_members]

    # ERPNext entity-type label -> EntityType enum member.
    type_by_label = {
        "LLC": EntityType.LLC,
        "Corporation": EntityType.CORPORATION,
        "S-Corp": EntityType.S_CORP,
    }
    resolved_type = type_by_label.get(order.get("entity_type", ""), EntityType.LLC)

    return FormationOrder(
        order_id=order.get("name", ""),
        state_code=order.get("state_code", ""),
        entity_type=resolved_type,
        entity_name=order.get("entity_name", ""),
        entity_name_alt=order.get("entity_name_alt", ""),
        management_type=order.get("management_type", "member_managed"),
        purpose=order.get("purpose", "Any lawful business activity"),
        members=member_list,
        principal_address=order.get("principal_address", ""),
        expedited=bool(order.get("expedited")),
    )
# ==========================================================================
# HTTP Server
# ==========================================================================
class JobHandler(BaseHTTPRequestHandler):
    """HTTP front end of the worker job server.

    Routes:
        GET  /health         -- liveness probe with job counts
        GET  /jobs/active    -- running jobs plus the ten most recent finished
        POST /jobs           -- queue a job on the thread pool (202 Accepted)
        POST /entity-status  -- synchronous entity lookup
        POST /rmd-preview    -- synchronous RMD preview PDF generation

    Every response body is a JSON object.
    """

    def do_GET(self):
        """Serve the health check and the job listing."""
        if self.path == "/health":
            self._respond(200, {
                "status": "ok",
                "service": "performancewest-workers",
                "active_jobs": len(active_jobs),
                "max_workers": MAX_WORKERS,
            })
        elif self.path == "/jobs/active":
            self._respond(200, {
                "active": list(active_jobs.values()),
                "recent_completed": completed_jobs[-10:],
            })
        else:
            self._respond(404, {"error": "Not found"})

    def do_POST(self):
        """Route POST requests; ``/jobs`` queues a background job."""
        if self.path == "/entity-status":
            # Synchronous entity status lookup via state portal adapter
            self._handle_entity_status()
            return
        if self.path == "/rmd-preview":
            self._handle_rmd_preview()
            return
        if self.path != "/jobs":
            self._respond(404, {"error": "Not found"})
            return

        payload = self._read_json_object()
        if payload is None:
            return  # 400 already sent

        action = payload.get("action")
        # The isinstance check keeps an unhashable "action" (list/dict) from
        # raising TypeError in the membership test below.
        if not action or not isinstance(action, str) or action not in JOB_HANDLERS:
            self._respond(400, {"error": f"Unknown action: {action}. Available: {list(JOB_HANDLERS.keys())}"})
            return

        # Create job
        job_id = str(uuid.uuid4())[:8]
        job_record = {
            "id": job_id,
            "action": action,
            "order_name": payload.get("order_name", ""),
            "status": "queued",
            "started_at": datetime.utcnow().isoformat(),
        }
        active_jobs[job_id] = job_record

        # Execute in background thread
        def run_job():
            job_record["status"] = "running"
            try:
                handler = JOB_HANDLERS[action]
                result = handler(payload)
                job_record["status"] = "completed"
                job_record["result"] = result
                # A handler may return something other than a dict; logging
                # must not turn a finished job into a failed one.
                outcome = result.get("action") if isinstance(result, dict) else result
                LOG.info("Job %s (%s) completed: %s", job_id, action, outcome)
            except Exception as e:
                job_record["status"] = "failed"
                job_record["error"] = str(e)
                LOG.error("Job %s (%s) failed: %s", job_id, action, e, exc_info=True)
            finally:
                job_record["finished_at"] = datetime.utcnow().isoformat()
                active_jobs.pop(job_id, None)
                completed_jobs.append(job_record)
                # Keep only the 100 most recent finished jobs.
                if len(completed_jobs) > 100:
                    completed_jobs.pop(0)

        executor.submit(run_job)
        self._respond(202, {"job_id": job_id, "action": action, "status": "queued"})

    def _read_json_object(self):
        """Read the request body and decode it as a JSON object.

        Returns:
            The decoded dict, or ``None`` once a 400 response has already
            been sent (unparseable Content-Length, undecodable body, or a
            JSON value that is not an object).
        """
        try:
            content_length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            self._respond(400, {"error": "Invalid Content-Length"})
            return None

        # A negative length would make read() block until EOF.
        body = self.rfile.read(content_length) if content_length > 0 else b""
        try:
            payload = json.loads(body)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError alike.
            self._respond(400, {"error": "Invalid JSON"})
            return None

        if not isinstance(payload, dict):
            self._respond(400, {"error": "JSON body must be an object"})
            return None
        return payload

    def _handle_entity_status(self):
        """Synchronous entity status lookup via Playwright state adapter.

        POST /entity-status {entity_name, state_code}
        Returns entity status from the state SOS portal (cached 24h).

        A failed lookup is reported as HTTP 200 with ``found: False`` and an
        ``error`` field rather than as an HTTP error.
        """
        payload = self._read_json_object()
        if payload is None:
            return  # 400 already sent

        # Coerce so that null / non-string values become a 400, not a crash.
        entity_name = str(payload.get("entity_name") or "").strip()
        state_code = str(payload.get("state_code") or "").strip().upper()
        if not entity_name or not state_code:
            self._respond(400, {"error": "entity_name and state_code required"})
            return

        try:
            from scripts.formation.name_search import search_name

            loop = asyncio.new_event_loop()
            try:
                result = loop.run_until_complete(search_name(entity_name, state_code))
            finally:
                # Close the loop even when the search raises.
                loop.close()

            # search_name returns a NameSearchResult dataclass — convert to dict
            if hasattr(result, "__dict__"):
                result = vars(result)
            elif hasattr(result, "_asdict"):
                result = result._asdict()

            if result.get("available") is False:
                # Name is taken = entity exists (active)
                self._respond(200, {
                    "found": True,
                    "entity_name": entity_name.upper(),
                    "entity_number": result.get("entity_number"),
                    "entity_type": result.get("entity_type"),
                    "status": "ACTIVE",
                    "formation_date": result.get("formation_date"),
                    "registered_agent": result.get("registered_agent"),
                    "state": state_code,
                    "source": "live_search",
                })
            elif result.get("available") is True:
                # Name is available = no entity with this name
                self._respond(200, {"found": False, "state": state_code})
            else:
                # Inconclusive
                self._respond(200, {"found": False, "state": state_code, "note": "Search inconclusive"})
        except Exception as e:
            LOG.warning("Entity status search failed for %s in %s: %s", entity_name, state_code, e)
            self._respond(200, {"found": False, "state": state_code, "error": str(e)})

    def _handle_rmd_preview(self):
        """Generate an RMD certification preview PDF from client-provided data.

        POST /rmd-preview {order_number, entity: {...}}
        Returns {pdf_url} with a presigned MinIO URL for the generated PDF.

        The letter is rendered to DOCX in a temporary directory, converted to
        PDF with headless LibreOffice, uploaded, and the temporary directory
        is removed whether or not generation succeeded.
        """
        payload = self._read_json_object()
        if payload is None:
            return  # 400 already sent

        entity = payload.get("entity", {})
        order_number = payload.get("order_number", "PREVIEW")
        if not isinstance(entity, dict) or not entity.get("legal_name") or not entity.get("frn"):
            self._respond(400, {"error": "legal_name and frn required"})
            return

        import re
        import shutil
        import subprocess
        import tempfile

        work_dir = None
        try:
            from scripts.document_gen.templates.rmd_letter_generator import generate_rmd_letter

            work_dir = tempfile.mkdtemp(prefix="rmd_preview_")
            date_str = datetime.now().strftime("%Y%m%d")
            # order_number is client-supplied: strip path separators and other
            # unexpected characters before using it in a local file name.
            safe_order = re.sub(r"[^A-Za-z0-9._-]", "_", str(order_number))
            docx_path = os.path.join(work_dir, f"rmd_preview_{safe_order}_{date_str}.docx")

            generate_rmd_letter(
                entity_name=entity.get("legal_name", ""),
                dba_name=entity.get("dba_name", ""),
                frn=entity.get("frn", ""),
                rmd_number=entity.get("rmd_number", ""),
                filer_id_499=entity.get("filer_id_499", ""),
                address_street=entity.get("address_street", ""),
                address_city=entity.get("address_city", ""),
                address_state=entity.get("address_state", ""),
                address_zip=entity.get("address_zip", ""),
                contact_name=entity.get("contact_name", ""),
                contact_title=entity.get("contact_title", ""),
                contact_email=entity.get("contact_email", ""),
                contact_phone=entity.get("contact_phone", ""),
                ceo_name=entity.get("ceo_name", ""),
                ceo_title=entity.get("ceo_title", "Chief Executive Officer"),
                carrier_category=entity.get("carrier_category", "interconnected_voip"),
                infra_type=entity.get("infra_type", "facilities"),
                is_wholesale=entity.get("is_wholesale", False),
                is_gateway_provider=entity.get("is_gateway_provider", False),
                is_international_only=False,
                uses_ucaas_provider=False,
                carrier_metadata={},
                stir_shaken_status=entity.get("stir_shaken_status", "complete_implementation"),
                stir_shaken_cert_authority=entity.get("stir_shaken_cert_authority", ""),
                upstream_provider_name=entity.get("upstream_provider_name", ""),
                upstream_provider_frn="",
                output_path=docx_path,
            )

            # Convert to PDF
            conversion = subprocess.run(
                ["libreoffice", "--headless", "--convert-to", "pdf", "--outdir", work_dir, docx_path],
                capture_output=True, timeout=30,
            )
            # Swap only the extension; str.replace would also rewrite any
            # ".docx" appearing earlier in the path.
            pdf_path = os.path.splitext(docx_path)[0] + ".pdf"
            if not os.path.exists(pdf_path):
                LOG.error(
                    "RMD preview PDF conversion failed (exit %s): %s",
                    conversion.returncode,
                    conversion.stderr.decode(errors="replace").strip(),
                )
                self._respond(500, {"error": "PDF conversion failed"})
                return

            # Upload to MinIO and get presigned URL
            from scripts.document_gen.minio_client import MinioStorage

            storage = MinioStorage()
            # NOTE(review): the object key still embeds the raw client-supplied
            # order_number — confirm the storage layer tolerates arbitrary keys.
            remote_key = f"previews/{order_number}/rmd_preview_{date_str}.pdf"
            storage.upload(pdf_path, remote_key)
            pdf_url = storage.get_url(remote_key, expires_hours=24)

            self._respond(200, {"pdf_url": pdf_url, "generated": True})
        except Exception as exc:
            LOG.error("RMD preview generation failed: %s", exc, exc_info=True)
            self._respond(500, {"error": f"Preview generation failed: {exc}"})
        finally:
            # Cleanup on every path, including early returns and failures.
            if work_dir is not None:
                shutil.rmtree(work_dir, ignore_errors=True)

    def _respond(self, status: int, body: dict):
        """Send ``body`` as a JSON response with the given HTTP status."""
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps(body).encode())

    def log_message(self, format, *args):
        # Suppress default HTTP logging, we use our own
        pass
# ==========================================================================
# Deferred job auto-retry — polls ERPNext for Deferred orders past defer_until
# ==========================================================================
# Seconds the deferred-retry thread sleeps between polls; override with the
# DEFER_POLL_INTERVAL environment variable.
DEFER_POLL_INTERVAL = int(os.getenv("DEFER_POLL_INTERVAL", "300")) # 5 min
def _dispatch_deferred_order(client, order: dict, now_iso: str) -> None:
    """Reset one deferred Formation Order to Pending and queue its filing job.

    Raises whatever the ERPNext update or the executor submission raises, so
    the caller can log the failure and move on to the next order.
    """
    order_name = order["name"]
    state_code = order.get("state_code", "")

    # Reset status to Pending so the retry handler knows it's fresh
    client.update_resource("Formation Order", order_name, {
        "automation_status": "Pending",
        "defer_until": None,
        "admin_notes": f"Auto-retrying after deferral (was deferred until {now_iso})",
    })

    # Re-dispatch the filing job via the thread pool
    payload = {"order_name": order_name}
    job_id = str(uuid.uuid4())[:8]
    job_record = {
        "id": job_id,
        "action": "file_entity",
        "order_name": order_name,
        "status": "queued",
        # ISO-8601 string, the same representation the POST /jobs path uses.
        "started_at": datetime.utcnow().isoformat(),
        "source": "deferred_retry",
    }
    active_jobs[job_id] = job_record

    def _run_deferred():
        job_record["status"] = "running"
        try:
            result = handle_file_entity(payload)
            job_record["status"] = "completed"
            job_record["result"] = result
            LOG.info("Deferred job %s completed: %s", job_id, result)
        except Exception as e:
            job_record["status"] = "failed"
            job_record["error"] = str(e)
            LOG.error("Deferred job %s failed: %s", job_id, e, exc_info=True)
        finally:
            job_record["finished_at"] = datetime.utcnow().isoformat()
            active_jobs.pop(job_id, None)
            completed_jobs.append(job_record)
            # Keep only the 100 most recent finished jobs.
            if len(completed_jobs) > 100:
                completed_jobs.pop(0)

    try:
        executor.submit(_run_deferred)
    except Exception:
        # Never leave a job listed as active when it was not actually queued.
        active_jobs.pop(job_id, None)
        raise
    LOG.info("Re-dispatched deferred order %s (state: %s)", order_name, state_code)


def _deferred_retry_loop():
    """Background thread: polls for deferred Formation Orders ready to retry.

    Runs forever. Each cycle sleeps ``DEFER_POLL_INTERVAL`` seconds, asks
    ERPNext for Formation Orders whose ``automation_status`` is ``Deferred``
    and whose ``defer_until`` has passed, and re-dispatches each one as a
    ``file_entity`` job. Failures are logged and never terminate the loop; a
    failure on one order does not stop the remaining orders in that batch.
    """
    LOG.info("Deferred retry loop started (poll every %ds)", DEFER_POLL_INTERVAL)
    while True:
        time.sleep(DEFER_POLL_INTERVAL)
        try:
            from scripts.workers.erpnext_client import ERPNextClient

            client = ERPNextClient()

            # Fetch Formation Orders with automation_status=Deferred and defer_until <= now
            now_iso = datetime.utcnow().isoformat()
            orders = client.get_resource("Formation Order", filters=[
                ["automation_status", "=", "Deferred"],
                ["defer_until", "<=", now_iso],
            ], fields=["name", "state_code", "entity_name", "include_ein"])
        except Exception as e:
            LOG.error("Deferred retry loop error: %s", e, exc_info=True)
            continue

        if not orders:
            continue

        LOG.info("Found %d deferred orders ready to retry", len(orders))
        for order in orders:
            try:
                _dispatch_deferred_order(client, order, now_iso)
            except Exception as e:
                label = order.get("name") if isinstance(order, dict) else order
                LOG.error("Deferred retry loop error for order %s: %s", label, e, exc_info=True)
def main():
    """Start the deferred-retry thread and serve HTTP until interrupted.

    Binds to all interfaces on ``PORT``. On any exit from the serve loop,
    waits for in-flight jobs to finish and then closes the listening socket.
    """
    LOG.info("=" * 60)
    LOG.info("Performance West Worker Job Server")
    LOG.info(" Port: %d", PORT)
    LOG.info(" Max workers: %d", MAX_WORKERS)
    LOG.info(" Available actions: %s", list(JOB_HANDLERS.keys()))
    LOG.info(" Deferred retry poll: every %ds", DEFER_POLL_INTERVAL)
    LOG.info("=" * 60)

    # Start deferred retry background thread (daemon: it must not keep the
    # process alive once the server stops).
    retry_thread = threading.Thread(target=_deferred_retry_loop, daemon=True, name="deferred-retry")
    retry_thread.start()

    server = HTTPServer(("0.0.0.0", PORT), JobHandler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        LOG.info("Shutting down...")
    finally:
        # Runs on every exit path, not only Ctrl-C, so queued jobs are
        # drained and the socket is released even after an unexpected error.
        executor.shutdown(wait=True)
        server.server_close()
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()