""" formation_worker.py — Order queue processor for business formation filings. Polls the PostgreSQL `formation_orders` table for new orders and processes them through the appropriate state adapter. Designed to run as a long-lived daemon with single-instance locking. Features: - Polls for orders with status='received' every 60 seconds - Configurable human-paced delays between orders - Single-instance locking via fcntl.flock - Structured logging to ~/logs/formation-worker.log - ERPNext Issue creation on errors Environment variables: DATABASE_URL PostgreSQL connection string FORMATION_DELAY_MIN Minimum delay between orders in minutes (default: 30) FORMATION_DELAY_MAX Maximum delay between orders in minutes (default: 120) Usage: python -m formation.formation_worker """ from __future__ import annotations import asyncio import fcntl import json import logging import os import random import signal import sys import time import traceback from datetime import datetime, timezone from pathlib import Path import psycopg2 import psycopg2.extras from .base import ( EntityType, FilingResult, FilingStatus, FormationOrder, Member, NameSearchResult, ) from .states import get_adapter, STATES # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- DATABASE_URL = os.environ.get("DATABASE_URL", "") POLL_INTERVAL_SECONDS = 60 DELAY_MIN_MINUTES = int(os.environ.get("FORMATION_DELAY_MIN", "30")) DELAY_MAX_MINUTES = int(os.environ.get("FORMATION_DELAY_MAX", "120")) LOCK_FILE = "/tmp/formation-worker.lock" LOG_DIR = Path.home() / "logs" LOG_FILE = LOG_DIR / "formation-worker.log" # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- LOG_DIR.mkdir(parents=True, exist_ok=True) LOG = logging.getLogger("formation.worker") _file_handler = 
logging.FileHandler(str(LOG_FILE)) _file_handler.setFormatter( logging.Formatter("%(asctime)s [%(name)s] %(levelname)s %(message)s") ) _stream_handler = logging.StreamHandler(sys.stdout) _stream_handler.setFormatter( logging.Formatter("%(asctime)s [%(name)s] %(levelname)s %(message)s") ) LOG.addHandler(_file_handler) LOG.addHandler(_stream_handler) LOG.setLevel(logging.INFO) # --------------------------------------------------------------------------- # Alerting (ERPNext Issues) # --------------------------------------------------------------------------- def _alert_error(order_id: str, state_code: str, error: str, detail: str = ""): """Create an ERPNext Issue for a formation processing error.""" try: # Import the shared alert module (one level up) sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from alert import alert_account_broken alert_account_broken( monitor="formation-worker", platform=f"SOS-{state_code}", error=f"Formation order {order_id} failed: {error}", detail=detail, ) except Exception as exc: LOG.warning("Failed to send alert for order %s: %s", order_id, exc) # --------------------------------------------------------------------------- # Database helpers # --------------------------------------------------------------------------- def _get_connection(): """Open a PostgreSQL connection from DATABASE_URL.""" if not DATABASE_URL: raise RuntimeError( "DATABASE_URL environment variable is not set. 
" "Expected format: postgresql://user:pass@host:5432/dbname" ) return psycopg2.connect(DATABASE_URL) def _fetch_pending_orders(conn) -> list[dict]: """Fetch all orders with status='received', oldest first.""" with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( """ SELECT * FROM formation_orders WHERE status = 'received' ORDER BY created_at ASC LIMIT 10 """ ) return [dict(row) for row in cur.fetchall()] def _update_order_status( conn, order_id: str, status: str, *, filing_number: str = "", confirmation_number: str = "", error_message: str = "", screenshots: list[str] | None = None, documents: list[str] | None = None, ein: str = "", ): """Update an order's status and related fields.""" fields = ["status = %s", "updated_at = NOW()"] values: list = [status] if filing_number: fields.append("filing_number = %s") values.append(filing_number) if confirmation_number: fields.append("confirmation_number = %s") values.append(confirmation_number) if error_message: fields.append("error_message = %s") values.append(error_message) if screenshots is not None: fields.append("screenshots = %s") values.append(json.dumps(screenshots)) if documents is not None: fields.append("documents = %s") values.append(json.dumps(documents)) if ein: fields.append("ein = %s") values.append(ein) if status == "filed": fields.append("filed_at = NOW()") values.append(order_id) with conn.cursor() as cur: cur.execute( f"UPDATE formation_orders SET {', '.join(fields)} WHERE order_id = %s", values, ) conn.commit() def _update_automation_status(conn, order_id: str, auto_status: str, *, error: str | None = None): """Update the automation_status and related fields.""" fields = ["automation_status = %s", "last_activity_at = NOW()"] values: list = [auto_status] if error: fields.append("automation_error = %s") values.append(error) values.append(order_id) with conn.cursor() as cur: cur.execute( f"UPDATE formation_orders SET {', '.join(fields)} WHERE id = %s", values, ) conn.commit() 
def _increment_attempts(conn, order_id: str) -> None:
    """Increment the automation_attempts counter for an order.

    NOTE(review): previously matched `WHERE id = %s` while callers pass
    `order.order_id`; normalized to the `order_id` column for consistency
    with the other helpers — confirm against the table schema.
    """
    with conn.cursor() as cur:
        cur.execute(
            "UPDATE formation_orders SET automation_attempts = automation_attempts + 1 WHERE order_id = %s",
            [order_id],
        )
    conn.commit()


