"""ERPNext REST API client for order fulfillment workers. Reads configuration from environment variables: ERPNEXT_URL – Base URL of the ERPNext instance (e.g. https://erp.performancewest.com) ERPNEXT_API_KEY – Frappe API key ERPNEXT_API_SECRET – Frappe API secret """ from __future__ import annotations import logging import os from pathlib import Path from typing import Any import httpx logger = logging.getLogger(__name__) _TIMEOUT = 30.0 # seconds class ERPNextClientError(Exception): """Raised when an ERPNext API call fails.""" def __init__(self, status_code: int, detail: str) -> None: self.status_code = status_code self.detail = detail super().__init__(f"ERPNext API error {status_code}: {detail}") class ERPNextClient: """Synchronous ERPNext REST API client using httpx.""" def __init__( self, url: str | None = None, api_key: str | None = None, api_secret: str | None = None, ) -> None: self.base_url = (url or os.environ["ERPNEXT_URL"]).rstrip("/") self._api_key = api_key or os.environ["ERPNEXT_API_KEY"] self._api_secret = api_secret or os.environ["ERPNEXT_API_SECRET"] self._client = httpx.Client( base_url=self.base_url, headers={ "Authorization": f"token {self._api_key}:{self._api_secret}", "Accept": "application/json", "Host": "performancewest.net", # Frappe multi-tenant routing }, timeout=_TIMEOUT, ) # --------------------------------------------------------------------- # # Internal helpers # --------------------------------------------------------------------- # def _raise_for_status(self, response: httpx.Response) -> None: if response.status_code >= 400: try: detail = response.json().get("exc", response.text) except Exception: detail = response.text raise ERPNextClientError(response.status_code, str(detail)) def _api_url(self, doctype: str, name: str | None = None) -> str: """Build the Frappe REST resource URL.""" base = f"/api/resource/{doctype}" if name: base = f"{base}/{name}" return base # --------------------------------------------------------------------- # # Public methods # --------------------------------------------------------------------- # def get_resource( self, doctype: str, name: str | None = None, filters: dict | list | None = None, fields: list[str] | None = None, limit: int = 50, ) -> list[dict[str, Any]] | dict[str, Any]: """Fetch one or many records from ERPNext. If *name* is provided, returns a single dict for that record. Otherwise returns a list of dicts matching the optional *filters*. """ params: dict[str, Any] = {} if filters is not None: # Frappe expects JSON-encoded filters import json params["filters"] = json.dumps(filters) if fields is not None: import json params["fields"] = json.dumps(fields) if name is None: params["limit_page_length"] = limit url = self._api_url(doctype, name) response = self._client.get(url, params=params) self._raise_for_status(response) payload = response.json() if name: return payload.get("data", payload) return payload.get("data", []) def create_resource( self, doctype: str, data: dict[str, Any] ) -> dict[str, Any]: """Create a new document in ERPNext.""" url = self._api_url(doctype) response = self._client.post(url, json=data) self._raise_for_status(response) return response.json().get("data", response.json()) def update_resource( self, doctype: str, name: str, data: dict[str, Any] ) -> dict[str, Any]: """Update an existing document in ERPNext.""" url = self._api_url(doctype, name) response = self._client.put(url, json=data) self._raise_for_status(response) return response.json().get("data", response.json()) def set_value( self, doctype: str, name: str, fieldname: str, value: Any ) -> dict[str, Any]: """Set a single field value on an ERPNext document. Uses the frappe.client.set_value whitelisted method. """ return self.call_method("frappe.client.set_value", { "doctype": doctype, "name": name, "fieldname": fieldname, "value": value, }) def call_method( self, method: str, args: dict[str, Any] | None = None, ) -> Any: """Call a whitelisted Frappe API method. Uses POST /api/method/{method}. Frappe returns the result in {"message": } for whitelisted methods. """ url = f"/api/method/{method}" response = self._client.post(url, json=args or {}) self._raise_for_status(response) return response.json().get("message", response.json()) def upload_file( self, doctype: str, name: str, field: str, filepath: str | Path, ) -> dict[str, Any]: """Attach a file to an ERPNext record. Uses the ``/api/method/upload_file`` endpoint which accepts multipart form data. """ filepath = Path(filepath) with open(filepath, "rb") as fh: response = self._client.post( "/api/method/upload_file", data={ "doctype": doctype, "docname": name, "fieldname": field, "is_private": 1, }, files={"file": (filepath.name, fh, "application/octet-stream")}, ) self._raise_for_status(response) return response.json().get("message", response.json()) def get_queued_orders( self, doctype: str = "Sales Order", status: str = "Queued", limit: int = 10, ) -> list[dict[str, Any]]: """Return orders whose *workflow_state* matches *status*.""" filters = {"workflow_state": status} fields = ["name", "customer", "customer_name", "workflow_state", "items"] return self.get_resource( doctype, filters=filters, fields=fields, limit=limit ) # --------------------------------------------------------------------- # # Context manager support # --------------------------------------------------------------------- # def close(self) -> None: self._client.close() def __enter__(self) -> "ERPNextClient": return self def __exit__(self, *exc: Any) -> None: self.close() # --------------------------------------------------------------------- # # BPM — Workflow state transitions # --------------------------------------------------------------------- # def apply_workflow(self, doctype: str, name: str, action: str) -> bool: """Apply a workflow action to advance the document state. This is the key BPM integration — ERPNext manages the state machine, workers call this to advance to the next state after completing their action. Args: doctype: ERPNext DocType (e.g., "Formation Order", "Sales Order") name: Document name/ID action: Workflow action name (e.g., "Name Available", "Filing Complete") Returns: True if the workflow was advanced successfully. """ import json as _json try: resp = self._request( "POST", "/api/method/frappe.model.workflow.apply_workflow", json={ "doc": _json.dumps({"doctype": doctype, "name": name}), "action": action, }, ) LOG.info("Workflow advanced: %s/%s → %s", doctype, name, action) return True except Exception as exc: LOG.error("Workflow advance failed: %s/%s → %s: %s", doctype, name, action, exc) return False def create_issue(self, subject: str, description: str, priority: str = "Medium") -> dict: """Create an Issue (support ticket) for error reporting.""" return self.create_resource("Issue", { "subject": subject, "description": description, "priority": priority, "issue_type": "Bug", })