Includes: API (Express/TypeScript), Astro site, Python workers, document generators, FCC compliance tools, Canada CRTC formation, Ansible infrastructure, and deployment scripts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
347 lines
11 KiB
Python
347 lines
11 KiB
Python
"""
|
||
Commission eligibility worker.
|
||
|
||
Daily cron job that transitions pending commissions to eligible
|
||
once the linked order has been delivered for at least 14 days.
|
||
|
||
Usage:
|
||
python -m scripts.workers.commission_worker # one-shot run
|
||
CRON: 0 6 * * * python -m scripts.workers.commission_worker
|
||
|
||
Environment variables:
|
||
DATABASE_URL – PostgreSQL connection string
|
||
SMTP_HOST – outbound mail server
|
||
SMTP_PORT – mail port (default 587)
|
||
SMTP_USER – SMTP login
|
||
SMTP_PASS – SMTP password
FROM_EMAIL – sender address (default commissions@performancewest.net)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
import smtplib
|
||
import sys
|
||
from collections import defaultdict
|
||
from datetime import datetime, timedelta, timezone
|
||
from email.mime.multipart import MIMEMultipart
|
||
from email.mime.text import MIMEText
|
||
from typing import Any
|
||
|
||
import psycopg2
|
||
import psycopg2.extras
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Config
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Connection and mail settings are read from the environment once, at import
# time; each lookup falls back to the default given here.
DATABASE_URL = os.getenv("DATABASE_URL", "")
SMTP_HOST = os.getenv("SMTP_HOST", "co.carrierone.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER", "")
SMTP_PASS = os.getenv("SMTP_PASS", "")
FROM_EMAIL = os.getenv("FROM_EMAIL", "commissions@performancewest.net")

# Recipient of the admin summary email; fixed, not read from the environment.
ADMIN_EMAIL = "ops@performancewest.net"

# Days after delivery before a commission becomes eligible.
HOLD_DAYS = 14

# Log to stdout so output lands wherever the cron job's stdout is captured.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
)
log = logging.getLogger("commission_worker")
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Order-type → table / query mapping
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Service and bundle orders are both looked up in the generic ``orders``
# table, so the two order types share one query string.
_GENERIC_ORDER_QUERY = """
    SELECT order_number, status, delivered_at
    FROM orders
    WHERE order_number = %s AND status = 'delivered'
