#!/usr/bin/env python3 """Daily intake-reminder worker. After a customer pays for a compliance service we email them an intake form link so we can collect the information needed to prepare the filing. Some customers never complete intake, which stalls fulfillment of an order we've already been paid for. This worker runs once a day (noon ET) and nudges any PAID order whose intake is still incomplete. Cadence (two phases, so we never permanently abandon a paid order): 1. DAILY phase — the first DAILY_PHASE reminders go out at most once per calendar day. This is the initial burst right after payment. 2. WEEKLY phase — after the daily burst the customer keeps getting a gentle nudge, but only once every WEEKLY_INTERVAL_DAYS, until either intake is completed or we hit the absolute MAX_REMINDERS ceiling (~a year of weekly nudges) at which point ops takes over manually. Previously the worker stopped dead at a 10-reminder cap, so a customer who ignored the first burst was silently dropped forever and their paid order just stalled. The weekly fallback keeps fulfillment moving. Eligibility (a row is reminded when ALL hold): - payment_status = 'paid' - intake_data_validated IS NOT TRUE (intake not yet completed) - intake_reminder_count < MAX_REMINDERS (absolute ceiling) - paid_at is set and older than MIN_AGE_HOURS - due for its phase: daily (< today) or weekly (< now - WEEKLY_INTERVAL_DAYS) - customer_email is a real, deliverable address (placeholders skipped) The intake link mirrors the one the post-payment email uses: {SITE}/order/{service_slug}?order={order_number} Tracking columns (migration 087): intake_reminder_count incremented on every send (cap MAX_REMINDERS) intake_reminder_last_at set to now() on every send (cadence gate) To re-engage a previously-capped order, clear intake_reminder_last_at (set NULL); the next run picks it up regardless of phase. Cron: daily at noon America/New_York. 
systemd OnCalendar runs in UTC, so the timer fires at 16:00 (EST) / 17:00 (EDT); see the worker-crons role. The DST-shifting hour is acceptable for a daily nudge. """ from __future__ import annotations import logging import os import re import smtplib import sys from datetime import datetime, timezone from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText import psycopg2 import psycopg2.extras LOG = logging.getLogger("workers.intake_reminder") logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s %(message)s", ) DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://pw:pw@localhost:5432/performancewest") DOMAIN = os.getenv("DOMAIN", "performancewest.net") SITE = f"https://{DOMAIN}" SMTP_HOST = os.getenv("SMTP_HOST", "co.carrierone.com") SMTP_PORT = int(os.getenv("SMTP_PORT", "587")) SMTP_USER = os.getenv("SMTP_USER", "noreply@performancewest.net") SMTP_PASS = os.getenv("SMTP_PASS", "") SMTP_FROM = os.getenv("SMTP_FROM", "Performance West ") # Absolute per-order ceiling. With the weekly fallback below this is ~a year of # nudges before we give up and leave it to ops. Generous on purpose: a paid # order with no intake is money we owe work on, so we keep gently reaching out. MAX_REMINDERS = int(os.getenv("INTAKE_REMINDER_MAX", "60")) # How many of the first reminders go out on the daily cadence (the initial # burst right after payment). After this many, we fall back to WEEKLY so the # customer keeps hearing from us without being nagged every single day. DAILY_PHASE = int(os.getenv("INTAKE_REMINDER_DAILY_PHASE", "10")) # Weekly-phase spacing (days) once the daily burst is exhausted. WEEKLY_INTERVAL_DAYS = int(os.getenv("INTAKE_REMINDER_WEEKLY_INTERVAL_DAYS", "7")) # Only remind once payment has settled long enough for the post-payment intake # email to have been delivered first (avoids double-emailing within the hour). 
MIN_AGE_HOURS = int(os.getenv("INTAKE_REMINDER_MIN_AGE_HOURS", "20"))

# Mirror the API's email validation (api/src/routes/compliance-orders.ts):
# reject malformed addresses AND RFC-reserved non-deliverable test domains.
# NOTE: pipeline.com is a REAL (EarthLink) domain a customer uses -- not a
# placeholder -- so it is NOT blocked.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# Each entry blocks the domain itself and everything beneath it, so "invalid"
# covers any "*.invalid" host. EMAIL_RE requires a dot in the domain, which
# means an exact-match comparison could never hit the bare "invalid" entry.
PLACEHOLDER_DOMAINS = {"example.com", "test.com", "invalid"}


def _is_placeholder_domain(domain: str) -> bool:
    """Return True when *domain* equals, or is a subdomain of, a placeholder."""
    return any(
        domain == blocked or domain.endswith("." + blocked)
        for blocked in PLACEHOLDER_DOMAINS
    )


def _email_ok(raw: str | None) -> bool:
    """Return True when *raw* looks like a deliverable customer address.

    The value is trimmed and lower-cased first. It is rejected when it is
    empty, fails EMAIL_RE, or its domain falls under PLACEHOLDER_DOMAINS.
    """
    email = (raw or "").strip().lower()
    if not email or not EMAIL_RE.match(email):
        return False
    # EMAIL_RE guarantees exactly one "@", so the tail is the full domain.
    domain = email.rpartition("@")[2]
    return not _is_placeholder_domain(domain)


def _send_email(to: str, subject: str, html: str) -> bool:
    """Send one HTML email over SMTP with STARTTLS; return True on success.

    Best-effort by design: any failure while building or sending the message
    is logged as a warning and reported as False so the caller can skip the
    reminder bookkeeping for that customer and carry on with the rest.
    """
    try:
        msg = MIMEMultipart("alternative")
        msg["From"] = SMTP_FROM
        msg["To"] = to
        msg["Subject"] = subject
        msg.attach(MIMEText(html, "html"))
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as s:
            s.starttls()
            s.login(SMTP_USER, SMTP_PASS)
            s.send_message(msg)
        return True
    except Exception as exc:  # noqa: BLE001
        LOG.warning("Email send failed to %s: %s", to, exc)
        return False


def _build_html(customer_name: str, services: list[dict], nth: int) -> str:
    """Branded reminder listing every incomplete service for this customer.

    Args:
        customer_name: Name used in the greeting; blank falls back to "there".
        services: Dicts with a "name" (link text) and "url" (intake link).
        nth: 1-indexed number of the reminder about to be sent.

    Returns:
        The HTML body. Customer and service values come from order data, so
        they are HTML-escaped before being placed in the markup.
    """
    from html import escape  # function-scope: `html` is used as a local name elsewhere

    greeting = escape((customer_name or "").strip() or "there")
    items = "\n".join(
        f'<li style="margin:8px 0;">'
        f'<a href="{escape(str(s["url"]), quote=True)}" style="color:#0b5fff;">'
        f'{escape(str(s["name"]))}</a></li>'
        for s in services
    )
    # Tone escalates across the cadence but always stays friendly. `nth` is the
    # reminder number about to be sent (1-indexed): the daily burst is 1..DAILY_PHASE,
    # the weekly fallback is DAILY_PHASE+1..MAX_REMINDERS.
    if nth <= 1:
        lede = (
            "Thanks for your order! To start preparing your filing we just need "
            "you to complete a short intake form for each service below."
        )
    elif nth >= MAX_REMINDERS:
        lede = (
            "This is our final automated reminder. Your filing is on hold until "
            "we receive your intake details. Please complete the form(s) below, "
            "or reply to this email and we'll be glad to help."
        )
    elif nth > DAILY_PHASE:
        lede = (
            "We're still holding your paid order open and ready to file — we just "
            "need your intake details to proceed. It only takes a couple of "
            "minutes. Please complete the form(s) below, or reply to this email "
            "and we'll be glad to help."
        )
    else:
        lede = (
            "We're still waiting on your intake details before we can prepare "
            "your filing. It only takes a couple of minutes — please complete "
            "the form(s) below."
        )
    return f"""
