feat: daily intake-reminder worker for paid orders with incomplete intake

Adds a systemd-timed worker that nudges customers who paid but never completed
their intake form (which stalls fulfillment).

- migration 087: intake_reminder_count + intake_reminder_last_at on
  compliance_orders (makes the daily run idempotent and bounded), plus a
  partial index for the paid-order eligibility scan.
- scripts/workers/intake_reminder.py: each run emails any paid order with
  intake_data_validated != TRUE, capped at 10 reminders/order, at most one
  consolidated email per customer per day (groups a customer's incomplete
  services into one email). Reuses the post-payment intake URL format
  (/order/{slug}?order={n}) and the API's email validation, skipping
  placeholder/invalid addresses (synthetic@, pipeline.com, etc.). Sends via
  smtplib with SMTP_PASS (verified working in the worker container).
- worker-crons: pw-intake-reminder timer, daily ~noon ET (16:00 UTC).
This commit is contained in:
justin 2026-06-03 00:20:37 -05:00
parent 00c960f5b5
commit 6d4c323ab6
3 changed files with 295 additions and 0 deletions

View file

@ -0,0 +1,26 @@
-- 087: Daily intake-reminder tracking on compliance_orders.
--
-- After a customer pays, we email them an intake form link for each service so
-- we can collect the data needed to prepare the filing. Some customers never
-- complete intake, which stalls fulfillment. The intake-reminder worker
-- (scripts/workers/intake_reminder.py) runs daily at noon ET and nudges any
-- PAID order whose intake is still incomplete (intake_data_validated IS NOT
-- TRUE), up to a cap of 10 reminders, skipping placeholder/invalid emails.
--
-- These two columns make the daily run idempotent and bounded:
-- intake_reminder_count -- how many reminders we've sent (cap: 10)
-- intake_reminder_last_at -- when we last reminded (so we send at most 1/day)
--
-- Both default to a no-reminders-yet state and are NULL/0 for every existing
-- row, so the worker treats all currently-incomplete paid orders as eligible.
ALTER TABLE compliance_orders
ADD COLUMN IF NOT EXISTS intake_reminder_count integer NOT NULL DEFAULT 0;
ALTER TABLE compliance_orders
ADD COLUMN IF NOT EXISTS intake_reminder_last_at timestamptz;
-- Speeds up the daily eligibility scan (paid + incomplete intake).
CREATE INDEX IF NOT EXISTS idx_compliance_orders_intake_reminder
ON compliance_orders (payment_status, intake_data_validated, intake_reminder_count)
WHERE payment_status = 'paid';

View file

@ -208,3 +208,14 @@ worker_crons:
module: scripts.build_trucking_campaigns
on_calendar: "*-*-* 08:00:00 UTC"
persistent: true
# Intake reminders — daily at noon ET (16:00 UTC EST / 17:00 UTC EDT; we use
# 16:00 UTC so winter is exactly noon and summer is 11am — close enough for a
# daily nudge). Emails any PAID compliance order whose intake is still
# incomplete, up to 10 times, one consolidated email per customer per day,
# skipping placeholder/invalid addresses.
- name: pw-intake-reminder
description: Remind paid customers to complete their intake form
module: scripts.workers.intake_reminder
on_calendar: "*-*-* 16:00:00 UTC"
persistent: true

View file