def _write_audit(
    conn,
    order_id,
    order_number: str,
    action: str,
    from_status: str = "",
    to_status: str = "",
    actor_type: str = "worker",
    *,
    note: str = "",
    metadata: dict | None = None,
):
    """Write an entry to the order_audit_log table.

    Empty strings are stored as NULL; metadata (if given) is JSON-encoded.
    """
    with conn.cursor() as cur:
        cur.execute(
            """INSERT INTO order_audit_log
               (order_type, order_id, order_number, action, from_status,
                to_status, actor_type, actor_name, note, metadata)
               VALUES ('formation', %s, %s, %s, %s, %s, %s, 'formation_worker', %s, %s)""",
            [order_id, order_number, action, from_status or None,
             to_status or None, actor_type, note or None,
             json.dumps(metadata) if metadata else None],
        )
    conn.commit()


def _row_to_order(row: dict) -> FormationOrder:
    """Convert a database row dict to a FormationOrder dataclass.

    Tolerates NULL columns: `SELECT *` returns every key, so `dict.get`
    defaults never fire for NULLs — each numeric coercion guards against
    None explicitly.
    """
    members_raw = row.get("members")
    if isinstance(members_raw, str):
        members_raw = json.loads(members_raw)
    elif members_raw is None:
        members_raw = []

    members = [
        Member(
            name=m.get("name", ""),
            address=m.get("address", ""),
            city=m.get("city", ""),
            state=m.get("state", ""),
            zip_code=m.get("zip_code", ""),
            title=m.get("title", "Member"),
            ownership_pct=float(m.get("ownership_pct", 0)),
            is_organizer=bool(m.get("is_organizer", False)),
        )
        for m in members_raw
    ]

    entity_type_raw = row.get("entity_type", "llc")
    try:
        entity_type = EntityType(entity_type_raw)
    except ValueError:
        entity_type = EntityType.LLC  # unknown values default to LLC

    # NULL-safe numeric defaults (int(None)/float(None) would raise).
    shares_raw = row.get("shares_authorized")
    par_raw = row.get("par_value")

    return FormationOrder(
        order_id=str(row["order_id"]),
        state_code=row.get("state_code", ""),
        entity_type=entity_type,
        entity_name=row.get("entity_name", ""),
        entity_name_alt=row.get("entity_name_alt", ""),
        management_type=row.get("management_type", "member_managed"),
        purpose=row.get("purpose", "Any lawful business activity"),
        members=members,
        registered_agent_name=row.get("registered_agent_name", "Northwest Registered Agent"),
        registered_agent_address=row.get("registered_agent_address", ""),
        principal_address=row.get("principal_address", ""),
        principal_city=row.get("principal_city", ""),
        principal_state=row.get("principal_state", ""),
        principal_zip=row.get("principal_zip", ""),
        mailing_address=row.get("mailing_address", ""),
        mailing_city=row.get("mailing_city", ""),
        mailing_state=row.get("mailing_state", ""),
        mailing_zip=row.get("mailing_zip", ""),
        shares_authorized=int(shares_raw) if shares_raw is not None else 1500,
        par_value=float(par_raw) if par_raw is not None else 0.0,
        fiscal_year_end=row.get("fiscal_year_end", "12/31"),
        expedited=bool(row.get("expedited", False)),
        effective_date=row.get("effective_date", "") or "",
    )


# ---------------------------------------------------------------------------
# Core processing
# ---------------------------------------------------------------------------

