new-site/scripts/workers/commission_worker.py
justin b375385efd fix(email): add text/plain part to every transactional + telecom email
All transactional/worker senders built multipart/alternative (or mixed)
messages with ONLY an HTML part. A single-part multipart/alternative is
malformed and HTML-only mail is a spam-score signal -- the same class of
deliverability bug that hurt the campaign pipeline, but on the telecom /
filing / customer-transactional path (499-Q reminders, RMD/FCC filing
review links, intake/completion/delivery emails, commissions, etc).

- worker_email.send_worker_email: auto-derive plaintext from HTML when
  caller omits text= (fixes the shared helper for all current+future use)
- 16 rolled-their-own senders in scripts/workers/** + scripts/formation/
  document_delivery.py: attach html_to_text(...) plaintext sibling before
  the HTML part (job_server + document_delivery wrap text+html in an
  alternative sub-part so PDFs still attach to the mixed root)
- api/src/email.ts: add dependency-free htmlToText() and default
  sendEmail text to it (fixes checkout/webhook HTML-only sends)

Verified: all py files compile + import at runtime, api tsc passes,
htmlToText handles hrefs/lists/entities, 11 plaintext unit tests pass.
Telecom campaign 407 (Jun 8) was HTML-only + sent in the DKIM-broken
window -> 384 sent / 0 clicks (same junked-mail signature).
2026-06-17 21:07:40 -05:00

350 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Commission eligibility worker.
Daily cron job that transitions pending commissions to eligible
once the linked order has been delivered for at least 14 days.
Usage:
python -m scripts.workers.commission_worker # one-shot run
CRON: 0 6 * * * python -m scripts.workers.commission_worker
Environment variables:
DATABASE_URL PostgreSQL connection string
SMTP_HOST outbound mail server
SMTP_PORT mail port (default 587)
SMTP_USER SMTP login
SMTP_PASS SMTP password
"""
from __future__ import annotations
import logging
import os
import smtplib
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from scripts._email_plaintext import html_to_text
from typing import Any
import psycopg2
import psycopg2.extras
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
DATABASE_URL = os.environ.get("DATABASE_URL", "")
SMTP_HOST = os.environ.get("SMTP_HOST", "co.carrierone.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
SMTP_USER = os.environ.get("SMTP_USER", "")
SMTP_PASS = os.environ.get("SMTP_PASS", "")
ADMIN_EMAIL = "ops@performancewest.net"
FROM_EMAIL = os.environ.get("FROM_EMAIL", "commissions@performancewest.net")
HOLD_DAYS = 14 # days after delivery before commission becomes eligible
log = logging.getLogger("commission_worker")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
stream=sys.stdout,
)
# ---------------------------------------------------------------------------
# Order-type → table / query mapping
# ---------------------------------------------------------------------------
ORDER_QUERIES: dict[str, str] = {
"canada_crtc": """
SELECT order_number, status, delivered_at
FROM canada_crtc_orders
WHERE order_number = %s AND status = 'delivered'
""",
"formation": """
SELECT order_number, status, delivered_at
FROM formation_orders
WHERE order_number = %s AND status = 'delivered'
""",
"service": """
SELECT order_number, status, delivered_at
FROM orders
WHERE order_number = %s AND status = 'delivered'
""",
"bundle": """
SELECT order_number, status, delivered_at
FROM orders
WHERE order_number = %s AND status = 'delivered'
""",
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _check_order_delivered(
cur: Any,
order_type: str,
order_number: str,
) -> datetime | None:
"""Return delivered_at timestamp if order is delivered, else None."""
query = ORDER_QUERIES.get(order_type)
if query is None:
log.warning("Unknown order_type=%s for order=%s", order_type, order_number)
return None
cur.execute(query, (order_number,))
row = cur.fetchone()
if row is None:
return None
return row.get("delivered_at") or row.get("delivered_date")
def _send_email(to: str, subject: str, body_html: str) -> None:
"""Send a single email via SMTP."""
if not SMTP_USER or not SMTP_PASS:
log.warning("SMTP credentials not configured skipping email to %s", to)
return
msg = MIMEMultipart("alternative")
msg["From"] = FROM_EMAIL
msg["To"] = to
msg["Subject"] = subject
msg.attach(MIMEText(html_to_text(body_html), "plain"))
msg.attach(MIMEText(body_html, "html"))
try:
with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as server:
server.ehlo()
server.starttls()
server.ehlo()
server.login(SMTP_USER, SMTP_PASS)
server.sendmail(FROM_EMAIL, [to], msg.as_string())
log.info("Email sent to %s: %s", to, subject)
except Exception:
log.exception("Failed to send email to %s", to)
def _cents_to_dollars(cents: int) -> str:
"""Format cents as $X,XXX.XX."""
return f"${cents / 100:,.2f}"
# ---------------------------------------------------------------------------
# Core logic
# ---------------------------------------------------------------------------
def process_pending_commissions() -> dict[str, Any]:
"""
Scan all pending commissions and promote to eligible where the linked
order has been delivered for at least HOLD_DAYS.
Returns summary dict.
"""
if not DATABASE_URL:
log.error("DATABASE_URL is not set aborting")
return {"error": "DATABASE_URL not set"}
now = datetime.now(timezone.utc)
cutoff = now - timedelta(days=HOLD_DAYS)
conn = psycopg2.connect(DATABASE_URL)
conn.autocommit = False
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
newly_eligible: list[dict[str, Any]] = []
skipped = 0
errors = 0
try:
# ----- Fetch all pending commissions -----
cur.execute("""
SELECT
cl.id,
cl.agent_code,
cl.order_type,
cl.order_number,
cl.commission_cents,
cl.customer_name,
sa.email AS agent_email,
sa.name AS agent_name
FROM commission_ledger cl
JOIN sales_agents sa ON sa.agent_code = cl.agent_code
WHERE cl.status = 'pending'
ORDER BY cl.created_at
""")
pending_rows = cur.fetchall()
log.info("Found %d pending commission(s) to evaluate", len(pending_rows))
for row in pending_rows:
try:
delivered_at = _check_order_delivered(
cur, row["order_type"], row["order_number"]
)
if delivered_at is None:
skipped += 1
continue
# Ensure delivered_at is tz-aware
if delivered_at.tzinfo is None:
delivered_at = delivered_at.replace(tzinfo=timezone.utc)
if delivered_at > cutoff:
# Not yet past the hold period
skipped += 1
continue
# ----- Promote to eligible -----
cur.execute(
"""
UPDATE commission_ledger
SET status = 'eligible',
eligible_at = %s,
order_delivered_at = %s,
updated_at = %s
WHERE id = %s
""",
(now, delivered_at, now, row["id"]),
)
newly_eligible.append(
{
"id": row["id"],
"agent_code": row["agent_code"],
"agent_name": row["agent_name"],
"agent_email": row["agent_email"],
"order_type": row["order_type"],
"order_number": row["order_number"],
"commission_cents": row["commission_cents"],
"customer_name": row["customer_name"],
"delivered_at": delivered_at,
}
)
log.info(
"Commission %s (order %s) → eligible [delivered %s]",
row["id"],
row["order_number"],
delivered_at.isoformat(),
)
except Exception:
errors += 1
log.exception(
"Error processing commission id=%s order=%s",
row["id"],
row["order_number"],
)
conn.commit()
except Exception:
conn.rollback()
log.exception("Fatal error during commission processing")
raise
finally:
cur.close()
conn.close()
# ----- Notifications -----
if newly_eligible:
_notify_admin(newly_eligible)
_notify_agents(newly_eligible)
summary = {
"pending_evaluated": len(pending_rows) if "pending_rows" in dir() else 0,
"newly_eligible": len(newly_eligible),
"skipped": skipped,
"errors": errors,
"total_eligible_cents": sum(e["commission_cents"] for e in newly_eligible),
}
log.info("Run complete: %s", summary)
return summary
# ---------------------------------------------------------------------------
# Notification helpers
# ---------------------------------------------------------------------------
def _notify_admin(eligible: list[dict[str, Any]]) -> None:
"""Send a summary email to the admin."""
count = len(eligible)
total_cents = sum(e["commission_cents"] for e in eligible)
rows_html = ""
for e in eligible:
rows_html += (
f"<tr>"
f"<td>{e['order_number']}</td>"
f"<td>{e['agent_code']}</td>"
f"<td>{e['agent_name']}</td>"
f"<td>{_cents_to_dollars(e['commission_cents'])}</td>"
f"<td>{e['delivered_at'].strftime('%Y-%m-%d')}</td>"
f"</tr>"
)
body = f"""
<h2>Commission Eligibility Report</h2>
<p><strong>{count}</strong> commission(s) totalling
<strong>{_cents_to_dollars(total_cents)}</strong>
are now eligible for payout.</p>
<table border="1" cellpadding="6" cellspacing="0"
style="border-collapse:collapse; font-family:monospace;">
<tr>
<th>Order</th><th>Agent</th><th>Name</th>
<th>Commission</th><th>Delivered</th>
</tr>
{rows_html}
</table>
<p style="color:#888; font-size:12px;">
Approve these in ERPNext → Commission Ledger, or reply to this email.
</p>
"""
_send_email(
ADMIN_EMAIL,
f"{count} commissions ({_cents_to_dollars(total_cents)}) now eligible for payout",
body,
)
def _notify_agents(eligible: list[dict[str, Any]]) -> None:
"""Send per-agent emails for each newly-eligible commission."""
by_agent: dict[str, list[dict[str, Any]]] = defaultdict(list)
for e in eligible:
by_agent[e["agent_email"]].append(e)
for agent_email, commissions in by_agent.items():
for c in commissions:
body = f"""
<p>Hi {c['agent_name']},</p>
<p>Your <strong>{_cents_to_dollars(c['commission_cents'])}</strong>
commission for order <strong>{c['order_number']}</strong>
is now eligible.</p>
<p>We'll process your payout shortly.</p>
<p style="color:#888; font-size:12px;">
— Performance West Commissions
</p>
"""
_send_email(
agent_email,
f"Your {_cents_to_dollars(c['commission_cents'])} commission "
f"for order {c['order_number']} is now eligible",
body,
)
# ---------------------------------------------------------------------------
# Entrypoint
# ---------------------------------------------------------------------------
if __name__ == "__main__":
log.info("Commission worker starting (manual run)")
result = process_pending_commissions()
log.info("Done. Summary: %s", result)