new-site/scripts/build_ucr_annual_campaign.py
justin a90cdc9066 fix(trucking-email): route order CTAs to the correct service page (not $399 catch-all)
Two routing bugs that sent carriers to wrong/dead order pages:

1. MCS-150 + Inactive campaigns linked to /order/dot-full-compliance ($399)
   instead of their actual service: build_lp_link()/lp_slug_for() fell through
   to the dot-full-compliance catch-all for any campaign_type not in
   DEFICIENCY_SEGMENTS, ignoring the existing PRICE_SLUG_BY_CAMPAIGN map. So
   MCS-150 carriers (should be mcs150-update $79) and Inactive carriers (should
   be usdot-reactivation $149) were both quoted a 5x-priced bundle they never
   asked for — a severe conversion killer on the two highest-volume segments.
   Fix: lp_slug_for() now checks PRICE_SLUG_BY_CAMPAIGN first; build_lp_link()
   delegates to it (single source of truth).

2. IFTA-quarterly + UCR-annual builders set lp_link to a BARE path when no
   coupon was active (LP_LINK with no query). The body appends '&utm_source=...'
   so the CTA rendered as /order/ifta-quarterly&utm... (no '?') = 404. Fix:
   both now always emit a leading '?' query carrying ?dot= (and ?code= when a
   coupon is on), mirroring the main builder's lp_link_with_coupon().

Audited every campaign_type: all 14 order slugs now resolve 200 and match the
intended service/price. Compliance-check secondary links (/tools/dot-compliance-
check) verified correct and intentionally kept where a 'check status' CTA fits.
2026-06-23 15:19:23 -05:00

264 lines
11 KiB
Python

#!/usr/bin/env python3
"""Build the UCR (Unified Carrier Registration) annual-renewal reminder campaign.
UCR runs on a FIXED annual calendar: registration opens Oct 1 and is due by
Dec 31 every year. Nearly every interstate carrier (FMCSA op code 'A') must
renew annually -> recurring revenue, no per-carrier data needed.
Multi-touch escalating cadence (business days before Dec 31), with the same
'I already did it' suppression + same-day coupon as the IFTA reminder. Reuses
the build_trucking_campaigns plumbing for one source of truth.
Cron (run daily; self-gates to the touch windows):
python3 scripts/build_ucr_annual_campaign.py --start-campaign
Manual / preview:
python3 scripts/build_ucr_annual_campaign.py --preview
python3 scripts/build_ucr_annual_campaign.py --date 2026-11-17 --dry-run
"""
from __future__ import annotations
import argparse
import os
import sys
from datetime import date, datetime, timedelta, timezone
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT not in sys.path:
sys.path.insert(0, ROOT)
import psycopg2 # noqa: E402
import scripts.build_trucking_campaigns as tc # noqa: E402
# Reuse the IFTA helpers (business-day math + HMAC filed token) so there is one
# implementation of each.
import scripts.build_ifta_quarterly_campaign as ifta # noqa: E402
# Reuse the logger object exposed by build_trucking_campaigns.
LOG = tc.LOG
# Order-page slug for the UCR service, and the landing-page URL built from it
# (no query string; main() appends the per-carrier query).
UCR_SVC_SLUG = "ucr-registration"
LP_LINK = f"{tc.SITE_DOMAIN}/order/{UCR_SVC_SLUG}"
# Name of the environment variable that main() reads to find the id of the
# source campaign passed to tc.get_base_campaign().
SOURCE_ENV = "CAMPAIGN_UCR_ANNUAL_ID"
# UCR is a once-a-year filing (registration opens Oct 1, due Dec 31), so the
# touches are spaced further apart than IFTA's quarterly cadence.
#
# Each entry is a 5-tuple:
#   (business_days_before_due, touch_no, subject_suffix, headline, urgency)
# `headline` and `urgency` are str.format templates; main() fills in {year}
# and {due}.
TOUCHES = [
    (
        30,
        1,
        "is open - let us file it for you",
        "{year} UCR registration is open.",
        "UCR opened for {year}. Knocking it out early means it is done and you never think about it again - we can file it for you in a couple minutes of your time.",
    ),
    (
        12,
        2,
        "is due soon - still time for us to file",
        "Your {year} UCR is due {due}.",
        "The UCR deadline is coming up. If you would rather not deal with the fee tiers and the portal, there is still time for us to file it for you - but the window is closing.",
    ),
    (
        4,
        3,
        "is due in days - last chance to avoid being put out of service",
        "Almost out of time: your {year} UCR is due {due}.",
        "This is about the last point we can still get your UCR filed before the deadline. Run interstate without it after Jan 1 and you risk fines and being placed out of service. Let us handle it today.",
    ),
]

