new-site/scripts/workers/base_worker.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

292 lines
9.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Base order-fulfillment worker.
Polls ERPNext for Sales Orders with workflow_state = "Queued",
dispatches each to the appropriate service handler, uploads generated
documents to MinIO, and advances the order through the workflow.
Run as a long-running process (e.g. inside a Docker container):
python -m scripts.workers.base_worker
Environment variables:
ERPNEXT_URL, ERPNEXT_API_KEY, ERPNEXT_API_SECRET
MINIO_ENDPOINT, MINIO_ACCESS_KEY, MINIO_SECRET_KEY, MINIO_BUCKET
POLL_INTERVAL seconds between polls (default 60)
DELAY_MIN min seconds between processing orders (default 2)
DELAY_MAX max seconds between processing orders (default 8)
LOCK_FILE path for single-instance lock (default /tmp/pw_worker.lock)
"""
from __future__ import annotations

import asyncio
import fcntl
import logging
import os
import random
import signal
import sys
import time
import traceback
from pathlib import Path
from typing import Any

from minio import Minio

from .erpnext_client import ERPNextClient, ERPNextClientError
from .services import SERVICE_HANDLERS
# --------------------------------------------------------------------------- #
# Configuration
# --------------------------------------------------------------------------- #
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "60"))
DELAY_MIN = int(os.getenv("DELAY_MIN", "2"))
DELAY_MAX = int(os.getenv("DELAY_MAX", "8"))
LOCK_FILE = os.getenv("LOCK_FILE", "/tmp/pw_worker.lock")
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "minio:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "performancewest")
MINIO_SECURE = os.getenv("MINIO_SECURE", "false").lower() == "true"
# --------------------------------------------------------------------------- #
# Logging (stdout for Docker log collection)
# --------------------------------------------------------------------------- #
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger("pw.worker.base")
# --------------------------------------------------------------------------- #
# Graceful shutdown
# --------------------------------------------------------------------------- #
_shutdown_requested = False
def _signal_handler(signum: int, _frame: Any) -> None:
global _shutdown_requested
logger.info("Received signal %s shutting down gracefully …", signum)
_shutdown_requested = True
signal.signal(signal.SIGTERM, _signal_handler)
signal.signal(signal.SIGINT, _signal_handler)
# --------------------------------------------------------------------------- #
# Single-instance lock
# --------------------------------------------------------------------------- #
def _acquire_lock() -> int:
"""Acquire an exclusive file lock so only one worker instance runs."""
fd = os.open(LOCK_FILE, os.O_CREAT | os.O_RDWR)
try:
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
logger.error("Another worker instance is already running. Exiting.")
sys.exit(1)
return fd
# --------------------------------------------------------------------------- #
# MinIO helpers
# --------------------------------------------------------------------------- #
def _get_minio_client() -> Minio:
return Minio(
MINIO_ENDPOINT,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
secure=MINIO_SECURE,
)
def _ensure_bucket(client: Minio) -> None:
if not client.bucket_exists(MINIO_BUCKET):
client.make_bucket(MINIO_BUCKET)
logger.info("Created MinIO bucket: %s", MINIO_BUCKET)
def _upload_to_minio(client: Minio, local_path: str, object_name: str) -> str:
"""Upload a file to MinIO and return the object name."""
client.fput_object(MINIO_BUCKET, object_name, local_path)
logger.info("Uploaded %s → minio://%s/%s", local_path, MINIO_BUCKET, object_name)
return object_name
# --------------------------------------------------------------------------- #
# Order processing
# --------------------------------------------------------------------------- #
def _resolve_service_slug(order_data: dict) -> str | None:
"""Extract the service slug from the first order item's item_code."""
items = order_data.get("items") or []
if not items:
return None
# The item_code is expected to match a slug in SERVICE_HANDLERS
item_code = items[0].get("item_code", "")
return item_code if item_code in SERVICE_HANDLERS else None
def _set_order_status(erp: ERPNextClient, order_name: str, status: str) -> None:
erp.update_resource("Sales Order", order_name, {"workflow_state": status})
logger.info("Order %s%s", order_name, status)
def _create_failure_issue(
erp: ERPNextClient, order_name: str, error_message: str
) -> None:
"""Create an ERPNext Issue so admins are alerted to a processing failure."""
try:
erp.create_resource(
"Issue",
{
"subject": f"Order fulfillment failed: {order_name}",
"description": (
f"Order **{order_name}** failed during automated processing.\n\n"
f"```\n{error_message}\n```"
),
"priority": "High",
"issue_type": "Bug",
},
)
logger.info("Created ERPNext Issue for failed order %s", order_name)
except ERPNextClientError:
logger.exception("Failed to create Issue for order %s", order_name)
def _process_order(
erp: ERPNextClient, minio_client: Minio, order_data: dict
) -> None:
"""Full processing pipeline for a single order."""
order_name: str = order_data["name"]
# 1. Determine service type ------------------------------------------------
slug = _resolve_service_slug(order_data)
if slug is None:
msg = f"No matching service handler for order items: {order_data.get('items')}"
logger.error(msg)
_set_order_status(erp, order_name, "Failed")
_create_failure_issue(erp, order_name, msg)
return
handler_cls = SERVICE_HANDLERS[slug]
handler = handler_cls()
logger.info(
"Processing order %s with handler %s (%s)",
order_name,
handler.SERVICE_NAME,
slug,
)
# 2. Set status to Processing ----------------------------------------------
_set_order_status(erp, order_name, "Processing")
try:
# 3. Run the handler -------------------------------------------------------
import asyncio
file_paths: list[str] = asyncio.run(handler.process(order_data))
# 4. Upload to MinIO -------------------------------------------------------
minio_urls: list[str] = []
for fpath in file_paths:
obj_name = handler._build_output_path(
order_name, Path(fpath).name
)
_upload_to_minio(minio_client, fpath, obj_name)
minio_urls.append(f"minio://{MINIO_BUCKET}/{obj_name}")
# 5. Attach URLs to ERPNext Sales Order ------------------------------------
erp.update_resource(
"Sales Order",
order_name,
{"custom_generated_files": "\n".join(minio_urls)},
)
# Also upload files as ERPNext attachments
for fpath in file_paths:
try:
erp.upload_file(
"Sales Order", order_name, "custom_attachments", fpath
)
except ERPNextClientError:
logger.warning(
"Could not attach %s to ERPNext (non-fatal)", fpath
)
# 6. Advance to Review -----------------------------------------------------
_set_order_status(erp, order_name, "Review")
logger.info("Order %s completed successfully → Review", order_name)
except Exception:
tb = traceback.format_exc()
logger.exception("Error processing order %s", order_name)
_set_order_status(erp, order_name, "Failed")
_create_failure_issue(erp, order_name, tb)
# --------------------------------------------------------------------------- #
# Main loop
# --------------------------------------------------------------------------- #
def run() -> None:
"""Entry-point: poll ERPNext and process queued orders."""
lock_fd = _acquire_lock()
logger.info("Worker started (PID %d). Poll interval: %ds", os.getpid(), POLL_INTERVAL)
erp = ERPNextClient()
minio_client = _get_minio_client()
_ensure_bucket(minio_client)
try:
while not _shutdown_requested:
try:
orders = erp.get_queued_orders(status="Queued")
if orders:
logger.info("Found %d queued order(s)", len(orders))
else:
logger.debug("No queued orders found")
for order in orders:
if _shutdown_requested:
break
_process_order(erp, minio_client, order)
# Human-paced delay between orders
delay = random.uniform(DELAY_MIN, DELAY_MAX)
logger.debug("Sleeping %.1fs before next order …", delay)
time.sleep(delay)
except ERPNextClientError:
logger.exception("ERPNext API error during poll cycle")
except Exception:
logger.exception("Unexpected error during poll cycle")
# Wait before next poll
for _ in range(POLL_INTERVAL):
if _shutdown_requested:
break
time.sleep(1)
finally:
erp.close()
os.close(lock_fd)
try:
os.unlink(LOCK_FILE)
except OSError:
pass
logger.info("Worker stopped.")
if __name__ == "__main__":
run()