Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
254 lines
8.5 KiB
Python
254 lines
8.5 KiB
Python
"""ERPNext REST API client for order fulfillment workers.
|
||
|
||
Reads configuration from environment variables:
|
||
ERPNEXT_URL – Base URL of the ERPNext instance (e.g. https://erp.performancewest.com)
|
||
ERPNEXT_API_KEY – Frappe API key
|
||
ERPNEXT_API_SECRET – Frappe API secret
|
||
"""
|
||
|
||
from __future__ import annotations

import json
import logging
import os
from pathlib import Path
from typing import Any

import httpx
|
||
|
||
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
||
|
||
# Request timeout handed to the httpx.Client that ERPNextClient constructs.
_TIMEOUT = 30.0 # seconds
|
||
|
||
|
||
class ERPNextClientError(Exception):
    """Raised when an ERPNext API call fails.

    Attributes:
        status_code: HTTP status code returned by the server.
        detail: Error detail extracted from the response.
    """

    def __init__(self, status_code: int, detail: str) -> None:
        self.status_code = status_code
        self.detail = detail
        super().__init__(f"ERPNext API error {status_code}: {detail}")

    def __reduce__(self) -> tuple[Any, ...]:
        """Rebuild from the constructor arguments, not the formatted message.

        ``Exception.__init__`` only receives the formatted message, so the
        default reduce would re-create the error with a single argument and
        fail with ``TypeError`` when the exception is copied or pickled.
        """
        return (type(self), (self.status_code, self.detail))