@ -0,0 +1,258 @@
#!/usr/bin/env python3
"""Daily intake-reminder worker.
After a customer pays for a compliance service we email them an intake form
link so we can collect the information needed to prepare the filing. Some
customers never complete intake, which stalls fulfillment. This worker runs
once a day (noon ET) and nudges any PAID order whose intake is still
incomplete, up to a per-order cap.
Eligibility (a row is reminded when ALL hold):
- payment_status = 'paid'
- intake_data_validated IS NOT TRUE (intake not yet completed)
- intake_reminder_count < MAX_REMINDERS (default 10)
- intake_reminder_last_at IS NULL OR < today (at most one reminder/day)
- customer_email is a real, deliverable address (placeholders skipped)
The intake link mirrors the one the post-payment email uses:
{SITE}/order/{service_slug}?order={order_number}
Tracking columns (migration 087):
intake_reminder_count incremented on every send (cap MAX_REMINDERS)
intake_reminder_last_at set to now() on every send (one-per-day gate)
Cron: daily at noon America/New_York. systemd OnCalendar runs in UTC, so the
timer fires at 16:00 (EST) / 17:00 (EDT); see the worker-crons role. The
DST-shifting hour is acceptable for a daily nudge.
"""
from __future__ import annotations
import logging
import os
import re
import smtplib
import sys
from datetime import datetime, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import psycopg2
import psycopg2.extras
LOG = logging.getLogger("workers.intake_reminder")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
)
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest")
DOMAIN = os.getenv("DOMAIN", "performancewest.net")
SITE = f"https://{DOMAIN}"
SMTP_HOST = os.getenv("SMTP_HOST", "co.carrierone.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER", "noreply@performancewest.net")
SMTP_PASS = os.getenv("SMTP_PASS", "")
SMTP_FROM = os.getenv("SMTP_FROM", "Performance West <noreply@performancewest.net>")
# Per-order reminder cap. After this many nudges we stop emailing this order and
# let ops follow up manually.
MAX_REMINDERS = int(os.getenv("INTAKE_REMINDER_MAX", "10"))
# Only remind once payment has settled long enough for the post-payment intake
# email to have been delivered first (avoids double-emailing within the hour).
MIN_AGE_HOURS = int(os.getenv("INTAKE_REMINDER_MIN_AGE_HOURS", "20"))
# Mirror the API's email validation (api/src/routes/compliance-orders.ts):
# reject malformed addresses AND known non-deliverable placeholders such as the
# FMCSA-census "synthetic@pipeline.com" used when no real email was found.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
PLACEHOLDER_DOMAINS = {"pipeline.com", "example.com", "test.com"}
def _email_ok(raw: str | None) -> bool:
email = (raw or "").strip().lower()
if not email or not EMAIL_RE.match(email):
return False
domain = email.split("@", 1)[1] if "@" in email else ""
if email.startswith("synthetic@") or domain in PLACEHOLDER_DOMAINS:
return False
return True
def _send_email(to: str, subject: str, html: str) -> bool:
try:
msg = MIMEMultipart("alternative")
msg["From"] = SMTP_FROM
msg["To"] = to
msg["Subject"] = subject
msg.attach(MIMEText(html, "html"))
with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as s:
s.starttls()
s.login(SMTP_USER, SMTP_PASS)
s.send_message(msg)
return True
except Exception as exc: # noqa: BLE001
LOG.warning("Email send failed to %s: %s", to, exc)
return False
def _build_html(customer_name: str, services: list[dict], nth: int) -> str:
"""Branded reminder listing every incomplete service for this customer."""
greeting = customer_name.strip() or "there"
items = "\n".join(
f'<li style="margin:8px 0;">'
f'<a href="{s["url"]}" style="color:#1e40af;font-weight:600;font-size:14px;'
f'text-decoration:underline;">{s["name"]}</a></li>'
for s in services
)
# Tone escalates a little with repeated nudges but stays friendly.
if nth <= 1:
lede = (
"Thanks for your order! To start preparing your filing we just need "
"you to complete a short intake form for each service below."
)
elif nth < MAX_REMINDERS:
lede = (
"We're still waiting on your intake details before we can prepare "
"your filing. It only takes a couple of minutes — please complete "
"the form(s) below."
)
else:
lede = (
"This is our final automated reminder. Your filing is on hold until "
"we receive your intake details. Please complete the form(s) below, "
"or reply to this email and we'll be glad to help."
)
return f"""<!DOCTYPE html>
<html><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1"></head>
<body style="margin:0;padding:0;font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,sans-serif;background:#f7f7f7;">
<table width="100%" cellpadding="0" cellspacing="0" style="background:#f7f7f7;padding:32px 16px;">
<tr><td align="center">
<table width="100%" cellpadding="0" cellspacing="0" style="max-width:560px;background:#fff;border-radius:12px;border:1px solid #e5e5e5;overflow:hidden;">
<tr><td style="background:#1e3a5f;padding:24px 32px;text-align:center;">
<h1 style="margin:0;color:#fff;font-size:20px;font-weight:700;">Performance West</h1>
</td></tr>
<tr><td style="padding:32px;">
<p style="margin:0 0 16px;font-size:15px;color:#333;">Hi {greeting},</p>
<p style="margin:0 0 20px;font-size:14px;color:#555;line-height:1.6;">{lede}</p>
<div style="background:#eff6ff;border:2px solid #3b82f6;border-radius:8px;padding:20px;margin:0 0 24px;">
<p style="margin:0 0 8px;font-size:16px;font-weight:700;color:#1e3a5f;">Action Required: Complete Your Intake Form</p>
<ul style="margin:0;padding-left:18px;">{items}</ul>
</div>
<p style="margin:0;font-size:13px;color:#666;line-height:1.6;">
Questions? Reply to this email, contact
<a href="mailto:info@performancewest.net" style="color:#1e40af;">info@performancewest.net</a>,
or call <a href="tel:+18884110383" style="color:#1e40af;">1-888-411-0383</a>.
</p>
</td></tr>
<tr><td style="background:#f9fafb;padding:16px 32px;text-align:center;border-top:1px solid #e5e7eb;">
<p style="margin:0;font-size:11px;color:#9ca3af;">Performance West Inc. &middot; performancewest.net &middot; 1-888-411-0383</p>
</td></tr>
</table>
</td></tr>
</table>
</body></html>"""
def run() -> int:
conn = psycopg2.connect(DATABASE_URL)
conn.autocommit = False
sent_orders = 0
skipped_placeholder = set()
try:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
# All paid + intake-incomplete orders still under the reminder cap,
# not already reminded today, aged past MIN_AGE_HOURS.
cur.execute(
"""
SELECT order_number, service_slug, service_name,
customer_email, customer_name,
intake_reminder_count
FROM compliance_orders
WHERE payment_status = 'paid'
AND COALESCE(intake_data_validated, FALSE) = FALSE
AND intake_reminder_count < %s
AND paid_at IS NOT NULL
AND paid_at < now() - (%s || ' hours')::interval
AND (
intake_reminder_last_at IS NULL
OR intake_reminder_last_at < date_trunc('day', now())
)
ORDER BY customer_email, order_number
""",
(MAX_REMINDERS, str(MIN_AGE_HOURS)),
)
rows = cur.fetchall()
# Group eligible orders by customer email so a customer with several
# incomplete services gets one consolidated email, not one per order.
by_email: dict[str, list[dict]] = {}
for r in rows:
email = (r["customer_email"] or "").strip().lower()
if not _email_ok(email):
skipped_placeholder.add(email or "<empty>")
continue
by_email.setdefault(email, []).append(r)
for email, orders in by_email.items():
name = next((o["customer_name"] for o in orders if o["customer_name"]), "")
# nth = the lowest reminder count among this customer's orders + 1
nth = min(o["intake_reminder_count"] for o in orders) + 1
services = [
{
"name": o["service_name"] or o["service_slug"],
"url": f"{SITE}/order/{o['service_slug']}?order={o['order_number']}",
}
for o in orders
]
subject = (
"Action needed: complete your intake form"
if nth <= 1
else "Reminder: your filing is waiting on your intake details"
)
if nth >= MAX_REMINDERS:
subject = "Final reminder: complete your intake form"
html = _build_html(name, services, nth)
if not _send_email(email, subject, html):
LOG.warning("Skipping reminder bookkeeping for %s (send failed)", email)
continue
# Mark every order we just reminded (one DB round-trip).
order_numbers = [o["order_number"] for o in orders]
with conn.cursor() as cur:
cur.execute(
"""
UPDATE compliance_orders
SET intake_reminder_count = intake_reminder_count + 1,
intake_reminder_last_at = now()
WHERE order_number = ANY(%s)
""",
(order_numbers,),
)
conn.commit()
sent_orders += len(order_numbers)
LOG.info(
"Reminded %s (nth=%s) for %s order(s): %s",
email, nth, len(order_numbers), ", ".join(order_numbers),
)
if skipped_placeholder:
LOG.info(
"Skipped %s placeholder/invalid email(s): %s",
len(skipped_placeholder), ", ".join(sorted(skipped_placeholder)),
)
LOG.info("Done. Reminded %s order(s) across %s customer(s).", sent_orders, len(by_email))
return 0
except Exception: # noqa: BLE001
conn.rollback()
LOG.exception("intake_reminder run failed")
return 1
finally:
conn.close()
if __name__ == "__main__":
sys.exit(run())