From 6d4c323ab6214d43d8dd05c94606af08c078d142 Mon Sep 17 00:00:00 2001 From: justin Date: Wed, 3 Jun 2026 00:20:37 -0500 Subject: [PATCH] feat: daily intake-reminder worker for paid orders with incomplete intake Adds a systemd-timed worker that nudges customers who paid but never completed their intake form (which stalls fulfillment). - migration 087: intake_reminder_count + intake_reminder_last_at on compliance_orders (makes the daily run idempotent and bounded), plus a partial index for the paid-order eligibility scan. - scripts/workers/intake_reminder.py: each run emails any paid order with intake_data_validated != TRUE, capped at 10 reminders/order, at most one consolidated email per customer per day (groups a customer's incomplete services into one email). Reuses the post-payment intake URL format (/order/{slug}?order={n}) and the API's email validation, skipping placeholder/invalid addresses (synthetic@, pipeline.com, etc.). Sends via smtplib with SMTP_PASS (verified working in the worker container). - worker-crons: pw-intake-reminder timer, daily ~noon ET (16:00 UTC). --- api/migrations/087_intake_reminders.sql | 26 ++ .../roles/worker-crons/defaults/main.yml | 11 + scripts/workers/intake_reminder.py | 258 ++++++++++++++++++ 3 files changed, 295 insertions(+) create mode 100644 api/migrations/087_intake_reminders.sql create mode 100644 scripts/workers/intake_reminder.py diff --git a/api/migrations/087_intake_reminders.sql b/api/migrations/087_intake_reminders.sql new file mode 100644 index 0000000..a3c8530 --- /dev/null +++ b/api/migrations/087_intake_reminders.sql @@ -0,0 +1,26 @@ +-- 087: Daily intake-reminder tracking on compliance_orders. +-- +-- After a customer pays, we email them an intake form link for each service so +-- we can collect the data needed to prepare the filing. Some customers never +-- complete intake, which stalls fulfillment. 
The intake-reminder worker +-- (scripts/workers/intake_reminder.py) runs daily at noon ET and nudges any +-- PAID order whose intake is still incomplete (intake_data_validated IS NOT +-- TRUE), up to a cap of 10 reminders, skipping placeholder/invalid emails. +-- +-- These two columns make the daily run idempotent and bounded: +-- intake_reminder_count -- how many reminders we've sent (cap: 10) +-- intake_reminder_last_at -- when we last reminded (so we send at most 1/day) +-- +-- Both default to a no-reminders-yet state and are NULL/0 for every existing +-- row, so the worker treats all currently-incomplete paid orders as eligible. + +ALTER TABLE compliance_orders + ADD COLUMN IF NOT EXISTS intake_reminder_count integer NOT NULL DEFAULT 0; + +ALTER TABLE compliance_orders + ADD COLUMN IF NOT EXISTS intake_reminder_last_at timestamptz; + +-- Speeds up the daily eligibility scan (paid + incomplete intake). +CREATE INDEX IF NOT EXISTS idx_compliance_orders_intake_reminder + ON compliance_orders (payment_status, intake_data_validated, intake_reminder_count) + WHERE payment_status = 'paid'; diff --git a/infra/ansible/roles/worker-crons/defaults/main.yml b/infra/ansible/roles/worker-crons/defaults/main.yml index 7b5ba53..84149ee 100644 --- a/infra/ansible/roles/worker-crons/defaults/main.yml +++ b/infra/ansible/roles/worker-crons/defaults/main.yml @@ -208,3 +208,14 @@ worker_crons: module: scripts.build_trucking_campaigns on_calendar: "*-*-* 08:00:00 UTC" persistent: true + + # Intake reminders — daily around noon ET (noon ET is 17:00 UTC in EST / + # 16:00 UTC in EDT; we use 16:00 UTC so summer is exactly noon and winter is + # 11am — close enough for a daily nudge). Emails any PAID compliance order + # whose intake is still incomplete, up to 10 times, one consolidated email per + # customer per day, skipping placeholder/invalid addresses. 
+ - name: pw-intake-reminder + description: Remind paid customers to complete their intake form + module: scripts.workers.intake_reminder + on_calendar: "*-*-* 16:00:00 UTC" + persistent: true diff --git a/scripts/workers/intake_reminder.py b/scripts/workers/intake_reminder.py new file mode 100644 index 0000000..c8f9587 --- /dev/null +++ b/scripts/workers/intake_reminder.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python3 +"""Daily intake-reminder worker. + +After a customer pays for a compliance service we email them an intake form +link so we can collect the information needed to prepare the filing. Some +customers never complete intake, which stalls fulfillment. This worker runs +once a day (around noon ET) and nudges any PAID order whose intake is still +incomplete, up to a per-order cap. + +Eligibility (a row is reminded when ALL hold): + - payment_status = 'paid' and paid_at is older than MIN_AGE_HOURS (default 20) + - intake_data_validated IS NOT TRUE (intake not yet completed) + - intake_reminder_count < MAX_REMINDERS (default 10) + - intake_reminder_last_at IS NULL OR < today (at most one reminder/day) + - customer_email is a real, deliverable address (placeholders skipped) + +The intake link mirrors the one the post-payment email uses: + {SITE}/order/{service_slug}?order={order_number} + +Tracking columns (migration 087): + intake_reminder_count incremented on every send (cap MAX_REMINDERS) + intake_reminder_last_at set to now() on every send (one-per-day gate) + +Cron: daily around noon America/New_York. The systemd timer is pinned to 16:00 +UTC, so it fires at 11:00 ET in winter (EST) and 12:00 ET in summer (EDT); see +the worker-crons role. The DST-shifting hour is acceptable for a daily nudge. 
+""" +from __future__ import annotations + +import logging +import os +import re +import smtplib +import sys +from datetime import datetime, timezone +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + +import psycopg2 +import psycopg2.extras + +LOG = logging.getLogger("workers.intake_reminder") +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s %(message)s", +) + +DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest") +DOMAIN = os.getenv("DOMAIN", "performancewest.net") +SITE = f"https://{DOMAIN}" + +SMTP_HOST = os.getenv("SMTP_HOST", "co.carrierone.com") +SMTP_PORT = int(os.getenv("SMTP_PORT", "587")) +SMTP_USER = os.getenv("SMTP_USER", "noreply@performancewest.net") +SMTP_PASS = os.getenv("SMTP_PASS", "") +SMTP_FROM = os.getenv("SMTP_FROM", "Performance West ") + +# Per-order reminder cap. After this many nudges we stop emailing this order and +# let ops follow up manually. +MAX_REMINDERS = int(os.getenv("INTAKE_REMINDER_MAX", "10")) + +# Only remind once payment has settled long enough for the post-payment intake +# email to have been delivered first (avoids double-emailing within the hour). +MIN_AGE_HOURS = int(os.getenv("INTAKE_REMINDER_MIN_AGE_HOURS", "20")) + +# Mirror the API's email validation (api/src/routes/compliance-orders.ts): +# reject malformed addresses AND known non-deliverable placeholders such as the +# FMCSA-census "synthetic@pipeline.com" used when no real email was found. 
+EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") +PLACEHOLDER_DOMAINS = {"pipeline.com", "example.com", "test.com"} + + +def _email_ok(raw: str | None) -> bool: + email = (raw or "").strip().lower() + if not email or not EMAIL_RE.match(email): + return False + domain = email.split("@", 1)[1] if "@" in email else "" + if email.startswith("synthetic@") or domain in PLACEHOLDER_DOMAINS: + return False + return True + + +def _send_email(to: str, subject: str, html: str) -> bool: + try: + msg = MIMEMultipart("alternative") + msg["From"] = SMTP_FROM + msg["To"] = to + msg["Subject"] = subject + msg.attach(MIMEText(html, "html")) + with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as s: + s.starttls() + s.login(SMTP_USER, SMTP_PASS) + s.send_message(msg) + return True + except Exception as exc: # noqa: BLE001 + LOG.warning("Email send failed to %s: %s", to, exc) + return False + + +def _build_html(customer_name: str, services: list[dict], nth: int) -> str: + """Branded reminder listing every incomplete service for this customer.""" + greeting = customer_name.strip() or "there" + items = "\n".join( + f'
  • ' + f'{s["name"]}
  • ' + for s in services + ) + # Tone escalates a little with repeated nudges but stays friendly. + if nth <= 1: + lede = ( + "Thanks for your order! To start preparing your filing we just need " + "you to complete a short intake form for each service below." + ) + elif nth < MAX_REMINDERS: + lede = ( + "We're still waiting on your intake details before we can prepare " + "your filing. It only takes a couple of minutes — please complete " + "the form(s) below." + ) + else: + lede = ( + "This is our final automated reminder. Your filing is on hold until " + "we receive your intake details. Please complete the form(s) below, " + "or reply to this email and we'll be glad to help." + ) + + return f""" + + + + +
    + + + + +
    +

    Performance West

    +
    +

    Hi {greeting},

    +

    {lede}

    +
    +

    Action Required: Complete Your Intake Form

    +
      {items}
    +
    +

    + Questions? Reply to this email, contact + info@performancewest.net, + or call 1-888-411-0383. +

    +
    +

    Performance West Inc. · performancewest.net · 1-888-411-0383

    +
    +
    +""" + + +def run() -> int: + conn = psycopg2.connect(DATABASE_URL) + conn.autocommit = False + sent_orders = 0 + skipped_placeholder = set() + try: + with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: + # All paid + intake-incomplete orders still under the reminder cap, + # not already reminded today, aged past MIN_AGE_HOURS. + cur.execute( + """ + SELECT order_number, service_slug, service_name, + customer_email, customer_name, + intake_reminder_count + FROM compliance_orders + WHERE payment_status = 'paid' + AND COALESCE(intake_data_validated, FALSE) = FALSE + AND intake_reminder_count < %s + AND paid_at IS NOT NULL + AND paid_at < now() - (%s || ' hours')::interval + AND ( + intake_reminder_last_at IS NULL + OR intake_reminder_last_at < date_trunc('day', now()) + ) + ORDER BY customer_email, order_number + """, + (MAX_REMINDERS, str(MIN_AGE_HOURS)), + ) + rows = cur.fetchall() + + # Group eligible orders by customer email so a customer with several + # incomplete services gets one consolidated email, not one per order. 
+ by_email: dict[str, list[dict]] = {} + for r in rows: + email = (r["customer_email"] or "").strip().lower() + if not _email_ok(email): + skipped_placeholder.add(email or "") + continue + by_email.setdefault(email, []).append(r) + + for email, orders in by_email.items(): + name = next((o["customer_name"] for o in orders if o["customer_name"]), "") + # nth = the lowest reminder count among this customer's orders + 1 + nth = min(o["intake_reminder_count"] for o in orders) + 1 + services = [ + { + "name": o["service_name"] or o["service_slug"], + "url": f"{SITE}/order/{o['service_slug']}?order={o['order_number']}", + } + for o in orders + ] + subject = ( + "Action needed: complete your intake form" + if nth <= 1 + else "Reminder: your filing is waiting on your intake details" + ) + if nth >= MAX_REMINDERS: + subject = "Final reminder: complete your intake form" + + html = _build_html(name, services, nth) + if not _send_email(email, subject, html): + LOG.warning("Skipping reminder bookkeeping for %s (send failed)", email) + continue + + # Mark every order we just reminded (one DB round-trip). + order_numbers = [o["order_number"] for o in orders] + with conn.cursor() as cur: + cur.execute( + """ + UPDATE compliance_orders + SET intake_reminder_count = intake_reminder_count + 1, + intake_reminder_last_at = now() + WHERE order_number = ANY(%s) + """, + (order_numbers,), + ) + conn.commit() + sent_orders += len(order_numbers) + LOG.info( + "Reminded %s (nth=%s) for %s order(s): %s", + email, nth, len(order_numbers), ", ".join(order_numbers), + ) + + if skipped_placeholder: + LOG.info( + "Skipped %s placeholder/invalid email(s): %s", + len(skipped_placeholder), ", ".join(sorted(skipped_placeholder)), + ) + LOG.info("Done. 
Reminded %s order(s) across %s customer(s).", sent_orders, len(by_email)) + return 0 + except Exception: # noqa: BLE001 + conn.rollback() + LOG.exception("intake_reminder run failed") + return 1 + finally: + conn.close() + + +if __name__ == "__main__": + sys.exit(run())