#!/usr/bin/env python3 """Build the UCR (Unified Carrier Registration) annual-renewal reminder campaign. UCR runs on a FIXED annual calendar: registration opens Oct 1 and is due by Dec 31 every year. Nearly every interstate carrier (FMCSA op code 'A') must renew annually -> recurring revenue, no per-carrier data needed. Multi-touch escalating cadence (business days before Dec 31), with the same 'I already did it' suppression + same-day coupon as the IFTA reminder. Reuses the build_trucking_campaigns plumbing for one source of truth. Cron (run daily; self-gates to the touch windows): python3 scripts/build_ucr_annual_campaign.py --start-campaign Manual / preview: python3 scripts/build_ucr_annual_campaign.py --preview python3 scripts/build_ucr_annual_campaign.py --date 2026-11-17 --dry-run """ from __future__ import annotations import argparse import os import sys from datetime import date, datetime, timedelta, timezone ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT not in sys.path: sys.path.insert(0, ROOT) import psycopg2 # noqa: E402 import scripts.build_trucking_campaigns as tc # noqa: E402 # Reuse the IFTA helpers (business-day math + HMAC filed token) so there is one # implementation of each. import scripts.build_ifta_quarterly_campaign as ifta # noqa: E402 LOG = tc.LOG UCR_SVC_SLUG = "ucr-registration" LP_LINK = f"{tc.SITE_DOMAIN}/order/{UCR_SVC_SLUG}" SOURCE_ENV = "CAMPAIGN_UCR_ANNUAL_ID" # UCR is annual (registration opens Oct 1, due Dec 31). Because it is once a year # we spread the touches a bit wider than IFTA's quarterly cadence. TOUCHES = [ # (business_days_before_due, touch_no, subject_suffix, headline, urgency) (30, 1, "is open - let us file it for you", "{year} UCR registration is open.", "UCR opened for {year}. Knocking it out early means it is done and you never think about it again - we can file it for you in a couple minutes of your time."), (12, 2, "is due soon - still time for us to file", "Your {year} UCR is due {due}.", "The UCR deadline is coming up. If you would rather not deal with the fee tiers and the portal, there is still time for us to file it for you - but the window is closing."), (4, 3, "is due in days - last chance to avoid being put out of service", "Almost out of time: your {year} UCR is due {due}.", "This is about the last point we can still get your UCR filed before the deadline. Run interstate without it after Jan 1 and you risk fines and being placed out of service. Let us handle it today."), ] REMINDER_WINDOW_DAYS = int(os.getenv("UCR_REMINDER_WINDOW_DAYS", "1")) def ucr_due(year: int) -> date: return date(year, 12, 31) def next_due(today: date) -> tuple[int, date]: """The UCR year currently being registered + its Dec 31 due date. Oct 1 .. Dec 31 -> this year's registration (due this Dec 31). Jan 1 .. Sep 30 -> the upcoming year's registration is not open yet; we look ahead to this year's Dec 31 (touches simply will not fire until ~30 bdays out). """ due = ucr_due(today.year) if today > due: due = ucr_due(today.year + 1) return due.year, due def due_touch_today(today: date): year, due = next_due(today) if not ifta._is_business_day(today): return None, (year, due) for touch in TOUCHES: send_on = ifta.minus_business_days(due, touch[0]) if abs((today - send_on).days) <= REMINDER_WINDOW_DAYS: return touch, (year, due) return None, (year, due) def _reset_cycle_if_new(conn, year: int) -> None: cycle_key = f"UCR-{year}" cur = conn.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS ucr_reminder_cycle ( id boolean PRIMARY KEY DEFAULT true, cycle_key text NOT NULL, reset_at timestamptz NOT NULL DEFAULT now(), CONSTRAINT ucr_reminder_cycle_singleton CHECK (id) ) """) cur.execute("SELECT cycle_key FROM ucr_reminder_cycle WHERE id = true") row = cur.fetchone() if row and row[0] == cycle_key: conn.commit() return cur.execute(""" UPDATE fmcsa_carriers SET ucr_reminded_at = NULL, ucr_touch_no = NULL, ucr_self_filed_at = NULL WHERE ucr_touch_no IS NOT NULL OR ucr_self_filed_at IS NOT NULL """) cleared = cur.rowcount cur.execute(""" INSERT INTO ucr_reminder_cycle (id, cycle_key, reset_at) VALUES (true, %s, now()) ON CONFLICT (id) DO UPDATE SET cycle_key = EXCLUDED.cycle_key, reset_at = now() """, (cycle_key,)) conn.commit() LOG.info("[ucr] new cycle %s -- cleared %d prior marks", cycle_key, cleared) def _select_sql() -> str: # tc.usable_filter() is resolved at call time (not import) so the catch-all # auto-rollout decision + its single DB probe happen during the run, not when # this module is imported. return f""" SELECT dot_number, email_address, legal_name, phy_state FROM fmcsa_carriers WHERE carrier_operation = 'A' -- interstate => needs UCR AND email_address IS NOT NULL AND email_address <> '' AND {tc.usable_filter()} AND lower(split_part(email_address, '@', 2)) <> ALL(%s) AND ucr_self_filed_at IS NULL AND COALESCE(ucr_touch_no, 0) < %s ORDER BY dot_number LIMIT %s """ def _filed_link(dot: str) -> str: api_base = os.getenv("PUBLIC_API_URL", "https://api.performancewest.net").rstrip("/") # Reuse the same HMAC token scheme; the UCR handler verifies it the same way. return f"{api_base}/api/v1/ucr/filed?dot={dot}&t={ifta._ifta_filed_token(dot)}" def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--date") ap.add_argument("--start-campaign", action="store_true") ap.add_argument("--preview", action="store_true") ap.add_argument("--dry-run", action="store_true") ap.add_argument("--limit", type=int, default=int(os.getenv("UCR_DAILY_CAP", "2000"))) ap.add_argument("--ignore-window", action="store_true") args = ap.parse_args() import logging logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"), format="%(asctime)s %(levelname)s %(message)s") today = datetime.strptime(args.date, "%Y-%m-%d").date() if args.date else date.today() touch, (year, due) = due_touch_today(today) if touch is None and not args.ignore_window and not args.preview: LOG.info("[ucr] no touch fires today (next due %s); exiting", due) return 0 if touch is None: touch = TOUCHES[0] days_before, touch_no, subj_suffix, headline_t, urgency_t = touch src = os.getenv(SOURCE_ENV) if not src: LOG.error("[ucr] %s not set -- create the UCR source campaign first", SOURCE_ENV) return 2 base = tc.get_base_campaign(int(src)) conn = psycopg2.connect(tc.DB_URL) if args.start_campaign and not args.preview and not args.dry_run: _reset_cycle_if_new(conn, year) coupon = None if tc.COUPON_ENABLED and args.start_campaign and not args.preview and not args.dry_run: try: coupon = tc.get_or_create_daily_coupon(conn, today) except Exception as exc: # noqa: BLE001 LOG.warning("[ucr] coupon mint failed: %s", exc) elif not tc.COUPON_ENABLED: LOG.info("[ucr] coupon disabled (CAMPAIGN_ENABLE_COUPON unset) — normal price") cur = conn.cursor() cur.execute(_select_sql(), [list(tc.BLOCKED_EMAIL_DOMAINS), touch_no, args.limit]) rows = cur.fetchall() LOG.info("[ucr] %d UCR due %s | touch %d (%d biz-days) | %d candidates", year, due, touch_no, days_before, len(rows)) if not rows and not args.preview: LOG.info("[ucr] no candidates for touch %d; exiting", touch_no) return 0 due_human = due.strftime("%B %-d, %Y") headline = headline_t.format(year=year, due=due_human) urgency = urgency_t.format(year=year, due=due_human) def attribs(dot, company, state): # lp_link MUST start its query with `?` so the body's `&utm_source=...` # appends cleanly. A bare path here yields `/order/ucr-registration&utm...` # (no `?`) which 404s. Carry the carrier's `?dot=` (and `?code=` when a # coupon is on) exactly like the main builder's lp_link_with_coupon(). params = [] if dot: params.append(f"dot={dot}") if coupon: params.append(f"code={coupon}") lp = f"{LP_LINK}?" + "&".join(params) if params else LP_LINK a = { "dot_number": dot or "", "company": company or "", "state": state or "", "lp_link": lp, "ucr_due_date": due_human, "ucr_year": str(year), "ucr_headline": headline, "ucr_urgency": urgency, "filed_link": _filed_link(dot) if dot else "", } a.update(tc.coupon_attribs(coupon)) return a if args.preview: subs = [{"email": tc.TEST_EMAIL, "name": "Sample Carrier", "attribs": attribs("0000000", "Sample Carrier LLC", "TX")}] else: subs = [{"email": r[1], "name": r[2] or r[1], "attribs": attribs(r[0], r[2], r[3])} for r in rows] send_date = today.isoformat() list_name = f"UCR {year} T{touch_no} - {send_date}" campaign_name = f"UCR Reminder {year} - Touch {touch_no} ({days_before}bd) - {today.strftime('%b %d %Y')}" if args.dry_run: LOG.info("[ucr] DRY RUN -- touch %d would send to %d carriers (coupon=%s)", touch_no, len(subs), coupon) return 0 base = dict(base) base["subject"] = f"DOT# {{{{ .Subscriber.Attribs.dot_number }}}} - Your {year} UCR registration {subj_suffix}" list_id = tc.create_list(list_name) added = tc.import_subscribers(list_id, subs) LOG.info("[ucr] list %d: %d/%d subscribers added", list_id, added, len(subs)) if added == 0: LOG.error("[ucr] 0 subscribers added; aborting") return 1 send_at = datetime.now(timezone.utc) + timedelta(minutes=5) cid = tc.create_and_schedule_campaign( base, list_id, campaign_name, send_at, schedule=args.start_campaign and not args.preview) LOG.info("[ucr] campaign %d created (%s)", cid, "scheduled" if args.start_campaign and not args.preview else "draft") if args.start_campaign and not args.preview and rows: cur.execute(""" UPDATE fmcsa_carriers SET ucr_reminded_at = now(), ucr_touch_no = %s WHERE dot_number = ANY(%s::text[]) """, (touch_no, [r[0] for r in rows])) conn.commit() LOG.info("[ucr] marked %d carriers at touch %d", len(rows), touch_no) conn.close() return 0 if __name__ == "__main__": raise SystemExit(main())