Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
292 lines
9.8 KiB
Python
292 lines
9.8 KiB
Python
"""Base order-fulfillment worker.
|
||
|
||
Polls ERPNext for Sales Orders with workflow_state = "Queued",
dispatches each to the appropriate service handler, uploads generated
documents to MinIO, and advances the order through the workflow.

Run as a long-running process (e.g. inside a Docker container):

    python -m scripts.workers.base_worker

Environment variables:
    ERPNEXT_URL, ERPNEXT_API_KEY, ERPNEXT_API_SECRET
    MINIO_ENDPOINT, MINIO_ACCESS_KEY, MINIO_SECRET_KEY, MINIO_BUCKET
    MINIO_SECURE  – "true" to connect to MinIO over TLS (default "false")
    POLL_INTERVAL – seconds between polls (default 60)
    DELAY_MIN     – min seconds between processing orders (default 2)
    DELAY_MAX     – max seconds between processing orders (default 8)
    LOCK_FILE     – path for single-instance lock (default /tmp/pw_worker.lock)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import fcntl
|
||
import logging
|
||
import os
|
||
import random
|
||
import signal
|
||
import sys
|
||
import time
|
||
import traceback
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
from minio import Minio
|
||
|
||
from .erpnext_client import ERPNextClient, ERPNextClientError
|
||
from .services import SERVICE_HANDLERS
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Configuration
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
# Polling cadence and pacing between orders (whole seconds).
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "60"))
DELAY_MIN = int(os.environ.get("DELAY_MIN", "2"))
DELAY_MAX = int(os.environ.get("DELAY_MAX", "8"))

# File used for the single-instance lock.
LOCK_FILE = os.environ.get("LOCK_FILE", "/tmp/pw_worker.lock")

# MinIO connection settings.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "")
MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "performancewest")
# Only the string "true" (in any letter case) switches the flag on.
MINIO_SECURE = os.environ.get("MINIO_SECURE", "false").lower() == "true"
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Logging (stdout for Docker log collection)
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
# Root logging goes to stdout so the container runtime can collect it.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)

# Logger shared by everything in this module.
logger = logging.getLogger("pw.worker.base")
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Graceful shutdown
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
# Flipped to True by the signal handler; the main loop polls it to stop.
_shutdown_requested = False


def _signal_handler(signum: int, _frame: Any) -> None:
    """Record that a termination signal arrived.

    The handler only logs the signal number and sets the module-level
    ``_shutdown_requested`` flag; it does not stop anything itself.
    """
    global _shutdown_requested
    logger.info("Received signal %s – shutting down gracefully …", signum)
    _shutdown_requested = True


# Install the same handler for SIGTERM and SIGINT, in that order.
for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, _signal_handler)
del _signum
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Single-instance lock
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
|
||
def _acquire_lock() -> int:
    """Take an exclusive, non-blocking ``flock`` on ``LOCK_FILE``.

    The lock file is opened read/write and created if it does not exist.

    Returns:
        The open file descriptor that holds the lock. The caller owns it and
        must keep it open for as long as the lock is needed.

    Raises:
        SystemExit: With exit code 1 when ``flock`` fails with ``OSError``,
            which is reported as another worker instance already running.
    """
    fd = os.open(LOCK_FILE, os.O_CREAT | os.O_RDWR)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        # Release the descriptor before bailing out: SystemExit can be
        # intercepted by a caller, which would otherwise leak the fd.
        os.close(fd)
        logger.error("Another worker instance is already running. Exiting.")
        sys.exit(1)
    return fd
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# MinIO helpers
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
|
||
def _get_minio_client() -> Minio:
    """Build a MinIO client from the module-level ``MINIO_*`` settings."""
    credentials = {
        "access_key": MINIO_ACCESS_KEY,
        "secret_key": MINIO_SECRET_KEY,
    }
    return Minio(MINIO_ENDPOINT, secure=MINIO_SECURE, **credentials)
|
||
|
||
|
||
def _ensure_bucket(client: Minio) -> None:
    """Create the configured bucket if the server reports it missing."""
    already_present = client.bucket_exists(MINIO_BUCKET)
    if already_present:
        return

    client.make_bucket(MINIO_BUCKET)
    logger.info("Created MinIO bucket: %s", MINIO_BUCKET)
|
||
|
||
|
||
def _upload_to_minio(client: Minio, local_path: str, object_name: str) -> str:
    """Store a local file in the configured MinIO bucket.

    Args:
        client: MinIO client used for the upload.
        local_path: Path of the file on the local filesystem.
        object_name: Key under which the file is stored in the bucket.

    Returns:
        ``object_name``, unchanged.
    """
    bucket = MINIO_BUCKET
    client.fput_object(bucket, object_name, local_path)
    logger.info("Uploaded %s → minio://%s/%s", local_path, bucket, object_name)
    return object_name
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Order processing
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
|
||
def _resolve_service_slug(order_data: dict) -> str | None:
    """Return the first order item's ``item_code`` if a handler exists for it.

    Only the first entry of ``order_data["items"]`` is inspected. ``None`` is
    returned when there are no items or when the code is not a key of
    ``SERVICE_HANDLERS``.
    """
    line_items = order_data.get("items")
    if not line_items:
        return None

    # A missing item_code falls back to "" and is then looked up like any other.
    candidate = line_items[0].get("item_code", "")
    if candidate in SERVICE_HANDLERS:
        return candidate
    return None
|
||
|
||
|
||
def _set_order_status(erp: ERPNextClient, order_name: str, status: str) -> None:
    """Write ``status`` to the Sales Order's ``workflow_state`` and log it."""
    changes = {"workflow_state": status}
    erp.update_resource("Sales Order", order_name, changes)
    logger.info("Order %s → %s", order_name, status)