# Tolerance, in calendar days, around each touch's target send date; read from
# the UCR_REMINDER_WINDOW_DAYS environment variable (default 1).
REMINDER_WINDOW_DAYS = int(os.environ.get("UCR_REMINDER_WINDOW_DAYS", "1"))
def ucr_due(year: int) -> date:
    """Return the UCR due date for ``year``: December 31 of that year."""
    return date(year=year, month=12, day=31)
def next_due(today: date) -> tuple[int, date]:
    """Return ``(year, due_date)`` for the UCR cycle that ``today`` falls in.

    The due date is always December 31 of ``today``'s calendar year:

    * Oct 1 .. Dec 31 -> this year's registration (due this Dec 31).
    * Jan 1 .. Sep 30 -> registration is not open yet; we still look ahead to
      this year's Dec 31 (touches simply will not fire until ~30 business days
      before it).

    No date can be later than Dec 31 of its own year, so there is no roll-over
    to the following year to handle here.

    Args:
        today: The date the run is evaluated for.

    Returns:
        A ``(year, due)`` tuple where ``due`` is ``ucr_due(today.year)`` and
        ``year`` is ``due.year``.
    """
    # NOTE(review): `year` is the calendar year of the due date and main()
    # renders it into subscriber-facing copy ("Your {year} UCR ..."). Confirm
    # this matches how the UCR Plan labels the registration year being filed.
    due = ucr_due(today.year)
    return due.year, due
def due_touch_today(today: date):
    """Pick the reminder touch (if any) that should fire on ``today``.

    A touch fires when ``today`` is a business day and lies within
    ``REMINDER_WINDOW_DAYS`` calendar days of that touch's target send date
    (the due date minus the touch's business-day offset). ``TOUCHES`` is
    scanned in order and the first match wins.

    Returns:
        ``(touch, (year, due))`` where ``touch`` is the matching ``TOUCHES``
        entry or ``None``; the ``(year, due)`` pair is always populated.
    """
    cycle = next_due(today)
    _, deadline = cycle

    # Reminders are only ever sent on business days.
    if ifta._is_business_day(today):
        for candidate in TOUCHES:
            target_day = ifta.minus_business_days(deadline, candidate[0])
            offset_days = (today - target_day).days
            if -REMINDER_WINDOW_DAYS <= offset_days <= REMINDER_WINDOW_DAYS:
                return candidate, cycle

    return None, cycle
def _reset_cycle_if_new(conn, year: int) -> None:
    """Clear the previous cycle's reminder marks the first time ``year`` is seen.

    A single-row table, ``ucr_reminder_cycle`` (created on demand), stores the
    key ``UCR-<year>`` of the last cycle that was reset. When the stored key
    already equals this year's key nothing is changed. Otherwise every carrier
    carrying a touch mark or a self-filed mark has ``ucr_reminded_at``,
    ``ucr_touch_no`` and ``ucr_self_filed_at`` set back to NULL, and the stored
    key is upserted to the new value. The reset and the key update are
    committed together.

    Args:
        conn: Open DB connection (psycopg2, as created in ``main``). It is
            committed here but not closed.
        year: UCR cycle year used to build the cycle key.
    """
    cycle_key = f"UCR-{year}"

    # The context manager closes the cursor on every exit path (early return,
    # normal completion, or an exception); previously it was never closed.
    with conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE IF NOT EXISTS ucr_reminder_cycle (
                id boolean PRIMARY KEY DEFAULT true,
                cycle_key text NOT NULL,
                reset_at timestamptz NOT NULL DEFAULT now(),
                CONSTRAINT ucr_reminder_cycle_singleton CHECK (id)
            )
        """)
        cur.execute("SELECT cycle_key FROM ucr_reminder_cycle WHERE id = true")
        row = cur.fetchone()
        if row and row[0] == cycle_key:
            # Same cycle as last time: only persist the CREATE TABLE (if any).
            conn.commit()
            return

        cur.execute("""
            UPDATE fmcsa_carriers
            SET ucr_reminded_at = NULL, ucr_touch_no = NULL, ucr_self_filed_at = NULL
            WHERE ucr_touch_no IS NOT NULL OR ucr_self_filed_at IS NOT NULL
        """)
        cleared = cur.rowcount
        cur.execute("""
            INSERT INTO ucr_reminder_cycle (id, cycle_key, reset_at)
            VALUES (true, %s, now())
            ON CONFLICT (id) DO UPDATE SET cycle_key = EXCLUDED.cycle_key, reset_at = now()
        """, (cycle_key,))
        conn.commit()

    LOG.info("[ucr] new cycle %s -- cleared %d prior marks", cycle_key, cleared)
def _select_sql() -> str:
    """Return the candidate-selection query for one reminder touch.

    The query takes three positional parameters, in order: the list of blocked
    e-mail domains, the current touch number (carriers already at or past it
    are excluded), and the row limit.
    """
    # tc.usable_filter() is deliberately called here, at call time, rather
    # than at import: the catch-all auto-rollout decision and its single DB
    # probe must happen during the run, not when this module is imported.
    usable_clause = tc.usable_filter()
    query = f"""