async def process_order(order: FormationOrder, conn) -> FilingResult:
    """Process a single formation order end-to-end.

    Steps:
      1. Verify name availability (falling back to the alternate name).
      2. File the entity via the state adapter.
      3. Return the FilingResult.

    Never raises: any unhandled adapter error is captured as a
    FilingStatus.ERROR result so the caller can persist it.
    """
    state_code = order.state_code.upper()
    LOG.info(
        "Processing order %s: %s in %s (%s)",
        order.order_id,
        order.entity_name,
        state_code,
        order.entity_type.value,
    )

    adapter = get_adapter(state_code)
    try:
        await adapter.start_browser(headless=True)

        # Step 1: Verify name availability
        LOG.info("[%s] Searching name: %s", order.order_id, order.entity_name)
        name_result: NameSearchResult = await adapter.search_name(order.entity_name)

        if not name_result.available:
            # Try alternate name if provided
            if order.entity_name_alt:
                LOG.info(
                    "[%s] Primary name unavailable, trying alternate: %s",
                    order.order_id,
                    order.entity_name_alt,
                )
                name_result = await adapter.search_name(order.entity_name_alt)
                if name_result.available:
                    order.entity_name = order.entity_name_alt
                else:
                    return FilingResult(
                        success=False,
                        status=FilingStatus.NAME_UNAVAILABLE,
                        state_code=state_code,
                        entity_name=order.entity_name,
                        error_message=(
                            f"Both names unavailable: '{order.entity_name}' "
                            f"and '{order.entity_name_alt}'"
                        ),
                        screenshot_path=await adapter.screenshot("name_unavailable"),
                    )
            else:
                return FilingResult(
                    success=False,
                    status=FilingStatus.NAME_UNAVAILABLE,
                    state_code=state_code,
                    entity_name=order.entity_name,
                    error_message=f"Name unavailable: '{order.entity_name}'",
                    screenshot_path=await adapter.screenshot("name_unavailable"),
                )

        LOG.info("[%s] Name available: %s", order.order_id, order.entity_name)

        # Step 2: File the entity
        LOG.info("[%s] Filing entity...", order.order_id)
        result: FilingResult = await adapter.file_entity(order)
        LOG.info(
            "[%s] Filing result: %s (filing_number=%s, confirmation=%s)",
            order.order_id,
            result.status.value,
            result.filing_number,
            result.confirmation_number,
        )
        return result

    except Exception as exc:
        LOG.error(
            "[%s] Unhandled error processing order: %s",
            order.order_id,
            exc,
            exc_info=True,
        )
        screenshot = ""
        try:
            screenshot = await adapter.screenshot("error")
        except Exception:
            pass  # best-effort: a failed screenshot must not mask the error
        return FilingResult(
            success=False,
            status=FilingStatus.ERROR,
            state_code=state_code,
            entity_name=order.entity_name,
            error_message=str(exc),
            screenshot_path=screenshot,
        )
    finally:
        # Guarded teardown: if start_browser itself failed, close_browser may
        # raise too, and an exception here would mask the real error above.
        try:
            await adapter.close_browser()
        except Exception as exc:
            LOG.warning("[%s] Failed to close browser: %s", order.order_id, exc)


def _map_filing_status(status) -> tuple[str, str]:
    """Map a FilingStatus to (db order status, automation status).

    Orders that did not file successfully stay at 'received' so they remain
    in the queue for admin review / manual intervention.
    """
    if status == FilingStatus.FILED:
        return "filed", "succeeded"
    if status == FilingStatus.SUBMITTED:
        return "submitted", "running"
    # NAME_UNAVAILABLE and all error statuses
    return "received", "failed"