|
||
|
||
|
||
def _create_failure_issue(
    erp: ERPNextClient, order_name: str, error_message: str
) -> None:
    """File a high-priority ERPNext Issue describing a failed order.

    An ``ERPNextClientError`` raised while creating the Issue is logged and
    swallowed, so this function never raises that error to its caller.

    Args:
        erp: Client used to create the Issue.
        order_name: Name of the Sales Order that failed.
        error_message: Text embedded in the Issue description.
    """
    issue_fields = {
        "subject": f"Order fulfillment failed: {order_name}",
        "description": (
            f"Order **{order_name}** failed during automated processing.\n\n"
            f"```\n{error_message}\n```"
        ),
        "priority": "High",
        "issue_type": "Bug",
    }

    try:
        erp.create_resource("Issue", issue_fields)
    except ERPNextClientError:
        logger.exception("Failed to create Issue for order %s", order_name)
    else:
        logger.info("Created ERPNext Issue for failed order %s", order_name)
|
||
|
||
|
||
def _mark_order_failed(erp: ERPNextClient, order_name: str, reason: str) -> None:
    """Move an order to "Failed" and file an Issue, even if the move fails."""
    try:
        _set_order_status(erp, order_name, "Failed")
    except ERPNextClientError:
        # Do not let a failed status write suppress the alert below; without
        # the Issue nobody would learn that the order needs attention.
        logger.exception("Could not mark order %s as Failed", order_name)
    _create_failure_issue(erp, order_name, reason)