SELECT dot_number, email_address, legal_name, phy_state
FROM fmcsa_carriers
WHERE carrier_operation = 'A' -- interstate => needs UCR
AND email_address IS NOT NULL AND email_address <> ''
AND {usable_clause}
AND lower(split_part(email_address, '@', 2)) <> ALL(%s)
AND ucr_self_filed_at IS NULL
AND COALESCE(ucr_touch_no, 0) < %s
ORDER BY dot_number
LIMIT %s
"""
    return query
def _filed_link(dot: str) -> str:
    """Build the "I already filed" link for the carrier with DOT number ``dot``.

    The API base comes from ``PUBLIC_API_URL`` (trailing slashes stripped),
    falling back to the production API host.
    """
    base_url = os.getenv("PUBLIC_API_URL", "https://api.performancewest.net").rstrip("/")
    # Same HMAC token scheme as the IFTA reminder; the UCR handler verifies it
    # the same way.
    token = ifta._ifta_filed_token(dot)
    return f"{base_url}/api/v1/ucr/filed?dot={dot}&t={token}"
def main() -> int:
    """Build (and, on a live run, schedule) today's UCR reminder touch.

    Flags:
        --date YYYY-MM-DD  evaluate the run as if it were that day.
        --start-campaign   live run: reset a new cycle, mint the daily coupon
                           (when enabled), schedule the campaign and mark the
                           carriers.
        --preview          send a single sample subscriber (tc.TEST_EMAIL);
                           never schedules or marks carriers.
        --dry-run          log what would be sent, then stop before creating
                           any list or campaign.
        --limit N          maximum carriers selected (default UCR_DAILY_CAP).
        --ignore-window    run even when no touch is due today (falls back to
                           the first touch).

    Returns:
        Process exit code: 0 on success or when there is nothing to do, 1 when
        the subscriber import added nobody, 2 when the source-campaign
        environment variable is not set.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--date")
    ap.add_argument("--start-campaign", action="store_true")
    ap.add_argument("--preview", action="store_true")
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--limit", type=int, default=int(os.getenv("UCR_DAILY_CAP", "2000")))
    ap.add_argument("--ignore-window", action="store_true")
    args = ap.parse_args()

    import logging
    logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"),
                        format="%(asctime)s %(levelname)s %(message)s")

    today = datetime.strptime(args.date, "%Y-%m-%d").date() if args.date else date.today()
    touch, (year, due) = due_touch_today(today)
    if touch is None and not args.ignore_window and not args.preview:
        LOG.info("[ucr] no touch fires today (next due %s); exiting", due)
        return 0
    if touch is None:
        # --preview / --ignore-window outside a window: use the first touch.
        touch = TOUCHES[0]
    days_before, touch_no, subj_suffix, headline_t, urgency_t = touch

    src = os.getenv(SOURCE_ENV)
    if not src:
        LOG.error("[ucr] %s not set -- create the UCR source campaign first", SOURCE_ENV)
        return 2
    base = tc.get_base_campaign(int(src))

    # A "live" run is the only mode allowed to mutate carrier state, mint a
    # coupon, or schedule a send.
    live = args.start_campaign and not args.preview and not args.dry_run

    conn = psycopg2.connect(tc.DB_URL)
    try:
        if live:
            _reset_cycle_if_new(conn, year)

        coupon = None
        if tc.COUPON_ENABLED and live:
            try:
                coupon = tc.get_or_create_daily_coupon(conn, today)
            except Exception as exc:  # noqa: BLE001
                # Best effort: a failed coupon mint must not block the send.
                LOG.warning("[ucr] coupon mint failed: %s", exc)
        elif not tc.COUPON_ENABLED:
            LOG.info("[ucr] coupon disabled (CAMPAIGN_ENABLE_COUPON unset) — normal price")

        cur = conn.cursor()
        cur.execute(_select_sql(), [list(tc.BLOCKED_EMAIL_DOMAINS), touch_no, args.limit])
        rows = cur.fetchall()
        LOG.info("[ucr] %d UCR due %s | touch %d (%d biz-days) | %d candidates",
                 year, due, touch_no, days_before, len(rows))
        if not rows and not args.preview:
            LOG.info("[ucr] no candidates for touch %d; exiting", touch_no)
            return 0

        # Portable equivalent of strftime("%B %-d, %Y"): the "%-d" directive is
        # a platform extension and is not available everywhere.
        due_human = f"{due:%B} {due.day}, {due.year}"
        headline = headline_t.format(year=year, due=due_human)
        urgency = urgency_t.format(year=year, due=due_human)

        def attribs(dot, company, state):
            """Per-subscriber template attributes for one carrier."""
            # lp_link MUST always contain `?` so the body's `&utm_source=...`
            # appends cleanly. A bare path yields
            # `/order/ucr-registration&utm...` (no `?`), which 404s. Carry the
            # carrier's `dot=` (and `code=` when a coupon is on), like the main
            # builder's lp_link_with_coupon(). The `?` is emitted even when
            # there are no params at all.
            params = []
            if dot:
                params.append(f"dot={dot}")
            if coupon:
                params.append(f"code={coupon}")
            lp = f"{LP_LINK}?" + "&".join(params)
            a = {
                "dot_number": dot or "", "company": company or "", "state": state or "",
                "lp_link": lp,
                "ucr_due_date": due_human, "ucr_year": str(year),
                "ucr_headline": headline, "ucr_urgency": urgency,
                "filed_link": _filed_link(dot) if dot else "",
            }
            a.update(tc.coupon_attribs(coupon))
            return a

        if args.preview:
            subs = [{"email": tc.TEST_EMAIL, "name": "Sample Carrier",
                     "attribs": attribs("0000000", "Sample Carrier LLC", "TX")}]
        else:
            subs = [{"email": r[1], "name": r[2] or r[1],
                     "attribs": attribs(r[0], r[2], r[3])} for r in rows]

        send_date = today.isoformat()
        list_name = f"UCR {year} T{touch_no} - {send_date}"
        campaign_name = f"UCR Reminder {year} - Touch {touch_no} ({days_before}bd) - {today.strftime('%b %d %Y')}"

        if args.dry_run:
            LOG.info("[ucr] DRY RUN -- touch %d would send to %d carriers (coupon=%s)",
                     touch_no, len(subs), coupon)
            return 0

        # Copy before editing so the fetched base campaign is left untouched.
        base = dict(base)
        base["subject"] = f"DOT# {{{{ .Subscriber.Attribs.dot_number }}}} - Your {year} UCR registration {subj_suffix}"

        list_id = tc.create_list(list_name)
        added = tc.import_subscribers(list_id, subs)
        LOG.info("[ucr] list %d: %d/%d subscribers added", list_id, added, len(subs))
        if added == 0:
            LOG.error("[ucr] 0 subscribers added; aborting")
            return 1

        send_at = datetime.now(timezone.utc) + timedelta(minutes=5)
        # --dry-run has already returned above, so `live` here is exactly
        # "--start-campaign and not --preview".
        cid = tc.create_and_schedule_campaign(
            base, list_id, campaign_name, send_at, schedule=live)
        LOG.info("[ucr] campaign %d created (%s)", cid,
                 "scheduled" if live else "draft")

        if live and rows:
            # NOTE(review): every selected carrier is marked, even when
            # `added` < len(subs); confirm that is intended for carriers whose
            # import was rejected.
            cur.execute("""
                UPDATE fmcsa_carriers SET ucr_reminded_at = now(), ucr_touch_no = %s
                WHERE dot_number = ANY(%s::text[])
            """, (touch_no, [r[0] for r in rows]))
            conn.commit()
            LOG.info("[ucr] marked %d carriers at touch %d", len(rows), touch_no)
        return 0
    finally:
        # Release the connection on every path: early returns and exceptions
        # previously left it open.
        conn.close()
if __name__ == "__main__":
raise SystemExit(main())