|
||
|
||
|
||
class ERPNextClient:
|
||
"""Synchronous ERPNext REST API client using httpx."""
|
||
|
||
def __init__(
|
||
self,
|
||
url: str | None = None,
|
||
api_key: str | None = None,
|
||
api_secret: str | None = None,
|
||
) -> None:
|
||
self.base_url = (url or os.environ["ERPNEXT_URL"]).rstrip("/")
|
||
self._api_key = api_key or os.environ["ERPNEXT_API_KEY"]
|
||
self._api_secret = api_secret or os.environ["ERPNEXT_API_SECRET"]
|
||
|
||
self._client = httpx.Client(
|
||
base_url=self.base_url,
|
||
headers={
|
||
"Authorization": f"token {self._api_key}:{self._api_secret}",
|
||
"Accept": "application/json",
|
||
"Host": "performancewest.net", # Frappe multi-tenant routing
|
||
},
|
||
timeout=_TIMEOUT,
|
||
)
|
||
|
||
# --------------------------------------------------------------------- #
|
||
# Internal helpers
|
||
# --------------------------------------------------------------------- #
|
||
|
||
def _raise_for_status(self, response: httpx.Response) -> None:
|
||
if response.status_code >= 400:
|
||
try:
|
||
detail = response.json().get("exc", response.text)
|
||
except Exception:
|
||
detail = response.text
|
||
raise ERPNextClientError(response.status_code, str(detail))
|
||
|
||
def _api_url(self, doctype: str, name: str | None = None) -> str:
|
||
"""Build the Frappe REST resource URL."""
|
||
base = f"/api/resource/{doctype}"
|
||
if name:
|
||
base = f"{base}/{name}"
|
||
return base
|
||
|
||
# --------------------------------------------------------------------- #
|
||
# Public methods
|
||
# --------------------------------------------------------------------- #
|
||
|
||
def get_resource(
|
||
self,
|
||
doctype: str,
|
||
name: str | None = None,
|
||
filters: dict | list | None = None,
|
||
fields: list[str] | None = None,
|
||
limit: int = 50,
|
||
) -> list[dict[str, Any]] | dict[str, Any]:
|
||
"""Fetch one or many records from ERPNext.
|
||
|
||
If *name* is provided, returns a single dict for that record.
|
||
Otherwise returns a list of dicts matching the optional *filters*.
|
||
"""
|
||
params: dict[str, Any] = {}
|
||
if filters is not None:
|
||
# Frappe expects JSON-encoded filters
|
||
import json
|
||
|
||
params["filters"] = json.dumps(filters)
|
||
if fields is not None:
|
||
import json
|
||
|
||
params["fields"] = json.dumps(fields)
|
||
if name is None:
|
||
params["limit_page_length"] = limit
|
||
|
||
url = self._api_url(doctype, name)
|
||
response = self._client.get(url, params=params)
|
||
self._raise_for_status(response)
|
||
|
||
payload = response.json()
|
||
if name:
|
||
return payload.get("data", payload)
|
||
return payload.get("data", [])
|
||
|
||
def create_resource(
|
||
self, doctype: str, data: dict[str, Any]
|
||
) -> dict[str, Any]:
|
||
"""Create a new document in ERPNext."""
|
||
url = self._api_url(doctype)
|
||
response = self._client.post(url, json=data)
|
||
self._raise_for_status(response)
|
||
return response.json().get("data", response.json())
|
||
|
||
def update_resource(
|
||
self, doctype: str, name: str, data: dict[str, Any]
|
||
) -> dict[str, Any]:
|
||
"""Update an existing document in ERPNext."""
|
||
url = self._api_url(doctype, name)
|
||
response = self._client.put(url, json=data)
|
||
self._raise_for_status(response)
|
||
return response.json().get("data", response.json())
|
||
|
||
def set_value(
|
||
self, doctype: str, name: str, fieldname: str, value: Any
|
||
) -> dict[str, Any]:
|
||
"""Set a single field value on an ERPNext document.
|
||
|
||
Uses the frappe.client.set_value whitelisted method.
|
||
"""
|
||
return self.call_method("frappe.client.set_value", {
|
||
"doctype": doctype,
|
||
"name": name,
|
||
"fieldname": fieldname,
|
||
"value": value,
|
||
})
|
||
|
||
def call_method(
|
||
self,
|
||
method: str,
|
||
args: dict[str, Any] | None = None,
|
||
) -> Any:
|
||
"""Call a whitelisted Frappe API method.
|
||
|
||
Uses POST /api/method/{method}. Frappe returns the result in
|
||
{"message": <result>} for whitelisted methods.
|
||
"""
|
||
url = f"/api/method/{method}"
|
||
response = self._client.post(url, json=args or {})
|
||
self._raise_for_status(response)
|
||
return response.json().get("message", response.json())
|
||
|
||
def upload_file(
|
||
self,
|
||
doctype: str,
|
||
name: str,
|
||
field: str,
|
||
filepath: str | Path,
|
||
) -> dict[str, Any]:
|
||
"""Attach a file to an ERPNext record.
|
||
|
||
Uses the ``/api/method/upload_file`` endpoint which accepts
|
||
multipart form data.
|
||
"""
|
||
filepath = Path(filepath)
|
||
with open(filepath, "rb") as fh:
|
||
response = self._client.post(
|
||
"/api/method/upload_file",
|
||
data={
|
||
"doctype": doctype,
|
||
"docname": name,
|
||
"fieldname": field,
|
||
"is_private": 1,
|
||
},
|
||
files={"file": (filepath.name, fh, "application/octet-stream")},
|
||
)
|
||
self._raise_for_status(response)
|
||
return response.json().get("message", response.json())
|
||
|
||
def get_queued_orders(
|
||
self,
|
||
doctype: str = "Sales Order",
|
||
status: str = "Queued",
|
||
limit: int = 10,
|
||
) -> list[dict[str, Any]]:
|
||
"""Return orders whose *workflow_state* matches *status*."""
|
||
filters = {"workflow_state": status}
|
||
fields = ["name", "customer", "customer_name", "workflow_state", "items"]
|
||
return self.get_resource(
|
||
doctype, filters=filters, fields=fields, limit=limit
|
||
)
|
||
|
||
# --------------------------------------------------------------------- #
|
||
# Context manager support
|
||
# --------------------------------------------------------------------- #
|
||
|
||
def close(self) -> None:
|
||
self._client.close()
|
||
|
||
def __enter__(self) -> "ERPNextClient":
|
||
return self
|
||
|
||
def __exit__(self, *exc: Any) -> None:
|
||
self.close()
|
||
|
||
# --------------------------------------------------------------------- #
|
||
# BPM — Workflow state transitions
|
||
# --------------------------------------------------------------------- #
|
||
|
||
def apply_workflow(self, doctype: str, name: str, action: str) -> bool:
|
||
"""Apply a workflow action to advance the document state.
|
||
|
||
This is the key BPM integration — ERPNext manages the state machine,
|
||
workers call this to advance to the next state after completing their action.
|
||
|
||
Args:
|
||
doctype: ERPNext DocType (e.g., "Formation Order", "Sales Order")
|
||
name: Document name/ID
|
||
action: Workflow action name (e.g., "Name Available", "Filing Complete")
|
||
|
||
Returns:
|
||
True if the workflow was advanced successfully.
|
||
"""
|
||
import json as _json
|
||
try:
|
||
resp = self._request(
|
||
"POST",
|
||
"/api/method/frappe.model.workflow.apply_workflow",
|
||
json={
|
||
"doc": _json.dumps({"doctype": doctype, "name": name}),
|
||
"action": action,
|
||
},
|
||
)
|
||
LOG.info("Workflow advanced: %s/%s → %s", doctype, name, action)
|
||
return True
|
||
except Exception as exc:
|
||
LOG.error("Workflow advance failed: %s/%s → %s: %s", doctype, name, action, exc)
|
||
return False
|
||
|
||
def create_issue(self, subject: str, description: str, priority: str = "Medium") -> dict:
|
||
"""Create an Issue (support ticket) for error reporting."""
|
||
return self.create_resource("Issue", {
|
||
"subject": subject,
|
||
"description": description,
|
||
"priority": priority,
|
||
"issue_type": "Bug",
|
||
})
|