new-site/scripts/workers/commission_worker.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

347 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Commission eligibility worker.
Daily cron job that transitions pending commissions to eligible
once the linked order has been delivered for at least 14 days.
Usage:
python -m scripts.workers.commission_worker # one-shot run
CRON: 0 6 * * * python -m scripts.workers.commission_worker
Environment variables:
DATABASE_URL PostgreSQL connection string
SMTP_HOST outbound mail server
SMTP_PORT mail port (default 587)
SMTP_USER SMTP login
SMTP_PASS SMTP password
"""
from __future__ import annotations
import logging
import os
import smtplib
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Any
import psycopg2
import psycopg2.extras
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
DATABASE_URL = os.environ.get("DATABASE_URL", "")
SMTP_HOST = os.environ.get("SMTP_HOST", "co.carrierone.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
SMTP_USER = os.environ.get("SMTP_USER", "")
SMTP_PASS = os.environ.get("SMTP_PASS", "")
ADMIN_EMAIL = "ops@performancewest.net"
FROM_EMAIL = os.environ.get("FROM_EMAIL", "commissions@performancewest.net")
HOLD_DAYS = 14 # days after delivery before commission becomes eligible
log = logging.getLogger("commission_worker")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
stream=sys.stdout,
)
# ---------------------------------------------------------------------------
# Order-type → table / query mapping
# ---------------------------------------------------------------------------
ORDER_QUERIES: dict[str, str] = {
"canada_crtc": """
SELECT order_number, status, delivered_at
FROM canada_crtc_orders
WHERE order_number = %s AND status = 'delivered'
""",
"formation": """
SELECT order_number, status, delivered_at
FROM formation_orders
WHERE order_number = %s AND status = 'delivered'
""",
"service": """
SELECT order_number, status, delivered_at
FROM orders
WHERE order_number = %s AND status = 'delivered'
""",
"bundle": """
SELECT order_number, status, delivered_at
FROM orders
WHERE order_number = %s AND status = 'delivered'
""",
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _check_order_delivered(
cur: Any,
order_type: str,
order_number: str,
) -> datetime | None:
"""Return delivered_at timestamp if order is delivered, else None."""
query = ORDER_QUERIES.get(order_type)
if query is None:
log.warning("Unknown order_type=%s for order=%s", order_type, order_number)
return None
cur.execute(query, (order_number,))
row = cur.fetchone()
if row is None:
return None
return row.get("delivered_at") or row.get("delivered_date")
def _send_email(to: str, subject: str, body_html: str) -> None:
"""Send a single email via SMTP."""
if not SMTP_USER or not SMTP_PASS:
log.warning("SMTP credentials not configured skipping email to %s", to)
return
msg = MIMEMultipart("alternative")
msg["From"] = FROM_EMAIL
msg["To"] = to
msg["Subject"] = subject
msg.attach(MIMEText(body_html, "html"))
try:
with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as server:
server.ehlo()
server.starttls()
server.ehlo()
server.login(SMTP_USER, SMTP_PASS)
server.sendmail(FROM_EMAIL, [to], msg.as_string())
log.info("Email sent to %s: %s", to, subject)
except Exception:
log.exception("Failed to send email to %s", to)
def _cents_to_dollars(cents: int) -> str:
"""Format cents as $X,XXX.XX."""
return f"${cents / 100:,.2f}"
# ---------------------------------------------------------------------------
# Core logic
# ---------------------------------------------------------------------------
def process_pending_commissions() -> dict[str, Any]:
"""
Scan all pending commissions and promote to eligible where the linked
order has been delivered for at least HOLD_DAYS.
Returns summary dict.
"""
if not DATABASE_URL:
log.error("DATABASE_URL is not set aborting")
return {"error": "DATABASE_URL not set"}
now = datetime.now(timezone.utc)
cutoff = now - timedelta(days=HOLD_DAYS)
conn = psycopg2.connect(DATABASE_URL)
conn.autocommit = False
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
newly_eligible: list[dict[str, Any]] = []
skipped = 0
errors = 0
try:
# ----- Fetch all pending commissions -----
cur.execute("""
SELECT
cl.id,
cl.agent_code,
cl.order_type,
cl.order_number,
cl.commission_cents,
cl.customer_name,
sa.email AS agent_email,
sa.name AS agent_name
FROM commission_ledger cl
JOIN sales_agents sa ON sa.agent_code = cl.agent_code
WHERE cl.status = 'pending'
ORDER BY cl.created_at
""")
pending_rows = cur.fetchall()
log.info("Found %d pending commission(s) to evaluate", len(pending_rows))
for row in pending_rows:
try:
delivered_at = _check_order_delivered(
cur, row["order_type"], row["order_number"]
)
if delivered_at is None:
skipped += 1
continue
# Ensure delivered_at is tz-aware
if delivered_at.tzinfo is None:
delivered_at = delivered_at.replace(tzinfo=timezone.utc)
if delivered_at > cutoff:
# Not yet past the hold period
skipped += 1
continue
# ----- Promote to eligible -----
cur.execute(
"""
UPDATE commission_ledger
SET status = 'eligible',
eligible_at = %s,
order_delivered_at = %s,
updated_at = %s
WHERE id = %s
""",
(now, delivered_at, now, row["id"]),
)
newly_eligible.append(
{
"id": row["id"],
"agent_code": row["agent_code"],
"agent_name": row["agent_name"],
"agent_email": row["agent_email"],
"order_type": row["order_type"],
"order_number": row["order_number"],
"commission_cents": row["commission_cents"],
"customer_name": row["customer_name"],
"delivered_at": delivered_at,
}
)
log.info(
"Commission %s (order %s) → eligible [delivered %s]",
row["id"],
row["order_number"],
delivered_at.isoformat(),
)
except Exception:
errors += 1
log.exception(
"Error processing commission id=%s order=%s",
row["id"],
row["order_number"],
)
conn.commit()
except Exception:
conn.rollback()
log.exception("Fatal error during commission processing")
raise
finally:
cur.close()
conn.close()
# ----- Notifications -----
if newly_eligible:
_notify_admin(newly_eligible)
_notify_agents(newly_eligible)
summary = {
"pending_evaluated": len(pending_rows) if "pending_rows" in dir() else 0,
"newly_eligible": len(newly_eligible),
"skipped": skipped,
"errors": errors,
"total_eligible_cents": sum(e["commission_cents"] for e in newly_eligible),
}
log.info("Run complete: %s", summary)
return summary
# ---------------------------------------------------------------------------
# Notification helpers
# ---------------------------------------------------------------------------
def _notify_admin(eligible: list[dict[str, Any]]) -> None:
"""Send a summary email to the admin."""
count = len(eligible)
total_cents = sum(e["commission_cents"] for e in eligible)
rows_html = ""
for e in eligible:
rows_html += (
f"<tr>"
f"<td>{e['order_number']}</td>"
f"<td>{e['agent_code']}</td>"
f"<td>{e['agent_name']}</td>"
f"<td>{_cents_to_dollars(e['commission_cents'])}</td>"
f"<td>{e['delivered_at'].strftime('%Y-%m-%d')}</td>"
f"</tr>"
)
body = f"""
<h2>Commission Eligibility Report</h2>
<p><strong>{count}</strong> commission(s) totalling
<strong>{_cents_to_dollars(total_cents)}</strong>
are now eligible for payout.</p>
<table border="1" cellpadding="6" cellspacing="0"
style="border-collapse:collapse; font-family:monospace;">
<tr>
<th>Order</th><th>Agent</th><th>Name</th>
<th>Commission</th><th>Delivered</th>
</tr>
{rows_html}
</table>
<p style="color:#888; font-size:12px;">
Approve these in ERPNext → Commission Ledger, or reply to this email.
</p>
"""
_send_email(
ADMIN_EMAIL,
f"{count} commissions ({_cents_to_dollars(total_cents)}) now eligible for payout",
body,
)
def _notify_agents(eligible: list[dict[str, Any]]) -> None:
"""Send per-agent emails for each newly-eligible commission."""
by_agent: dict[str, list[dict[str, Any]]] = defaultdict(list)
for e in eligible:
by_agent[e["agent_email"]].append(e)
for agent_email, commissions in by_agent.items():
for c in commissions:
body = f"""
<p>Hi {c['agent_name']},</p>
<p>Your <strong>{_cents_to_dollars(c['commission_cents'])}</strong>
commission for order <strong>{c['order_number']}</strong>
is now eligible.</p>
<p>We'll process your payout shortly.</p>
<p style="color:#888; font-size:12px;">
— Performance West Commissions
</p>
"""
_send_email(
agent_email,
f"Your {_cents_to_dollars(c['commission_cents'])} commission "
f"for order {c['order_number']} is now eligible",
body,
)
# ---------------------------------------------------------------------------
# Entrypoint
# ---------------------------------------------------------------------------
if __name__ == "__main__":
log.info("Commission worker starting (manual run)")
result = process_pending_commissions()
log.info("Done. Summary: %s", result)