def _process_order(
    erp: ERPNextClient, minio_client: Minio, order_data: dict
) -> None:
    """Run the full fulfillment pipeline for a single Sales Order.

    Steps: resolve the service handler from the order's first item, mark the
    order "Processing", run the handler, upload every generated file to
    MinIO, record the resulting ``minio://`` URLs on the order, attach the
    files in ERPNext (best effort) and finally move the order to "Review".

    Any exception raised after the order entered "Processing" is logged; the
    order is then moved to "Failed" and an Issue containing the traceback is
    filed. The Issue is filed even when the status change itself fails.

    Errors raised while instantiating the handler or while setting the
    initial "Processing" state are not handled here and propagate to the
    caller.

    Args:
        erp: Client used for all ERPNext reads and writes.
        minio_client: Client used to upload the generated files.
        order_data: Sales Order document; must contain ``"name"`` and is
            expected to contain an ``"items"`` list.

    Raises:
        KeyError: If ``order_data`` has no ``"name"`` key.
    """
    import asyncio  # function-local, as in the original implementation

    order_name: str = order_data["name"]

    # 1. Determine service type ------------------------------------------------
    slug = _resolve_service_slug(order_data)
    if slug is None:
        msg = f"No matching service handler for order items: {order_data.get('items')}"
        logger.error(msg)
        _mark_order_failed(erp, order_name, msg)
        return

    handler_cls = SERVICE_HANDLERS[slug]
    handler = handler_cls()
    logger.info(
        "Processing order %s with handler %s (%s)",
        order_name,
        handler.SERVICE_NAME,
        slug,
    )

    # 2. Set status to Processing ----------------------------------------------
    _set_order_status(erp, order_name, "Processing")

    try:
        # 3. Run the handler ---------------------------------------------------
        # handler.process is awaited to completion on a fresh event loop.
        file_paths: list[str] = asyncio.run(handler.process(order_data))

        # 4. Upload to MinIO ---------------------------------------------------
        minio_urls: list[str] = []
        for fpath in file_paths:
            obj_name = handler._build_output_path(order_name, Path(fpath).name)
            _upload_to_minio(minio_client, fpath, obj_name)
            minio_urls.append(f"minio://{MINIO_BUCKET}/{obj_name}")

        # 5. Attach URLs to ERPNext Sales Order --------------------------------
        erp.update_resource(
            "Sales Order",
            order_name,
            {"custom_generated_files": "\n".join(minio_urls)},
        )

        # Also upload files as ERPNext attachments. A failure here is
        # deliberately non-fatal: the files are already stored in MinIO.
        for fpath in file_paths:
            try:
                erp.upload_file(
                    "Sales Order", order_name, "custom_attachments", fpath
                )
            except ERPNextClientError:
                logger.warning(
                    "Could not attach %s to ERPNext (non-fatal)", fpath
                )

        # 6. Advance to Review -------------------------------------------------
        _set_order_status(erp, order_name, "Review")
        logger.info("Order %s completed successfully → Review", order_name)

    except Exception:
        # Boundary handler: whatever went wrong, record it on the order.
        tb = traceback.format_exc()
        logger.exception("Error processing order %s", order_name)
        _mark_order_failed(erp, order_name, tb)
|
||
|
||
|
||
# --------------------------------------------------------------------------- #
|
||
# Main loop
|
||
# --------------------------------------------------------------------------- #
|
||
|
||
|
||
def run() -> None:
    """Entry point: poll ERPNext and process queued orders until shutdown.

    Takes the single-instance lock, creates the ERPNext and MinIO clients,
    makes sure the bucket exists, then loops: fetch orders in state
    "Queued", process them one by one with a random pause of
    ``DELAY_MIN``..``DELAY_MAX`` seconds after each, and wait
    ``POLL_INTERVAL`` seconds before the next poll. The loop ends once
    ``_shutdown_requested`` is set.

    Errors raised inside a poll cycle are logged and the loop continues.
    Errors raised during start-up propagate, but only after the lock and any
    client already created have been released.
    """
    lock_fd = _acquire_lock()
    erp: ERPNextClient | None = None

    try:
        logger.info("Worker started (PID %d). Poll interval: %ds", os.getpid(), POLL_INTERVAL)

        # Start-up happens inside the try so a failure here still releases
        # the lock (and the ERPNext client, if it was already created).
        erp = ERPNextClient()
        minio_client = _get_minio_client()
        _ensure_bucket(minio_client)

        while not _shutdown_requested:
            try:
                orders = erp.get_queued_orders(status="Queued")
                if orders:
                    logger.info("Found %d queued order(s)", len(orders))
                else:
                    logger.debug("No queued orders found")

                for order in orders:
                    if _shutdown_requested:
                        break

                    _process_order(erp, minio_client, order)

                    # Human-paced delay between orders
                    delay = random.uniform(DELAY_MIN, DELAY_MAX)
                    logger.debug("Sleeping %.1fs before next order …", delay)
                    time.sleep(delay)

            except ERPNextClientError:
                logger.exception("ERPNext API error during poll cycle")
            except Exception:
                # Keep the worker alive: one bad cycle must not end the process.
                logger.exception("Unexpected error during poll cycle")

            # Wait before next poll, in 1 s steps so a shutdown request is
            # noticed within about a second.
            for _ in range(POLL_INTERVAL):
                if _shutdown_requested:
                    break
                time.sleep(1)

    finally:
        try:
            if erp is not None:
                erp.close()
        finally:
            # Runs even if erp.close() raised, so the lock is always released.
            os.close(lock_fd)
            # NOTE(review): removing a flock()ed lock file can race with a
            # worker that is starting at the same moment — consider leaving
            # the file in place. Behavior kept as-is here.
            try:
                os.unlink(LOCK_FILE)
            except OSError:
                # Best effort: the file may already be gone.
                pass
            logger.info("Worker stopped.")
|
||
|
||
|
||
# Start the worker loop only when this module is executed as a script.
if __name__ == "__main__":
    run()
|