async def poll_and_process():
    """Single poll iteration: fetch pending orders and process them."""
    conn = _get_connection()
    try:
        orders = _fetch_pending_orders(conn)
        if not orders:
            return

        LOG.info("Found %d pending order(s)", len(orders))

        for i, row in enumerate(orders):
            order = _row_to_order(row)

            # Mark as processing + set automation_status to running
            _update_order_status(conn, order.order_id, "processing")
            _update_automation_status(conn, order.order_id, "running")
            _write_audit(conn, order.order_id, row.get("order_number", ""),
                         "status_change", "received", "processing", "worker",
                         note="Automation started")

            # Process the order
            result = await process_order(order, conn)

            # Map result to DB status
            db_status, auto_status = _map_filing_status(result.status)

            # Update the order
            screenshots = [result.screenshot_path] if result.screenshot_path else []
            _update_order_status(
                conn,
                order.order_id,
                db_status,
                filing_number=result.filing_number,
                confirmation_number=result.confirmation_number,
                error_message=result.error_message,
                screenshots=screenshots,
                documents=result.documents,
            )
            _update_automation_status(
                conn,
                order.order_id,
                auto_status,
                error=result.error_message if auto_status == "failed" else None,
            )
            _write_audit(
                conn,
                order.order_id,
                row.get("order_number", ""),
                "automation_update" if auto_status != "succeeded" else "status_change",
                "processing",
                db_status,
                "worker",
                note=result.error_message if auto_status == "failed"
                else f"Filed: {result.filing_number}" if result.filing_number
                else f"Status: {db_status}",
                metadata={"filing_number": result.filing_number,
                          "confirmation": result.confirmation_number,
                          "screenshot": result.screenshot_path}
                if result.filing_number else None,
            )

            if auto_status == "failed":
                # Increment attempt counter and alert
                _increment_attempts(conn, order.order_id)
                _alert_error(
                    order.order_id,
                    order.state_code,
                    result.error_message,
                    detail=f"Entity: {order.entity_name}\nState: {order.state_code}\n"
                           f"Filing number: {result.filing_number}\n"
                           f"Screenshot: {result.screenshot_path}\n"
                           f"Status set to: {auto_status}\n"
                           f"Order returned to queue for manual intervention.",
                )

            # Human-paced delay between orders (skip after last order)
            if i < len(orders) - 1:
                delay_minutes = random.uniform(DELAY_MIN_MINUTES, DELAY_MAX_MINUTES)
                delay_seconds = delay_minutes * 60
                LOG.info(
                    "Waiting %.1f minutes before next order (human-paced delay)...",
                    delay_minutes,
                )
                await asyncio.sleep(delay_seconds)
    finally:
        conn.close()


# ---------------------------------------------------------------------------
# Main loop with single-instance locking
# ---------------------------------------------------------------------------

_shutdown = False


def _handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: request a graceful shutdown of the main loop."""
    global _shutdown
    LOG.info("Received signal %d, shutting down gracefully...", signum)
    _shutdown = True


async def run_worker():
    """Main worker loop: poll for orders, process them, sleep, repeat."""
    LOG.info("=" * 60)
    LOG.info("Formation worker starting")
    LOG.info("  Poll interval: %ds", POLL_INTERVAL_SECONDS)
    LOG.info("  Delay range: %d–%d minutes", DELAY_MIN_MINUTES, DELAY_MAX_MINUTES)
    LOG.info("  Log file: %s", LOG_FILE)
    LOG.info("=" * 60)

    while not _shutdown:
        try:
            await poll_and_process()
        except Exception as exc:
            # Top-level boundary: log, alert, and keep the daemon alive.
            LOG.error("Poll cycle failed: %s", exc, exc_info=True)
            _alert_error("N/A", "N/A", f"Poll cycle failed: {exc}", traceback.format_exc())

        # Sleep in short increments so we can respond to shutdown signals
        for _ in range(POLL_INTERVAL_SECONDS):
            if _shutdown:
                break
            await asyncio.sleep(1)

    LOG.info("Formation worker stopped.")


def main():
    """Entry point with single-instance locking."""
    if not DATABASE_URL:
        print(
            "Error: DATABASE_URL environment variable is not set.",
            file=sys.stderr,
        )
        sys.exit(1)

    # Acquire single-instance lock. Open in append mode: mode "w" would
    # truncate the PID written by a live worker before we know whether the
    # lock is actually free.
    lock_fd = open(LOCK_FILE, "a+")
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        print(
            "Error: Another formation worker is already running (lock held).",
            file=sys.stderr,
        )
        sys.exit(1)

    # We hold the lock: replace any stale content with our PID.
    lock_fd.seek(0)
    lock_fd.truncate()
    lock_fd.write(str(os.getpid()))
    lock_fd.flush()

    # Register signal handlers
    signal.signal(signal.SIGINT, _handle_signal)
    signal.signal(signal.SIGTERM, _handle_signal)

    try:
        asyncio.run(run_worker())
    finally:
        fcntl.flock(lock_fd, fcntl.LOCK_UN)
        lock_fd.close()
        try:
            os.unlink(LOCK_FILE)
        except OSError:
            pass


if __name__ == "__main__":
    main()