"""

# Maps a commission's order_type to the query that fetches the matching order
# row, but only when that order is in status 'delivered'. Each query takes the
# order number as its single parameter.
ORDER_QUERIES: dict[str, str] = {
    "canada_crtc": """
        SELECT order_number, status, delivered_at
        FROM canada_crtc_orders
        WHERE order_number = %s AND status = 'delivered'
    """,
    "formation": """
        SELECT order_number, status, delivered_at
        FROM formation_orders
        WHERE order_number = %s AND status = 'delivered'
    """,
    "service": _GENERIC_ORDER_QUERY,
    "bundle": _GENERIC_ORDER_QUERY,
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _check_order_delivered(
    cur: Any,
    order_type: str,
    order_number: str,
) -> datetime | None:
    """Look up the delivery timestamp of a delivered order.

    Executes the ``ORDER_QUERIES`` entry for *order_type* on *cur*, passing
    *order_number* as the only query parameter.

    Returns the fetched row's ``delivered_at`` value (falling back to
    ``delivered_date`` when that is missing or falsy). Returns ``None`` when
    the order type has no query (a warning is logged) or when the query
    yields no row.
    """
    try:
        sql = ORDER_QUERIES[order_type]
    except KeyError:
        log.warning("Unknown order_type=%s for order=%s", order_type, order_number)
        return None

    cur.execute(sql, (order_number,))
    match = cur.fetchone()
    if match is not None:
        return match.get("delivered_at") or match.get("delivered_date")
    return None
|
||
|
||
|
||
def _send_email(to: str, subject: str, body_html: str) -> None:
    """Send a single HTML email via SMTP with STARTTLS.

    Delivery is best-effort: any failure is logged with its traceback and
    swallowed, so the caller is never interrupted by a mail problem. When
    ``SMTP_USER`` or ``SMTP_PASS`` is empty the message is skipped with a
    warning.

    Args:
        to: Recipient address.
        subject: Subject line.
        body_html: HTML body of the message.
    """
    # Local import: only this function needs it.
    import ssl

    if not SMTP_USER or not SMTP_PASS:
        log.warning("SMTP credentials not configured – skipping email to %s", to)
        return

    msg = MIMEMultipart("alternative")
    msg["From"] = FROM_EMAIL
    msg["To"] = to
    msg["Subject"] = subject
    msg.attach(MIMEText(body_html, "html"))

    try:
        # starttls() without a context does not verify the server certificate
        # or hostname, which would expose the login below to a
        # man-in-the-middle. The default context enables both checks.
        tls_context = ssl.create_default_context()
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as server:
            server.ehlo()
            server.starttls(context=tls_context)
            # Re-identify after the TLS upgrade, before authenticating.
            server.ehlo()
            server.login(SMTP_USER, SMTP_PASS)
            server.sendmail(FROM_EMAIL, [to], msg.as_string())
        log.info("Email sent to %s: %s", to, subject)
    except Exception:
        # Deliberate best-effort boundary: log and carry on.
        log.exception("Failed to send email to %s", to)
|
||
|
||
|
||
def _cents_to_dollars(cents: int) -> str:
|
||
"""Format cents as $X,XXX.XX."""
|
||
return f"${cents / 100:,.2f}"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Core logic
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _promote_if_due(
    cur: Any,
    row: dict[str, Any],
    now: datetime,
    cutoff: datetime,
) -> datetime | None:
    """Promote one pending commission if its order is past the hold period.

    Returns the order's tz-aware delivery timestamp when this call switched
    the ledger row to ``eligible``; returns ``None`` when the order is not
    delivered, is still inside the hold period, or the ledger row was no
    longer pending.
    """
    delivered_at = _check_order_delivered(cur, row["order_type"], row["order_number"])
    if delivered_at is None:
        return None

    # Naive timestamps are treated as UTC so they compare against ``cutoff``.
    if delivered_at.tzinfo is None:
        delivered_at = delivered_at.replace(tzinfo=timezone.utc)

    if delivered_at > cutoff:
        # Not yet past the hold period.
        return None

    # The status guard keeps the promotion idempotent: if another run already
    # moved this row out of 'pending', nothing is updated (and nobody is
    # notified a second time).
    cur.execute(
        """
        UPDATE commission_ledger
        SET status = 'eligible',
            eligible_at = %s,
            order_delivered_at = %s,
            updated_at = %s
        WHERE id = %s AND status = 'pending'
        """,
        (now, delivered_at, now, row["id"]),
    )
    if cur.rowcount != 1:
        log.warning(
            "Commission %s (order %s) was no longer pending – not promoted",
            row["id"],
            row["order_number"],
        )
        return None

    return delivered_at


def process_pending_commissions() -> dict[str, Any]:
    """
    Scan all pending commissions and promote to eligible where the linked
    order has been delivered for at least HOLD_DAYS.

    Each commission is processed inside its own savepoint, so a failure on
    one row is counted and logged without discarding the promotions made for
    the other rows. Notification emails are sent only after the transaction
    has been committed.

    Returns:
        Summary dict with ``pending_evaluated``, ``newly_eligible``,
        ``skipped``, ``errors`` and ``total_eligible_cents``; or
        ``{"error": ...}`` when ``DATABASE_URL`` is not set.

    Raises:
        Exception: any error outside the per-row handling (fetching the
            pending rows, savepoint management, commit) is logged, the
            transaction is rolled back, and the error is re-raised.
    """
    if not DATABASE_URL:
        log.error("DATABASE_URL is not set – aborting")
        return {"error": "DATABASE_URL not set"}

    now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=HOLD_DAYS)

    pending_rows: list[Any] = []
    newly_eligible: list[dict[str, Any]] = []
    skipped = 0
    errors = 0

    conn = psycopg2.connect(DATABASE_URL)
    try:
        conn.autocommit = False
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            # ----- Fetch all pending commissions -----
            cur.execute("""
                SELECT
                    cl.id,
                    cl.agent_code,
                    cl.order_type,
                    cl.order_number,
                    cl.commission_cents,
                    cl.customer_name,
                    sa.email AS agent_email,
                    sa.name AS agent_name
                FROM commission_ledger cl
                JOIN sales_agents sa ON sa.agent_code = cl.agent_code
                WHERE cl.status = 'pending'
                ORDER BY cl.created_at
            """)
            pending_rows = cur.fetchall()
            log.info("Found %d pending commission(s) to evaluate", len(pending_rows))

            for row in pending_rows:
                # A failed statement leaves the PostgreSQL transaction
                # aborted. Without a savepoint, one bad row would make every
                # later statement fail and the final commit persist nothing,
                # while notifications for earlier rows would still be sent.
                cur.execute("SAVEPOINT commission_row")
                try:
                    delivered_at = _promote_if_due(cur, row, now, cutoff)
                except Exception:
                    errors += 1
                    log.exception(
                        "Error processing commission id=%s order=%s",
                        row["id"],
                        row["order_number"],
                    )
                    cur.execute("ROLLBACK TO SAVEPOINT commission_row")
                    cur.execute("RELEASE SAVEPOINT commission_row")
                    continue
                cur.execute("RELEASE SAVEPOINT commission_row")

                if delivered_at is None:
                    skipped += 1
                    continue

                # ----- Promoted: remember it for the notification emails -----
                newly_eligible.append(
                    {
                        "id": row["id"],
                        "agent_code": row["agent_code"],
                        "agent_name": row["agent_name"],
                        "agent_email": row["agent_email"],
                        "order_type": row["order_type"],
                        "order_number": row["order_number"],
                        "commission_cents": row["commission_cents"],
                        "customer_name": row["customer_name"],
                        "delivered_at": delivered_at,
                    }
                )
                log.info(
                    "Commission %s (order %s) → eligible [delivered %s]",
                    row["id"],
                    row["order_number"],
                    delivered_at.isoformat(),
                )

        conn.commit()
    except Exception:
        conn.rollback()
        log.exception("Fatal error during commission processing")
        raise
    finally:
        conn.close()

    # ----- Notifications (only reached after a successful commit) -----
    if newly_eligible:
        _notify_admin(newly_eligible)
        _notify_agents(newly_eligible)

    summary = {
        "pending_evaluated": len(pending_rows),
        "newly_eligible": len(newly_eligible),
        "skipped": skipped,
        "errors": errors,
        "total_eligible_cents": sum(e["commission_cents"] for e in newly_eligible),
    }
    log.info("Run complete: %s", summary)
    return summary
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Notification helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _notify_admin(eligible: list[dict[str, Any]]) -> None:
    """Send a summary email to the admin.

    Builds one HTML table row per newly-eligible commission and mails the
    report to ``ADMIN_EMAIL``. Text values taken from the database are
    HTML-escaped before being placed in the markup.

    Args:
        eligible: Entries with at least ``order_number``, ``agent_code``,
            ``agent_name``, ``commission_cents`` and ``delivered_at`` (a
            datetime).
    """
    # Local import: only the HTML-building helpers need it.
    from html import escape

    count = len(eligible)
    total_cents = sum(e["commission_cents"] for e in eligible)

    # Order numbers and agent names are data, not markup: escape them so a
    # value such as "Smith & Sons <West>" cannot break or inject HTML.
    rows_html = "".join(
        "<tr>"
        f"<td>{escape(str(e['order_number']))}</td>"
        f"<td>{escape(str(e['agent_code']))}</td>"
        f"<td>{escape(str(e['agent_name']))}</td>"
        f"<td>{_cents_to_dollars(e['commission_cents'])}</td>"
        f"<td>{e['delivered_at'].strftime('%Y-%m-%d')}</td>"
        "</tr>"
        for e in eligible
    )

    body = f"""
    <h2>Commission Eligibility Report</h2>
    <p><strong>{count}</strong> commission(s) totalling
    <strong>{_cents_to_dollars(total_cents)}</strong>
    are now eligible for payout.</p>
    <table border="1" cellpadding="6" cellspacing="0"
    style="border-collapse:collapse; font-family:monospace;">
    <tr>
    <th>Order</th><th>Agent</th><th>Name</th>
    <th>Commission</th><th>Delivered</th>
    </tr>
    {rows_html}
    </table>
    <p style="color:#888; font-size:12px;">
    Approve these in ERPNext → Commission Ledger, or reply to this email.
    </p>
    """
    _send_email(
        ADMIN_EMAIL,
        f"{count} commissions ({_cents_to_dollars(total_cents)}) now eligible for payout",
        body,
    )
|
||
|
||
|
||
def _notify_agents(eligible: list[dict[str, Any]]) -> None:
    """Send per-agent emails for each newly-eligible commission.

    Entries are grouped by ``agent_email`` and one email is sent per
    commission. Agents without an email address are skipped with a warning.
    Text values taken from the database are HTML-escaped in the body; the
    subject line is plain text and left as-is.

    Args:
        eligible: Entries with at least ``agent_email``, ``agent_name``,
            ``agent_code``, ``order_number`` and ``commission_cents``.
    """
    # Local import: only the HTML-building helpers need it.
    from html import escape

    by_agent: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for entry in eligible:
        by_agent[entry["agent_email"]].append(entry)

    for agent_email, commissions in by_agent.items():
        if not agent_email:
            # Nothing to send to; say so clearly instead of handing an empty
            # recipient to the SMTP layer.
            log.warning(
                "Agent %s has no email address – skipping %d notification(s)",
                commissions[0]["agent_code"],
                len(commissions),
            )
            continue

        for c in commissions:
            amount = _cents_to_dollars(c["commission_cents"])
            body = f"""
            <p>Hi {escape(str(c['agent_name']))},</p>
            <p>Your <strong>{amount}</strong>
            commission for order <strong>{escape(str(c['order_number']))}</strong>
            is now eligible.</p>
            <p>We'll process your payout shortly.</p>
            <p style="color:#888; font-size:12px;">
            — Performance West Commissions
            </p>
            """
            _send_email(
                agent_email,
                f"Your {amount} commission "
                f"for order {c['order_number']} is now eligible",
                body,
            )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Entrypoint
|
||
# ---------------------------------------------------------------------------
|
||
|
||
if __name__ == "__main__":
    log.info("Commission worker starting (manual run)")
    result = process_pending_commissions()
    log.info("Done. Summary: %s", result)
    # An aborted run (the summary carries an "error" key, e.g. DATABASE_URL
    # missing) must not exit 0, otherwise cron and other callers cannot tell
    # that nothing was processed.
    if "error" in result:
        sys.exit(1)
|