new-site/scripts/workers/erpnext_client.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

254 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""ERPNext REST API client for order fulfillment workers.
Reads configuration from environment variables:
ERPNEXT_URL Base URL of the ERPNext instance (e.g. https://erp.performancewest.com)
ERPNEXT_API_KEY Frappe API key
ERPNEXT_API_SECRET Frappe API secret
"""
from __future__ import annotations
import logging
import os
from pathlib import Path
from typing import Any
import httpx
logger = logging.getLogger(__name__)
_TIMEOUT = 30.0 # seconds
class ERPNextClientError(Exception):
"""Raised when an ERPNext API call fails."""
def __init__(self, status_code: int, detail: str) -> None:
self.status_code = status_code
self.detail = detail
super().__init__(f"ERPNext API error {status_code}: {detail}")
class ERPNextClient:
"""Synchronous ERPNext REST API client using httpx."""
def __init__(
self,
url: str | None = None,
api_key: str | None = None,
api_secret: str | None = None,
) -> None:
self.base_url = (url or os.environ["ERPNEXT_URL"]).rstrip("/")
self._api_key = api_key or os.environ["ERPNEXT_API_KEY"]
self._api_secret = api_secret or os.environ["ERPNEXT_API_SECRET"]
self._client = httpx.Client(
base_url=self.base_url,
headers={
"Authorization": f"token {self._api_key}:{self._api_secret}",
"Accept": "application/json",
"Host": "performancewest.net", # Frappe multi-tenant routing
},
timeout=_TIMEOUT,
)
# --------------------------------------------------------------------- #
# Internal helpers
# --------------------------------------------------------------------- #
def _raise_for_status(self, response: httpx.Response) -> None:
if response.status_code >= 400:
try:
detail = response.json().get("exc", response.text)
except Exception:
detail = response.text
raise ERPNextClientError(response.status_code, str(detail))
def _api_url(self, doctype: str, name: str | None = None) -> str:
"""Build the Frappe REST resource URL."""
base = f"/api/resource/{doctype}"
if name:
base = f"{base}/{name}"
return base
# --------------------------------------------------------------------- #
# Public methods
# --------------------------------------------------------------------- #
def get_resource(
self,
doctype: str,
name: str | None = None,
filters: dict | list | None = None,
fields: list[str] | None = None,
limit: int = 50,
) -> list[dict[str, Any]] | dict[str, Any]:
"""Fetch one or many records from ERPNext.
If *name* is provided, returns a single dict for that record.
Otherwise returns a list of dicts matching the optional *filters*.
"""
params: dict[str, Any] = {}
if filters is not None:
# Frappe expects JSON-encoded filters
import json
params["filters"] = json.dumps(filters)
if fields is not None:
import json
params["fields"] = json.dumps(fields)
if name is None:
params["limit_page_length"] = limit
url = self._api_url(doctype, name)
response = self._client.get(url, params=params)
self._raise_for_status(response)
payload = response.json()
if name:
return payload.get("data", payload)
return payload.get("data", [])
def create_resource(
self, doctype: str, data: dict[str, Any]
) -> dict[str, Any]:
"""Create a new document in ERPNext."""
url = self._api_url(doctype)
response = self._client.post(url, json=data)
self._raise_for_status(response)
return response.json().get("data", response.json())
def update_resource(
self, doctype: str, name: str, data: dict[str, Any]
) -> dict[str, Any]:
"""Update an existing document in ERPNext."""
url = self._api_url(doctype, name)
response = self._client.put(url, json=data)
self._raise_for_status(response)
return response.json().get("data", response.json())
def set_value(
self, doctype: str, name: str, fieldname: str, value: Any
) -> dict[str, Any]:
"""Set a single field value on an ERPNext document.
Uses the frappe.client.set_value whitelisted method.
"""
return self.call_method("frappe.client.set_value", {
"doctype": doctype,
"name": name,
"fieldname": fieldname,
"value": value,
})
def call_method(
self,
method: str,
args: dict[str, Any] | None = None,
) -> Any:
"""Call a whitelisted Frappe API method.
Uses POST /api/method/{method}. Frappe returns the result in
{"message": <result>} for whitelisted methods.
"""
url = f"/api/method/{method}"
response = self._client.post(url, json=args or {})
self._raise_for_status(response)
return response.json().get("message", response.json())
def upload_file(
self,
doctype: str,
name: str,
field: str,
filepath: str | Path,
) -> dict[str, Any]:
"""Attach a file to an ERPNext record.
Uses the ``/api/method/upload_file`` endpoint which accepts
multipart form data.
"""
filepath = Path(filepath)
with open(filepath, "rb") as fh:
response = self._client.post(
"/api/method/upload_file",
data={
"doctype": doctype,
"docname": name,
"fieldname": field,
"is_private": 1,
},
files={"file": (filepath.name, fh, "application/octet-stream")},
)
self._raise_for_status(response)
return response.json().get("message", response.json())
def get_queued_orders(
self,
doctype: str = "Sales Order",
status: str = "Queued",
limit: int = 10,
) -> list[dict[str, Any]]:
"""Return orders whose *workflow_state* matches *status*."""
filters = {"workflow_state": status}
fields = ["name", "customer", "customer_name", "workflow_state", "items"]
return self.get_resource(
doctype, filters=filters, fields=fields, limit=limit
)
# --------------------------------------------------------------------- #
# Context manager support
# --------------------------------------------------------------------- #
def close(self) -> None:
self._client.close()
def __enter__(self) -> "ERPNextClient":
return self
def __exit__(self, *exc: Any) -> None:
self.close()
# --------------------------------------------------------------------- #
# BPM — Workflow state transitions
# --------------------------------------------------------------------- #
def apply_workflow(self, doctype: str, name: str, action: str) -> bool:
"""Apply a workflow action to advance the document state.
This is the key BPM integration — ERPNext manages the state machine,
workers call this to advance to the next state after completing their action.
Args:
doctype: ERPNext DocType (e.g., "Formation Order", "Sales Order")
name: Document name/ID
action: Workflow action name (e.g., "Name Available", "Filing Complete")
Returns:
True if the workflow was advanced successfully.
"""
import json as _json
try:
resp = self._request(
"POST",
"/api/method/frappe.model.workflow.apply_workflow",
json={
"doc": _json.dumps({"doctype": doctype, "name": name}),
"action": action,
},
)
LOG.info("Workflow advanced: %s/%s%s", doctype, name, action)
return True
except Exception as exc:
LOG.error("Workflow advance failed: %s/%s%s: %s", doctype, name, action, exc)
return False
def create_issue(self, subject: str, description: str, priority: str = "Medium") -> dict:
"""Create an Issue (support ticket) for error reporting."""
return self.create_resource("Issue", {
"subject": subject,
"description": description,
"priority": priority,
"issue_type": "Bug",
})