<div style="font-family:Arial,Helvetica,sans-serif;max-width:600px;margin:0 auto;color:#1f2933;">
    <h1 style="font-size:22px;margin:0 0 16px;">Performance West</h1>

    <p>Hi {greeting},</p>

    <p>{lede}</p>

    <h2 style="font-size:16px;margin:24px 0 8px;">Action Required: Complete Your Intake Form</h2>
    <ul style="padding-left:20px;">
      {items}
    </ul>

    <p>Questions? Reply to this email, contact <a href="mailto:info@performancewest.net">info@performancewest.net</a>, or call 1-888-411-0383.</p>

    <p style="font-size:12px;color:#6b7280;">Performance West Inc. · performancewest.net · 1-888-411-0383</p>
</div>
    """


def _group_by_email(rows: list[dict]) -> tuple[dict[str, list[dict]], set[str]]:
    """Bucket eligible order rows by normalized customer email.

    Returns the grouping plus the set of addresses that were skipped because
    they are blank, malformed, or placeholders.
    """
    by_email: dict[str, list[dict]] = {}
    skipped: set[str] = set()
    for row in rows:
        email = (row["customer_email"] or "").strip().lower()
        if not _email_ok(email):
            skipped.add(email or "(blank)")
            continue
        by_email.setdefault(email, []).append(row)
    return by_email, skipped


def _subject_for(nth: int) -> str:
    """Pick the subject line for the nth (1-indexed) reminder."""
    if nth >= MAX_REMINDERS:
        return "Final reminder: complete your intake form"
    if nth <= 1:
        return "Action needed: complete your intake form"
    return "Reminder: your filing is waiting on your intake details"


def run() -> int:
    """Send one consolidated reminder per customer with due, incomplete intake.

    Selects the due orders, groups them by customer email, emails each
    customer once, and bumps the reminder tracking columns only for customers
    whose email was actually sent (committed per customer).

    Returns:
        0 when the run completes (individual send failures are logged and
        skipped), 1 when an unexpected error aborts the run.
    """
    conn = psycopg2.connect(DATABASE_URL)
    conn.autocommit = False
    sent_orders = 0
    sent_customers = 0
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            # All paid + intake-incomplete orders still under the absolute cap,
            # aged past MIN_AGE_HOURS, that are DUE for their cadence phase:
            #   - daily phase (count < DAILY_PHASE): due if not reminded today.
            #   - weekly phase (count >= DAILY_PHASE): due if last reminder was
            #     more than WEEKLY_INTERVAL_DAYS ago.
            # A NULL last-reminded is always due (covers brand-new orders AND
            # any order an operator manually re-armed by clearing the column).
            cur.execute(
                """
                SELECT order_number, service_slug, service_name,
                       customer_email, customer_name, intake_reminder_count
                FROM compliance_orders
                WHERE payment_status = 'paid'
                  AND COALESCE(intake_data_validated, FALSE) = FALSE
                  AND intake_reminder_count < %(max_reminders)s
                  AND paid_at IS NOT NULL
                  AND paid_at < now() - (%(min_age_hours)s || ' hours')::interval
                  AND (
                        intake_reminder_last_at IS NULL
                        OR (
                             intake_reminder_count < %(daily_phase)s
                             AND intake_reminder_last_at < date_trunc('day', now())
                           )
                        OR (
                             intake_reminder_count >= %(daily_phase)s
                             AND intake_reminder_last_at
                                 < now() - (%(weekly_days)s || ' days')::interval
                           )
                      )
                ORDER BY customer_email, order_number
                """,
                {
                    "max_reminders": MAX_REMINDERS,
                    "min_age_hours": str(MIN_AGE_HOURS),
                    "daily_phase": DAILY_PHASE,
                    "weekly_days": str(WEEKLY_INTERVAL_DAYS),
                },
            )
            rows = cur.fetchall()

        # Group eligible orders by customer email so a customer with several
        # incomplete services gets one consolidated email, not one per order.
        by_email, skipped_placeholder = _group_by_email(rows)

        for email, orders in by_email.items():
            name = next((o["customer_name"] for o in orders if o["customer_name"]), "")
            # nth = the lowest reminder count among this customer's orders + 1
            nth = min(o["intake_reminder_count"] for o in orders) + 1
            services = [
                {
                    "name": o["service_name"] or o["service_slug"],
                    "url": f"{SITE}/order/{o['service_slug']}?order={o['order_number']}",
                }
                for o in orders
            ]
            subject = _subject_for(nth)
            body_html = _build_html(name, services, nth)

            if not _send_email(email, subject, body_html):
                LOG.warning("Skipping reminder bookkeeping for %s (send failed)", email)
                continue

            # Mark every order we just reminded (one DB round-trip).
            order_numbers = [o["order_number"] for o in orders]
            with conn.cursor() as cur:
                cur.execute(
                    """
                    UPDATE compliance_orders
                    SET intake_reminder_count = intake_reminder_count + 1,
                        intake_reminder_last_at = now()
                    WHERE order_number = ANY(%s)
                    """,
                    (order_numbers,),
                )
            conn.commit()
            sent_orders += len(order_numbers)
            sent_customers += 1
            LOG.info(
                "Reminded %s (nth=%s) for %s order(s): %s",
                email,
                nth,
                len(order_numbers),
                ", ".join(order_numbers),
            )

        if skipped_placeholder:
            LOG.info(
                "Skipped %s placeholder/invalid email(s): %s",
                len(skipped_placeholder),
                ", ".join(sorted(skipped_placeholder)),
            )
        # Count only customers whose email actually went out, not every group.
        LOG.info(
            "Done. Reminded %s order(s) across %s customer(s).",
            sent_orders,
            sent_customers,
        )
        return 0
    except Exception:  # noqa: BLE001
        conn.rollback()
        LOG.exception("intake_reminder run failed")
        return 1
    finally:
        conn.close()


if __name__ == "__main__":
    sys.exit(run())